dlt-hub / verified-sources

`sql_database` throwing exception on nullable JSONB column from CockroachDB #582

Open. jdavisbp opened this issue 2 weeks ago.

dlt version

1.3

Source name

sql_database

Describe the problem

dlt.extract.exceptions.ResourceExtractionError: In processing pipe sqltable: extraction of resource sqltable in generator table_rows caused an exception: Expected bytes, got a 'dict' object

... traces back to below ...

  File "pyarrow/table.pxi", line 1920, in pyarrow.lib._Tabular.from_pydict
  File "pyarrow/table.pxi", line 6009, in pyarrow.lib._from_pydict
  File "pyarrow/array.pxi", line 385, in pyarrow.lib.asarray
  File "pyarrow/array.pxi", line 345, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 85, in pyarrow.lib._ndarray_to_array
  File "pyarrow/error.pxi", line 91, in pyarrow.lib.check_status
pyarrow.lib.ArrowTypeError: Expected bytes, got a 'dict' object
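For what it's worth, the Arrow error reproduces in isolation when a Python dict lands in a column that Arrow expects to be string-typed (a minimal sketch, independent of dlt; the column values are made up):

import pyarrow as pa

# A dict value in a string-typed field should raise the same ArrowTypeError
pa.Table.from_pydict(
    {"extra": [{"key": "value"}]},
    schema=pa.schema([("extra", pa.string())]),
)
# pyarrow.lib.ArrowTypeError: Expected bytes, got a 'dict' object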

The pipeline will run for a while, as evidenced by this message repeating over and over:

[WARNING] Field extra was reflected as JSON type and needs to be serialized back to string to be placed in arrow table. This will slow data extraction down. You should cast JSON field to STRING in your database system ie. by creating and extracting an SQL VIEW that selects with cast.

But after a while the error above pops up. I do attempt to hint the column type like this: `columns={"extra": {"data_type": "text"}}`, but it doesn't resolve the issue.

As I understand it, that hint should coerce the column into a string format in the select query.
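For reference, the view-based workaround suggested by the warning would presumably look something like this (untested sketch; the view name and column list are made up, only created_at and extra are real columns):

import sqlalchemy as sa

# Sketch: expose the JSONB column pre-cast to STRING via a view,
# then read "table_a_str" instead of "table_a" in the snippet under Steps to reproduce.
engine = sa.create_engine(f"cockroachdb://{valid_cockroachdb_string}")
with engine.begin() as conn:
    conn.execute(sa.text(
        "CREATE VIEW table_a_str AS "
        "SELECT created_at, extra::STRING AS extra FROM table_a"
    ))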

Expected behavior

The pipeline should load a valid JSON string into the Snowflake table (column `extra`).

Steps to reproduce

This occurs on a large (50M+ records) database table; running on a subset of the data (only records created in the last 10 days) succeeds, so it may be challenging to reproduce. The code snippet I am running is below:

import dlt
import pendulum

from dlt.sources.credentials import ConnectionStringCredentials
from sql_database import sql_database  # dlt.sources.sql_database when using the core dlt source

pipeline = dlt.pipeline(
    pipeline_name="crdb",
    destination="snowflake",
    dataset_name="crdb",
)
source = sql_database(
    credentials=ConnectionStringCredentials(
        f"cockroachdb://{valid_cockroachdb_string}"
    ),
    backend="pyarrow",
    chunk_size=10000,
    backend_kwargs={"tz": "UTC"},
).with_resources(
    "table_a"
)
# Incremental load on created_at; hint the JSONB column as text
source.table_a.apply_hints(
    incremental=dlt.sources.incremental(
        "created_at",
        initial_value=pendulum.DateTime(1970, 1, 1, 0, 0, 0).in_timezone("UTC"),
    ),
    columns={"extra": {"data_type": "text"}},
)
load_info = pipeline.run(source, write_disposition="append")
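Another workaround I may try is overriding the reflected column type with type_adapter_callback so JSONB is treated as plain text (an untested assumption that this parameter is available in my dlt/verified-sources version and that it actually helps with the pyarrow backend):

import sqlalchemy as sa

def jsonb_as_text(sql_type):
    # Map reflected JSON/JSONB columns to Text; untested whether this makes
    # the pyarrow backend receive strings instead of Python dicts.
    if isinstance(sql_type, sa.JSON):
        return sa.Text()
    return sql_type

source = sql_database(
    credentials=ConnectionStringCredentials(f"cockroachdb://{valid_cockroachdb_string}"),
    backend="pyarrow",
    type_adapter_callback=jsonb_as_text,
).with_resources("table_a")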

How are you using the source?

I'm considering using this source in my work, but this bug is preventing that.

Operating system

Linux, macOS

Runtime environment

Local

Python version

3.9.6

dlt destination

Snowflake

Additional information

Related Slack thread: https://dlthub-community.slack.com/archives/C04DQA7JJN6/p1730405813993339