MeltanoLabs / Singer-Working-Group

Working group for ongoing development and iteration of the Singer Spec, the de-facto protocol for open source data connectors. Please use "Issues" to create discussion items - or use "Discussions" for general questions.
Apache License 2.0

Adding alternatives to the exchange of record-oriented json messages #29

Open Marnixvdb opened 2 years ago

Marnixvdb commented 2 years ago

As great and universal as json is for exchanging messages, the fact that Singer tap -> target communication requires a record-oriented json format is a big drawback (at least for me), as the unnecessary serialisation/deserialisation overhead becomes a real pain when processing (analytical) bulk data.
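For readers less familiar with the current format, here is a minimal sketch of the record-oriented exchange described above. It uses only the Python standard library; the stream name `users`, the two fields, and the helper names `emit_singer_messages` and `read_singer_records` are made up for illustration. Every row becomes its own JSON line that the tap serialises and the target parses again.

```python
import io
import json


def emit_singer_messages(rows, stream_name, out):
    """Write one SCHEMA message, one RECORD message per row, then a STATE message."""
    schema_message = {
        "type": "SCHEMA",
        "stream": stream_name,
        "schema": {
            "properties": {
                "id": {"type": "integer"},
                "name": {"type": ["string", "null"]},
            }
        },
        "key_properties": ["id"],
    }
    out.write(json.dumps(schema_message) + "\n")
    for row in rows:
        # Each row is serialised separately: this is the per-record overhead.
        out.write(json.dumps({"type": "RECORD", "stream": stream_name, "record": row}) + "\n")
    if rows:
        # Hypothetical bookmark; real taps choose their own state layout.
        out.write(json.dumps({"type": "STATE", "value": {"last_id": rows[-1]["id"]}}) + "\n")


def read_singer_records(lines):
    """Parse every line and keep only the payloads of RECORD messages."""
    records = []
    for line in lines:
        message = json.loads(line)
        if message["type"] == "RECORD":
            records.append(message["record"])
    return records


rows = [{"id": 1, "name": "a"}, {"id": 2, "name": None}]
buffer = io.StringIO()  # stands in for the tap's stdout
emit_singer_messages(rows, "users", buffer)
lines = buffer.getvalue().splitlines()
records = read_singer_records(lines)
```

With two rows this produces four lines (SCHEMA, two RECORD, STATE), and the number of `json.dumps`/`json.loads` calls grows linearly with the row count.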

I was wondering how much room/importance the community sees in extending the spec in this area.

My first thought would be to add the option to use Apache Arrow Inter Process Communication (IPC). For those unfamiliar with Arrow: Arrow is a standardised columnar memory specification, and IPC is a way of transferring Arrow record batches without the need for serialisation/deserialisation. As many data storage systems are adopting Arrow Flight, there will be a lot of value in data pipelines that use Arrow as the shared data layout in every step from extraction to loading.

Let me know if this is of interest, or if more information is needed, and I will add more detail.

tayloramurphy commented 2 years ago

@Marnixvdb there's great interest around that! We've got https://github.com/MeltanoLabs/Singer-Working-Group/issues/2 logged as an issue just for that. I think Arrow makes a lot of sense as an option for users.

Marnixvdb commented 2 years ago

Thanks @tayloramurphy. I've read the discussion and the PR, which proposes adding a BATCH message to enable a tap to send a manifest of files to process to the target. It doesn't talk about an alternative data format for the exchange of data, does it?

While the BATCH format will be useful in some scenarios, it seems more complicated than necessary and it runs counter to the separation of concerns between taps and targets.

I think it's simpler and more effective to instead (or additionally) add Arrow IPC as a format to pipe data from tap -> target. Arrow has been designed to be highly performant and also, like Singer, to be interoperable and language agnostic. It has matured quickly and is being adopted widely. Arrow and Singer would be a great fit and, contrary to the BATCH proposal, the independence and composability of taps and targets would be preserved. (edited to add) Also: Arrow is typed, handles NULL values unambiguously and brings its schema along.

Having to deal with more than one data format in the exchange between taps and targets would introduce some management overhead, but I think that should be trivial to solve for the engineers writing the pipelines.

What do you think?

edgarrmondragon commented 2 years ago

Hi @Marnixvdb!

Overall, I think the prospect of leveraging the Arrow spec is very interesting. A PoC or reference implementation showcasing feature parity with the Singer spec would be a good start for an enhancement proposal.

Message types

There is definite overlap between Singer and Arrow message types, so that is a good sign. The ones supported by Arrow are:

The documentation mentions Custom Application Metadata, which might be useful for our case:

> We provide a custom_metadata field at three levels to provide a mechanism for developers to pass application-specific metadata in Arrow protocol messages. This includes Field, Schema, and Message.
>
> The colon symbol : is to be used as a namespace separator. It can be used multiple times in a key.
>
> The ARROW pattern is a reserved namespace for internal Arrow use in the custom_metadata fields. For example, ARROW:extension:name.

Metadata at the Field level might be useful for adding indicators like SQL-types, etc. At the Message level, it could decorate a record batch with a state value and (ACTIVATE) version. It's not clear by reading the IPC streams docs how this metadata can be added, though. The pyarrow.record_batch function mentions metadata for the schema, but not for the batch message itself.

| Message Type | Singer | Arrow |
| --- | --- | --- |
| Schema | | |
| Record | | No individual records, only batch |
| State | | Custom metadata might be a substitute. |
| Activate Version | Extension | Custom metadata might be a substitute. |
| Batch | Extension | |

Ecosystem

JSON support is ubiquitous and very stable in almost all programming languages. Arrow, however, may be less stable, and its support may not be as good across the board.

Separate processes for tap and target

To get the most out of Arrow IPC (e.g. zero-copy reads), the source and target processes have to share memory, but it's not clear how to achieve that with taps and targets running as individual applications as they do today.

pkit commented 1 month ago

> To get the most out of Arrow IPC (e.g. zero-copy reads), the source and target processes have to share memory, but it's not clear how to achieve that with taps and targets running as individual applications as they do today.

Arrow IPC is much faster than JSON, even as an interprocess format. It just does less serialization. So, everything is fine here. In my tests it's even faster than messagepack and on par with Native ClickHouse protocol (which is as fast as I've seen so far).

edgarrmondragon commented 4 weeks ago

> > To get the most out of Arrow IPC (e.g. zero-copy reads), the source and target processes have to share memory, but it's not clear how to achieve that with taps and targets running as individual applications as they do today.
>
> Arrow IPC is much faster than JSON, even as an interprocess format. It just does less serialization. So, everything is fine here. In my tests it's even faster than messagepack and on par with Native ClickHouse protocol (which is as fast as I've seen so far).

@pkit Thanks for the data point. We have considered adding this (or accepting PRs to add it) to the Singer SDK: https://github.com/meltano/sdk/issues/1684. It's definitely an interesting idea to explore.