acryldata / datahub-actions

DataHub Actions is a framework for responding to changes to your DataHub Metadata Graph in real time.
42 stars 47 forks source link

fix: improve kafka consumer robustness #74

Closed hsheth2 closed 1 year ago

hsheth2 commented 1 year ago

When using the Docker Desktop "pause" feature, kafka sometimes disconnects and causes the kafka consumer to exit permanently. This adds some error handling so that it continues to retry after a consume error.

Example error:

2023-01-18 20:58:01 Exception in thread Thread-1 (run_pipeline):
2023-01-18 20:58:01 Traceback (most recent call last):
2023-01-18 20:58:01   File "/usr/local/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
2023-01-18 20:58:02     self.run()
2023-01-18 20:58:02   File "/usr/local/lib/python3.10/threading.py", line 953, in run
2023-01-18 20:58:02     self._target(*self._args, **self._kwargs)
2023-01-18 20:58:02   File "/usr/local/lib/python3.10/site-packages/datahub_actions/pipeline/pipeline_manager.py", line 42, in run_pipeline
2023-01-18 20:58:02     pipeline.run()
2023-01-18 20:58:02   File "/usr/local/lib/python3.10/site-packages/datahub_actions/pipeline/pipeline.py", line 166, in run
2023-01-18 20:58:02     for enveloped_event in enveloped_events:
2023-01-18 20:58:02   File "/usr/local/lib/python3.10/site-packages/datahub_actions/plugin/source/kafka/kafka_event_source.py", line 154, in events
2023-01-18 20:58:02     msg = self.consumer.poll(timeout=2.0)
2023-01-18 20:58:02   File "/usr/local/lib/python3.10/site-packages/confluent_kafka/deserializing_consumer.py", line 131, in poll
2023-01-18 20:58:02     raise ConsumeError(msg.error(), kafka_message=msg)
2023-01-18 20:58:02 confluent_kafka.error.ConsumeError: KafkaError{code=_TIMED_OUT_QUEUE,val=-166,str="FindCoordinator response error: Local: Timed out in queue"}
github-actions[bot] commented 1 year ago

Unit Test Results (build & test)

63 tests  ±0   63 :heavy_check_mark: ±0   3s :stopwatch: ±0s   1 suites ±0     0 :zzz: ±0    1 files   ±0     0 :x: ±0 

Results for commit 6b6c0b27. ± Comparison against base commit c658fedd.