Open pawanjuly3 opened 6 years ago
Ok, let's assume you have a CSV file named user.csv with three columns, in order: first_name, last_name, age. And you want to push messages to Kafka where the key would be some autogenerated sequence number, and the value would be in the format: <first_name> <last_name> - <age>.
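As a rough illustration of the intended value format only (this is plain Python, not Berserker or Ranger code), the sketch below renders each CSV row as `<first_name> <last_name> - <age>`. The helper name `build_values` is hypothetical, and the sample rows are the ones posted later in this thread.

```python
import csv
import io


def build_values(csv_text):
    """Render each CSV row as '<first_name> <last_name> - <age>'."""
    rows = csv.reader(io.StringIO(csv_text))
    # Columns are addressed by zero-based position: 0, 1, 2.
    return [f"{row[0]} {row[1]} - {row[2]}" for row in rows if row]


sample = "pawan,kumar,40\namit,kumar,41\nraj,kumar,435\n"
for value in build_values(sample):
    print(value)
```

A file with three columns only has positions 0, 1 and 2, which is worth keeping in mind when picking column names in the configuration.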
You can use the following YAML configuration:
load-generator-configuration:
  data-source-configuration-name: Ranger
  rate-generator-configuration-name: default
  worker-configuration-name: Kafka
  metrics-reporter-configuration-name: SimpleConsoleReporter
  thread-count: 4
  queue-capacity: 1000
data-source-configuration:
  values:
    usersCsv: csv('user.csv')
    kafkaMessage:
      key: circular(1..1000000, 1)
      value: string("{} {} - {}", get("c0", $usersCsv), get("c1", $usersCsv), get("c3", $usersCsv))
  output: $kafkaMessage
rate-generator-configuration:
  rates:
    r: 100
  output: $r
worker-configuration:
  async: false
  topic: topic1
  producer-configuration:
    bootstrap.servers: 192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092
For more information on Ranger CSV support, take a look here.
This fails with the error below.
2018-08-19 14:24:08.309 ERROR [async-worker-thread-1] i.s.b.w.InternalWorker - Error while accepting payload at worker: io.smartcat.berserker.kafka.worker.KafkaWorker. Error: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
My user.csv looks like:
pawan,kumar,40
amit,kumar,41
raj,kumar,435
I run it as:
java -jar ./berserker-runner/target/berserker-runner-0.0.13-SNAPSHOT.jar -c ./a.yaml
Ah, sorry, my bad. With the current implementation, the Kafka key needs to be of type String. Just change the following line in the configuration:
key: circular(1..1000000, 1)
to
key: string("{}", circular(1..1000000, 1))
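To show what the corrected key expression is meant to achieve, here is a small Python simulation. It is not Ranger's implementation: the `circular` and `render` helpers are hypothetical stand-ins, and the wrap-around behaviour (returning to the start once the end of the range is passed) is an assumption about how `circular(...)` behaves.

```python
from itertools import islice


def circular(start, end, step):
    """Yield start, start+step, ... up to end, then wrap back to start."""
    value = start
    while True:
        yield value
        value += step
        if value > end:
            value = start


def render(template, *args):
    """Fill each '{}' placeholder in order, always returning a str."""
    return template.format(*args)


# A tiny range (1..3) is used so the wrap-around is visible.
keys = [render("{}", n) for n in islice(circular(1, 3, 1), 5)]
print(keys)
```

The point of the wrapper is only the type: the counter still produces numbers, but every key handed to the worker is text.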
@pawanjuly3 Does this answer your question?
Thanks Vajda. That issue is resolved, but messages are still not getting published. The CSV has only 3 records and I changed batch-size to 1. I checked Kafka, and even the topic is not getting created.
Logs:
2018-08-29 05:52:31.806 INFO [main] o.a.k.c.u.AppInfoParser - Kafka version : 0.10.2.0
2018-08-29 05:52:31.806 INFO [main] o.a.k.c.u.AppInfoParser - Kafka commitId : 576d93a8dc0cf421
2018-08-29 05:52:31.944 INFO [main] o.r.Reflections - Reflections took 78 ms to scan 1 urls, producing 24 keys and 55 values
2018-08-29 05:52:31.983 INFO [main] i.s.b.LoadGenerator - Load generator started.
2018-08-29 05:52:31.983 INFO [main] i.s.b.LoadGenerator - Load generator started.
2018-08-29 05:52:36.987 ERROR [main] i.s.b.LoadGenerator - Terminating load generator due to error. Error: java.util.NoSuchElementException: No more CSV records available
worker-configuration:
  async: false
  topic: topic1
  batch-size: 1
  producer-configuration:
    bootstrap.servers: BuildVM:9092
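One possible reading of the `No more CSV records available` error in the log is that the CSV source hands out each row once and then has nothing left. That is an assumption, not something confirmed in this thread. The Python sketch below only mimics that behaviour; `CsvRows` and `drain` are hypothetical names and `LookupError` stands in for Java's `NoSuchElementException`.

```python
class CsvRows:
    """Hands out each row once; a stand-in for a non-repeating CSV source."""

    def __init__(self, rows):
        self._rows = iter(rows)

    def next_row(self):
        try:
            return next(self._rows)
        except StopIteration:
            # Mirrors the message seen in the log above.
            raise LookupError("No more CSV records available") from None


def drain(source, requests):
    """Ask for `requests` rows; return the rows delivered and any error text."""
    delivered = []
    for _ in range(requests):
        try:
            delivered.append(source.next_row())
        except LookupError as exc:
            return delivered, str(exc)
    return delivered, None


rows = ["pawan,kumar,40", "amit,kumar,41", "raj,kumar,435"]
delivered, error = drain(CsvRows(rows), requests=5)
print(len(delivered), error)
```

Under that assumption, a generator that keeps asking for rows at a fixed rate would stop with this error shortly after the third row, whether or not anything reached Kafka.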
Have you updated the bootstrap.servers: section? In the example above I put in some default values, and I don't expect that your Kafka cluster is at that address.
Yes, I did, and I tried both the hostname and the IP address. The strange thing is that whether I give a wrong IP or the correct one, the response from the program stays the same. It should throw an exception if it is not able to connect to the broker.
io.smartcat.berserker.generatedThroughput
  count = 10
  mean rate = 0.98 events/second
  1-minute rate = 0.82 events/second
  5-minute rate = 0.80 events/second
  15-minute rate = 0.80 events/second
io.smartcat.berserker.successProcessedThroughput
  count = 0
  mean rate = 0.00 events/second
  1-minute rate = 0.00 events/second
  5-minute rate = 0.00 events/second
  15-minute rate = 0.00 events/second
io.smartcat.berserker.totalProcessedThroughput
  count = 0
  mean rate = 0.00 events/second
  1-minute rate = 0.00 events/second
  5-minute rate = 0.00 events/second
  15-minute rate = 0.00 events/second
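Since the program behaves the same for a right and a wrong address, it can help to test reachability separately from Berserker. The sketch below is a plain TCP check using Python's standard `socket` module; `can_connect` is a hypothetical helper, and the demo uses a throwaway local listener so it runs without a broker.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Demo against a temporary local listener instead of a real broker.
listener = socket.socket()
listener.bind(("127.0.0.1", 0))  # port 0 lets the OS pick a free port
listener.listen(1)
port = listener.getsockname()[1]

print(can_connect("127.0.0.1", port))  # listener is up
listener.close()
print(can_connect("127.0.0.1", port))  # listener is gone
```

For the setup in this thread the call would be `can_connect("BuildVM", 9092)`. A successful connect only shows that the port is open; it does not prove that the broker advertises an address the client can reach.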
Can you share your whole configuration file?
Attached
On Wed, Aug 29, 2018 at 2:03 PM, Pawan Sharma pawanjuly3@gmail.com wrote:
b.yaml : This should continue loading random rows to kafka
load-generator-configuration:
  data-source-configuration-name: Ranger
  rate-generator-configuration-name: default
  worker-configuration-name: Kafka
  metrics-reporter-configuration-name: SimpleConsoleReporter
  thread-count: 1
  queue-capacity: 1
data-source-configuration:
  values:
    id: uuid()
    firstName: random(['Peter', 'Mike', 'Steven', 'Joshua', 'John', 'Brandon'])
    lastName: random(['Smith', 'Johnson', 'Williams', 'Davis', 'Jackson', 'White', 'Lewis', 'Clark'])
    age: random(20..45)
    kafkaMessage:
      key: string("{}",circular(1..1000000, 1))
      value: string("{}",$firstName )
      topic: random(['topic1', 'topic2', 'topic3'])
  output: $kafkaMessage
rate-generator-configuration:
  rates:
    r: 1
  output: $r
worker-configuration:
  async: false
  producer-configuration:
    bootstrap.servers: BuildVM:9092
c.yaml : this should post all 3 rows of my user.csv to kafka
load-generator-configuration:
  data-source-configuration-name: Ranger
  rate-generator-configuration-name: default
  worker-configuration-name: Kafka
  metrics-reporter-configuration-name: SimpleConsoleReporter
  thread-count: 1
  queue-capacity: 1
data-source-configuration:
  values:
    usersCsv: csv('./user.csv')
    kafkaMessage:
      key: string("{}", circular(1..1000000, 1))
      value: string("{} {} - {}", get("c0", $usersCsv), get("c1", $usersCsv), get("c3", $usersCsv))
  output: $kafkaMessage
rate-generator-configuration:
  rates:
    r: 1
  output: $r
worker-configuration:
  async: false
  topic: topic1
  batch-size: 1
  producer-configuration:
    bootstrap.servers: BuildVM:9092
It's a bit hard to follow the configuration in this format. Can you paste it with code formatting in the comment? I started a one-node Kafka cluster on my localhost and tried running Berserker with the following configuration:
load-generator-configuration:
  data-source-configuration-name: Ranger
  rate-generator-configuration-name: default
  worker-configuration-name: Kafka
  metrics-reporter-configuration-name: SimpleConsoleReporter
  thread-count: 5
  queue-capacity: 100
data-source-configuration:
  values:
    usersCsv: csv('user.csv')
    kafkaMessage:
      key: string('{}', circular(long(1)..long(10000000000), long(1)))
      value: string("{} {} - {}", get("c0", $usersCsv), get("c1", $usersCsv), get("c2", $usersCsv))
  output: $kafkaMessage
rate-generator-configuration:
  rates:
    r: 10
  output: $r
worker-configuration:
  async: false
  topic: t1
  producer-configuration:
    bootstrap.servers: localhost:9092
metrics-reporter-configuration:
  domain: berserker
  filter:
The user.csv file has this content:
pawan,kumar,40
amit,kumar,41
raj,kumar,435
I ran the console consumer just to check what gets written to Kafka:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic t1
And I can confirm it works.
How are you testing the Kafka cluster to see whether messages are written or not? Can you compare your approach with this one? Maybe some small error has slipped in.
Ok, I tried the files you sent me, and I think the file uses the wrong encoding, or the first character in several rows is non-standard in some other way. Can you investigate this? Can you use a plain, simple text editor and a plain text file?
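One way to investigate the encoding suspicion is to look at the raw bytes of the file. The following Python sketch flags a leading byte order mark and any line whose first byte is not printable ASCII. `inspect_csv_bytes` is a hypothetical helper, and the check is a heuristic, not a full encoding detector.

```python
import codecs

# Byte order marks that commonly end up at the start of text files.
BOMS = {
    codecs.BOM_UTF8: "UTF-8 BOM",
    codecs.BOM_UTF16_LE: "UTF-16 LE BOM",
    codecs.BOM_UTF16_BE: "UTF-16 BE BOM",
}


def inspect_csv_bytes(data):
    """Return human-readable findings about BOMs and odd line starts."""
    findings = []
    for bom, name in BOMS.items():
        if data.startswith(bom):
            findings.append(f"file starts with {name}")
            data = data[len(bom):]
            break
    for number, line in enumerate(data.splitlines(), start=1):
        # Flag lines whose first byte is outside printable ASCII.
        if line and not (0x20 <= line[0] < 0x7F):
            findings.append(f"line {number} starts with byte 0x{line[0]:02x}")
    return findings


clean = b"pawan,kumar,40\namit,kumar,41\n"
with_bom = codecs.BOM_UTF8 + clean
print(inspect_csv_bytes(clean))
print(inspect_csv_bytes(with_bom))
```

To check a real file, read it in binary mode, for example `inspect_csv_bytes(open("user.csv", "rb").read())`. An empty result suggests the file is plain text at least as far as line starts are concerned.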
I need a sample YAML for a CSV to Kafka load and a brief description of the configuration.