SAP / kafka-connect-sap

Kafka Connect SAP is a set of connectors, based on the Apache Kafka Connect framework, for reliably connecting Kafka with SAP systems.

Unable to write into multiple partitions in a topic #89

Closed: srkpers closed this issue 2 years ago

srkpers commented 2 years ago

We have configured the HANA source connector to read from a HANA table in bulk mode and write into a Kafka topic. We created the topic with 3 partitions and set the partition count and max tasks to 3 in the source connector properties file, yet all the records are going into a single partition in the topic. The table has about 6 million rows and all of them went into partition 0. We are also using the schema registry. 3 tasks got created, each reading 100 rows at a time, and all 3 tasks are writing into a single partition in the topic. Here is the content of the properties file related to the partition and task parameters:

"tasks.max": 3, "hana-sampletable.partition.count": 3

Is there any config we are missing in order to write the messages into all 3 partitions?
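For reference, the relevant parts of a bulk-mode source configuration along these lines would look roughly as follows (a sketch; the connection values and table name are placeholders, and the property names follow the connector README):

    {
      "name": "hana-source",
      "config": {
        "connector.class": "com.sap.kafka.connect.source.hana.HANASourceConnector",
        "connection.url": "jdbc:sap://<host>:<port>/",
        "connection.user": "<user>",
        "connection.password": "<password>",
        "mode": "bulk",
        "tasks.max": "3",
        "topics": "hana-sampletable",
        "hana-sampletable.table.name": "\"<SCHEMA>\".\"<TABLE>\"",
        "hana-sampletable.partition.count": "3"
      }
    }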

elakito commented 2 years ago

Hi, the source connector distributes records across multiple topic partitions based on the partitions of the source table itself. Therefore, when the partition count property is set to a value greater than 1 but the records all land in a single partition, the likely cause is that the table is not partitioned or that the connector is not retrieving the partition info correctly.

To investigate, could you run the SQL commands below against your HANA instance, replacing xxxxx with the schema and table name of your table?

SELECT TABLE_NAME, PART_ID, PARTITION FROM SYS.M_CS_PARTITIONS WHERE (SCHEMA_NAME='xxxxx' AND TABLE_NAME='xxxxx');

SELECT TABLE_NAME, IS_PARTITIONED FROM SYS.M_TABLES WHERE (SCHEMA_NAME='xxxxx' AND TABLE_NAME='xxxxx');

SELECT TABLE_NAME, PART_ID FROM SYS.M_CS_TABLES WHERE (SCHEMA_NAME='xxxxx' AND TABLE_NAME='xxxxx');

SELECT TABLE_NAME, PART_ID FROM SYS.TABLE_PARTITIONS WHERE (SCHEMA_NAME='xxxxx' AND TABLE_NAME='xxxxx');

regards, aki

srkpers commented 2 years ago

Hello elakito, thank you for the input. The table is not partitioned. I will run a test against a partitioned table and get back with the results.

Best Regards, srk

srkpers commented 2 years ago

Hello elakito, I ran a test with a partitioned HANA table and it did write into multiple Kafka partitions. One thing I observed is that it writes into a single Kafka partition at a time. If I increase the number of tasks, the entire HANA table gets written into each Kafka partition. Is there a way to configure multiple tasks so that each task reads from one HANA partition and writes into one Kafka partition, which would give good throughput?

elakito commented 2 years ago

@srkpers There seems to be a bug in the multiple-task handling. We'll look into this problem and fix it.

elakito commented 2 years ago

We just merged code that fixes the task distribution so that tasks are grouped and distributed to the available workers without duplicates, so I am closing this ticket.

Regarding your original question of polling a table with a single partition into multiple Kafka partitions: it does not make sense to dispatch multiple workers to poll from a single partition, because that will create more contention and slow them down. If your objective is to have the records distributed to multiple Kafka partitions, it would be more appropriate to use a Single Message Transform (SMT) to set the target partition in the record, as in the sketch below.
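For illustration, a custom SMT along the following lines could spread records over the topic partitions round-robin. This class is hypothetical and not part of this connector; it relies only on the standard Kafka Connect Transformation API:

    // Hypothetical example, not part of kafka-connect-sap.
    package com.example.smt;

    import java.util.Map;

    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.connect.connector.ConnectRecord;
    import org.apache.kafka.connect.transforms.Transformation;

    /** Spreads records over a fixed number of topic partitions, round-robin. */
    public class SetPartition<R extends ConnectRecord<R>> implements Transformation<R> {

        private int partitionCount;
        private int next;

        @Override
        public void configure(Map<String, ?> configs) {
            // Number of target partitions, e.g. "partition.count": "3".
            partitionCount = Integer.parseInt(String.valueOf(configs.get("partition.count")));
        }

        @Override
        public R apply(R record) {
            int partition = next;
            next = (next + 1) % partitionCount;
            // Copy the record, overriding only the target partition.
            return record.newRecord(record.topic(), partition,
                    record.keySchema(), record.key(),
                    record.valueSchema(), record.value(), record.timestamp());
        }

        @Override
        public ConfigDef config() {
            return new ConfigDef().define("partition.count", ConfigDef.Type.INT,
                    ConfigDef.Importance.HIGH, "number of topic partitions to spread records over");
        }

        @Override
        public void close() {
        }
    }

It would then be attached to the connector with the usual transform properties, e.g. "transforms": "setPartition", "transforms.setPartition.type": "com.example.smt.SetPartition", "transforms.setPartition.partition.count": "3".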

srkpers commented 2 years ago

Hi elakito, thank you for merging the code. I used the latest code and ran a test with multiple tasks. There is no duplication of data, but I noticed that all the data is going into a single topic partition. This test involved a HANA table with 17 partitions, and I was expecting the messages to go to multiple topic partitions. Could you please share any test scenarios run against the latest code that involved multiple HANA partitions and multiple topic partitions, and whether the messages went into multiple topic partitions?

elakito commented 2 years ago

Hi @srkpers, could you check whether the target topic has multiple partitions? For example:

    kafka-topics.sh --bootstrap-server ... --describe --topic xxx

If you are creating the topic from the source connector automatically, there is no property at the source connector to specify the number of partitions to be created; this number is determined by the value set at the Kafka broker (property num.partitions, which is 1 by default). Currently, the property {topic}.partition.count at the source connector is only used to distribute the records across the existing partitions and not to create the topic.

In other words, it should work if you create the topic beforehand with the desired number of partitions, e.g.

    kafka-topics.sh --bootstrap-server ... --create --topic xxx --replication-factor 3 --partitions 17 ...

but not if the topic is created automatically by the source connector while the broker's num.partitions is set to 1. This is a limitation. Please feel free to open an issue for it.

regards, aki
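Concretely, with the broker and topic names from the sample further below, pre-creating a two-partition topic would look like this:

    bin/kafka-topics.sh --bootstrap-server kafka:9092 --create --topic test_topic_tp --replication-factor 1 --partitions 2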

srkpers commented 2 years ago

Hi @elakito, I am able to auto-create the topic with multiple partitions using the connector with the below parameter in the connector configuration:

    "topic.creation.default.partitions": "3"

I am using the Confluent Platform and running the SAP HANA source connector on the Confluent connect cluster. So the topic is being created by the connector with multiple partitions, but all the data from the HANA table, across multiple tasks, is going into a single partition.

Regards,

elakito commented 2 years ago

Hi @srkpers, if the Kafka partitions exist and the source table contains data in multiple partitions, the data should be transferred to the multiple Kafka partitions. Below is a slightly modified sample of examples/persons1db, where the connector properties are modified with the following values:

         "tasks.max": "2",
         "topics": "test_topic_tp",
         "test_topic_tp.partition.count": "2",
         "test_topic_tp.table.name": "\"TESTER\".\"PERSONS1P2\"",
         "topic.creation.default.partitions": "2",
        "topic.creation.default.replication.factor": "1"

And the source HANA table PERSONS1P2 has two partitions, with some data in each partition.
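(For reference, a table like this could be created with hash partitioning, e.g. as in the following sketch; the column types here are assumptions:)

    CREATE COLUMN TABLE "TESTER"."PERSONS1P2" (
        PERSONID INTEGER,
        LASTNAME NVARCHAR(100),
        FIRSTNAME NVARCHAR(100)
    ) PARTITION BY HASH (PERSONID) PARTITIONS 2;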

SELECT * FROM PERSONS1P2 PARTITION (1);

PERSONID  LASTNAME  FIRSTNAME
--------  --------  ---------
       3  simpson   bart 
SELECT * FROM PERSONS1P2 PARTITION (2);

PERSONID  LASTNAME  FIRSTNAME
--------  --------  ---------
       1  simpson   homer    
       2  simpson   merge    

And Kafka topic has been created with two partitions that can be seen by

[kafka@60593bad953d ~]$ bin/kafka-topics.sh --bootstrap-server kafka:9092 --topic test_topic_tp --describe
Topic: test_topic_tp    TopicId: 0bjiekJyTXWeFyVA3zt9pw PartitionCount: 2   ReplicationFactor: 1    Configs: segment.bytes=1073741824
    Topic: test_topic_tp    Partition: 0    Leader: 1   Replicas: 1 Isr: 1
    Topic: test_topic_tp    Partition: 1    Leader: 1   Replicas: 1 Isr: 1

Consumer-1 consuming from topic partition 0

[kafka@60593bad953d ~]$ bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test_topic_tp --partition 0 --from-beginning
{"schema":{"type":"struct","fields":[{"type":"int32","optional":true,"field":"PERSONID"},{"type":"string","optional":true,"field":"LASTNAME"},{"type":"string","optional":true,"field":"FIRSTNAME"}],"optional":false,"name":"testerpersons1p2"},"payload":{"PERSONID":1,"LASTNAME":"simpson","FIRSTNAME":"homer"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":true,"field":"PERSONID"},{"type":"string","optional":true,"field":"LASTNAME"},{"type":"string","optional":true,"field":"FIRSTNAME"}],"optional":false,"name":"testerpersons1p2"},"payload":{"PERSONID":2,"LASTNAME":"simpson","FIRSTNAME":"merge"}}

Consumer-2 consuming from topic partition 1

[kafka@60593bad953d ~]$ bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test_topic_tp --partition 1 --from-beginning
{"schema":{"type":"struct","fields":[{"type":"int32","optional":true,"field":"PERSONID"},{"type":"string","optional":true,"field":"LASTNAME"},{"type":"string","optional":true,"field":"FIRSTNAME"}],"optional":false,"name":"testerpersons1p2"},"payload":{"PERSONID":3,"LASTNAME":"simpson","FIRSTNAME":"bart"}}

If you observe that all data are transferred to only one partition, the possible causes are: the target topic has only one partition, the source table's data is stored in a single table partition, or the connector's metadata lookup did not return the table's partition info.

In order to exclude these cases, please provide the output from the following:

  1. verify the topic information with bin/kafka-topics.sh ... --describe --topic $kafka_topic, where $kafka_topic is the target Kafka topic

  2. run the following SQL commands to verify the data are stored in multiple partitions

    SELECT COUNT(*) FROM $SOURCE_TABLE PARTITION (1);
    SELECT COUNT(*) FROM $SOURCE_TABLE PARTITION (2);
    ...

    where $SOURCE_TABLE is the source HANA table

  3. invoke the following SQL to retrieve the partition information of the source table

    SELECT TABLE_NAME, PARTITION FROM SYS.M_CS_PARTITIONS WHERE TABLE_NAME = '$SOURCE_TABLE';

    where $SOURCE_TABLE is the source HANA table

For example, for the above sample, the statement below should print:

SELECT TABLE_NAME, PARTITION FROM SYS.M_CS_PARTITIONS WHERE TABLE_NAME = 'PERSONS1P2';

TABLE_NAME  PARTITION
----------  ---------
PERSONS1P2          1
PERSONS1P2          2

If you could verify the above information for your table, that would help determine why all data are transferred to one partition in your scenario. Maybe, in your case, the topic partitions exist, but either the data is not stored in multiple table partitions or the metadata lookup didn't return the partition info.

thanks. aki