man-group / ArcticDB

ArcticDB is a high performance, serverless DataFrame database built for the Python Data Science ecosystem.
http://arcticdb.io

Huge CSV File import #598

Closed. balajipitchumani closed this issue 1 year ago.

balajipitchumani commented 1 year ago

Hi Team, I am trying to read a 100-million-row CSV into a symbol; both the notebook and a .py program die due to memory issues, as expected. I tried dask and then compute() before saving into ArcticDB, and this fails as well. Is there another best practice for ingesting millions of OHLC rows into ArcticDB? I could not find any other source that answers this question.

Thanks for your help.

jamesmunro commented 1 year ago

Hi,

I was able to create a 100M-row, 4-column DataFrame and write it to ArcticDB in a 12GB VM.

If you're in a lower-memory environment, you could use the pandas 'chunksize' feature combined with ArcticDB's append:

chunksize = 10_000_000
with pd.read_csv('100m.csv', chunksize=chunksize, index_col=0, parse_dates=True) as reader:
    for chunk in reader:
        library.append('data', chunk)

That used just a few gigabytes of memory in my test.
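For readers who want to try the pattern without a 100M-row file, the same chunked loop can be exercised with pandas alone. This is a minimal sketch where an in-memory CSV and a plain list stand in for the file on disk and the ArcticDB append call:

```python
import io
import pandas as pd

# Small in-memory CSV standing in for the large file on disk.
csv_text = "date,price\n" + "\n".join(
    f"2023-01-{d:02d},{100 + d}" for d in range(1, 31)
)

chunks = []
# chunksize turns read_csv into an iterator of DataFrames, so peak memory
# is bounded by the chunk size rather than the file size.
with pd.read_csv(io.StringIO(csv_text), chunksize=10,
                 index_col=0, parse_dates=True) as reader:
    for chunk in reader:
        chunks.append(chunk)  # in real use: library.append('data', chunk)

total_rows = sum(len(c) for c in chunks)
print(len(chunks), total_rows)  # 3 30
```

Each chunk arrives as an ordinary DataFrame with a parsed DatetimeIndex, so swapping the list for `library.append` is the only change needed for real use.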

jamesmunro commented 1 year ago

This is not an issue with ArcticDB itself. If you would like general advice, you could join our Slack: go to ArcticDB.io and follow the 'Community' link.

balajipitchumani commented 1 year ago

Hi,

I could not find the Slack channel to join, hence raising it here. I agree it may not be an engine issue, but something here is causing the problem.

This is the code I am using to ingest data into an ArcticDB table on a laptop with 32GB RAM, an 8-core CPU, and a 1TB NVMe SSD. For a smaller table it runs without issue. For a larger CSV (more than 100MB or so), it runs in the Jupyter notebook for a few seconds and then stops with no messages. Subsequently, I only see StorageException 'Composite: d:c00:27:0x1f4089875aab887a@1689382890061091600[2700000,2800000],' and similar.

from arcticdb import Arctic
import pandas as pd
import datetime as dt
from arcticdb import QueryBuilder

ac = Arctic('lmdb://c:/py/db')
ac.create_library('bhu')
l = ac['bhu']

tab = 'c00'
l.delete(tab)
cols = ['s','d','o','h','l','c','a','v']
chunksize = 1000000
with pd.read_csv('c:/users/bala/py/' + tab + '.csv', chunksize=chunksize, header=None,
                 parse_dates=['d'], names=cols,
                 dtype={'s': 'str', 'o': 'float64', 'h': 'float64', 'l': 'float64',
                        'c': 'float64', 'a': 'float64', 'v': 'float64'}) as reader:
    for chunk in reader:
        chunk.set_index('d')
        l.append(tab, chunk)
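Worth noting in the loop above: `chunk.set_index('d')` returns a new DataFrame and leaves `chunk` untouched, so the appended frames still carry the default RangeIndex. A pandas-only sketch of the distinction:

```python
import pandas as pd

df = pd.DataFrame({'d': pd.to_datetime(['2023-01-03', '2023-01-04']),
                   'c': [100.0, 101.0]})

df.set_index('d')               # returns a NEW frame; df itself is unchanged
print(type(df.index).__name__)  # RangeIndex

df = df.set_index('d')          # reassign the result (or pass inplace=True)
print(type(df.index).__name__)  # DatetimeIndex
```

Reassigning (or using `index_col='d'` in `read_csv`, as in the later snippet in this thread) is what actually gives the appended chunks a DatetimeIndex.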

q = QueryBuilder()

q = q[(q['d'] == dt.datetime(2023,1,3)) & (q['s'] == 'MMM')]

q = q.groupby('s').agg({'d':'max'})
df = l.read(tab, query_builder=q).data
df
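For reference, a QueryBuilder pipeline like the one above pushes the work into ArcticDB at read time, but its semantics mirror an ordinary pandas filter-then-groupby. A sketch on made-up rows (column names follow the snippet; the data is illustrative only):

```python
import datetime as dt
import pandas as pd

df = pd.DataFrame({
    's': ['MMM', 'MMM', 'AXP'],
    'd': pd.to_datetime(['2023-01-03', '2023-01-03', '2023-01-04']),
    'c': [120.5, 121.0, 155.2],
})

# Filter rows, then take the max date per symbol -- the same result shape
# the QueryBuilder pipeline asks ArcticDB to compute during the read.
mask = (df['d'] == dt.datetime(2023, 1, 3)) & (df['s'] == 'MMM')
out = df[mask].groupby('s').agg({'d': 'max'})
print(out)
```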

Many thanks Balaji

jamesmunro commented 1 year ago

I was able to create an example that does error, but not with StorageException 'Composite: d:c00:27:0x1f4089875aab887a@1689382890061091600[2700000,2800000],'. I opened #607 for my test case.

If you can create an example (including data) that reproduces your error then I will create an issue for that.

jamesmunro commented 1 year ago

To request Slack access:

  1. go to our website: https://arcticdb.io/
  2. click "Community"
  3. click the Slack logo image
  4. fill out the form
  5. you should receive an invite within a day or two

balajipitchumani commented 1 year ago

Hello James,

Thanks for the support. I tried ingestion with some debug statements as below and the ingestion goes fine. However, when I try to read, I get the storage exception error.

pandas 2.0.3
arcticdb 1.5.0

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 1000000 entries, 2011-01-03 to 2013-10-01
Data columns (total 7 columns):
 #   Column  Non-Null Count    Dtype
---  ------  --------------    -----
 0   s       1000000 non-null  object
 1   o       1000000 non-null  float64
 2   h       1000000 non-null  float64
 3   l       1000000 non-null  float64
 4   c       1000000 non-null  float64
 5   a       1000000 non-null  float64
 6   v       1000000 non-null  float64
dtypes: float64(6), object(1)
memory usage: 61.0+ MB
None

(The next three chunks report the same columns and dtypes for the ranges 2013-10-01 to 2016-07-04, 2016-07-04 to 2019-01-18, and 2019-01-18 to 2021-06-29; the final chunk has 959748 entries, 2021-06-29 to 2023-07-11, memory usage 58.6+ MB.)

Code:

'''
import pandas as pd
import arcticdb
from arcticdb import Arctic
from arcticdb.config import set_log_level
import faulthandler

faulthandler.enable()

print('pandas', pd.__version__)
print('arcticdb', arcticdb.__version__)

set_log_level('DEBUG')

arctic = Arctic('lmdb://c:/py/db.lmdb')
if 'bhu' not in arctic.list_libraries():
    arctic.create_library('bhu')
library = arctic['bhu']

tab = 'c00'
library.delete(tab)
chunksize = 1000000
cols = ['s','d','o','h','l','c','a','v']
with pd.read_csv('c:/py/' + tab + '.csv', chunksize=chunksize, index_col='d',
                 parse_dates=['d'], header=None, names=cols,
                 dtype={'s': 'str', 'o': 'float64', 'h': 'float64', 'l': 'float64',
                        'c': 'float64', 'a': 'float64', 'v': 'float64'}) as reader:
    for chunk in reader:
        print(chunk.info())
        library.append(tab, chunk, validate_index=False)
'''
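On `validate_index=False` in the loop above: by default, `append` checks that the incoming chunk's index is sorted and starts at or after the end of the data already stored, so unsorted source data trips the check. Whether chunks satisfy that ordering can be verified cheaply with pandas before appending (a sketch; `is_monotonic_increasing` is the relevant index property):

```python
import pandas as pd

chunk1 = pd.DataFrame({'c': [1.0, 2.0]},
                      index=pd.to_datetime(['2023-01-01', '2023-01-02']))
chunk2 = pd.DataFrame({'c': [3.0]},
                      index=pd.to_datetime(['2023-01-03']))

# Each chunk is internally sorted and chunk2 starts after chunk1 ends,
# so appending them in this order keeps the symbol's index monotonic.
ordered = (chunk1.index.is_monotonic_increasing
           and chunk2.index.is_monotonic_increasing
           and chunk2.index[0] >= chunk1.index[-1])
print(ordered)  # True
```

If a check like this fails, sorting the source CSV by date first is usually preferable to disabling validation, since some downstream operations assume a sorted index.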

Code to read, which now throws the error:

'''
import pandas as pd
import arcticdb
from arcticdb import Arctic
from arcticdb import QueryBuilder
from arcticdb.config import set_log_level
import datetime as dt
import faulthandler

faulthandler.enable()

print('pandas', pd.__version__)
print('arcticdb', arcticdb.__version__)

set_log_level('DEBUG')

arctic = Arctic('lmdb://c:/py/db.lmdb')
if 'bhu' not in arctic.list_libraries():
    arctic.create_library('bhu')
library = arctic['bhu']

q = QueryBuilder()
q = q[(q['d'] == dt.datetime(2023,7,3)) & (q['s'] == 'ACC')]
q = q.groupby('s').agg({'d':'max'})
df = library.read('c00', query_builder=q).data
df.head(1)
'''

Error thrown now:

pandas 2.0.3
arcticdb 1.5.0

StorageException                          Traceback (most recent call last)
Cell In[8], line 22
     20 #q = q[(q['d'] == dt.datetime(2023,7,3)) & (q['s'] == 'ACC')]
     21 q = q.groupby('s').agg({'d':'max'})
---> 22 df = library.read('c00',query_builder=q).data
     23 df.head(1)

File ~\AppData\Local\Programs\Python\Python311\Lib\site-packages\arcticdb\version_store\library.py:858, in Library.read(self, symbol, as_of, date_range, columns, query_builder)
    796 def read(
    797     self,
    798     symbol: str,
        (...)
    802     query_builder: Optional[QueryBuilder] = None,
    803 ) -> VersionedItem:
    804     """
    805     Read data for the named symbol. Returns a VersionedItem object with a data and metadata element (as passed into
    806     write).
        (...)
    856     2  7
    857     """
--> 858     return self._nvs.read(
    859         symbol=symbol, as_of=as_of, date_range=date_range, columns=columns, query_builder=query_builder
    860     )

File ~\AppData\Local\Programs\Python\Python311\Lib\site-packages\arcticdb\version_store\_store.py:1525, in NativeVersionStore.read(self, symbol, as_of, date_range, row_range, columns, query_builder, **kwargs)
   1521     row_range = _SignedRowRange(row_range[0], row_range[1])
   1522 version_query, read_options, read_query = self._get_queries(
   1523     as_of, date_range, row_range, columns, query_builder, **kwargs
   1524 )
-> 1525 read_result = self._read_dataframe(symbol, version_query, read_query, read_options)
   1526 return self._post_process_dataframe(read_result, read_query)

File ~\AppData\Local\Programs\Python\Python311\Lib\site-packages\arcticdb\version_store\_store.py:1590, in NativeVersionStore._read_dataframe(self, symbol, version_query, read_query, read_options)
   1589 def _read_dataframe(self, symbol, version_query, read_query, read_options):
-> 1590     return ReadResult(*self.version_store.read_dataframe_version(symbol, version_query, read_query, read_options))

StorageException: Composite: d:c00:4:0xbfd1c2c1c8db6f69@1689428954223646000[1678665600000000000,1685318400000000001],
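As an aside, the bracketed numbers in the Composite key appear to be epoch-nanosecond timestamps (the earlier error, on a RangeIndex, showed plain row numbers instead), so the time range of the failing data segment can be decoded with pandas. The values below are copied from the exception above:

```python
import pandas as pd

# The [start, end] values from the StorageException key, interpreted as
# nanoseconds since the Unix epoch (pd.Timestamp's default integer unit).
start_ns = 1678665600000000000
end_ns = 1685318400000000001

start = pd.Timestamp(start_ns)
end = pd.Timestamp(end_ns)
print(start, end)  # 2023-03-13 00:00:00  2023-05-29 00:00:00.000000001
```

That places the failing segment in the March to May 2023 portion of the data, which can help narrow down which appended chunk is involved.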

balajipitchumani commented 1 year ago

Submitted new bug #609 as well. Thanks.