run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai

[Bug]: Adding RedisCache to an ingestion pipeline makes it unparallelizable #17125

Open omrihar opened 1 day ago

omrihar commented 1 day ago

Bug Description

Using an ingestion pipeline with a Redis cache makes the pipeline incompatible with multiprocessing: setting num_workers to a value greater than 1 causes the run to fail with TypeError: cannot pickle '_thread.lock' object.

Version

0.12.1

Steps to Reproduce

Have Redis running somewhere (I use localhost) and execute the following code:

from llama_index.core.ingestion import IngestionCache, IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.schema import Document
from llama_index.storage.kvstore.redis import RedisKVStore as RedisCache

documents = [
    Document(text="This is my first document", doc_id="first_doc"),
    Document(text="Second document", doc_id="second_doc"),
]

pipeline = IngestionPipeline(
    transformations=[SentenceSplitter()],
    cache=IngestionCache(
        cache=RedisCache.from_host_and_port(host="localhost", port=6379),
        collection="test-collection",
    ),
)

# num_workers > 1 sends the transformations and the cache into a multiprocessing pool
nodes = pipeline.run(documents=documents, num_workers=4)

Relevant Logs/Tracebacks

TypeError                                 Traceback (most recent call last)
Cell In[5], line 19
      6 documents = [
      7     Document(text="This is my first document", doc_id="first_doc"),
      8     Document(text="Second document", doc_id="second_doc"),
      9 ]
     11 pipeline = IngestionPipeline(
     12     transformations=[SentenceSplitter()],
     13     cache=IngestionCache(
   (...)
     16     ),
     17 )
---> 19 nodes = pipeline.run(documents=documents, num_workers=4)

File ~/.pyenv/versions/3.11.9/envs/kira/lib/python3.11/site-packages/llama_index/core/instrumentation/dispatcher.py:311, in Dispatcher.span.<locals>.wrapper(func, instance, args, kwargs)
    308             _logger.debug(f"Failed to reset active_span_id: {e}")
    310 try:
--> 311     result = func(*args, **kwargs)
    312     if isinstance(result, asyncio.Future):
    313         # If the result is a Future, wrap it
    314         new_future = asyncio.ensure_future(result)

File ~/.pyenv/versions/3.11.9/envs/kira/lib/python3.11/site-packages/llama_index/core/ingestion/pipeline.py:533, in IngestionPipeline.run(self, show_progress, documents, nodes, cache_collection, in_place, store_doc_text, num_workers, **kwargs)
    529     with multiprocessing.get_context("spawn").Pool(num_workers) as p:
    530         node_batches = self._node_batcher(
    531             num_batches=num_workers, nodes=nodes_to_run
    532         )
--> 533         nodes_parallel = p.starmap(
    534             run_transformations,
    535             zip(
    536                 node_batches,
    537                 repeat(self.transformations),
    538                 repeat(in_place),
    539                 repeat(self.cache if not self.disable_cache else None),
    540                 repeat(cache_collection),
    541             ),
    542         )
    543         nodes = reduce(lambda x, y: x + y, nodes_parallel, [])  # type: ignore
    544 else:

File ~/.pyenv/versions/3.11.9/lib/python3.11/multiprocessing/pool.py:375, in Pool.starmap(self, func, iterable, chunksize)
    369 def starmap(self, func, iterable, chunksize=None):
    370     '''
    371     Like `map()` method but the elements of the `iterable` are expected to
    372     be iterables as well and will be unpacked as arguments. Hence
    373     `func` and (a, b) becomes func(a, b).
    374     '''
--> 375     return self._map_async(func, iterable, starmapstar, chunksize).get()

File ~/.pyenv/versions/3.11.9/lib/python3.11/multiprocessing/pool.py:774, in ApplyResult.get(self, timeout)
    772     return self._value
    773 else:
--> 774     raise self._value

File ~/.pyenv/versions/3.11.9/lib/python3.11/multiprocessing/pool.py:540, in Pool._handle_tasks(taskqueue, put, outqueue, pool, cache)
    538     break
    539 try:
--> 540     put(task)
    541 except Exception as e:
    542     job, idx = task[:2]

File ~/.pyenv/versions/3.11.9/lib/python3.11/multiprocessing/connection.py:206, in _ConnectionBase.send(self, obj)
    204 self._check_closed()
    205 self._check_writable()
--> 206 self._send_bytes(_ForkingPickler.dumps(obj))

File ~/.pyenv/versions/3.11.9/lib/python3.11/multiprocessing/reduction.py:51, in ForkingPickler.dumps(cls, obj, protocol)
     48 @classmethod
     49 def dumps(cls, obj, protocol=None):
     50     buf = io.BytesIO()
---> 51     cls(buf, protocol).dump(obj)
     52     return buf.getbuffer()

TypeError: cannot pickle '_thread.lock' object

dosubot[bot] commented 1 day ago

The error you're encountering is due to the RedisKVStore instance not being serializable, which is required for multiprocessing. The RedisKVStore class creates a Redis client connection that includes non-serializable objects like _thread.lock, which cannot be pickled and thus cannot be shared across processes in a multiprocessing pool [1].
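
A quick way to confirm that the cache object is what blocks pickling is to serialize it directly (a minimal check, assuming the same local Redis instance as in the reproduction above; the traceback suggests this raises the same error the pool hits internally):

import pickle

from llama_index.storage.kvstore.redis import RedisKVStore as RedisCache

# Same cache store as in the reproduction; pickling it by hand should surface
# the same TypeError the multiprocessing pool raises when it ships the task.
cache_store = RedisCache.from_host_and_port(host="localhost", port=6379)

try:
    pickle.dumps(cache_store)
except TypeError as exc:
    print(f"Not picklable: {exc}")  # e.g. cannot pickle '_thread.lock' object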

To work around this issue, you can consider the following approaches:

  1. Disable Caching During Multiprocessing: If caching is not critical for your multiprocessing tasks, you can disable the cache when running the pipeline with multiple workers. This can be done by setting the cache parameter to None or by using a conditional to disable it when num_workers > 1 (see the first sketch after this list).

  2. Use a Different Caching Strategy: If caching is necessary, consider a caching mechanism that supports serialization, such as a file-based cache or an in-memory cache that can be shared across processes (see the second sketch below).

  3. Custom Serialization: Implement custom serialization logic for the RedisKVStore if possible, though this can be complex and may not be feasible depending on the underlying library's constraints.
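
For the first approach, a minimal sketch (assuming the pipeline falls back to its default in-memory cache when no cache argument is supplied, so only the Redis-backed cache needs to be gated on the worker count):

from llama_index.core.ingestion import IngestionCache, IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.schema import Document
from llama_index.storage.kvstore.redis import RedisKVStore as RedisCache

documents = [
    Document(text="This is my first document", doc_id="first_doc"),
    Document(text="Second document", doc_id="second_doc"),
]

num_workers = 4  # > 1 triggers the multiprocessing path

# Attach the Redis-backed cache only for sequential runs, so the unpicklable
# Redis client never has to cross a process boundary.
pipeline_kwargs = {"transformations": [SentenceSplitter()]}
if num_workers <= 1:
    pipeline_kwargs["cache"] = IngestionCache(
        cache=RedisCache.from_host_and_port(host="localhost", port=6379),
        collection="test-collection",
    )

pipeline = IngestionPipeline(**pipeline_kwargs)
nodes = pipeline.run(documents=documents, num_workers=num_workers)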

These approaches can help you bypass the serialization issue with RedisKVStore when using multiprocessing in your ingestion pipeline.
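
For the second approach, a sketch that swaps the Redis store for the core SimpleKVStore, an in-memory key-value store that pickles cleanly because it only wraps plain dicts (the cache_path and the persist / from_persist_path round-trip are assumptions about that store's API, used here to emulate a file-based cache):

import os

from llama_index.core.ingestion import IngestionCache, IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.schema import Document
from llama_index.core.storage.kvstore import SimpleKVStore

documents = [
    Document(text="This is my first document", doc_id="first_doc"),
    Document(text="Second document", doc_id="second_doc"),
]

cache_path = "./ingestion_cache.json"  # hypothetical location for the persisted cache

# Reload a previously persisted cache if one exists, otherwise start empty.
kv_store = (
    SimpleKVStore.from_persist_path(cache_path)
    if os.path.exists(cache_path)
    else SimpleKVStore()
)

pipeline = IngestionPipeline(
    transformations=[SentenceSplitter()],
    cache=IngestionCache(cache=kv_store, collection="test-collection"),
)

nodes = pipeline.run(documents=documents, num_workers=4)

# Write the cache back to disk so later runs can reuse it.
kv_store.persist(cache_path)

One caveat to keep in mind: with num_workers > 1 each worker receives a pickled copy of the cache, so entries written inside the workers may not propagate back to the parent process, and the persisted file may only reflect what the parent saw.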

To continue talking to Dosu, mention @dosu.