fkie-cad / Logprep

Log data preprocessing, generation and shipping in Python
https://logprep.readthedocs.io/en/latest/
GNU Lesser General Public License v2.1

raise FatalOutputError serializing of pipeline object #529

Closed · djkhl closed this issue 8 months ago

djkhl commented 8 months ago

Placeholder Issue

opensearchpy.exceptions.SerializationError: ({'error': '<logprep.framework.pipeline.Pipeline object at 0x7f118d9f1710>', 'original': KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Subscribed topic not available: public.bw.monitoring.event.legacy: Broker: Unknown topic or partition"}, 'processed': {}, '@timestamp': '2024-02-20T10:58:10.420849+00:00'}, TypeError('Encoding objects of type cimpl.KafkaError is unsupported'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/venv/lib/python3.11/site-packages/logprep/framework/pipeline.py", line 236, in run
    self._shut_down()
  File "/opt/venv/lib/python3.11/site-packages/logprep/framework/pipeline.py", line 313, in _shut_down
    self._input.shut_down()
  File "/opt/venv/lib/python3.11/site-packages/logprep/connector/confluent_kafka/input.py", line 498, in shut_down
    self._consumer.close()
  File "/opt/venv/lib/python3.11/site-packages/logprep/connector/confluent_kafka/input.py", line 474, in _revoke_callback
    self.output_connector._write_backlog()
  File "/opt/venv/lib/python3.11/site-packages/logprep/metrics/metrics.py", line 207, in inner
    result = func(self, *args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/logprep/connector/elasticsearch/output.py", line 309, in _write_backlog
    self._bulk(
  File "/opt/venv/lib/python3.11/site-packages/logprep/connector/opensearch/output.py", line 123, in _bulk
    self._handle_serialization_error(error)
  File "/opt/venv/lib/python3.11/site-packages/logprep/connector/elasticsearch/output.py", line 349, in _handle_serialization_error
    raise FatalOutputError(self, f"{error.args[1]} in document {error.args[0]}")
logprep.abc.output.FatalOutputError: FatalOutputError in OpensearchOutput (opensearch) - Opensearch Output: ['zeus-opensearch.core:9200']: Encoding objects of type cimpl.KafkaError is unsupported in document {'error': '<logprep.framework.pipeline.Pipeline object at 0x7f118d9f1710>', 'original': KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Subscribed topic not available: public.bw.monitoring.event.legacy: Broker: Unknown topic or partition"}, 'processed': {}, '@timestamp': '2024-02-20T10:58:10.420849+00:00'}

2024-02-20 11:02:20,625 Logprep PipelineManager INFO : Created new pipeline

ekneg54 commented 8 months ago

#514 ??

clumsy9 commented 8 months ago

The KafkaError objects that cannot be encoded by msgspec.json.Encoder are embedded into the CriticalInputError exceptions raised by _get_raw_event of ConfluentKafkaInput:

kafka_error = message.error()
if kafka_error:
    raise CriticalInputError(
        self, "A confluent-kafka record contains an error code", kafka_error
    )

As CriticalInputError has no type annotation for raw_input, and the exception is serialized anyway, we could directly transform the KafkaError object into its string representation:

raise CriticalInputError(self, "A confluent-kafka record contains an error code", str(kafka_error))
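
Applied in context, the change in _get_raw_event would then look roughly like this (a sketch of the first solution, not a tested patch):

kafka_error = message.error()
if kafka_error:
    # str(kafka_error) keeps the error code and message but yields a plain
    # string that msgspec.json.Encoder can serialize without complaint
    raise CriticalInputError(
        self, "A confluent-kafka record contains an error code", str(kafka_error)
    )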

Another solution would be to extend the default encoder in ConfluentKafkaInput so that all KafkaError objects are transformed into a serializable type.
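
A minimal sketch of that second option, assuming the connector uses a plain msgspec.json.Encoder (msgspec accepts an enc_hook callback for types it cannot encode natively; the names below are illustrative, not the actual Logprep internals):

import msgspec
from confluent_kafka import KafkaError

def _enc_hook(obj):
    # fall back to the string representation for KafkaError objects,
    # which msgspec cannot encode on its own
    if isinstance(obj, KafkaError):
        return str(obj)
    raise NotImplementedError(f"Encoding objects of type {type(obj)} is unsupported")

# an encoder configured like this could replace the connector's default encoder
encoder = msgspec.json.Encoder(enc_hook=_enc_hook)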

ekneg54 commented 8 months ago

Thank you for your investigation on this. I would prefer the first solution, as it seems pretty easy to implement.