iterative / datachain

AI-data warehouse to enrich, transform and analyze data from cloud storages
https://docs.datachain.ai
Apache License 2.0

Need resolve_files() method #301

Closed by volkfox 1 month ago

volkfox commented 2 months ago

Description

Datachain cannot enforce immutability of the underlying storage, so it helps to check whether the files are still there. Currently there is no way to verify this, and a chain will simply crash when a file is missing.

The suggestion is to introduce a method

resolve_files(signal = "file")

which checks the file object under the given signal name and marks the File object as valid or invalid.

This requires introducing a new field of type Boolean or Date that marks validity either as True/False or in a None/"last accessed" fashion.
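
For illustration, a minimal sketch of what such a field could look like on a File-style model; the model and field names here are hypothetical, not part of the current File class:

from datetime import datetime
from typing import Optional

from datachain.lib.data_model import DataModel  # import path may differ by version


class ResolvedFile(DataModel):
    # hypothetical validity markers, matching the two options described above
    is_valid: Optional[bool] = None            # True/False once checked, None if never checked
    last_resolved: Optional[datetime] = None   # "last accessed"-style alternative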

shcheklein commented 2 months ago

@volkfox wdyt about implementing this as a regular mapper? Some users might prefer to just check the existence in the UDF when the actual file is passed (or does it crash before we even open it? Then it could be considered a bug).

volkfox commented 2 months ago

May not work well from the data consistency standpoint. If the dataset is broken, we need to gracefully fail and tell the user to recheck it instead of silently skipping the gaps.

shcheklein commented 2 months ago

May not work well from the data consistency standpoint.

I'm not sure I understand this. Could you elaborate please?

What I mean is that users (from what I understand) can implement this additional signal with a map or gen. I mean this:

This requires introducing a new field of type Boolean or Date that marks validity either as True/False or in a None/"last accessed" fashion.

and then they can decide on the next steps (whether they want to filter those out or not).

Or do you mean that our UDFs break if there are non-existent files referenced in the DB?
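
For context, a minimal sketch of that map-based approach, assuming File.open() raises when the underlying object is gone; the column and function names are illustrative, and the import paths may differ by version:

from datachain.lib.dc import C, DataChain
from datachain.lib.file import File


def file_exists(file: File) -> bool:
    # try to read a single byte; treat any failure as "missing or unreadable"
    try:
        with file.open() as f:
            f.read(1)
        return True
    except Exception:
        return False


chain = (
    DataChain.from_storage("s3://bucket/...")
    .map(is_valid=file_exists)        # adds a plain boolean signal next to the file column
    .filter(C("is_valid") == True)    # or keep all rows and only report the invalid ones
)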

dmpetrov commented 2 months ago

wdyt about implementing this as a regular mapper?

Yes, it should be a separate mapper/generator because we will need multiple file operations, and that would explode the DC API (we already have issues with the multiple JSON/CSV variants that need to be extracted).

I'd specify the requirement this way:


from datachain.file import resolve_files
...
dc.from_storage("s3:/...", output_name="file").map(file2=resolve_files)

This should populate the following columns:

class File(DataModel):
    source: str = Field(default="")
    path: str
    size: int = Field(default=0)           # <--- 
    version: str = Field(default="")    # <--- 
    etag: str = Field(default="")         # <--- 
    is_latest: bool = Field(default=True)  # <--- 
    last_modified: datetime = Field(default=TIME_ZERO)  # <--- 
    location: Optional[Union[dict, list[dict]]] = Field(default=None)  # <--- 
    vtype: str = Field(default="")

In case of issues with a file, I'd avoid introducing an is_valid signal at this point (it breaks APIs). Just keep the signals specified above empty. Later, we can introduce is_valid if there is a need.
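
For illustration, a rough sketch of what such a resolve_files mapper might do, using fsspec to look up object metadata and leaving the File fields at their empty defaults when the object is gone; this is an assumption about a possible implementation, not existing datachain code:

from datetime import datetime, timezone

import fsspec

from datachain.lib.file import File  # import path may differ by version


def resolve_files(file: File) -> File:
    fs, _, _ = fsspec.get_fs_token_paths(file.source)
    full_path = f"{file.source.rstrip('/')}/{file.path}"
    try:
        info = fs.info(full_path)
    except FileNotFoundError:
        # missing object: keep size/version/etag/last_modified at their empty defaults
        return File(source=file.source, path=file.path)
    return File(
        source=file.source,
        path=file.path,
        size=info.get("size", 0),
        etag=str(info.get("ETag", info.get("etag", ""))),
        last_modified=info.get("LastModified", datetime.fromtimestamp(0, tz=timezone.utc)),
    )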

volkfox commented 2 months ago

Unsure what the intention of the example above is. Create yet another file record?

And if I check the file's existence 5 times, will I have 5 file records?

Create yet another file object, this time with

volkfox commented 2 months ago

How about this API design:

Use case 1. Verify existing file object:

mutate(file=resolve("file"))

^^ if successful, the File object will have a valid size; otherwise -1

Use case 2. Create file object from a field:

map(file=resolve(uri="uri"))

the same rules as above apply.
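
For illustration, a small self-contained sketch of the proposed size == -1 convention; resolve is hypothetical here, and the fsspec-based lookup is only one possible way to implement it:

import fsspec

from datachain.lib.file import File  # import path may differ by version


def resolve(file: File) -> File:
    fs, _, _ = fsspec.get_fs_token_paths(file.source)
    try:
        size = fs.info(f"{file.source.rstrip('/')}/{file.path}")["size"]
    except FileNotFoundError:
        size = -1  # per the proposal: -1 marks a file that could not be resolved
    return File(source=file.source, path=file.path, size=size)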

shcheklein commented 2 months ago

mutate(file=resolve("file"))

it contradicts mutate though? mutate now doesn't allow mutating an existing column (we just merged a fix for this: https://github.com/iterative/datachain/issues/297). My 2 cents: introducing special edge cases into an API like mutate complicates it and how it is perceived.

Use case 2. Create file object from a field:

seems like it's solving a different issue?


is it possible, with just the current set of methods and syntax, to implement an additional valid/invalid signal, or to check and populate the size field in the file.size column? What would that look like?

if it's not possible to modify an existing file, can users just generate a new column, name it the way they want, and use it downstream?

dmpetrov commented 2 months ago

mutate(file=resolve("file"))

Yes, but dc.map(file=resolve_files) should overwrite it (when we support overwriting).

dmpetrov commented 2 months ago

mutate now doesn't allow mutating an existing column (we just merged a fix for this #297 ). My 2cs - introducing special edge cases into an API like mutate complicates it and complicates the perception.

#297 seems like a workaround to me (explicitly disabling it until we can implement it properly).

It would be great to enable overwriting for mutate as well as map/gen/agg. It's kind of syntactic sugar that merges 3 commands (rename, remove, rename) into a single one.

shcheklein commented 2 months ago

It would be great to enable overwriting for both mutate as well as map/gen/agg.

sounds good.

dmpetrov commented 2 months ago

After live discussion with @volkfox

It seems we need 2 resolve functions:

First (in this issue),

from datachain.file import resolve
# Signature: def resolve(file: File) -> File: ...

dc.map(file1=resolve)

Second (we can create a separate issue),

from datachain.file import resolve_uri
# Signature: def resolve_uri(uri: str) -> File: ...

dc.map(file=resolve_uri, params="link_to_file")

Both should create a File record with all resolved fields, such as etag.

Additionally, the overwrites are tracked in separate issues #337 & #336.
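
For illustration, a minimal self-contained sketch of the two agreed helpers; neither function exists in datachain.file yet, and the URI parsing here is deliberately naive:

import fsspec

from datachain.lib.file import File  # import path may differ by version


def resolve(file: File) -> File:
    # look up object metadata; leave the resolved fields empty if the object is missing
    fs, _, _ = fsspec.get_fs_token_paths(file.source)
    try:
        info = fs.info(f"{file.source.rstrip('/')}/{file.path}")
    except FileNotFoundError:
        return File(source=file.source, path=file.path)
    return File(
        source=file.source,
        path=file.path,
        size=info.get("size", 0),
        etag=str(info.get("ETag", info.get("etag", ""))),
    )


def resolve_uri(uri: str) -> File:
    # naive split of "s3://bucket/key" into source and path; real code would use fsspec/urlparse
    protocol, _, rest = uri.partition("://")
    bucket, _, path = rest.partition("/")
    return resolve(File(source=f"{protocol}://{bucket}", path=path))


# usage, per the agreed signatures:
#   dc.map(file1=resolve)
#   dc.map(file=resolve_uri, params="link_to_file")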