Logminer Kafka Connect is a CDC Kafka Connect source connector for Oracle databases (tested with Oracle 11.2.0.4).
Changes are extracted from the archive log using Oracle Logminer.
Stable features:
Unstable features:
Planned features:
You can install this Kafka Connect component into Kafka Connect using the Confluent Hub Client. Additionally, you need to download the Oracle JDBC driver and put it on the classpath of Logminer Kafka Connect. The JDBC driver can't be included in the Logminer Kafka Connect release because its license does not allow redistribution.
The following script will install Logminer Kafka Connect into an existing Kafka Connect installation:
wget https://github.com/thake/logminer-kafka-connect/releases/download/0.4.0/thake-logminer-kafka-connect-0.4.0.zip
confluent-hub install ./thake-logminer-kafka-connect-0.4.0.zip --no-prompt
rm ./thake-logminer-kafka-connect-0.4.0.zip
wget https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/19.7.0.0/ojdbc8-19.7.0.0.jar -O /usr/share/confluent-hub-components/thake-logminer-kafka-connect/lib/ojdbc8-19.7.0.0.jar
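After restarting Kafka Connect, you can check whether the plugin was picked up. A quick check via the Kafka Connect REST API, assuming it runs on its default port 8083:

curl http://localhost:8083/connector-plugins | grep -i logminer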
If you plan to run Logminer Kafka Connect in a container, you can also have a look at the Docker image at https://github.com/thake/logminer-kafka-connect-image
In order for Logminer Kafka Connect to work, the database needs to be in ARCHIVELOG mode, and supplemental logging needs to be enabled for all columns. Here are the commands that need to be executed in sqlplus:
prompt Shutting down database to activate archivelog mode;
shutdown immediate;
startup mount;
alter database archivelog;
prompt Archive log activated.;
alter database add supplemental log data (all) columns;
prompt Activated supplemental logging with all columns.;
prompt Starting up database;
alter database open;
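Afterwards, you can verify both settings in sqlplus; the query should return ARCHIVELOG and YES:

select log_mode, supplemental_log_data_all from v$database;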
If the start SCN is not set or set to 0, Logminer Kafka Connect will query the configured tables for an initial import. During the initial import, no DDL statements should be executed against the database; otherwise, the initial import will fail.
To support initial import, database flashback queries need to be enabled on the source database.
All rows that are in the table at time of initial import will be treated as "INSERT" changes.
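If the connector user does not own the monitored tables, it also needs the privilege to run flashback queries against them. A sketch, assuming a hypothetical connector user KAFKA_CONNECT:

grant flashback on my_user.my_table to kafka_connect;
-- or, more broadly:
grant flashback any table to kafka_connect;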
The change types are compatible with the change types published by the Debezium (https://debezium.io/) project. Thus, it is easy to migrate to the official Debezium Oracle plugin once it reaches a stable state. The key of the Kafka topic will be filled with a struct containing the primary key values of the changed row.
The value is a structure having the following fields:
op
Type of the change operation. The operation types are compatible with Debezium's (e.g. "c" for inserts, "u" for updates, "d" for deletes).
before
Image of the row before the operation has been executed. Contains all columns.
after
Image of the row after the operation has been executed. Contains all columns.
ts_ms
Timestamp of the import as milliseconds since epoch
source
Additional information about this change record from the source database.
The following source fields will be provided:
version
Version of the connector that created the record.
connector
Name of the connector.
ts_ms
Timestamp of the change in the source database, as milliseconds since epoch.
scn
SCN (system change number) at which the change occurred.
txId
ID of the transaction the change was part of.
table
Name of the changed table.
schema
Schema of the changed table.
user
Database user that performed the change.
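For illustration, an update to a row might produce a value like the following sketch (all names and values here are made up; the exact representation is defined by the connector's schemas):

{
  "op": "u",
  "before": { "ID": 1, "NAME": "old name" },
  "after": { "ID": 1, "NAME": "new name" },
  "ts_ms": 1594121970000,
  "source": {
    "version": "0.4.0",
    "connector": "logminer-kafka-connect",
    "ts_ms": 1594121969000,
    "scn": 14268305,
    "txId": "5.3.1234",
    "table": "MY_TABLE",
    "schema": "MY_USER",
    "user": "MY_USER"
  }
}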
You can find an example configuration under logminer-kafka-connect.properties.
The following configuration parameters are available; a minimal configuration sketch follows the list:
db.hostname
Database hostname
db.name
Logical name of the database. This name is used as a prefix for the
topic names and can be chosen freely.
db.port
Database port (usually 1521)
db.sid
Database SID
db.user
Database user
db.user.password
Database password
db.logminer.dictionary
Type of logminer dictionary that should be used.
Valid values: ONLINE, REDO_LOG
db.timezone
The timezone in which TIMESTAMP columns (without any timezone information) should be interpreted. Valid values are all values that can be passed to https://docs.oracle.com/javase/8/docs/api/java/time/ZoneId.html#of-java.lang.String-
batch.size
Batch size of rows that should be fetched in one batch
start.scn
Start SCN. If not set or set to 0, an initial import from the configured
tables will be performed.
table.whitelist
Tables that should be monitored, separated by ','. Tables have to be
specified with their schema. Table names are case-sensitive (e.g. if your
table name is an unquoted identifier, you'll need to specify it in all
caps). You can also specify just a schema to indicate that all tables
within that schema should be monitored. Example:
'MY_USER.TABLE, OTHER_SCHEMA'.
tombstones.on.delete
If set to false, no tombstone records will be emitted after a delete operation.
db.fetch.size
JDBC result set prefetch size. If not set, it defaults to batch.size.
The fetch size should not be smaller than the batch size.
db.attempts
Maximum number of attempts to retrieve a valid JDBC connection.
db.backoff.ms
Backoff time in milliseconds between connection attempts.
poll.interval.ms
Positive integer value that specifies the number of milliseconds the
connector should wait after a polling attempt that didn't retrieve any
results.
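Putting the parameters together, a minimal configuration could look like the following sketch. The connector class name is an assumption here; verify it against the bundled logminer-kafka-connect.properties:

name=logminer-source
# Assumed class name; check the bundled example file for the exact value.
connector.class=com.github.thake.logminer.kafka.connect.LogminerSourceConnector
db.hostname=oracle.example.com
db.port=1521
db.sid=ORCL
db.name=mydb
db.user=kafka_connect
db.user.password=secret
db.logminer.dictionary=ONLINE
db.timezone=UTC
batch.size=1000
start.scn=0
table.whitelist=MY_USER.MY_TABLE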
Due to limitations of Oracle Logminer, it is not possible to track the UPDATE statements on existing records that are implicitly performed whenever a new NOT NULL column with a default value is added to a table.
However, you can change the way you add such columns so that the UPDATE statements are correctly recorded. Instead of doing everything in one command, separate it into the following steps:
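For example (a sketch using a hypothetical table MY_TABLE and a new column NEW_COL):

-- 1. Add the column as nullable and without a default value:
alter table my_table add (new_col varchar2(100));
-- 2. Set the value for all existing rows explicitly. This UPDATE is
--    written to the redo log and can therefore be tracked by Logminer:
update my_table set new_col = 'default value';
commit;
-- 3. Add the default value and the NOT NULL constraint:
alter table my_table modify (new_col default 'default value' not null);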