My task is to build a program that first scraps songs from billboard.com, and then uses the Spotify API to search for these songs and create a playlist.
The scrapping part and seach parts are working. The problem arises when I try to create a playlist.
Every time the code for create_playlist runs I get a 500 error. I am not sure if the error comes from calling (in spotify.py):
requests.get('http://localhost:5000/createplaylist', params=params)
or (in spotify_service.py)
requests.post(url, headers=headers, params=params)
I don't know if the error it's from my flask server or from Spotify or if there is a mistake in the code.
I was hoping someone could spot the problem.
Here are the files (I've also attached them just in case)
main.py
from data_scrapper import DataScrapper
from spotify import Spotify
BILLBOARD_URL = 'https://www.billboard.com/charts/hot-100/'
class Application:
def __init__(self):
# self.data_scrapper = DataScrapper(BILLBOARD_URL)
# self.user_input = input("Enter a date for the top 100 (YYYY-MM-DD)")
# self.song_list = self.data_scrapper.scrap_songs('1992-08-12')
self.spotify = Spotify()
self.spotify.create_playlist('1992', 'Collection of the Greatest hits of 1992', False)
# self.spotify.get_playlist('2QUpzT3uDaRXnhzcPqLF7F')
# self.spotify.search_song()
app = Application()
spotify.py
import base64
import requests
import json
class Spotify:
def __init__(self):
pass
def create_playlist(self, year, description, public):
name = f'Greatest hits of {year}'
params = {
'name': name,
'public': public,
'description': description
}
response = requests.get('http://localhost:5000/createplaylist', params=params)
response.raise_for_status()
print(response.json())
def get_playlist(self, playlist_id):
params = {
'playlist_id': playlist_id
}
response = requests.get('http://localhost:5000/getplaylist', params=params)
print(response.json())
return response.json()
def search_song(self):
params = {
'song': 'Set fire to the rain'
}
response = requests.get('http://localhost:5000/searchsong', params=params)
print(json.loads(response.text)['tracks']['items'][0]['uri'])
return json.loads(response.text)['tracks']['items'][0]['uri']
spotify_service.py
from flask import Flask, request, redirect
from requests_oauthlib import OAuth2Session
from requests.auth import HTTPBasicAuth
import os
import requests
import json
app = Flask(__name__)
AUTH_URL = 'https://accounts.spotify.com/authorize'
TOKEN_URL = 'https://accounts.spotify.com/api/token'
SEARCH_URL = 'https://api.spotify.com/v1/search'
REDIRECT_URI = 'http://localhost:5000/callback'
CLIENT_ID = xxx
CLIENT_SECRET = xxx
USER_ID = xxx
SCOPE = [
'user-read-currently-playing',
'playlist-modify-private'
]
@app.route("/login")
def login():
spotify = OAuth2Session(client_id=CLIENT_ID, scope=SCOPE[1], redirect_uri=REDIRECT_URI)
authorization_url, state = spotify.authorization_url(AUTH_URL)
return redirect(authorization_url)
@app.route("/callback", methods=['GET'])
def callback():
code = request.args.get('code')
response = requests.post(TOKEN_URL,
auth=HTTPBasicAuth(CLIENT_ID, CLIENT_SECRET),
data={
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': REDIRECT_URI
})
os.environ['ACCESS_TOKEN'] = response.json()['access_token']
return "Logged in"
@app.route('/getplaylist', methods=['GET'])
def getplaylist():
playlist_id = request.args.get('playlist_id')
headers = {
'Authorization': "Bearer {}".format(os.environ['ACCESS_TOKEN'])
}
url = f'https://api.spotify.com/v1/playlists/{playlist_id}'
response = requests.get(url, headers=headers)
results = response.json()
songs = []
for item in results['tracks']['items']:
if item is not None and item['track']['artists'][0] is not None:
songs.append(item['track']['artists'][0]['name'] + " : " + item['track']['name'])
return json.dumps(songs)
@app.route("/createplaylist", methods=['GET'])
def createplaylist():
url = 'https://api.spotify.com/v1/users/{}/playlists'.format(USER_ID)
headers = {
'Authorization': "Bearer {}".format(os.environ['ACCESS_TOKEN']),
'Content-Type': 'application/json'
}
params = {
'name': request.args.get('name'),
'public': request.args.get('public'),
'description': request.args.get('description')
}
response = requests.post(url, headers=headers, params=params)
response.raise_for_status()
return response
@app.route("/searchsong", methods=['GET'])
def searchsong():
song = request.args.get('song')
headers = {
'Authorization': "Bearer {}".format(os.environ['ACCESS_TOKEN'])
}
params = {
'q': song,
'type': 'track',
'market': 'ES',
'limit': 1
}
response = requests.get(SEARCH_URL, headers=headers, params=params)
response.raise_for_status()
return response.text
if __name__ == '__main__':
app.run(port=5000, debug=True)
Thank you in advance -