opensearch-project / data-prepper

Data Prepper is a component of the OpenSearch project that accepts, filters, transforms, enriches, and routes data at scale.
https://opensearch.org/docs/latest/clients/data-prepper/index/
Apache License 2.0
238 stars 176 forks source link

Support for data normalization in arrays #4291

Open guodonglai opened 3 months ago

guodonglai commented 3 months ago

Is your feature request related to a problem? Please describe. A clear and concise description of what the problem is. Ex. It would be nice to have [...]

Hi, I have a question about ingestion pipeline: is there a standard way to process arrays of data in ingestion processor that is recommended by data prepper? for example, if I have a {‘category’: [‘a’, ‘b’, ‘c’]}, how can I break elements in the array down?

Describe the solution you'd like A clear and concise description of what you want to happen.

in the example {‘category’: [‘a’, ‘b’, ‘c’]}, ideally it can be broken down like {‘category’: ‘a’,‘category’: ‘b’,‘category’: ‘c’ }?

Describe alternatives you've considered (Optional) A clear and concise description of any alternative solutions or features you've considered.

Additional context Add any other context or screenshots about the feature request here.

common use case is the transition from NoSql database arrays to Sql database to support table joins in data warehousing.

oeyh commented 3 months ago

A few clarification questions:

Curious to know how you use Data Prepper in this case?

oeyh commented 3 months ago

We do support converting from:

[{"category": "a"}, {"category": "b"}, {"category": "c"}]

to

{"category": ["a", "b", "c"]}

with list_to_map processor.

We currently don't support the opposite. But if that is what you are looking for, we can probably enhance map_to_list processor to do that.

guodonglai commented 3 months ago

A few clarification questions:

  • {‘category’: ‘a’, ‘category’: ‘b’, ‘category’: ‘c’ } is not a valid json object or a valid map. You cannot have multiple keys with the same name. Do you want an array like this instead?
[{"category": "a"}, {"category": "b"}, {"category": "c"}]
  • Can you expand more about the use case for my understanding?

common use case is the transition from NoSql database arrays to Sql database to support table joins in data warehousing.

Curious to know how you use Data Prepper in this case?

Yes that's correct

guodonglai commented 3 months ago

We do support converting from:

[{"category": "a"}, {"category": "b"}, {"category": "c"}]

to

{"category": ["a", "b", "c"]}

with list_to_map processor.

We currently don't support the opposite. But if that is what you are looking for, we can probably enhance map_to_list processor to do that.

Yes that's exactly what I am looking for. This use case is more for streaming NoSQL arrays into row based SQL databases for data warehousing. In the end, once we got

[{"category": "a"}, {"category": "b"}, {"category": "c"}]

we will be likely be consuming them as separate rows in a SQL database i.e,

itemId category
A.         a
A          b
A          c

Is there a way in data prepper to perform the for_each function or similar? What is your best recommendation for our use case?

oeyh commented 3 months ago

we will be likely be consuming them as separate rows in a SQL database i.e, itemId category A a A b A c

Thanks for clarifying! This make me think maybe what you need is to split the original event with data {"category": ["a", "b", "c"]} to separated events: {"category": "a"}, {"category": "b"}, and {"category": "c"}. Processors like this would likely work:

  processor:
    - add_entries:
        entries:
          - key: category
            value_expression: join(/category)
            overwrite_if_key_exists: true
    - split_event:
        field: category
        delimiter: "," 

join function will turn [a, b, c] into a string "a,b,c" and then split_event will separate them into 3 events. join and split_event will both be available in the upcoming 2.7 release.

This use case is more for streaming NoSQL arrays into row based SQL databases for data warehousing

Still wondering how you plan to use Data Prepper for it. What's the pipeline source and sink here?

guodonglai commented 3 months ago

Source will be DynamoDB, sink will be S3 bucket for datalake. What is the output of your proposed processor in the json format? 3 rows in S3 file eventually like the following?

{"category": "a"}
{"category": "b"}
{"category": "c"}

or a single row

[{"category": "a"}, {"category": "b"}, {"category": "c"}]
oeyh commented 3 months ago

What is the output of your proposed processor in the json format?

Both are possible. That will depend on the codec used on the s3 sink (ndjson vs json). See this: https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sinks/s3/#codec

It's probably clearer if I include another field in the event, for example, assume the input event has this data:

{"category": ["a", "b", "c"], "itemId": "A"}

With the potential map_to_list method, you will get one output event:

{"category_list": [{"category": "a"}, {"category": "b"}, {"category": "c"}], "itemId": "A"}

With the split_event method, you will get three events instead:

{"category": "a", "itemId": "A"}
{"category": "b", "itemId": "A"}
{"category": "c", "itemId": "A"}

The latter seems to make more sense in your case. What do you think?

guodonglai commented 3 months ago

Okay, split_event seems more applicable to my use case, in your example {"category": "a", "itemId": "A"} {"category": "b", "itemId": "A"} {"category": "c", "itemId": "A"}, is it with ndjson or json?

Will split_event and map_to_list all be released in upcoming 2.7? When will it be? I am more than happy to try them out.

oeyh commented 3 months ago

In that form it's ndjson. 2.7 release will be coming in 1-2 weeks.

The map_to_list method I mentioned will require code changes, no ETA for that.

guodonglai commented 3 months ago

Cool thanks, will give split_event in version 2.7 a try and come back with result. Do you happen to know when this change will be adopted in AWS open search ingestion?