rom1504 / embedding-reader

Efficiently read embedding in streaming from any filesystem
MIT License

reimplement parquet reader using ParquetFile #20

Closed rom1504 closed 2 years ago

rom1504 commented 2 years ago

https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html

the whole logic of splitting into small pieces and using our own thread pool makes little sense with pyarrow, since it has its own pool

try not doing that and instead use directly ParquetFile

think if it still makes sense for small parquet files

rom1504 commented 2 years ago

maybe just try to use the current code but cache usage of ParquetFile and close the file when all pieces have been read
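A rough sketch of that idea, assuming the number of pieces per file is known up front. `PieceFileCache`, `opener` and `pieces_per_file` are hypothetical names, not part of embedding-reader; `opener` is any callable that returns an object with a `close()` method (for instance something wrapping `pq.ParquetFile`).

```python
import threading


class PieceFileCache:
    """Share one open handle per file and close it once every piece is read."""

    def __init__(self, opener, pieces_per_file):
        self._opener = opener
        self._remaining = dict(pieces_per_file)  # filename -> pieces left to read
        self._handles = {}
        self._lock = threading.Lock()

    def acquire(self, filename):
        # Open on first use, then hand out the cached handle.
        with self._lock:
            if filename not in self._handles:
                self._handles[filename] = self._opener(filename)
            return self._handles[filename]

    def piece_done(self, filename):
        # Close and forget the handle after the last piece of the file.
        with self._lock:
            self._remaining[filename] -= 1
            if self._remaining[filename] == 0:
                handle = self._handles.pop(filename, None)
                if handle is not None:
                    handle.close()
```

Each worker would call `acquire(filename)` before reading its piece and `piece_done(filename)` afterwards, so the file is opened once and closed when the last piece is done.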

rom1504 commented 2 years ago

consider using https://arrow.apache.org/docs/python/dataset.html , it's really good

rom1504 commented 2 years ago

along with https://arrow.apache.org/docs/python/filesystems.html#filesystem-fsspec

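rom1504 commented 2 years ago

import fsspec
import pyarrow.dataset as ds
from tqdm import tqdm

# resolve the https url to an fsspec filesystem and a path on it
fs, p = fsspec.core.url_to_fs("https://mystic.the-eye.eu/public/AI/cah/laion5b/embeddings/laion1B-nolang/laion1B-nolang-metadata")
files = fs.ls(p, detail=False)
# treat all the parquet files as a single dataset read through fsspec
d = ds.dataset(files, filesystem=fs)
b = d.to_batches()
# iterate over the record batches only to measure read throughput
for _ in tqdm(b):
    pass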

about 1M samples/s, i.e. roughly 200MB/s, which saturates the external server connection

rom1504 commented 2 years ago

idea:

this should fix the parquet speed and will solve the memory issue

rom1504 commented 2 years ago

doesn't work due to no support of start/end in pyarrow.dataset

rom1504 commented 2 years ago

the numpy reader is doing 300MB/s from https via fsspec, while numpy + parquet is at 30MB/s ... this needs to be improved

rom1504 commented 2 years ago

maybe https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Scanner.html#pyarrow.dataset.Scanner can be used to support start stop

rom1504 commented 2 years ago

no, can't

rom1504 commented 2 years ago

ok best idea is to cache the files

rom1504 commented 2 years ago

seems a little bit better but not much

from functools import lru_cache
import pyarrow.parquet as pq

@lru_cache(maxsize=None)
def open_parquet_file(fs, filename):
    return pq.read_table(fs.open(filename, "rb"), use_threads=False)

rom1504 commented 2 years ago

actually lru cache doesn't work with multiple threads...
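One likely reason (an assumption, it was not measured here): `lru_cache` only stores a result after the wrapped function returns, so threads that ask for the same file at the same time all miss and all read it. A small sketch that shows the effect, with a made-up `load` function standing in for the parquet read:

```python
import threading
from functools import lru_cache

calls = []
both_inside = threading.Barrier(2, timeout=5)


@lru_cache(maxsize=None)
def load(filename):
    calls.append(filename)
    both_inside.wait()  # only passes once two threads are inside load() together
    return filename.upper()


threads = [threading.Thread(target=load, args=("a.parquet",)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Both threads ran the body for the same key: the cache did not deduplicate the work.
duplicate_loads = len(calls)
```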

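rom1504 commented 2 years ago

from threading import Semaphore
import pyarrow.parquet as pq

r = Semaphore(1)  # guards d so each file is read only once

d = {}  # filename -> table already read
def open_parquet_file(fs, filename):
    # "with" releases the semaphore even if read_table raises
    with r:
        if filename in d:
            return d[filename]
        print(filename)
        t = pq.read_table(fs.open(filename, "rb"), use_threads=False)
        d[filename] = t
        return t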

makes things much much faster
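A possible variation, not tested on the real data: the single semaphore also makes reads of different files wait on each other. With one lock per filename, each file is still read once but different files can load in parallel. `cached_load` and `loader` are hypothetical names; `loader` stands in for the `pq.read_table(fs.open(filename, "rb"), use_threads=False)` call.

```python
import threading
from collections import defaultdict

_tables = {}
_guard = threading.Lock()  # protects _locks and _tables
_locks = defaultdict(threading.Lock)  # one lock per filename


def cached_load(filename, loader):
    """Call loader(filename) at most once per file, even with many threads."""
    with _guard:
        file_lock = _locks[filename]
    with file_lock:
        with _guard:
            if filename in _tables:
                return _tables[filename]
        table = loader(filename)  # slow read happens outside the global lock
        with _guard:
            _tables[filename] = table
        return table
```

Nothing is ever evicted here, same as the dict version, so memory grows with the number of distinct files read.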

rom1504 commented 2 years ago

estimated 325min total (laion1B-nolang) with use_threads=True; probably the same with use_threads=False

rom1504 commented 2 years ago

150min for numpy alone

rom1504 commented 2 years ago

1012min without this change

rom1504 commented 2 years ago

seems to have solved the memleak too

rom1504 commented 2 years ago

https://github.com/rom1504/embedding-reader/pull/21/files

faster but didn't solve memleak

rom1504 commented 2 years ago

actually looks like it did. memory usage is a bit high (15GB with these settings), but memleak seems gone

rom1504 commented 2 years ago

did a fix here