open-metadata / OpenMetadata

OpenMetadata is a unified metadata platform for data discovery, data observability, and data governance powered by a central metadata repository, in-depth column level lineage, and seamless team collaboration.
https://open-metadata.org
Apache License 2.0
5.59k stars 1.05k forks source link

ISSUE-16094: fix s3 storage parquet structureFormat ingestion #18660

Open KylixSerg opened 6 days ago

KylixSerg commented 6 days ago

Describe your changes:

Fixes 16094 Short blurb explaining: This aims at fixing the s3 storage ingestion for parquet structured containers, when the ingestion flow tries to extract columns from a sample file under the dataPath it may happen that a non structureFormat file is picked, example a _SUCCESS file or that the prefix is empty but the s3 lifecycle hasnt kicked in yet, this will result in the column extraction understandably failing as the file cannot be loaded into a dataframe.

For that, do not fail when the column extraction fails but log a warning and continue with the remainder of the paths in the manifest, this is already an improvement on the status quo, but ideally when the column extraction fails, the ingestion flow should have a rety mechanism where it will rety a couple other files under the prefix if available before giving up, and moving on to another prefix.

#

Type of change:

#

Checklist:

github-actions[bot] commented 6 days ago

Hi there 👋 Thanks for your contribution!

The OpenMetadata team will review the PR shortly! Once it has been labeled as safe to test, the CI workflows will start executing and we'll be able to make sure everything is working as expected.

Let us know if you need any help!

sushi30 commented 6 days ago

We need to choose one of:

  1. ignore failed files are
  2. allow for including/excluding patterns so that the user can define which files to skip.

The risk of performing the "catch return None" (1) is that when the user mis-configures the connector (example: chooses a directory with CSV files when they were looking for PARQUET files) they will receive a "successful" workflow with nothing ingested instead of an error.

We've seen reverse issues where users are complaining about successful workflows that have empty results and digging through the logs is not necessarily the best UX for these cases.

Whichever solution we should stick to it throughout the platform as much as we can.

KylixSerg commented 6 days ago

@sushi30 Thanks for checking this out!

imo I would avoid option 2, the issue I see with that approach from the way we're using OpenMetadata in my organization and that is probably similar to what a lot of folks might have setup, is that the ingestion pipeline (generating the manifests, configuring the connector...) is maintained by a different set of engineers than those publishing the data, with approach number 2, we will open up the door for users of the platform to introduce breaking changes to the connector's setup, ex:

filter out any *_STATUS files(which in itself might be problematic, as it might require changes to a lot of pipelines in case there is no convention on naming files), then say the user uploads some other file like _READY, if the ingestion then picks up that file then it will break the ingestion job.

Furthermore, there might be setups(as it is in my use case) where even different teams are all publishing to the same bucket, therefore a single job for the same s3 bucket, making it so that if a user introduces a non conventional file name to their publishing location, that also might break it for other users publishing to the same bucket.

Personally I like option 1 even considering it's downside

We've seen reverse issues where users are complaining about successful workflows that have empty results and digging through the logs is not necessarily the best UX for these cases.

Because the responsibility of making the ingestion work would fall exclusively on the engineers maintaining the connection, generating the manifest.

github-actions[bot] commented 6 days ago

Hi there 👋 Thanks for your contribution!

The OpenMetadata team will review the PR shortly! Once it has been labeled as safe to test, the CI workflows will start executing and we'll be able to make sure everything is working as expected.

Let us know if you need any help!

sushi30 commented 3 days ago

Thank you @KylixSerg for taking the time to look it into this solution and providing the context. OpenMetadata aims to increase effective communication between teams. The issue you highlights the kind of discussions we prefer to be handled within the organization instead of letting the platform absorb the discrepancies. Object storage systems have fairly stable conventions in place (like the _SUCCESS file created by spark, using linux-like paths, or hive-like parititon directories) and teams will adopt some of them and might create some of their own. With OpenMetadata, teams are encouraged discuss and agree on the technical specifications that define their data pipelines and, as a result, how the metadata is consumed by the platforms (or others like it).

If multiple teams are publishing the same bucket, how are any other upstream processes reading the objects? I imagine there needs to be some coordination to avoid serving the wrong files to the wrong consumers.

As a counter-example, what happens if a process start dumping corrupted parquet files or CSV files instead of parquet? We expect our platform to be a participant in safe data practices and raise an error for this case rather than be complacent and pass the responsibility to upstream actions that ultimately will result in data we cannot trust.

Suggested Implementation

Given these principal. I suggest the following implementation:

  1. Add a global and/or local containerFilterPatterns to the containerMetadataConfig schema. This will be similar to our filterPatterns used to filter with regular expressions.
  2. Add the appropriate logic to exclude these files in the S3Source class. Once this logic is demonstrated for s3 we can create a separate delivery to generalize the concept for other object storage systems.
KylixSerg commented 3 days ago

@sushi30

If multiple teams are publishing the same bucket, how are any other upstream processes reading the objects? I imagine there needs to be some coordination to avoid serving the wrong files to the wrong consumers.

The upstream process will break but only for that particular segment of the bucket, what OpenMetadata calls container, but it definitely does not break upstream for all users of a bucket, for example if there is a process that would create glue tables out of s3 prefixes, and that process requires a set of partitions on the prefix, then said process will surely not break because a single tenant of the bucket didn't know how to effectively use the platform, similarly I think a single tenant of OpenMetadata should not have the ability to break the OpenMetadata integration.

As a counter-example, what happens if a process start dumping corrupted parquet files or CSV files instead of parquet? We expect our platform to be a participant in safe data practices and raise an error for this case rather than be complacent and pass the responsibility to upstream actions that ultimately will result in data we cannot trust.

With your counter-example in mind, I do agree that the user should be aware of failures in the pipeline, like you laid out, bad parquet or a completely different file type than what was expected, but my ultimate point is that a multi-tenant setup should be setup so that one tenant's setup should not break it for other people using it.

I would suggest we go with your suggested implementation but with the extra point of setting it up so that we dont have single points of failure in the pipeline ie

  1. Do not fail the whole ingestion job when a single container fails, make sure to properly highlight which containers failed

wdyt?

sonarcloud[bot] commented 3 days ago

Quality Gate Passed Quality Gate passed for 'open-metadata-ingestion'

Issues
0 New issues
0 Accepted issues

Measures
0 Security Hotspots
25.0% Coverage on New Code
0.0% Duplication on New Code

See analysis details on SonarQube Cloud