cognitedata / cognite-sdk-python

Cognite Python SDK
https://cognite-sdk-python.readthedocs-hosted.com/
Apache License 2.0

ThreadPoolExecutor locks/hangs when getting all items from client.relationships.list #1831

Closed: korvalds closed this issue 3 weeks ago

korvalds commented 2 months ago

System information:

Describe the bug: ThreadPoolExecutor locks/hangs when getting all items (limit=-1) from the client.relationships.list endpoint using partitions=10 and source_external_ids=[...], when len(source_external_id_list) > self._LIST_SUBQUERY_LIMIT.

To Reproduce: Runnable code reproducing the error.

from cognite.client import CogniteClient

client = CogniteClient()

workorders = client.events.list(
    data_set_external_ids=["some_id"],
    type="work_order",
    metadata=({"woMainAsset": "asset_name"}),
    limit=-1,
    partitions=10
).to_pandas(camel_case=True)

ops = client.relationships.list(
    data_set_external_ids=["some_id"],
    source_types=["event"],
    target_types=["event"],
    source_external_ids=workorders["externalId"].tolist(), # this is maybe 50000 ids
    limit=-1,
    partitions=10,
).to_pandas(camel_case=True)

# Then it hangs indefinitely until the process is killed:

  File "/home/dev/test_cognite_sdk_client.py", line 111, in <module>
    ops = client.relationships.list(
  File "/home/dev/.virtualenvs/core/lib/python3.10/site-packages/cognite/client/_api/relationships.py", line 353, in list
    tasks_summary = execute_tasks(
  File "/home/dev/.virtualenvs/core/lib/python3.10/site-packages/cognite/client/utils/_concurrency.py", line 300, in execute_tasks
    for fut in as_completed(futures_dct):
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 245, in as_completed
    waiter.event.wait(wait_timeout)
  File "/usr/lib/python3.10/threading.py", line 607, in wait
    signaled = self._cond.wait(timeout)
  File "/usr/lib/python3.10/threading.py", line 320, in wait
    waiter.acquire()
KeyboardInterrupt

Exception ignored in: <module 'threading' from '/usr/lib/python3.10/threading.py'>
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1537, in _shutdown
    atexit_call()
  File "/usr/lib/python3.10/concurrent/futures/thread.py", line 31, in _python_exit
    t.join()
  File "/usr/lib/python3.10/threading.py", line 1096, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.10/threading.py", line 1116, in _wait_for_tstate_lock
    if lock.acquire(block, timeout):
KeyboardInterrupt:

Expected behavior: Should return all relationships.

Additional context: I think the problem lies in the execute_tasks implementation, where the same executor from get_thread_pool_executor is returned every time. Since client.relationships.list has nested calls to execute_tasks (execute_tasks(..., self._list, ...) -> self._list_partitioned -> execute_tasks(get_partition, ...)), the threads lock up.

A possible solution is to use a context manager inside execute_tasks: with concurrent.futures.ThreadPoolExecutor(max_workers) as executor:
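To illustrate the deadlock pattern described above, here is a minimal, self-contained sketch (not SDK code; the pool size and task names are made up):

from concurrent.futures import ThreadPoolExecutor, as_completed

SHARED_POOL = ThreadPoolExecutor(max_workers=2)  # stand-in for the single pool returned by get_thread_pool_executor

def inner_task(i):
    return i  # pretend this is one partitioned API request

def outer_task(n):
    # Submits more work to the SAME pool and then blocks on the results.
    # If every worker is already busy running outer_task, the inner futures
    # can never be scheduled, so every worker waits forever.
    futures = [SHARED_POOL.submit(inner_task, i) for i in range(n)]
    return sum(f.result() for f in as_completed(futures))

outer = [SHARED_POOL.submit(outer_task, 5) for _ in range(4)]
for fut in as_completed(outer):  # hangs here, just like the traceback above
    print(fut.result())

Creating a fresh executor per call, e.g. with concurrent.futures.ThreadPoolExecutor(max_workers) as executor:, avoids this, because the inner calls no longer compete with the outer calls for the same fixed set of workers.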

haakonvt commented 1 month ago

Hi @korvalds and thanks for the bug report! Sorry for not responding earlier (vacation šŸŒ“ ). This issue will be prioritized and I expect a fix to be out later this week.

haakonvt commented 1 month ago

@korvalds a fix is released now, would you mind retrying with 7.54.2?

korvalds commented 1 month ago

@haakonvt ok, thanks. Will try next week when I'm back from vacation šŸ˜Ž

korvalds commented 1 month ago

@haakonvt Hi! I have tested it, but I'm not sure the fix is good enough. Long comment below šŸ˜…

The main motivation for using the partitions=10 argument is to prevent 429 and timeout errors when using the API. This lock issue was first encountered while investigating 429 errors, and I tried following the documentation. With partitions it seems to reduce the number of 429s/retries, and it is also much faster.

In your fix, the partitions argument is ignored when the source/target external IDs number more than self._LIST_SUBQUERY_LIMIT. That seems contrary to the point of using partitions precisely when the number of requests increases, or am I missing something here? And is this not also prone to the same 429 errors as before? (I don't know the specific values of the current rate limits, I just run into them; do you have some facts on rate limiting?)

With your fix, how is this code now different from not using partitions at all? I.e., partitions=None takes the same amount of time. The lock was an issue with the singleton thread pool; what is the purpose of that, and why not use a context manager instead?

haakonvt commented 1 month ago

@korvalds First of all, I understand your concern regarding the removal of partitions; however, I do believe my reasoning for it is sound, and I'll try to explain.

Since the API allows a maximum of 1000 source or target IDs per request, whenever the user exceeds this amount, we need to create subqueries and merge their results client-side. The number of subqueries scales quadratically, as each source chunk must be fetched against every target chunk. This quickly creates a lot of requests.

If each of these individual requests were also to use partitions, the total number of requests would grow even faster, and that's why I removed it. We still fetch these subqueries in parallel, but we don't parallelize each one further by using partitions.
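As a rough illustration of the arithmetic (hypothetical numbers; the 1000-id cap is the subquery limit mentioned above):

from math import ceil

SUBQUERY_LIMIT = 1000           # max source/target ids per request, per the comment above
n_source = n_target = 50_000    # hypothetical: both id lists as large as in the original report
partitions = 10

source_chunks = ceil(n_source / SUBQUERY_LIMIT)   # 50
target_chunks = ceil(n_target / SUBQUERY_LIMIT)   # 50
subqueries = source_chunks * target_chunks        # 2500 subqueries (quadratic growth)

# 2500 initial requests without partitions vs 25000 if each subquery also used partitions=10
print(subqueries, subqueries * partitions)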

Rate limiting does not distinguish between whether partitions is used or not; only the total request volume matters.

I can see that the new implementation is slower when comparing the two "fetch regimes".

For maintainability, there is also a good argument to not make the fetching logic too complicated, but I'm happy to discuss other ideas and strategies for how to make this as performant as possible!

If you have a specific scenario that fetches significantly slower than it used to (I guess compared with a quite old SDK version) I would be very interested in seeing/doing some benchmarking on this!

korvalds commented 3 weeks ago

@haakonvt To summarize: I will close this issue with the fix in 7.54.2.

For those interested, this issue was tested with two different strategies for executing parallel requests, using the fix in master and concurrency-strategy-testing-relationships.

The results (in my use case) showed that the approach in concurrency-strategy-testing-relationships was about 7 times faster. This is because the parallelization happens at the request level, which limits the amount of pagination required: the response is distributed across more partitions instead of each request waiting for the nextCursor value from the previous one, so fewer requests run sequentially.

Depending on the cognite-sdk version, splitting source_external_id_list beforehand may be worth a try:

# client and workorders come from the snippet at the top of this issue
from cognite.client.data_classes import Relationship, RelationshipList

source_external_id_list = workorders["externalId"].tolist()  # this is maybe 50000 ids
subquery_limit = client.relationships._LIST_SUBQUERY_LIMIT

# Chunk the source ids client-side so each request stays below the subquery limit,
# and let each chunked request use its own partitions.
results = []
for si in range(0, max(1, len(source_external_id_list)), subquery_limit):
    results.extend(
        client.relationships._list(
            list_cls=RelationshipList,
            resource_cls=Relationship,
            method="POST",
            filter={
                "dataSetIds": [{"externalId": "some_id"}],
                "sourceTypes": ["event"],
                "targetTypes": ["event"],
                "sourceExternalIds": source_external_id_list[si : si + subquery_limit],
            },
            limit=-1,
            partitions=3,
        )
    )
ops = RelationshipList(results, cognite_client=client).to_pandas(camel_case=True)