Feature Description
In IngestionPipeline, cache transformations per node, so that a previously computed node+transformation result can be retrieved from the cache, instead of requiring the whole list of nodes to be identical.
Reason
The hash used to key the cache is computed from the whole list of Nodes (e.g. Documents) rather than from individual Nodes. Even if only a single Node has changed, the transformations are executed on the whole list again, because the hash is different.
import random

from llama_index.core import Document
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.ingestion.cache import DEFAULT_CACHE_NAME

pipeline = IngestionPipeline(
    # Use only 1 transformation for simplicity
    transformations=[SentenceSplitter(chunk_size=50, chunk_overlap=0)]
)

docs = [
    Document(id_=str(i), text=" ".join([random.choice("abcdefg") for _ in range(100)]))
    for i in range(3)
]

# Before running the pipeline, the cache is empty
pipeline.cache.cache.get_all(collection=DEFAULT_CACHE_NAME)
# {}

# Run with all 3 docs
_ = pipeline.run(documents=docs[0:3])

# The cache contains a single entry, keyed by a hash over all 3 docs, with an output of 6 nodes
for k, v in pipeline.cache.cache.get_all(collection=DEFAULT_CACHE_NAME).items():
    print(f"Hash: {k}")
    print(f"# Output Nodes: {len(v['nodes'])}")
# Hash: af724bae923c6dc5d7f83f6874e983b2ea18b1f7a14d840d824dd1decff2e45e
# # Output Nodes: 6

# Run with doc 1 only
_ = pipeline.run(documents=docs[0:1])

# A new cache entry is added for doc 1 alone, even though it has been transformed before.
for k, v in pipeline.cache.cache.get_all(collection=DEFAULT_CACHE_NAME).items():
    print(f"Hash: {k}")
    print(f"# Output Nodes: {len(v['nodes'])}")
# Hash: af724bae923c6dc5d7f83f6874e983b2ea18b1f7a14d840d824dd1decff2e45e
# # Output Nodes: 6
# Hash: d790f504a8904444c4bd4a10c21cabf367d6b8f4dcaa29207dda54d47887f763
# # Output Nodes: 2

# Run with docs 2 and 3 (both already transformed as part of the first run)
_ = pipeline.run(documents=list(docs[1:]))

# A new cache entry is added for this combination, even though both docs have already been transformed.
for k, v in pipeline.cache.cache.get_all(collection=DEFAULT_CACHE_NAME).items():
    print(f"Hash: {k}")
    print(f"# Output Nodes: {len(v['nodes'])}")
# Hash: af724bae923c6dc5d7f83f6874e983b2ea18b1f7a14d840d824dd1decff2e45e
# # Output Nodes: 6
# Hash: d790f504a8904444c4bd4a10c21cabf367d6b8f4dcaa29207dda54d47887f763
# # Output Nodes: 2
# Hash: 4c989444d6af891d03550461b21741eb80215ba4e94d80834bb6a0388d75eaa9
# # Output Nodes: 4
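The behavior above follows from the cache key being a single hash over the entire input list. A minimal pure-Python illustration (using hashlib directly, not llama_index's actual hashing scheme) of why list-level keys get zero reuse across overlapping batches, while per-node keys would allow it:

```python
import hashlib

def list_key(texts):
    # One hash over the whole list: a change to any element changes the key.
    return hashlib.sha256("\x00".join(texts).encode()).hexdigest()

def node_keys(texts):
    # Per-node hashes: unchanged nodes keep their keys across batches.
    return [hashlib.sha256(t.encode()).hexdigest() for t in texts]

batch1 = ["doc1", "doc2", "doc3"]
batch2 = ["doc1", "doc2-updated", "doc3"]

# List-level keying: no reuse between the two batches.
print(list_key(batch1) == list_key(batch2))  # False

# Node-level keying: docs 1 and 3 would hit the cache.
reused = set(node_keys(batch1)) & set(node_keys(batch2))
print(len(reused))  # 2
```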
Value of Feature
I would like to use IngestionPipeline as part of a data preparation step in an ML pipeline. The inputs are Documents, and outputs are embeddings that are passed to an ML model. There is no need for storage or retrieval. For example:
In cases where incoming data represents a full snapshot (e.g. daily snapshots of Documents in a production system), the set of Documents comprises:
1. New documents
2. Updated documents
3. Unchanged documents
It would be more efficient if the transformations could be run only on (1) and (2), retrieving cached results for (3), especially when (3) represents a large proportion of the batch.
Currently, the cache works on the whole list of Documents passed to IngestionPipeline.run(), so even if only one Document has changed, all the other Documents are processed again.
Furthermore, the cache will become bloated over time, as each batch of data is likely to be unique (albeit with many overlapping Documents). It is extremely unlikely that the exact same list of Documents is encountered again, which diminishes the utility of the cache.
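To sketch what the requested behavior could look like, here is a minimal stand-alone illustration (plain Python, not the actual IngestionPipeline API; the PerNodeCache class and its key scheme are hypothetical) where the cache is keyed on (node content, transformation identity), so only new or updated items are re-processed:

```python
import hashlib

class PerNodeCache:
    """Hypothetical sketch: cache keyed per (node content, transformation),
    so unchanged nodes are served from cache regardless of the batch."""

    def __init__(self):
        self._store = {}

    def _key(self, text, transform_id):
        # Key combines the transformation's identity with the node's content.
        return hashlib.sha256(f"{transform_id}:{text}".encode()).hexdigest()

    def run(self, texts, transform, transform_id):
        out = []
        for t in texts:
            k = self._key(t, transform_id)
            if k not in self._store:
                # Only executed for nodes never seen under this transformation.
                self._store[k] = transform(t)
            out.append(self._store[k])
        return out

calls = []
def upper(t):
    calls.append(t)
    return t.upper()

cache = PerNodeCache()
cache.run(["a", "b", "c"], upper, "upper-v1")  # 3 executions
cache.run(["b", "c", "d"], upper, "upper-v1")  # only "d" is executed
print(len(calls))  # 4
```

With this scheme the cache also stops growing per unique batch: the number of entries is bounded by the number of distinct node+transformation pairs, which addresses the bloat concern above.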
Relevant code (the cache key is computed over the whole input list):
https://github.com/run-llama/llama_index/blob/535d0a4655df8de89501e2aa60a0afa61571ba10/llama-index-core/llama_index/core/ingestion/pipeline.py#L55-L102