scylladb / kafka-connect-scylladb

Kafka Connect Scylladb Sink
Apache License 2.0
46 stars 22 forks source link

kafka sink connection is slower in writing in DB #34

Closed dhgokul closed 2 months ago

dhgokul commented 3 years ago

Used MQTT - Malaria to publish the message to Kafka, The MQTT takes only 1 min to publish 4 million records from MQTT to Kafka but in Scylla insertion, it takes 10 mins.

Why its 10 times slower?

Is there any way to make it faster?

avelanarius commented 3 years ago

You should try to improve the performance by:

dhgokul commented 3 years ago

No performance increment, Changed the partition topic to 10 and task.mx =12 but still slower when writing to the scylla

avelanarius commented 3 years ago

Could you provide some more details about the node size of your Scylla cluster? What is the number of shards/vCPUs in that cluster, was smp configuration modified? What is the CPU/disk usage of it (is some core pegged at 100%)?

Could it be that you have many writes to a single partition key - causing a hot partition in Scylla? (You could use this tool: https://docs.scylladb.com/operating-scylla/nodetool-commands/toppartitions/ to check it. Note that it works "in real-time", not after the fact)

dhgokul commented 3 years ago

Machine type e2-standard-16 (16 vCPUs, 64 GB memory) -HDD

3 Node cluster

Moderately using the processor.

While using sink connection taking 30-35 mins for dumbing 6 Million records, i.e 3000-4000 records per second.

Is this Kafka-Scylla-sink connection performance is normal or it needs to be enhanced for better performance?

We tried increasing and changing many methods/configurations in the Kafka server. But in end, it takes above mentioned time dilation.

dhgokul commented 3 years ago

After upgrade from HDD to SSD disk, we achieve 12 million data in 19 minutes with machine configuration (e2-standard-16 (16 vCPUs, 64 GB memory) . Attached log for your reference

We are monitoring scylla using nodetool command nodetool toppartitions mqtest topic1 5000 and nodetool tablehistograms mqtest topic1 (Took log when write data 12 million data's) -PFA Scylla nodes using local SSD disks. MQTT proxy -> Confluent kafka producer time (2.8 minutes) for 12 million data Confluent kafka producer -> kafka sink scylla connector -> dump in scylla DB took 19 minutes Above logs are confirmed using confluent kafka control center image image

dhgokul commented 3 years ago

Messages reached Confluent-Kafka from MQTT-Malaria within 3 mins using Confluent-Kafka-MQTT-Proxy, but still, 6 times [19 mins]slower for dumbing records to Scylla using sink connection in Kafka.

avelanarius commented 3 years ago

Created a Pull Request #36, that improves the way we insert rows to Scylla. I believe it will help the performance and scalability in this issue.

dhgokul commented 3 years ago

Have to check the above method. Have another doubt, the machine we use 16 core process. Scylla-shard is using 2 cpu only shard Is this setup correct!.

Tried to increase using below method: https://docs.scylladb.com/getting-started/scylla_in_a_shared_environment/

Got an Error while starting the Scylla error_shards

avelanarius commented 3 years ago

What installation method of Scylla did you use in the beginning? (For example in: https://www.scylladb.com/download/#server, we have installers for specific Linux distributions, Docker and Unified Installer)

Could you post the output of running hwloc-ls command? (you might need to install a package to have this command on your system)

dhgokul commented 3 years ago

Installation Method: Linux distribution

Result of running hwloc-ls in scylla machine: Screenshot from 2021-04-21 14-13-41

avelanarius commented 3 years ago

What are the contents of your /etc/scylla.d/cpuset.conf file? Did you manually change it or run scylla_cpuset_setup?

Also your systemctl status scylla-server.service screenshot is truncated (e.g. full command line options are not visible).

Could you post the full log? (piping journalctl into file). That way we could determine if there is also --cpuset command line option there.

dhgokul commented 3 years ago

The default config is left unchanged in /etc/scylla.d/cpuset.conf which is mentioned below

# DO NO EDIT
# This file should be automatically configure by scylla_cpuset_setup
#
# CPUSET="--cpuset 0 --smp 1"
CPUSET="--cpuset 0-1 "

log is attached as text file below, when tried to increase the shard scylla_log.txt

Also Tried the latest Kafka-Scylla-sink Connection of this repository [Removal of CQL batching from connector]. But the performance not tunned up. Same time its taking to complete the process.

avelanarius commented 3 years ago

So /etc/scylla.d/cpuset.conf file with --cpuset 0-1 causes this problem. Scylla tries to launch only on cores 0, 1 - not utilizing all other cores.

A temporary fix would be to change this file to:

CPUSET="--cpuset 0-15 "

I am now trying to determine why the setup configured it that way. Unfortunately, I have not been able to reproduce this issue myself, even on similar Google Cloud instances.

It would be very helpful if you could describe:

  1. What Linux distribution version are you running? (What image did you select in Google Cloud?)
  2. What types of disks and how many did you attach to this instance? (Balanced persistent disk? SSD persistent disk? Standard persistent disk? Local SSD?)
  3. Did you run scylla_setup? Did it complete 100% successfully?
dhgokul commented 3 years ago

1) Ubuntu 18.04 Scylla 4.3 (Latest)

Tested withCPUSET="--cpuset 0-10 " and CPUSET="--cpuset 0-15 "

and cpu info are shared below for CPUSET="--cpuset 0-15 " scylla- master: scylla-1

scylla-1: scylla-2

scylla-2: scylla-3

But result are same as previous of using 0-1 cpu for both CPUSET="--cpuset 0-10 " and CPUSET="--cpuset 0-15 " tesst i.e : avg 8.5k records per sec only.

Inference of defaut cpu config:

2021-04-19 test

Inference of 0-10 cpu config: 2021-04-23 test

Inference of 0-15 cpu config:

2021-04-23-15-test

2) SSD persistent disk

3) Its completed 100%

Is there any benchmark test done on using Kafka-Scylla-sink-connector? Since there is no idea on how much we are lagging in performance.

avelanarius commented 3 years ago

I did a performance test today trying to replicate your environment (same instance types etc.) and I was easily able to achieve 90000 msg/s replicated to Scylla.

Here's how I configured it, so you can compare it with your setup and determine what's different. I tried to make it as similar to your setup according to the information you gave us:

Configuration

I started 4 e2-standard-16 instances. 3 of them used for Scylla, 1 used for Kafka (Confluent Platform). All instances were configured with Ubuntu 20.04 LTS (ubuntu-2004-focal-v20210415), 64 GB SSD persistent disk. Note that Scylla does NOT recommend using SSD persistent disks - they have worse performance than SSD local NVMe disks (those are available on GCP). Best to my knowledge, persistent disk IOPS is related to its size (so smaller disks have slower performance). If I were to guess, if you started GCP instances with smaller disks that could be a reason for slower performance. Just to be thorough: those instances were all started in the same us-east1-b zone. GCP assigned Haswell CPU to those started instances.

I installed Scylla 4.4.1 on 3 instances, sticking to those install instructions for Ubuntu 20.04: https://www.scylladb.com/download/#server

I created the following schema:

CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
CREATE TABLE ks.mytopic (
    pk text,
    ck text,
    v1 text,
    v2 text,
    PRIMARY KEY (pk, ck)
);

I then installed Confluent Platform on the fourth instance. I installed Confluent 6.1.1 and started it in "local" mode:

bin/confluent local services start

I then created mytopic topic with 10 partitions. I did not use "MQTT - Malaria" to insert the data, but I did write a script myself to insert the data to Kafka. The data is inserted in JSON format. The data in those messages is 100 bytes large (saw that size on one of your screenshots) - 4 fields of 25 bytes. Here's an example message I inserted:

Key:

{"pk":"0834588735394723782854083","ck":"1278455771788978695108659"}

Value:

{"pk":"0834588735394723782854083","ck":"1278455771788978695108659","v1":"4780395579276126211315500","v2":"1765827918224656554653845"}

Here's a script that I used to generate:

import random
import string

for i in range(20000000):
    pk = ''.join(random.choice(string.digits) for i in range(25))
    ck = ''.join(random.choice(string.digits) for i in range(25))
    v1 = ''.join(random.choice(string.digits) for i in range(25))
    v2 = ''.join(random.choice(string.digits) for i in range(25))
    print(f'{{"pk":"{pk}","ck":"{ck}"}}${{"pk":"{pk}","ck":"{ck}","v1":"{v1}","v2":"{v2}"}}')

I piped it into kafka-console-producer --broker-list localhost:9092 --topic mytopic --property "parse.key=true" --property "key.separator=$" > /dev/null, thereby inserting 20 million messages to Kafka (3.93GB of data).

Only after that, I started the connector. The connector will therefore catch up on those 20 million messages. This is a time I measured to determine messages per second.

I started the connector with the following configuration:

{
  "name": "mytopic_connector",
  "config": {
    "connector.class": "io.connect.scylladb.ScyllaDbSinkConnector",
    "tasks.max": "10",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "topics": "mytopic",
    "scylladb.contact.points": "10.240.0.73",
    "scylladb.port": "9042",
    "scylladb.keyspace": "ks",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false"
  }
}

The connector replicated all those 20M messages in about ~4 minutes. The metrics in Confluent Control Center said it was consuming the messages 90k per second.

Comparing with cassandra-stress

For comparison, I ran cassandra-stress to insert similar data to Scylla. cassandra-stress started with 32 threads was able to insert 64k rows per second. When started with 256 threads, it is able to insert 180k rows per second. I would consider those numbers the upper limit of operations per second that the connector could achieve.

You could run it to benchmark your Scylla cluster directly (to better locate where are the bottlenecks).

Here's the commands I used:

contents of test_profile.yaml:

keyspace: ks
table: t
keyspace_definition: |
  CREATE KEYSPACE IF NOT EXISTS ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
table_definition: |
  CREATE TABLE ks.t (
    pk text,
    ck text,
    v1 text,
    v2 text,
    PRIMARY KEY (pk, ck))
columnspec:
  - name: pk
    population: uniform(1..1B)
    size: fixed(25)
  - name: ck
    population: uniform(1..1B)
    cluster: fixed(1)
    size: fixed(25)
  - name: v1
    size: fixed(25)
  - name: v2
    size: fixed(25)
insert:
  partitions: fixed(1)
  batchtype: UNLOGGED
  select: fixed(1)/1
queries:
  dummy:
    cql: select * from ks.t where pk = ?
    fields: samerow
cassandra-stress user profile="test_profile.yaml ops(insert=1) cl=QUORUM n=100000000" -port jmx=6868 -mode cql3 nativ
e -rate threads=256 -log level=verbose interval=5 -errors retries=999 ignore -node 10.240.0.73

/cc @haaawk @mailmahee - any further suggestions?

dhgokul commented 3 years ago

How to pipe the script to console producer? Is that we have write the output of python script to text file and have to pass like below

kafka-console-producer --broker-list localhost:9092 --topic mytopic --property "parse.key=true" --property "key.separator=$" > /home/msg.txt

https://grokbase.com/t/kafka/users/157b71babg/kafka-producer-input-file

in above link they suggested '<' instead of '>'

But in this one. kafka-console-producer --broker-list localhost:9092 --topic mytopic --property "parse.key=true" --property "key.separator=$" > **/home/msg.txt** , after executing this one, again its asking from terminal only , the message from the file is not written, and also the text file messages are deleted and '>' symbol only exists in that file.

So tried the link, but in that got an error as below:

>>>>>>>org.apache.kafka.common.KafkaException: No key found on line 7: 
    at kafka.tools.ConsoleProducer$LineMessageReader.readMessage(ConsoleProducer.scala:290)
    at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:51)
    at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)

