apache / arrow-adbc

Database connectivity API standard and libraries for Apache Arrow
https://arrow.apache.org/adbc/
Apache License 2.0

go/adbc/driver/flightsql: support generic ingest #1107

Open joellubi opened 11 months ago

joellubi commented 11 months ago

I wanted to pose the question of what it would take to support ingest with the flightsql driver. I understand that each driver is meant to supply its own specific implementation of ingestion, which makes doing so for a flightsql backend challenging, because the driver wouldn't necessarily know the specifics of its underlying representation or syntax.

I had a few thoughts on how this might be achieved:

Elaborating on how the first option might be implemented, here's an example of how the new message type might look:

message CommandStatementIngest {
  option (experimental) = true;

  enum IngestMode {
    INGEST_MODE_CREATE = 0;
    INGEST_MODE_APPEND = 1;
    INGEST_MODE_REPLACE = 2;
    INGEST_MODE_CREATE_APPEND = 3;
  }

  string target_table = 1;

  IngestMode mode = 2;
}

After receiving this in the FlightDescriptor, the flightsql server may then handle the subsequent stream by whichever means provides the desired throughput to the requested target.
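To make the dispatch step concrete: Flight SQL commands travel as a google.protobuf.Any packed into FlightDescriptor.cmd, and servers branch on the Any's type URL. Below is a self-contained Go sketch of just that branching step; the helper name dispatchDoPutCommand and the returned action strings are illustrative, and the type URL for CommandStatementIngest is an assumption that simply follows the existing arrow.flight.protocol.sql naming convention.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// dispatchDoPutCommand models how a Flight SQL server routes a DoPut call
// based on the type URL of the Any-packed command in the FlightDescriptor.
// CommandStatementUpdate and CommandPreparedStatementQuery are existing
// DoPut commands; CommandStatementIngest is the proposed addition.
func dispatchDoPutCommand(typeURL string) (string, error) {
	switch {
	case strings.HasSuffix(typeURL, "CommandStatementUpdate"):
		return "execute update statement", nil
	case strings.HasSuffix(typeURL, "CommandPreparedStatementQuery"):
		return "bind prepared statement parameters", nil
	case strings.HasSuffix(typeURL, "CommandStatementIngest"): // proposed
		return "bulk ingest into target table", nil
	default:
		return "", errors.New("unrecognized DoPut command: " + typeURL)
	}
}

func main() {
	action, err := dispatchDoPutCommand(
		"type.googleapis.com/arrow.flight.protocol.sql.CommandStatementIngest")
	fmt.Println(action, err)
}
```

A server that doesn't recognize the new command would fall through to the error branch, which is the backward-compatible behavior for old servers receiving the new message.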

I would appreciate any feedback on this approach, or links to prior context I may have missed. Thanks!

lidavidm commented 11 months ago

Substrait can handle UPDATE etc. but I am not sure if any implementations support it.

Adding support to Flight SQL may be useful; a better place to discuss it would be the Arrow dev@ mailing list.

lidavidm commented 11 months ago

Thanks for the post. I think this would be useful to support in Flight SQL. I'd propose some slight changes:

message CommandStatementIngest {
  option (experimental) = true;

  enum IngestMode {
    INGEST_MODE_UNSPECIFIED = 0;
    INGEST_MODE_CREATE = 1;
    INGEST_MODE_APPEND = 2;
    INGEST_MODE_REPLACE = 3;
    INGEST_MODE_CREATE_APPEND = 4;
  }

  IngestMode mode = 1;
  string target_table = 2;
  optional string target_schema = 3;
  optional string target_catalog = 4;
  // do we want db-specific parameters?
}
joellubi commented 11 months ago

Thanks for the suggestion, it makes sense to include those fields.

Regarding your question about db-specific parameters, I can see the following options:

  1. Add each option explicitly to the spec, but this would probably only work for parameters that generalize across many backends (i.e. not db-specific).
  2. Add a single field capable of storing arbitrary key-value pairs, like app_metadata.
  3. Use the existing app_metadata field of the initial FlightData sent as part of DoPut to include db-specific kv pairs.

I would appreciate your perspective on these options (or any others) @lidavidm.

alamb commented 11 months ago

Thank you @lidavidm and @joellubi -- I also think this basic idea makes sense.

In terms of the modes, the SQL systems I am familiar with typically support two kinds of ingest, which are dialect-specific and fairly complex:

  1. INSERT/COPY: appends the new rows to the target table
  2. UPSERT/MERGE: potentially updates existing rows if present, and inserts new rows if not (for example, Snowflake)

In order to implement UPSERT/MERGE you typically need to specify the criteria for what qualifies as an update. In some systems this is done via a PRIMARY KEY declaration, but in many others you can also specify a custom matching condition.

Thus I suggest supporting bulk insert in a generic way via a SQL query rather than an enum and table names, which would constrain how this feature gets used.

Perhaps we could add something like the following (to mirror Update). It would likely make sense to have a prepared statement version of this as well:

/*
 * Represents a SQL bulk insert / upsert query. Used in the command member of FlightDescriptor
 * for the RPC call DoPut to cause the server to execute the included SQL INSERT/COPY/UPSERT/MERGE or similar
 * command, with the data in the batches in the DoPut call.
 */
message CommandStatementInsert {
  option (experimental) = true;

  // The SQL syntax.
  string query = 1;
  // Include the query as part of this transaction (if unset, the query is auto-committed).
  optional bytes transaction_id = 2;
}
lidavidm commented 11 months ago

I think the whole idea is to avoid having to know the specific SQL query (and I believe this is less for upsert and more for plain insert/copy; ADBC doesn't handle updates for its version of this). You are right we should include the transaction field, though.

alamb commented 11 months ago

🤔 Since insert/merge syntax is pretty system-specific and not standardized, the SQL does vary quite a bit. Maybe in this case there is no better option than supporting the lowest common denominator in terms of functionality 🤔

What are the intended semantics of CREATE, by the way? Is it meant to create a new table if it doesn't already exist?

If so, I wonder how that would work with SQL types, which are not typically the same as Arrow types (e.g. a SQL VARCHAR might be a Utf8Array or a LargeUtf8Array). Is the idea that each FlightSQL implementation would automatically provide a mapping to the types, and that if users wanted more control (like defining specific types or keys, etc.) they would use the native DDL commands (CREATE TABLE ...)?

lidavidm commented 11 months ago

Yup, CREATE is 'create a new table, fail if already exists'.

Yes, that's pretty much what ADBC does here - the driver/server does its best to map the Arrow types to reasonable database types. If you need full control you'll have to use SQL yourself.

Substrait could provide some generic functionality but it's not well supported yet.
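As a rough illustration of that best-effort mapping, a driver or server might keep a lookup like the one below. This is a self-contained sketch for a hypothetical generic backend: the function name, the string keys standing in for Arrow types, and the chosen SQL type names are all assumptions, not part of ADBC or Flight SQL.

```go
package main

import "fmt"

// sqlTypeFor sketches a naive Arrow-to-SQL type mapping a server might use
// for INGEST_MODE_CREATE. Real drivers consult the backend's dialect; the
// mappings here are illustrative defaults for a generic ANSI-ish backend.
func sqlTypeFor(arrowType string) string {
	switch arrowType {
	case "utf8", "large_utf8":
		// Both Arrow string encodings collapse to one SQL type,
		// which is exactly the lossy direction alamb is pointing at.
		return "TEXT"
	case "int32":
		return "INTEGER"
	case "int64":
		return "BIGINT"
	case "float64":
		return "DOUBLE PRECISION"
	case "bool":
		return "BOOLEAN"
	default:
		return "" // unmapped: fail the ingest or defer to a backend default
	}
}

func main() {
	fmt.Println(sqlTypeFor("large_utf8"))
}
```

The round trip is inherently lossy in one direction (several Arrow types map to one SQL type), which is why full control over DDL stays with native CREATE TABLE statements.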

lidavidm commented 11 months ago

Add a single field capable of storing arbitrary key-value pairs, like app_metadata.

@joellubi this would be my vote (just a bytes field, or possibly a bytes field + map<string, string>)

joellubi commented 11 months ago

just a bytes field, or possibly a bytes field + map<string, string>

@lidavidm Could you please elaborate on what you mean by the latter option? Do you mean a bytes field encoding a map<string, string> or to actually use the native proto map field?

lidavidm commented 11 months ago

the latter, because it would presumably be easier for most things; but we've stuck to bytes fields elsewhere in Flight

joellubi commented 11 months ago

Got it. I agree it would be nice to use map, though I'm not aware of any prior decisions or precedent that might make bytes preferable.

If we went with map then this would be the current draft:

message CommandStatementIngest {
  option (experimental) = true;

  enum IngestMode {
    INGEST_MODE_UNSPECIFIED = 0;
    INGEST_MODE_CREATE = 1;
    INGEST_MODE_APPEND = 2;
    INGEST_MODE_REPLACE = 3;
    INGEST_MODE_CREATE_APPEND = 4;
  }

  IngestMode mode = 1;
  string target_table = 2;
  optional string target_schema = 3;
  optional string target_catalog = 4;
  map<string, string> options = 5;
}

Open to suggestions on the name for that field. app_metadata is used in other messages but it might be unintuitive to call it the same thing if it's not a bytes type.

lidavidm commented 11 months ago

@zeroshade @ywc88 comments here?

judahrand commented 11 months ago

Should we also incorporate the temporary flag which ADBC uses?

message CommandStatementIngest {
  option (experimental) = true;

  enum IngestMode {
    INGEST_MODE_UNSPECIFIED = 0;
    INGEST_MODE_CREATE = 1;
    INGEST_MODE_APPEND = 2;
    INGEST_MODE_REPLACE = 3;
    INGEST_MODE_CREATE_APPEND = 4;
  }

  IngestMode mode = 1;
  string target_table = 2;
  optional string target_schema = 3;
  optional string target_catalog = 4;
  optional bool temporary = 5;
  map<string, string> options = 1000;
}
lidavidm commented 11 months ago

Ah yes, good catch.

zeroshade commented 11 months ago

In general this seems very reasonable to me and I like the idea. Though I think historically we've preferred enum-defined option names rather than allowing arbitrary options?

Also, in proto3 all fields are optional by default, so the optional label on those fields is redundant, and it's also invalid syntax for proto3 (technically we already end up having to use --experimental_allow_proto3_optional for FlightSql, but I'd prefer to avoid adding more cases of optional in the proto file if we can avoid it).

lidavidm commented 11 months ago

The optional is to signify which parameters aren't actually required (that said it isn't a big deal here, I suppose). Newer versions of Protobuf will allow that by default and then it is just part of Protobuf.

The options here are arbitrary and backend-specific; there is no enum we can define.

zeroshade commented 11 months ago

The options here are arbitrary and backend-specific; there is no enum we can define.

I suspected as much, just wanted to confirm. :smile:

The optional is to signify which parameters aren't actually required (that said it isn't a big deal here, I suppose). Newer versions of Protobuf will allow that by default and then it is just part of Protobuf.

I didn't realize that they changed it so we can get rid of the --experimental_allow_proto3_optional, that's good. Though it looks like using it is discouraged and it's preferred to use the google.api.field_behavior field annotation instead?

lidavidm commented 11 months ago

It's official: https://github.com/protocolbuffers/protobuf/issues/10463

I think that annotation is more google API fluff and isn't really relevant here (nor would it affect codegen like the optional keyword)

joellubi commented 11 months ago

Thanks for the feedback on this. I've started to put together some of these changes in preparation for a pull request but have had a few further questions come up.

First, here is the current draft of the proto definition I have:

/*
 * Represents a bulk ingestion request. Used in the command member of FlightDescriptor
 * for the RPC call DoPut to cause the server to load the contents of the stream's
 * FlightData into the target destination.
 */
message CommandStatementIngest {
  option (experimental) = true;

  // Describes the behavior for loading bulk data.
  enum IngestMode {
    // Ingestion behavior unspecified.
    INGEST_MODE_UNSPECIFIED = 0;
    // Create the target table. Fail if the target table already exists.
    INGEST_MODE_CREATE = 1;
    // Append to an existing target table. Fail if the target table does not exist.
    INGEST_MODE_APPEND = 2;
    // Drop the target table if it exists. Then follow INGEST_MODE_CREATE behavior.
    INGEST_MODE_REPLACE = 3;
    // Create the target table if it does not exist. Then follow INGEST_MODE_APPEND behavior.
    INGEST_MODE_CREATE_APPEND = 4;
  }

  // The ingestion behavior.
  IngestMode mode = 1;
  // The table to load data into.
  string target_table = 2;
  // The db_schema of the target_table to load data into. If unset, ... (TODO)
  optional string target_schema = 3;
  // The catalog of the target_table to load data into. If unset, ... (TODO)
  optional string target_catalog = 4;
  // Use a temporary table for target_table.
  optional bool temporary = 5;
  // Backend-specific options.
  map<string, string> options = 1000;
}
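To check that the enum comments above are internally consistent, here is a small, self-contained Go sketch that restates each mode's documented behavior as a decision over whether the target table already exists. The function planIngest and its step strings are hypothetical illustrations, not proposed API.

```go
package main

import (
	"errors"
	"fmt"
)

// IngestMode mirrors the draft enum above.
type IngestMode int

const (
	IngestModeUnspecified IngestMode = iota
	IngestModeCreate
	IngestModeAppend
	IngestModeReplace
	IngestModeCreateAppend
)

// planIngest returns the steps a backend would run before loading the
// DoPut stream, following the comments in the draft message: CREATE fails
// if the table exists, APPEND fails if it doesn't, REPLACE drops first,
// and CREATE_APPEND creates only when needed.
func planIngest(mode IngestMode, tableExists bool) ([]string, error) {
	switch mode {
	case IngestModeCreate:
		if tableExists {
			return nil, errors.New("target table already exists")
		}
		return []string{"create", "load"}, nil
	case IngestModeAppend:
		if !tableExists {
			return nil, errors.New("target table does not exist")
		}
		return []string{"load"}, nil
	case IngestModeReplace:
		if tableExists {
			return []string{"drop", "create", "load"}, nil
		}
		return []string{"create", "load"}, nil
	case IngestModeCreateAppend:
		if tableExists {
			return []string{"load"}, nil
		}
		return []string{"create", "load"}, nil
	}
	return nil, errors.New("ingestion mode unspecified")
}

func main() {
	steps, _ := planIngest(IngestModeCreateAppend, true)
	fmt.Println(steps)
}
```

Note that every mode has well-defined behavior for both the exists and not-exists cases, so a server never needs to guess.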

A few open questions:

  1. How should the behavior of an unset target_schema or target_catalog be described? I suppose it would have to use the backend-specific default, if one exists. Would that be a reasonable specification of the behavior?
  2. @alamb had a comment about including optional bytes transaction_id as a field. I think this makes sense, but it's not clear to me how a server should answer SqlInfo requests about SQL_TRANSACTIONS_SUPPORTED or FLIGHT_SQL_SERVER_TRANSACTION if it supports transactions for queries but not necessarily for bulk ingestion, which could often be the case. We could add more options to FLIGHT_SQL_SERVER_TRANSACTION to capture the distinction, but it might be simpler to leave it out of the spec and wait until transaction support for bulk ingestion is explicitly needed before making those extensions.
  3. A more general development/contributing question: are there specific tool versions we should use when generating files during development? For example, there are several newer versions of protoc-gen-go than the one the Go pb files are currently generated with (v1.28.1), and I was wondering whether it would be an issue to bump these versions.
lidavidm commented 11 months ago
  1. Yes, it's up to the backend. This will perhaps become clearer as the proposal for session state in Flight SQL gets worked out (since 'current' catalog/schema are part of that)
  2. I think we can include it up-front. It might be better to use a separate SqlInfo value for it, though.
  3. Is there a version in the go.mod?
joellubi commented 11 months ago

@lidavidm

  1. Got it, thanks.
  2. Ok I'll make that change.
  3. Yes, the version is currently v1.31.0, so it probably makes sense to use that version. I was a little confused because the file has still been generated with v1.28.1 despite go.mod indicating otherwise. TL;DR: it turns out that because protoc-gen-go is a plugin for protoc and not directly runnable via go run ..., there's no simple way to tell //go:generate to use the version specified in go.mod. That's unfortunate, but at least I can align my local environment with go.mod manually.
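One workaround for the alignment step, assuming the usual plugin discovery where protoc finds protoc-gen-go on PATH (this is a suggested workflow, not something the repository prescribes), is to install the plugin at the go.mod version before regenerating:

```shell
# Install protoc-gen-go at the version pinned in go.mod, so protoc picks
# up a matching binary from PATH when the generate step runs.
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.31.0

# Then rerun code generation; protoc now invokes the pinned plugin.
go generate ./...
```

This keeps the generated-with version in the .pb.go header consistent with the module's declared dependency, at the cost of a manual install step per contributor.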
joellubi commented 11 months ago

I've opened a draft PR in the arrow repository with the proposed changes and the go implementation. I'd appreciate any feedback on the specifics.

I thought that the following other changes might be required, but I wasn't sure:

If any or all of these are required, I'm happy to add them to the PR.

lidavidm commented 11 months ago

Thanks! All are required, but please ping the mailing list for opinions; once we have all the parts we can then hold a formal vote.

joellubi commented 10 months ago

I've opened a second PR into which I've split the go implementation + integration tests, which should help add some color to the actual usage of these changes. I'd appreciate any feedback on that or the original format PR, as there have been some minor changes there as well. Thanks!