confluentinc / bottledwater-pg

Change data capture from PostgreSQL into Kafka
http://blog.confluent.io/2015/04/23/bottled-water-real-time-integration-of-postgresql-and-kafka/
Apache License 2.0

Deleted rows problem #137

Closed · jmedinillaparadigma closed 7 years ago

jmedinillaparadigma commented 7 years ago

Hello,

I'm trying to write a Java consumer for the Kafka topic, but I'm unable to get the row id for the deleted rows.

This is all the info I'm getting with my Java client (tablea is my topic name):

(inserting row) ------> key: tablea ######### Value:{0=[{"id": 49, "name": "name c"}]}
(deleting row) ------> key: tablea ######### Value:{0=[null]}

However, using the kafka-avro-console-consumer I'm getting the info properly:

(inserting row) ------> {"id":{"int":49}} {"id":{"int":49},"name":{"string":"name c"}}
(deleting row) ------> {"id":{"int":49}} null

As you can see, I'm not getting the row id for the deleted rows.
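For comparison, the console consumer output above shows Kafka's delete convention end to end: Bottled Water keys each message by the row's primary key and publishes a null value (a tombstone) for a delete, so the row id has to be read from the message key. Below is a minimal standalone sketch using the plain kafka-clients KafkaConsumer and Confluent's KafkaAvroDeserializer instead of the Spring Integration adapter; the bootstrap address and group id are placeholders:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TombstoneConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
        props.put("group.id", "tombstone-demo");            // placeholder group id
        props.put("auto.offset.reset", "earliest");
        props.put("schema.registry.url", "http://172.17.0.1:48081");
        // Decode both key and value as Avro via the Schema Registry
        props.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");

        try (KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("tablea"));
            while (true) {
                ConsumerRecords<Object, Object> records = consumer.poll(100);
                for (ConsumerRecord<Object, Object> record : records) {
                    if (record.value() == null) {
                        // Delete: the value is a tombstone, the row id is in the key
                        System.out.println("DELETE key=" + record.key());
                    } else {
                        System.out.println("UPSERT key=" + record.key() + " value=" + record.value());
                    }
                }
            }
        }
    }
}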

Can anyone help me out with this, please?

Thank you,

This is my code:

@Bean
IntegrationFlow consumer() {
    // The Avro decoders need the Schema Registry to resolve writer schemas
    Properties properties = new Properties();
    properties.put("schema.registry.url", "http://172.17.0.1:48081");
    VerifiableProperties vProps = new VerifiableProperties(properties);

    KafkaHighLevelConsumerMessageSourceSpec messageSourceSpec = Kafka
            .inboundChannelAdapter(new ZookeeperConnect(this.kafkaConfig.getZookeeperAddress()))
            .consumerProperties(props -> props
                    .put("auto.offset.reset", "smallest")
                    .put("auto.commit.interval.ms", "100"))
            .addConsumer("myGroup", metadata -> metadata
                    .consumerTimeout(100)
                    .topicStreamMap(m -> m.put(this.kafkaConfig.getTopic(), 1))
                    .maxMessages(10)
                    // Decode both key and value as Avro via the Schema Registry
                    .valueDecoder(new KafkaAvroDecoder(vProps))
                    .keyDecoder(new KafkaAvroDecoder(vProps)));

    // Poll the message source every 100 ms
    Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer =
            e -> e.poller(p -> p.fixedDelay(100));

    return IntegrationFlows.from(messageSourceSpec, endpointConfigurer)
            .<Map<String, List<String>>>handle((payload, headers) -> handleMessage(payload, headers))
            .get();
}

private Map<String, List<String>> handleMessage(Map<String, List<String>> payload, Map<String, Object> headers) {
    payload.entrySet().forEach(this::handle);
    return null;
}

private void handle(Entry<String, List<String>> e) {
    log.info("------> key: " + e.getKey() + " ######### Value:" + e.getValue());
}
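As an aside, the logged value {0=[null]} suggests the adapter actually hands over a nested map (topic -> partition -> decoded records) rather than a Map<String, List<String>>, and that deletes arrive as null entries in the record list. A hedged sketch of a handler along those lines; the nested-map shape is inferred from the logged output above, not from the adapter's documented contract:

// Hypothetical replacement for handleMessage/handle above: walks the
// topic -> partition -> records structure that the logged output suggests,
// and treats a null record as a Bottled Water delete tombstone.
private Object handleMessage(Map<String, Map<Integer, List<Object>>> payload, Map<String, Object> headers) {
    payload.forEach((topic, partitions) ->
            partitions.forEach((partition, records) ->
                    records.forEach(record -> {
                        if (record == null) {
                            // Delete: the value is a tombstone; the deleted row's id
                            // travels in the message key, which this payload shape
                            // does not expose on its own.
                            log.info("------> topic: " + topic + " partition: " + partition + " DELETE");
                        } else {
                            log.info("------> topic: " + topic + " partition: " + partition + " value: " + record);
                        }
                    })));
    return null;
}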
jmedinillaparadigma commented 7 years ago

Hello,

This problem was related to the version of the Kafka libraries I was using in my implementation.

Thank you,