I hope it's ok me stopping by; I just wanted to do a quick check on the `wait_merging_threads` addition.

> hopefully this doesn't hurt performance

In your code it will definitely hurt async concurrency, but probably not data throughput for indexing. I don't know whether your application is sensitive to the concurrency impact or not. Handwaving some numbers: if your application usually serves, say, 10k concurrent connections, a little blocking code like the writer interactions you have could drop that to, say, 1k if it were in a hot path. It probably isn't, but with these made-up numbers that's the kind of impact blocking code has inside async event loops. (OTOH, if you have very few concurrent IO-bound tasks/users then it probably doesn't matter.) `wait_merging_threads` makes it worse, but you are already doing blocking operations with the writer, so this is an existing issue.
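To make that concrete, here's a toy example (nothing below comes from paper-qa, it's just the standard library): one blocking call starves every other coroutine on the loop for its full duration.

```python
import asyncio
import time


async def blocking() -> None:
    # Stand-in for a blocking writer call: holds the event loop for 0.5 s.
    time.sleep(0.5)


async def heartbeat() -> None:
    # Would tick every 100 ms, but stalls while blocking() holds the loop.
    for _ in range(5):
        print("tick")
        await asyncio.sleep(0.1)


async def main() -> None:
    await asyncio.gather(heartbeat(), blocking())


asyncio.run(main())
```

The same thing happens, on a smaller scale, with the writer calls in the snippet from this PR: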
```python
async def _add_document() -> None:
    if not await self.filecheck(index_doc["file_location"], index_doc["body"]):
        try:
            writer: IndexWriter = (await self.index).writer()
            writer.add_document(Document.from_dict(index_doc))  # type: ignore[call-arg]
            writer.commit()
            writer.wait_merging_threads()
            filehash = self.filehash(index_doc["body"])
            (await self.index_files)[index_doc["file_location"]] = filehash
            if document:
                docs_index_dir = await self.docs_index_directory
                async with await anyio.open_file(
                    docs_index_dir / f"{filehash}.{self.storage.extension()}",
                    "wb",
                ) as f:
                    await f.write(self.storage.write_to_string(document))
            self.changed = True
        except ValueError as e:
            if "Failed to acquire Lockfile: LockBusy." in str(e):
                raise AsyncRetryError("Failed to acquire lock.") from e
            raise
```
The issue is that all interactions with the writer that touch disk are blocking, and `wait_merging_threads` is especially blocking. It doesn't normally take long, but even hundreds of milliseconds will tank your asyncio concurrency. To resolve this, you would have to perform the writer operations inside a thread or a subprocess. I had a quick look at the tantivy-py source and I don't think the writer operations release the GIL, which means you'll probably need a subprocess. This can be set up with `run_in_executor`.
It might look something like this (pseudocode):
```python
from tantivy import Document, Index, IndexWriter


def writer_stuff(index_path, index_docs):
    # Runs in a worker process: reopen the index from its path, write everything,
    # then block on commit + merge without holding up the parent's event loop.
    writer: IndexWriter = Index.open(index_path).writer(heap_size=1000 * 1000 * 1000)
    for index_doc in index_docs:
        writer.add_document(Document.from_dict(index_doc))  # type: ignore[call-arg]
    writer.commit()
    writer.wait_merging_threads()
```
And then at the call site:

```python
import asyncio
import concurrent.futures


async def _add():
    ...
    index_docs = ...
    # Run the blocking writer work in a separate process so the disk IO and the
    # GIL-holding calls don't stall the event loop.
    exe = concurrent.futures.ProcessPoolExecutor()
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(exe, writer_stuff, index_path, index_docs)
    ...
```
This kind of code is annoying to write, because any parameters that cross the interprocess boundary do so in a pickle, and that's why I pass the `index_path` instead of an index instance. It is what it is. `Index` may already be pickleable, I haven't checked.
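A quick way to check, if anyone cares to (just a sketch; `index_path` is a placeholder for wherever the index lives):

```python
import pickle

from tantivy import Index

index_path = "path/to/index"  # placeholder

# If this raises (typically TypeError or pickle.PicklingError), the Index instance
# can't cross a process boundary, and you have to pass the path and reopen it in
# the worker instead.
try:
    pickle.dumps(Index.open(index_path))
    print("Index pickles fine; the instance could be passed directly")
except Exception as exc:
    print(f"Index is not pickleable, pass index_path instead: {exc!r}")
```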
(fyi @mskarlin @jamesbraza)
Hello @cjrh, sorry, we had some stuff that delayed me a bit on a response. Thanks for your thoughts/time here, it's much appreciated, and actually this PR turns out to be a massive win:
- `tantivy.Searcher.search` performance: unchanged
- `tantivy.Searcher.search` speed: definitely faster (but didn't quantify this)

So it turns out that by not calling `wait_merging_threads`, we were missing out on tons of compaction.
I might suggest that tantivy-py somehow tweak the interface on `IndexWriter` to make this more seamless, so other people don't miss out on the same gain.
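Something like a commit-and-wait context manager would make it hard to forget (just a sketch of the kind of interface I mean, not an existing tantivy-py API; `index_path` and `index_doc` are placeholders):

```python
from contextlib import contextmanager

from tantivy import Document, Index


@contextmanager
def committing_writer(index: Index, heap_size: int = 1000 * 1000 * 1000):
    """Yield a writer, then commit and wait for merge threads on normal exit."""
    writer = index.writer(heap_size=heap_size)
    yield writer
    writer.commit()
    writer.wait_merging_threads()  # the call we were previously missing


# Hypothetical usage:
with committing_writer(Index.open(index_path)) as writer:
    writer.add_document(Document.from_dict(index_doc))
```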
Per your comment on multiprocessing/GIL, that is a great suggestion. I opened https://github.com/Future-House/paper-qa/issues/646 to make sure we track this possibility.
good to hear 👍🏼
22 files sounds much more like what I was expecting. You should no longer be seeing the memory error.
Per advice in https://github.com/quickwit-oss/tantivy-py/issues/359#issuecomment-2430441432