argoproj-labs / old-argo-dataflow

Dataflow is a Kubernetes-native platform for executing large parallel data-processing pipelines.

s3 sink & source #148

Closed · domderen closed this issue 3 years ago

domderen commented 3 years ago

As a user, I would like an easy-to-use AWS S3 (or, more generally, S3-compatible, such as MinIO) sink & source.

The source could work by subscribing to events on a specific bucket and sending a message containing the event whenever S3 publishes one:

sources:
- s3:
    bucket: my-s3-bucket
    eventName: ObjectCreated # details to be defined based on this: https://docs.aws.amazon.com/AmazonS3/latest/userguide/NotificationHowTo.html
    serviceAccountName: name-of-sa-that-can-access-s3 # optional
    accessKeySecret:
      name: name-of-s3-secret-object
      key: key-in-this-secret-containing-s3-access-key
    secretKeySecret:
      name: name-of-s3-secret-object
      key: key-in-this-secret-containing-s3-secret-key

Similarly, the sink could work by uploading each message as an object to S3:

sinks:
- s3:
    bucket: my-s3-bucket
    path: path/to/message.csv
    serviceAccountName: name-of-sa-that-can-access-s3 # optional
    accessKeySecret:
      name: name-of-s3-secret-object
      key: key-in-this-secret-containing-s3-access-key
    secretKeySecret:
      name: name-of-s3-secret-object
      key: key-in-this-secret-containing-s3-secret-key
    # optional: defines how the message should be transformed before publishing
    messageTemplate: |
      name; other-column; third-column
      {{message.field1}}; {{message.deeper.nested.field}}; {{message.another-field}}

The message template could be some kind of templating language that operates on the incoming message and transforms it into the form the user wants the final S3 object to take.
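
For illustration, here is a minimal Go sketch of how such a template might be rendered, assuming Go's text/template as the engine (the {{message.field1}} syntax above is only illustrative, and a real implementation might pick a different templating language). The field names come from the made-up example above.

package main

import (
	"encoding/json"
	"os"
	"text/template"
)

// A sketch of rendering a messageTemplate against an incoming JSON message.
// Assumes Go's text/template; the decoded message is exposed under "message".
const messageTemplate = `name; other-column; third-column
{{.message.field1}}; {{.message.deeper.nested.field}}; {{index .message "another-field"}}
`

func main() {
	// A hypothetical incoming message matching the fields used in the template.
	raw := []byte(`{"field1":"a","deeper":{"nested":{"field":"b"}},"another-field":"c"}`)

	var msg map[string]interface{}
	if err := json.Unmarshal(raw, &msg); err != nil {
		panic(err)
	}

	tmpl := template.Must(template.New("s3-sink").Parse(messageTemplate))
	// Writes the rendered CSV-style payload that the sink would upload to S3.
	if err := tmpl.Execute(os.Stdout, map[string]interface{}{"message": msg}); err != nil {
		panic(err)
	}
}

Note that "another-field" has to be accessed via index here, because hyphens are not valid in Go template field accessors; a purpose-built template language could avoid that wrinkle.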

alexec commented 3 years ago

Duplicate of #51. But let us keep this one.

When a new file appears, do you want the actual file contents? Or just a URL to the file?

Files can be big, so passing the contents as a message might be a bad idea.

domderen commented 3 years ago

In an ideal scenario, I would love an option to specify which of the two I want.

But I’m not sure how hard it would be to get the content, so getting just the path would be enough to start processing it with, for example, a git step.

It would be awesome if I could also start processing it with a handler step, but then I would need the ability to provide dependencies to my code, as I would need an S3 client to download the file first.

And this is off topic for an S3 source/sink (I can create a new issue for it), but for handler steps & git steps, what if I wanted to produce more than one message? Say I loaded a CSV file from S3 and wanted to produce a single message for each line in that file. I think the current interface of those steps doesn’t allow me to do that, right? Similarly, the same question applies to consuming multiple messages: what if I had a git step that reads up to 1k messages from Kafka, processes them in parallel, and then sends them out? I think the current git step interface doesn’t allow that either, and I would need to create a custom container step to implement it, right?

alexec commented 3 years ago

The Git interface is not about sourcing messages; it is about building a step from a Git repo. If you wanted to source files from Git, I think you could just do a Git clone in your step.

Regarding the file contents, how about this: when we get a message, we download the file to the shared volume (e.g. /var/argo/dataflow/files/abc123), and the message then just contains the bucket etc. plus the path to the file on that volume. This file might be a FIFO, so even large files can be processed using minimal memory and are available to read as quickly as possible.

files:
  s3:
    bucket: my-s3-bucket
    eventName: ObjectCreated
    # other S3 config
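
A hedged sketch of what the handler side of this could look like in Go: the message carries only a small pointer to the file, and the step streams the file (possibly a FIFO) from the shared volume. The message field names (bucket, key, path) are assumptions for illustration; they are not defined in this issue.

package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"os"
)

// Hypothetical "file pointer" message; the real field names are not defined in this issue.
type fileMessage struct {
	Bucket string `json:"bucket"`
	Key    string `json:"key"`
	Path   string `json:"path"` // e.g. /var/argo/dataflow/files/abc123 on the shared volume
}

// handle streams the referenced file line by line, so even large objects are
// processed with minimal memory, which is the point of the FIFO idea above.
func handle(msg []byte) error {
	var m fileMessage
	if err := json.Unmarshal(msg, &m); err != nil {
		return err
	}

	f, err := os.Open(m.Path) // opening a FIFO blocks until the writer side opens it
	if err != nil {
		return err
	}
	defer f.Close()

	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		fmt.Printf("s3://%s/%s: %s\n", m.Bucket, m.Key, scanner.Text())
	}
	return scanner.Err()
}

func main() {
	_ = handle([]byte(`{"bucket":"my-s3-bucket","key":"data.csv","path":"/var/argo/dataflow/files/abc123"}`))
}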

I'm not keen on creating a foot-gun (putting file contents into the messages would do that), but I can also see a lot of value in accessing file contents.

The built-in steps (such as map) could therefore perhaps access file contents using some special functions:

map: file.matches('regexp', object(msg).filePath) 

I don't know about CSV support. Perhaps it would be up to your code to parse the file as CSV? Or maybe we need another way: a new type of step called "split", which splits messages using an expression:

steps:
  - name: split
    split: file.csvSplit(object(msg).filePath)
    sources:
    - files:
        s3: {} # S3 config
    sinks:
    - stan:
        subject: csv-files

This example splits a CSV file into multiple messages for the next step to consume.
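
To make the idea concrete, here is a small Go sketch of what a csvSplit-style function could do: read the CSV at the given path and return one outgoing message per record. The name csvSplit comes from the proposal above; it is not an existing Dataflow function, and the JSON-array encoding of each record is an assumption.

package main

import (
	"encoding/csv"
	"encoding/json"
	"fmt"
	"os"
)

// csvSplit reads the CSV file at path and turns each record into one message.
func csvSplit(path string) ([][]byte, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	records, err := csv.NewReader(f).ReadAll()
	if err != nil {
		return nil, err
	}

	msgs := make([][]byte, 0, len(records))
	for _, record := range records {
		// Encode each CSV line as a JSON array; the next step gets one message per line.
		b, err := json.Marshal(record)
		if err != nil {
			return nil, err
		}
		msgs = append(msgs, b)
	}
	return msgs, nil
}

func main() {
	msgs, err := csvSplit("/var/argo/dataflow/files/abc123") // path from the files source
	if err != nil {
		panic(err)
	}
	for _, m := range msgs {
		fmt.Println(string(m))
	}
}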

"split" is the antonym of "group". There's already a "group" step, and understanding its opposite helps clarify what capabilities each might need (e.g. perhaps the group step might create CSV files and save them to S3).
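
Going the other way, here is a sketch of the "group" direction hinted at above: collect a batch of messages (each assumed to be a JSON array of fields, as produced by the csvSplit sketch) into a single CSV file that an S3 sink could then upload. This is purely speculative; no such group-to-S3 behaviour exists yet.

package main

import (
	"encoding/csv"
	"encoding/json"
	"os"
)

// groupToCSV writes a batch of record messages into one CSV file on disk,
// e.g. onto the shared volume for an S3 sink to pick up.
func groupToCSV(msgs [][]byte, path string) error {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer f.Close()

	w := csv.NewWriter(f)
	for _, m := range msgs {
		var record []string
		if err := json.Unmarshal(m, &record); err != nil {
			return err
		}
		if err := w.Write(record); err != nil {
			return err
		}
	}
	w.Flush()
	return w.Error()
}

func main() {
	msgs := [][]byte{[]byte(`["a","1"]`), []byte(`["b","2"]`)}
	if err := groupToCSV(msgs, "/var/argo/dataflow/files/grouped.csv"); err != nil {
		panic(err)
	}
}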

Regarding concurrent message processing: this is currently available for Kafka. You can consume messages in parallel up to the number of partitions available (we use 12); no one has asked for more parallelism than that yet. It is not available for NATS Streaming, because no one has asked for it.

alexec commented 3 years ago

Fixed in v0.0.75