
Support `parquet` columnar format in the `aws_s3` sink #1374

Open binarylogic opened 4 years ago

binarylogic commented 4 years ago

Similar to https://github.com/timberio/vector/issues/1373, we should support the Parquet format. Parquet is a columnar format that enables faster and more efficient data access, such as column selection and indexing.

Implementation

Unfortunately, I do not have as deep experience with this format as I do with ORC, but like everything else, we should start very simple. Fortunately, there appears to be a Rust library that supports basic writing of this data.

LucioFranco commented 4 years ago

One issue with that parquet library is that it requires nightly Rust (https://github.com/sunchao/parquet-rs/blob/master/src/lib.rs#L123 and https://github.com/apache/arrow/blob/master/rust/parquet/src/lib.rs#L18); we can't use those features because we are on the stable compiler.

binarylogic commented 4 years ago

Any idea what the timeline is for those to be on stable?

LucioFranco commented 4 years ago

Really, specialization is the last nightly feature left on the updated apache/arrow, but it has been in progress since 2016, so we will likely not see it stabilized any time soon. Our best bet would be to attempt to remove that feature from the library, but I am not sure how hard that would be or whether it is even possible.

camerondavison commented 4 years ago

I think it still requires nightly, but just throwing this into the mix: https://github.com/apache/arrow/tree/master/rust/parquet

davemt commented 3 years ago

Progress in the Apache Arrow project:

Alexx-G commented 3 years ago

IIRC Apache Arrow now works on stable Rust 🎉 (unless the SIMD feature is used). Would it be possible to re-evaluate this one? If the team doesn't have the bandwidth but somebody can help with review and some pointers, I can look into it.

jszwedko commented 3 years ago

Hi @Alexx-G. This hasn't been scheduled yet, but if you wanted to pick it up, I'd be happy to advise/review!

Alexx-G commented 3 years ago

I've checked the parquet crate and it seems to have some basic support for writing data. @binarylogic @jszwedko I see that the ORC issue has some additional context; do you have any insights/pointers for this one, especially about the schema and the changes around the current encoding logic? I'll try to run some tests with Athena + S3 and Parquet to see whether I can get an MVP.

CorbyAsCode commented 2 years ago

Is there an update on this? I'm very interested in JSON input with Parquet output to S3.

bhataprameya commented 2 years ago

Any update on this? I am also interested in JSON input and Parquet output to S3.

jgournet commented 2 years ago

Any chance this can be looked at? Right now, we send logs from Vector to Kinesis just to get them converted to Parquet... I'd be happy to skip that costly step.

spencergilbert commented 2 years ago

This codec isn't currently on our roadmap, but we could help a community PR get merged 👍

ktff commented 1 year ago

Proposal

Add a parquet codec and support for it in the aws_s3 sink.

This can be achieved with the official Rust crate, through the Serializer construct, tied to a custom type implementing Encoder<Vec<Event>>.

Each batch would be transformed into a single Parquet file with a single row group. With that, the batch configuration can be used to define the desired size of the row group/Parquet file.
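
For illustration, a minimal sketch of that step using the arrow and parquet crates; the single "message" column and the function name are assumptions for the example, not part of the proposal:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

// Encode one batch (reduced here to plain message strings) into an in-memory Parquet file.
fn encode_batch(messages: Vec<String>) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("message", DataType::Utf8, true)]));
    let columns: Vec<ArrayRef> = vec![Arc::new(StringArray::from_iter_values(&messages))];
    let batch = RecordBatch::try_new(schema.clone(), columns)?;

    let mut buffer = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(WriterProperties::builder().build()))?;
    writer.write(&batch)?;
    writer.close()?; // finalizes the file footer
    Ok(buffer)
}
```

Since the Arrow writer only starts a new row group once its row-group size limit is reached, a typical batch would end up as one file with a single row group, which is what the batch settings would effectively control.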

Schema

Since Parquet requires a schema, we need to derive one.

Each passing event has its own implicit schema, and by joining them we get a unified one. This unified schema can be:

  1. Specific to each batch.
  2. Built up from batch to batch at runtime.

While option 2 can still result in exported schemas that differ from batch to batch, they would tend to change less than with option 1. This matters for streams whose events have varying schemas; for streams with a consistent schema, both options behave the same.
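
As a sketch of what an "implicit schema" could mean in practice, assuming events are flat JSON-like maps (the type mapping below is illustrative, not a spec):

```rust
use arrow::datatypes::{DataType, Field, Schema};
use serde_json::{Map, Value};

// Derive the implicit Arrow schema of a single flat, JSON-shaped event.
fn implicit_schema(event: &Map<String, Value>) -> Schema {
    let fields: Vec<Field> = event
        .iter()
        .map(|(name, value)| {
            let data_type = match value {
                Value::Bool(_) => DataType::Boolean,
                Value::Number(n) if n.is_i64() => DataType::Int64,
                Value::Number(_) => DataType::Float64,
                // strings, nulls and nested values all fall back to strings in this sketch
                _ => DataType::Utf8,
            };
            Field::new(name.as_str(), data_type, true) // nullable: other events may omit the field
        })
        .collect();
    Schema::new(fields)
}
```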

When joining schemas, we can run into conflicts. When multiple types are used for the same field/column, some resolution is needed:

  1. Choose one type and drop conflicting events.
  2. Unify the types into a more general one. This needs to be done for both the primitive and the logical type. As a unifying type, String looks like the best option.

In my opinion, option 2 is better: it's more reliable, and we can document this behavior.
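
A sketch of resolution option 2, again in terms of Arrow types (which is an assumption about the implementation): join fields by name and widen any conflicting type to a string.

```rust
use std::collections::BTreeMap;

use arrow::datatypes::{DataType, Field, Schema};

// Join two derived schemas; when a field appears with different types, widen it to Utf8
// so no event has to be dropped.
fn unify_schemas(a: &Schema, b: &Schema) -> Schema {
    let mut merged: BTreeMap<String, DataType> = BTreeMap::new();
    for field in a.fields().iter().chain(b.fields().iter()) {
        merged
            .entry(field.name().clone())
            .and_modify(|existing| {
                if *existing != *field.data_type() {
                    *existing = DataType::Utf8; // conflicting types collapse to string
                }
            })
            .or_insert_with(|| field.data_type().clone());
    }
    let fields: Vec<Field> = merged
        .into_iter()
        .map(|(name, data_type)| Field::new(name.as_str(), data_type, true))
        .collect();
    Schema::new(fields)
}
```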

Options

Add options:

By default, plain encoding and no compression are used.
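
For reference, a sketch of how those defaults (and an opt-in compression setting) could be expressed with the parquet crate's writer properties; the Vector-side option shape here is hypothetical:

```rust
use parquet::basic::{Compression, Encoding};
use parquet::file::properties::WriterProperties;

// Build writer properties for the proposed defaults: plain encoding, no compression.
// `compress` stands in for a hypothetical user-facing option.
fn writer_properties(compress: bool) -> WriterProperties {
    let compression = if compress { Compression::SNAPPY } else { Compression::UNCOMPRESSED };
    WriterProperties::builder()
        .set_encoding(Encoding::PLAIN)   // default encoding for all columns
        .set_dictionary_enabled(false)   // keep the output strictly plain-encoded
        .set_compression(compression)
        .build()
}
```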

Alternatives

We could expose an option for users to define their own static schema for passing events, which would try to cast conflicting values or filter them out.

fuchsnj commented 1 year ago

Hey @ktff. @spencergilbert and I discussed this a bit. I don't think anyone on the team is really an expert in Parquet, so we have a couple questions.

  1. Today the encoders in Vector can only operate on individual events. We have plans to eventually have a second layer of encoders that can run on batches of events, but that doesn't exist today. So the main question here is: can a batch be built from already-encoded events?

  2. What's the reasoning for deriving a schema over having the user provide one (which Vector could then ensure events match)? Having the schema potentially change for each emitted batch seems undesirable, but again, I don't know enough about Parquet to fully understand the impact here.

ktff commented 1 year ago

Hey @fuchsnj

Regarding question 1: no, a Parquet batch can't be built from already-encoded events. It's necessary to intercept them before that, or to process them in a way suitable for Parquet. Fortunately, there is a way to do that, by implementing https://github.com/vectordotdev/vector/blob/018637ad93ebb74bcec597e89d33127ac83202d8/src/sinks/util/encoding.rs#L9 for Vec<Event>.

There are similar cases. For example, the current aws_s3 sink uses https://github.com/vectordotdev/vector/blob/018637ad93ebb74bcec597e89d33127ac83202d8/src/sinks/util/encoding.rs#L18, which would be replaced by a ParquetEncoder when that codec is configured.
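
To make the shape concrete, a rough sketch; the Encoder trait and Event type below are simplified stand-ins for Vector's own (whose exact signatures differ), and encode_to_parquet is a hypothetical helper:

```rust
use std::io;

// Simplified stand-ins for Vector's types; not the real definitions.
struct Event;

trait Encoder<T> {
    fn encode_input(&self, input: T, writer: &mut dyn io::Write) -> io::Result<usize>;
}

struct ParquetEncoder;

impl Encoder<Vec<Event>> for ParquetEncoder {
    fn encode_input(&self, events: Vec<Event>, writer: &mut dyn io::Write) -> io::Result<usize> {
        // 1. derive (or look up) the schema for this batch
        // 2. pivot the events into columns
        // 3. write a single Parquet file (one row group) into `writer`
        let bytes = self.encode_to_parquet(&events)?;
        writer.write_all(&bytes)?;
        Ok(bytes.len())
    }
}

impl ParquetEncoder {
    // Hypothetical helper; a real version would build Arrow arrays and use ArrowWriter
    // as in the earlier sketch.
    fn encode_to_parquet(&self, _events: &[Event]) -> io::Result<Vec<u8>> {
        unimplemented!()
    }
}
```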


Regarding question 2: Parquet is a file format that is usually the final/long-term destination for storing data, which can later be read for queries by, for example, Amazon Athena. Such systems do encounter differences in schema over time, especially if they are performing federated queries, so they usually have the means to reconcile different schemas. That said, I came to this conclusion via research, so comments from those with hands-on experience would be much appreciated.

So my main argument is that it's better to have the event reach its destination and leave it to the user to transform events into the desired schema before or after the sink if they wish or need to, than to require a fixed schema that drops non-conforming events; the latter forces users with varying event schemas to define the schema yet again in some transform before this sink, either to reshape the events or to change the fixed schema.

fuchsnj commented 1 year ago
  1. The impl Encoder<Vec<ProcessedEvent>> is the sink-specific encoder. While you could have a setting that replaces this on the sink, that doesn't follow our current conventions and would only work for that specific sink. What we would prefer (which isn't quite supported yet) is expanding our codecs to add parquet, which would allow it to be used in any sink. The current (single-event-only) codecs are selected here.

  2. So my main argument is that it's better to have the event reach its destination and leave it to the user to transform events into the desired schema before or after the sink if they wish or need to, than to require a fixed schema that drops non-conforming events; the latter forces users with varying event schemas to define the schema yet again in some transform before this sink, either to reshape the events or to change the fixed schema.

    Our current codecs define a schema_requirement, which can type-check events at Vector startup to ensure they will match the expected Parquet schema. This would prevent events from being dropped at runtime for not matching the schema. (Note that full schema support in Vector is relatively new and not enabled by default yet; it's the global setting schema.enabled.)

jszwedko commented 1 year ago

For point 1, I'm in agreement with @fuchsnj. It'd be really nice if we could update the codec model to do batch encoding in a generic way that works across all sinks using codecs. The lack of this came up recently with the avro codec not outputting the schema at the head of each batch (https://github.com/vectordotdev/vector/issues/16994), and it would come up with other codecs that require headers, like csv.

fuchsnj commented 1 year ago

Adding support for "batched" codecs is also discussed here: https://github.com/vectordotdev/vector/issues/16637

ktff commented 1 year ago

For point 1, while the parquet implementation would be a sink-specific encoder at the moment, it can be written to be generic in the sense that it can be reused in a different sink and/or in the batch-encoder abstraction/feature once that arrives.

Going by https://github.com/vectordotdev/vector/issues/16637, an envelope is not enough for a parquet codec; it's a column-wise format, so it needs to deconstruct all events to form those columns. That is in line with https://github.com/vectordotdev/vector/issues/16637#issuecomment-1462691621 for batch sinks.

If batch encoding were already implemented, that would simplify things. Going by the diversity of RequestBuilder implementations, that's not trivially doable. From what I see, there are two parts to it: abstraction over encoders and abstraction over events. For a parquet codec in the aws_s3 sink, the abstraction over encoders is necessary, and just using Box<dyn Encoder<E>> seems fine for that. The second abstraction is the one that would allow this codec to be used in other sinks. I would say that is an orthogonal concern/feature, since it would need to unify all types of events in batch sinks into a single type and then modify, if necessary, all of the codecs to use that unified type. That's a completely separate issue.

For point 2, @fuchsnj can you provide some docs for schema_requirement? I can't find it on the docs/website, but that feature would be useful.

davemt commented 1 year ago

We currently collect logs and store them as Parquet on S3, using fluentd and an internal plugin we maintain for the Parquet conversion on our aggregators. We are very interested in migrating to Vector, and this issue is currently the remaining blocker.

FWIW, in our setup we configure a schema for each type of log, and we would hope that the Vector implementation would at least support an option for specifying schemas.

ktff commented 1 year ago

@fuchsnj I found schema_requirement. That seems like exactly what's needed, so we can go with that.

Instead of determining the schema at runtime, add an option to specify a schema for passing events.
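
As a sketch of that option, assuming the schema is given as simple field/type pairs in the sink configuration (the option shape and the supported type names here are made up for illustration):

```rust
use arrow::datatypes::{DataType, Field, Schema};

// Turn a user-supplied field -> type mapping into an Arrow schema.
fn schema_from_config(fields: &[(&str, &str)]) -> Result<Schema, String> {
    let fields = fields
        .iter()
        .map(|(name, type_name)| {
            let data_type = match *type_name {
                "string" => DataType::Utf8,
                "int64" => DataType::Int64,
                "float64" => DataType::Float64,
                "bool" => DataType::Boolean,
                other => return Err(format!("unsupported type `{other}` for field `{name}`")),
            };
            Ok(Field::new(*name, data_type, true))
        })
        .collect::<Result<Vec<Field>, String>>()?;
    Ok(Schema::new(fields))
}
```

A mismatch at encode time could then either cast the value or reject the event, depending on how strict the option is configured to be.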

scMarkus commented 1 year ago

Happy to see this discussion being continued. We are currently using Vector for Kubernetes cluster log collection, but we have other use cases with much higher throughput where Vector is an interesting option. I would like to add some thoughts on what would be mandatory for us:

  1. Since we divide concerns across multiple microservices, a producer (source) should be able to change its schema in a predefined manner (e.g. adding fields to the JSON). Vector (transformer/aggregator) is another service with its own schema; it should simply ignore additional fields from the source without dropping any events. Later, Vector may update its schema independently so that it picks up the previously added fields sent to it. On the other hand, if fields are defined in Vector but not sent by the source, they should be null for that event when written to a sink. This relaxed behavior could be made configurable to be more restrictive. I mention this specifically because I tested the fluentd plugin with columnify and it made it difficult to migrate schemas without data loss. Still, the configuration options of the fluentd plugin may be a nice hint of what people are expecting.
  2. On batch encoding (which may not be a concern of the current issue), it would be quite powerful to have some additional preprocessing options. For my use case I am thinking of sorting the batch by an array of fields; see the sketch after this list. Utilizing Parquet's run-length encoding (which should be handled by the respective Parquet writing library), this can make a rather large difference in output size when sorting by fields with low cardinality or fields that are functionally dependent on each other. Considering we are talking about an S3 sink, which is probably long-term storage, this would reduce costs and also improve performance when reading and filtering the data with Apache Spark, Trino/Athena, etc. In my case I could reduce file sizes by up to 20%, and I have not seen this as a feature anywhere so far except the Hive syntax; still, the benefits were measured without Hive.
  3. What I would be genuinely curious about is how acknowledgements would behave in a batched manner. To stay consistent with the current semantics, an acknowledgement would obviously be propagated only when an event gets committed to a sink. But would it be reasonable to consider an event acknowledged towards upstream sources once it is written to Vector's disk buffer, which may be utilized for batching anyway? Surely this is a difficult problem and out of scope for the parquet encoder; still, I'd like to spark the thought.
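
The sketch referenced in point 2: sorting a batch by a few fields before it reaches the Parquet writer, so that repeated values end up adjacent and compress better under run-length/dictionary encoding. Events are modeled as flat JSON maps here, which is an assumption for the example.

```rust
use std::cmp::Ordering;

use serde_json::{Map, Value};

// Sort a batch by the given fields (lowest-cardinality first tends to work best)
// so that repeated values sit next to each other in each Parquet column.
fn sort_batch(events: &mut [Map<String, Value>], sort_fields: &[&str]) {
    events.sort_by(|a, b| {
        sort_fields
            .iter()
            .map(|field| {
                let left = a.get(*field).map(|v| v.to_string());
                let right = b.get(*field).map(|v| v.to_string());
                left.cmp(&right)
            })
            .find(|ordering| *ordering != Ordering::Equal)
            .unwrap_or(Ordering::Equal)
    });
}
```
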
spencergilbert commented 1 year ago
  1. Since we divide concerns across multiple microservices, a producer (source) should be able to change its schema in a predefined manner (e.g. adding fields to the JSON). Vector (transformer/aggregator) is another service with its own schema; it should simply ignore additional fields from the source without dropping any events. Later, Vector may update its schema independently so that it picks up the previously added fields sent to it. On the other hand, if fields are defined in Vector but not sent by the source, they should be null for that event when written to a sink. This relaxed behavior could be made configurable to be more restrictive. I mention this specifically because I tested the fluentd plugin with columnify and it made it difficult to migrate schemas without data loss. Still, the configuration options of the fluentd plugin may be a nice hint of what people are expecting.

Yeah, that's interesting - I certainly appreciate wanting to make migrations easier. I imagine that could be done "manually" today with remap, checking whether your field exists and, if not, inserting a null value. On the flip side, I don't feel great about dropping portions of events that aren't part of the schema, but that matches some existing behavior in csv. An alternative could be erroring on those events during encoding and being able to route them to a different sink, DLQ-style.

  2. On batch encoding (which may not be a concern of the current issue), it would be quite powerful to have some additional preprocessing options. For my use case I am thinking of sorting the batch by an array of fields. Utilizing Parquet's run-length encoding (which should be handled by the respective Parquet writing library), this can make a rather large difference in output size when sorting by fields with low cardinality or fields that are functionally dependent on each other. Considering we are talking about an S3 sink, which is probably long-term storage, this would reduce costs and also improve performance when reading and filtering the data with Apache Spark, Trino/Athena, etc. In my case I could reduce file sizes by up to 20%, and I have not seen this as a feature anywhere so far except the Hive syntax; still, the benefits were measured without Hive.

Definitely an interesting feature to keep in mind, though we'd want to get the basic implementation in before adding additional tooling to it 👍

  3. What I would be genuinely curious about is how acknowledgements would behave in a batched manner. To stay consistent with the current semantics, an acknowledgement would obviously be propagated only when an event gets committed to a sink. But would it be reasonable to consider an event acknowledged towards upstream sources once it is written to Vector's disk buffer, which may be utilized for batching anyway? Surely this is a difficult problem and out of scope for the parquet encoder; still, I'd like to spark the thought.

It's been a while, but I feel as though I remember seeing that discussion in the past; if it's not currently the behavior, I expect there are arguments that it could be.

@ktff is this still something you're planning to work on?

ktff commented 1 year ago

@spencergilbert I do. I was on vacation, hence the silence. The first point raised by @fuchsnj remains unresolved. Simplified, there are two ways forward:

  1. Implement the parquet codec only for the aws_s3 sink. Once support for batched codecs lands, that limitation can be lifted.
  2. Wait for batched codec support and then merge the parquet codec.

In both cases, my plan is to implement the codec first and, in case 2, submit it once batched codecs have landed.

spencergilbert commented 1 year ago

Hope you had a nice vacation! Sounds good to me.

Kikkon commented 1 year ago

@ktff I happened to find a similar solution. If you need help anywhere, I'd be happy to contribute together! If I can figure it out, that is. 🚀

ktff commented 1 year ago

@Kikkon the draft contains a functioning parquet codec for the aws_s3 sink. It's missing two important things though:

  • Proper handling of non-conforming events. This can be resolved by accepting a performance hit.
  • Support for batched codecs. This is the only blocking issue.

I'm currently not in a position to add support for batched codecs, hence the limbo state of https://github.com/vectordotdev/vector/pull/17395. If this is something you feel confident adding to Vector, reach out to @jszwedko. Once that lands, I'll be able to finish the PR and get it merged.

Kikkon commented 1 year ago

@Kikkon the draft contains a functioning parquet codec for the aws_s3 sink. It's missing two important things though:

  • Proper handling of non-conforming events. This can be resolved by accepting a performance hit.
  • Support for batched codecs. This is the only blocking issue.

I'm currently not in a position to add support for batched codecs, hence the limbo state of #17395. If this is something you feel confident adding to Vector, reach out to @jszwedko. Once that lands, I'll be able to finish the PR and get it merged.

@ktff I have some experience using Parquet, but I am not familiar with Vector. The issue with this PR is: Parquet does not support append writes, so appending may require merging new and old files, which has performance costs. However, a batch codec implementation does not yet exist in Vector, so for now the PR is pending. Is that right?

@jszwedko Does the Vector community have plans for a proposal to support batch codecs? Perhaps once I become more familiar with Vector's architecture, I can discuss with everyone how we could add them.

jszwedko commented 1 year ago

@jszwedko Does the Vector community have plans for a proposal to support batch codecs? Perhaps once I become more familiar with Vector's architecture, I can discuss with everyone how we could add them.

Hey! Yes, we would like to add the concept of batches to codecs, but we haven't been able to prioritize it on our side just yet. We'd be happy to help guide a contribution for it. I believe @lukesteensen has some thoughts about what it could look like and would also be able to answer questions.

Kikkon commented 1 year ago

Can you give me some advice? @lukesteensen 🫡

ktff commented 1 year ago

The issue with this PR is: Parquet does not support append writes, so appending may require merging new and old files, which has performance costs. However, a batch codec implementation does not yet exist in Vector, so for now the PR is pending. Is that right?

@Kikkon it's not doable with regular codecs, but if it were, then yes, it would have a performance cost. Also, one thing to note: the PR currently does add limited support for batch codecs in order to support Parquet, but it's hacky and not something that can be merged. The goal is to replace this with proper support.

Kikkon commented 1 year ago

The issue with this PR is: Parquet does not support append writes, so appending may require merging new and old files, which has performance costs. However, a batch codec implementation does not yet exist in Vector, so for now the PR is pending. Is that right?

@Kikkon it's not doable with regular codecs, but if it were, then yes, it would have a performance cost. Also, one thing to note: the PR currently does add limited support for batch codecs in order to support Parquet, but it's hacky and not something that can be merged. The goal is to replace this with proper support.

Got it, thank you!

lukesteensen commented 1 year ago

@Kikkon that's correct, you can't really do this right now with the current design of our codecs crate. It's something I would love to enable, but I haven't had time to figure out a good path forward. It will likely take quite a bit of refactoring and design work. I'm hoping we'll be able to tackle that before long, as I agree this is a feature we should have!

Kikkon commented 1 year ago

@lukesteensen Is there a corresponding roadmap currently? Are there any parts that I can participate in?

rstml commented 10 months ago

This is a really big blocker for us. Any plans to make this happen?

Ralkion commented 10 months ago

@rstml: We've used the patch provided by @ktff and applied it ourselves to a release branch (I can't remember which one off the top of my head). It's been working fine for us for a while now. It only works for S3, but it does work.

rstml commented 10 months ago

@Ralkion thanks! that's helpful. Will give it a try.

I got the impression that #16637 was a blocker, but it seems we should be able to define metadata during consumption.

Why not merge #17395 then? If there's Avro support that works without headers, Parquet should be acceptable too?

Ralkion commented 10 months ago

My understanding from the discussion above (and others may correct me if I'm over-simplifying) is that @ktff's patch only works for S3, and only works in a certain way that isn't necessarily applicable to other output destinations. Because it can't be generalized to other destinations, it's not really 'production ready'.

bruceg commented 10 months ago

@rstml We have not been able to prioritize codec batches at this point, nor is it likely in the near future, unfortunately.

ktff commented 10 months ago

@rstml it's as @Ralkion said: this codec is currently accessible only through the aws_s3 sink. For it to become a general codec for all sinks, codec batches need to be implemented, but they aren't on the roadmap for the near future, as @bruceg mentioned.

fpytloun commented 4 months ago

I am also very interested in this feature. Is it going to be on the roadmap anytime soon?

yjagdale commented 1 month ago

@jszwedko - any possibility of getting this on the roadmap?