Closed ksingh7 closed 4 years ago
This is my Secor docker run command:
sudo docker run \
-e DEBUG=true \
-e ZOOKEEPER_PATH=/ \
-e ZOOKEEPER_QUORUM=secor_zookeeper_1:2181 \
-e KAFKA_SEED_BROKER_HOST=kafka \
-e KAFKA_SEED_BROKER_PORT=29092 \
-e AWS_ACCESS_KEY=<Access_key> \
-e AWS_SECRET_KEY=<Secret_key> \
-e AWS_REGION=ap-south-1 \
-e SECOR_S3_BUCKET=my-kafka-backups-ks \
-e SECOR_GROUP=raw_logs \
-e KAFKA_OFFSETS_STORAGE=zookeeper \
-e SECOR_MAX_FILE_SECONDS=10 \
-e SECOR_MAX_FILE_BYTES=100 \
-e SECOR_WRITER_FACTORY=com.pinterest.secor.io.impl.DelimitedTextFileReaderWriterFactory \
--network secor_beast \
--link secor_zookeeper_1 \
--link secor_kafka_1 \
karansingh/secor-0.29
I can see that Secor is consuming Kafka messages, but it is throwing errors like:
2020-06-22 15:37:08,044 [Thread-3] (com.pinterest.secor.common.ZookeeperConnector) WARN path /consumers/raw_logs/offsets/my-topic/0 does not exist in zookeeper
2020-06-22 15:37:08,122 [Thread-3] (com.pinterest.secor.common.OffsetTracker) INFO starting to consume topic my-topic partition 0 from offset 0
2020-06-22 15:37:08,125 [Thread-3] (com.pinterest.secor.consumer.Consumer) WARN Failed to parse message Message{topic='my-topic', kafkaPartition=0, offset=0, kafkaKey=, payload=kafkacat, timestamp=1592839972506, headers=[]}
java.lang.NullPointerException
at com.pinterest.secor.parser.ThriftMessageParser.extractTimestampMillis(ThriftMessageParser.java:85)
at com.pinterest.secor.parser.TimestampedMessageParser.getTimestampMillis(TimestampedMessageParser.java:155)
at com.pinterest.secor.parser.TimestampedMessageParser.extractPartitions(TimestampedMessageParser.java:185)
at com.pinterest.secor.parser.MessageParser.parse(MessageParser.java:62)
at com.pinterest.secor.consumer.Consumer.consumeNextMessage(Consumer.java:229)
at com.pinterest.secor.consumer.Consumer.run(Consumer.java:163)
@HenryCaiHaiying can you help me figure out what I am missing?
That NPE is a bit odd. It seems to indicate that either message or mDeserializer is null, but tracing the call stack, it doesn't look like either of them can be null.
Can you attach a debugger or modify the code to print out the variable content to validate which variable is null?
timestamp = mDeserializer.partialDeserializeI64(message.getPayload(), mThriftPath);
@HenryCaiHaiying Can you guide me on how and where I need to add this line in order to attach a debugger? Unfortunately, I am not proficient in Java, so I will have a hard time figuring that out.
But I am happy to help if you can provide instructions.
If you haven't done Java debugging before, it might be easier to just modify the source code: add System.out.print calls in various places to print the contents of the variables, then build the source and deploy/run it.
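As a rough illustration of the print-statement approach, here is a minimal, self-contained sketch. The class and helper names (NullCheckSketch, describe, describePayload) are made up for this example and are not part of Secor; in practice you would put similar prints just before the failing line in ThriftMessageParser and print the real fields there.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Secor: shows how to print whether a
// variable is null before the line that throws the NullPointerException.
public class NullCheckSketch {

    // Returns "<name> is NULL" or "<name> = <value>" for any object reference.
    static String describe(String name, Object value) {
        return name + (value == null ? " is NULL" : " = " + value);
    }

    // Same idea for a raw message payload, decoded as UTF-8 text for readability.
    static String describePayload(byte[] payload) {
        if (payload == null) {
            return "payload is NULL";
        }
        return "payload (" + payload.length + " bytes) = "
                + new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Stand-ins for the real variables; here the deserializer is
        // deliberately left null to show what the output looks like.
        Object deserializer = null;
        byte[] payload = "kafkacat".getBytes(StandardCharsets.UTF_8);

        System.out.println(describe("mDeserializer", deserializer));
        System.out.println(describePayload(payload));
    }
}
```

Whichever variable prints as NULL right before the exception is the one to chase further up the call stack.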
If you want to try Java remote debugging, launch the Java program (Secor) with debug command-line args, e.g.: java -Xdebug -Xrunjdwp:transport=dt_socket,address=5555,server=y,suspend=y Test, and use an IDE (Eclipse or IntelliJ) to attach to port 5555. More in: https://stackify.com/java-remote-debugging/#:~:text=Java%20Platform%20Debugging%20Architecture%20(JPDA,a%20running%20Java%20application%20remotely.
@HenryCaiHaiying thanks for your suggestion. After enabling logs I was able to track down the problem: once I used the correct version of the Hadoop binaries, I was able to move Kafka messages to AWS S3 buckets.
Secor is able to read the messages but is throwing a NullPointerException (see below).
I have produced some messages using kafkacat, and I can verify the messages as shown below.
Can you tell me whether messages need to be written to the Kafka topic in a specific format so that Secor can move them to S3? I am doing a proof of concept, and it looks like I am very close but still missing certain details.
I would definitely like to document my learnings in a step-by-step blog post so that others don't have to fumble to get Secor working ;)
Need your help right now.