ml6team / fondant

Production-ready data processing made easy and shareable
https://fondant.ai/en/stable/
Apache License 2.0
339 stars 26 forks source link

Redesign dataset format and interface #567

Closed RobbeSneyders closed 9 months ago

RobbeSneyders commented 11 months ago

We should revisit the design of our subsets & fields:

RobbeSneyders commented 10 months ago

Proposed design

The component spec

The consumes and produces section will be flattened, to only contain fields:

**old** **new**
```yaml consumes: images: fields: data: type: binary produces: captions: fields: text: type: utf8 ``` ```yaml consumes: image_data: type: binary produces: captions: type: utf8 ```

Data storage

The produced output by each component is written as a new parquet dataset.

**/Component 1** **/Component 2** **/Component 3** **/Component 4**
``` - index - field 1 - field 2 - field 3 - - ``` ``` - index - - - - field 4 - field 5 ``` ``` - index - - - field 3 - - ``` ``` - index - - - - - ```

The complete dataset after the third component can be found by looking from the right (format parquet-dataset:field):

- /component4:index
- /component1:field1
- /component1:field2
- /component3:field3
- /component2:field4
- /component2:field5

The manifest

The manifest provides this view at each point in the pipeline. After step 4, it will represent the view above:

{
  "metadata": {
    "base_path": "gs://bucket",
    "pipeline_name": "test_pipeline",
    "run_id": "test_pipeline_12345",
    "component_id": "component4"
  },
  "index": {
    "location": "/component4"
  },
  "fields": {
    "field1": {
      "location": "/component1",
      "type": "..."
    },
    "field2": {
      "location": "/component1",
      "type": "..."
    }
    "field3": {
      "location": "/component3",
      "type": "..."
    }
    "field4": {
      "location": "/component2",
      "type": "..."
    }
    "field5": {
      "location": "/component2",
      "type": "..."
    }
  }
}

Additional fields

We still need a mechanism to remove additional fields from the output dataset of a component if it changes the index (eg. LaionRetrieval components which go from a prompt id to a Laion id).

For example, if component 3 above would define additionalFields: false, its component spec would look like this:

produces:
  field3:
    type: ...
additionalFields: false

The data storage would still look exactly the same as above, but now the manifest only looks back until the output of component3:

{
  "metadata": {
    ...
  },
  "index": {
    "location": "/component4"
  },
  "fields": {
    "field3": {
      "location": "/component3",
      "type": "..."
    }
  }
}

User interface

We still present all the data in a manifest as a single dataframe to the user. Both the Dask and Pandas dataframes will have the same flat columns.

| index | field1 | field2 | field3 | field4 ]
| ----- | ------ | ------ | ------ | ------ |
| id    |  data  |  data  |  data  |  data  |

We can create this dataframe by creating a reverse mapping of the locations and fields in the manifest:

{
    "/component4": ["index"],
    "/component3": ["field3"],
    "/component2": ["field4", "field5"],
    "/component1": ["field1", "field2"],
}

And reading the fields from each location and merging them together.

dataframe = dd.empty()
for location, fields in field_mapping.items():
    partial_df = dd.read_parquet(location, columns=fields)
    dataframe = dd.merge(dataframe, partial_df, how="left")
RobbeSneyders commented 10 months ago

As an example, let's see what the component specs, manifests, and data storage would look like for the following pipeline based on our ControlNet example:

  1. Prompt generation
  2. Laion retrieval
  3. Download images
  4. Filter resolution
  5. Crop images

1. Prompt generation

fondant_component.yaml

produces:
  text:
    type: string

data storage

| /generate_prompts
| - index
| - text

manifest

{
  "index": {
    "location": "/generate_prompts"
  },
  "fields": {
    "text": {
      "location": "/generate_prompts",
      "type": "string"
    }
  }
}

2. Laion Rerieval

fondant_component.yaml

consumes:
  text:
    type: string

produces:
  image_urls:
    type: string

additionalFields: false

data storage

| /generate_prompts | /laion_retrieval |
| - index           | - index          |
| - text            |                  |
|                   | - image_urls     |

manifest

{
  "index": {
    "location": "/laion_retrieval"
  },
  "fields": {
    "image_urls": {
      "location": "/laion_retrieval",
      "type": "string"
    }
  }
}

3. Download images

fondant_component.yaml

consumes:
  image_urls:
    type: string

produces:
  image:
    type: bytes
  width:
    type: int32
  height:
    type: int32

data storage

| /generate_prompts | /laion_retrieval | /download_images
| - index           | - index          | - index
| - text            |                  |
|                   | - image_urls     |
|                   |                  | - image
|                   |                  | - width
|                   |                  | - height

manifest

{
  "index": {
    "location": "/download_images"
  },
  "fields": {
    "image_urls": {
      "location": "/laion_retrieval",
      "type": "string"
    },
    "image": {
      "location": "/download_images",
      "type": "bytes"
    },
    "width": {
      "location": "/download_images",
      "type": "int32"
    },
    "height": {
      "location": "download_images",
      "type": "int32"
    },
  }
}

4. Filter resolution

fondant_component.yaml

consumes:
  width:
    type: int32
  height:
    type: int32

data storage