my msgs.txt file contains following data:

{"pk":"5383050633966007251767624","ck":"2351157012664789339162111"}${"pk":"5383050633966007251767624","ck":"2351157012664789339162111","v1":"1817516021340158203567587","v2":"2910965515187087901012931"}
{"pk":"6994899163657923407089875","ck":"5289415245892327891422090"}${"pk":"6994899163657923407089875","ck":"5289415245892327891422090","v1":"0417224890743411888265830","v2":"9922395026236272145178648"}
{"pk":"6374381658119263847927150","ck":"2726060561016862847553900"}${"pk":"6374381658119263847927150","ck":"2726060561016862847553900","v1":"0907483986010632348576128","v2":"6449874579723786924708913"}
{"pk":"8470349218715220430052984","ck":"3934975212135931424421814"}${"pk":"8470349218715220430052984","ck":"3934975212135931424421814","v1":"6628719981145888385828291","v2":"1946994662626878622031433"}
{"pk":"9027299519289724274975092","ck":"4995944009183369868274592"}${"pk":"9027299519289724274975092","ck":"4995944009183369868274592","v1":"5557268425012649342769310","v2":"2843189179918797221756187"}
{"pk":"8981545887416410257532957","ck":"1002198171808576158529184"}${"pk":"8981545887416410257532957","ck":"1002198171808576158529184","v1":"0658244957680336832084997","v2":"0698490331806279932790852"}
avelanarius commented 3 years ago

How to pipe the script to console producer?

Assuming the script is saved to gen.py:

python3 gen.py > msg.txt
bin/kafka-console-producer --broker-list localhost:9092 --topic mytopic --property "parse.key=true" --property "key.separator=$" > /dev/null < msg.txt
Bouncheck commented 2 months ago

Closing as it seems it was environment problem