man-group / arctic

High performance datastore for time series and tick data
https://arctic.readthedocs.io/en/latest/
GNU Lesser General Public License v2.1

"AttributeError: 'NoneType' object has no attribute 'append'" using 4 or more threads. Fewer threads works fine. #998

Open JakobLS opened 1 year ago

JakobLS commented 1 year ago

Arctic Version

arctic 1.79.2

However, I've tested other versions as well.

Arctic Store

ChunkStore

Platform and version

macOS Monterey 12.6, MacBook Pro M1
conda (miniconda) environment 4.12.0
python 3.9.16
mongodb-community@6.0 installed via brew at /opt/homebrew/var (tried mongodb-community@5.0 as well)
pymongo 4.3.3 (tested 3.11.0 and 3.12.0 as well)
pandas 1.1.5
numpy 1.24.2

Description of problem and/or code sample that reproduces the issue

The code works just fine when I use up to 3 of the M1's 8 cores. However, once I start using 4 or more, I get the following error:

  File "/Users/username/Desktop/Projects/path/to/file.py", line 248, in function1
    var1, var2, var3 = function2(
  File "/Users/username/Desktop/Projects/path/to/file2.py", line 369, in function2
    var11, var12 = function3(input1, input2, input3, input4,
  File "/Users/username/Desktop/Projects/path/to/file3.py", line 79, in function3
    close_df = get_df_1m(start, end, ticker)
  File "/Users/username/Desktop/Projects/path/to/file4.py", line 57, in get_df_1m
    arc.initialize_library(
  File "/Users/username/miniconda3/envs/env1/lib/python3.9/site-packages/arctic/decorators.py", line 49, in f_retry
    return f(*args, **kwargs)
  File "/Users/username/miniconda3/envs/env1/lib/python3.9/site-packages/arctic/arctic.py", line 310, in initialize_library
    self._cache.append('list_libraries', self._sanitize_lib_name(library))
AttributeError: 'NoneType' object has no attribute 'append'

That in turn eventually leads to the following error, due (I think) to the database shutting down:

File "/Users/username/Desktop/Projects/path/to/file.py", line 214, in function1
    var21, var22, var23 = function4(
  File "/Users/username/Desktop/Projects/path/to/file2.py", line 689, in function4
    close_df = get_df_1m(start, end, ticker)
  File "/Users/username/Desktop/Projects/path/to/file3.py", line 57, in get_df_1m
    arc.initialize_library(
  File "/Users/username/miniconda3/envs/env1/lib/python3.9/site-packages/arctic/decorators.py", line 49, in f_retry
    return f(*args, **kwargs)
  File "/Users/username/miniconda3/envs/env1/lib/python3.9/site-packages/arctic/arctic.py", line 297, in initialize_library
    lib = ArcticLibraryBinding(self, library)
  File "/Users/username/miniconda3/envs/env1/lib/python3.9/site-packages/arctic/arctic.py", line 501, in __init__
    self._curr_conn = self.arctic._conn
  File "/Users/username/miniconda3/envs/env1/lib/python3.9/site-packages/arctic/decorators.py", line 49, in f_retry
    return f(*args, **kwargs)
  File "/Users/username/miniconda3/envs/env1/lib/python3.9/site-packages/arctic/arctic.py", line 152, in _conn
    self._cache = Cache(self.__conn)
  File "/Users/username/miniconda3/envs/env1/lib/python3.9/site-packages/arctic/_cache.py", line 26, in __init__
    if cache_col not in self._cachedb.list_collection_names():
  File "/Users/username/miniconda3/envs/env1/lib/python3.9/site-packages/pymongo/database.py", line 959, in list_collection_names
    return [result["name"] for result in self.list_collections(session=session, **kwargs)]
  File "/Users/username/miniconda3/envs/env1/lib/python3.9/site-packages/pymongo/database.py", line 911, in list_collections
    return self.__client._retryable_read(_cmd, read_pref, session)
  File "/Users/username/miniconda3/envs/env1/lib/python3.9/site-packages/pymongo/_csot.py", line 105, in csot_wrapper
    return func(self, *args, **kwargs)
  File "/Users/username/miniconda3/envs/env1/lib/python3.9/site-packages/pymongo/mongo_client.py", line 1441, in _retryable_read
    server = self._select_server(read_pref, session, address=address)
  File "/Users/username/miniconda3/envs/env1/lib/python3.9/site-packages/pymongo/mongo_client.py", line 1257, in _select_server
    server = topology.select_server(server_selector)
  File "/Users/username/miniconda3/envs/env1/lib/python3.9/site-packages/pymongo/topology.py", line 272, in select_server
    server = self._select_server(selector, server_selection_timeout, address)
  File "/Users/username/miniconda3/envs/env1/lib/python3.9/site-packages/pymongo/topology.py", line 261, in _select_server
    servers = self.select_servers(selector, server_selection_timeout, address)
  File "/Users/username/miniconda3/envs/env1/lib/python3.9/site-packages/pymongo/topology.py", line 223, in select_servers
    server_descriptions = self._select_servers_loop(selector, server_timeout, address)
  File "/Users/username/miniconda3/envs/env1/lib/python3.9/site-packages/pymongo/topology.py", line 238, in _select_servers_loop
    raise ServerSelectionTimeoutError(
pymongo.errors.ServerSelectionTimeoutError: localhost:27017: [Errno 61] Connection refused, Timeout: 30.0s, Topology Description: <TopologyDescription id: 6414395fce485adfeec51165, topology_type: Unknown, servers: [<ServerDescription ('localhost', 27017) server_type: Unknown, rtt: None, error=AutoReconnect('localhost:27017: [Errno 61] Connection refused')>]>
Exception ServerSelectionTimeoutError Occured  

Function get_df_1m looks like this:

import pandas as pd

from arctic import CHUNK_STORE, Arctic

def get_df_1m(start, end, ticker):

    arc = Arctic('localhost')

    arc.initialize_library('ts_data.one_min_chunks', lib_type=CHUNK_STORE)
    lib = arc['ts_data.one_min_chunks']

    dR = pd.date_range(start=start, end=end, freq='T')
    close_df = lib.read(ticker, chunk_range=dR)

    return close_df
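
One detail that may be relevant: get_df_1m opens a new Arctic connection and re-runs initialize_library on every single read, in every worker process. Below is a minimal sketch of a variant that creates the connection once per process and reuses it; the lazily-initialised _arc / _get_arctic helpers are hypothetical (not Arctic API), while the library name and read call are unchanged from above:

import pandas as pd

from arctic import CHUNK_STORE, Arctic

_arc = None  # hypothetical module-level cache: one Arctic connection per process

def _get_arctic():
    # Create the connection lazily, after the worker process has started,
    # so each process gets its own MongoClient rather than a forked one.
    global _arc
    if _arc is None:
        _arc = Arctic('localhost')
        _arc.initialize_library('ts_data.one_min_chunks', lib_type=CHUNK_STORE)
    return _arc

def get_df_1m(start, end, ticker):
    lib = _get_arctic()['ts_data.one_min_chunks']
    dR = pd.date_range(start=start, end=end, freq='T')
    return lib.read(ticker, chunk_range=dR)

This avoids opening a fresh MongoClient (with its own connection pool) on every call, which with 8 workers reading repeatedly can add up quickly against a local mongod.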

And the way I initiate multiprocessing is as follows:

import math
import os
from multiprocessing import Process, Queue

numProcs = os.cpu_count()
outQ = Queue()
splits = int(math.ceil(len(tickers) / float(numProcs)))
procs = []

for _id in range(numProcs):
    p = Process(target=function1,
                args=(tickers[splits * _id: splits * (_id + 1)],
                      bt_args,
                      _id,
                      outQ))
    procs.append(p)
    p.start()
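
The snippet above starts the workers but does not show how results are collected. A minimal sketch of the collection side, assuming function1 puts exactly one item on outQ per process:

# Drain the queue before joining: a child that still holds items on a full
# multiprocessing.Queue can block on the underlying pipe, making join() hang.
results = [outQ.get() for _ in range(numProcs)]
for p in procs:
    p.join()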

Any help on how I can get this working with all 8 threads is much appreciated.

JakobLS commented 1 year ago

I think the issue was caused by a mistake in my own code rather than by Arctic. It's solved now. My apologies. Closing.

JakobLS commented 1 year ago

After feeding in a larger chunk of data (roughly 10x more than before), the problem has appeared again. Reopening in the hope that a solution can be found.