datahub-project / datahub

The Metadata Platform for your Data and AI Stack
https://datahubproject.io
Apache License 2.0

Ingestion for GCS ingest fails with stateful ingestion #11790

Open josges opened 3 days ago

josges commented 3 days ago

**Describe the bug**
When running ingestion for the `gcs` source with `stateful_ingestion.enabled: true`, I get the error

```
datahub.ingestion.run.pipeline.PipelineInitError: Failed to configure the source (gcs): Checkpointing provider DatahubIngestionCheckpointingProvider already registered.
```

Even when stateful ingestion is disabled, the log still shows the lines

```
INFO     {datahub.ingestion.source.state.stateful_ingestion_base:241} - Stateful ingestion will be automatically enabled, as datahub-rest sink is used or `datahub_api` is specified
[...]
INFO     {datahub.ingestion.run.pipeline:571} - Processing commit request for DatahubIngestionCheckpointingProvider. Commit policy = CommitPolicy.ALWAYS, has_errors=False, has_warnings=False
WARNING  {datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider:95} - No state available to commit for DatahubIngestionCheckpointingProvider
INFO     {datahub.ingestion.run.pipeline:591} - Successfully committed changes for DatahubIngestionCheckpointingProvider.
```
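For context on the error itself: it is an ordinary duplicate-registration failure, i.e. a registry that refuses to accept the same key twice. A minimal sketch of that pattern (`ProviderRegistry` is hypothetical, not DataHub's actual class):

```python
# Minimal sketch of the duplicate-registration failure mode.
# ProviderRegistry is hypothetical; DataHub's real registry differs.
class ProviderRegistry:
    def __init__(self):
        self._providers = {}

    def register(self, name, provider):
        # Refusing duplicates is what surfaces as "already registered".
        if name in self._providers:
            raise ValueError(f"Checkpointing provider {name} already registered.")
        self._providers[name] = provider


registry = ProviderRegistry()
registry.register("DatahubIngestionCheckpointingProvider", object())
try:
    # A second source configured against the same pipeline context
    # would repeat the registration:
    registry.register("DatahubIngestionCheckpointingProvider", object())
except ValueError as e:
    print(e)
```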

**To Reproduce**

Steps to reproduce the behavior:

1. Minimal recipe:

```yml
source:
  type: gcs
  config:
    path_specs:
      - include: gs://<my bucket>/*.parquet
    stateful_ingestion:
      enabled: true
      remove_stale_metadata: true
    credential:
      hmac_access_id: <my hmac_access_id>
      hmac_access_secret: <my hmac_access_secret>

pipeline_name: gcs-pipeline

sink:
  type: "datahub-rest"
  config:
    server:
    token:
```


2. Run `datahub ingest run -c <my minimal recipe>.yml`

**Expected behavior**
The pipeline should run without errors and write the state correctly, removing any stale metadata if configured.

**Additional context**
I think I have already found a fix that I will commit in the near future. The root cause is the creation of an equivalent S3 source, which leads to re-registering the checkpointing provider. Also, the `platform` attribute is missing, which leads to the state being written to the platform "default".
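To make the suspected root cause concrete, here is a hedged sketch of the pattern described above: a wrapper source that builds its inner source against the same pipeline context, so provider registration runs twice. All names here (`PipelineContext`, `register_provider`, the constructors) are illustrative stand-ins, not the actual DataHub APIs:

```python
# Illustrative sketch of the suspected root cause; names approximate
# DataHub's structure but are not copied from it.
class PipelineContext:
    def __init__(self):
        self.checkpointing_providers = set()


def register_provider(ctx, name):
    # Hypothetical stand-in for the real registration call.
    if name in ctx.checkpointing_providers:
        raise RuntimeError(f"Checkpointing provider {name} already registered.")
    ctx.checkpointing_providers.add(name)


class S3Source:
    platform = "s3"

    def __init__(self, ctx):
        # Stateful-ingestion setup registers the provider on the shared context.
        register_provider(ctx, "DatahubIngestionCheckpointingProvider")


class GCSSource:
    # Suspected gap: no platform attribute here, so checkpoint state would be
    # keyed under the fallback platform "default" instead of "gcs".

    def __init__(self, ctx):
        register_provider(ctx, "DatahubIngestionCheckpointingProvider")
        # Building the equivalent S3 source against the *same* context
        # repeats the registration and raises, mirroring the
        # PipelineInitError shown above.
        self.s3_source = S3Source(ctx)


try:
    GCSSource(PipelineContext())
except RuntimeError as e:
    print(e)
```

Under these assumptions, a fix would either skip the stateful-ingestion setup for the inner S3 source (or make registration idempotent) and set the platform on the GCS source explicitly; the author's actual patch may of course differ.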
josges commented 3 days ago

This is also related to https://github.com/datahub-project/datahub/issues/10736