jeppe742 / DeltaLakeReader

Read Delta tables without any Spark
Apache License 2.0

Getting AttributeError: 'DeltaTable' object has no attribute 'schema' #37

Closed. theyaa closed this issue 2 years ago

theyaa commented 2 years ago

It seems like self.schema is not getting defined in the __init__ function. Would you please check the code and see how you would define its default value?

jeppe742 commented 2 years ago

Hey @theyaa

Would you be able to provide some more information? self.schema is set while parsing the transaction logs, so I would expect there is something funky with your logs. Maybe you could provide one of the logs or checkpoints from the _delta_log folder?

I see that I don't handle it very gracefully when we don't find the transaction logs, so perhaps there is something wrong with your path?
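If you want to sanity-check the path yourself, a minimal sketch (assuming fs is the fsspec-style file system you pass as file_system, or a local one for a table on disk) is to look for the _delta_log folder directly:

# a valid Delta table root should contain a _delta_log folder
# with JSON commit files and possibly parquet checkpoint files
print(fs.exists("path/to/mytable/_delta_log"))
print(fs.ls("path/to/mytable/_delta_log"))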

theyaa commented 2 years ago

Hi @jeppe742, I have tried a couple of ways to use the library to connect to my Azure Blob Storage, and neither worked. Each gives a different error.

The first one I tried:

df = DeltaTable('abfss://MyContainer@MyAccount.blob.core.windows.net/path...', file_system=fs)

This one gives me the error below:

C:\Anaconda3\envs\theyaa4\lib\site-packages\azure\storage\blob\aio\_list_blobs_helper.py in _get_next_cb(self, continuation_token)
     76                 use_location=self.location_mode)
     77         except HttpResponseError as error:
---> 78             process_storage_error(error)
     79
     80     async def _extract_data_cb(self, get_next_return):

C:\Anaconda3\envs\theyaa4\lib\site-packages\azure\storage\blob\_shared\response_handlers.py in process_storage_error(storage_error)
    175     try:
    176         # from None prevents us from double printing the exception (suppresses generated layer error context)
--> 177         exec("raise error from None") # pylint: disable=exec-used # nosec
    178     except SyntaxError:
    179         raise error

C:\Anaconda3\envs\theyaa4\lib\site-packages\azure\storage\blob\_shared\response_handlers.py in

HttpResponseError: The specifed resource name contains invalid characters.
RequestId:1dc1ddaa-601e-004e-8045-238f75000000
Time:2022-02-16T14:54:20.8870765Z
ErrorCode:InvalidResourceName
Content: <?xml version="1.0" encoding="utf-8"?><Error><Code>InvalidResourceName</Code><Message>The specifed resource name contains invalid characters. RequestId:1dc1ddaa-601e-004e-8045-238f75000000 Time:2022-02-16T14:54:20.8870765Z</Message></Error>

The second method:

df = DeltaTable('wasbs://MyAccount/path...', file_system=fs)

And this one gives the error below:

AttributeError                            Traceback (most recent call last)
C:\Users\THEYAA~1.MAT\AppData\Local\Temp/ipykernel_25844/133933574.py in <module>
      1 #df = DeltaTable('abfss://datalake@dlsdanadfprod.blob.core.windows.net/curated/vehicle/paccar_2_0/PROG00005025/clt-17/telemetry/delta/data/DST_Year=2022/DST_Month=2',file_system=fs)
----> 2 df = DeltaTable('wasbs://dlsdanadfprod/curated/vehicle/paccar_2_0/PROG00005025/clt-17/telemetry/delta/data/DST_Year=2022/DST_Month=2',file_system=fs)

C:\Anaconda3\envs\theyaa4\lib\site-packages\deltalake\deltatable.py in __init__(self, path, file_system)
     42         # which makes it hard to inherit from it directly.
     43         # Instead we will just have the dataset as an attribute and expose the important methods.
---> 44         self.pyarrow_dataset = self._pyarrow_dataset()
     45
     46     def _pyarrow_dataset(self):

C:\Anaconda3\envs\theyaa4\lib\site-packages\deltalake\deltatable.py in _pyarrow_dataset(self)
     50             partitioning="hive",
     51             format="parquet",
---> 52             schema=self.schema,
     53         )
     54

AttributeError: 'DeltaTable' object has no attribute 'schema'

When I try with Databricks I am able to connect with the first method and read the data. Am I doing anything wrong here?

theyaa commented 2 years ago

I tried more scenarios, and with the one below I get a different error:

df = DeltaTable('abfss://container/path...', file_system=fs)

C:\Anaconda3\envs\theyaa4\lib\site-packages\deltalake\deltatable.py in __init__(self, path, file_system)
     37             file_system = LocalFileSystem()
     38         self.filesystem = file_system
---> 39         self._as_newest_version()
     40
     41         # The PyArrow Dataset is exposed by a factory class,

C:\Anaconda3\envs\theyaa4\lib\site-packages\deltalake\deltatable.py in _as_newest_version(self)
    133         try:
    134             # get latest checkpoint version
--> 135             with self.filesystem.open(f"{self.log_path}/_last_checkpoint") as lst_check:
    136                 checkpoint_info = lst_check.read()
    137                 checkpoint_info = json.loads(checkpoint_info)

C:\Anaconda3\envs\theyaa4\lib\site-packages\fsspec\spec.py in open(self, path, mode, block_size, cache_options, compression, **kwargs)
   1028         else:
   1029             ac = kwargs.pop("autocommit", not self._intrans)
-> 1030             f = self._open(
   1031                 path,
   1032                 mode=mode,

C:\Anaconda3\envs\theyaa4\lib\site-packages\adlfs\spec.py in _open(self, path, mode, block_size, autocommit, cache_options, cache_type, metadata, **kwargs)
   1605         """
   1606         logger.debug(f"_open: {path}")
-> 1607         return AzureBlobFile(
   1608             fs=self,
   1609             path=path,

C:\Anaconda3\envs\theyaa4\lib\site-packages\adlfs\spec.py in __init__(self, fs, path, mode, block_size, autocommit, cache_type, cache_options, metadata, **kwargs)
   1719             raise NotImplementedError("File mode not supported")
   1720         if self.mode == "rb":
-> 1721             if not hasattr(self, "details"):
   1722                 self.details = self.fs.info(self.path)
   1723                 self.size = self.details["size"]

C:\Anaconda3\envs\theyaa4\lib\site-packages\fsspec\spec.py in details(self)
   1383     def details(self):
   1384         if self._details is None:
-> 1385             self._details = self.fs.info(self.path)
   1386         return self._details
   1387

C:\Anaconda3\envs\theyaa4\lib\site-packages\adlfs\spec.py in info(self, path, refresh, **kwargs)
    573             fetch_from_azure = True
    574         if fetch_from_azure:
--> 575             return sync(self.loop, self._info, path, refresh)
    576         return super().info(path)
    577

C:\Anaconda3\envs\theyaa4\lib\site-packages\fsspec\asyn.py in sync(loop, func, timeout, *args, **kwargs)
     69         raise FSTimeoutError from return_result
     70     elif isinstance(return_result, BaseException):
---> 71         raise return_result
     72     else:
     73         return return_result

C:\Anaconda3\envs\theyaa4\lib\site-packages\fsspec\asyn.py in _runner(event, coro, result, timeout)
     23         coro = asyncio.wait_for(coro, timeout=timeout)
     24     try:
---> 25         result[0] = await coro
     26     except Exception as ex:
     27         result[0] = ex

C:\Anaconda3\envs\theyaa4\lib\site-packages\adlfs\spec.py in _info(self, path, refresh, **kwargs)
    594             invalidate_cache = False
    595         path = self._strip_protocol(path)
--> 596         out = await self._ls(
    597             self._parent(path), invalidate_cache=invalidate_cache, **kwargs
    598         )

C:\Anaconda3\envs\theyaa4\lib\site-packages\adlfs\spec.py in _ls(self, path, invalidate_cache, delimiter, return_glob, **kwargs)
    803             elif (
    804                 blob["name"].count("/") == depth
--> 805                 and blob["size"] == 0
    806             ):
    807                 outblobs.append(blob)

C:\Anaconda3\envs\theyaa4\lib\site-packages\azure\storage\blob\_shared\models.py in __getitem__(self, key)
    189
    190     def __getitem__(self, key):
--> 191         return self.__dict__[key]
    192
    193     def __repr__(self):

KeyError: 'size'

jeppe742 commented 2 years ago

Hmm. What if you don't provide the full path including the protocol, but only the container and folder, as in the following example?

from deltalake import DeltaTable
from adlfs import AzureBlobFileSystem

# example url: 'abfss://mycontainer@myStorageAccount.dfs.core.windows.net/somepath/mytable'
fs = AzureBlobFileSystem(
    account_name="myStorageAccount",
    credential="...",
)
df = DeltaTable("mycontainer/somepath/mytable", file_system=fs).to_pandas()

theyaa commented 2 years ago

Sure, I am getting this exception:

C:\Anaconda3\envs\theyaa4\lib\site-packages\deltalake\deltatable.py in __init__(self, path, file_system)
     37             file_system = LocalFileSystem()
     38         self.filesystem = file_system
---> 39         self._as_newest_version()
     40
     41         # The PyArrow Dataset is exposed by a factory class,

C:\Anaconda3\envs\theyaa4\lib\site-packages\deltalake\deltatable.py in _as_newest_version(self)
    133         try:
    134             # get latest checkpoint version
--> 135             with self.filesystem.open(f"{self.log_path}/_last_checkpoint") as lst_check:
    136                 checkpoint_info = lst_check.read()
    137                 checkpoint_info = json.loads(checkpoint_info)

C:\Anaconda3\envs\theyaa4\lib\site-packages\fsspec\spec.py in open(self, path, mode, block_size, cache_options, compression, **kwargs)
   1028         else:
   1029             ac = kwargs.pop("autocommit", not self._intrans)
-> 1030             f = self._open(
   1031                 path,
   1032                 mode=mode,

C:\Anaconda3\envs\theyaa4\lib\site-packages\adlfs\spec.py in _open(self, path, mode, block_size, autocommit, cache_options, cache_type, metadata, **kwargs)
   1605         """
   1606         logger.debug(f"_open: {path}")
-> 1607         return AzureBlobFile(
   1608             fs=self,
   1609             path=path,

C:\Anaconda3\envs\theyaa4\lib\site-packages\adlfs\spec.py in __init__(self, fs, path, mode, block_size, autocommit, cache_type, cache_options, metadata, **kwargs)
   1719             raise NotImplementedError("File mode not supported")
   1720         if self.mode == "rb":
-> 1721             if not hasattr(self, "details"):
   1722                 self.details = self.fs.info(self.path)
   1723                 self.size = self.details["size"]

C:\Anaconda3\envs\theyaa4\lib\site-packages\fsspec\spec.py in details(self)
   1383     def details(self):
   1384         if self._details is None:
-> 1385             self._details = self.fs.info(self.path)
   1386         return self._details
   1387

C:\Anaconda3\envs\theyaa4\lib\site-packages\adlfs\spec.py in info(self, path, refresh, **kwargs)
    573             fetch_from_azure = True
    574         if fetch_from_azure:
--> 575             return sync(self.loop, self._info, path, refresh)
    576         return super().info(path)
    577

C:\Anaconda3\envs\theyaa4\lib\site-packages\fsspec\asyn.py in sync(loop, func, timeout, *args, **kwargs)
     69         raise FSTimeoutError from return_result
     70     elif isinstance(return_result, BaseException):
---> 71         raise return_result
     72     else:
     73         return return_result

C:\Anaconda3\envs\theyaa4\lib\site-packages\fsspec\asyn.py in _runner(event, coro, result, timeout)
     23         coro = asyncio.wait_for(coro, timeout=timeout)
     24     try:
---> 25         result[0] = await coro
     26     except Exception as ex:
     27         result[0] = ex

C:\Anaconda3\envs\theyaa4\lib\site-packages\adlfs\spec.py in _info(self, path, refresh, **kwargs)
    594             invalidate_cache = False
    595         path = self._strip_protocol(path)
--> 596         out = await self._ls(
    597             self._parent(path), invalidate_cache=invalidate_cache, **kwargs
    598         )

C:\Anaconda3\envs\theyaa4\lib\site-packages\adlfs\spec.py in _ls(self, path, invalidate_cache, delimiter, return_glob, **kwargs)
    803             elif (
    804                 blob["name"].count("/") == depth
--> 805                 and blob["size"] == 0
    806             ):
    807                 outblobs.append(blob)

C:\Anaconda3\envs\theyaa4\lib\site-packages\azure\storage\blob\_shared\models.py in __getitem__(self, key)
    189
    190     def __getitem__(self, key):
--> 191         return self.__dict__[key]
    192
    193     def __repr__(self):

KeyError: 'size'

theyaa commented 2 years ago

@jeppe742 I was able to get it to work by removing the trailing / from /delta/data/, i.e. using /delta/data instead.

But the problem I am facing right now is with to_pandas(). It crashes the kernel of my notebook, since it tries to load all the data under that path, which can be a huge amount of data. Under /delta/data I have more specific paths that can limit the size of the data I need, like Year=2022/Month=02/Day=02, but when I add this filter to the path it does not work.

How can I filter further, like above, to limit the amount of data I am loading?

jeppe742 commented 2 years ago

@theyaa Ah yes. You need to specify the root directory. Each of your partitions is technically not a Delta table, but just a collection of parquet files. We need the transaction logs in the root to figure out which files belong to each version.
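
For illustration, the layout I would expect is roughly the following (hypothetical file names), with the transaction log at the table root and the partition folders below it:

mytable/
├── _delta_log/
│   ├── 00000000000000000000.json
│   ├── 00000000000000000010.checkpoint.parquet
│   └── _last_checkpoint
├── Year=2022/
│   ├── Month=01/
│   │   └── part-00000-...-c000.snappy.parquet
│   └── Month=02/
│       └── part-00000-...-c000.snappy.parquet
└── ...

DeltaTable should then be pointed at mytable/, not at one of the partition folders.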

pyarrow, which this library is partially built on, should support partition pruning. I never actually tested it in depth, so it would be very interesting to hear if it works for you 😉 The following example should give you a start.

import pyarrow.dataset as ds
...
# only read the files that belong to the Year=2022 partition
df = DeltaTable("...").to_table(filter=ds.field("Year") == 2022).to_pandas()

You can see a bit more in the pyarrow docs.
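
If you also want to limit which columns are loaded, something like the following might work (just a sketch, assuming to_table forwards both columns and filter to the underlying pyarrow dataset, and that fs is your file system from before):

import pyarrow.dataset as ds
from deltalake import DeltaTable

# prune on the partition columns and only materialize two columns
df = (
    DeltaTable("path/to/mytable", file_system=fs)
    .to_table(
        columns=["col1", "col2"],
        filter=(ds.field("Year") == 2022) & (ds.field("Month") == 2),
    )
    .to_pandas()
)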

jeppe742 commented 2 years ago

@theyaa Did you manage to test out the partition pruning? Otherwise I will close this issue 😃

theyaa commented 2 years ago

@jeppe742 I was able to test it out successfully and it worked like a charm.

from datetime import datetime
import pyarrow.dataset as ds

df = DeltaTable('path/to/table', file_system=fs).to_table(
    columns=["col1", "col2", ...],
    filter=(ds.field("year") == datetime.now().year)
    & (ds.field("month") == datetime.now().month)
    & (ds.field("day") == str(datetime.now().day)),
).to_pandas()

jeppe742 commented 2 years ago

Great to hear! I have also included a more descriptive error message in #39.