| /generate_prompts | /laion_retrieval | /download_images | /filter_resolution
| - index           | - index          | - index          | - index
| - text            |                  |                  |
|                   | - image_urls     |                  |
|                   |                  | - image          |
|                   |                  | - width          |
|                   |                  | - height         |

manifest

{
  "index": {
    "location": "/filter_resolution"
  },
  "fields": {
    "image_urls": {
      "location": "/laion_retrieval",
      "type": "string"
    },
    "image": {
      "location": "/download_images",
      "type": "bytes"
    },
    "width": {
      "location": "/download_images",
      "type": "int32"
    },
    "height": {
      "location": "download_images",
      "type": "int32"
    },
  }
}

5. Crop images

fondant_component.yaml

consumes:
  image:
    type: bytes

produces:
  image:
    type: bytes

data storage

| /generate_prompts | /laion_retrieval | /download_images | /filter_resolution | /crop_images
| - index           | - index          | - index          | - index            |
| - text            |                  |                  |                    |
|                   | - image_urls     |                  |                    |
|                   |                  | - image          |                    | - image
|                   |                  | - width          |                    |
|                   |                  | - height         |                    |

manifest

{
  "index": {
    "location": "/filter_resolution"
  },
  "fields": {
    "image_urls": {
      "location": "/laion_retrieval",
      "type": "string"
    },
    "image": {
      "location": "/crop_images",
      "type": "bytes"
    },
    "width": {
      "location": "/download_images",
      "type": "int32"
    },
    "height": {
      "location": "download_images",
      "type": "int32"
    },
  }
}

This is quite a logical and simple flow I believe. The last step shows the only issue I see with this approach: the data is cropped and overwritten, but the width and height still contain the old values. We had the same issue with the previous approach though (although additionalSubsets could alleviate it partially), so I still think this is a step forward. This will be something for the user to validate when chaining components.

RobbeSneyders commented 10 months ago
mrchtr commented 10 months ago

I think this approach tackles a lot of the recent feedback as you have mentioned. A few questions from my side to clarify my understanding.

Isn't it more an issue related to the component design (not updating the width and height after image cropping) instead of an issue of your approach?

I still think that this is one of the biggest blocker regarding to the reusability of components. From an users perspective we are offering different building blocks which can operate on custom dataframes. I can assume a lot of users creating custom LoadComponents and define a custom data schema. Now we should offer capabilities to choose a component that applies specific transformation, and define a column to operate on.

Naively, I would like to have an interface to choose columns to operate on and optional column names which will be written to the dataframe. We could use the custom mappings to access the data within the component and write the custom mappings to the manifest too. This should be possible without changing the ComponentSpec. Am I overlooking something?

PhilippeMoussalli commented 10 months ago

Thanks Robbe! Looks really promising so far,

Few questions:

RobbeSneyders commented 10 months ago
  • Due to the changes regarding the data storage we will start writing parquet files containing all columns that are produced by the component or do we keep the column wise storage?

I'm not completely sure what you mean. We indeed write all columns that are produced by a component to a single parquet dataset. Parquet is a columnar storage format, so the data is still stored column-wise.

  • You have mentioned an issue in the ControlNet pipeline:

The last step shows the only issue I see with this approach: the data is cropped and overwritten, but the width and height still contain the old values.

Isn't it more an issue related to the component design (not updating the width and height after image cropping) instead of an issue of your approach?

Partially, yes. However, it might be difficult for a component to know up front all possible fields that might be related. For instance, there might have been a resolution field already in the data instead of height and width fields.

I think the user will have to handle this. We might want to provide a mechanism for the user to invalidate certain columns on the pipeline level.

  • We can't find a way to map subsets between components that sounds like a good idea.

I still think that this is one of the biggest blocker regarding to the reusability of components. From an users perspective we are offering different building blocks which can operate on custom dataframes. I can assume a lot of users creating custom LoadComponents and define a custom data schema. Now we should offer capabilities to choose a component that applies specific transformation, and define a column to operate on.

Naively, I would like to have an interface to choose columns to operate on and optional column names which will be written to the dataframe. We could use the custom mappings to access the data within the component and write the custom mappings to the manifest too. This should be possible without changing the ComponentSpec. Am I overlooking something?

So you mean that the user should explicitly map the fields to a component every time? I think that could make sense if we can keep the interface simple enough. I'm open to proposals on this front, I think it's the main open step before we can decide to move forward with this.


  • Do we expect an increase in overhead if the user wants to consume multiple columns because of the multiple merge operations?

Not sure. We already merge the index and different subsets now, and I haven't really noticed an impact from this. We will be merging more in this new proposal, but both sides should always contain indexes, and maybe even be sorted, so the impact might be limited.

It would probably be good to test this before implementation if we choose to go this way.

  • Jan already mentioned this: regarding dropping fields when additional fields is set to False. I understand the reasoning behind it but I still feel like it's something that might not be very intuitive for end users. I agree with the mechanism but wondering if we could somehow automatically detect it without having to explicitly specifying it in the component spec?

I don't think we can detect his automatically, since this is about the semantic meaning of the index. I believe a component that changes the semantic meaning of the index needs to be able to mark this in its component spec, since it will invalidate all older data with 100% certainty. I'm open to other ways of marking this though.

mrchtr commented 10 months ago

