spotipy-dev / spotipy

A light weight Python library for the Spotify Web API
http://spotipy.readthedocs.org
MIT License

Backend terminal shows: " Enter the URL you were redirected to: " #1125

Closed wassay13 closed 5 months ago

wassay13 commented 5 months ago

I built the backend and frontend separately and tested them on localhost, where everything works perfectly fine. Because I'm using FastAPI for the backend, I also built a CacheHandler for it:

import logging

from fastapi import Request
from spotipy.cache_handler import CacheHandler

logger = logging.getLogger(__name__)


class FastAPISessionCacheHandler(CacheHandler):
    """
    A cache handler that stores the token info in the session framework
    provided by FastAPI.
    """

    def __init__(self, request: Request):
        """
        Parameters:
            * request: Request object provided by FastAPI for every
            incoming request
        """
        self.request = request

    def get_cached_token(self):
        token_info = None
        try:
            token_info = self.request.session['token_info']
        except KeyError:
            logger.debug("Token not found in the session")

        return token_info

    def save_token_to_cache(self, token_info):
        try:
            self.request.session['token_info'] = token_info
        except Exception as e:
            logger.warning("Error saving token to cache: " + str(e))

But as soon as I deployed both on a DigitalOcean VM, I ran into a lot of errors (please look at the return statements in the root endpoint):

@app.get("/")
def index(sp_oauth: SpotifyOAuth = Depends(get_spotify_oauth)):
    if not sp_oauth.validate_token(sp_oauth.cache_handler.get_cached_token()):
        auth_url = sp_oauth.get_authorize_url()
        return RedirectResponse(auth_url)  # I was using this on localhost.
        return {"url": auth_url}  # And this on the VM, because RedirectResponse is blocked by the CORS policy (screenshot attached below).
    return RedirectResponse(url='/user-info')

@app.get("/callback")
def callback(request: Request, sp_oauth: SpotifyOAuth = Depends(get_spotify_oauth)):
    sp_oauth.get_access_token(request.query_params['code'])
    return RedirectResponse(url='/user-info')

TERMINAL:
(venv) say@ubuntu-xyz:~/app/src$ uvicorn --host 0.0.0.0 main:app
INFO:     137.59.220.8:12607 - "GET /spotify/ HTTP/1.1" 200 OK
Spotify ID: fm9z9syahyd............
INFO:     137.59.220.8:12614 - "GET /spotify/callback?code=AQBoiQiqufrNiKvstckYhncSoyA4DHA HTTP/1.1" 307 Temporary Redirect
Enter the URL you were redirected to: 

[image]

So far I've come to two main conclusions. First, I cannot let the user authorize directly from my backend IP because of CORS. Second, if I send auth_url and let the frontend open it in a new window, my backend shows an interactive terminal prompt asking me to enter the URL manually on the user's behalf (what on earth?), and all the endpoints freeze until I stop the server or type something in.

Last Option:

Then I dug into the Spotipy library to find out why I'm getting the interactive terminal prompt and found this:

class SpotifyOAuth(SpotifyAuthBase):
    def get_access_token(self, code=None, as_dict=True, check_cache=True):
        payload = {
            "redirect_uri": self.redirect_uri,
            "code": code or self.get_auth_response(),  # Exactly here: get_auth_response is responsible for "Enter the URL you were redirected to: "
            "grant_type": "authorization_code",
        }
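
The behaviour of that line comes down to how Python's `or` works: the right-hand side is evaluated only when the left-hand side is falsy. A minimal sketch, using hypothetical stand-in functions (`interactive_fallback`, `pick_code`, `fallback_was_triggered`) rather than spotipy's own code:

```python
def interactive_fallback() -> str:
    """Hypothetical stand-in for a terminal prompt; on a server nobody can answer it."""
    raise RuntimeError("would block waiting for terminal input")


def pick_code(code):
    # Same shape as `code or self.get_auth_response()`: the fallback runs
    # only when `code` is None or an empty string.
    return code or interactive_fallback()


def fallback_was_triggered(code) -> bool:
    """Report whether the fallback would have been called for this value."""
    try:
        pick_code(code)
    except RuntimeError:
        return True
    return False


print(fallback_was_triggered("some-code"))  # a real code short-circuits the `or`
print(fallback_was_triggered(None))         # a missing code reaches the fallback
```

Read this way, the prompt should only appear on a call path where no code was passed in, which is worth checking before changing the library itself.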

So I thought, why not just pass the code manually and get rid of "or self.get_auth_response()", right? NO! But I tried:

def get_cache_handler(request: Request):
    return FastAPISessionCacheHandler(request=request)

def get_spotify_oauth(cache_handler: FastAPISessionCacheHandler = Depends(get_cache_handler)):
    sp_oauth = SpotifyOAuth(
        client_id=client_id,
        client_secret=client_secret,
        redirect_uri=redirect_uri,
        scope=scope,
        cache_handler=cache_handler,
        show_dialog=False,
        open_browser=True
    )
    # sp_oauth.cache_handler.save_token_to_cache(token_info)
    return sp_oauth

def get_spotify_one(code: str, sp_oauth: SpotifyOAuth = Depends(get_spotify_oauth)):
    sp = Spotify(auth_manager=sp_oauth, auth=code)
    return sp

spotify_router = APIRouter(prefix="/spotify")

@spotify_router.get("/")
async def index(sp_oauth: SpotifyOAuth = Depends(get_spotify_oauth), sp: Spotify = Depends(get_spotify)):
    if not sp_oauth.validate_token(sp_oauth.cache_handler.get_cached_token()):
        auth_url = sp_oauth.get_authorize_url()
        return {"url": auth_url}

    current_user = sp.current_user()
    user_id = current_user['id']

    return RedirectResponse(url=f"http://161.zzz.z..z1.120?spotify_id={user_id}")

@spotify_router.get("/callback") #response_model=UserSpotifyID)
async def callback(request: Request, sp_oauth: SpotifyOAuth = Depends(get_spotify_oauth)):
    code = request.query_params.get('code')
    if not code:
        return {"error": "Authorization code not found"}

    token_info = sp_oauth.get_access_token(code)
    user_data = sp.current_user()
    user_info = User(display_name=user_data['display_name'], id=user_data['id'], email=user_data['email'])

    return RedirectResponse(url=f"http://161.zzz.z..z120?spotify_id={user_info.id}")

This approach solves the "Enter the URL you were redirected to: " issue, but who knows what's coming next.

Then from my frontend I call another endpoint:

  const handleLogin = () => {
    setLoading(true);
    fetch('http://143.zzz.z..z:8000/spotify/', { credentials: 'include' })
      .then(response => response.json())
      .then(data => {
        const authWindow = window.open(data.url, '_blank');
        if (authWindow) {
          authWindow.onload = function() {
            const checkUrlInterval = setInterval(() => {
              try {
                if (authWindow.location.href.includes('http://161.zzzzz120')) {
                  const url = new URL(authWindow.location.href);
                  const spotifyId = url.searchParams.get('spotify_id');
                  if (spotifyId) {
                    // Set the Spotify ID as a cookie
                    document.cookie = `spotify_id=${spotifyId}; path=/`;

                    clearInterval(checkUrlInterval);
                    authWindow.close();
                    console.log('Spotify ID:', spotifyId);

                    fetch(`http://143.xxxxx:8000/user-info/${spotifyId}`, { 
                      credentials: 'include'
                    })
                      .then(response => response.json())
                      .then(data => {
                        localStorage.setItem('userInfo', JSON.stringify(data)); // Store the user info in local storage
                        window.location.href = '/dashboard'; // Redirect to dashboard
                      })
                      .catch(error => {
                        console.error('Error fetching user info:', error);
                        setLoading(false);
                      });
                  }
                }
              } catch (e) {
                // Ignore errors related to cross-origin requests
              }
            }, 1000);
          };
        } else {
          console.error('Failed to open auth window');
          setLoading(false);
        }
      })
      .catch(error => {
        console.error('Error fetching Spotify auth URL:', error);
        setLoading(false);
      });
  };

Then the endpoint I call:

def get_spotify_access_token(request, sp_oauth: SpotifyOAuth = Depends(get_spotify)):

    token_info = get_cache_handler(request).get_cached_token()
    if not token_info:
        print("Token not found: ", token_info)
        return {"error": "Token not found"}
    print("Token found: ", token_info)

    # access_token = token_info["access_token"]
    valid_token = sp_oauth.validate_token(token_info)

    return valid_token

@app.get("/user-info/{spotify_id}")
async def user_info(spotify_id: str, db: Session = Depends(get_db), sp: Spotify = Depends(get_spotify)):
    # Query the database for the user with the given spotify_id
    user = db.query(UserDB).filter(UserDB.spotify_id == spotify_id).first()

    # If the user was not found, return a 404 error
    if user is None:
        raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail="User not found")

    access_token = get_spotify_access_token

    # Store the user data and followed artists in the database
    store_user_data(db, user, access_token, sp)

    # Get the user's display_name before closing the session
    display_name = user.display_name
    image_url = user.image_url

    # Generate a greeting message with the user's display_name
    ai_res = await generate_greeting(display_name)

    return {"response": ai_res, "image_url": image_url}

Up to getting the access token back, everything seems to work perfectly fine, but when I call "store_user_data(db, user, access_token, sp)", my code breaks (screenshots attached below) :(

[image] [image]

This is the problematic function. It works perfectly fine on localhost, but since I changed the code to pass the code manually, I don't know what's happening under the hood:

def store_user_data(db, user: UserDB, access_token: str, sp: Spotify = Depends(get_spotify)):
    # Create a Spotify API client
    # sp = Spotify(auth_manager=auth_manager)

    # Initialize the 'after' parameter for pagination
    after = None

    while True:
        # Call the Spotify API to get the followed artists
        response = sp.current_user_followed_artists(limit=50, after=after)

After digging further, I realized I've broken a whole lot of things (I guess) and would need to rebuild and adjust this library for my use case, which I can't do: I'm still relatively new to coding and don't understand much of it. It would be highly appreciated if someone could guide me on what to do, because this is the first project I started building on my own rather than cloning from some YouTube video. There's a high chance I'm missing something very basic that is causing this issue (coding solely with the help of AI makes me feel that way). PS: And yes, I'm not passing the access token in "response = sp.current_user_followed_artists(limit=50, after=after)" because I don't understand where exactly it needs to go. Maybe in "self._get"?

    def current_user_followed_artists(self, limit=20, after=None):
        """ Gets a list of the artists followed by the current authorized user

            Parameters:
                - limit - the number of artists to return
                - after - the last artist ID retrieved from the previous
                          request

        """
        return self._get(
            "me/following", type="artist", limit=limit, after=after
        )

    def _get(self, url, args=None, payload=None, **kwargs):
        if args:
            kwargs.update(args)

        return self._internal_call("GET", url, payload, kwargs)

    def _internal_call(self, method, url, payload, params):
        args = dict(params=params)
        if not url.startswith("http"):
            url = self.prefix + url
        headers = self._auth_headers()

        if "content_type" in args["params"]:
            headers["Content-Type"] = args["params"]["content_type"]
            del args["params"]["content_type"]
            if payload:
                args["data"] = payload
        else:
            headers["Content-Type"] = "application/json"
            if payload:
                args["data"] = json.dumps(payload)

        if self.language is not None:
            headers["Accept-Language"] = self.language

        logger.debug('Sending %s to %s with Params: %s Headers: %s and Body: %r ',
                     method, url, args.get("params"), headers, args.get('data'))

        try:
            response = self._session.request(
                method, url, headers=headers, proxies=self.proxies,
                timeout=self.requests_timeout, **args
            )

            response.raise_for_status()
            results = response.json()
        except requests.exceptions.HTTPError as http_error:
            response = http_error.response
            try:
                json_response = response.json()
                error = json_response.get("error", {})
                msg = error.get("message")
                reason = error.get("reason")
            except ValueError:
                # if the response cannot be decoded into JSON (which raises a ValueError),
                # then try to decode it into text

                # if we receive an empty string (which is falsy), then replace it with `None`
                msg = response.text or None
                reason = None

            logger.error(
                'HTTP Error for %s to %s with Params: %s returned %s due to %s',
                method, url, args.get("params"), response.status_code, msg
            )

            raise SpotifyException(
                response.status_code,
                -1,
                f"{response.url}:\n {msg}",
                reason=reason,
                headers=response.headers,
            )
        except requests.exceptions.RetryError as retry_error:
            request = retry_error.request
            logger.error('Max Retries reached')
            try:
                reason = retry_error.args[0].reason
            except (IndexError, AttributeError):
                reason = None
            raise SpotifyException(
                429,
                -1,
                f"{request.path_url}:\n Max Retries",
                reason=reason
            )
        except ValueError:
            results = None

        logger.debug('RESULTS: %s', results)
        return results

    def _auth_headers(self):
        if self._auth:
            return {"Authorization": f"Bearer {self._auth}"}
        if not self.auth_manager:
            return {}
        try:
            token = self.auth_manager.get_access_token(as_dict=False)
        except TypeError:
            token = self.auth_manager.get_access_token()
        return {"Authorization": f"Bearer {token}"}

    @property
    def auth_manager(self):
        return self._auth_manager

    @auth_manager.setter
    def auth_manager(self, auth_manager):
        if auth_manager is not None:
            self._auth_manager = auth_manager
        else:
            self._auth_manager = (
                self.client_credentials_manager or self.oauth_manager
            )

    def get_access_token(self, code=None, as_dict=True, check_cache=True):
        """ Gets the access token for the app given the code

            Parameters:
                - code - the response code
                - as_dict - a boolean indicating if returning the access token
                            as a token_info dictionary, otherwise it will be returned
                            as a string.
        """
        if as_dict:
            warnings.warn(
                "You're using 'as_dict = True'."
                "get_access_token will return the token string directly in future "
                "versions. Please adjust your code accordingly, or use "
                "get_cached_token instead.",
                DeprecationWarning,
                stacklevel=2,
            )
        if check_cache:
            token_info = self.validate_token(self.cache_handler.get_cached_token())
            if token_info is not None:
                if self.is_token_expired(token_info):
                    token_info = self.refresh_access_token(
                        token_info["refresh_token"]
                    )
                return token_info if as_dict else token_info["access_token"]

        payload = {
            "redirect_uri": self.redirect_uri,
            "code": code, # or self.get_auth_response(),
            "grant_type": "authorization_code",
        }
        if self.scope:
            payload["scope"] = self.scope
        if self.state:
            payload["state"] = self.state

        headers = self._make_authorization_headers()

        logger.debug(
            "sending POST request to %s with Headers: %s and Body: %r",
            self.OAUTH_TOKEN_URL, headers, payload
        )

        try:
            response = self._session.post(
                self.OAUTH_TOKEN_URL,
                data=payload,
                headers=headers,
                verify=True,
                proxies=self.proxies,
                timeout=self.requests_timeout,
            )
            response.raise_for_status()
            token_info = response.json()
            token_info = self._add_custom_values_to_token_info(token_info)
            self.cache_handler.save_token_to_cache(token_info)
            return token_info if as_dict else token_info["access_token"]
        except requests.exceptions.HTTPError as http_error:
            self._handle_oauth_error(http_error)
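
On the question of where the access token ends up: in the source quoted above, `_auth_headers` is the place that turns a token into the `Authorization` header. It prefers an explicitly supplied token (`self._auth`) and otherwise asks the auth manager. The sketch below mirrors just that precedence with hypothetical stand-ins (`FakeAuthManager`, `build_auth_headers`); it is an illustration, not spotipy's code.

```python
from typing import Dict, Optional


class FakeAuthManager:
    """Hypothetical stand-in for an auth manager that already holds a token."""

    def __init__(self, token: str) -> None:
        self._token = token

    def get_access_token(self, as_dict: bool = False) -> str:
        return self._token


def build_auth_headers(
    auth: Optional[str], auth_manager: Optional[FakeAuthManager]
) -> Dict[str, str]:
    if auth:
        # An explicitly supplied access token wins.
        return {"Authorization": f"Bearer {auth}"}
    if auth_manager is None:
        # Nothing to authenticate with: the request goes out without the header.
        return {}
    token = auth_manager.get_access_token(as_dict=False)
    return {"Authorization": f"Bearer {token}"}


print(build_auth_headers("explicit-token", FakeAuthManager("managed-token")))
print(build_auth_headers(None, FakeAuthManager("managed-token")))
```

If this reading is right, whatever is passed as `auth` is sent as the bearer value as-is, so it would need to be an access token rather than the authorization `code` from the callback.
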

dieser-niko commented 5 months ago

It looks like you're on the right track.

CORS has nothing to do with Spotify; it has to be handled by FastAPI instead. You can read more about this here: https://fastapi.tiangolo.com/tutorial/cors/
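
As a rough illustration of what the server has to send: the frontend snippet earlier uses `credentials: 'include'`, and for such credentialed requests browsers do not accept a wildcard `Access-Control-Allow-Origin: *`. The response has to name the exact origin and also set `Access-Control-Allow-Credentials: true`. The sketch below only models that header decision in plain Python; the function name and the example origins are assumptions, and in a real FastAPI app the `CORSMiddleware` described on the linked page would take care of this.

```python
from typing import Dict, Iterable, Optional


def cors_headers(origin: Optional[str], allowed_origins: Iterable[str]) -> Dict[str, str]:
    """Return CORS response headers for a credentialed request, or {} if the origin is not allowed."""
    if origin is None or origin not in set(allowed_origins):
        return {}
    return {
        # Echo the exact origin: "*" is rejected by browsers when cookies are sent.
        "Access-Control-Allow-Origin": origin,
        "Access-Control-Allow-Credentials": "true",
        # Tell caches that the response differs per requesting origin.
        "Vary": "Origin",
    }


# Hypothetical frontend origin, for illustration only.
ALLOWED = ["http://frontend.example"]
print(cors_headers("http://frontend.example", ALLOWED))
print(cors_headers("http://other.example", ALLOWED))
```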

Then the authentication process. I'd like you to take a look at this app.py example which shows how to get the authorise URL and how a callback from Spotify can be handled. This is a pretty good example, even though it is implemented in Flask. But there's not much difference to FastAPI in this case.

Admittedly, it's a bit tight, because the login process and the callback are implemented in the same function (and therefore the same endpoint).

But it's still possible to understand it. I'll explain it to you. The function is divided into four parts.

First, the cache_handler and auth_manager are created. They are not passed to spotipy.Spotify yet, and this is important, mainly because of the auth manager, as we don't want to trigger the internal authentication process.

cache_handler = spotipy.cache_handler.FlaskSessionCacheHandler(session)
auth_manager = spotipy.oauth2.SpotifyOAuth(scope='user-read-currently-playing playlist-modify-private',
                                           cache_handler=cache_handler,
                                           show_dialog=True)

As for the FlaskSessionCacheHandler, I'm not sure if there is something similar for FastAPI. I've seen that you've created your own FastAPISessionCacheHandler, and honestly, it's probably fine to start with. You just have to remember that the user can easily extract the token. They shouldn't be able to do much damage as the token is tied to their account anyway.

Then, in the second part, it checks if the incoming request is from a callback function by checking if "code" is one of the arguments used. The value of this argument is then checked for authenticity. If all is well, the user is logged in and the page is refreshed by redirecting to the same page.

if request.args.get("code"):
    # Step 2. Being redirected from Spotify auth page
    auth_manager.get_access_token(request.args.get("code"))
    return redirect('/')

The third part is to check if the user isn't logged in. If that's the case, then an authorize url is generated and returned to the user as a link. Yes, I know the order of the parts is a bit confusing, but bear with me. There are also comments here and in part 2 to indicate that this step is the first to be run.

if not auth_manager.validate_token(cache_handler.get_cached_token()):
    # Step 1. Display sign in link when no token
    auth_url = auth_manager.get_authorize_url()
    return f'<h2><a href="{auth_url}">Sign in</a></h2>'

And then the last part, where we finally get our spotipy.Spotify object by passing our auth_manager to it. Since we should already be logged in when we reach this part, there should be no prompt asking us to enter a link.

# Step 3. Signed in, display data
spotify = spotipy.Spotify(auth_manager=auth_manager)
return f'<h2>Hi {spotify.me()["display_name"]}, ' \
       f'<small><a href="/sign_out">[sign out]</a></small></h2>' \
       f'<a href="/playlists">my playlists</a> | ' \
       f'<a href="/currently_playing">currently playing</a> | ' \
       f'<a href="/current_user">me</a>'
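
Put together, the four parts come down to one decision per request. The sketch below models only that decision in plain Python so the ordering is easier to see. `next_step` and its return labels are hypothetical names, not part of spotipy or the example app, and the token check is simplified to "is anything cached" where the example uses `validate_token`.

```python
from typing import Mapping, Optional


def next_step(query_params: Mapping[str, str], cached_token: Optional[dict]) -> str:
    """Decide what the endpoint should do for this request."""
    if query_params.get("code"):
        # Part 2: Spotify redirected back with ?code=..., so exchange it for a token.
        return "exchange_code"
    if not cached_token:
        # Part 3: nobody is signed in yet, so hand out the authorize URL.
        return "send_authorize_url"
    # Part 4: a token is cached, so it is safe to build the API client.
    return "use_api"


print(next_step({}, None))
print(next_step({"code": "abc"}, None))
print(next_step({}, {"access_token": "xyz"}))
```

In the example above, only the last branch constructs `spotipy.Spotify(auth_manager=auth_manager)`, which is what keeps the interactive prompt from being reached.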

I'd recommend running this code on your machine, but be sure to read the comment at the top of the file. It contains some useful information.

I hope I was able to help you.

wassay13 commented 5 months ago

Hi Dieser, I tried the exact same approach you suggested, but it's not working. Here's why: in get_access_token we have

payload = {
            "redirect_uri": self.redirect_uri,
            "code": code or self.get_auth_response(),
            "grant_type": "authorization_code",
        }

So when a user hits the '/' endpoint on my backend (which runs on a different server from the frontend) for the very first time, self.get_auth_response() gets called. This function either opens a new browser window on the computer (the server) or shows an interactive terminal prompt; following the flow, it calls _get_auth_response_interactive, which actually executes the logic.

    def get_auth_response(self, open_browser=None):
        logger.info('User authentication requires interaction with your '
                    'web browser. Once you enter your credentials and '
                    'give authorization, you will be redirected to '
                    'a url.  Paste that url you were directed to to '
                    'complete the authorization.')

        redirect_info = urlparse(self.redirect_uri)
        redirect_host, redirect_port = get_host_port(redirect_info.netloc)

        if open_browser is None:
            open_browser = self.open_browser

        if (
                open_browser
                and redirect_host in ("127.0.0.1", "localhost")
                and redirect_info.scheme == "http"
        ):
            # Only start a local http server if a port is specified
            if redirect_port:
                return self._get_auth_response_local_server(redirect_port)
            else:
                logger.warning('Using `%s` as redirect URI without a port. '
                               'Specify a port (e.g. `%s:8080`) to allow '
                               'automatic retrieval of authentication code '
                               'instead of having to copy and paste '
                               'the URL your browser is redirected to.',
                               redirect_host, redirect_host)

        return self._get_auth_response_interactive(open_browser=open_browser)

    def _get_auth_response_interactive(self, open_browser=False):
        if open_browser:
            self._open_auth_url()
            prompt = "Enter the URL you were redirected to: "
        else:
            url = self.get_authorize_url()
            prompt = (
                "Go to the following URL: {}\n"
                "Enter the URL you were redirected to: ".format(url)
            )
        response = self._get_user_input(prompt)
        state, code = SpotifyOAuth.parse_auth_response_url(response)
        if self.state is not None and self.state != state:
            raise SpotifyStateError(self.state, state)
        return code
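
For reference, the URL requested by that prompt is only needed for its `code` and `state` query parameters; in a web backend the same values arrive as query parameters on the callback request. A minimal standard-library sketch of that extraction follows. The function name and the example URL are made up for illustration, and this is not spotipy's implementation of `parse_auth_response_url`.

```python
from typing import Optional, Tuple
from urllib.parse import parse_qs, urlparse


def extract_code_and_state(redirect_url: str) -> Tuple[Optional[str], Optional[str]]:
    """Return (code, state) from a redirect URL; missing values come back as None."""
    query = parse_qs(urlparse(redirect_url).query)
    # parse_qs maps each key to a list of values; take the first one if present.
    code = query.get("code", [None])[0]
    state = query.get("state", [None])[0]
    return code, state


# Made-up URL, for illustration only.
print(extract_code_and_state("http://localhost:8000/callback?code=abc123&state=xyz"))
```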

___________________________________________________________________________________

TERMINAL:
(venv) say@ubuntu-xyz:~/app/src$ uvicorn --host 0.0.0.0 main:app

INFO:     137.59.220.8:12607 - "GET /spotify/ HTTP/1.1" 200 OK

INFO:     137.59.220.8:12614 - "GET /spotify/callback?code=AQBoiQiqufrNiKvstckYhncSoyA4DHA HTTP/1.1" 307 Temporary Redirect

Enter the URL you were redirected to: 

dieser-niko commented 5 months ago

My name is Niko actually, but that's all right.

I don't think that the relevant part is included in your snippet, so that's why I straight up built a little example with FastAPI. You said that you're using it as a Backend, so the auth url will be returned as a simple JSON and not as a 307 Temporary Redirect or similar. Check it out: https://github.com/dieser-niko/spotipy-fastapi-oauth

wassay13 commented 5 months ago

Hey hey hey, ty Niko, it works. I had also tried it manually, which works perfectly fine too, but I'll update my code with this approach so I don't run into unexpected errors in the future.

dieser-niko commented 5 months ago

Glad to hear that. If you want to, you can change to an authorization header instead of cookies. Probably the only thing I regret about my code :D
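
For anyone who wants to try the header variant, here is a small sketch of the server-side parsing step, assuming the client sends a value of the form `Authorization: Bearer <token>`. The function name is hypothetical, and what the token identifies (for example a session issued by the backend) is left open here.

```python
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from a 'Bearer <token>' header value, or None if it is missing or malformed."""
    if not authorization:
        return None
    scheme, _, token = authorization.partition(" ")
    # The scheme name is compared case-insensitively; an empty token is rejected.
    if scheme.lower() != "bearer" or not token.strip():
        return None
    return token.strip()


print(extract_bearer_token("Bearer abc123"))
print(extract_bearer_token("Basic dXNlcjpwYXNz"))
```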