xcherryio / xcherry

server and main repo of xCherry project
Apache License 2.0

Add Pulsar Debezium connectors for Postgres #25

Closed longquanzheng closed 1 year ago

longquanzheng commented 1 year ago

I used commands like these to verify that CDC + Pulsar is working:

psql -h 127.0.0.1 -U xdb
Password for user xdb:
psql (16.0, server 14.2 (Debian 14.2-1.pgdg110+1))
Type "help" for help.

xdb=# insert into test2 values(123);
ERROR:  duplicate key value violates unique constraint "test2_pkey"
DETAIL:  Key (process_id)=(123) already exists.
xdb=# insert into test2 values(125);
INSERT 0 1
xdb=#

and then saw the change event arrive on the consumer:

JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk17.0.7.jdk/Contents/Home bin/pulsar-client consume -s "sub-test2" public/default/dbserver1.public.test2 -n 0

2023-09-28T22:01:08,489-0700 [pulsar-client-io-1-3] INFO  org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x0e45528f, L:/127.0.0.1:54069 - R:localhost/127.0.0.1:6650]] Connected to server
2023-09-28T22:01:08,595-0700 [pulsar-client-io-1-3] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":["public/default/dbserver1.public.test2"],"topicsPattern":null,"subscriptionName":"sub-test2","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":null,"ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":60,"regexSubscriptionMode":"PersistentOnly","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":true,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
2023-09-28T22:01:08,601-0700 [pulsar-client-io-1-3] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650/","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":8,"numListenerThreads":8,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":"","tlsCertificateFilePath":"","tlsTrustCertsFilePath":"","tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":"","tlsKeyStorePassword":"*****","tlsTrustStoreType":"JKS","tlsTrustStorePath":"","tlsTrustStorePassword":"*****","tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":0,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
2023-09-28T22:01:08,613-0700 [pulsar-client-io-1-3] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [public/default/dbserver1.public.test2][sub-test2] Subscribing to topic on cnx [id: 0x0e45528f, L:/127.0.0.1:54069 - R:localhost/127.0.0.1:6650], consumerId 0
2023-09-28T22:01:08,623-0700 [pulsar-client-io-1-3] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [public/default/dbserver1.public.test2][sub-test2] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
----- got message -----
key:[eyJwcm9jZXNzX2lkIjoxMjV9], properties:[], content:{"before":null,"after":{"process_id":125},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"dbserver1","ts_ms":1695963685010,"snapshot":"false","db":"xdb","sequence":"[\"36651400\",\"36651944\"]","schema":"public","table":"test2","txId":771,"lsn":36651944,"xmin":null},"op":"c","ts_ms":1695963685360,"transaction":null}
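The key printed above is Base64 text; decoding it gives the JSON `{"process_id":125}`, which matches the inserted row. A minimal sketch of how code written against these events might unpack one, assuming the consumer hands over the key as that Base64 string and the content as JSON text. `parse_change_event` is a hypothetical helper name, and `CONTENT` is a trimmed copy of the message above:

```python
import base64
import json

# Key and a trimmed copy of the content from the consumed message above.
KEY_B64 = "eyJwcm9jZXNzX2lkIjoxMjV9"
CONTENT = (
    '{"before":null,"after":{"process_id":125},'
    '"source":{"connector":"postgresql","name":"dbserver1","db":"xdb",'
    '"schema":"public","table":"test2","txId":771,"lsn":36651944},'
    '"op":"c","ts_ms":1695963685360}'
)


def parse_change_event(key_b64, content):
    """Hypothetical helper: decode the key and summarize the change event."""
    # The key is Base64-encoded JSON holding the primary-key columns.
    key = json.loads(base64.b64decode(key_b64))
    event = json.loads(content)
    source = event["source"]
    return {
        "key": key,
        # Debezium op codes: c = create, u = update, d = delete, r = snapshot read.
        "op": event["op"],
        "table": f'{source["schema"]}.{source["table"]}',
        "before": event["before"],
        "after": event["after"],
    }


if __name__ == "__main__":
    print(parse_change_event(KEY_B64, CONTENT))
```

For an insert, `before` is null and `after` carries the new row, so a handler can branch on `op` before touching either field.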

and, for an update to inventory.products:

JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk17.0.7.jdk/Contents/Home bin/pulsar-client consume -s "sub-products" public/default/dbserver1.inventory.products -n 0

2023-09-28T21:52:25,675-0700 [pulsar-client-io-1-3] INFO  org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xf72ec72a, L:/127.0.0.1:53965 - R:localhost/127.0.0.1:6650]] Connected to server
2023-09-28T21:52:25,776-0700 [pulsar-client-io-1-3] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":["public/default/dbserver1.inventory.products"],"topicsPattern":null,"subscriptionName":"sub-products","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":null,"ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":60,"regexSubscriptionMode":"PersistentOnly","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":true,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
2023-09-28T21:52:25,783-0700 [pulsar-client-io-1-3] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650/","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":8,"numListenerThreads":8,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":"","tlsCertificateFilePath":"","tlsTrustCertsFilePath":"","tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":"","tlsKeyStorePassword":"*****","tlsTrustStoreType":"JKS","tlsTrustStorePath":"","tlsTrustStorePassword":"*****","tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":0,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
2023-09-28T21:52:25,809-0700 [pulsar-client-io-1-3] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [public/default/dbserver1.inventory.products][sub-products] Subscribing to topic on cnx [id: 0xf72ec72a, L:/127.0.0.1:53965 - R:localhost/127.0.0.1:6650], consumerId 0
2023-09-28T21:52:25,821-0700 [pulsar-client-io-1-3] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [public/default/dbserver1.inventory.products][sub-products] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0

----- got message -----
key:[eyJpZCI6MTA3fQ==], properties:[], content:{"before":{"id":107,"name":"1111111136","description":"box of assorted rocks","weight":5.3},"after":{"id":107,"name":"1111111137","description":"box of assorted rocks","weight":5.3},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"dbserver1","ts_ms":1695963154808,"snapshot":"false","db":"xdb","sequence":"[\"36651128\",\"36651184\"]","schema":"inventory","table":"products","txId":769,"lsn":36651184,"xmin":null},"op":"u","ts_ms":1695963155073,"transaction":null}
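The update event above carries both the old and the new row, so the changed columns can be derived by comparing `before` and `after`. A small sketch under that assumption; `changed_fields` is a hypothetical helper, and the two dictionaries are copied from the message above:

```python
# Row images copied from the "op":"u" message above.
BEFORE = {"id": 107, "name": "1111111136",
          "description": "box of assorted rocks", "weight": 5.3}
AFTER = {"id": 107, "name": "1111111137",
         "description": "box of assorted rocks", "weight": 5.3}


def changed_fields(before, after):
    """Hypothetical helper: map each changed column to (old, new)."""
    # Inserts have no "before" and deletes have no "after"; nothing to diff.
    if before is None or after is None:
        return {}
    columns = sorted(before.keys() | after.keys())
    return {
        col: (before.get(col), after.get(col))
        for col in columns
        if before.get(col) != after.get(col)
    }


if __name__ == "__main__":
    print(changed_fields(BEFORE, AFTER))
```

Here only `name` differs, which is the single column the test update touched.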

I will start writing code against this tomorrow.
