vectordotdev / vector

A high-performance observability data pipeline.
https://vector.dev
Mozilla Public License 2.0

New `gcp_big_query` sink #1536

Open binarylogic opened 4 years ago

binarylogic commented 4 years ago

GCP BigQuery is a powerful service for analyzing large amounts of structured data. If used correctly, it can be cost-effective storage for log data. I would like to see Vector support this service as a sink, but it will require careful planning due to the many different ways BigQuery can be used.

Context

BigQuery is flexible, and we should consider the following features for our BigQuery sink:

  1. Storing JSON data in a single column. Users can then use BigQuery's JSON functions.
    1. This type of querying is slower for obvious reasons (fetching more data, parsing, etc.).
    2. This type of querying is more expensive because each query must fetch and scan the entire JSON payload as opposed to individual columns.
  2. Mapping structured data to individual BigQuery table columns.
  3. Automatic schema maintenance.
  4. Streaming inserts vs. batching inserts.
    1. Streaming inserts have a cost ($0.010 per 200 MB).
    2. Note the variety of methods for batch inserting. This can be done through the API directly or through other GCP services (Cloud Storage, Stackdriver, etc.).

This is, of course, not an exhaustive list of factors to consider for BigQuery, but it helps demonstrate the variety of options.

Starting simple

v1 of this sink should start with the simplest implementation:

Long Term

We might consider the following features for long-term development:

  1. Support for using the batching API since it does not incur a cost.
  2. Dynamic schema maintenance, although I think this might be better solved with a transform or something separate.
bruceg commented 4 years ago

I have this done and minimally tested against GCP, but I have a few questions.

We now have transforms that can alter the schema of the log records. Should this sink be writing the logs into the table as-is, or still follow the rewriting described in the initial issue (using a JSON encoded column)?

How important is being able to configure the column names within the BigQuery sink itself?

binarylogic commented 4 years ago

Good questions. I have a lot of thoughts on this, but I didn't want to distract from a simple first version. Here are a couple of ways this can work today:

  1. Encode the event as JSON into a single column.
  2. Front the sink with a coercer transform with drop_unspecified enabled and ensure the BigQuery table columns match exactly. #165 can serve this purpose as well.

Long term, I would like to improve the UX by:

  1. Treating the BigQuery table schema as the source of truth and using that schema to encode events. One idea is to reactively refresh this schema when we receive specific schema-related insert errors.
  2. Treating another schema source (registry) as the source of truth, such as a JSON schema file. This would pair nicely with #1695.
  3. And finally, treating the event schema as the source of truth and dynamically changing the BigQuery table schema as fields are discovered. This requires quite a bit more discussion, and it's likely we won't do this.
jszwedko commented 3 years ago

Just noting that a user asked for this in gitter today: https://gitter.im/timberio-vector/community?at=5f3d3d31582470633b670c43

seeyarh commented 2 years ago

Any update on this? I prototyped a simple implementation here: https://github.com/seeyarh/tobq/blob/main/src/lib.rs
It uses the following library: https://github.com/lquerel/gcp-bigquery-client

I'd like to work on this sink, if possible.

jszwedko commented 2 years ago

@seeyarh nice! That seems like a good start. This issue hasn't been scheduled yet, but we are happy to review a PR to Vector if you wanted to try implementing it.

gartemiev commented 2 years ago

@jszwedko @seeyarh @binarylogic, any news on this? We are actively migrating our data warehouse to Google BigQuery and would like to have this feature.

jszwedko commented 2 years ago

Hi @gartemiev ! Nothing yet unfortunately.

JoHuang commented 2 years ago

https://cloud.google.com/blog/products/data-analytics/bigquery-now-natively-supports-semi-structured-data With this support, maybe we can simplify the schema and treat the data as a single field of JSON type.

gartemiev commented 1 year ago

@jszwedko @seeyarh @binarylogic

Is there any news on this, and approximately when will it be available?

spencergilbert commented 1 year ago

> Is there any news on this, and approximately when will it be available?

It's currently not on our roadmap, but we'd be open to discussing a community contribution if anyone was interested in working on that.

goakley commented 10 months ago

@jszwedko @spencergilbert I would like to pick this up, as we have also stumbled into a need to write directly to BigQuery. I get the sense that this would be similar to the other GCP and HttpSink-based sinks, since we can leverage the tabledata.insertAll API call. There is a more "featureful" gRPC API for writing to tables, but that would definitely be a heavier lift given that there is no official Rust gRPC client for GCP. Any thoughts on this? Happy to propose an implementation in a PR if that's easier.
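For illustration, here is a rough sketch of building a tabledata.insertAll request body in Rust with serde_json; the row contents and insert IDs are purely made up, and a real sink would reuse Vector's existing GCP auth and HTTP machinery:

```rust
use serde_json::json;

/// Build a `tabledata.insertAll` request body from already-encoded events.
/// Each entry pairs a deduplication `insertId` with the row's JSON payload.
fn build_insert_all_body(rows: &[(String, serde_json::Value)]) -> serde_json::Value {
    json!({
        "kind": "bigquery#tableDataInsertAllRequest",
        "rows": rows
            .iter()
            .map(|(insert_id, row)| json!({ "insertId": insert_id, "json": row }))
            .collect::<Vec<_>>(),
    })
}

fn main() {
    // POSTed to .../v2/projects/{project}/datasets/{dataset}/tables/{table}/insertAll
    let rows = vec![(
        "event-0001".to_string(),
        json!({ "message": "hello", "severity": "INFO" }),
    )];
    println!("{}", build_insert_all_body(&rows));
}
```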

neuronull commented 10 months ago

> @jszwedko @spencergilbert I would like to pick this up, as we have also stumbled into a need to write directly to BigQuery. I get the sense that this would be similar to the other GCP and HttpSink-based sinks, since we can leverage the tabledata.insertAll API call. There is a more "featureful" gRPC API for writing to tables, but that would definitely be a heavier lift given that there is no official Rust gRPC client for GCP. Any thoughts on this? Happy to propose an implementation in a PR if that's easier.

Hi @goakley , thanks for letting us know you're interested in working on this.

I think right now it's OK to chat about the approach first, before implementing. We also have a formal process for evaluating newly proposed components, from which we may draw some questions to ask you. There has already been solid demand for this component, which is good.

Last December I took a look at this, and the main concern is that Google seems to be steering new development toward the Storage Write API (gRPC based). It definitely would be a heavier lift, that is for sure. I think the trade-off worth evaluating is whether we get some nice performance improvements with the gRPC-based API, and whether, if support for the legacy API is dropped, staying on it would imply a rewrite to the gRPC-based API down the line.

goakley commented 10 months ago

Thank you for the feedback @neuronull. Using the gRPC interface requires a more complicated setup, but I believe it can be done as follows:

In addition to the "usual" sink settings (batch size, credentials, etc), users will be required to specify the project, dataset, and table name for the table in which they want to store their data, plus a representation of the table schema, which is necessary for calling the AppendRows method (i.e. we can't write arbitrary blobs of data). I believe the representation of the schema in the config file should be protobuf, as it's (a) the representation by which you can define the schema in BigQuery itself and (b) easy for us to parse in Rust and convert to the data types used by the gRPC API. We could support either a large protobuf string in the config file, or point to another .proto file from the config file. I'm not sure what's preferable here. Maybe supporting both?
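To make that shape concrete, here is a rough sketch of what such a configuration might deserialize into on the Rust side. The field names are illustrative rather than a final config schema, and the default-stream naming follows the Storage Write API's `.../streams/_default` convention:

```rust
use serde::Deserialize;
use std::path::PathBuf;

/// Hypothetical configuration for a BigQuery Storage Write API sink.
#[derive(Debug, Deserialize)]
struct BigQuerySinkConfig {
    /// GCP project, dataset, and table that rows are written into.
    project: String,
    dataset: String,
    table: String,
    /// Path to the protobuf definition describing the table schema,
    /// used to encode rows for AppendRows.
    schema_proto_path: PathBuf,
}

impl BigQuerySinkConfig {
    /// The Storage Write API addresses the table's default stream by name.
    fn default_write_stream(&self) -> String {
        format!(
            "projects/{}/datasets/{}/tables/{}/streams/_default",
            self.project, self.dataset, self.table
        )
    }
}
```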

By default, Vector will write to the table's default stream. In the future, we could extend the sink with a new config setting that allows writing to an application-created stream. I would prefer to keep the initial implementation simple and not add application-created streams at the start. Google themselves recommend using the default stream when possible; however, it does not support exactly-once write semantics, which some use cases might require. In any case, Vector will not manage streams; streams should be created independently and then provided to Vector via the config file.

Otherwise, the flow here is predictable: data reaches the sink, gets transformed into the correct structure as specified by the config's protobuf, and is streamed to BigQuery with the usual backoff/retry mechanics that Vector uses in other sinks.

neuronull commented 10 months ago

Hey @goakley , thanks for the follow up. Ah yes, needing to supply the schema. I remember that now.

What you outlined makes sense to me, and I agree with keeping it simple and starting with the default stream (though perhaps wiring it up with the possibility in mind that the default stream may not always be the one used).

> We could support either a large protobuf string in the config file, or point to another .proto file from the config file. I'm not sure what's preferable here. Maybe supporting both?

I would opt for the file-path approach to keep the Vector config cleaner. We recently had a contribution of a protobuf codec that does this via desc_file. That seems like it would be a useful pattern to reuse here (see https://github.com/vectordotdev/vector/pull/18019).
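As a minimal sketch of reusing that pattern (assuming the prost / prost-types crates; the function name is illustrative): load a compiled descriptor set, i.e. the output of `protoc --descriptor_set_out`, and pull out the message descriptor that AppendRows wants as its writer schema.

```rust
use prost::Message;
use prost_types::{DescriptorProto, FileDescriptorSet};

/// Load a compiled protobuf descriptor set from `desc_file` and find the
/// descriptor for `message_name`. The Storage Write API's AppendRows call
/// takes this descriptor as the writer schema for the serialized rows.
/// (Nested messages and package-qualified names are ignored in this sketch.)
fn load_message_descriptor(
    desc_file: &std::path::Path,
    message_name: &str,
) -> Result<DescriptorProto, Box<dyn std::error::Error>> {
    let bytes = std::fs::read(desc_file)?;
    let set = FileDescriptorSet::decode(bytes.as_slice())?;
    set.file
        .iter()
        .flat_map(|file| file.message_type.iter())
        .find(|msg| msg.name() == message_name)
        .cloned()
        .ok_or_else(|| format!("message `{message_name}` not found in descriptor set").into())
}
```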

goakley commented 9 months ago

Thanks for the nudge in the right direction @neuronull. I've split out the protobuf-specific serialization logic into its own PR (https://github.com/vectordotdev/vector/pull/18598). Once that's merged, I will follow up with a proposed BigQuery sink implementation.

goakley commented 9 months ago

Hey @neuronull, I've been reading through the Vector code and trying some things out. Of particular interest: (1) BigQuery's AppendRows call is a streaming service, (2) it returns row-specific errors, and (3) there's the "new" StreamSink API in the Vector codebase.

(1) By calling AppendRows, tonic establishes a long-lived connection over which a chunk of rows can be sent, after which a response is streamed back. AFAIK all Vector sinks are tower::Services (as enforced by the Driver contract), which are built around a one-request-one-response model. I can certainly wrap the AppendRows logic to make the stream look like a request-response interface, but I wanted to make sure I'm not missing some Vector functionality that would allow for a more flexible streaming-based sink model.
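For what it's worth, one rough way to picture that wrapping, using stand-in types rather than the generated tonic client (AppendRowsRequest / AppendRowsResponse below are placeholders): hold the sender side of the long-lived request stream plus the response stream, and have each logical "request" push one message and wait for the next response, assuming responses come back in request order on the connection.

```rust
use futures::{Stream, StreamExt};
use tokio::sync::mpsc;

// Placeholder message types standing in for the generated Storage Write API types.
struct AppendRowsRequest { /* write stream, writer schema, serialized rows, ... */ }
struct AppendRowsResponse { /* append result or per-row errors, ... */ }

/// Wraps a bidirectional AppendRows stream so callers see a
/// one-request-one-response interface, like other Vector sink services.
struct AppendRowsChannel<R> {
    requests: mpsc::Sender<AppendRowsRequest>,
    responses: R,
}

impl<R> AppendRowsChannel<R>
where
    R: Stream<Item = AppendRowsResponse> + Unpin,
{
    /// Send one batch of rows on the long-lived stream and await the matching
    /// response (this assumes responses arrive in the same order as requests).
    async fn append(&mut self, request: AppendRowsRequest) -> Option<AppendRowsResponse> {
        self.requests.send(request).await.ok()?;
        self.responses.next().await
    }
}
```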

(2) AppendRows responses will indicate if individual rows are the cause of a write failure. When the problem is with individual rows, no rows from the request are written, but the request can be tried again with the bad rows omitted, and that follow-up request is expected to succeed. This sort of behaviour doesn't seem to fit the Vector model, which assumes a single request (batch of events) will either succeed or fail in full. Again, I can finagle some logic to retry requests under the hood of the tower::Service, but I wanted to see if I missed finding some backpressure/reprocessing logic in the Driver itself.
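As a sketch of what that finagling could look like under the hood of the service (the error shape here is a simplified placeholder, not the real API type):

```rust
use std::collections::HashSet;

/// Simplified stand-in for a row-level error reported by AppendRows,
/// carrying the index of the offending row within the failed request.
struct RowError {
    index: usize,
    reason: String,
}

/// Given the serialized rows from a failed request and the row-level errors
/// reported for it, build the follow-up payload with the bad rows dropped.
/// (The dropped rows would be surfaced through Vector's usual error metrics.)
fn rows_for_retry(rows: Vec<Vec<u8>>, row_errors: &[RowError]) -> Vec<Vec<u8>> {
    let bad: HashSet<usize> = row_errors.iter().map(|e| e.index).collect();
    rows.into_iter()
        .enumerate()
        .filter(|(i, _)| !bad.contains(i))
        .map(|(_, row)| row)
        .collect()
}
```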

(3) The StreamSink API is quite powerful thanks to SinkBuilderExt. I've been referencing the datadog/traces logic, which uses that sink type, as an example for BigQuery. Is this the way to build sinks going forward, or are other methods still supported? I've run into some absolutely bonkers Rust errors while prototyping this out ("higher-ranked lifetime error", anyone?), so I wanted to make sure this is the right path before I go deep.

neuronull commented 8 months ago

👋 Hey @goakley , those are good questions to be asking. Sorry I didn't get to respond earlier. I will get back to you on this on Monday.

neuronull commented 8 months ago

> (1) I can certainly wrap the AppendRows logic to make the stream look like a request-response interface, but I wanted to make sure I'm not missing some Vector functionality that would allow for a more flexible streaming-based sink model.

Indeed, most of the sinks we have are on that request-response model. An example of one that differs is the websocket sink. We don't really have the same level of scaffolding/interfaces set up for this model as we do for the request-response one. Another potentially helpful reference might be the vector sink, where we also use tonic to connect to a downstream Vector instance and call push_events.

> (2) but the request can be tried again with the bad rows omitted, and that follow-up request is expected to succeed. This sort of behaviour doesn't seem to fit the Vector model, which assumes a single request (batch of events) will either succeed or fail in full. Again, I can finagle some logic to retry requests under the hood of the tower::Service, but I wanted to see if I missed finding some backpressure/reprocessing logic in the Driver itself.

We do have some infrastructure here for retrying requests. A RetryLogic implementation can be passed into a ServiceBuilder. Doing so makes it possible to implement is_retriable_error(), in which the Response can be inspected to see if it's the type of write failure caused by individual rows.

See https://github.com/vectordotdev/vector/blob/0d09898867bcb489244aaba4e9a257eed9e97437/src/sinks/util/retries.rs#L25-L35 and https://github.com/vectordotdev/vector/blob/0d09898867bcb489244aaba4e9a257eed9e97437/src/sinks/aws_s3/config.rs#L197-L199 .
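A rough sketch of how that could look for this sink, written against simplified stand-ins rather than the actual RetryLogic trait and response types (see the links above for the real signatures):

```rust
// Simplified stand-ins; the real trait in `src/sinks/util/retries.rs` also
// distinguishes retriable transport errors from retriable responses.
enum RetryDecision {
    Retry(String),
    DontRetry(String),
}

struct BigQueryResponse {
    /// True when AppendRows reported that specific rows caused the failure:
    /// nothing was written, and a follow-up request without those rows
    /// is expected to succeed.
    row_level_failure: bool,
    status_message: String,
}

#[derive(Clone)]
struct BigQueryRetryLogic;

impl BigQueryRetryLogic {
    /// Mirrors the spirit of `should_retry_response`: inspect the response
    /// and decide whether the request is worth retrying.
    fn should_retry_response(&self, response: &BigQueryResponse) -> RetryDecision {
        if response.row_level_failure {
            RetryDecision::Retry("row-level insert errors; retry without the bad rows".into())
        } else {
            RetryDecision::DontRetry(response.status_message.clone())
        }
    }
}
```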

> (3) Is this the way to build sinks going forward, or are other methods still supported?

Yes, the StreamSink variant is the only acceptable path forward. We are slowly making progress on converting all the "old style" sinks to the new style (https://github.com/vectordotdev/vector/issues/9261).

ICYMI, there is a detailed tutorial for new-style sinks here: https://github.com/vectordotdev/vector/tree/master/docs/tutorials/sinks. It does center on the request-response model, but I think it does a good job of explaining the infrastructure that is used.

> I've run into some absolutely bonkers Rust errors while prototyping this out ("higher-ranked lifetime error", anyone?)

😅 There are definitely some obscure compilation errors that can crop up, and they can be tricky sometimes to track down. If you get stuck, you're welcome to share your branch and I can take a look at the error(s). I have done that in the past for others.

goakley commented 8 months ago

Thank you @neuronull, that is helpful! I've tried to keep things relatively simple in this initial PR (https://github.com/vectordotdev/vector/pull/18886), which does function as expected. Adding RetryLogic would probably be a good next step here, depending on the feedback. At a high level, the websocket and vector sinks seem fairly straightforward, but neither quite matches (nor do they combine into) exactly how the BigQuery API works. I've implemented a less optimal way of writing to BigQuery in exchange for greater sink simplicity, which may or may not be the right call.

(Oh, and the higher-ranked lifetime error was because vector_common::Error is a boxed dyn that I buried deep in nested futures)

neuronull commented 8 months ago

Awesome! I'll queue that PR up for review~

neuronull commented 8 months ago

👋 I haven't given that a deep review yet, but I looked deep enough to see the manifestation of this:

> I've implemented a less optimal way of writing to BigQuery in exchange for greater sink simplicity, which may or may not be the right call.

That is the main thing I think needs deciding on before diving deeper into the review.

Fully utilizing the gRPC Stream service model that the Storage Write API has would mean going somewhat against the unary request/response model the Vector stream sink driver framework relies on (which we've touched on a bit already).

I think it's still possible to do it though. I do see your point about it being a tradeoff with complexity.

My bad for pointing to the vector sink, which doesn't utilize the Stream service model. A better reference point would probably be the gcp_pubsub source, which does leverage the gRPC Stream service model. As you can/will see, the code is definitely more involved; it means setting up an async tokio runtime and managing things in there. Of course, the pubsub source is not a sink, so it doesn't have to fit into the constraints of the StreamSink requirements, but that is where the websocket sink kind of comes into play.

I am curious what the performance of the current design is. A design leveraging the Stream service model may be more robust, but the argument for it would be stronger if the performance of the current model were noticeably poor.

goakley commented 6 months ago

@neuronull I'm not sure what performance we're looking for, but my team is currently using this branch in production (don't tell on us) to push an average of 3.3k events per second to BigQuery from a single gcp_bigquery sink in a single container. (We're running 32 of them, for a throughput of 108k/sec.) We aren't seeing any dropped messages or buffers filling up from this. As expected, there's no real CPU impact since this is all in the networking.

neuronull commented 5 months ago

I'm glad to hear it is working well for your use case; thank you for sharing that (🙈)

Raised this with the team and we have a proposal for you @goakley:

We are working on a formalized process for community-driven components in Vector. But in the interim, would you / your company be willing to informally volunteer ownership of the component in its current state? That essentially implies that, aside from required routine maintenance, the Vector team would be "hands-off" on this component, relying on you and your team for bug fixes reported by the community, etc. For an example of that, you can see the history of the appsignal sink.

In the future, we may want to adopt this as a "core" Vector component that we would re-assume ownership of, at which time we could further investigate the stream service approach we've been discussing in this thread.

If this is agreeable to you/your company, we will dive into your PR for in-depth code review. How does that sound?