spotify.py
import csv
import os
import re
import time

import numpy as np
import requests
from PIL import Image
from requests.adapters import HTTPAdapter
from tqdm import tqdm
from urllib3.util.retry import Retry

import config
# Maps Spotify track ID -> sanitized "AlbumName|FirstArtist" key (filled by preProcessCSV)
trackIDtoAlbumKey = {}
# credit to gpt for this function, had to look up how to do this
def buildSession():
    session = requests.Session()
    # Add headers
    session.headers.update(config.HEADERS)
    # Add retries
    retry = Retry(
        total=3,
        connect=3,
        read=3,
        backoff_factor=0.8,  # exponential backoff between retries
        status_forcelist=[429, 500, 502, 503, 504],  # don't retry on 404 or 403
        allowed_methods=["GET"],  # only retry GET requests
        raise_on_status=False,
    )
    # Add adapters
    adapter = HTTPAdapter(max_retries=retry)
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    return session
# shared session for all requests, so the headers and retry policy apply everywhere
SESSION = buildSession()
# Remove characters that are invalid in filenames, replace whitespace runs with underscores
def sanitizeFilename(name):
    name = name.strip()
    name = re.sub(r'[\\/*?:"<>|]', "_", name)
    name = re.sub(r'\s+', '_', name)
    return name
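# For example (illustrative input): sanitizeFilename("OK Computer|Radiohead") -> "OK_Computer_Radiohead"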
# Needed for the album pseudo-key: only the first listed artist identifies the album
def getFirstArtist(artistNames):
    return artistNames.split(";")[0].strip()
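# For example (illustrative input): getFirstArtist("Daft Punk;Pharrell Williams") -> "Daft Punk"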
# LATER TODO: remove local Spotify songs
def preProcessCSV():
    global trackIDtoAlbumKey
    # Preprocess the CSV: write only rows with unique albums to a trimmed copy,
    # using a set to track which albums we have already seen
    uniqueAlbumNames = set()
    # each key is Album Name + First Artist Name, which should be unique enough
    with open(config.CSV_PATH, mode="r", encoding="utf-8-sig", newline="") as infile, \
         open(config.TRIMMED_CSV_PATH, mode="w", encoding="utf-8", newline="") as outfile:
        reader = csv.DictReader(infile)
        writer = csv.DictWriter(outfile, fieldnames=config.SPOTIFY_CSV_HEADERS)
        # Make sure that our output CSV has the right headers
        writer.writeheader()
        for row in reader:
            trackURI = row[config.TRACK_URI_LABEL]
            albumName = row[config.ALBUM_NAME_LABEL]
            artistNames = row[config.FIRST_ARTIST_NAME_LABEL]
            if not trackURI or not albumName or not artistNames:
                print(f"\nWARNING: missing fields in row: {row}")
                continue
            # Create a unique key for each album based on Album Name + First Artist Name
            firstArtist = getFirstArtist(artistNames)
            key = f"{albumName}|{firstArtist}"
            if key not in uniqueAlbumNames:
                uniqueAlbumNames.add(key)
                writer.writerow(row)
            # fill in globals for later use
            trackID = grabIDfromURI(trackURI)
            trackIDtoAlbumKey[trackID] = sanitizeFilename(key)
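# Illustrative example of the mapping built above: a row with album "Discovery" and
# artists "Daft Punk;..." yields key "Discovery|Daft Punk", stored sanitized as
# "Discovery_Daft_Punk" (and later saved to disk as "Discovery_Daft_Punk.jpg")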
def grabIDfromURI(uri):
    return uri.split(":")[-1]
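# e.g. "spotify:track:<id>" -> "<id>"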
# TODO: consider "perceptual averaging" (squares and square roots) in the future?
def computeAverageRGB(imgPath):
    img = Image.open(imgPath).convert("RGB")
    arr = np.asarray(img, dtype=np.float32)
    return arr.mean(axis=(0, 1))  # (R, G, B)
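# A minimal sketch of the "perceptual averaging" idea from the TODO above, assuming the
# usual square-then-average-then-square-root approach (which approximates averaging in
# linear-light space rather than gamma-encoded space); not wired in anywhere yet:
def computePerceptualAverageRGB(imgPath):
    img = Image.open(imgPath).convert("RGB")
    arr = np.asarray(img, dtype=np.float32)
    return np.sqrt((arr ** 2).mean(axis=(0, 1)))  # (R, G, B)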
def getAllSongIDs():
    songIds = set()
    # Grab all unique track IDs from the trimmed CSV
    # (utf-8-sig strips the BOM from the start of the fieldnames)
    with open(config.TRIMMED_CSV_PATH, mode="r", encoding="utf-8-sig", newline="") as file:
        reader = csv.DictReader(file)
        if config.TRACK_URI_LABEL not in reader.fieldnames:
            raise ValueError(f"CSV file must contain '{config.TRACK_URI_LABEL}' column")
        for row in reader:
            songIds.add(grabIDfromURI(row[config.TRACK_URI_LABEL]))
    if config.DEBUG_FLAG:
        print(f"\nFound {len(songIds)} songs with unique albums in {config.TRIMMED_CSV_PATH}\n")
    return songIds
def getAlbumCoverURL(trackId):
    # Spotify's public oEmbed endpoint returns track metadata (no API key needed),
    # including a thumbnail of the album art
    url = f"https://open.spotify.com/track/{trackId}"
    oembed = "https://open.spotify.com/oembed"
    r = SESSION.get(oembed, params={"url": url}, timeout=20)
    r.raise_for_status()
    data = r.json()
    albumKey = trackIDtoAlbumKey[trackId]
    coverURL = data["thumbnail_url"]
    return albumKey, coverURL
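# Illustrative shape of the oEmbed JSON this relies on (field values made up here;
# only "thumbnail_url" is actually used above):
# {"title": "...", "thumbnail_url": "https://i.scdn.co/image/...", "thumbnail_width": 300, ...}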
def getAllAlbumURLs(songIds):
    albumCoverURLs = {}  # album key -> cover URL OR local path, to avoid duplicates
    for songID in tqdm(songIds, desc="Fetching album covers", unit="track"):
        albumKey = trackIDtoAlbumKey[songID]
        fPath = os.path.join(config.ALBUM_DATA_FOLDER, f"{albumKey}.jpg")
        # HUGE OPTIMIZATION: if we already have the album cover on disk, skip the request (a simple cache)
        if os.path.exists(fPath):
            albumCoverURLs[albumKey] = fPath
            continue
        # only sleep if we are making a request
        time.sleep(config.SLEEP_TIME)
        try:
            albumKey, coverURLorPath = getAlbumCoverURL(songID)
            albumCoverURLs[albumKey] = coverURLorPath
        except Exception as e:
            tqdm.write(f"Failed for album {albumKey}: {e}")
    if config.DEBUG_FLAG:
        print(f"\nFound {len(albumCoverURLs)} unique albums from {len(songIds)} songs\n")
    return albumCoverURLs
# use a fully deterministic filename so we don't re-download covers that already exist
def downloadAlbumCovers(urls):
    os.makedirs(config.ALBUM_DATA_FOLDER, exist_ok=True)
    for albumKey, url in tqdm(urls.items(), desc="Downloading album covers", unit="album"):
        fPath = os.path.join(config.ALBUM_DATA_FOLDER, f"{albumKey}.jpg")
        # HUGE TIME OPTIMIZATION: skip covers already on disk
        if os.path.exists(fPath):
            continue
        # assert(url is not a local path)
        # rationale: if url were a local path, we would have continued above since fPath would exist
        try:
            # use the shared session so the headers and retry policy apply here too
            r = SESSION.get(url, timeout=config.TIMEOUT_SECONDS)
            r.raise_for_status()
            with open(fPath, "wb") as f:
                f.write(r.content)
            time.sleep(config.SLEEP_TIME)
        except Exception as e:
            tqdm.write(f"Failed to download {url} for album {albumKey}: {e}")
def isNotSaturated(imgPath):
    img = Image.open(imgPath).convert("HSV")
    arr = np.asarray(img, dtype=np.float32)
    avgSat = arr[:, :, 1].mean()  # channel 1 is saturation
    return avgSat < config.SATURATION_THRESHOLD
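# Note: PIL's "HSV" mode stores saturation as 0-255 (not 0-1), so
# config.SATURATION_THRESHOLD should be on that 0-255 scale.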
def returnAlbumData():
    rawAlbumData = []  # (id, average color, path)
    activeAlbumKeys = set(trackIDtoAlbumKey.values())
    for filename in os.listdir(config.ALBUM_DATA_FOLDER):
        # Only use albums that are in our CSV, not every album ever downloaded
        albumKey = filename[:-4]  # remove ".jpg"
        if albumKey not in activeAlbumKeys:
            continue
        filepath = os.path.join(config.ALBUM_DATA_FOLDER, filename)
        color = computeAverageRGB(filepath)
        if config.GREYSCALE_FLAG:
            # In greyscale mode, keep only low-saturation (near-greyscale) covers
            if not isNotSaturated(filepath):
                continue
        rawAlbumData.append((len(rawAlbumData), color, filepath))
    if config.DEBUG_FLAG:
        print(f"\nAlbum data has been processed: {len(rawAlbumData)} albums")
    return rawAlbumData
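# A minimal sketch of how these pieces fit together, assuming this module is run
# directly (the project's actual entry point may live elsewhere):
if __name__ == "__main__":
    preProcessCSV()                    # dedupe albums, fill trackIDtoAlbumKey
    songIds = getAllSongIDs()          # unique track IDs from the trimmed CSV
    urls = getAllAlbumURLs(songIds)    # album key -> cover URL or cached local path
    downloadAlbumCovers(urls)          # fetch any covers not already on disk
    albumData = returnAlbumData()      # (id, average RGB, path) per album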