firebase / firebase-admin-python

Firebase Admin Python SDK
https://firebase.google.com/docs/admin/setup
Apache License 2.0

Firebase db listener cannot handle internet disconnections #287

Open jeffersonkr opened 5 years ago

jeffersonkr commented 5 years ago

[READ] Step 1: Are you in the right place?

[REQUIRED] Step 2: Describe your environment

[REQUIRED] Step 3: Describe the problem

db.reference().listen() hangs and blocks the rest of the code if the internet connection drops while the listener is connecting. Also, if a stream that is already connected loses its connection for more than 2 to 3 minutes and then reconnects, it no longer receives any updates from Firebase. While searching for the root cause, I noticed that SSEClient has a default retry interval of 3000 ms and that the retry value in the Event class defaults to None. I'm not sure about this, but maybe this is the problem?

Steps to reproduce:

To simulate the error, pull the network cable or otherwise disconnect the system from the internet, wait 2 to 3 minutes, and reconnect. After the reconnection, update any endpoint that the stream is listening to. No updates are received.

It can also be reproduced while the streaming connection is being opened: disconnect the internet, wait 2 to 3 minutes, and reconnect. No updates are received from Firebase, and the code after the call that opens the stream never runs, no matter how long you wait.

Relevant Code:

class SSEClient(object):
    '''SSE client implementation.'''

    def __init__(self, url, session, retry=3000, **kwargs):
        """Initializes the SSEClient.

        Args:
          url: The remote url to connect to.
          session: The requests session.
          retry: The retry interval in milliseconds (optional).
          **kwargs: Extra kwargs that will be sent to ``requests.get()`` (optional).
        """

class Event(object):
    '''Event represents the events fired by SSE.'''

    sse_line_pattern = re.compile('(?P<name>[^:]*):?( ?(?P<value>.*))?')

    def __init__(self, data='', event_type='message', event_id=None, retry=None):
        self.data = data
        self.event_type = event_type
        self.event_id = event_id
        self.retry = retry

Code to reproduce the problem:

import os
import time

import firebase_admin
from firebase_admin import credentials
from firebase_admin import db

# callback, internet_on() and DB_URL are defined elsewhere in the application.

# Init Firebase.
dir_path = os.path.dirname(os.path.realpath(__file__))
cred = credentials.Certificate(dir_path + '/serviceAccountKey.json')
firebase_admin.initialize_app(cred, {
    'databaseURL': DB_URL
})

# timer == 0 means the listener has to be (re)opened.
timer = 100
streaming = None
try:
    streaming = db.reference('path').listen(callback)
except db.ApiCallError:
    timer = 0

# I wrote the loop below to handle disconnections.
while True:
    if internet_on():
        if timer == 0:
            try:
                streaming.close()
            except Exception:
                # We do not have a streaming connection open.
                pass
            try:
                streaming = db.reference('path').listen(callback)
                timer = 100
            except db.ApiCallError:
                timer = 0
    else:
        # Count down once per second while offline.
        time.sleep(1)
        timer -= 1 if timer > 0 else 0
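
The countdown loop above can also be written as a small function that reacts to every offline-to-online transition. This is a minimal sketch of that variation, not the SDK's behaviour: `supervise_listener`, `open_listener`, `is_online` and `max_checks` are hypothetical names, and `open_listener` stands in for something like `lambda: db.reference('path').listen(callback)`. It only covers re-subscribing after an outage; it does not help if the call that opens the listener itself never returns.

```python
import time


def supervise_listener(open_listener, is_online, max_checks,
                       poll_interval=1.0, sleep=time.sleep):
    """Reopen the listener each time connectivity comes back.

    open_listener: callable returning an object with a close() method.
    is_online: callable returning True when the network is reachable.
    max_checks: number of polling rounds (bounded so the sketch terminates).
    Returns how many times a listener was opened.
    """
    listener = None
    was_online = False
    opened = 0
    for _ in range(max_checks):
        online = is_online()
        if online and not was_online:
            # Connectivity (re)appeared: drop any stale stream, then resubscribe.
            if listener is not None:
                try:
                    listener.close()
                except Exception:
                    pass  # The old stream was already dead.
            try:
                listener = open_listener()
                opened += 1
            except Exception:
                listener = None
                online = False  # Treat a failed open as "still offline" and retry.
        was_online = online
        sleep(poll_interval)
    return opened
```

Injecting `is_online` and `sleep` keeps the loop testable without a real network or real delays.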
google-oss-bot commented 5 years ago

I couldn't figure out how to label this issue, so I've labeled it for a human to triage. Hang tight.

jeffersonkr commented 5 years ago

Hi. While trying to solve the problem, I realized that the timeout option is missing when the SSEClient class is instantiated, and that the requests.get() call inside SSEClient._connect() has no timeout, which is why my code gets stuck when my internet is disconnected. The call probably has no timeout because it is a stream, but when there is no connection it blocks the code. To avoid this kind of problem, we should pass a timeout as an option. A timeout will also close perfectly working stream threads, but we already open new connections when the token expires after 1 hour, so should we set the default timeout to 3600?

db.py:

def _listen_with_session(self, callback, session, timeout=3600):
    url = self._client.base_url + self._add_suffix()
    try:
        sse = _sseclient.SSEClient(url, session, timeout=timeout)
        return ListenerRegistration(callback, sse)
    except requests.exceptions.RequestException as error:
        raise ApiCallError(_Client.extract_error_message(error), error)
_sseclient.py:

def _connect(self):
    """Connects to the server using requests."""
    if self.should_connect:
        if self.last_id:
            self.requests_kwargs['headers']['Last-Event-ID'] = self.last_id
        self.resp = self.session.get(self.url, stream=True, **self.requests_kwargs)
        self.resp_iterator = self.resp.iter_content(decode_unicode=True)
        self.resp.raise_for_status()
    else:
        raise StopIteration()
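
Because the SSEClient constructor quoted earlier takes `retry` as its third positional parameter and collects everything else in `**kwargs`, a timeout only reaches `session.get()` when it is passed by keyword. A minimal sketch of that forwarding follows; `SketchSSEClient` and `FakeSession` are hypothetical stand-ins that mirror only the signature shown in this thread, and the `(5, 3600)` connect/read pair is an illustrative value, not a recommendation.

```python
class SketchSSEClient:
    """Stand-in that mirrors only the constructor signature quoted in this thread."""

    def __init__(self, url, session, retry=3000, **kwargs):
        self.url = url
        self.session = session
        self.retry = retry
        self.requests_kwargs = kwargs  # Forwarded to session.get() on connect.

    def connect(self):
        return self.session.get(self.url, stream=True, **self.requests_kwargs)


class FakeSession:
    """Hypothetical session that records what get() receives; no network involved."""

    def __init__(self):
        self.last_kwargs = None

    def get(self, url, **kwargs):
        self.last_kwargs = kwargs
        return "response"


session = FakeSession()
client = SketchSSEClient("https://example.invalid/path.json", session,
                         timeout=(5, 3600))
client.connect()

# A dict passed positionally binds to `retry` and never reaches session.get().
misplaced = SketchSSEClient("https://example.invalid/path.json", FakeSession(),
                            {"timeout": 3600})
```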
hiranya911 commented 5 years ago

I think the first thing to try would be to pass the httpTimeout option to the request session started by the SSE client. @jeffersonkr perhaps you can test this and provide a pull request?

jeffersonkr commented 5 years ago

By httpTimeout, do you mean the initialize_app() **kwargs, or the requests.Session timeout option? The best I can do is add a new parameter to the .listen(callback, timeout) method and pass it to requests.Session, but that would cause the stream to close once the timeout elapses. I tried to trace where the httpTimeout from initialize_app() goes so that I could use it, but I could not. I'm a new programmer, so it's hard for me to understand how this great library works. Maybe with your help I could do it.

hiranya911 commented 5 years ago

httpTimeout is an option that developers can specify when calling initialize_app(). We ought to somehow capture this value (when specified by the developer), and pass it into KeepAuthSession.

Check these links to start with:

https://github.com/firebase/firebase-admin-python/blob/4488f5367c78bba66dd6d7a4147a5e9cf0ad84d0/firebase_admin/__init__.py#L52-L55

https://github.com/firebase/firebase-admin-python/blob/4488f5367c78bba66dd6d7a4147a5e9cf0ad84d0/firebase_admin/db.py#L361

https://github.com/firebase/firebase-admin-python/blob/4488f5367c78bba66dd6d7a4147a5e9cf0ad84d0/firebase_admin/db.py#L783

hiranya911 commented 5 years ago

You're setting a refresh_timeout, which is not the right field to set as far as I can tell. You need to set a connect or read timeout, but it appears the parent class AuthorizedSession does not really allow this. You will have to pass the value further down the stack, and pass it to the session.get() call made by SSEClient. The timeout parameter mentioned in this doc is what you need to set: https://2.python-requests.org/en/master/api/#requests.request

jeffersonkr commented 5 years ago

Yes, I realized what I did and removed the comment above. For now I am passing httpTimeout to the session.get() method. I don't know why, but if I set httpTimeout to 10 seconds it throws a timeout as expected, whereas if I set it to 100 seconds it does not time out.

def _connect(self):
        """Connects to the server using requests."""
        if self.should_connect:
            if self.last_id:
                self.requests_kwargs['headers']['Last-Event-ID'] = self.last_id
            self.resp = self.session.get(self.url, stream=True, timeout=self.session.http_timeout, **self.requests_kwargs)
            self.resp_iterator = self.resp.iter_content(decode_unicode=True)
            self.resp.raise_for_status()
        else:
            raise StopIteration()
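
One possible explanation for the 10-second versus 100-second difference, offered as an assumption rather than a confirmed diagnosis: in requests, a read timeout limits the wait between chunks of data from the server, not the total lifetime of the response. If the streaming endpoint sends something (for example a periodic keep-alive event) more often than every 100 seconds, a 100-second read timeout never fires on a healthy connection, while a 10-second one can. The sketch below reproduces that effect with plain sockets from the standard library; the heartbeat server, the function names and the intervals are made up for illustration.

```python
import socket
import threading
import time


def serve_heartbeats(server_sock, interval, count):
    """Accept one client, then send `count` one-byte heartbeats `interval` seconds apart."""
    conn, _ = server_sock.accept()
    with conn:
        try:
            for _ in range(count):
                time.sleep(interval)
                conn.sendall(b".")
        except OSError:
            pass  # The client gave up and closed the connection.


def read_stream(port, read_timeout, expected):
    """Read `expected` bytes; return "timeout" if a single recv() waits too long."""
    with socket.create_connection(("127.0.0.1", port)) as client:
        client.settimeout(read_timeout)  # Applies to each recv(), not the whole loop.
        received = 0
        try:
            while received < expected:
                chunk = client.recv(1)
                if not chunk:
                    return "closed"
                received += len(chunk)
        except socket.timeout:
            return "timeout"
    return "ok"


def run_demo(interval, read_timeout, count):
    """Start a throwaway heartbeat server on a free local port and read from it."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))
    server.listen(1)
    port = server.getsockname()[1]
    worker = threading.Thread(target=serve_heartbeats,
                              args=(server, interval, count), daemon=True)
    worker.start()
    try:
        return read_stream(port, read_timeout, count)
    finally:
        worker.join(timeout=5)
        server.close()


# Heartbeats every 0.05 s for 0.5 s in total, read with a 0.3 s timeout.
steady = run_demo(interval=0.05, read_timeout=0.3, count=10)
# A single heartbeat after 0.5 s of silence, read with a 0.1 s timeout.
silent = run_demo(interval=0.5, read_timeout=0.1, count=1)
```

In the first run the stream lasts longer than the timeout yet is read completely, because no single gap exceeds it; in the second run the silence is longer than the timeout, so the read gives up.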
hiranya911 commented 5 years ago

self.session.http_timeout doesn't look correct. Session in this case is an AuthorizedSession from google.auth, and I doubt they have a timeout attribute.

jeffersonkr commented 5 years ago

Here, super(AuthorizedSession, self).request() has a timeout parameter; that .request() method comes from the requests library.

def request(self, method, url, data=None, headers=None, **kwargs):
        """Implementation of Requests' request."""
        # pylint: disable=arguments-differ
        # Requests has a ton of arguments to request, but only two
        # (method, url) are required. We pass through all of the other
        # arguments to super, so no need to exhaustively list them here.

        # Use a kwarg for this instead of an attribute to maintain
        # thread-safety.
        _credential_refresh_attempt = kwargs.pop(
            '_credential_refresh_attempt', 0)

        # Make a copy of the headers. They will be modified by the credentials
        # and we want to pass the original headers if we recurse.
        request_headers = headers.copy() if headers is not None else {}

        self.credentials.before_request(
            self._auth_request, method, url, request_headers)

        response = super(AuthorizedSession, self).request(
            method, url, data=data, headers=request_headers, **kwargs)

And AuthorizedSession inherits from requests.Session, so I believe there is a timeout.

class AuthorizedSession(requests.Session):
    """A Requests Session class with credentials.

    This class is used to perform requests to API endpoints that require
    authorization::

        from google.auth.transport.requests import AuthorizedSession

        authed_session = AuthorizedSession(credentials)

        response = authed_session.request(
            'GET', 'https://www.googleapis.com/storage/v1/b')

    The underlying :meth:`request` implementation handles adding the
    credentials' headers to the request and refreshing credentials as needed.
# requests/sessions.py
def request(self, method, url,
            params=None, data=None, headers=None, cookies=None, files=None,
            auth=None, timeout=None, allow_redirects=True, proxies=None,
            hooks=None, stream=None, verify=None, cert=None, json=None):
        """Constructs a :class:`Request <Request>`, prepares it and sends it.
        Returns :class:`Response <Response>` object.

        :param method: method for the new :class:`Request` object.
        :param url: URL for the new :class:`Request` object.
        :param params: (optional) Dictionary or bytes to be sent in the query
            string for the :class:`Request`.
        :param data: (optional) Dictionary, list of tuples, bytes, or file-like
            object to send in the body of the :class:`Request`.
        :param json: (optional) json to send in the body of the
            :class:`Request`.
        :param headers: (optional) Dictionary of HTTP Headers to send with the
            :class:`Request`.
        :param cookies: (optional) Dict or CookieJar object to send with the
            :class:`Request`.
        :param files: (optional) Dictionary of ``'filename': file-like-objects``
            for multipart encoding upload.
        :param auth: (optional) Auth tuple or callable to enable
            Basic/Digest/Custom HTTP Auth.
        :param timeout: (optional) How long to wait for the server to send
            data before giving up, as a float, or a :ref:`(connect timeout,
            read timeout) <timeouts>` tuple.
        :type timeout: float or tuple
        :param allow_redirects: (optional) Set to True by default.
        :type allow_redirects: bool
        :param proxies: (optional) Dictionary mapping protocol or protocol and
            hostname to the URL of the proxy.
        :param stream: (optional) whether to immediately download the response
            content. Defaults to ``False``.
        :param verify: (optional) Either a boolean, in which case it controls whether we verify
            the server's TLS certificate, or a string, in which case it must be a path
            to a CA bundle to use. Defaults to ``True``.
        :param cert: (optional) if String, path to ssl client cert file (.pem).
            If Tuple, ('cert', 'key') pair.
        :rtype: requests.Response
        """

I am testing this now and I think it really works: when there is no connection, it throws exceptions such as NewConnectionError, ConnectionError, and ReadTimeoutError once the timeout elapses, and it no longer prevents the rest of the code from executing.

I'm not sure about this, but it is still working. I would like you to guide me in writing tests for this implementation, if you agree that it is a correct implementation.
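
A test for this kind of change does not need a real network. The following is a minimal sketch using `unittest.mock` from the standard library; `open_stream` is a hypothetical stand-in for the connect step, not the SDK's actual function, and `OSError` is used for the failure case because requests' exception hierarchy derives from it.

```python
import unittest
from unittest import mock

URL = "https://example.invalid/path.json"


def open_stream(session, url, timeout):
    """Hypothetical stand-in for the connect step: one streaming GET with a timeout."""
    response = session.get(url, stream=True, timeout=timeout)
    response.raise_for_status()
    return response


class OpenStreamTest(unittest.TestCase):

    def test_timeout_is_forwarded(self):
        session = mock.Mock()
        open_stream(session, URL, timeout=10)
        session.get.assert_called_once_with(URL, stream=True, timeout=10)

    def test_connection_failure_propagates(self):
        session = mock.Mock()
        # Simulate the network being down when the stream is opened.
        session.get.side_effect = OSError("network unreachable")
        with self.assertRaises(OSError):
            open_stream(session, URL, timeout=10)


def run_tests():
    """Run the two tests above and return the unittest result object."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(OpenStreamTest)
    return unittest.TextTestRunner(verbosity=0).run(suite)
```

The first test checks that the timeout actually reaches `session.get()`; the second checks that a connection failure surfaces to the caller instead of blocking.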

hiranya911 commented 5 years ago

The request() method indeed takes a timeout argument. But there's no attribute named http_timeout on Session. So my previous comment about the following line should still stand:

self.resp = self.session.get(self.url, stream=True, timeout=self.session.http_timeout, **self.requests_kwargs)
jeffersonkr commented 5 years ago
session = _sseclient.KeepAuthSession(self._client.credential, self._client.timeout)
return self._listen_with_session(callback, session)

class KeepAuthSession(transport.requests.AuthorizedSession):
    """A session that does not drop authentication on redirects between domains."""

    def __init__(self, credential, http_timeout):
        super(KeepAuthSession, self).__init__(credential)
        self.http_timeout = http_timeout

Can't I just pass it through KeepAuthSession?
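
Storing the value on the session only helps if something reads it when a request is made. One way to do that, sketched here under stated assumptions, is a mixin that fills in `timeout` whenever a call does not set one. `DefaultTimeoutMixin` and `TimeoutSession` are hypothetical names, and `RecordingSession` stands in for `requests.Session` so the example runs without a network; combining the mixin with AuthorizedSession is assumed to work the same way but has not been verified here.

```python
class DefaultTimeoutMixin:
    """Applies a default `timeout` to every request() call that does not set one."""

    def __init__(self, *args, http_timeout=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.http_timeout = http_timeout

    def request(self, method, url, **kwargs):
        # Only fill in the default; an explicit per-call timeout wins.
        kwargs.setdefault("timeout", self.http_timeout)
        return super().request(method, url, **kwargs)


class RecordingSession:
    """Hypothetical stand-in for requests.Session; records the kwargs it receives."""

    def __init__(self, credential):
        self.credential = credential
        self.calls = []

    def request(self, method, url, **kwargs):
        self.calls.append((method, url, kwargs))
        return "response"

    def get(self, url, **kwargs):
        # Like requests.Session.get(), delegate to request().
        return self.request("GET", url, **kwargs)


class TimeoutSession(DefaultTimeoutMixin, RecordingSession):
    pass


session = TimeoutSession("fake-credential", http_timeout=(5, 60))
session.get("https://example.invalid/stream", stream=True)
session.get("https://example.invalid/stream", stream=True, timeout=1)
```

With this shape the connect code can keep calling `session.get(url, stream=True, ...)` unchanged, and the `(connect, read)` pair is applied in one place.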

skywritergr commented 4 years ago

I've been having a similar (?) issue and I was wondering if it is related. I have a Python app in Google Cloud Run. The container is sometimes stopped (when there is not enough usage), which I guess could count as a disconnection of sorts.

I am instantiating the firebase connection like so:

from firebase_admin import firestore, initialize_app, credentials

cred = credentials.ApplicationDefault()
initialize_app(cred, {
  'projectId': 'my-project-id-1234',
})

db = firestore.client()

def function_A():
    ...
    data = db.collection(u'data').where(u'title', u'==', name).stream()
    ...

def function_B():
    ...

The weird thing is that my application works fine for some time and then randomly starts failing. After catching the exception, I see the following:

Exception: 401 Request had invalid authentication credentials. Expected OAuth 2 access token, login cookie or other valid authentication credential. See https://developers.google.com/identity/sign-in/web/devconsole-project.

It looks very similar to what is being discussed here. Is there a solution to this?

hiranya911 commented 4 years ago

@skywritergr you're using Firestore, so this is unrelated to this issue. I'd suggest reporting it directly at https://github.com/googleapis/python-firestore