jeppe742 / DeltaLakeReader

Read Delta tables without any Spark
Apache License 2.0

Problem reading blobs with DeltaTable from deltalake and AzureBlobFileSystem from adlfs (Python) #43

Open vvandermeij opened 2 years ago

vvandermeij commented 2 years ago

Summary

I am trying to host an app online that reads Delta Lake tables from Azure Blob Storage. To do this I use AzureBlobFileSystem from adlfs and DeltaTable from deltalake. When the website is hosted (with Azure App Service), everything works well: given an input, the app reads the data from Azure Blob Storage and returns the correct tables.

The problem occurs when the app has been online but unused for several hours. The next time the app tries to read from the Delta Lake storage, it fails with an HttpResponseError whose traceback is not clear enough for me to understand what the actual issue is. When I restart the app, everything works fine again until it has been idle for a few hours, and then the same error returns.

I am not sure whether the problem lies in the adlfs package or the deltalake package. I hope someone here can help me understand where and why it fails, and knows a solution. Thanks in advance!

Traceback

HttpResponseError: The range specified is invalid for the current size of the resource.
RequestId:2988ea13-601e-0083-40f7-47b68d000000
Time:2022-04-04T07:42:05.9077599Z
ErrorCode:InvalidRange
Content: <?xml version="1.0" encoding="utf-8"?><Error><Code>InvalidRange</Code><Message>The range specified is invalid for the current size of the resource.
RequestId:2988ea13-601e-0083-40f7-47b68d000000
Time:2022-04-04T07:42:05.9077599Z</Message></Error>

Traceback:
File "/opt/venv/lib/python3.9/site-packages/streamlit/script_runner.py", line 379, in _run_script
    exec(code, module.__dict__)
File "/app/src/incasso/dashboard_form.py", line 95, in <module>
    eenheid = find_eenheidnum(deelcontractnummer, fs).iloc[0]
File "/opt/venv/lib/python3.9/site-packages/sklego/pandas_utils.py", line 81, in wrapper
    result = func(*args, **kwargs)
File "/opt/venv/lib/python3.9/site-packages/incasso/dashboard_functions.py", line 29, in find_eenheidnum
    DeltaTable("20-silver/edh/woc_contracten", file_system=fs)
File "/opt/venv/lib/python3.9/site-packages/deltalake/deltatable.py", line 45, in __init__
    self._as_newest_version()
File "/opt/venv/lib/python3.9/site-packages/deltalake/deltatable.py", line 151, in _as_newest_version
    self._apply_partial_logs(version=self.checkpoint + 9)
File "/opt/venv/lib/python3.9/site-packages/deltalake/deltatable.py", line 113, in _apply_partial_logs
    for line in log:
File "/opt/venv/lib/python3.9/site-packages/fsspec/spec.py", line 1616, in __next__
    out = self.readline()
File "/opt/venv/lib/python3.9/site-packages/fsspec/spec.py", line 1613, in readline
    return self.readuntil(b"\n")
File "/opt/venv/lib/python3.9/site-packages/fsspec/spec.py", line 1596, in readuntil
    part = self.read(blocks or self.blocksize)
File "/opt/venv/lib/python3.9/site-packages/fsspec/spec.py", line 1565, in read
    out = self.cache._fetch(self.loc, self.loc + length)
File "/opt/venv/lib/python3.9/site-packages/fsspec/caching.py", line 154, in _fetch
    self.cache = self.fetcher(start, end)  # new block replaces old
File "/opt/venv/lib/python3.9/site-packages/fsspec/asyn.py", line 91, in wrapper
    return sync(self.loop, func, *args, **kwargs)
File "/opt/venv/lib/python3.9/site-packages/fsspec/asyn.py", line 71, in sync
    raise return_result
File "/opt/venv/lib/python3.9/site-packages/fsspec/asyn.py", line 25, in _runner
    result[0] = await coro
File "/opt/venv/lib/python3.9/site-packages/adlfs/spec.py", line 1804, in _async_fetch_range
    stream = await self.container_client.download_blob(
File "/opt/venv/lib/python3.9/site-packages/azure/core/tracing/decorator_async.py", line 74, in wrapper_use_tracer
    return await func(*args, **kwargs)
File "/opt/venv/lib/python3.9/site-packages/azure/storage/blob/aio/_container_client_async.py", line 1000, in download_blob
    return await blob_client.download_blob(
File "/opt/venv/lib/python3.9/site-packages/azure/core/tracing/decorator_async.py", line 74, in wrapper_use_tracer
    return await func(*args, **kwargs)
File "/opt/venv/lib/python3.9/site-packages/azure/storage/blob/aio/_blob_client_async.py", line 480, in download_blob
    await downloader._setup()  # pylint: disable=protected-access
File "/opt/venv/lib/python3.9/site-packages/azure/storage/blob/aio/_download_async.py", line 250, in _setup
    self._response = await self._initial_request()
File "/opt/venv/lib/python3.9/site-packages/azure/storage/blob/aio/_download_async.py", line 336, in _initial_request
    process_storage_error(error)
File "/opt/venv/lib/python3.9/site-packages/azure/storage/blob/_shared/response_handlers.py", line 181, in process_storage_error
    exec("raise error from None")   # pylint: disable=exec-used # nosec
File "<string>", line 1, in <module>
jeppe742 commented 2 years ago

Hey @vvandermeij, thanks for reaching out. It sounds like something that might be a bit tricky for me to reproduce, so might take some time to debug.

Would you be able to share some snippets of how you use the library in your code? Do you instantiate the AzureBlobFileSystem on startup, and then call DeltaTable(...) every time someone interacts with the app?

vvandermeij commented 2 years ago

Hi @jeppe742, thank you for the quick response.

from adlfs import AzureBlobFileSystem

fs = AzureBlobFileSystem(
    account_name=account_name,
    credential=sas_key
)

This is instantiated at least once every time a user uses the app. The function where we get the error most often is the following:

import pyarrow.dataset as ds
from deltalake import DeltaTable


def find_adres(eenheidnum, fs):
    """Retrieve the address (adres) of an eenheidnummer from the data lake.

    Args:
        eenheidnum: the eenheidnummer whose address we want to look up
        fs: the file system used to authenticate against the data lake

    Returns:
        a pandas DataFrame with the address columns
    """
    # Read only the address columns, filtered on the requested eenheidnummer
    return (DeltaTable("20-silver/edh/woc_eenheden", file_system=fs)
            .to_table(columns=['huisnummer',
                               'straatnaam',
                               'huisnummertoevoeging',
                               'huisnummerletter'],
                      filter=(ds.field('eenheidnummer') == eenheidnum))
            .to_pandas())
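One detail relevant to where the filesystem is created: fsspec, which adlfs builds on, caches filesystem instances keyed on their constructor arguments. So calling AzureBlobFileSystem(...) again with the same account_name and credential most likely returns the same object, together with any directory listings and file info it has already cached. Whether a stale cache is actually behind the InvalidRange error is not established in this thread. The sketch below only demonstrates the caching behaviour, using fsspec's in-memory filesystem so it runs without Azure credentials. It assumes adlfs keeps fsspec's default instance caching; skip_instance_cache=True and invalidate_cache() are generic fsspec options, not fixes confirmed for this issue.

```python
import fsspec

# fsspec keys cached filesystem instances on the constructor arguments,
# so a second call with identical arguments returns the same object.
fs_a = fsspec.filesystem("memory")
fs_b = fsspec.filesystem("memory")
print(fs_a is fs_b)  # True: reused from fsspec's instance cache

# skip_instance_cache=True bypasses that cache and builds a new instance,
# which starts with its own empty directory-listing cache.
fs_c = fsspec.filesystem("memory", skip_instance_cache=True)
print(fs_c is fs_a)  # False: a fresh, uncached instance

# Alternatively, keep the existing instance but discard its cached listings.
fs_a.invalidate_cache()
```

Assuming adlfs behaves like other fsspec filesystems here, the equivalent would be passing skip_instance_cache=True to AzureBlobFileSystem, or calling fs.invalidate_cache() before opening a DeltaTable.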

Thank you for looking into this!