I'm not completely sure what you mean. We indeed write all columns that are produced by a component to a single parquet dataset. Parquet is a columnar storage format, so the data is still stored column-wise.

Actually, the resulting file structure looks like this:

├── component_1/
│   ├── index/
│   │   ├── part.0.parquet
│   │   ├── part.1.parquet
│   │   └── ...
│   └── text/
│       ├── part.0.parquet
│       ├── part.1.parquet
│       └── ...
└── component_2/
    ├── index/
    │   ├── part.0.parquet
    │   ├── part.1.parquet
    │   └── ...
    └── image_urls/
        ├── part.0.parquet
        ├── part.1.parquet
        └── ...

I thought we are achieving this by using our custom write to parquet approach.

... 
write_tasks = [
    dd.to_parquet(index_df, "/component_1/index", compute=False),
    dd.to_parquet(url_df, "/component_2/text", compute=False)
]

dd.compute(*write_tasks, ...)

I was mainly wondering if your proposed approach has an effect on this. Essentially, should we continue with our approach or combine this into a single write task, which would result in something like this:

├── component_1/
│   ├── part.0.parquet
│   ├── part.1.parquet
│   └── ...
└── component_2/
    ├── part.0.parquet
    ├── part.1.parquet
    └── ...
RobbeSneyders commented 10 months ago
├── component_1/
│   ├── part.0.parquet
│   ├── part.1.parquet
│   └── ...
└── component_2/
    ├── part.0.parquet
    ├── part.1.parquet
    └── ...

This is indeed what I propose, and what I meant with "a single parquet dataset". I don't see any reason to split them, since we can select which columns to read from a parquet dataset.

PhilippeMoussalli commented 10 months ago
├── component_1/
│   ├── part.0.parquet
│   ├── part.1.parquet
│   └── ...
└── component_2/
    ├── part.0.parquet
    ├── part.1.parquet
    └── ...

This is indeed what I propose, and what I meant with "a single parquet dataset". I don't see any reason to split them, since we can select which columns to read from a parquet dataset.

Oh I initially though that we were planning on storing them separately (individual folder per column). I think this is better especially for the merging since there might be a higher probability that some of the columns to consume might be in the same "single parquet dataset"

GeorgesLorre commented 10 months ago

Interesting, I like it so far.

Do we always need to write the Index in every component? Or only the ones that modify the number of rows (filter/expand)

RobbeSneyders commented 10 months ago

We need to write it in every component, but we are no longer writing it separately every time. If a component only updates the index, only the index will be written. Otherwise, it will just be written as part of the data, which is necessary anyway.

This means that we write the index less often, while we still keep the advantage of only updating the index for filtering components.

So ml6team/fondant#70 would be fixed automatically by adopting this approach.

PhilippeMoussalli commented 10 months ago
  • We can't find a way to map subsets between components that sounds like a good idea.

I still think that this is one of the biggest blocker regarding to the reusability of components. From an users perspective we are offering different building blocks which can operate on custom dataframes. I can assume a lot of users creating custom LoadComponents and define a custom data schema.

I am not sure if there are is a better alternative then just adding a mapping directly to the componentOp

So let's say the user starts with this pipeline in place

load_data (generic)
     produces:
    - image_array 
    - text_data

embed_text (custom)
    consumes: 
      - text_data
    produces:
      - image_embedding

and then later on the user decides to add a resuable component to caption text because they loaded other images that don't have captions:

load_data (generic)
produces
  - image_array 

caption_data (reusable)
    consumes: 
      - images_data
    produces:
      - image_captions

embed_text (custom)
    consumes: 
      - text_data
    produces:
      - image_embedding

What we can do is add a mapper

mapper = FieldMapper()
mapper.add_input_mapping(from_input_dataset_field=image_array, to_consumed_component_field = images_data)
mapper.add_output_mapping(from_produced_component_field=image_captions, to_output_dataset_field = text_data)

caption_op = ComponentOp(dir=..., mapper=mapper)

or alternatively

caption_op = ComponentOp(dir=..., mapper=mapper)
    .map_input(from_input_dataset_field=image_array, to_consumed_component_field = images_data)
    .map_output(from_produced_component_field=image_captions, to_output_dataset_field = text_data)

It might not seem like the smoothest experience but I think what we can do is make sure to provide clear instructions in the docs and error catching during static pipeline validation that could then point the user to the correct action to take (depending on input or output mismatch).

We can also always plot the pipeline manifest evolution and highlight in red the source of the mismatch. Open to hearing other suggestions.

image

Now we should offer capabilities to choose a component that applies specific transformation, and define a column to operate on. Naively, I would like to have an interface to choose columns to operate on and optional column names which will be written to the dataframe. We could use the custom mappings to access the data within the component and write the custom mappings to the manifest too. This should be possible without changing the ComponentSpec. Am I overlooking something?

Do you mean to say that it should me mandatory for every component? This might force us to define it everywhere at every op even when it's not needed (component spec fields allign) and might have some overlap with the information in the component spec.

Or it might mean that we would get rid of the consumes produces field altogether from the component spec but I am not sure if that's a direction we want to take since it will be difficult then to understand what that component operates on unless we can somehow offer that functionality as part of a function with some defaults in it's consume, produce arguments (again not sure if that's possible)

RobbeSneyders commented 10 months ago

I can think of some other approaches. These are not fully fleshed out, but just examples to indicate some other directions we could take. They might not be better :)

