transferwise / pipelinewise-target-redshift

Singer.io Target for Amazon Redshift - PipelineWise compatible
https://transferwise.github.io/pipelinewise/

Incremental sync is not working as expected #67

Open sphinks opened 4 years ago

sphinks commented 4 years ago

I have a Postgres-to-Redshift sync. It works perfectly with fast-sync, but once I try incremental sync with the same table and the same data I run into issues.

  1. I hit an issue where dates below 0000-00-00 could not be processed during the incremental sync. I know this is invalid data that should have been rejected on input, but nevertheless. My fix was simply to replace such dates with null.
  2. The next issue is not clear to me at all:
    
    time=2020-08-10 17:29:51 logger_name=target_redshift log_level=INFO message=Loading 20000 rows into 'public."STG_USER"'
    time=2020-08-10 17:29:53 logger_name=target_redshift log_level=ERROR message=Failed to load stream public-user to Redshift
    Traceback (most recent call last):
      File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/target_redshift/__init__.py", line 351, in load_stream_batch
        flush_records(stream, records_to_load, row_count[stream], db_sync, compression, slices, temp_dir)
      File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/target_redshift/__init__.py", line 425, in flush_records
        db_sync.load_csv(copy_key, row_count, size_bytes, compression)
      File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/target_redshift/db_sync.py", line 467, in load_csv
        cur.execute(copy_sql)
      File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/psycopg2/extras.py", line 143, in execute
        return super(DictCursor, self).execute(query, vars)
    psycopg2.errors.SyntaxError: conflicting or redundant options

    Traceback (most recent call last):
      File "/app/.virtualenvs/target-redshift/bin/target-redshift", line 8, in <module>
        sys.exit(main())
      File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/target_redshift/__init__.py", line 447, in main
        persist_lines(config, singer_messages, table_cache)
      File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/target_redshift/__init__.py", line 195, in persist_lines
        filter_streams=filter_streams)
      File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/target_redshift/__init__.py", line 323, in flush_streams
        ) for stream in streams_to_flush)
      File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/joblib/parallel.py", line 1004, in __call__
        if self.dispatch_one_batch(iterator):
      File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/joblib/parallel.py", line 835, in dispatch_one_batch
        self._dispatch(tasks)
      File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/joblib/parallel.py", line 754, in _dispatch
        job = self._backend.apply_async(batch, callback=cb)
      File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/joblib/_parallel_backends.py", line 209, in apply_async
        result = ImmediateResult(func)
      File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/joblib/_parallel_backends.py", line 590, in __init__
        self.results = batch()
      File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/joblib/parallel.py", line 256, in __call__
        for func, args, kwargs in self.items]
      File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/joblib/parallel.py", line 256, in <listcomp>
        for func, args, kwargs in self.items]
      File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/target_redshift/__init__.py", line 361, in load_stream_batch
        raise e
      File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/target_redshift/__init__.py", line 351, in load_stream_batch
        flush_records(stream, records_to_load, row_count[stream], db_sync, compression, slices, temp_dir)
      File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/target_redshift/__init__.py", line 425, in flush_records
        db_sync.load_csv(copy_key, row_count, size_bytes, compression)
      File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/target_redshift/db_sync.py", line 467, in load_csv
        cur.execute(copy_sql)
      File "/app/.virtualenvs/target-redshift/lib/python3.7/site-packages/psycopg2/extras.py", line 143, in execute
        return super(DictCursor, self).execute(query, vars)
    psycopg2.errors.SyntaxError: conflicting or redundant options

    time=2020-08-10 17:29:54 logger_name=singer log_level=INFO message=METRIC: {"type": "counter", "metric": "record_count", "value": 20032, "tags": {}}
    time=2020-08-10 17:29:54 logger_name=tap_postgres log_level=CRITICAL message=[Errno 32] Broken pipe
    Traceback (most recent call last):
      File "/app/.virtualenvs/tap-postgres/bin/tap-postgres", line 8, in <module>
        sys.exit(main())
      File "/app/.virtualenvs/tap-postgres/lib/python3.7/site-packages/tap_postgres/__init__.py", line 434, in main
        raise exc
      File "/app/.virtualenvs/tap-postgres/lib/python3.7/site-packages/tap_postgres/__init__.py", line 431, in main
        main_impl()
      File "/app/.virtualenvs/tap-postgres/lib/python3.7/site-packages/tap_postgres/__init__.py", line 421, in main_impl
        args.config.get('default_replication_method'), state, state_file)
      File "/app/.virtualenvs/tap-postgres/lib/python3.7/site-packages/tap_postgres/__init__.py", line 315, in do_sync
        end_lsn)
      File "/app/.virtualenvs/tap-postgres/lib/python3.7/site-packages/tap_postgres/__init__.py", line 174, in sync_traditional_stream
        state = do_sync_incremental(conn_config, stream, state, desired_columns, md_map)
      File "/app/.virtualenvs/tap-postgres/lib/python3.7/site-packages/tap_postgres/__init__.py", line 85, in do_sync_incremental
        state = incremental.sync_table(conn_config, stream, state, desired_columns, md_map)
      File "/app/.virtualenvs/tap-postgres/lib/python3.7/site-packages/tap_postgres/sync_strategies/incremental.py", line 113, in sync_table
        singer.write_message(record_message)
      File "/app/.virtualenvs/tap-postgres/lib/python3.7/site-packages/singer/messages.py", line 227, in write_message
        sys.stdout.flush()
    BrokenPipeError: [Errno 32] Broken pipe
    Exception ignored in: <_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>
    BrokenPipeError: [Errno 32] Broken pipe
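For context on point 2: "conflicting or redundant options" is the error Redshift's COPY command raises when an option is given twice or two options are mutually exclusive. The logs don't show the actual SQL that `db_sync.load_csv` builds, so this is only a hypothetical illustration of the shape of statement the server rejects (bucket name, IAM role, and options are made up):

```python
# Hypothetical COPY statement of the kind Redshift rejects with
# "conflicting or redundant options" -- NOT the actual SQL built by
# db_sync.load_csv, which the logs above do not show.
copy_sql = """
COPY public."STG_USER"
FROM 's3://example-bucket/stg_user.csv.gz'
IAM_ROLE 'arn:aws:iam::123456789012:role/example-redshift-copy'
CSV GZIP GZIP
"""
# The duplicated GZIP (or, e.g., CSV combined with FORMAT AS JSON) is the
# kind of thing that makes the server respond with the SyntaxError above.
```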


No idea what is going wrong here. It is really strange: fast_sync moves the same data (even the incorrect dates) without any issue, while the incremental run fails.
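For what it's worth, the date workaround from point 1 can be sketched as a small pre-load transform. This is only a minimal sketch; the record shape and the field name `created_at` are made up for illustration:

```python
from datetime import date

def nullify_invalid_date(value):
    """Replace unparseable date strings such as '0000-00-00' with None."""
    if value is None:
        return None
    try:
        year, month, day = (int(part) for part in value[:10].split("-"))
        date(year, month, day)  # raises ValueError for year/month/day 0
    except (ValueError, TypeError, AttributeError):
        return None
    return value

# Hypothetical record; the field name "created_at" is made up.
record = {"id": 1, "created_at": "0000-00-00"}
record["created_at"] = nullify_invalid_date(record["created_at"])
print(record)  # {'id': 1, 'created_at': None}
```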

Let me know if you need additional details, or if this issue should instead be raised in the tap-postgres repository.