osalvador / ReplicaDB

ReplicaDB is an open source tool for database replication, designed for efficiently transferring bulk data between relational and non-relational databases.
https://osalvador.github.io/ReplicaDB/
Apache License 2.0
407 stars 96 forks

deleted records aren't getting deleted from the sink tables #23

Closed taher843 closed 3 years ago

taher843 commented 3 years ago

Deleted records aren't getting deleted from the sink tables when replicating from Oracle to MS SQL Server. The output is normal, without any warning or error. Updates are working, though.

Version - 0.8.9

sourceConnect='jdbc:oracle:thin:@localhost:1521:TEST',
sourceUser='sys as sysdba',
sourcePassword='',
sourceTable='sys.employee',
sourceColumns='empname,empid',
sourceWhere='null',
sourceQuery='null',
sinkConnect='jdbc:sqlserver://check.database.windows.net:1433;database=rep',
sinkUser='dbuser',
sinkPassword='',
sinkTable='dbo.employee',
sinkStagingTable='null',
sinkStagingSchema='dbo',
sinkStagingTableAlias='null',
sinkColumns='empname,empid',
sinkDisableEscape=false,
sinkDisableIndex=false,
sinkDisableTruncate=false,
sinkAnalyze=false,
jobs=1,
bandwidthThrottling=0,
quotedIdentifiers=false,
fetchSize=100,
help=false,
version=false,
verbose=true,
optionsFile='employee.conf',
mode='incremental',
sourceConnectionParams={source.connect.parameter.defaultRowPrefetch=5000, source.connect.parameter.oracle.net.tns_admin=${TNS_ADMIN}, source.connect.parameter.oracle.net.networkCompression=on},
sinkConnectionParams={}}

osalvador commented 3 years ago

Hi @taher843 ,

ReplicaDB replicates a snapshot of the data from a source table, so we cannot get DELETE events. Some strategies involve saving DELETE transactions to another table, based on a trigger from the source table, but all this logic has to be implemented and maintained. This is a Trigger-based CDC strategy.
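
To make the trigger-based idea concrete, here is a minimal sketch using Python's built-in `sqlite3` module in place of Oracle. It is not ReplicaDB code: the `employee_deleted` audit table, the trigger name, and the sample rows are all hypothetical, and an Oracle trigger would use different syntax. The point is only that a trigger on the source table can record the keys of deleted rows so that a separate job can later apply them to the sink.

```python
import sqlite3

# In-memory database standing in for the source (assumption: the real source is Oracle).
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE employee (empid INTEGER PRIMARY KEY, empname TEXT);

-- Hypothetical audit table: one row per deleted primary key.
CREATE TABLE employee_deleted (
    empid INTEGER,
    deleted_at TEXT DEFAULT CURRENT_TIMESTAMP
);

-- Hypothetical trigger: capture the key of every row deleted from the source table.
CREATE TRIGGER employee_after_delete AFTER DELETE ON employee
BEGIN
    INSERT INTO employee_deleted (empid) VALUES (OLD.empid);
END;

INSERT INTO employee VALUES (1, 'Pete'), (2, 'Ann');
DELETE FROM employee WHERE empid = 2;
""")

# A separate job would read these keys and issue the matching DELETEs on the sink.
deleted_ids = [row[0] for row in conn.execute("SELECT empid FROM employee_deleted")]
print(deleted_ids)
```

The audit table then has to be replicated or consumed by custom logic, which is the maintenance cost mentioned above.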

At ReplicaDB we are working on a log-based CDC implementation with Debezium for continuous replication of all data events: INSERT, UPDATE and DELETE. You can check it out in our latest commit b2fe6e7

Regards.

taher843 commented 3 years ago

Okay, so is this CDC with Debezium available for use yet? The issue is that with 0.8.8 I can insert, update and delete, but with 0.8.9 delete doesn't work. I wanted to use 0.8.8, but that needs an identity column in the SQL Server table. Is there any way I can get around this without having to use an identity column?

osalvador commented 3 years ago

Hi @taher843,

CDC is not yet ready for production.

To test your use case, can you provide me with the following data?

Thanks!

taher843 commented 3 years ago

Pretty simple for POC.

Source Oracle Table

CREATE TABLE data (name varchar2(40),id int);
insert into data (name,id) values ('Pete',1);

Target SQL Server sink Table

CREATE TABLE [dbo].[data](
    [id] [int] NOT NULL,
    [name] [nchar](10) NULL,
 CONSTRAINT [PK_data] PRIMARY KEY CLUSTERED 
(
    [id] ASC
)WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF) ON [PRIMARY]
) ON [PRIMARY]
GO

INSERT statement from SQL Server

INSERT INTO [dbo].[data]
           ([id]
           ,[name])
     VALUES
           (<id, int,>
           ,<name, nchar(10),>)
GO

ReplicaDB Config

##### ReplicaDB General Options ########################
mode=incremental
jobs=1
fetch.size=100
verbose=true
############################# Source Options ##############################
source.connect=jdbc:oracle:thin:@localhost:1521:TEST
source.user=sys as sysdba
source.password=OraPasswd1
source.table=sys.data
source.columns=name,id
source.connect.parameter.source.connect.parameter.oracle.net.tns_admin=${TNS_ADMIN}
source.connect.parameter.source.connect.parameter.oracle.net.networkCompression=on
source.connect.parameter.source.connect.parameter.defaultRowPrefetch=5000
############################# Sink Options ################################
sink.connect=jdbc:sqlserver://sqlserver.database.windows.net:1433;database=rep
sink.user=user
sink.password=password@12
sink.table=dbo.data
sink.staging.schema=dbo
sink.columns=name,id

osalvador commented 3 years ago

Hi @taher843,

Everything works fine.

2021-03-10 12:17:10,706 INFO  ReplicaDB:42 Running ReplicaDB version: 0.8.9
2021-03-10 12:17:10,711 INFO  ReplicaDB:46 Setting verbose mode
2021-03-10 12:17:10,711 DEBUG ReplicaDB:47 ToolOptions{
    sourceConnect='jdbc:oracle:thin:@localhost:1521:XE',
    sourceUser='system',
    sourcePassword='****',
    sourceTable='system.data',
    sourceColumns='name,id',
    sourceWhere='null',
    sourceQuery='null',
    sinkConnect='jdbc:sqlserver://localhost:1433;database=master',
    sinkUser='sa',
    sinkPassword='****',
    sinkTable='dbo.data',
    sinkStagingTable='null',
    sinkStagingSchema='dbo',
    sinkStagingTableAlias='null',
    sinkColumns='name,id',
    sinkDisableEscape=false,
    sinkDisableIndex=false,
    sinkDisableTruncate=false,
    sinkAnalyze=false,
    jobs=1,
    bandwidthThrottling=0,
    quotedIdentifiers=false,
    fetchSize=100,
    help=false,
    version=false,
    verbose=true,
    optionsFile='/Users/osalvador/Documents/GitHub/ReplicaDB/test/replicadb-ora2sqlserver.conf',
    mode='incremental',
    sourceConnectionParams={},
    sinkConnectionParams={}}
2021-03-10 12:17:10,723 DEBUG ManagerFactory:41 Trying with scheme: jdbc:oracle:thin:@localhost:1521
2021-03-10 12:17:10,725 DEBUG ManagerFactory:41 Trying with scheme: jdbc:sqlserver:
2021-03-10 12:17:10,911 DEBUG SqlManager:269 No connection parameters specified. Using regular API for making connection.
2021-03-10 12:17:11,790 INFO  SQLServerManager:130 Creating staging table with this command:  SELECT name,id INTO dbo.datarepdb4010 FROM dbo.data WHERE 0 = 1
2021-03-10 12:17:11,829 INFO  SqlManager:386 Truncating sink table with this command: TRUNCATE TABLE dbo.datarepdb4010
2021-03-10 12:17:11,869 INFO  ReplicaTask:36 Starting TaskId-0
2021-03-10 12:17:11,869 DEBUG ManagerFactory:41 Trying with scheme: jdbc:oracle:thin:@localhost:1521
2021-03-10 12:17:11,869 DEBUG ManagerFactory:41 Trying with scheme: jdbc:sqlserver:
2021-03-10 12:17:11,869 DEBUG SqlManager:212 No connection parameters specified. Using regular API for making connection.
2021-03-10 12:17:12,613 DEBUG SqlManager:269 No connection parameters specified. Using regular API for making connection.
2021-03-10 12:17:12,970 DEBUG SqlManager:131 TaskId-0: Using fetchSize for next query: 100
2021-03-10 12:17:12,974 INFO  SqlManager:141 TaskId-0: Executing SQL statement: SELECT /*+ NO_INDEX(system.data)*/ name,id FROM system.data where 0 = ?
2021-03-10 12:17:12,975 INFO  SqlManager:148 TaskId-0: With args: 0,
2021-03-10 12:17:13,173 INFO  SQLServerManager:95 Perfoming BulkCopy into dbo.datarepdb4010
2021-03-10 12:17:13,324 INFO  SQLServerManager:46 IF OBJECTPROPERTY(OBJECT_ID('dbo.data'), 'TableHasIdentity') = 1 SET IDENTITY_INSERT dbo.data ON
2021-03-10 12:17:13,420 INFO  SqlManager:331 Getting PKs for schema: dbo and table: data. Found.
2021-03-10 12:17:13,421 INFO  SQLServerManager:192 Merging staging table and sink table with this command: MERGE INTO dbo.data trg USING (SELECT name,id FROM dbo.datarepdb4010 ) src ON  (src.id= trg.id ) WHEN MATCHED THEN UPDATE SET  trg.name = src.name  WHEN NOT MATCHED THEN INSERT ( name,id ) VALUES ( src.name , src.id  );
2021-03-10 12:17:13,466 INFO  SQLServerManager:46 IF OBJECTPROPERTY(OBJECT_ID('dbo.data'), 'TableHasIdentity') = 1 SET IDENTITY_INSERT dbo.data OFF
2021-03-10 12:17:13,485 INFO  SqlManager:475 Dropping staging table with this command: DROP TABLE dbo.datarepdb4010
2021-03-10 12:17:13,501 INFO  ReplicaDB:124 Total process time: 2823ms

Maybe I didn't understand your problem. In incremental mode, ReplicaDB never performs a DELETE on the sink database. If you need an exact snapshot of the source table in the sink table, you need to use complete mode, which deletes (TRUNCATE) all the data from the sink table and reloads it with all the data from the source table. You can read how the replication modes work in the documentation: https://osalvador.github.io/ReplicaDB/docs/docs.html#21-replication-mode
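
The difference between the two modes can be illustrated with a small simulation using Python's built-in `sqlite3` module. This is only a sketch under stated assumptions, not ReplicaDB's implementation: the `incremental` and `complete` functions are hypothetical stand-ins, `INSERT OR REPLACE` plays the role of the MERGE statement shown in the log above (which has no delete branch), and the sample rows are made up.

```python
import sqlite3

def incremental(conn):
    # Upsert only, like a MERGE with WHEN MATCHED / WHEN NOT MATCHED and no DELETE branch.
    conn.execute("INSERT OR REPLACE INTO sink (id, name) SELECT id, name FROM source")

def complete(conn):
    # Empty the sink, then reload every row from the source.
    conn.execute("DELETE FROM sink")
    conn.execute("INSERT INTO sink (id, name) SELECT id, name FROM source")

def sink_ids(conn):
    return [row[0] for row in conn.execute("SELECT id FROM sink ORDER BY id")]

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE source (id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE sink (id INTEGER PRIMARY KEY, name TEXT);
INSERT INTO source VALUES (1, 'Pete'), (2, 'Ann');
""")

incremental(conn)                                # initial load: sink holds ids 1 and 2
conn.execute("DELETE FROM source WHERE id = 2")  # row disappears from the source

incremental(conn)
after_incremental = sink_ids(conn)               # the stale row with id 2 is still in the sink

complete(conn)
after_complete = sink_ids(conn)                  # the sink now matches the source exactly

print(after_incremental, after_complete)
```

An upsert can only see rows that still exist in the source, so it has nothing to match a deleted row against; a full reload removes it as a side effect of emptying the sink first.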

Regards.

taher843 commented 3 years ago

I think delete was working with incremental mode in 0.8.8, but it required an identity column, and I can't use an identity column. Let me play around. Thank you so much for your efforts. You rock!