apache / arrow-adbc

Database connectivity API standard and libraries for Apache Arrow
https://arrow.apache.org/adbc/
Apache License 2.0

Streaming LIST data over ADBC #2066

Closed danielballan closed 3 days ago

danielballan commented 1 month ago

What would you like help with?

Hello @paleolimbot, we briefly spoke at SciPy a month ago. @cpcloud and @gforsyth sent me your way.

My colleague @skarakuzu and I have the following application:

Science Experiment ---> ~10Hz rows of data over HTTP ---> Database

In our use case, we are interested in reading partial datasets while an experiment is still ongoing, but we are not especially sensitive to real-time access; a little lag is acceptable. We would like to use Postgres for facility-scale "production" and SQLite for small deployments and dev/test.

We think it makes sense to use ADBC to get the data into Postgres and SQLite.

Most of our data is simple tabular data with basic types. This is a simple sketch that works:

import os

import pandas
import pyarrow

import adbc_driver_postgresql.dbapi
import adbc_driver_sqlite.dbapi

conn = adbc_driver_sqlite.dbapi.connect("test.sqlite")
with conn.cursor() as cur:
    cur.execute("DROP TABLE IF EXISTS example")

dfs = [
    pandas.DataFrame({"A": [1, 2, 3], "B": [10., 20., 30.]}),
    pandas.DataFrame({"A": [4, 5, 6], "B": [40., 50., 60.]}),
]

# Create a table and append to it.
with conn.cursor() as cur:
    for df in dfs:
        table = pyarrow.Table.from_pandas(df)
        cur.adbc_ingest("example", table, mode="create_append")
        cur.execute("SELECT * FROM example")
        print(cur.fetchall())
conn.close()
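
Because a little lag is acceptable, rows arriving at about 10 Hz could be buffered and ingested in batches rather than one at a time. The following is only a sketch of that idea under stated assumptions: the `RowBuffer` class, its parameters, and the thresholds are hypothetical and not part of ADBC. The `flush` callback is where a batch would be turned into a `pyarrow.Table` and passed to `cur.adbc_ingest(...)` as in the sketch above; here it just collects the batches in a list so the example runs without a database.

```python
import time


class RowBuffer:
    """Collect incoming rows and hand them off in batches (hypothetical helper)."""

    def __init__(self, flush, max_rows=50, max_age=5.0, clock=time.monotonic):
        self._flush = flush        # callable that receives a list of rows
        self._max_rows = max_rows  # flush once this many rows are waiting
        self._max_age = max_age    # seconds the oldest row may wait
        self._clock = clock
        self._rows = []
        self._first_at = None

    def add(self, row):
        if not self._rows:
            self._first_at = self._clock()
        self._rows.append(row)
        too_many = len(self._rows) >= self._max_rows
        # The age check only runs when a row arrives; there is no timer.
        too_old = self._clock() - self._first_at >= self._max_age
        if too_many or too_old:
            self.flush()

    def flush(self):
        if self._rows:
            rows, self._rows = self._rows, []
            self._flush(rows)


# Stand-in for the database write: record each batch instead of ingesting it.
batches = []
buf = RowBuffer(batches.append, max_rows=3, max_age=60.0)
for i in range(7):
    buf.add({"A": i, "B": float(10 * i)})
buf.flush()  # write out the partial final batch

print([len(b) for b in batches])
```

The batch size bounds how far readers can lag behind the experiment, so the thresholds would need tuning for a real deployment.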

In rare cases, we have detectors that produce a small "waveform" of data, a small variable-length list, alongside other columns of scalar readings, like this:

dfs = [
    pandas.DataFrame({"A": [[1, 2, 3], [1, 2], [1, 2, 3, 4]], "B": [10., 20., 30.]}),
    pandas.DataFrame({"A": [[4, 5, 6], [4, 5, 6, 7], [4]], "B": [40., 50., 60.]}),
]

A complete runnable example with this data:

```python
import os

import pandas
import pyarrow

import adbc_driver_postgresql.dbapi
import adbc_driver_sqlite.dbapi

conn = adbc_driver_sqlite.dbapi.connect("test.sqlite")
with conn.cursor() as cur:
    cur.execute("DROP TABLE IF EXISTS example")

dfs = [
    pandas.DataFrame({"A": [[1, 2, 3], [1, 2], [1, 2, 3, 4]], "B": [10., 20., 30.]}),
    pandas.DataFrame({"A": [[4, 5, 6], [4, 5, 6, 7], [4]], "B": [40., 50., 60.]}),
]

# Create a table and append to it.
with conn.cursor() as cur:
    for df in dfs:
        table = pyarrow.Table.from_pandas(df)
        cur.adbc_ingest("example", table, mode="create_append")
        cur.execute("SELECT * FROM example")
        print(cur.fetchall())
conn.close()
```

As you would expect, this fails with a clear error message:

adbc_driver_manager.NotSupportedError: NOT_IMPLEMENTED: Column 0 has unsupported type list

It looks like support for lists was recently added for Postgres in https://github.com/apache/arrow-adbc/pull/1962 but not yet released. We have tried without success to cobble together a C++ example inserting lists into PostgreSQL on main. Could you sketch a short runnable example to help us get started? (Let us know if you want to see what we've tried.) We would be happy to contribute to the C++ documentation for ADBC once we get something working.

Once #1962 is released, should we expect lists to work from Python as well, as it wraps the C/C++, or are more changes needed on the Python side to make that work?

Finally, we're interested in adding support for lists in SQLite. As SQLite is loose about types, perhaps one reasonable way to do this would be with a JSON column. I wonder what you think of that approach.
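
To make the JSON-column idea concrete, here is a minimal sketch that stores each list as JSON text in SQLite and decodes it on read. It uses Python's standard-library `sqlite3` and `json` modules rather than ADBC so that it runs without any driver installed. The table layout and the helper names `encode_rows` and `decode_rows` are hypothetical, and whether the ADBC SQLite driver should do this encoding itself is exactly the open question.

```python
import json
import sqlite3


def encode_rows(rows):
    """Replace the list in column A with its JSON text representation."""
    return [(json.dumps(a), b) for a, b in rows]


def decode_rows(rows):
    """Parse the JSON text in column A back into a Python list."""
    return [(json.loads(a), b) for a, b in rows]


# Same data as the DataFrames above, written as (A, B) row tuples.
batches = [
    [([1, 2, 3], 10.0), ([1, 2], 20.0), ([1, 2, 3, 4], 30.0)],
    [([4, 5, 6], 40.0), ([4, 5, 6, 7], 50.0), ([4], 60.0)],
]

conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE example ("A" TEXT, "B" REAL)')

# Append one batch at a time, as the experiment would.
for batch in batches:
    conn.executemany("INSERT INTO example VALUES (?, ?)", encode_rows(batch))
    conn.commit()

cursor = conn.execute('SELECT "A", "B" FROM example ORDER BY rowid')
result = decode_rows(cursor.fetchall())
conn.close()

print(result)
```

One trade-off of this approach is that the element type of the list is not recorded in the schema, so a reader has to know that column A holds JSON.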

paleolimbot commented 1 month ago

I find installing the development ADBC Python packages rather difficult; however, I do have a build set up and ran your example (thanks!) against the postgres driver at main and it seems to fail! That error is coming from a previous implementation of ingestion that doesn't use COPY, and the COPY writer is where the feature was added in the PR you linked. It also may be that a recent PR I did resulted in the COPY path not getting picked up.

import os

import pandas
import pyarrow

import adbc_driver_postgresql.dbapi

# From the apache-adbc checkout, run docker compose up postgres-test
conn = adbc_driver_postgresql.dbapi.connect("postgresql://localhost:5432/postgres?user=postgres&password=password")
with conn.cursor() as cur:
    cur.execute("DROP TABLE IF EXISTS example")

dfs = [
    pandas.DataFrame({"A": [[1, 2, 3], [1, 2], [1, 2, 3, 4]], "B": [10., 20., 30.]}),
    pandas.DataFrame({"A": [[4, 5, 6], [4, 5, 6, 7], [4]], "B": [40., 50., 60.]}),
]

# Create a table and append to it.
with conn.cursor() as cur:
    for df in dfs:
        table = pyarrow.Table.from_pandas(df)
        cur.adbc_ingest("example", table, mode="create_append")
        cur.execute("SELECT * FROM example")
        print(cur.fetchall())
conn.close()
#> Traceback
#> ...
#> NotSupportedError: NOT_IMPLEMENTED: [libpq] Field #1 ('A') has unsupported type for ingestion list

Reproducer in R:

library(adbcdrivermanager)
#> Warning: package 'adbcdrivermanager' was built under R version 4.3.3

con <- adbc_database_init(
  adbcpostgresql::adbcpostgresql(),
  uri = "postgresql://localhost:5432/postgres?user=postgres&password=password"
) |> 
  adbc_connection_init()

df <- tibble::tibble(
  A = vctrs::list_of(1:3, 4:5, 6:10),
  B = c(10.0, 20.0, 30.0)
)

con |> 
  execute_adbc("DROP TABLE IF EXISTS table_with_list")

df |> 
  write_adbc(con, "table_with_list")
#> Error in adbc_statement_execute_query(stmt): NOT_IMPLEMENTED: [libpq] Field #1 ('A') has unsupported type for ingestion list

Created on 2024-08-07 with reprex v2.1.0

paleolimbot commented 1 month ago

Hmm... I'm wondering if this call to SetParamTypes() isn't needed anymore:

https://github.com/apache/arrow-adbc/blob/3168e091d2b80c3af3fe697681fd3e437e52e090/c/driver/postgresql/statement.cc#L1305

skarakuzu commented 1 month ago

Hello @paleolimbot,

Thank you very much for your response. In addition to Python, we also created a C++ example by following the code and the tests in the C++ repo. We were able to create a table and read from it when the table does not contain lists. However, the code crashes with a segmentation fault if there is any list ingestion. All the queries run without throwing any error, but no table is created in the PostgreSQL database. The code crashes at the line stream.get_schema(&stream, &schema_rep);. I am a bit new to arrow-adbc and nanoarrow, so I wanted to ask if there is any step I am missing in the process.

I also tried commenting out the line you pointed to and rebuilding, but it did not fix the problem. We appreciate any help and suggestions. Thanks in advance!

```cpp
// NOTE: the header names were stripped when this comment was extracted.
// The includes below are an assumption based on what the code uses.
#include <cstdint>
#include <cstring>
#include <iostream>
#include <string>
#include <vector>

#include <adbc.h>
#include <nanoarrow/nanoarrow.h>

int main() {
  AdbcError error{};

  AdbcDatabase database = {};
  AdbcDatabaseNew(&database, &error);
  AdbcDatabaseSetOption(&database, "driver", "adbc_driver_postgresql", &error);
  AdbcDatabaseSetOption(&database, "uri", "postgresql://localhost:5432/postgres", &error);
  AdbcDatabaseInit(&database, &error);

  /// Creating a Connection
  AdbcConnection connection = {};
  AdbcConnectionNew(&connection, &error);
  AdbcConnectionInit(&connection, &database, &error);

  struct ArrowSchema schema;
  struct ArrowArray batch;
  static struct ArrowError global_error;

  ArrowSchemaInit(&schema);
  ArrowSchemaSetTypeStruct(&schema, 2);
  ArrowSchemaInit(schema.children[0]);
  ArrowSchemaSetTypeFixedSize(schema.children[0], NANOARROW_TYPE_FIXED_SIZE_LIST, 2);
  ArrowSchemaInit(schema.children[1]);
  ArrowSchemaSetTypeFixedSize(schema.children[1], NANOARROW_TYPE_FIXED_SIZE_LIST, 2);
  ArrowSchemaSetName(schema.children[0], "index");
  ArrowSchemaSetName(schema.children[1], "create");
  ArrowSchemaSetType(schema.children[0]->children[0], NANOARROW_TYPE_INT64);
  ArrowSchemaSetType(schema.children[1]->children[0], NANOARROW_TYPE_STRING);

  // The template arguments were also stripped; these element types are
  // reconstructed from how the values are used below.
  std::vector<std::vector<int64_t>> v1{{42, 43}, {-42, -43}};
  std::vector<std::vector<std::string>> v2{{"foo", "foo1"}, {"bar", "bar1"}};

  ArrowArrayInitFromSchema(&batch, &schema, &global_error);
  ArrowArrayStartAppending(&batch);

  for (size_t i = 0; i < v1.size(); i++) {
    for (size_t j = 0; j < v1[i].size(); j++) {
      ArrowArrayAppendInt(batch.children[0]->children[0], v1[i][j]);
    }
    ArrowArrayFinishElement(batch.children[0]);
    ArrowArrayFinishElement(&batch);
  }

  for (size_t i = 0; i < v2.size(); i++) {
    for (size_t j = 0; j < v2[i].size(); j++) {
      ArrowArrayAppendString(
          batch.children[1]->children[0],
          ArrowStringView{v2[i][j].c_str(), (int64_t)strlen(v2[i][j].c_str())});
    }
    ArrowArrayFinishElement(batch.children[1]);
    ArrowArrayFinishElement(&batch);
  }

  batch.children[0]->length = batch.children[0]->children[0]->length;
  batch.length = batch.children[0]->length;

  ArrowArrayFinishBuildingDefault(&batch, &global_error);

  // Create Stream
  struct ArrowArrayStream stream {};

  // Creating a Statement
  struct AdbcStatement statement;
  int64_t rows_affected = -1;

  // Drop table if exists
  AdbcStatementNew(&connection, &statement, &error);
  std::string query = "DROP TABLE IF EXISTS \"bulk_ingest\"";
  AdbcStatementSetSqlQuery(&statement, query.c_str(), &error);
  AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error);
  // End of dropping the table if exists

  // Start the table
  AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE, "bulk_ingest", &error);
  AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE,
                         ADBC_INGEST_OPTION_MODE_CREATE_APPEND, &error);
  AdbcStatementBind(&statement, &batch, &schema, &error);
  AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error);
  std::cout << "0 . rows affected: " << rows_affected << std::endl;

  AdbcStatementSetSqlQuery(&statement, "SELECT * FROM \"bulk_ingest\"", &error);
  AdbcStatementExecuteQuery(&statement, &stream, &rows_affected, &error);
  std::cout << "1 . rows affected: " << rows_affected << std::endl;

  struct ArrowSchema schema_rep = {};
  stream.get_schema(&stream, &schema_rep);

  // ... Some other code for post processing

  AdbcConnectionRelease(&connection, &error);
  AdbcDatabaseRelease(&database, &error);
  return 0;
}
```

paleolimbot commented 1 month ago

Thanks for this! I'll go through the C++ example in a moment, but I played with this a tiny bit and there is also the place where we serialize the type name (which can probably be moved to the PostgresType if it's not already there). In general, using ADBC from C or C++ currently requires a lot of C memory management, and it can be difficult to get right.

https://github.com/apache/arrow-adbc/blob/3168e091d2b80c3af3fe697681fd3e437e52e090/c/driver/postgresql/statement.cc#L1098-L1101

I also tried shuffling some pieces around to see if we could use the "append" mode (i.e., issue the CREATE TABLE manually), but when I do this I still get an invalid COPY binary error. I'll put all this in a PR but wanted to document it while it was fresh on my mind!

danielballan commented 2 weeks ago

Hello @paleolimbot. Just giving this a ping to express our continued interest in this topic and willingness to contribute if we can.

paleolimbot commented 2 weeks ago

Thank you for the ping! I was unfortunately on PTO for the last week before the forthcoming release but will be back next week and will make sure a fix for this is included in the next release (~6 weeks).

danielballan commented 2 weeks ago

Thanks! That is helpful for our planning. I hope you had some enjoyable days off.