EthicalML / kafka-spark-streaming-zeppelin-docker

One click deploy docker-compose with Kafka, Spark Streaming, Zeppelin UI and Monitoring (Grafana + Kafka Manager)
MIT License
120 stars, 74 forks

Networking Issues: Zeppelin cannot fetch dependencies #3

Open gregbrowndev opened 4 years ago

gregbrowndev commented 4 years ago

Hi, and thank you for this repo it is incredibly useful!

However, I'm having problems running your notebook. I'm not really sure what exactly the problem is (so sorry for the ramblings below), as I've tried a number of different things and run into different issues, even after reverting all my changes back to your exact code.

Initially, upon executing the notebook, everything apart from the consumer seemed to work. I could see messages being written to the Kafka topic in Grafana. However, the consumer complained that it didn't have the Kafka dependency available.

I found that the Kafka JAR hadn't been downloaded, as indicated in the image below of Zeppelin's interpreter page:

[Screenshot: Zeppelin interpreter page]

I couldn't even wget the JAR from inside the Zeppelin container (I believe the request timed out). Now, however (after reverting my changes), I can wget it inside the container, but Zeppelin is still unable to fetch the dependencies itself.
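When Zeppelin cannot resolve a dependency itself, the JAR can be downloaded directly, as the wget commands in this thread do. Those URLs follow Maven Central's standard repository layout, so they can be derived from the Maven coordinates. A minimal sketch of that mapping; the function name `maven_jar_url` is hypothetical, and only the main `.jar` artifact of a release is handled:

```python
# Sketch: derive a Maven Central download URL from Maven coordinates.
# Assumes the standard Maven 2 repository layout used by repo1.maven.org:
#   <base>/<groupId with dots replaced by slashes>/<artifactId>/<version>/<artifactId>-<version>.jar

MAVEN_CENTRAL = "https://repo1.maven.org/maven2"


def maven_jar_url(group_id: str, artifact_id: str, version: str,
                  base: str = MAVEN_CENTRAL) -> str:
    """Return the URL of the main JAR for the given coordinates (hypothetical helper)."""
    group_path = group_id.replace(".", "/")
    file_name = f"{artifact_id}-{version}.jar"
    return f"{base}/{group_path}/{artifact_id}/{version}/{file_name}"


if __name__ == "__main__":
    # The plain connector and the assembly variant discussed in this thread.
    print(maven_jar_url("org.apache.spark", "spark-streaming-kafka-0-8_2.11", "2.0.2"))
    print(maven_jar_url("org.apache.spark", "spark-streaming-kafka-0-8-assembly_2.11", "2.0.2"))
```

The first call reproduces the URL used in the `%sh` paragraph below; swapping the artifact ID is all it takes to switch to the assembly.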

Putting aside the network issue, I've tried to manually add the dependency using:

%sh
mkdir /zeppelin/dep
cd /zeppelin/dep && wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-8_2.11/2.0.2/spark-streaming-kafka-0-8_2.11-2.0.2.jar

%producer.dep
z.reset()
z.load("/zeppelin/dep/spark-streaming-kafka-0-8_2.11-2.0.2.jar")

%consumer.dep
z.reset()
z.load("/zeppelin/dep/spark-streaming-kafka-0-8_2.11-2.0.2.jar")

but now the producer times out when connecting to Kafka:

KafkaTimeoutError                         Traceback (most recent call last)
<ipython-input-6-49d1d1bf1849> in <module>()
     20         topic=KAFKA_TOPIC,
     21         key=str(row_dict["_c0"]).encode("utf-8"),
---> 22         value=json.dumps(row_dict).encode("utf-8"))
     23 
     24     try:

/opt/conda/lib/python2.7/site-packages/kafka/producer/kafka.pyc in send(self, topic, value, key, headers, partition, timestamp_ms)
    562         key_bytes = value_bytes = None
    563         try:
--> 564             self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
    565 
    566             key_bytes = self._serialize(

/opt/conda/lib/python2.7/site-packages/kafka/producer/kafka.pyc in _wait_on_metadata(self, topic, max_wait)
    689             if not metadata_event.is_set():
    690                 raise Errors.KafkaTimeoutError(
--> 691                     "Failed to update metadata after %.1f secs." % (max_wait,))
    692             elif topic in self._metadata.unauthorized_topics:
    693                 raise Errors.TopicAuthorizationFailedError(topic)

KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

This is all on a fresh set of containers (docker-compose down && docker-compose up -d).

Any help would be greatly appreciated!
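The traceback shows kafka-python's producer waiting `max_block_ms / 1000.0` (here 60.0) seconds for cluster metadata before giving up. A rough first check is whether the broker's port is reachable at all from the Zeppelin container. The sketch below only tests a plain TCP connection; the function names are hypothetical, and a successful connection does not rule out other causes, such as a broker advertising a listener address the client cannot resolve.

```python
import socket


def broker_reachable(address: str, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to "host:port" succeeds within `timeout` seconds.

    This only checks the network path; it does not speak the Kafka protocol.
    """
    host, _, port = address.rpartition(":")
    try:
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except OSError:  # connection refused, timed out, or host name not resolvable
        return False


def check_brokers(bootstrap_servers: str, timeout: float = 5.0) -> dict:
    """Check each entry of a comma-separated "host:port" list; empty entries are skipped."""
    return {
        entry.strip(): broker_reachable(entry.strip(), timeout)
        for entry in bootstrap_servers.split(",")
        if entry.strip()
    }
```

In a notebook paragraph, something like `check_brokers(KAFKA_BROKERS)` would show which entries accept connections; the actual value of `KAFKA_BROKERS` in the notebook is not shown in this thread.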

gregbrowndev commented 4 years ago

I think the Kafka servers may have gotten in a twist from being brought up and down so often. Deleting the kafka/kafka_zookeeper Docker volumes seems to have fixed the KafkaTimeoutError raised by the producer, but now I'm getting an error with the consumer again:

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/zeppelin/interpreter/spark/pyspark/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 883, in send_command
    response = connection.send_command(command)
  File "/zeppelin/interpreter/spark/pyspark/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1040, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
Py4JNetworkError: Error while receiving

Py4JError                                 Traceback (most recent call last)
<ipython-input-8-63c862b08c08> in <module>()
     17                             ssc,
     18                             [REDDIT_TOPIC],
---> 19                             {"metadata.broker.list": KAFKA_BROKERS})
     20 
     21 stream = stream.map(lambda x: json.loads(x[1]))

/zeppelin/interpreter/spark/pyspark/pyspark.zip/pyspark/streaming/kafka.py in createDirectStream(ssc, topics, kafkaParams, fromOffsets, keyDecoder, valueDecoder, messageHandler)
    128             func = funcWithoutMessageHandler
    129             jstream = helper.createDirectStreamWithoutMessageHandler(
--> 130                 ssc._jssc, kafkaParams, set(topics), jfromOffsets)
    131         else:
    132             ser = AutoBatchedSerializer(PickleSerializer())

/zeppelin/interpreter/spark/pyspark/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/zeppelin/interpreter/spark/pyspark/pyspark.zip/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/zeppelin/interpreter/spark/pyspark/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    325             raise Py4JError(
    326                 "An error occurred while calling {0}{1}{2}".
--> 327                 format(target_id, ".", name))
    328     else:
    329         type = answer[1]

Py4JError: An error occurred while calling o66.createDirectStreamWithoutMessageHandler

Note that this uses my code above to add the JAR to the %consumer.dep interpreter.

I've seen other people with what looks like the same problem (see https://stackoverflow.com/questions/58975545/azure-databricks-kafkautils-createdirectstream-causes-py4jnetworkerroranswer), and they suggested upgrading the JAR. I tried editing the Zeppelin interpreters to use the newer "spark-streaming-kafka-0-8_2.11-2.4.5.jar", but this results in the same set of issues.
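The JAR file names encode two versions: the Scala binary version after the underscore (2.11) and the Spark version (2.0.2 or 2.4.5). As a general rule, the connector should match the Spark build the interpreter actually runs, which a PySpark paragraph can report via `sc.version`; the thread does not say which Spark version the container ships. A sketch that parses the names used here and compares them with a runtime version. The helper names and the regex pattern are assumptions based only on these file names, and the check insists on an exact match, which is the conservative choice:

```python
import re

# Assumed naming pattern, based on the file names in this thread:
#   <artifact>_<scala binary version>-<spark version>.jar
_CONNECTOR_JAR = re.compile(
    r"^(?P<artifact>.+)_(?P<scala>\d+\.\d+)-(?P<spark>\d+\.\d+\.\d+)\.jar$"
)


def parse_connector_jar(file_name: str) -> dict:
    """Split a connector JAR file name into artifact, Scala and Spark versions."""
    match = _CONNECTOR_JAR.match(file_name)
    if match is None:
        raise ValueError(f"unrecognised JAR name: {file_name!r}")
    return match.groupdict()


def matches_runtime(file_name: str, spark_version: str, scala_version: str = "2.11") -> bool:
    """True only if both versions in the file name equal the runtime's exactly."""
    info = parse_connector_jar(file_name)
    return info["spark"] == spark_version and info["scala"] == scala_version


if __name__ == "__main__":
    # Hypothetical runtime version; in a PySpark paragraph it would come from sc.version.
    for name in ("spark-streaming-kafka-0-8_2.11-2.0.2.jar",
                 "spark-streaming-kafka-0-8_2.11-2.4.5.jar"):
        print(name, matches_runtime(name, spark_version="2.0.2"))
```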

dotdothu commented 4 years ago

I am facing the same issues. Have you resolved it yet, @gregbrowndev?

dotdothu commented 4 years ago

Using the assembly JAR solved the issue:

%sh
cd /zeppelin/dep && wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-8-assembly_2.11/2.0.2/spark-streaming-kafka-0-8-assembly_2.11-2.0.2.jar

%consumer.dep
z.reset()
z.load("/zeppelin/dep/spark-streaming-kafka-0-8-assembly_2.11-2.0.2.jar")

gregbrowndev commented 4 years ago

@dotdothu sorry for the late reply! Glad you managed to fix it.

I added the JAR manually, just like you did, but never got it fully working. I still had issues with the consumer: "Py4JError: An error occurred while calling o66.createDirectStreamWithoutMessageHandler"
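A plausible explanation for why the assembly helped (an inference, not confirmed in the thread): `z.load` with a local file path presumably adds just that file, so the plain connector JAR arrives without the Kafka 0.8 client classes it depends on, whereas the assembly bundles them. Since a JAR is a zip archive, this can be checked directly. In the sketch below the two class names are assumptions chosen for illustration: `KafkaUtilsPythonHelper` is the JVM helper that PySpark's `streaming/kafka.py` loads, and `kafka.common.TopicAndPartition` is a Kafka 0.8 client class.

```python
import sys
import zipfile

# Assumed class entries to look for (illustrative, not an exhaustive list):
# the Py4J helper PySpark's streaming/kafka.py loads, and one Kafka 0.8
# client class used by the 0-8 connector.
REQUIRED_CLASSES = (
    "org/apache/spark/streaming/kafka/KafkaUtilsPythonHelper.class",
    "kafka/common/TopicAndPartition.class",
)


def missing_classes(jar_path, required=REQUIRED_CLASSES):
    """Return the entries from `required` that are not present in the JAR."""
    with zipfile.ZipFile(jar_path) as jar:
        entries = set(jar.namelist())
    return [name for name in required if name not in entries]


if __name__ == "__main__":
    # Usage (hypothetical file name): python check_jar.py /zeppelin/dep/*.jar
    for path in sys.argv[1:]:
        missing = missing_classes(path)
        print(path, "->", "all present" if not missing else f"missing {missing}")
```

Running it against both JARs in `/zeppelin/dep` would show whether the plain connector really lacks the Kafka classes in this setup.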

puneethrai commented 3 years ago

@gregbrowndev I added your library files to the Spark dep directory. Below are all the steps I ran:

%sh
mkdir /zeppelin/dep
cd /zeppelin/dep && wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-8-assembly_2.11/2.0.2/spark-streaming-kafka-0-8-assembly_2.11-2.0.2.jar
cd /zeppelin/dep && wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-8_2.11/2.0.2/spark-streaming-kafka-0-8_2.11-2.0.2.jar
cp /zeppelin/dep/spark-streaming-kafka-0-8_2.11-2.0.2.jar /zeppelin/interpreter/spark/dep
cp /zeppelin/dep/spark-streaming-kafka-0-8-assembly_2.11-2.0.2.jar /zeppelin/interpreter/spark/dep

and

%producer.dep
z.reset()
z.load("/zeppelin/dep/spark-streaming-kafka-0-8_2.11-2.0.2.jar")

%consumer.dep
z.reset()
z.load("/zeppelin/dep/spark-streaming-kafka-0-8-assembly_2.11-2.0.2.jar")