datastax / cdc-apache-cassandra

Datastax CDC for Apache Cassandra

Implement cdc backfill skeleton #109

Closed aymkhalil closed 1 year ago

aymkhalil commented 1 year ago

Initial skeleton for the CDC backfill CLI tool:

./gradlew cdc-backfill-client:assemble
java -jar cdc-backfill-client/build/libs/cdc-backfill-client-2.2.3-all.jar backfill --data-dir=target/export --export-host=127.0.0.1:9042 --export-username=cassandra --export-password=cassandra --keyspace ks1 --table sample1
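
For context, a minimal sketch of what a picocli-style entry point for this subcommand could look like. This is illustrative only: the class name, option wiring, and defaults are assumptions, not the actual classes in this PR.

import java.util.concurrent.Callable;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;

// Illustrative sketch only: names and defaults are assumptions, not the PR's actual code.
@Command(name = "backfill",
         description = "Export a table from the origin cluster and replay it to the events topic.")
public class BackfillCommandSketch implements Callable<Integer> {

    @Option(names = "--data-dir", description = "Local directory for the exported snapshot")
    String dataDir = "target/export";

    @Option(names = "--export-host", description = "Origin cluster contact point (host:port)")
    String exportHost = "127.0.0.1:9042";

    @Option(names = "--export-username") String exportUsername;
    @Option(names = "--export-password") String exportPassword;
    @Option(names = "--keyspace", required = true) String keyspace;
    @Option(names = "--table", required = true) String table;

    @Override
    public Integer call() {
        // 1. Export the table with DSBulk into <data-dir>/<keyspace>/<table>.
        // 2. Read the exported rows back and publish their primary keys to the events topic.
        return 0;
    }

    public static void main(String[] args) {
        System.exit(new CommandLine(new BackfillCommandSketch()).execute(args));
    }
}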
aymkhalil commented 1 year ago

Sample run

16:59:28.931 INFO  Contacting origin cluster...
16:59:30.372 WARN  [] 127.0.0.1:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
16:59:30.668 WARN  [] 127.0.0.1:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
16:59:30.928 INFO  Successfully contacted origin cluster
16:59:30.928 INFO  Tables to migrate:
16:59:30.931 INFO  - ks1.sample1 (regular)
16:59:33.226 INFO  Exporting ks1.sample1...
16:59:33.527 INFO  [DSBULK-EXPORT_ks1_sample1_20230214_005933_227] Username and password provided but auth provider not specified, inferring PlainTextAuthProvider
16:59:33.535 INFO  [DSBULK-EXPORT_ks1_sample1_20230214_005933_227] Operation directory: /Users/ayman.khalil/workplace/datastax/cdc-apache-cassandra/logs/EXPORT_ks1_sample1_20230214_005933_227
16:59:33.601 WARN  [DSBULK-EXPORT_ks1_sample1_20230214_005933_227] [driver] /127.0.0.1:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
(the same authentication warning repeated 8 more times)
16:59:34.439 WARN  [DSBULK-EXPORT_ks1_sample1_20230214_005933_227] Continuous paging is not available, read performance will not be optimal. Check your remote DSE cluster configuration, and ensure that the configured consistency level is either ONE or LOCAL_ONE.
16:59:34.831 INFO  [DSBULK-EXPORT_ks1_sample1_20230214_005933_227] Operation EXPORT_ks1_sample1_20230214_005933_227 completed successfully in less than one second.
16:59:37.014 INFO  [DSBULK-EXPORT_ks1_sample1_20230214_005933_227] Checkpoints for the current operation were written to checkpoint.csv.
16:59:37.014 INFO  [DSBULK-EXPORT_ks1_sample1_20230214_005933_227] To resume the current operation, re-run it with the same settings, and add the following command line flag:
16:59:37.014 INFO  [DSBULK-EXPORT_ks1_sample1_20230214_005933_227] --dsbulk.log.checkpoint.file=/Users/ayman.khalil/workplace/datastax/cdc-apache-cassandra/logs/EXPORT_ks1_sample1_20230214_005933_227/checkpoint.csv
16:59:37.036 INFO  Export of ks1.sample1 finished with STATUS_OK
16:59:37.099 INFO  errorCommitLogReprocessEnabled=false, pulsarServiceUrl=pulsar://localhost:6650/, pulsarAuthParams=null, cdcConcurrentProcessors=-1, sslKeystorePath=null, sslTruststorePath=null, tlsTrustCertsFilePath=null, sslTruststoreType=JKS, sslEnabledProtocols=TLSv1.2,TLSv1.1,TLSv1, sslCipherSuites=null, sslKeystorePassword=null, pulsarBatchDelayInMs=-1, cdcPollIntervalMs=60000, cdcWorkingDir=null/cdc, pulsarAuthPluginClassName=null, pulsarMaxPendingMessagesAcrossPartitions=50000, sslHostnameVerificationEnable=false, topicPrefix=events-, maxInflightMessagesPerTask=16384, sslAllowInsecureConnection=false, useKeyStoreTls=false, sslProvider=null, sslTruststorePassword=null, pulsarKeyBasedBatcher=false, pulsarMaxPendingMessages=1000
16:59:37.708 WARN  Can not find org.apache.pulsar.shade.io.netty.resolver.dns.macos.MacOSDnsServerAddressStreamProvider in the classpath, fallback to system defaults. This may result in incorrect DNS resolutions on MacOS.
16:59:37.807 INFO  Pulsar client connected
16:59:37.918 INFO  InetAddress.getLocalHost() was used to resolve listen_address to akhalil-rmbp16/192.168.1.64, double check this is correct. Please check your node's config and set the listen_address in cassandra.yaml accordingly if applicable.
16:59:38.143 INFO  Pulsar producer name=cdc-producer-null-ks1.sample1 created with batching delay=-1ms
16:59:38.263 INFO  [[id: 0xbd7ca279, L:/127.0.0.1:63104 - R:localhost/127.0.0.1:6650]] Connected to server
16:59:38.384 INFO  Starting Pulsar producer perf with config: {
  "topicName" : "events-ks1.sample1",
  "producerName" : "cdc-producer-null-ks1.sample1",
  "sendTimeoutMs" : 0,
  "blockIfQueueFull" : true,
  "maxPendingMessages" : 1000,
  "maxPendingMessagesAcrossPartitions" : 50000,
  "messageRoutingMode" : "CustomPartition",
  "hashingScheme" : "Murmur3_32Hash",
  "cryptoFailureAction" : "FAIL",
  "batchingMaxPublishDelayMicros" : 1000,
  "batchingPartitionSwitchFrequencyByPublishDelay" : 10,
  "batchingMaxMessages" : 1000,
  "batchingMaxBytes" : 131072,
  "batchingEnabled" : false,
  "chunkingEnabled" : false,
  "compressionType" : "NONE",
  "initialSequenceId" : null,
  "autoUpdatePartitions" : true,
  "autoUpdatePartitionsIntervalSeconds" : 60,
  "multiSchema" : true,
  "accessMode" : "Shared",
  "properties" : { }
}
16:59:38.408 INFO  Pulsar client config: {
  "serviceUrl" : "pulsar://localhost:6650/",
  "authPluginClassName" : null,
  "authParams" : null,
  "authParamMap" : null,
  "operationTimeoutMs" : 30000,
  "statsIntervalSeconds" : 60,
  "numIoThreads" : 1,
  "numListenerThreads" : 1,
  "connectionsPerBroker" : 1,
  "useTcpNoDelay" : false,
  "useTls" : false,
  "tlsTrustCertsFilePath" : "",
  "tlsAllowInsecureConnection" : false,
  "tlsHostnameVerificationEnable" : false,
  "concurrentLookupRequest" : 5000,
  "maxLookupRequest" : 50000,
  "maxLookupRedirects" : 20,
  "maxNumberOfRejectedRequestPerConnection" : 50,
  "keepAliveIntervalSeconds" : 30,
  "connectionTimeoutMs" : 10000,
  "requestTimeoutMs" : 60000,
  "initialBackoffIntervalNanos" : 100000000,
  "maxBackoffIntervalNanos" : 60000000000,
  "enableBusyWait" : false,
  "listenerName" : null,
  "useKeyStoreTls" : false,
  "sslProvider" : null,
  "tlsTrustStoreType" : "JKS",
  "tlsTrustStorePath" : null,
  "tlsTrustStorePassword" : null,
  "tlsCiphers" : [ ],
  "tlsProtocols" : [ ],
  "memoryLimitBytes" : 0,
  "proxyServiceUrl" : null,
  "proxyProtocol" : null,
  "enableTransaction" : false
}
16:59:38.429 INFO  [events-ks1.sample1] [cdc-producer-null-ks1.sample1] Creating producer on cnx [id: 0xbd7ca279, L:/127.0.0.1:63104 - R:localhost/127.0.0.1:6650]
16:59:38.494 INFO  [events-ks1.sample1] [cdc-producer-null-ks1.sample1] Created producer on cnx [id: 0xbd7ca279, L:/127.0.0.1:63104 - R:localhost/127.0.0.1:6650]
16:59:38.525 WARN  Failed to load Circe JNI library. Falling back to Java based CRC32c provider
16:59:38.575 INFO  sent 8 records to Pulsar
16:59:38.587 INFO  Migration finished with 1 successfully migrated tables, 0 failed tables.
16:59:38.588 INFO  Table ks1.sample1 migrated successfully.
aymkhalil commented 1 year ago

OSS license

Can anyone use this tool, or do they need a license for DSBulk?

Yes. DSBulk switched to the Apache License v2 a few years ago, which permits redistribution of the software.

Cassandra compatibility

We must support C* 3, C* 4, and DSE. IIRC you mentioned that we need different dependencies; that's not a big deal, but do you know if DSBulk will work with all the supported versions?

Yes, DSBulk supports DSE and C* 2.1 and later. That covers the table export part. There is also C*/DSE version-dependent logic that translates the schema of the PK to Avro before sending records to the events topic. That part will be covered by reusing the agent code; in this patch I hardcoded it to use DSE. With some refactoring, we should be able to fully reuse the agent code, so the net result is that C* 3, C* 4, and DSE will all be supported.
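
To make the version-dependent piece concrete: the translation boils down to mapping each primary-key column's CQL type to an Avro field. Below is a simplified sketch under stated assumptions — the class name, the three-type mapping, and the ordered-map input are all illustrative; the real mapping lives in the agent code and covers the full CQL type system per C*/DSE version.

import java.util.LinkedHashMap;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

// Simplified sketch: builds an Avro record schema for a table's primary key.
// Only three CQL types are mapped here for illustration.
public class PkSchemaSketch {
    static Schema pkSchema(String keyspace, String table,
                           LinkedHashMap<String, String> pkColumnsToCqlTypes) {
        SchemaBuilder.FieldAssembler<Schema> fields =
                SchemaBuilder.record(table).namespace(keyspace).fields();
        for (var col : pkColumnsToCqlTypes.entrySet()) {
            switch (col.getValue()) {
                case "int"    -> fields = fields.requiredInt(col.getKey());
                case "bigint" -> fields = fields.requiredLong(col.getKey());
                case "text"   -> fields = fields.requiredString(col.getKey());
                default -> throw new IllegalArgumentException("unmapped CQL type: " + col.getValue());
            }
        }
        return fields.endRecord();
    }
}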

Dry run mode

We should add a "--dry-run" mode in which we read from Cassandra without writing to Pulsar

+1. I'll capture this requirement
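
A minimal sketch of what such a guard could look like, assuming a picocli-style flag and a publish helper (both names are hypothetical, not the tool's actual API):

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import picocli.CommandLine.Option;

// Illustrative only: the flag wiring and publish helper are assumptions, not the PR's code.
class DryRunSketch {
    @Option(names = "--dry-run", description = "Read and convert exported rows without publishing to Pulsar")
    boolean dryRun = false;

    void publish(Producer<byte[]> producer, byte[] record) throws PulsarClientException {
        if (dryRun) {
            return; // the read/convert path still runs, but nothing is sent to the topic
        }
        producer.send(record);
    }
}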

Disk space and retention

We should make it clear that we store some files locally, for two reasons:

  1. The local machine must have enough space
  2. The user MUST remember to delete everything, because it may contain sensitive data

+1. Will add to the README. Please note that we can add a flag to automatically delete the local snapshot after the backfilling completes, but we need to highlight that enough disk space should be available. Later on, we can consider an optimization that sends data in batches, deletes each batch from disk, and then continues with the next batch, but that could be over-engineering for now.

I suggest adding a flag to auto-delete the data at the end of the procedure (and enabling the feature by default).

Yes!
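
Such a cleanup pass could be a plain recursive delete of the snapshot directory once the run completes. A minimal sketch, assuming a standalone helper (the class name is illustrative, and a real implementation would likely log failures rather than abort):

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Illustrative sketch: recursively delete the local snapshot once backfilling completes.
class SnapshotCleanup {
    static void deleteSnapshot(Path dataDir) throws IOException {
        if (!Files.exists(dataDir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(dataDir)) {
            // Delete children before parents by visiting the deepest paths first.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }
}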

What happens if you run the tool on a non-empty directory that has data from other tables?

Each table is stored in its own directory following a keyspace/tablename hierarchy. If the user has data in a directory with a matching keyspace and table name that doesn't actually belong to that table, the tool cannot tell. If the PK mismatches, publishing to the events topic will fail, but otherwise the tool has no way to know that the data has been manipulated. One way to mitigate this would be to generate a random folder name and a new snapshot every time the tool runs, but that would complicate resuming work after the tool is paused, because it would require a manifest file (and the user could still manipulate that file). Encryption is another thing we can consider, but for V0 I think the existing model suffices.
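
For reference, the layout described above amounts to resolving the table directory under the data dir. A trivial sketch (class and method names are illustrative):

import java.nio.file.Path;

// Sketch: each table's snapshot lives under <data-dir>/<keyspace>/<table>.
class ExportLayout {
    static Path tableDir(Path dataDir, String keyspace, String table) {
        return dataDir.resolve(keyspace).resolve(table); // e.g. target/export/ks1/sample1
    }
}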