kamu-data / kamu-cli

Next-generation decentralized data lakehouse and a multi-party stream processing network
https://kamu.dev

feature request: add ability to ingest from S3 bucket #912

Open mingfang opened 5 days ago

mingfang commented 5 days ago

We need to ingest many objects (billions) from an S3 bucket. Due to the large number of objects, just getting the object listing will take a very long time.

The ingestion should be similar to FilesGlob.

sergiimk commented 4 days ago

FilesGlob could definitely be extended to support listing of S3 objects, but can you clarify the scale of your problem? Are we talking about potentially billions of Parquet or JSON files that you want to read and ingest via kamu?

FilesGlob has some tricky logic that sorts files by timestamps extracted from their names so they are ingested in the correct order.

Thus the current logic will attempt to list all entries on every ingest iteration. So if you really have billions of objects, I'm afraid that even if FilesGlob supported S3, the listing would take a very long time and cause issues.

I assume your objects aren't just sitting under one key prefix, but are partitioned somehow. Thus you can likely express the listing logic far more efficiently than a generic FilesGlob implementation would.
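For illustration, a rough sketch (Python + boto3; the bucket name and the CloudTrail-style date-partitioned key layout are assumptions) of how a partition-aware listing can avoid scanning the whole bucket on every iteration:

```python
import datetime
import boto3

s3 = boto3.client("s3")

def list_new_objects(bucket, day, start_after_key=None):
    """List only the keys under a single date partition, resuming after the
    last key processed in a previous run instead of re-listing everything."""
    # Assumed layout: CloudTrail-style date partitions, e.g. .../2024/06/01/
    prefix = f"AWSLogs/123456789012/CloudTrail/us-east-1/{day:%Y/%m/%d}/"
    paginator = s3.get_paginator("list_objects_v2")
    kwargs = {"Bucket": bucket, "Prefix": prefix}
    if start_after_key:
        kwargs["StartAfter"] = start_after_key  # skip already-processed keys
    for page in paginator.paginate(**kwargs):
        for obj in page.get("Contents", []):
            yield obj["Key"]

# Example: only today's partition is listed, not the billions of older keys
for key in list_new_objects("my-cloudtrail-bucket", datetime.date.today()):
    print(key)
```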

I therefore see three options:

mingfang commented 4 days ago

One example of a very large S3 bucket is processing AWS CloudTrail logs. In general, new data is written to S3 using monotonically increasing object keys, e.g. timestamps. We process batches of objects and have to keep track of the last processed key.

I'm looking at using Container but I think it needs a way to keep track of state. Maybe we can have something similar to ODF_NEW_ETAG_PATH, say ODF_NEW_HANDBACK_DATA_PATH, for the container to write arbitrary state data. Then Kamu can hand the content of that file back to the container on the next poll as the env var ODF_NEW_HANDBACK_DATA.
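A minimal sketch of how a container script might use the proposed mechanism. These variables do not exist in kamu today; the names come from the proposal above and the JSON payload is just an example:

```python
import json
import os

# Hypothetical: state that kamu handed back from the previous poll
# (ODF_NEW_HANDBACK_DATA is the name proposed above; empty on the first run)
prev_state = json.loads(os.environ.get("ODF_NEW_HANDBACK_DATA", "{}"))
last_key = prev_state.get("last_key")

# ... list objects after `last_key`, process them, write records to stdout ...
new_last_key = "AWSLogs/2024/06/01/example-object.json.gz"  # placeholder

# Hypothetical: write arbitrary state for kamu to hand back on the next poll
with open(os.environ["ODF_NEW_HANDBACK_DATA_PATH"], "w") as f:
    json.dump({"last_key": new_last_key}, f)
```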

sergiimk commented 3 days ago

For Container ingest the state management is very simple. If your data objects are keyed by timestamp you can use ODF_LAST_MODIFIED / ODF_NEW_LAST_MODIFIED to store the last processed timestamp in the dataset state and resume from it.

Using ETAG similarly allows you to store any kind of state data - it's a fully opaque object. For example, this image uses ETAG to store the sequence number of the last processed blockchain block.

Container pipes the raw data via stdout for kamu to read. This may not be very efficient in your case, where the data is already in S3 and in a structured format - it would be better to read it directly. A variation of Container that allows the script to return URLs to the data is what I meant earlier by ListingSource. I guess something like ODF_NEW_HANDBACK_DATA_PATH would be one way to implement it.
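To make this concrete, here is a rough sketch of a Container ingest script that reuses the opaque ETAG state to remember the last processed S3 key, then pipes the object contents to stdout for kamu to parse. The read-side ODF_ETAG variable, the bucket name, and the prefix are assumptions; only ODF_NEW_ETAG_PATH is named in this thread:

```python
import os
import sys
import boto3

s3 = boto3.client("s3")
bucket = "my-cloudtrail-bucket"  # placeholder
prefix = "AWSLogs/"              # placeholder

# Assumed: the previously committed etag is passed in as ODF_ETAG
last_key = os.environ.get("ODF_ETAG", "")  # empty on the very first ingest

kwargs = {"Bucket": bucket, "Prefix": prefix, "MaxKeys": 1000}
if last_key:
    kwargs["StartAfter"] = last_key  # resume after the last processed key

resp = s3.list_objects_v2(**kwargs)
for obj in resp.get("Contents", []):
    body = s3.get_object(Bucket=bucket, Key=obj["Key"])["Body"].read()
    sys.stdout.buffer.write(body)  # pipe raw records to kamu via stdout
    last_key = obj["Key"]

# Commit the new opaque state so the next poll starts after this batch
if last_key:
    with open(os.environ["ODF_NEW_ETAG_PATH"], "w") as f:
        f.write(last_key)
```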