getdozer / dozer

Dozer is a real-time data movement tool that leverages CDC from various sources and moves data into various sinks.
https://getdozer.io
GNU Affero General Public License v3.0

Aerospike sink replication failing #2404

Closed · prabhuvaibhav closed this issue 7 months ago

prabhuvaibhav commented 7 months ago

This is regarding #2393. There are some issues when executing multiple transactions at a time.

The config being used:

version: 1
app_name: consistency-test
connections:
  - config: !Oracle
      user: DOZER
      password: 123
      host: oracle-db-consistency-test.cxtwfj9nkwtu.ap-southeast-1.rds.amazonaws.com
      port: 1521
      sid: ORCL
      schemas: 
        - VAIBHAV
      replicator: !LogMiner
        poll_interval_in_milliseconds: 1000
    name: oracle

  - config: !Aerospike
      namespace: test
      sets: 
        - customers
        - transactions
      hosts: localhost:3000
    name: aerospike

sources:
  - name: customers
    table_name: CUSTOMERS
    schema: VAIBHAV
    connection: oracle
    columns:
      - CUSTOMER_ID
      - FIRST_NAME
      - LAST_NAME
      - CITY

  - name: transactions
    table_name: TRANSACTIONS
    schema: VAIBHAV
    connection: oracle
    columns:
      - TRANSACTION_ID
      - CUSTOMER_ID
      - TYPE
      - STATUS

sinks:
  - name: transactions
    config: !Aerospike
      connection: aerospike
      tables: 
        - namespace: test
          set_name: customers
          source_table_name: customers
        - namespace: test
          set_name: transactions
          source_table_name: transactions
          denormalize:
            - from_namespace: test
              from_set: customers
              key: CUSTOMER_ID
              columns:
                - CITY

The queries being executed:

INSERT INTO CUSTOMERS VALUES(10000, 'John', 'Doe', TO_DATE('1990-01-01', 'YYYY-MM-DD'), 'john@example.com', '1234567890', '123 Main St', 'Anytown', 'CA', '12345', 'USA');
INSERT INTO TRANSACTIONS VALUES(100000, 10000, 'Transfer', 100.00, 'USD', TO_DATE('2024-02-15', 'YYYY-MM-DD'), 'Completed', 'Transfer from savings');
COMMIT;

UPDATE CUSTOMERS
SET CITY = 'New Delhi'
WHERE CUSTOMER_ID = 10000;
COMMIT;

UPDATE TRANSACTIONS
SET STATUS = 'Pending'
WHERE TRANSACTION_ID = 100000;
COMMIT;

On executing these (for replication), one of three things happens:

This happens even though they are separate transactions. When all the queries are combined into a single transaction, replication has never completed cleanly in the limited number of runs so far: even in that case, one or two updates fail, i.e., the outcome is not deterministic.
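
For reference, the single-transaction variant mentioned above would look something like this: the same statements with the intermediate COMMITs removed. This is an illustration, not necessarily the exact script that was run:

-- All statements in one transaction, committed once at the end
INSERT INTO CUSTOMERS VALUES(10000, 'John', 'Doe', TO_DATE('1990-01-01', 'YYYY-MM-DD'), 'john@example.com', '1234567890', '123 Main St', 'Anytown', 'CA', '12345', 'USA');
INSERT INTO TRANSACTIONS VALUES(100000, 10000, 'Transfer', 100.00, 'USD', TO_DATE('2024-02-15', 'YYYY-MM-DD'), 'Completed', 'Transfer from savings');

UPDATE CUSTOMERS
SET CITY = 'New Delhi'
WHERE CUSTOMER_ID = 10000;

UPDATE TRANSACTIONS
SET STATUS = 'Pending'
WHERE TRANSACTION_ID = 100000;

COMMIT;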

I repeated this but without denormalization. Now the above SQL queries are executed and replicated successfully every time. However, when all the queries were combined into a single transaction, it sometimes started failing again with the same error.

chubei commented 7 months ago

@Jesse-Bakker is this expected?

Jesse-Bakker commented 7 months ago

I think this happens because the operations are served by different threads and therefore don't preserve order. I have fixed that in the process of implementing #2399.

@prabhuvaibhav can you check if this happens if you set n_threads: 1 inside the sink config block?
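
For anyone following along, a sketch of what that would look like in the config above; the exact placement of n_threads is an assumption based on "inside the sink config block":

sinks:
  - name: transactions
    config: !Aerospike
      connection: aerospike
      # Assumed placement: directly inside the !Aerospike config block,
      # alongside connection and tables. Forces single-threaded writes
      # so operation order is preserved.
      n_threads: 1
      tables:
        - namespace: test
          set_name: customers
          source_table_name: customers
        # ... rest of the tables block unchanged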

prabhuvaibhav commented 7 months ago

> I think this happens because the operations are served by different threads and therefore don't preserve order. I have fixed that in the process of implementing #2399.
>
> @prabhuvaibhav can you check if this happens if you set n_threads: 1 inside the sink config block?

Yes, this fixes the issue. I'll test with more queries and update here.

prabhuvaibhav commented 7 months ago

The denormalized columns are not updated by UPDATE queries, e.g.:

INSERT INTO CUSTOMERS VALUES(10000, 'John', 'Doe', TO_DATE('1990-01-01', 'YYYY-MM-DD'), 'john@example.com', '1234567890', '123 Main St', 'Anytown', 'CA', '12345', 'USA');
INSERT INTO TRANSACTIONS VALUES(100000, 10000, 'Transfer', 100.00, 'USD', TO_DATE('2024-02-15', 'YYYY-MM-DD'), 'Completed', 'Transfer from savings');

UPDATE CUSTOMERS
SET CITY = 'New Delhi'
WHERE CUSTOMER_ID = 10000;
COMMIT;

No error is thrown; the denormalized column simply is not updated. In the earlier testing, only the actual column was checked, which is why this was missed.

aql> SELECT * FROM test.customers WHERE CUSTOMER_ID = "10000";
+-------------+------------+-----------+-------------+
| CUSTOMER_ID | FIRST_NAME | LAST_NAME | CITY        |
+-------------+------------+-----------+-------------+
| "10000"     | "John"     | "Doe"     | "New Delhi" |
+-------------+------------+-----------+-------------+
1 row in set (0.001 secs)

OK

aql> SELECT * FROM test.transactions WHERE TRANSACTION_ID = "100000";
+----------------+-------------+------------+-------------+-----------+
| TRANSACTION_ID | CUSTOMER_ID | TYPE       | STATUS      | CITY      |
+----------------+-------------+------------+-------------+-----------+
| "100000"       | "10000"     | "Transfer" | "Completed" | "Anytown" |
+----------------+-------------+------------+-------------+-----------+
1 row in set (0.002 secs)

OK

To be clear, this happens both with and without n_threads: 1.

Jesse-Bakker commented 7 months ago

That is expected and intended behavior.