airbytehq / PyAirbyte

PyAirbyte brings the power of Airbyte to every Python developer.
https://docs.airbyte.com/pyairbyte

🐛 Bug: PostgreSQL Cache can't handle undeclared dict type Column. #117

Closed tinomerl closed 6 months ago

tinomerl commented 7 months ago

Problem

When trying to replicate data from a source and save it to a PostgreSQL database, I receive the following exception:

sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) can't adapt type 'dict'

DuckDB has no problem saving the data in JSON format; the error occurs only when writing to a PostgreSQL database. I didn't have the time or access to evaluate other caches.

Example Code

import airbyte as ab
from airbyte.caches import PostgresCache

cache = PostgresCache(
    host="localhost",
    username="postgres",
    password="pass123",
    database="postgres",
    port=5432
)

source: ab.Source = ab.get_source("source-pokeapi")

config = {
    "pokemon_name": "pikachu"
}

source.set_config(config)
source.check()
source.select_all_streams()

records = source.read(cache=cache)

Full Error Stacktrace

```
Started `source-pokeapi` read operation at 12:25:09...
Failed `source-pokeapi` read operation at 12:25:12.
Traceback (most recent call last):
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
    self.dialect.do_execute(
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
psycopg2.ProgrammingError: can't adapt type 'dict'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/airbyte/sources/base.py", line 621, in read
    cache.processor.process_airbyte_messages(
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/airbyte/_processors/base.py", line 190, in process_airbyte_messages
    self.write_all_stream_data(
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/airbyte/_processors/base.py", line 201, in write_all_stream_data
    self.write_stream_data(stream_name, write_strategy=write_strategy)
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/airbyte/_processors/sql/base.py", line 532, in write_stream_data
    temp_table_name = self._write_files_to_new_table(
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/airbyte/_processors/sql/base.py", line 648, in _write_files_to_new_table
    dataframe.to_sql(
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/pandas/util/_decorators.py", line 333, in wrapper
    return func(*args, **kwargs)
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/pandas/core/generic.py", line 3008, in to_sql
    return sql.to_sql(
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/pandas/io/sql.py", line 788, in to_sql
    return pandas_sql.to_sql(
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/pandas/io/sql.py", line 1958, in to_sql
    total_inserted = sql_engine.insert_records(
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/pandas/io/sql.py", line 1507, in insert_records
    raise err
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/pandas/io/sql.py", line 1498, in insert_records
    return table.insert(chunksize=chunksize, method=method)
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/pandas/io/sql.py", line 1059, in insert
    num_inserted = exec_insert(conn, keys, chunk_iter)
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/pandas/io/sql.py", line 951, in _execute_insert
    result = conn.execute(self.table.insert(), data)
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1385, in execute
    return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
    ret = self._execute_context(
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
    self._handle_dbapi_exception(
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
    util.raise_(
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
    self.dialect.do_execute(
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) can't adapt type 'dict'
[SQL: INSERT INTO airbyte_raw.pokemon_01hrpm4vqfzvzbdv38yrqwcnde (abilities, base_experience, cries, forms, game_indices, height, held_items, id, is_default, location_area_encounters, moves, name, "order", past_abilities, past_types, species, sprites, stats, types, weight) VALUES (%(abilities)s, %(base_experience)s, %(cries)s, %(forms)s, %(game_indices)s, %(height)s, %(held_items)s, %(id)s, %(is_default)s, %(location_area_encounters)s, %(moves)s, %(name)s, %(order)s, %(past_abilities)s, %(past_types)s, %(species)s, %(sprites)s, %(stats)s, %(types)s, %(weight)s)]
[parameters: {'abilities': '[{"ability": {"name": "static", "url": "https://pokeapi.co/api/v2/ability/9/"}, "is_hidden": false, "slot": 1}, {"ability": {"name": "lightning-rod", "url": "https://pokeapi.co/api/v2/ability/31/"}, "is_hidden": true, "slot": 3}]', 'base_experience': 112, 'cries': {'latest': 'https://raw.githubusercontent.com/PokeAPI/cries/main/cries/pokemon/latest/25.ogg', 'legacy': 'https://raw.githubusercontent.com/PokeAPI/cries/main/cries/pokemon/legacy/25.ogg'}, 'forms': '[{"name": "pikachu", "url": "https://pokeapi.co/api/v2/pokemon-form/25/"}]', 'game_indices': '[{"game_index": 84, "version": {"name": "red", "url": "https://pokeapi.co/api/v2/version/1/"}}, {"game_index": 84, "version": {"name": "blue", "url": ... (1681 characters truncated) ... "url": "https://pokeapi.co/api/v2/version/21/"}}, {"game_index": 25, "version": {"name": "white-2", "url": "https://pokeapi.co/api/v2/version/22/"}}]', 'height': 4, 'held_items': '[{"item": {"name": "oran-berry", "url": "https://pokeapi.co/api/v2/item/132/"}, "version_details": [{"rarity": 50, "version": {"name": "ruby", "url": ... (2750 characters truncated) ... "url": "https://pokeapi.co/api/v2/version/29/"}}, {"rarity": 5, "version": {"name": "ultra-moon", "url": "https://pokeapi.co/api/v2/version/30/"}}]}]', 'id': 25, 'is_default': True, 'location_area_encounters': 'https://pokeapi.co/api/v2/pokemon/25/encounters', 'moves': '[{"move": {"name": "mega-punch", "url": "https://pokeapi.co/api/v2/move/5/"}, "version_group_details": [{"level_learned_at": 0, "move_learn_method": ... (235085 characters truncated) ... tps://pokeapi.co/api/v2/move-learn-method/4/"}, "version_group": {"name": "scarlet-violet", "url": "https://pokeapi.co/api/v2/version-group/25/"}}]}]', 'name': 'pikachu', 'order': 35, 'past_abilities': [], 'past_types': '[]', 'species': '{"name": "pikachu", "url": "https://pokeapi.co/api/v2/pokemon-species/25/"}', 'sprites': '{"back_default": "https://raw.githubusercontent.com/PokeAPI/sprites/master/sprites/pokemon/back/25.png", "back_female": "https://raw.githubuserconten ... (16502 characters truncated) ... /25.png", "front_female": "https://raw.githubusercontent.com/PokeAPI/sprites/master/sprites/pokemon/versions/generation-viii/icons/female/25.png"}}}}', 'stats': '[{"base_stat": 35, "effort": 0, "stat": {"name": "hp", "url": "https://pokeapi.co/api/v2/stat/1/"}}, {"base_stat": 55, "effort": 0, "stat": {"name": ... (339 characters truncated) ... "url": "https://pokeapi.co/api/v2/stat/5/"}}, {"base_stat": 90, "effort": 2, "stat": {"name": "speed", "url": "https://pokeapi.co/api/v2/stat/6/"}}]', 'types': '[{"slot": 1, "type": {"name": "electric", "url": "https://pokeapi.co/api/v2/type/13/"}}]', 'weight': 60}]
(Background on this error at: https://sqlalche.me/e/14/f405)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "", line 1, in
  File "/home/tino/repos/PyAirbyte_tests/.venv/lib/python3.10/site-packages/airbyte/sources/base.py", line 630, in read
    raise exc.AirbyteConnectorFailedError(
airbyte.exceptions.AirbyteConnectorFailedError: AirbyteConnectorFailedError: Connector failed.
    Log output:
        Starting syncing SourcePokeapi
        Marking stream pokemon as STARTED
        Syncing stream: pokemon
        Marking stream pokemon as RUNNING
        Read 1 records from pokemon stream
        Marking stream pokemon as STOPPED
        Finished syncing pokemon
        SourcePokeapi runtimes:
        Syncing stream pokemon 0:00:01.182570
```

aaronsteers commented 7 months ago

@tinomerl - Thanks for raising! 🙏

aaronsteers commented 7 months ago

@tinomerl - I have opened this PR which begins to add the PokeAPI source to our core test suite.

While testing, I found a compounding issue with that source: an extra top-level field, cries, is not declared in the stream's catalog. My hypothesis for now (still to be confirmed) is that the error you are seeing is specific to undeclared dict/JSON columns rather than to all such columns.

More detail in this comment, but either way, we will plan to make a fix.
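
For illustration only, here is a minimal sketch of the hypothesis. It is not the code from the PR and does not use any PyAirbyte API. In the parameters shown in the stack trace, declared columns such as forms arrive as JSON strings, while cries (and the empty list in past_abilities) arrive as raw Python objects, which psycopg2 cannot adapt without help. The helper names `find_undeclared_fields` and `serialize_nested_values`, as well as the trimmed schema and record, are hypothetical.

```python
import json
from typing import Any


def find_undeclared_fields(record: dict[str, Any], json_schema: dict[str, Any]) -> set[str]:
    """Return top-level record keys that the stream's JSON schema does not declare."""
    declared = set(json_schema.get("properties", {}))
    return set(record) - declared


def serialize_nested_values(record: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of the record with dict and list values encoded as JSON strings."""
    return {
        key: json.dumps(value) if isinstance(value, (dict, list)) else value
        for key, value in record.items()
    }


# Hypothetical, heavily trimmed schema and record (not the real PokeAPI catalog).
schema = {"properties": {"name": {"type": "string"}, "past_types": {"type": "array"}}}
record = {
    "name": "pikachu",
    "past_types": [],
    "cries": {"latest": "latest/25.ogg"},  # present in the data, absent from the schema
}

undeclared = find_undeclared_fields(record, schema)
safe_record = serialize_nested_values(record)

print(sorted(undeclared))
print(safe_record["cries"])
```

Serializing every dict or list value before the insert, whether or not the column is declared, is one way to keep the driver from ever seeing a raw Python container; the actual fix may take a different approach.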

aaronsteers commented 7 months ago

@tinomerl - Thanks again for raising this. We are closing in on a fix. The PR below has replicated and successfully resolved the issue. Merge is expected soon, and should release tomorrow or early next week.

aaronsteers commented 6 months ago

@tinomerl - We've just merged the PR. This fix should release shortly.

tinomerl commented 6 months ago

Hey @aaronsteers, thanks for the update. I just retested and it worked. Unfortunately, I found another bug while testing with the PostgreSQL cache and opened a separate issue for it: #148