apache / arrow-adbc

Database connectivity API standard and libraries for Apache Arrow
https://arrow.apache.org/adbc/
Apache License 2.0

go/adbc/driver/flightsql: can't use for Ballista #862

Open kou opened 1 year ago

kou commented 1 year ago

Ballista https://arrow.apache.org/ballista/ uses Flight SQL as its protocol.

The connection information is at https://arrow.apache.org/ballista/user-guide/flightsql.html#a-name-tool-use-the-driver-in-your-favorite-data-tool

(We can use docker-compose to run a Ballista cluster locally: https://arrow.apache.org/ballista/user-guide/deployment/docker-compose.html )

Here is a C program that connects to Ballista through ADBC, using the Flight SQL driver implemented in Go:

```c
#include <stdio.h>

#include <adbc.h>

int
main(void)
{
  struct AdbcDatabase database = {0};
  struct AdbcError error = {0};
  AdbcStatusCode code = AdbcDatabaseNew(&database, &error);
  if (code != ADBC_STATUS_OK) {
    printf("AdbcDatabaseNew() is failed: %s\n", error.message);
    error.release(&error);
    return 1;
  }
  /* Load the Go-based Flight SQL driver and point it at Ballista. */
  code = AdbcDatabaseSetOption(&database, "driver", "adbc_driver_flightsql", &error);
  if (code != ADBC_STATUS_OK) {
    printf("AdbcDatabaseSetOption(driver) is failed: %s\n", error.message);
    error.release(&error);
    return 1;
  }
  code = AdbcDatabaseSetOption(&database, "uri", "grpc://127.0.0.1:50050", &error);
  if (code != ADBC_STATUS_OK) {
    printf("AdbcDatabaseSetOption(uri) is failed: %s\n", error.message);
    error.release(&error);
    return 1;
  }
  code = AdbcDatabaseSetOption(&database, "username", "admin", &error);
  if (code != ADBC_STATUS_OK) {
    printf("AdbcDatabaseSetOption(username) is failed: %s\n", error.message);
    error.release(&error);
    return 1;
  }
  code = AdbcDatabaseSetOption(&database, "password", "password", &error);
  if (code != ADBC_STATUS_OK) {
    printf("AdbcDatabaseSetOption(password) is failed: %s\n", error.message);
    error.release(&error);
    return 1;
  }
  code = AdbcDatabaseInit(&database, &error);
  if (code != ADBC_STATUS_OK) {
    printf("AdbcDatabaseInit() is failed: %s\n", error.message);
    error.release(&error);
    return 1;
  }
  {
    struct AdbcConnection connection = {0};
    code = AdbcConnectionNew(&connection, &error);
    if (code != ADBC_STATUS_OK) {
      printf("AdbcConnectionNew() is failed: %s\n", error.message);
      error.release(&error);
      return 1;
    }
    code = AdbcConnectionInit(&connection, &database, &error);
    if (code != ADBC_STATUS_OK) {
      printf("AdbcConnectionInit() is failed: %s\n", error.message);
      error.release(&error);
      return 1;
    }
    {
      struct AdbcStatement statement = {0};
      code = AdbcStatementNew(&connection, &statement, &error);
      if (code != ADBC_STATUS_OK) {
        printf("AdbcStatementNew() is failed: %s\n", error.message);
        error.release(&error);
        return 1;
      }
      code = AdbcStatementSetSqlQuery(&statement, "SELECT 'hello'", &error);
      if (code != ADBC_STATUS_OK) {
        printf("AdbcStatementSetSqlQuery() is failed: %s\n", error.message);
        error.release(&error);
        return 1;
      }
      struct ArrowArrayStream stream = {0};
      int64_t rows_affected;
      code = AdbcStatementExecuteQuery(&statement, &stream, &rows_affected, &error);
      if (code != ADBC_STATUS_OK) {
        printf("AdbcStatementExecuteQuery() is failed: %s\n", error.message);
        error.release(&error);
        return 1;
      }
      /* Reading the first batch is the step that fails against Ballista. */
      struct ArrowArray array = {0};
      if (stream.get_next(&stream, &array) != 0) {
        printf("ArrowArrayStream::get_next() is failed: %s\n",
               stream.get_last_error(&stream));
        return 1;
      }
      array.release(&array);
      stream.release(&stream);
      code = AdbcStatementRelease(&statement, &error);
      if (code != ADBC_STATUS_OK) {
        printf("AdbcStatementRelease() is failed: %s\n", error.message);
        error.release(&error);
        return 1;
      }
    }
    code = AdbcConnectionRelease(&connection, &error);
    if (code != ADBC_STATUS_OK) {
      printf("AdbcConnectionRelease() is failed: %s\n", error.message);
      error.release(&error);
      return 1;
    }
  }
  code = AdbcDatabaseRelease(&database, &error);
  if (code != ADBC_STATUS_OK) {
    printf("AdbcDatabaseRelease() is failed: %s\n", error.message);
    error.release(&error);
    return 1;
  }
  return 0;
}
```

FYI: Here is a Ruby version:

```ruby
require "adbc"

options = {
  "driver" => "adbc_driver_flightsql",
  "uri" => "grpc://127.0.0.1:50050",
  "username" => "admin",
  "password" => "password",
}
ADBC::Database.open(**options) do |database|
  database.connect do |connection|
    connection.open_statement do |statement|
      table, _n_rows_affected = statement.query("SELECT 'hello'")
      p table
    end
  end
end
```

But the program fails with:

ArrowArrayStream::get_next() is failed: rpc error: code = Internal desc = Ballista Error: General("scheduler::from_proto(Action) invalid or missing action")

I don't know why, but I think the Flight SQL driver may be sending a Flight SQL request that Ballista doesn't support yet.

FYI: We can access Ballista with the raw Apache Arrow Flight SQL C++ implementation (here through its Ruby bindings):

```ruby
require "arrow-flight-sql"

call_options = ArrowFlight::CallOptions.new
client = ArrowFlight::Client.new("grpc://127.0.0.1:50050")
client.authenticate_basic("admin", "password", call_options)
sql_client = ArrowFlightSQL::Client.new(client)
info = sql_client.execute("SELECT 'hello'", call_options)
endpoint = info.endpoints.first
reader = sql_client.do_get(endpoint.ticket, call_options)
table = reader.read_all
p table
```

lidavidm commented 1 year ago

At the very least, we need to be better about errors, including showing exactly which RPC failed.
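To illustrate the kind of error context meant here, the following is a minimal sketch in plain Ruby. It is not the driver's code: `RpcError` and `with_rpc_context` are hypothetical names, and the RPC name passed in (`"DoGet"`) is only an example label, not a statement about which call failed in this report.

```ruby
# Hypothetical error type that remembers which RPC failed.
class RpcError < StandardError
  attr_reader :rpc

  def initialize(rpc, cause_message)
    @rpc = rpc
    super("#{rpc} failed: #{cause_message}")
  end
end

# Run the block; if it raises, re-raise with the RPC name prepended.
# The block's return value is passed through unchanged on success.
def with_rpc_context(rpc)
  yield
rescue StandardError => e
  raise RpcError.new(rpc, e.message)
end

begin
  # "DoGet" is an illustrative label; the raised message is shortened
  # from the one in the report above.
  with_rpc_context("DoGet") do
    raise "rpc error: code = Internal desc = invalid or missing action"
  end
rescue RpcError => e
  puts e.message
end
```

With a wrapper like this, the message seen by the caller starts with the name of the failing call, so it no longer has to be inferred from the server's text alone.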

lidavidm commented 11 months ago

Hmm, it seems that Ballista returns a location in the FlightEndpoint that can't actually handle the request. The raw C++ implementation ignores the location info, which is why it works. As it stands, I think this means Ballista is implementing the protocol incorrectly.
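The difference between the two clients can be sketched in plain Ruby. This is a model of the selection logic only, not code from either implementation: `Endpoint`, `spec_targets`, and `location_ignoring_targets` are hypothetical names, and the second address (`grpc://127.0.0.1:50051`) is a made-up placeholder for whatever location the server advertises. It assumes the Flight convention that an endpoint with an empty location list is fetched from the service that issued the ticket, while a non-empty list names the servers to fetch from.

```ruby
# Hypothetical stand-in for a Flight endpoint: a ticket plus the
# locations the server says can serve it.
Endpoint = Struct.new(:ticket, :locations)

# A client that honors locations: use the advertised locations if
# there are any, otherwise fall back to the original connection.
def spec_targets(endpoint, origin_uri)
  endpoint.locations.empty? ? [origin_uri] : endpoint.locations
end

# A client that ignores locations: always fetch from the original
# connection, whatever the endpoint advertises.
def location_ignoring_targets(_endpoint, origin_uri)
  [origin_uri]
end

origin = "grpc://127.0.0.1:50050"
# Placeholder address, assumed for illustration only.
endpoint = Endpoint.new("ticket-1", ["grpc://127.0.0.1:50051"])

p spec_targets(endpoint, origin)
p location_ignoring_targets(endpoint, origin)
```

If the advertised location cannot serve the ticket, only the first client fails, which matches the behavior described in this thread.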

lidavidm commented 11 months ago

I put up a PR to add more context to this kind of error, but otherwise I think the fix should go on the Ballista side.