This project shows the basics of the MapR-DB Change Data Capture (CDC) API, which allows applications to capture database events.
The Change Data Capture (CDC) system allows you to capture changes made to data records in MapR-DB tables (JSON or binary) and propagate them to a MapR-ES topic. These data changes are the result of inserts, updates, and deletions and are called change data records. Once the change data records are propagated to a topic, a consumer application is used to read and process them.
The steps to build the CDC application are:
- create a MapR-DB JSON table
- create a changelog stream and the application stream and topics
- add a changelog to the table to associate the table events with a changelog topic
- build and run a consumer application that processes the change data records and posts new messages to the application topics
In this example you will use a MapR-DB JSON table named /demo_table_json that contains user information such as first name, last name, age, email, and address. The application (FtsAndGeoServiceWithCDC) analyzes the events coming from the changelog and posts new messages when the first name, last name, or address is created or modified, or when a document is deleted. The sample application flow is:
The messages posted by the application could then be consumed by new services, for example to index the user names in a full-text search engine (fts_service topic) or to process the user address (geo_service topic).
Prerequisites
This project uses the same table as the one used in the MapR-DB JSON and OJAI 2.0 example.
If the table does not exist already, open a terminal window on your MapR 6.0 Cluster, and run the following command to create a JSON Table:
$ maprcli table create -path /demo_table_json -tabletype json
This command creates a new MapR-DB JSON table. Note that it is also possible to use MapR-DB CDC on binary tables, but this first example is based on a JSON table.
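Optionally, you can verify that the table has been created (the output fields vary with the MapR version):
$ maprcli table info -path /demo_table_json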
Changelog Stream
In a terminal window on your MapR 6.0 cluster run the following command to create a stream:
$ maprcli stream create -path /demo_changelog -ischangelog true -consumeperm p
This command:
- creates a new stream named /demo_changelog
- ischangelog is set to true to configure the stream to store the changelog (change data records)
- -consumeperm p sets the changelog consumer permission to "public", allowing any application to subscribe to the events.
Application Stream
In this application the database events are processed by a consumer that publishes new messages on application topics, which can then be consumed by new services. Create the application stream and topics:
$ maprcli stream create -path /demo_app_stream -produceperm p -consumeperm p -topicperm p
$ maprcli stream topic create -path /demo_app_stream -topic fts_service -partitions 3
$ maprcli stream topic create -path /demo_app_stream -topic geo_service -partitions 3
You have created a JSON table and a changelog stream; let's now use the maprcli command to capture the table events and send them to the changelog stream.
In a terminal enter the following command:
$ maprcli table changelog add -path /demo_table_json -changelog /demo_changelog:demo_table_json
This command associates the demo_table_json table events with the /demo_changelog:demo_table_json topic.
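You can verify that the changelog has been added to the table with the following command (the output should list /demo_changelog:demo_table_json):
$ maprcli table changelog list -path /demo_table_json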
Build the application using Apache Maven:
$ mvn clean package
Copy the file to your cluster:
$ scp ./target/maprdb-cdc-sample-1.0-SNAPSHOT.jar mapr@mapr60:/home/mapr/
where mapr60 is one of the nodes of your cluster.
Run the CDC Application
$ java -cp maprdb-cdc-sample-1.0-SNAPSHOT.jar:`mapr clientclasspath` com.mapr.samples.db.cdc.json.FtsAndGeoServiceJSONWithCDC
Open a new terminal and run the following commands to create, update, and delete documents:
$ mapr dbshell
maprdb mapr:> insert /demo_table_json --value '{"_id":"user0010", "firstName" : "Matt", "lastName" : "Porker" , "age" : 34 }'
maprdb mapr:> update /demo_table_json --id user0010 --m '{ "$set":[ { "address":{"city":"San Jose","state":"CA","street":"320 Blossom Hill Road","zipCode":9519} }] }'
maprdb mapr:> update /demo_table_json --id user0010 --m '{ "$set":[ {"lastName":"Parker"}, { "address":{"city":"San Jose","state":"CA","street":"330 Blossom Hill Road","zipCode":9519} }] }'
maprdb mapr:> delete /demo_table_json --id user0010
Each operation made in the MapR DB Shell generates some entries in the changelog.
The first operation inserts a new document; the application captures the first and last name and posts them on the /demo_app_stream:fts_service topic. You should see the following messages in the terminal:
Document Inserted "user0010"
Posting to FTS Service {"_id":"user0010","operation":"RECORD_INSERT", "type":"json","fields_to_index":{"firstName":"Matt","lastName":"Porker"}}
The next operation adds the address to the user profile by updating the document; the CDC application captures the address and sends it to the /demo_app_stream:geo_service topic.
The third operation updates two attributes, the last name and the address; the CDC application captures these changes and posts messages on the two topics.
Finally, the last operation deletes the document; the CDC application sends a delete message with the document id on the /demo_app_stream:fts_service topic.
This project is an example that helps you understand how to use the MapR-DB CDC feature; the following sections explain the main steps to build your own project.
To use MapR-DB CDC you must add the MapR Maven repository and the MapR-DB CDC dependency to your project.
MapR Maven Repository
<repository>
<id>mapr-releases</id>
<url>http://repository.mapr.com/maven/</url>
</repository>
MapR-DB CDC Dependency
<dependency>
<groupId>com.mapr.db</groupId>
<artifactId>maprdb-cdc</artifactId>
<version>6.0.0-mapr</version>
</dependency>
A MapR-DB CDC application is built the same way as any MapR-ES application. In this example we use a Java application with a main() method.
1- Create The Consumer
The first thing to do is to configure the consumer using Java properties. This could be externalized in a file; for simplicity, the consumer properties are hard-coded in the application code.
// Consumer configuration
Properties consumerProperties = new Properties();
consumerProperties.setProperty("group.id", "cdc.consumer.demo_table.json.fts_geo");
consumerProperties.setProperty("enable.auto.commit", "true");
consumerProperties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
consumerProperties.setProperty("value.deserializer", "com.mapr.db.cdc.ChangeDataRecordDeserializer");
The properties are the same as for any MapR-ES or Apache Kafka application:
- group.id identifies the group of consumers
- key.deserializer : the key is an array of bytes created by the CDC gateway
- value.deserializer : MapR-DB CDC uses an optimized serialization format for all the events, so you must specify the com.mapr.db.cdc.ChangeDataRecordDeserializer deserializer.
Create the consumer and subscribe to the changelog, which is a MapR-ES topic:
// Consumer used to consume MapR-DB CDC events
KafkaConsumer<byte[], ChangeDataRecord> consumer = new KafkaConsumer<byte[], ChangeDataRecord>(consumerProperties);
consumer.subscribe(Arrays.asList("/demo_changelog:demo_table_json"));
The consumer is created using a key (byte[]) and a ChangeDataRecord object for the value.
2- Consume the events
You can now listen to the events and process each ChangeDataRecord.
while (true) {
ConsumerRecords<byte[], ChangeDataRecord> changeRecords = consumer.poll(500);
Iterator<ConsumerRecord<byte[], ChangeDataRecord>> iter = changeRecords.iterator();
while (iter.hasNext()) {
ConsumerRecord<byte[], ChangeDataRecord> crec = iter.next();
// The ChangeDataRecord contains all the changes made to a document
ChangeDataRecord changeDataRecord = crec.value();
String documentId = changeDataRecord.getId().getString();
// process events
...
...
}
}
In the iter.hasNext() loop, start by extracting:
- the ChangeDataRecord using the crec.value() method
- the document id as a String using changeDataRecord.getId().getString()
3- Process Change Data Records
It is now time to process the change data records based on the type of event (insert, update, delete), using the changeDataRecord.getType() method. You can use the ChangeDataRecordType class to check the type.
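For example, inside the poll loop shown above (where the // process events placeholder is), the dispatch could look like the following sketch; deleteDocument() and insertAndUpdateDocument() are the handler methods described in the next sections, and the exact code in the sample may differ.
// Dispatch each change data record based on its type
if (changeDataRecord.getType() == ChangeDataRecordType.RECORD_DELETE) {
    deleteDocument(changeDataRecord);
} else if (changeDataRecord.getType() == ChangeDataRecordType.RECORD_INSERT
        || changeDataRecord.getType() == ChangeDataRecordType.RECORD_UPDATE) {
    insertAndUpdateDocument(changeDataRecord);
}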
Processing Deletes
Look at the deleteDocument() method in the sample application.
Processing a delete is a simple operation since it is based on a single change data record, so you can directly get the document id using changeDataRecord.getId() and then process the document deletion.
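A minimal sketch of such a handler, assuming the usual org.apache.kafka.clients.producer imports and a KafkaProducer<String, String> field named producer created elsewhere; this is not the sample's exact deleteDocument() implementation.
private static void deleteDocument(ChangeDataRecord changeDataRecord) {
    // A delete is carried by a single change data record: the document id is all we need
    String documentId = changeDataRecord.getId().getString();
    // Tell the downstream FTS service to remove this document from its index
    String message = "{\"_id\":\"" + documentId + "\",\"operation\":\"RECORD_DELETE\"}";
    producer.send(new ProducerRecord<String, String>("/demo_app_stream:fts_service", message));
    System.out.println("Document Deleted \"" + documentId + "\"");
}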
Processing Inserts and Updates
Look at the insertAndUpdateDocument() method in the sample application.
Document mutations are stored in a list of ChangeNodes, which you retrieve using the following code:
// Use the ChangeNode Iterator to capture all the individual changes
Iterator<KeyValue<FieldPath, ChangeNode>> cdrItr = changeDataRecord.iterator();
while (cdrItr.hasNext()) {
Map.Entry<FieldPath, ChangeNode> changeNodeEntry = cdrItr.next();
String fieldPathAsString = changeNodeEntry.getKey().asPathString();
ChangeNode changeNode = changeNodeEntry.getValue();
...
...
}
In this loop you:
- get the ChangeNode iterator using changeDataRecord.iterator() and loop on the entries
- get the field path as a String using changeNodeEntry.getKey().asPathString()
- get the ChangeNode using changeNodeEntry.getValue(), which contains the change information.
It is quite common when working with CDC to capture changes to only a subset of fields; in this case we just need to look at firstName, lastName, and address. How to check this depends on the type of change node operation.
Inserting a document
When inserting a new document, you have a single ChangeNode object in the iterator, the field path is empty, and the value contains the full document as a Map. To access the field names and values you can use the following logic:
if (fieldPathAsString == null || fieldPathAsString.equals("")) { // Insert
Map<String, Object> documentInserted = changeNode.getMap();
if (documentInserted.containsKey("firstName")) {
fieldToIndex.put("firstName", (String) documentInserted.get("firstName"));
sendIndexingMessage = true;
}
if (documentInserted.containsKey("lastName")) {
fieldToIndex.put("lastName", (String) documentInserted.get("lastName"));
sendIndexingMessage = true;
}
if (documentInserted.containsKey("address")) {
addressMessage.set("address", jsonMapper.convertValue((Map)documentInserted.get("address"), JsonNode.class) );
sendAddressMessage = true;
}
}
To process the document insert, the code:
- gets the inserted document as a Map using changeNode.getMap()
- reads simple values such as (String) documentInserted.get("firstName")
- reads the address sub-document using (Map) documentInserted.get("address").
This application creates new JSON documents (fieldToIndex, addressMessage) to send to the application topics; a sketch of the producer side follows.
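Posting these messages is standard MapR-ES (Kafka API) producer code. Below is a minimal sketch assuming the messages are sent as JSON strings to the topics created earlier; the sample's actual producer setup may differ.
// Producer used to publish the application messages as JSON strings
Properties producerProperties = new Properties();
producerProperties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProperties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties);

// fieldToIndex and addressMessage are the JSON documents (Jackson ObjectNodes) built above
if (sendIndexingMessage) {
    producer.send(new ProducerRecord<String, String>("/demo_app_stream:fts_service", fieldToIndex.toString()));
}
if (sendAddressMessage) {
    producer.send(new ProducerRecord<String, String>("/demo_app_stream:geo_service", addressMessage.toString()));
}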
Updating a document
When updating a document, the iterator contains one ChangeNode per updated field. You can then access the field path and value directly as follows:
if (fieldPathAsString.equalsIgnoreCase("firstName")) {
fieldToIndex.put("firstName", changeNode.getString());
sendIndexingMessage = true;
} else if (fieldPathAsString.equalsIgnoreCase("lastName")) {
fieldToIndex.put("lastName", changeNode.getString());
sendIndexingMessage = true;
} else if (fieldPathAsString.equalsIgnoreCase("address")) {
addressMessage.set("address", jsonMapper.convertValue( changeNode.getMap(), JsonNode.class) );
sendAddressMessage = true;
}
To process the document update, the code:
- checks the field path, for example if (fieldPathAsString.equalsIgnoreCase("firstName"))
- uses changeNode.getString() for the first and last name
- uses changeNode.getMap() for the address.
We will use the same approach for the binary table:
- the changelog topic is /demo_changelog:demo_table_binary
- the application topic is demo_app_stream:fts_service
For this example we will limit the events to the default:firstName and default:lastName columns.
$ maprcli table create -path /demo_table_binary -tabletype binary
$ maprcli table cf create -path /demo_table_binary -cfname default
You have created a binary table; let's now use the maprcli command to capture the table events and send them to the changelog stream.
In a terminal enter the following command:
$ maprcli table changelog add -path /demo_table_binary -changelog /demo_changelog:demo_table_binary
This command associates the demo_table_binary table events with the /demo_changelog:demo_table_binary topic.
Build the application using Apache Maven:
$ mvn clean package
Copy the file to your cluster:
$ scp ./target/maprdb-cdc-sample-1.0-SNAPSHOT.jar mapr@mapr60:/home/mapr/
where mapr60 is one of the nodes of your cluster.
Run the CDC Application
$ java -cp maprdb-cdc-sample-1.0-SNAPSHOT.jar:`mapr clientclasspath`:`hbase classpath` com.mapr.samples.db.cdc.binary.FtsAndGeoServiceBinaryWithCDC
Open a new terminal and run the following commands to create, update, and delete rows:
$ hbase shell
hbase(main):001:0> put '/demo_table_binary' , 'user010' , 'default:firstName', 'John'
hbase(main):002:0> put '/demo_table_binary' , 'user010' , 'default:lastName', 'Doe'
hbase(main):003:0> deleteall '/demo_table_binary' , 'user010'
The CDC application should print the following information:
Document Updated user010
Posting to FTS Service {"_id":"user010","operation":"RECORD_UPDATE","type":"binary","fields_to_index":{"firstName":"John"}}
Document Updated user010
Posting to FTS Service {"_id":"user010","operation":"RECORD_UPDATE","type":"binary","fields_to_index":{"lastName":"Doe"}}
Document Deleted user010
Posting to FTS Service {"_id":"user010","operation":"RECORD_DELETE"}
The code used to process binary table events is very similar to the JSON table code and is available in the FtsAndGeoServiceBinaryWithCDC.java class.
Add the following dependency to your project:
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.1.8-mapr-1710</version>
</dependency>
This will be used to deserialize the table content, which is stored as bytes.
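For reference, a tiny self-contained example of the Bytes utility (org.apache.hadoop.hbase.util.Bytes) that the consumer code below uses to turn binary keys and values back into Strings:
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.util.Bytes;

public class BytesDemo {
    public static void main(String[] args) {
        // CDC delivers binary table row keys and cell values as ByteBuffers;
        // Bytes.toString(...) converts the underlying byte array back to a Java String
        ByteBuffer key = ByteBuffer.wrap(Bytes.toBytes("user010"));
        System.out.println(Bytes.toString(key.array())); // prints user010
    }
}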
// Consumer configuration
Properties consumerProperties = new Properties();
consumerProperties.setProperty("group.id", "cdc.consumer.demo_table.binary.fts");
consumerProperties.setProperty("enable.auto.commit", "true");
consumerProperties.setProperty("auto.offset.reset", "latest");
consumerProperties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
consumerProperties.setProperty("value.deserializer", "com.mapr.db.cdc.ChangeDataRecordDeserializer");
The properties are the same as for any MapR-ES or Apache Kafka application:
- group.id identifies the group of consumers
- key.deserializer : the key is an array of bytes created by the CDC gateway
- value.deserializer : MapR-DB CDC uses an optimized serialization format for all the events, so you must specify the com.mapr.db.cdc.ChangeDataRecordDeserializer deserializer.
Create the consumer and subscribe to the changelog, which is a MapR-ES topic:
// Consumer used to consume MapR-DB CDC events
KafkaConsumer<byte[], ChangeDataRecord> consumer = new KafkaConsumer<byte[], ChangeDataRecord>(consumerProperties);
consumer.subscribe(Arrays.asList("/demo_changelog:demo_table_binary"));
You can now listen to the events and process each ChangeDataRecord.
while (true) {
ConsumerRecords<byte[], ChangeDataRecord> changeRecords = consumer.poll(500);
Iterator<ConsumerRecord<byte[], ChangeDataRecord>> iter = changeRecords.iterator();
while (iter.hasNext()) {
ConsumerRecord<byte[], ChangeDataRecord> crec = iter.next();
// The ChangeDataRecord contains all the changes made to a document
ChangeDataRecord changeDataRecord = crec.value();
// get document ID from the binary key
String documentId = Bytes.toString(changeDataRecord.getId().getBinary().array());
...
...
}
}
In the iter.hasNext() loop, start by extracting:
- the ChangeDataRecord using the crec.value() method
- the document id using changeDataRecord.getId(); to get the value as a String you must convert it using Bytes.toString(changeDataRecord.getId().getBinary().array())
It is now time to process the change data records based on the type of event (insert, update, delete), using the changeDataRecord.getType() method. You can use the ChangeDataRecordType class to check the type.
Processing Deletes
Look at the deleteDocument() method in the sample application.
Processing a delete is a simple operation since it is based on a single change data record, so you can directly get the document id using changeDataRecord.getId() and then process the document deletion.
Processing Inserts and Updates
Look at the insertAndUpdateDocument() method in the sample application.
Document mutations are stored in a list of ChangeNodes, which you retrieve using the following code:
// Use the ChangeNode Iterator to capture all the individual changes
Iterator<KeyValue<FieldPath, ChangeNode>> cdrItr = changeDataRecord.iterator();
while (cdrItr.hasNext()) {
Map.Entry<FieldPath, ChangeNode> changeNodeEntry = cdrItr.next();
String fieldPathAsString = changeNodeEntry.getKey().asPathString();
ChangeNode changeNode = changeNodeEntry.getValue();
// when doing an update the database event is made of one ChangeNode per field
if (fieldPathAsString.equalsIgnoreCase("default.firstName")) { // name of the field including column family
// extract the value as a string since we know that default.firstName is a string
fieldToIndex.put("firstName", Bytes.toString(changeNode.getBinary().array()));
sendIndexingMessage = true;
} else if (fieldPathAsString.equalsIgnoreCase("default.lastName")) {
fieldToIndex.put("lastName", Bytes.toString(changeNode.getBinary().array()));
sendIndexingMessage = true;
}
}
In this loop you:
- get the field path using changeNodeEntry.getKey().asPathString(); in the context of a binary table the value is in the form column_family.column, for example default.firstName
- get the ChangeNode using changeNodeEntry.getValue(), which contains the change information
- get the value as a String using Bytes.toString(changeNode.getBinary().array()), since binary table values are stored as bytes.
In this application you have learned how to:
- create a changelog stream and associate it with MapR-DB JSON and binary tables
- consume change data records using the MapR-ES/Kafka consumer API and the ChangeDataRecordDeserializer
- process inserts, updates, and deletes and publish new messages to application topics.