alexwohlbruck / covalent

Internet connected lamps! For my senior capstone project💡⚛️🎓
https://alex.wohlbruck.com/project/covalent
2 stars 0 forks source link

Implement Firebase RemoteThread class #8

Closed alexwohlbruck closed 2 years ago

alexwohlbruck commented 2 years ago

The firebase-micropython-esp32 library works with basic HTTP requests for Firebase, but the realtime subscriber is an unfinished implementation. This library is based on firebase-python which has a working subscriber.

Todo:

alexwohlbruck commented 2 years ago

Firebase sends realtime updates using the Server-Sent Events protocol. This mechanism sends to headers to the server: Connection: keep-alive Accept: text/event-stream

The server responds with Connection: keep-alive Content-Type: text/event-stream; charset=utf-8 and writes to the stream when an event comes in. New writes come in this format: event_name: payload\n\n Double newline represents the end of a message. Firebase sends JSON strings in these payloads, usually with a put event. For example: put: {"data":"test","path":"/"}\n\n

You can test this in the browser with the EventSource api, by pasting this into the console:

const url = 'https://project-friendship-lamp-default-rtdb.firebaseio.com/test.json'
const sse = new EventSource(url)
sse.addEventListener('put', function(message) {
    const data = JSON.parse(message.data)
    console.log(data.data)
})

The guy that wrote the firebase client for Micropython began to implement subscribers by adapting this SSEClient class, but the original repo's documentation describes this example, where a Response object from requests or urllib3 must be passed in:

import json
import pprint
import sseclient

def with_urllib3(url, headers):
    """Get a streaming response for the given event feed using urllib3."""
    import urllib3
    http = urllib3.PoolManager()
    return http.request('GET', url, preload_content=False, headers=headers)

def with_requests(url, headers):
    """Get a streaming response for the given event feed using requests."""
    import requests
    return requests.get(url, stream=True, headers=headers)

url = 'http://domain.com/events'
headers = {'Accept': 'text/event-stream'}
response = with_urllib3(url, headers)  # or with_requests(url, headers)
client = sseclient.SSEClient(response)
for event in client.events():
    pprint.pprint(json.loads(event.data))

I now need to figure out how to use Micropython's urequests library to get an iterable stream response object from the Firebase resource. urequests doesn't have native support for iterable streams, but this guy created a PR that adds support for them. I downloaded his code and it half works. Some messages are cut off, and Firebase's put messages only come through when I delete a resource, but not when I update it. I will go off of this code to try to correctly implement the iterable stream, possibly using the original requests library code as a reference.

alexwohlbruck commented 2 years ago

Reason for messages cut-off: ITER_CHUNK_SIZE is a constant that defines the chunk size for each message. This should be variable depending on the message size, which I think is stored at the beginning of a message write. If not, we can find the end of a chunk when we read the double newline character \n\n.

alexwohlbruck commented 2 years ago

The iter_content function and it's documentation from requests

iter_content(chunk_size=1, decode_unicode=False)[source]

Iterates over the response data. When stream=True is set on the request, this avoids reading the content at once into memory for large responses. The chunk size is the number of bytes it should read into memory. This is not necessarily the length of each item returned as decoding can take place.

chunk_size must be of type int or None. A value of None will function differently depending on the value of stream. stream=True will read data as it arrives in whatever size the chunks are received. If stream=False, data is returned as a single chunk.

If decode_unicode is True, content will be decoded using the best available encoding based on the response.

def iter_content(self, chunk_size=1, decode_unicode=False):
    """Iterates over the response data.  When stream=True is set on the
    request, this avoids reading the content at once into memory for
    large responses.  The chunk size is the number of bytes it should
    read into memory.  This is not necessarily the length of each item
    returned as decoding can take place.
    chunk_size must be of type int or None. A value of None will
    function differently depending on the value of `stream`.
    stream=True will read data as it arrives in whatever size the
    chunks are received. If stream=False, data is returned as
    a single chunk.
    If decode_unicode is True, content will be decoded using the best
    available encoding based on the response.
    """

    def generate():
        # Special case for urllib3.
        if hasattr(self.raw, 'stream'):
            try:
                for chunk in self.raw.stream(chunk_size, decode_content=True):
                    yield chunk
            except ProtocolError as e:
                raise ChunkedEncodingError(e)
            except DecodeError as e:
                raise ContentDecodingError(e)
            except ReadTimeoutError as e:
                raise ConnectionError(e)
            except SSLError as e:
                raise RequestsSSLError(e)
        else:
            # Standard file-like object.
            while True:
                chunk = self.raw.read(chunk_size)
                if not chunk:
                    break
                yield chunk

        self._content_consumed = True

    if self._content_consumed and isinstance(self._content, bool):
        raise StreamConsumedError()
    elif chunk_size is not None and not isinstance(chunk_size, int):
        raise TypeError("chunk_size must be an int, it is instead a %s." % type(chunk_size))
    # simulate reading small chunks of the content
    reused_chunks = iter_slices(self._content, chunk_size)

    stream_chunks = generate()

    chunks = reused_chunks if self._content_consumed else stream_chunks

    if decode_unicode:
        chunks = stream_decode_response_unicode(chunks, self)

    return chunks
alexwohlbruck commented 2 years ago

Update: the solution was very simple, the iter_lines function takes a chunk size and delimiter, which I've set as 1 and b'\n\n', and this correctly parses the output here is the decoded stream from Firebase:

event: put
data: {"path":"/","data":"hello world"}

event: keep-alive
data: null

...