datahub-project / datahub


Kafka ingester doesn't permit multiple protobuf schemas with the same message name #9604

Closed tylerbrandt closed 6 months ago

tylerbrandt commented 9 months ago

Describe the bug In our system (RedPanda) we have one Schema Registry subject per topic, and many of those subjects define a protobuf message with the same package (models) and name (RawEvent). Additionally, we use the Confluent TopicNameStrategy so that one topic's schema can change later without affecting the schemas of other topics. This works fine in RedPanda, because each message is defined only in the context of its own subject. However, when we ingest this into DataHub, we get the error duplicate symbol 'models.RawEvent' (see stack trace below).

2024-01-06T16:00:36-08:00   [2024-01-07 00:00:36,010] ERROR    {datahub.entrypoints:199} - Command failed: Couldn't build proto file into descriptor pool: duplicate symbol 'models.RawEvent'
2024-01-06T16:00:36-08:00   Traceback (most recent call last):
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/datahub/entrypoints.py", line 186, in main
2024-01-06T16:00:36-08:00       sys.exit(datahub(standalone_mode=False, **kwargs))
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1130, in __call__
2024-01-06T16:00:36-08:00       return self.main(*args, **kwargs)
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1055, in main
2024-01-06T16:00:36-08:00       rv = self.invoke(ctx)
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1657, in invoke
2024-01-06T16:00:36-08:00       return _process_result(sub_ctx.command.invoke(sub_ctx))
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1657, in invoke
2024-01-06T16:00:36-08:00       return _process_result(sub_ctx.command.invoke(sub_ctx))
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1404, in invoke
2024-01-06T16:00:36-08:00       return ctx.invoke(self.callback, **ctx.params)
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/click/core.py", line 760, in invoke
2024-01-06T16:00:36-08:00       return __callback(*args, **kwargs)
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/click/decorators.py", line 26, in new_func
2024-01-06T16:00:36-08:00       return f(get_current_context(), *args, **kwargs)
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/datahub/telemetry/telemetry.py", line 379, in wrapper
2024-01-06T16:00:36-08:00       raise e
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/datahub/telemetry/telemetry.py", line 334, in wrapper
2024-01-06T16:00:36-08:00       res = func(*args, **kwargs)
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/datahub/utilities/memory_leak_detector.py", line 95, in wrapper
2024-01-06T16:00:36-08:00       return func(ctx, *args, **kwargs)
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/datahub/cli/ingest_cli.py", line 198, in run
2024-01-06T16:00:36-08:00       loop.run_until_complete(run_func_check_upgrade(pipeline))
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
2024-01-06T16:00:36-08:00       return future.result()
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/datahub/cli/ingest_cli.py", line 158, in run_func_check_upgrade
2024-01-06T16:00:36-08:00       ret = await the_one_future
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/datahub/cli/ingest_cli.py", line 149, in run_pipeline_async
2024-01-06T16:00:36-08:00       return await loop.run_in_executor(
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/concurrent/futures/thread.py", line 58, in run
2024-01-06T16:00:36-08:00       result = self.fn(*self.args, **self.kwargs)
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/datahub/cli/ingest_cli.py", line 140, in run_pipeline_to_completion
2024-01-06T16:00:36-08:00       raise e
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/datahub/cli/ingest_cli.py", line 132, in run_pipeline_to_completion
2024-01-06T16:00:36-08:00       pipeline.run()
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/datahub/ingestion/run/pipeline.py", line 365, in run
2024-01-06T16:00:36-08:00       for wu in itertools.islice(
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/datahub/ingestion/api/source_helpers.py", line 109, in auto_stale_entity_removal
2024-01-06T16:00:36-08:00       for wu in stream:
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/datahub/ingestion/api/source_helpers.py", line 133, in auto_workunit_reporter
2024-01-06T16:00:36-08:00       for wu in stream:
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/datahub/ingestion/api/source_helpers.py", line 146, in auto_materialize_referenced_tags
2024-01-06T16:00:36-08:00       for wu in stream:
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/datahub/ingestion/api/source_helpers.py", line 60, in auto_status_aspect
2024-01-06T16:00:36-08:00       for wu in stream:
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/datahub/ingestion/source/kafka.py", line 205, in get_workunits_internal
2024-01-06T16:00:36-08:00       yield from self._extract_record(t, t_detail, extra_topic_details.get(t))
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/datahub/ingestion/source/kafka.py", line 235, in _extract_record
2024-01-06T16:00:36-08:00       schema_metadata = self.schema_registry_client.get_schema_metadata(
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/datahub/ingestion/source/confluent_schema_registry.py", line 362, in get_schema_metadata
2024-01-06T16:00:36-08:00       schema, fields = self._get_schema_and_fields(
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/datahub/ingestion/source/confluent_schema_registry.py", line 249, in _get_schema_and_fields
2024-01-06T16:00:36-08:00       fields = self._get_schema_fields(
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/datahub/ingestion/source/confluent_schema_registry.py", line 289, in _get_schema_fields
2024-01-06T16:00:36-08:00       fields = protobuf_util.protobuf_schema_to_mce_fields(
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/datahub/ingestion/extractor/protobuf_util.py", line 75, in protobuf_schema_to_mce_fields
2024-01-06T16:00:36-08:00       descriptor: FileDescriptor = _from_protobuf_schema_to_descriptors(
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/datahub/ingestion/extractor/protobuf_util.py", line 256, in _from_protobuf_schema_to_descriptors
2024-01-06T16:00:36-08:00       return grpc.protos(main_schema.name).DESCRIPTOR
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/grpc/_runtime_protos.py", line 100, in protos
2024-01-06T16:00:36-08:00       return _call_with_lazy_import("protos", protobuf_path)
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/grpc/_runtime_protos.py", line 62, in _call_with_lazy_import
2024-01-06T16:00:36-08:00       return fn(protobuf_path)
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/grpc_tools/protoc.py", line 82, in _protos
2024-01-06T16:00:36-08:00       module = importlib.import_module(module_name)
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/importlib/__init__.py", line 126, in import_module
2024-01-06T16:00:36-08:00       return _bootstrap._gcd_import(name[level:], package, level)
2024-01-06T16:00:36-08:00     File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
2024-01-06T16:00:36-08:00     File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
2024-01-06T16:00:36-08:00     File "<frozen importlib._bootstrap>", line 1006, in _find_and_load_unlocked
2024-01-06T16:00:36-08:00     File "<frozen importlib._bootstrap>", line 688, in _load_unlocked
2024-01-06T16:00:36-08:00     File "/usr/local/lib/python3.10/site-packages/grpc_tools/protoc.py", line 142, in exec_module
2024-01-06T16:00:36-08:00       exec(files[-1][1], module.__dict__)
2024-01-06T16:00:36-08:00     File "<string>", line 16, in <module>
2024-01-06T16:00:36-08:00   TypeError: Couldn't build proto file into descriptor pool: duplicate symbol 'models.RawEvent'
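
For context on the TypeError at the bottom of the trace, here is a minimal sketch of how the protobuf descriptor pool reacts when two files with different names both define models.RawEvent. This is not the DataHub code path itself, and the file and field names are made up:

```python
from google.protobuf import descriptor_pb2, descriptor_pool

def raw_event_file(file_name: str) -> descriptor_pb2.FileDescriptorProto:
    """Roughly equivalent to:
       syntax = "proto3"; package models; message RawEvent { string id = 1; }"""
    fdp = descriptor_pb2.FileDescriptorProto()
    fdp.name = file_name
    fdp.package = "models"
    fdp.syntax = "proto3"
    msg = fdp.message_type.add()
    msg.name = "RawEvent"
    field = msg.field.add()
    field.name = "id"
    field.number = 1
    field.type = descriptor_pb2.FieldDescriptorProto.TYPE_STRING
    return fdp

pool = descriptor_pool.DescriptorPool()
pool.Add(raw_event_file("topic_a.proto"))  # first schema loads fine
pool.Add(raw_event_file("topic_b.proto"))  # second schema defines the same symbol:
                                           # with the C/upb backend this raises a
                                           # TypeError like "duplicate symbol 'models.RawEvent'"
```

Because the ingester funnels every subject's schema through one shared pool (via grpc.protos), the second topic's otherwise valid schema triggers this collision.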

To Reproduce

  1. Set up the registry with SubjectNameStrategy set to TopicNameStrategy
  2. Define two topics, A and B
  3. Register A-values and B-values schemas with identical contents (critically, the same package and message names); a rough registration sketch is shown after this list
  4. Attempt to ingest into DataHub using the Kafka source. An error like the one above is raised, referring to the type name defined in (3)
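
For concreteness, a minimal sketch of steps 1-3 using the confluent-kafka Python client. The registry URL, schema body, and subject names here are illustrative assumptions, not taken from the actual setup:

```python
from confluent_kafka.schema_registry import Schema, SchemaRegistryClient

RAW_EVENT_PROTO = """
syntax = "proto3";
package models;

message RawEvent {
  string id = 1;
}
"""

# Registry URL is illustrative.
client = SchemaRegistryClient({"url": "http://localhost:8081"})
schema = Schema(RAW_EVENT_PROTO, schema_type="PROTOBUF")

# With TopicNameStrategy each topic gets its own subject; registering the
# identical schema under both subjects is perfectly legal on the registry side.
client.register_schema("A-values", schema)
client.register_schema("B-values", schema)
```

The registry accepts both registrations because each subject is versioned independently; the collision only appears later, inside the ingester's shared descriptor pool.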

Expected behavior Ingestion should succeed: the type namespace is effectively scoped to the topic, and there is no collision within any given topic.

Screenshots N/A

Desktop (please complete the following information): N/A

Additional context This problem probably exists for Avro as well, but we haven't tested it.

github-actions[bot] commented 8 months ago

This issue is stale because it has been open for 30 days with no activity. If you believe this is still an issue on the latest DataHub release please leave a comment with the version that you tested it with. If this is a question/discussion please head to https://slack.datahubproject.io. For feature requests please use https://feature-requests.datahubproject.io

tylerbrandt commented 8 months ago

We're using 0.10.2, but it probably remains an issue at the head of the repo: the ingestion path calls grpc.protos, which imports the generated module via importlib under the hood, and that import isn't "isolated" in any way, so every schema's symbols land in the same process-wide descriptor pool.
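
To make concrete what "isolated" could mean here, a rough sketch under assumptions (an illustration only, not a proposed patch or the current DataHub code path): parse each subject's schema into its own DescriptorPool instead of the shared default pool that generated modules register into.

```python
from google.protobuf import descriptor_pb2, descriptor_pool

def raw_event_file(file_name: str) -> descriptor_pb2.FileDescriptorProto:
    # Same hypothetical schema as above: package models; message RawEvent.
    fdp = descriptor_pb2.FileDescriptorProto()
    fdp.name, fdp.package, fdp.syntax = file_name, "models", "proto3"
    fdp.message_type.add().name = "RawEvent"
    return fdp

# One private pool per registry subject: the identical symbol no longer
# collides across subjects, because no pool ever sees it twice.
pool_a = descriptor_pool.DescriptorPool()
pool_b = descriptor_pool.DescriptorPool()
pool_a.Add(raw_event_file("A-values.proto"))
pool_b.Add(raw_event_file("B-values.proto"))

# Each pool resolves the symbol independently.
assert pool_a.FindMessageTypeByName("models.RawEvent") is not None
assert pool_b.FindMessageTypeByName("models.RawEvent") is not None
```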

github-actions[bot] commented 7 months ago

This issue is stale because it has been open for 30 days with no activity. If you believe this is still an issue on the latest DataHub release please leave a comment with the version that you tested it with. If this is a question/discussion please head to https://slack.datahubproject.io. For feature requests please use https://feature-requests.datahubproject.io

github-actions[bot] commented 6 months ago

This issue was closed because it has been inactive for 30 days since being marked as stale.