Chaining ComponentOps consumes directly

from fondant.pipeline import ComponentOp

load_op = ComponentOp(
    component_dir="load_data",
)

caption_op = ComponentOp.from_registry(
    name="caption_images",
    consumes={
        "images_data": load_op.image_array
    }
)

embed_op = ComponentOp(
    component_dir="embed_text",
    consumes={
        "text_data": caption_op.image_captions
    }
)

We should be able to calculate the output manifest of each component as we go, so each ComponentOp provides attributes for all the fields that will be available, and the IDE can recognize this.

You should only ever need to access the previous ComponentOp, but I can see how this can be confusing:

crop_op = ComponentOp.from_registry(
    name="crop_images",
    consumes={
        "images_data": embed_op.image_array,  # Correct
        "images_data": load_op.image_array,  # Incorrect
    }
)

Chaining consumes directly in pipeline

This can probably be solved by moving the explicit consumption chaining to the pipeline

from fondant.pipeline import ComponentOp

load_op = ComponentOp(
    component_dir="load_data",
)

caption_op = ComponentOp.from_registry(
    name="caption_images",
)

embed_op = ComponentOp(
    component_dir="embed_text",
)

crop_op = ComponentOp.from_registry(
    name="crop_images",
)

pipeline = pipeline.add_op(load_op)
pipeline = pipeline.add_op(
    caption_op, 
    consumes={
        "images_data": pipeline.image_array
    }
)
pipeline = pipeline.add_op(
    embed_op, 
    consumes={
        "text_data": pipeline.image_captions
    }
)
pipeline = pipeline.add_op(
    crop_op, 
    consumes={
        "images_data": pipeline.image_array,
    }
)

Creating operations and chaining consumes directly in pipeline

Compressing everything into the pipeline add_op method.

pipeline = pipeline.add_op(
    component_dir="load_data"
)
pipeline = pipeline.add_op(
    name="caption_images", 
    consumes={
        "images_data": pipeline.image_array
    }
)
pipeline = pipeline.add_op(
    component_dir="embed_text", 
    consumes={
        "text_data": pipeline.image_captions
    }
)
pipeline = pipeline.add_op(
    name="crop_images", 
    consumes={
        "images_data": pipeline.image_array,
    }
)

These all only map inputs, but if mapping the inputs is mandatory, I don't think there's a lot of reason to map the outputs. Apart maybe for the ability to overwrite data produced in previous steps.

Is this closer to what you meant @mrchtr?

PhilippeMoussalli commented 10 months ago

We should be able to calculate the output manifest of each component as we go, so each ComponentOp provides attributes for all the fields that will be available, and the IDE can recognize this.

How would this work exactly? The estimation of the attributes can only happen at compile time. I am not familiar with a way to provide dynamic typed attributes.

These all only map inputs, but if mapping the inputs is mandatory, I don't think there's a lot of reason to map the outputs. Apart maybe for the ability to overwrite data produced in previous steps.

I think this it might be needed in order to not break a whole pipeline (having to remap everywhere) if a component is added to a pipeline at a later stage as discussed here. I do agree though that it might be confusing.

The examples that you mentioned above have a nice interface and could be nice if we manage to make the dynamic attributes work. Do you see the consumes section as something that is mandatory then that has to be defined between components (even if they have matching fields by default)?

RobbeSneyders commented 10 months ago

How would this work exactly? The estimation of the attributes can only happen at compile time. I am not familiar with a way to provide dynamic typed attributes.

The code below works, Pycharm recognizes the images_data attribute on component_op.

class ComponentOp:
    def __init__(self, produces: t.List[str]):
        for field in produces:
            setattr(self, field, field)        
component_op = ComponentOp(produces=["images_data"])

Do you see the consumes section as something that is mandatory then that has to be defined between components (even if they have matching fields by default)?

Not sure, both are technically feasible. I haven't really thought this through yet, I just wanted to showcase that there are other ways to tackle this. I would need some time to think about the different options and flesh them out completely.


I don't think this is blocking though. It's clear that there's ways to solve this, so we can already start breaking down the work needed to remove the subsets.

PhilippeMoussalli commented 10 months ago

How would this work exactly? The estimation of the attributes can only happen at compile time. I am not familiar with a way to provide dynamic typed attributes.

The code below works, Pycharm recognizes the images_data attribute on component_op.

class ComponentOp:
    def __init__(self, produces: t.List[str]):
        for field in produces:
            setattr(self, field, field)

component_op = ComponentOp(produces=["images_data"])

For me it doesn't seem to detect it, is it part of a plugin? image

I don't think this is blocking though. It's clear that there's ways to solve this, so we can already start breaking down the work needed to remove the subsets.

I agree

mrchtr commented 10 months ago

Is this closer to what you meant @mrchtr?

It is super close to my thoughts. I like the last two approaches. But I would still try to include the produce step as well. Assuming we have a reusable text normalisation component.

...
consumes:
  text:
    type: string
produces:
   text_normalised:
     type: string

and we would use the last approach it would look like this:

pipeline = pipeline.add_op(
    name="crop_images", 
    consumes={
        "text": pipeline.custom_text_column
    },
    produces={
        "text_normalised": pipeline.custom_text_column_normalized
    }
)

