onyx-platform / onyx-kafka

Onyx plugin for Kafka
Eclipse Public License 1.0

Troubleshooting simple job not writing to a topic #47

Open twashing opened 6 years ago

twashing commented 6 years ago

Hey there,

I'm playing with a recent onyx-kafka plugin ("0.10.0.0-SNAPSHOT"). For some reason, I can't get Onyx to write to an output Kafka topic. The code is in this git sample project, on the cloud-orchestration-2 branch. I'm using the very simple workflow below, where :process-commands is just Clojure's identity function.

[[:read-commands :process-commands]
 [:process-commands :write-messages]]
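
For context, the catalog entries for the two Kafka tasks look roughly like this. This is a trimmed-down sketch rather than the exact code from the project: the topic names, the :my.ns serde namespace, batch sizes, and peer counts are placeholders.

;; Sketch of the Kafka catalog entries (placeholder values, not the
;; exact config from the sample project).
(def catalog
  [{:onyx/name :read-commands
    :onyx/plugin :onyx.plugin.kafka/read-messages
    :onyx/type :input
    :onyx/medium :kafka
    :kafka/topic "read-messages"
    :kafka/zookeeper "zookeeper:2181"
    :kafka/offset-reset :earliest
    :kafka/deserializer-fn :my.ns/deserialize-message  ;; hypothetical serde fn
    :onyx/min-peers 1                                  ;; matches the topic's single partition
    :onyx/max-peers 1
    :onyx/batch-size 10}

   {:onyx/name :process-commands
    :onyx/fn :clojure.core/identity
    :onyx/type :function
    :onyx/batch-size 10}

   {:onyx/name :write-messages
    :onyx/plugin :onyx.plugin.kafka/write-messages
    :onyx/type :output
    :onyx/medium :kafka
    :kafka/topic "write-messages"
    :kafka/zookeeper "zookeeper:2181"
    :kafka/serializer-fn :my.ns/serialize-message      ;; hypothetical serde fn
    :onyx/batch-size 10}])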

My setup uses docker-compose. A simple workflow to stand everything up might look like the following.

# Build a base image
docker build -f Dockerfile.app.base -t twashing/simple-kafka-onyx-commander-base:latest .

# Bring up the composed services
docker-compose up

# In another terminal, connect to a kafka tools workbench
docker-compose exec kafka-tools /bin/bash

# Create 2 basic topics
kafka-topics --create \
             --zookeeper zookeeper:2181 \
             --topic read-messages \
             --replication-factor 1 \
             --partitions 1 \
             --config cleanup.policy=compact

kafka-topics --create \
             --zookeeper zookeeper:2181 \
             --topic write-messages \
             --replication-factor 1 \
             --partitions 1 \
             --config cleanup.policy=compact

kafka-topics --list --zookeeper zookeeper:2181

# Fire up a console producer
kafka-console-producer \
  --broker-list kafka:9092 \
  --topic read-messages \
  --property "parse.key=true" \
  --property "key.separator=,"

# Produce some messages to the topic
4b6d8c94-4674-4466-9ee2-872bf1678e78,{:foo :bar}
6af3345a-5634-4b41-9762-ce8ad6f6eaa8,{:qwerty :asdf}
310fcfe4-3cf6-4b8e-9c26-5557634dc6b3,{:thing :amabob}

# Listen to the output topic for processed messages
kafka-console-consumer --bootstrap-server kafka:9092 --topic write-messages --new-consumer --from-beginning

# Now cider-connect to localhost:5444, and run the job here:
# https://github.com/twashing/simple-kafka-onyx-commander/blob/cloud-orchestration-2/src/clojure/com/interrupt/streaming/core.clj#L192-L213
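
The job submission itself is roughly this shape. This is a sketch rather than the literal contents of core.clj, so treat the var names (catalog, lifecycles, peer-config) as placeholders.

;; Rough shape of what runs from the REPL after cider-connect.
(require '[onyx.api])

(def job
  {:workflow [[:read-commands :process-commands]
              [:process-commands :write-messages]]
   :catalog catalog
   :lifecycles lifecycles
   :task-scheduler :onyx.task-scheduler/balanced})

(onyx.api/submit-job peer-config job)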

Now, the Docker service logs for the app and kafka containers look fine. But the ZooKeeper logs keep giving me this kind of Error:KeeperErrorCode = NoNode for /onyx/dev/... error when Onyx tries to write to Kafka. Googling this gave me some leads, but no answers. Producing to and consuming from the topics using just the Kafka tools works fine.

Now, I've set the number of peers for this job to 1, since there's only one machine running the job. But it seems I've still configured Onyx incorrectly somewhere.
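
For reference, the peer setup is along these lines; again a sketch, where env-config and peer-config are the usual Onyx config maps and the peer count shown is illustrative.

;; Sketch of standing up the environment and virtual peers on the one
;; machine. Each task in the workflow needs an available virtual peer,
;; so a three-task job needs at least three before it can be scheduled.
(require '[onyx.api])

(def env (onyx.api/start-env env-config))
(def peer-group (onyx.api/start-peer-group peer-config))
(def v-peers (onyx.api/start-peers 3 peer-group))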

I feel I've overlooked some small detail that I can't put my finger on. Probably something obvious to someone who sees it. Any ideas? Here's zookeeper's full error log, in this pastebin link.

# Presumably, somehow a consumer is trying to access a non-existent node, even though I specified KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 for kafka.
# But this error appears before I start the job. I don't THINK these are related.

zookeeper_1    | [2017-11-20 17:50:59,092] INFO Got user-level KeeperException when processing sessionid:0x15fda8266af0001 type:create cxid:0x160 zxid:0x151 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/16 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/16 (org.apache.zookeeper.server.PrepRequestProcessor)
zookeeper_1    | [2017-11-20 17:50:59,108] INFO Got user-level KeeperException when processing sessionid:0x15fda8266af0001 type:create cxid:0x163 zxid:0x154 txntype:-1 reqpath:n/a Error Path:/brokers/topics/__consumer_offsets/partitions/2 Error:KeeperErrorCode = NoNode for /brokers/topics/__consumer_offsets/partitions/2 (org.apache.zookeeper.server.PrepRequestProcessor)

# These errors appear in Zookeeper when I run the job. Presumably Onyx is trying to run a job on a non-existent machine.
# Full output in this pastebin link: https://pastebin.com/dwjvCCsk

zookeeper_1    | [2017-11-20 18:46:22,397] INFO Got user-level KeeperException when processing sessionid:0x15fdab49b0d0008 type:create cxid:0x15 zxid:0x13c txntype:-1 reqpath:n/a Error Path:/onyx/e679ad1d-da33-480f-8a7c-d3854c799078/resume-point Error:KeeperErrorCode = NodeExists for /onyx/e679ad1d-da33-480f-8a7c-d3854c799078/resume-point (org.apache.zookeeper.server.PrepRequestProcessor)
zookeeper_1    | [2017-11-20 18:46:22,404] INFO Got user-level KeeperException when processing sessionid:0x15fdab49b0d0008 type:create cxid:0x16 zxid:0x13d txntype:-1 reqpath:n/a Error Path:/onyx/e679ad1d-da33-480f-8a7c-d3854c799078/origin/origin Error:KeeperErrorCode = NodeExists for /onyx/e679ad1d-da33-480f-8a7c-d3854c799078/origin/origin (org.apache.zookeeper.server.PrepRequestProcessor)
twashing commented 6 years ago

Also, is there an example of multiple calls to submit-job, for separate but overlapping workflows (core.async or kafka)?

So that would be: i) an input core.async channel (or Kafka topic) that goes into an Onyx processing function, which then ii) outputs to a channel (or topic), which subsequently iii) becomes the input to a downstream Onyx processing function.

I'm trying and failing (both with kafka and core.async), and don't see any examples here.

The examples I can find only have a single path through the workflow, e.g. :inputA -> :processB -> :outputC, where the node in the middle is just a processing function. A sketch of the chained setup I'm after is below.
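
To make it concrete, this is the kind of chaining I'm after with Kafka, sketched with placeholder names: two separate submit-job calls, where the first job's output topic is the second job's input topic.

;; Sketch: two Onyx jobs chained through an intermediate Kafka topic.
;; Job 1: input-topic -> process-a -> intermediate-topic
;; Job 2: intermediate-topic -> process-b -> output-topic
;; catalog-1/-2, lifecycles-1/-2 and peer-config are placeholders.
(require '[onyx.api])

(def job-1
  {:workflow [[:read-input :process-a]
              [:process-a :write-intermediate]]
   :catalog catalog-1        ;; :write-intermediate points at topic "intermediate"
   :lifecycles lifecycles-1
   :task-scheduler :onyx.task-scheduler/balanced})

(def job-2
  {:workflow [[:read-intermediate :process-b]
              [:process-b :write-output]]
   :catalog catalog-2        ;; :read-intermediate points at topic "intermediate"
   :lifecycles lifecycles-2
   :task-scheduler :onyx.task-scheduler/balanced})

(onyx.api/submit-job peer-config job-1)
(onyx.api/submit-job peer-config job-2)

With core.async, I assume the equivalent would be two jobs whose lifecycles inject the same channel instance, so the first job's output channel is the second job's input channel, but I haven't gotten that working either.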