dlt-hub / dlt

data load tool (dlt) is an open source Python library that makes data loading easy 🛠️
https://dlthub.com/docs
Apache License 2.0

Allow for "reduce" steps in LanceDB adapter #1699

Open zilto opened 4 weeks ago

zilto commented 4 weeks ago

Feature description

Allow the LanceDB and other vector DB adapters to specify a "contextualize" (rolling window) operation that joins partitioned text chunks before the embedding function is applied.

Are you a dlt user?

Yes, I'm already a dlt user.

Use case

context

The constructs of @dlt.resource and @dlt.transformer are very convenient for document ingestion for NLP/LLM use cases. The @dlt.resource returns full-text and @dlt.transformer can chunk it (into paragraphs for example). The LanceDB and other vector DB adapters make it easy to embed the full-text and the chunked text columns. We get something like this:

@dlt.resource
def full_text():
  yield {"document_id": "foo", "text": ...}

@dlt.transformer(data_from=full_text)
def chunked_text(document):
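  # _split_text expects the text itself, not the whole record; ids start at 1 to match the table below
  for idx, chunk in enumerate(_split_text(document["text"]), start=1):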
    yield {
      "document_id": document["document_id"],
      "chunk_id": idx,
      "text": chunk,
    }

Full-text

| document id  | text                                          |
| "foo"        | "The quick brown fox jumps over the lazy dog" |

Chunks (3 words)

| document id | chunk id | text               |
| "foo"       | 1        |  "The quick brown" |
| "foo"       | 2        |  "fox jumps over"  |
| "foo"       | 3        |  "the lazy dog"    |

limitations

However, embedding these "partitioned" chunks on their own is often low value for RAG. A common operation is "contextualizing" chunks, which is a rolling window operation over the ordered chunks (with window size and stride / overlap parameters). For instance, LanceDB has contextualize(), but it requires converting the data to a pandas dataframe. Let's illustrate a "2-chunk window" based on the previous table:

Contexts

| document id | chunk id | context id  | text                              |
| "foo"       | 1, 2     | 1           |  "The quick brown fox jumps over" |
| "foo"       | 2, 3     | 2           |  "fox jumps over the lazy dog"    |
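The "2-chunk window" above can be sketched in plain Python. This is only an illustration under assumptions: `rolling_contexts` is a hypothetical helper (not part of dlt or LanceDB), it emits full windows only, and it numbers contexts from 1 to match the table.

```python
from typing import Iterator


def rolling_contexts(chunks: list[dict], window: int = 2, stride: int = 1) -> Iterator[dict]:
    """Yield one context per full window, remembering which chunks it spans."""
    # range() is empty when there are fewer chunks than the window size
    starts = range(0, len(chunks) - window + 1, stride)
    for context_id, start in enumerate(starts, start=1):
        span = chunks[start : start + window]
        yield {
            "document_id": span[0]["document_id"],
            "chunk_ids": [c["chunk_id"] for c in span],
            "context_id": context_id,
            "text": " ".join(c["text"] for c in span),
        }


chunks = [
    {"document_id": "foo", "chunk_id": 1, "text": "The quick brown"},
    {"document_id": "foo", "chunk_id": 2, "text": "fox jumps over"},
    {"document_id": "foo", "chunk_id": 3, "text": "the lazy dog"},
]
contexts = list(rolling_contexts(chunks))
for row in contexts:
    print(row)
```

Keeping `chunk_ids` on each context is what makes the overlap between contexts visible later on.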

AFAIK, dlt doesn't provide a clear API for normalizing the chunk_id and the context_id columns. The "contextualize" operation could be directly implemented in a single @dlt.transformer, but it would only include document_id -> context_id and miss the fact that "contextualized chunks" aren't independent; they share underlying chunks.

Proposed solution

adding a "reducer" step

I was able to hack around to receive a batch of "chunks" and use dlt.mark.with_table_name to dispatch both a "context table" and "relation table" from the same @dlt.transformer. Mock code:

def _split_text(text: str):
    words = text.split()
    for i in range(0, len(words), 3):
        yield " ".join(words[i:i+3])

def _contextualize(chunks: list[str], window=5, stride=3, min_window_size=2):
    n_chunks = len(chunks)
    for start_i in range(0, n_chunks, stride):
        if (start_i + window <= n_chunks) or (n_chunks - start_i >= min_window_size):
            yield " ".join(chunks[start_i : min(start_i + window, n_chunks)])

@dlt.source
def document_source():
  @dlt.resource(primary_key="document_id")
  def document():
    yield {"document_id": "foo", "text": "The quick brown fox jumps over the lazy dog"}

  # this needs to accumulate a "batch" for the whole document before
  # starting the "reduce" / rolling operation step;
  @dlt.transformer(data_from=document, primary_key="chunk_id")
  def chunks(item: dict):
    return [
      dict(
        document_id=item["document_id"],
        chunk_id=idx,
        text=text,
      )
      for idx, text in enumerate(_split_text(item["text"]), start=1)
    ]

  # order is important for reduce / rolling step
  # default to order of the batch or specifying sorting key
  @dlt.transformer(data_from=chunks, primary_key="context_id")
  def contexts(items: list[dict]):
    # first handle the m-to-n relationship
    # set of foreign keys (i.e., "chunk_id")
    chunk_id_set = set(item["chunk_id"] for item in items)
    # hash_set is a placeholder helper (not defined in this mock)
    context_id = hash_set(chunk_id_set)

    # create a table only containing the keys
    for chunk_id in chunk_id_set:
      yield dlt.mark.with_table_name(
        {"chunk_id": chunk_id, "context_id": context_id},
        "chunks_to_contexts_keys",
      ) 

    # main transformation logic
    for contextualized in _contextualize([chunk["text"] for chunk in items]):
      yield dlt.mark.with_table_name(
        {"context_id": context_id, "text": contextualized},
        "contexts"
      )

  return (document, chunks, contexts)

Contexts

|  context id | text                              |
|  hash(1, 2) |  "The quick brown fox jumps over" |
|  hash(2, 3) |  "fox jumps over the lazy dog"    |

Chunks-to-contexts keys

| chunk id | context id |
| 1        | hash(1, 2) |
| 2        | hash(1, 2) |
| 2        | hash(2, 3) |
| 3        | hash(2, 3) |
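As a rough sketch of how these two tables could be derived together, assuming full windows only: `hash_set` here is a hypothetical stand-in for the undefined helper in the mock code, and `build_tables` is likewise made up for illustration. The key is derived with `hashlib` rather than the built-in `hash()`, because string hashes from `hash()` change between Python processes.

```python
import hashlib


def hash_set(ids) -> str:
    """Hypothetical helper: deterministic, order-independent key for a set of chunk ids."""
    joined = ",".join(str(i) for i in sorted(ids))
    return hashlib.sha256(joined.encode()).hexdigest()[:12]


def build_tables(chunks: list[dict], window: int = 2, stride: int = 1):
    """Return (contexts, chunks_to_contexts_keys) rows for one document's ordered chunks."""
    contexts, keys = [], []
    for start in range(0, len(chunks) - window + 1, stride):
        span = chunks[start : start + window]
        # one id per window, derived from the chunk ids that the window covers
        context_id = hash_set(c["chunk_id"] for c in span)
        contexts.append({"context_id": context_id, "text": " ".join(c["text"] for c in span)})
        keys.extend({"chunk_id": c["chunk_id"], "context_id": context_id} for c in span)
    return contexts, keys


chunks = [
    {"chunk_id": 1, "text": "The quick brown"},
    {"chunk_id": 2, "text": "fox jumps over"},
    {"chunk_id": 3, "text": "the lazy dog"},
]
contexts, keys = build_tables(chunks)
```

Chunk ids in this example are only unique within one document, so a real key would probably need to include `document_id` as well.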

There's probably room for a generic @dlt.reducer that automatically manages the primary / foreign keys based on the other resources' metadata, handles the key set hashing, and dispatches results to tables. Given that this could be a can of worms, it could be tested and refined while being hidden behind the lancedb_adapter. The API could be expanded to:

lancedb_adapter(
  chunks,
  embed="text",
  window=10,
  stride=3,  # could also be renamed to overlap by changing the algo
  min_window_size=3,
)

This would reproduce the above logic by creating the chunks table as defined by the user (the chunks resource) and creating the second table (the contexts) automatically.

Related issues

No response

rudolfix commented 3 weeks ago

@zilto thanks for this idea. a few random comments (I'm probably missing some background):

  1. why not to chunk with rolling window from the very start? so chunks are already "overlapping". then you do not need to merge them in transformer
  2. what is the role of "chunks", "contexts" and "chunk-to-context-keys" tables/collection when doing a RAG? are you using both chunks and contexts to identify documents via vector search?
  3. what you could try is to use dlt ability to create child tables and yield in contexts:
    for contextualized in _contextualize([chunk["text"] for chunk in items]):
      # [...] stands for the list of chunks that make up this context
      yield {"context_id": context_id, "text": contextualized, "chunks": [...]}

    that would create contexts and contexts__chunks tables. they won't be as nice as yours though (dlt would add its own table linking)

btw. with @Pipboyguy we are trying to support chunking in some more or less unified way #1587

zilto commented 3 weeks ago

I agree with the motivation of the cited issue! But to add more context:

1. why not to chunk with rolling window from the very start? so chunks are already "overlapping". then you do not need to merge them in transformer

This suggests doing documents -> contexts instead of documents -> chunks -> contexts. Unlike the approach I suggested, one can't know what two "contexts" have in common (e.g., which chunks they share, how many chunks they share, or how far apart the shared chunks are, given ordered chunk ids).

2. what is the role of "chunks", "contexts" and "chunk-to-context-keys" tables/collection when doing a RAG? are you using both chunks and contexts to identify documents via vector search?

For RAG, I intend to use "contexts" for first-pass vector search, then use the "context-chunk" lineage to filter out "contexts" that have "too much in common" and increase the information content of the text passed to the LLM. Over time, it's valuable to log which "context" and underlying "chunk" are high signal for downstream uses.

More concretely, a user asks a question about dlt. You want documentation to be embedded in large "contexts" to have good recall, then the LLM should be able to extract the right info from the "context" and generate an answer. However, it's still fuzzy "what" was useful to the LLM or user. The above lineage would show, for example, that retrieving "contexts" containing the "chunk" "dlt is an open source Python library" is high signal for answering questions around the topic of pricing.
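To make the filtering step a bit more concrete, here is a minimal sketch, assuming the vector search already returned context ids in rank order and that the chunks-to-contexts keys are available as rows. `filter_overlapping`, `max_shared`, and the sample ids are all hypothetical; a greedy pass is just one possible policy.

```python
from collections import defaultdict


def filter_overlapping(ranked_context_ids: list[str], context_chunks: dict, max_shared: int = 0) -> list[str]:
    """Keep contexts in rank order, skipping any that shares more than
    `max_shared` chunks with a context that was already kept."""
    kept = []
    for cid in ranked_context_ids:
        chunk_set = context_chunks[cid]
        if all(len(chunk_set & context_chunks[k]) <= max_shared for k in kept):
            kept.append(cid)
    return kept


# rows of a chunks-to-contexts keys table (made-up ids)
keys = [
    {"chunk_id": 1, "context_id": "a"},
    {"chunk_id": 2, "context_id": "a"},
    {"chunk_id": 2, "context_id": "b"},
    {"chunk_id": 3, "context_id": "b"},
    {"chunk_id": 3, "context_id": "c"},
    {"chunk_id": 4, "context_id": "c"},
]
context_chunks = defaultdict(set)
for row in keys:
    context_chunks[row["context_id"]].add(row["chunk_id"])

ranked = ["a", "b", "c"]  # e.g. vector search results, best first
strict = filter_overlapping(ranked, context_chunks, max_shared=0)
lenient = filter_overlapping(ranked, context_chunks, max_shared=1)
```

With `max_shared=0`, context "b" is dropped because it shares chunk 2 with "a", while "c" survives. None of this is possible without the chunk-level lineage.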

3. what you could try is to use dlt ability to create child tables and yield in contexts:

Didn't think of that! While it handles relationships, I would have duplicated "chunks" stored, no?
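A plain-Python sketch of what the nested records could look like, assuming each context carries full copies of its chunks (`nested_contexts` is a hypothetical helper and dlt is not called here). Counting the nested rows shows where the duplication would come from:

```python
from collections import Counter


def nested_contexts(chunks: list[dict], window: int = 2, stride: int = 1):
    """Yield contexts that embed their chunks as a nested list."""
    starts = range(0, len(chunks) - window + 1, stride)
    for context_id, start in enumerate(starts, start=1):
        span = chunks[start : start + window]
        yield {
            "context_id": context_id,
            "text": " ".join(c["text"] for c in span),
            "chunks": [dict(c) for c in span],  # full copy of each chunk
        }


chunks = [
    {"chunk_id": 1, "text": "The quick brown"},
    {"chunk_id": 2, "text": "fox jumps over"},
    {"chunk_id": 3, "text": "the lazy dog"},
]
items = list(nested_contexts(chunks))

# one row per nested list element, i.e. what a child table would hold
child_rows = [c for item in items for c in item["chunks"]]
copies = Counter(c["chunk_id"] for c in child_rows)
```

Here chunk 2 shows up twice in the child rows. Nesting only the chunk ids (and loading chunks as their own table) would presumably avoid storing the text more than once, at the cost of being closer to the keys table again.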

rudolfix commented 3 weeks ago

@zilto it seems we will be picking your brain a lot :) our goal is to support chunked documents with "merge" write disposition (where only a subset of documents will be updated). I'll get back to this topic tomorrow. we need to move forward...

Pipboyguy commented 3 weeks ago

@zilto Thanks for the detailed use case and explanation!

@rudolfix I think this table can also be created as part of a job that runs after the main table chain, just like the current orphan removal process, and have its orphans removed in a similar fashion. WDYT?