We would utilise these components as operators applied to the pipeline dataframe. This approach gives us a kind of global pipeline schema to work with. I believe this would decouple the components and their transformations from a particular pipeline.

I don't think this is blocking though. It's clear that there's ways to solve this, so we can already start breaking down the work needed to remove the subsets.

I think we can indeed move this discussion to a different issue.

RobbeSneyders commented 10 months ago

For me it doesn't seem to detect it, is it part of a plugin?

Woops, I tested it in a Python console, which is of course at runtime :) So probably won't work statically indeed, but it would work in a notebook.

    produces={
        "text_normalised": pipeline.custom_text_column_normalized
    }

pipeline.custom_text_column_normalized will not exist here, right? I'm not sure we can use the same approach for produces.

mrchtr commented 10 months ago

pipeline.custom_text_column_normalized will not exist here, right? I'm not sure we can use the same approach for produces.

My idea was that the custom produces would here add the column custom_text_column_normalized to the dataset. Basically map the components produce to a custom produce. The custom_text_column_normalized could be used by the next component as well.

If we collect all the consumes and produces fields (either the default one or the custom ones on the right side) we would know the final schema of the dataset after the pipeline run successfully. The ComponentOp or Pipeline would define the dataframe schema which we could use for the validations.

When we add a consumes either to the CompnentOps or the Pipeline interface, it would be a bit confusing if we wouldn't add a produces option I guess.

RobbeSneyders commented 10 months ago

Yes, you can do that, but you cannot access it as an attribute on the pipeline. So then you should be working with string mappings again.

    produces={
        "text_normalised": "custom_text_column_normalized"
    }

And then we probably don't want to use attributes for consumes either. So the choice would be:

RobbeSneyders commented 10 months ago

If we want to go with the last proposed interface above, it might make more sense to work with datasets as the interface (it matches our design principle 😎):

pipeline = Pipeline(
    pipeline_name="my_pipeline",
    pipeline_description="description of my pipeline",
    base_path="/foo/bar",
)

dataset = pipeline.apply(
    component_dir="load_data"
)
dataset = dataset.apply(
    name="caption_images", 
    consumes={
        "images_data": dataset.image_array
    }
)
dataset = dataset.apply(
    component_dir="embed_text", 
    consumes={
        "text_data": dataset.image_captions
    }
)
dataset = dataset.apply(
    name="crop_images", 
    consumes={
        "images_data": dataset.image_array,
    }
)

dataset.image_array makes more sense than pipeline.image_array. This would also work great if we want to offer an eager mode in the future, since you then inspect every dataset between a component.

This is the same interface that Apache Beam uses, so we can get a lot of inspiration from there.

mrchtr commented 10 months ago

The dataset interface could also solve this problem:

Yes, you can do that, but you cannot access it as an attribute on the pipeline. So then you should be working with string mappings again.

If components operates on datasets we could add a schema to the dataset. Let the user define at the beginning the schema of the dataset or even let the first load component initialise the schema.

We could apply something like this:

produces={
        "text_normalised": dataset.schema.custom_text_column_normalized
    }

If we would call it writes instead of produces it would be even more clear that we only can operate on dataset column which were already initialised.

A Pipeline would initialise a Dataset. Components would apply different operations on a Dataset.

RobbeSneyders commented 10 months ago

The last remaining part of the interface which I'd like to simplify is that of generic components. I don't like that users have to overwrite the component spec, especially since they need to repeat the arguments etc.

