Ingest Fails to Find Parquet Assets #43

Open simanadler opened 1 year ago

simanadler commented 1 year ago

A Parquet file in a COS bucket is not successfully ingested.

cdoron commented 1 year ago

Suppose Fybrik creates an asset for writing called object1 in bucket1. The id for that object would be openmetadata-s3.default.bucket1.object1. The arrow-flight-module is then asked to write to that asset. The result is a parquet object with a name like object1/part-0. When discovery is triggered on the bucket1 bucket, asset openmetadata-s3.default.bucket1.object1 will be marked as deleted, because no bucket1/object1 object is found. Also, bucket1/object1/part-0 will not be discovered because it has neither the .csv nor the .parquet postfix.
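
For illustration, listing the bucket after such a write (the endpoint and credentials below are placeholders for a local S3-compatible service) shows only the part object:

```python
import boto3

# Placeholder endpoint/credentials for a local S3-compatible service.
s3 = boto3.client("s3", endpoint_url="http://localstack:4566",
                  aws_access_key_id="test", aws_secret_access_key="test")

for obj in s3.list_objects_v2(Bucket="bucket1").get("Contents", []):
    print(obj["Key"])
# In the scenario above this prints only "object1/part-0": there is no
# "bucket1/object1" object for discovery to match against
# openmetadata-s3.default.bucket1.object1, and the part object has neither a
# .csv nor a .parquet postfix.
```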

cdoron commented 1 year ago

I could find no easy workarounds. My first thought was for Fybrik to name the object with the .parquet postfix and to change the arrow-flight-module to write into the specified object name -- that is, to write to an object1.parquet object rather than to an object1.parquet/part-0 object. That change would not be easy to make, since the arrow-flight-module uses the pyarrow.dataset library, and the exact same code is used for csv objects and parquet objects. Separating these two cases would complicate the code. I suggest that we wait and see whether this problem is fixed in OM 0.13.1 (once it is released).

simanadler commented 1 year ago

@cdoron Is there an issue open in open metadata for this? When is OM 0.13.1 expected?

Also, this issue affects the fybrik quick start write example, correct? If so, the quick start doesn't work either. Should we consider changing it to write a .csv instead?

cdoron commented 1 year ago

@simanadler : OM 0.13.0 introduced a profiler for Datalake connection (including S3). There is a small chance that this would also address our issue with parquet objects. OM 0.13.0 is out, but they have introduced a new bug which prevents OM from working with non-AWS S3 services. I have opened an issue regarding this bug: https://github.com/open-metadata/OpenMetadata/issues/9217 I expect that OM 0.13.1 will be out within a week or two.

Regarding the quick start: It does work. You create an asset with certain properties. When you query the connector regarding the asset, you get the same properties you entered. Things will continue to work even if you trigger an ingestion pipeline. The only difference is that the asset would be marked as deleted. It is not elegant (that OM is not able to discover the asset), but everything works.

simanadler commented 1 year ago

@cdoron As we discussed, let's investigate whether the csv flow works better.

revit13 commented 1 year ago

Ingestion of an asset created by fybrik in the write flow fails with csv as well. This is what @cdoron and I saw: the bucket created by the AFM in localstack is my-notebook-write-fybrik-notebook-sample5b9b855b5b and the object key is my-notebook-write5b9b855b5b/part-2022-12-15-19-30-03-153857-0. The asset is marked as deleted in OM, possibly because the asset name in openmetadata is my-notebook-write5b9b855b5b, which does not match the object that is actually created by the arrow flight module (my-notebook-write5b9b855b5b/part-2022-12-15-19-30-03-153857-0), as @cdoron mentioned above. So the asset name as it appears in OM is actually a directory that contains the csv file. The part-... component appended to the object name was created by the AFM (https://github.com/fybrik/arrow-flight-module/blob/master/afm/server.py#L76), and the object that is created by the AFM does not include a .csv suffix.

The thing we need to investigate is how to make the object name in openmetadata identical to the one that is generated by the module, including the .csv suffix. We can do the following to achieve this:

(*) Deleting the usage of part- in the AFM code does not work, as pyarrow seems to create the object key name with part-.csv by default. We need to look further at how parameters are passed to pyarrow when creating the table (https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html); see the sketch below.
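
For reference, a minimal sketch of how pyarrow.dataset.write_dataset names its output files (the paths here are illustrative, not the actual AFM configuration):

```python
import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({"id": [1, 2, 3]})

# write_dataset always writes *into* the target directory, so asking for
# "bucket1/object1" yields a part file such as "bucket1/object1/part-0.parquet",
# never a single "bucket1/object1" object.
ds.write_dataset(table, "bucket1/object1", format="parquet")

# basename_template controls the part-file names, so a .parquet postfix can be
# forced here, but the data still ends up under the "object1" prefix.
ds.write_dataset(table, "bucket1/object1", format="parquet",
                 basename_template="part-{i}.parquet",
                 existing_data_behavior="overwrite_or_ignore")
```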

There is another option to investigate: openmetadata's ability to specify a prefix for the data source when creating a new service.

revit13 commented 1 year ago

Regarding the ability of openmetadata to specify a prefix for the data source when creating an asset: I created a new service with bucket name my-notebook-write-fybrik-notebook-sample5b9b855b5b and data source prefix my-notebook-write5b9b855b5b. When I create an object in localstack with key my-notebook-write5b9b855b5b/part-0.csv and run the ingestion, I can see in OM a table named my-notebook-write5b9b855b5b/part-0.csv, with the schema inside. I am not sure about predefined tags; when creating the service using the GUI I could not specify tags.

cdoron commented 1 year ago

I think we need:

  1. .csv or .parquet suffixes for objects in order for OM to discover them
  2. to write objects with names corresponding to the assets created by the OM connector, which is not currently the case

Currently, when asked to write to my-notebook-write-fybrik-notebook-sample5b9b855b5b/my-notebook-write5b9b855b5b, the arrow-flight-module writes to my-notebook-write-fybrik-notebook-sample5b9b855b5b/my-notebook-write5b9b855b5b/part-0.csv. This is not a parquet issue, as I previously thought. This is how the arrow-flight-module is written, to prevent overwriting of assets -- when csv files are written in parts, new writes are equivalent to appending to a dataset (rather than overwriting it).
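
A minimal sketch of this append-by-parts behavior using plain pyarrow.dataset calls (the actual arrow-flight-module adds a timestamp to the part names, and the path here is illustrative):

```python
import pyarrow as pa
import pyarrow.dataset as ds

base = "my-notebook-write-fybrik-notebook-sample5b9b855b5b/my-notebook-write5b9b855b5b"

# First write: creates <base>/batch-0-part-0.csv under the asset "directory".
ds.write_dataset(pa.table({"id": [1, 2]}), base, format="csv",
                 basename_template="batch-0-part-{i}.csv",
                 existing_data_behavior="overwrite_or_ignore")

# A later write with a different basename template leaves the earlier part file
# in place, so the new rows are effectively appended to the dataset rather than
# overwriting it.
ds.write_dataset(pa.table({"id": [3, 4]}), base, format="csv",
                 basename_template="batch-1-part-{i}.csv",
                 existing_data_behavior="overwrite_or_ignore")
```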

I suggest that:

  1. fybrik request the creation of objects whose names already carry the .csv or .parquet postfix (e.g. object1.csv instead of object1)
  2. the arrow-flight-module be changed to write to that exact object name, rather than to object1.csv/part-0

revit13 commented 1 year ago

@cdoron regarding the part- issue: this is a pyarrow issue, please see my comment above (https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html). Thanks

cdoron commented 1 year ago

@revit13 : Thanks for the clarification. However, I am sure we can find a way to get around it if we so choose.

simanadler commented 1 year ago

@cdoron @revit13 There are two write scenarios:

  1. Write a new asset. This is the use case we have been discussing. In this case Fybrik generates a new asset in the catalog.
  2. Append to an existing asset. In this case fybrik receives an existing data catalog assetID.

@cdoron Would your proposed solution for 1 cause a problem with 2?

cdoron commented 1 year ago

@simanadler : yes, my suggestion would cause a problem with append.

Let me try to summarize my thoughts. There are four players: fybrik, OM, the OM-connector, and the arrow-flight-module. In the sample write use case, fybrik first contacts the OM-connector and requests the creation of an asset (which does not yet exist). Fybrik next configures the arrow-flight-module with information regarding the new asset. Our problems have to do with the OM discovery:

  1. if the asset name does not end with .csv or .parquet, OM will not discover it
  2. Even if OM discovered our asset (if we force a .csv or .parquet postfix), the name of the asset created by the connector would still not be the same as the name of the asset discovered by OM (explanation below).

Currently, if fybrik requests the creation of an asset called object1 in bucket bucket1, the OM-connector will create asset openmetadata-s3.default.bucket1.object1. The arrow-flight-module would later create an object called bucket1/object1/part-0. This object would not be discovered by OM, as it does not have the right postfix. If we change either the arrow-flight-module code or its configuration so as to create object bucket1/object1/part-0.parquet instead, then that object would be discovered, but will be known as openmetadata-s3.default.bucket1."object1/part-0.parquet" (an entirely different asset with a different assetID).

One way to address this is to change the behavior of fybrik and the arrow-flight-module (as proposed in my previous comment). fybrik would request the creation of object1.csv instead of object1, and the arrow-flight-module would create object1.csv instead of object1.csv/part-0. I thought that this change would be easy, but I now understand from @revit13 and @Mohammad-nassar10 that it might be difficult. Also, making this option work with the append scenario would be even more difficult, or nearly impossible.

There is at least one other option. fybrik would still request the creation of an asset in bucket bucket1 called object1. The OM-connector, with its knowledge of the arrow-flight-module behavior, would create an OM asset with assetID openmetadata-s3.default.bucket1."object1/part-0.parquet" (or openmetadata-s3.default.bucket1."object1/part-0.csv"). fybrik configuration of the arrow-flight-module will not change, and object bucket1/object1/part-0.parquet will be created. As a result, OM discovery will update openmetadata-s3.default.bucket1."object1/part-0.parquet" as required. This option would require minor changes to the arrow-flight-module (to choose a deterministic name for the dataset parts, and to add the .parquet or .csv postfixes). This option supports the append scenario.
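
A minimal sketch of that naming convention, as a hypothetical Python helper (the function and its parameters are illustrative and not part of the actual connector):

```python
def om_asset_id(service: str, database: str, bucket: str, object_name: str,
                data_format: str = "parquet") -> str:
    """Build the OM assetID from the object the arrow-flight-module is
    expected to write, rather than from the requested object name itself."""
    # Assumes the module is changed to use a deterministic first part name
    # with the right postfix, as proposed above.
    written_object = f"{object_name}/part-0.{data_format}"
    # Components containing '/' or '.' are quoted inside the assetID.
    return f'{service}.{database}.{bucket}."{written_object}"'

print(om_asset_id("openmetadata-s3", "default", "bucket1", "object1"))
# -> openmetadata-s3.default.bucket1."object1/part-0.parquet"
```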

A couple of comments:

  1. the behavior of the OM-connector is tied to the behavior of the arrow-flight-module. The connector knows the name of the object the arrow-flight-module would create, and chooses the assetID accordingly. Other FybrikModules might behave differently.
  2. in the append scenario, different parts of the same dataset would reside in different objects (bucket1/object1/part-0.parquet, bucket1/object1/part-1.parquet, etc.). I am pretty sure that OM would treat them as separate data assets (maybe this would change in the future). Therefore, only changes (e.g. tags) to the first part (whose assetID is openmetadata-s3.default.bucket1."object1/part-0.parquet") would be seen by fybrik.

simanadler commented 1 year ago

@cdoron Am I correct in assuming that the problem of appending to a given data set is specific to parquet? Would there be a problem appending to a csv or a database table?

Regarding linking the data catalog connector logic and the module logic, I don't believe that fybrik manager knows which module will be deployed at the time that it generates the asset ID to be added to the catalog.

I suggest we meet and discuss all of this in more detail together with @shlomitk1, since this issue may also be related to the new storage allocation design

revit13 commented 1 year ago

BTW, openmetadata plans to enhance their DataLake metadata for the 1.0 release, planned for the beginning of Feb: https://github.com/open-metadata/OpenMetadata/discussions/8124. The following scenario seems similar to ours: https://github.com/open-metadata/OpenMetadata/discussions/8124#discussioncomment-3897789. @cdoron what do you think?

revit13 commented 1 year ago

@simanadler from my testing with AFM the appending of rows to csv and parquet files is done similarly: the csv/parquet files with the new rows are placed in the bucket which contains the existing data. This is done by specifying "write_mode": "append" in the notebook when writing the new rows. When reading the asset in the notebook, the different files will appear in different arrow-flight endpoints.
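
A hedged sketch of what such a write call from the notebook could look like (the Flight endpoint and the "asset" field are assumptions for illustration; only "write_mode": "append" is taken from the description above):

```python
import json
import pyarrow as pa
import pyarrow.flight as fl

# Assumed service address of the deployed arrow-flight-module.
client = fl.connect("grpc://my-notebook-write-arrow-flight:80")

command = {
    "asset": "fybrik-notebook-sample/my-notebook-write",  # assumed asset reference
    "write_mode": "append",  # append the new rows instead of overwriting
}
table = pa.table({"id": [4, 5], "name": ["d", "e"]})

writer, _ = client.do_put(fl.FlightDescriptor.for_command(json.dumps(command)),
                          table.schema)
writer.write_table(table)
writer.close()
```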

shlomitk1 commented 1 year ago

@cdoron This is not how Fybrik works. Consider the following scenario: an asset X is ingested. Fybrik will invoke the optimizer to find a storage account and the data access module to support the copy operation. Then, Fybrik allocates storage and forms a connection object - for example, {S3, bucket = BUCKET, object_key = DATA}. Next, Fybrik generates a plotter and blueprint, then configures and deploys the module. After the orchestration is done successfully (a copy job has been completed), Fybrik requests the new asset to be registered in the catalog. In the write scenario, similarly to the previous one, Fybrik first allocates the storage, forms a connection object, configures the module, and only then does it register the asset to be written in the catalog. In both scenarios, Fybrik does not use the new asset ID returned by the connector except for storing it in the app status.
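
A sketch of what such a connection object might look like, written as a Python dict for illustration (the exact field layout and the endpoint value are assumptions, not taken from the Fybrik taxonomy):

```python
# Illustrative only: the connection object Fybrik forms after allocating storage,
# before configuring the module and registering the asset in the catalog.
connection = {
    "name": "s3",
    "s3": {
        "endpoint": "http://localstack:4566",  # assumed endpoint of the allocated storage
        "bucket": "BUCKET",
        "object_key": "DATA",
    },
}
```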