With the interface above (with pipelines or datasets, doesn't matter), we get a mechanism that simplifies the interface for generic write components for free:

dataset = dataset.apply(
    name="write_data", 
    consumes={
        "image": dataset.image_array,
        "embedding": dataset.embeddings,
        "caption": dataset.image_captions,
    }
)

We know the schema of every field in the dataset, so this contains all the information we get from an overwritten component specification.

In the component specification, we should indicate that the component can handle a generic input dataset.

consumes:
    "*"

If this is present, the consumes mapping accepts unknown field names, otherwise it doesn't.


Two reservations on this:

RobbeSneyders commented 10 months ago

If components operates on datasets we could add a schema to the dataset. Let the user define at the beginning the schema of the dataset or even let the first load component initialise the schema.

We could apply something like this:

produces={ "text_normalised": dataset.schema.custom_text_column_normalized }

The dataset is created by the previous component (or the pipeline at the start), so I still think we run into this issue:

Yes, you can do that, but you cannot access it as an attribute on the pipeline.

In essence, it's the same issue that we need to solve as the one I mention in my previous comment:

We don't have as nice a solution yet for generic read components

The cleanest solution I can think of, indeed creates the schema in the generic read component:

from fondant import type

pipeline = pipeline.add_op(
    name="load_data",
    produces={
        "text": type.string,
    }
)

But this should only be used for generic read components to overwrite the component_spec. It can't be used by any non-generic components.

PhilippeMoussalli commented 10 months ago

We don't have as nice a solution yet for generic read components

The cleanest solution I can think of, indeed creates the schema in the generic read component:

from fondant import type

pipeline = pipeline.add_op(
    name="load_data",
    produces={
        "text": type.string,
    }
)

But this should only be used for generic read components to overwrite the component_spec. It can't be used by any non-generic components.

I feel like we might benefit from explicitly defining I/O operators on the componentOp/Dataset class.

Maybe we can somehow tie them explicitly to specific components with the newly introduced tags or some other mechanism. Component with those tags don't need to implement a consumes or produces section so there is no need to copy over the component spec. This is feasible since we are currently the only ones developing read/write component. We can later have some guides on how to develop I/O components similar to Beam.

I also sometimes find the fact that a load component "produces" and a write component "consumes" to be a bit unintuitive.

from fondant.schema import type

dataset = Dataset()
dataset = dataset.read_data(
     name="load_data",
     input_dataset_schema={  # clearly mention that only a schema is needed t.Dict[str, type]
           "text": type.string
      }
 )  

dataset = dataset.transform(    #  consumes (and maybe produces)
    name="process_text", 
    consumes={
        "text": dataset.text
    }
)    

dataset = dataset.write_dataset(  # columns_to_write is "consumes" behind the scenes but it makes it more explicit
    name="write_data", 
    columns_to_write={
        "caption": dataset.text,
    }
)
RobbeSneyders commented 10 months ago

Yes, @mrchtr and I had a call and concluded the same thing. I will try to summarize our conclusions here. Let me know if I missed anything @mrchtr.


We create a pipeline and use a read component to create the first dataset. The user needs to provide a schema for the reader, which replaces overwriting the component specification.

pipeline = Pipeline(
    pipeline_name="my_pipeline",
    pipeline_description="description of my pipeline",
    base_path="/foo/bar",
)

dataset = pipeline.read(
    name="load_images",
    schema={
        "image": type.binary  # or pa.binary()
    }
)

For the schema, we need to decide if we want to roll our own types, or if we want to use the ones from PyArrow. If the PyArrow types suffice, I would go with those myself.

From then on, transform components can be applied to the dataset. They can provide a mapping for both consumes and produces. Matthias made the case that you might want to rename produced fields to prevent conflicts (eg. cropping the same image multiple times), which convinced me that we indeed need it. The mapping is optional, if it's not provided, we still look for a field with the original name from the specification.

dataset = dataset.apply(
    name="caption_images", 
    consumes={
        "images_data": dataset.image  # or "image"
    }
    produces={
        "captions": "text"
    }

Note that the consumes mapping maps from a string to a dataset attribute, while the produces mapping maps from a string to a string. While I do see some advantages in using dataset attributes, I also see value in aligning both and making consumes a string to string mapping as well. An additional benefit of this is that you could chain apply() calls since you don't require the intermediate dataset to be defined. A third possibility would be to accept both a dataset attribute or a string, but this might just be confusing.

Write components also accept a schema. We can make the same choice here between a string to dataset attribute mapping or a string to string mapping. Linking to a dataset attribute here might make more sense, since we need to know both the name of the columns to map and their schema. However there is no reason we can't just look this up on the dataset using the string value.

dataset.write(
    name="write_data", 
    schema={
        "image": dataset.image,  # or "image"
        "caption": dataset.text,  # or "text"
    }
)
RobbeSneyders commented 10 months ago

Just as validation that we don't block ourselves, a small thought experiment on how branching would work.

Branching into multiple branches would be straightforward:

branch1 = dataset.apply(...)
branch2 = dataset.apply(...)

We can choose which interface to use for merging. Eg.:

dataset = branch1.merge(branch2)
dataset = Dataset.from(branch1, branch2)

Most important here will be to select a or support multiple merge strategies, but I can't think of anything here that would complicate this compared to our current design.

A pipeline reading from multiple sources can be created by adding multiple readers to the pipeline:

branch1 = pipeline.read(...)
branch2 = pipeline.read(...)

Again, we can get a lot of ideas from Beam here.

RobbeSneyders commented 10 months ago

Coming back to this part of my proposal above:

Additional fields

We still need a mechanism to remove additional fields from the output dataset of a component if it changes the index (eg. LaionRetrieval components which go from a prompt id to a Laion id).

For example, if component 3 above would define additionalFields: false, its component spec would look like this:

produces:
  field3:
    type: ...
additionalFields: false

I would actually propose a different interface here. Instead of additionalFields: false, I propose to add something like previous_index: <name>.

For example in the laion_retrieval example from above, this could be:

consumes:
  text:
    type: string

produces:
  image_urls:
    type: string

previous_index: prompt_id

In a chunking component, this could be previous_index: document_id.

This has two advantages:

PhilippeMoussalli commented 10 months ago

For the schema, we need to decide if we want to roll our own types, or if we want to use the ones from PyArrow. If the PyArrow types suffice, I would go with those myself.

I this this makes sense, only reason we have a type class it to be able to write it to json schema and retrieve it from it.

Note that the consumes mapping maps from a string to a dataset attribute, while the produces mapping maps from a string to a string. While I do see some advantages in using dataset attributes, I also see value in aligning both and making consumes a string to string mapping as well. An additional benefit of this is that you could chain apply() calls since you don't require the intermediate dataset to be defined. A third possibility would be to accept both a dataset attribute or a string, but this might just be confusing.

I'm in favor of aligning both to text, am I correct in assuming that the attribute only makes sense when you're executing cell by cell? otherwise it might return an unrecognized attribute in your IDE.

Write components also accept a schema. We can make the same choice here between a string to dataset attribute mapping or a string to string mapping. Linking to a dataset attribute here might make more sense, since we need to know both the name of the columns to map and their schema. However there is no reason we can't just look this up on the dataset using the string value.

Would keep it to string to string just for consistency. I think we can make this optional: if not specified it will write all the fields under their original name. The schema here is really just to remap the name of the columns if needed and select the final columns to be written.

PhilippeMoussalli commented 10 months ago
  • additionalProperties

In a chunking component, this could be previous_index: document_id.

In the document_id example, we reset the index to a specific columns called document_id so this approach most likely will expect the user to return the original index as an additional column. We can either handle this internally or make sure to document it well

RobbeSneyders commented 10 months ago

I'm in favor of aligning both to text, am I correct in assuming that the attribute only makes sense when you're executing cell by cell? otherwise it might return an unrecognized attribute in your IDE.

Yes it indeed only really makes sense in an interactive mode. We could still support both if we want to enable this while running interactively.

Would keep it to string to string just for consistency. I think we can make this optional: if not specified it will write all the fields under their original name. The schema here is really just to remap the name of the columns if needed and select the final columns to be written.

Ok, but just to be clear, then the schema attribute is optional. If you do provide a schema, you need to provide all fields. Otherwise there's no way to exclude fields to write.

In the document_id example, we reset the index to a specific columns called document_id so this approach most likely will expect the user to return the original index as an additional column. We can either handle this internally or make sure to document it well

Yes indeed, the component should return it as a column. We can easily validate that it is part of the produced schema.

mrchtr commented 10 months ago

@RobbeSneyders has nailed down our discussion perfectly. In the meanwhile I agree with @PhilippeMoussalli and believe it would be nice to have the same interface for the consumes and produces section.

I think it is fine using string value for both sections. As mentioned, we would lose the benefit of type safeties. But I think it is negligible, cause we can validate at latest during the compilation.

I wanted to note down an additional thought. I've mentioned a concern during @PhilippeMoussalli implementation of the subset field mapping. We have to make sure that it isn't confusing for the user which site of the dictionary entry represents the schema of the dataset and which one the schema of the component dataframe. Back then I've proposed to have a dataclass that allows us to use explicit names.

dataset = dataset.apply(
    name="caption_images", 
    consumes=[
        ColumnMapping(component_column="images_data",  dataset_column="image"),
        ...
    ]
    produces=[
        ColumnMapping(component_column="captions",  dataset_column="text")
    ]
)

This only has an effect on the interface but might make it clear if you are looking the first time on a pipeline/component definition. I didn't found it straight forward on the initial draft to see which column names belongs to which dataframe.

If we use dataclasses, I can even think about a different interface:

dataset = dataset.apply(
    name="caption_images", 
    schema_mapping=[
        Consumes(component_column="images_data",  dataset_column="image"),
        Consumes(component_column="...",  dataset_column="..."),
        Produces(component_column="captions",  dataset_column="text")
        Index(previous_index="...")
    ]
)

Not sure about the dataclasses myself anymore, it either complicates things or improves the understanding.

RobbeSneyders commented 10 months ago

I'm not a big fan of the dataclasses, it makes the interface look a lot more complex. Agree that the direction of the mapping can be confusing, but we can validate this at compilation and provide clear error messages.

RobbeSneyders commented 10 months ago

Working on the RAG use case, it's become clear that we need to support "generic" behavior not just for read and write components, but also for transform components. Think about the following components:

I therefore propose to provide an option for the user to overwrite consumes and produces on any type of component. I would keep the consumes and produces names for all component types so they are aligned everywhere.

pipeline = Pipeline("my-pipe", base_path="/base_path")

dataset = pipeline.read(
    "read_component",
    arguments={},
    produces={                    # A read component can only overwrite produces.
        "image": pa.binary()
    }
)

dataset = dataset.apply(
    "generic_captioning",
    arguments={},
    consumes={                    # A transform component can overwrite consumes
        "image": pa.binary()
    },
    produces={                    # And produces
        "caption": pa.string()
    }
)

dataset.write(
    "write_component",
    consumes={                    # A write component can only overwrite consumes
        "text": pa.string(),
    }
)

Generic components could define a schema for the consumes and produces they support. Eg. for the read_from_hf_hub component, this could be:

produces:
    additionalProperties: true

Or for a generic captioning component:

consumes:
    additionalProperties:
        type: binary

produces:
    additionalProperties:
        type: string

This way we could validate the consumes and produces arguments. We can start without this for now though.


Then of course we are still left with the question on how to map fields between components.

We could overload the consumes and produces arguments for this:

pipeline = Pipeline("my-pipe", base_path="/base_path")

dataset = pipeline.read(
    "read_component",
    arguments={},
    produces={
        "image": pa.binary(),     # A read component can only specify types in produces since we need the schema info.
    }
)

dataset = dataset.apply(
    "generic_captioning",
    arguments={},
    consumes={
        "image": "image",         # A generic transform component should specify a string. The type is inferred from the column on the dataset and the column is renamed.
    },
    produces={
        "caption": pa.string(),   # A generic transform component can only specify types in produces since we need the schema info.
    }
)

dataset = dataset.apply(
    "text_beautifier",
    "consumes={
        "text": "caption",        # A non-generic transform component can specify strings in consumes  to rename columns.
    },
    produces={
        "text": "caption",        # A non-generic transform component can specify a string in produces to rename columns
    }
)

dataset.write(
    "write_component",
    consumes={
        "text": "text",           # A write component should specify a string. The type is inferred from the column on the dataset and the column is renamed.
    }
)

The rules can be summarized succinctly as follows, so I think (hope) this won't be too complex:

I thought about other ways to do this. Eg. by using a .rename() method on the Dataset. But that would only allow for input mapping, not output mapping.

PhilippeMoussalli commented 10 months ago

Working on the RAG use case, it's become clear that we need to support "generic" behavior not just for read and write components, but also for transform components. Think about the following components:

  • A RAG retrieval component which optionally takes embeddings to query

I feel like this information would still be better placed in the component spec rather than overwriting it at the dataset level since it better describes what are the possible fields that this component can take either text or text_embeddings. It would be difficult to understand that behavior if the only thing you had in the component spec was

consumes:
    additionalProperties:
        type: binary

Instead it would make more sense if it looks something like this (not sure about the final format yet)

consumes:
    anyOf:
        - text:
            type: str
        - text_embedding:
            type: binary

The component would need to have something like this

class Retrieval(PandasTransformComponent):

    def __init__(self, *args) -> None:

    def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
             if text in dataframe.columns:
                   embeddings = dataframe[text].apply(get_embeddings)
             else: 
                  embeddings = dataframe[text_emeddings]
  • An evaluation component which optionally takes ground truth answers to return additional metrics

Same comment as above except for a slightly different component spec

consumes:
      - answers:
          type: str
          optional: true

produces:
     - additional_metric:
        type:int 
        optional: true
        depends_on: answers
  • An aggregation component which can aggregate an arbitrary number of fields

There it makes sense to maybe include

 consumes:
     additionalProperties: true

provided that all the consumed columns will be aggregated.

I therefore propose to provide an option for the user to overwrite consumes and produces on any type of component. I would keep the consumes and produces names for all component types so they are aligned everywhere.

It seems like this would mean that we're defining some of the behavior of the components purely on the the how the dataset interface is defined rather than the component spec itself. The new fields that you're defining here would still need to be defined in the component itself and some of the behavior is not captured here (e.g. additional produced column depending on optional input column, one of two columns that need to be provided) which can make it difficult to validate.

I still think it might make more sense to override the spec only for read/write components and some exceptional transform components (aggregation). The rest of the behavior and cases should be outlined in the spec. For the generic transform component, we could maybe have a method apply_generic_transformation to overwrite it's produces field.

dataset = dataset.apply(
    "generic_captioning",
    arguments={},
    consumes={                    # A transform component can overwrite consumes
        "image": pa.binary()
    },
    produces={                    # And produces
        "caption": pa.string()
    }
)

The new overriden image field here would still need to be defined somehow in the component

Then of course we are still left with the question on how to map fields between components.

The rules can be summarized succinctly as follows, so I think (hope) this won't be too complex:

  • consumes mappings are always string to string mappings. For generic components, the schema is inferred from the dataset via the column name.
  • produces mappings are

    • string to string mappings for non-generic components to rename the columns
    • string to type mappings for generic components to define the schema

I generally agree with the behavior but I think it might be a bit difficult for end users. If we have a separate method for the generic components as I mentioned above we could provide better information for this via docstrings. We could define how to use the component with which method in the hub:

All in all I think having additional columns and fields is desirable but it does indeed come with additional complexity (component spec definition, additional considerations in the pipeline interface, ...). But I think this should be more of advanced use cases rather than the common definition of a component.

RobbeSneyders commented 10 months ago

On how to define generic stuff in the component spec, I agree that the ways you suggest might be better. But I think we can summarize it as: "Will be able to be defined by OpenAPI spec", and keep it out of scope for now.

It seems like this would mean that we're defining some of the behavior of the components purely on the the how the dataset interface is defined rather than the component spec itself. The new fields that you're defining here would still need to be defined in the component itself and some of the behavior is not captured here (e.g. additional produced column depending on optional input column, one of two columns that need to be provided) which can make it difficult to validate.

This is exactly why we need to define this on the dataset. If we make the component spec generic, the arguments on the dataset need to make it specific. So we know exactly which columns will be part of the intermediate dataset at each point. Otherwise we cannot evolve our manifest.

I still think it might make more sense to override the spec only for read/write components and some exceptional transform components (aggregation). The rest of the behavior and cases should be outlined in the spec. For the generic transform component, we could maybe have a method apply_generic_transformation to overwrite it's produces field.

I wouldn't want to make generic components a special type. Components could be a combination of specific & generic. Eg. a specific consumes, but generic produces based on provided arguments. Or produces that has some specific fields and some generic.

All in all I think having additional columns and fields is desirable but it does indeed come with additional complexity (component spec definition, additional considerations in the pipeline interface, ...). But I think this should be more of advanced use cases rather than the common definition of a component.

I'm not sure this will only be for advanced use cases. As mentioned, for the RAG use case, I see a lot of components that would benefit from being (partly) generic. Probably more than components that would not benefit from it.