open-metadata / OpenMetadata

OpenMetadata is a unified metadata platform for data discovery, data observability, and data governance powered by a central metadata repository, in-depth column level lineage, and seamless team collaboration.
https://open-metadata.org
Apache License 2.0

AWS S3 nested path with spark-lineage issue #16090

Status: Open (opened by mykola-yesypchuk-inflection 2 months ago)

mykola-yesypchuk-inflection commented 2 months ago

Affected module: Ingestion Framework (spark-lineage, https://docs.open-metadata.org/v1.3.x/connectors/ingestion/lineage/spark-lineage#configuration)

Describe the bug: Writing data with Spark, using the spark-lineage configuration, fails when the target is a nested path such as s3a://test_bucket/data/group=db1/source_name=sp_entity.

Error: the search API request made during the lineage lookup returns HTTP 500:

2024-04-30 13:55:41 - [30/Apr/2024:10:55:41 +0000] "GET /api/v1/search/query?q=fullyQualifiedName%3Aometadata_db.*%2Fdata%2Fdb%2Fsp_entity&size=10 HTTP/1.1" 500 135 "-" "Apache-HttpClient/4.5.14 (Java/1.8.0_412)" 10
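Decoding the `q` parameter of the logged request shows the literal query string the lineage agent sent to the search API. This is a small standard-library Python sketch; the request path is copied verbatim from the log line above.

```python
from urllib.parse import parse_qs, urlsplit

# Request path copied from the server access log line above.
REQUEST_PATH = (
    "/api/v1/search/query"
    "?q=fullyQualifiedName%3Aometadata_db.*%2Fdata%2Fdb%2Fsp_entity&size=10"
)

# parse_qs percent-decodes the values: %3A -> ':' and %2F -> '/'.
params = parse_qs(urlsplit(REQUEST_PATH).query)
print(params["q"][0])  # fullyQualifiedName:ometadata_db.*/data/db/sp_entity
```

The decoded pattern is the `ometadata_db.*` wildcard prefix followed by the S3 path `/data/db/sp_entity`, slashes included.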

To Reproduce:

# `spark` is assumed to be a SparkSession already configured with the
# spark-lineage listener, as described in the configuration docs linked above.
sp_entity = (
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306/openmetadata_db")
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .option("dbtable", "stored_procedure_entity")
    .option("user", "openmetadata_user")
    .option("password", "openmetadata_password")
    .load()
)

# Uncomment one write at a time.
# Success example 1
# sp_entity.write.parquet("s3a://test_bucket/sp_entity", mode='overwrite')
# Success example 2
# sp_entity.write.parquet("s3a://test_bucket/data/sp_entity", mode='overwrite')
# Fail example 1
# sp_entity.write.parquet("s3a://test_bucket/data/db1/sp_entity", mode='overwrite')
# Fail example 2
# sp_entity.write.parquet("s3a://test_bucket/data/group=db1/source_name=sp_entity", mode='overwrite')
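The Spark log reports the failed lookup as `table entity /data/db/sp_entity`, which looks like the path component of the S3 URI. Assuming the agent derives the lookup name that way (an inference from the log, not confirmed in the thread), this Python sketch lists the query pattern each write target above would produce. `lookup_name` and `lookup_query` are hypothetical helpers, and the `ometadata_db` prefix is copied from the logged query.

```python
from urllib.parse import urlparse

# Write targets from the reproduction above, with the outcome reported for each.
EXAMPLES = [
    ("s3a://test_bucket/sp_entity", "success"),
    ("s3a://test_bucket/data/sp_entity", "success"),
    ("s3a://test_bucket/data/db1/sp_entity", "fail"),
    ("s3a://test_bucket/data/group=db1/source_name=sp_entity", "fail"),
]


def lookup_name(uri: str) -> str:
    # Assumption: the lookup name is the URI path (scheme and bucket dropped),
    # matching "/data/db/sp_entity" in the Spark log.
    return urlparse(uri).path


def lookup_query(uri: str, prefix: str = "ometadata_db") -> str:
    # Same shape as the logged query: fullyQualifiedName:<prefix>.*<name>
    return f"fullyQualifiedName:{prefix}.*{lookup_name(uri)}"


for uri, outcome in EXAMPLES:
    print(f"{outcome:<8}{lookup_query(uri)}")
```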

Spark logs:

24/04/30 13:55:41 ERROR OpenMetadataTransport: Failed to get table entity /data/db/sp_entity from OpenMetadata:
io.openlineage.client.OpenLineageClientException: code: 500, response: {"code":500,"message":"Search failed due to Elasticsearch exception [type=search_phase_execution_exception, reason=all shards failed]"}
    at org.openmetadata.transport.OpenMetadataTransport.throwOnHttpError(OpenMetadataTransport.java:499)
    at org.openmetadata.transport.OpenMetadataTransport.sendRequest(OpenMetadataTransport.java:306)
    at org.openmetadata.transport.OpenMetadataTransport.getTableEntity(OpenMetadataTransport.java:212)
    at org.openmetadata.transport.OpenMetadataTransport.getTableEntity(OpenMetadataTransport.java:237)
    at org.openmetadata.transport.OpenMetadataTransport.sendToOpenMetadata(OpenMetadataTransport.java:199)
    at org.openmetadata.transport.OpenMetadataTransport.emit(OpenMetadataTransport.java:132)
    at io.openlineage.client.OpenLineageClient.emit(OpenLineageClient.java:46)
    at io.openlineage.spark.agent.EventEmitter.emit(EventEmitter.java:62)
    at io.openlineage.spark.agent.lifecycle.SparkSQLExecutionContext.end(SparkSQLExecutionContext.java:136)
    at org.openmetadata.spark.agent.OpenMetadataSparkListener.sparkSQLExecEnd(OpenMetadataSparkListener.java:115)
    at org.openmetadata.spark.agent.OpenMetadataSparkListener.onOtherEvent(OpenMetadataSparkListener.java:101)
    at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100)
    at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
    at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
    at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
    at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
    at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
    at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
    at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
    at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
    at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1471)
    at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
24/04/30 13:55:41 ERROR OpenMetadataTransport: failed to emit event to OpenMetadata: io.openlineage.client.OpenLineageClientException: code: 500, response: {"code":500,"message":"Search failed due to Elasticsearch exception [type=search_phase_execution_exception, reason=all shards failed]"}
io.openlineage.client.OpenLineageClientException: io.openlineage.client.OpenLineageClientException: code: 500, response: {"code":500,"message":"Search failed due to Elasticsearch exception [type=search_phase_execution_exception, reason=all shards failed]"}
    at org.openmetadata.transport.OpenMetadataTransport.getTableEntity(OpenMetadataTransport.java:228)
    at org.openmetadata.transport.OpenMetadataTransport.getTableEntity(OpenMetadataTransport.java:237)
    at org.openmetadata.transport.OpenMetadataTransport.sendToOpenMetadata(OpenMetadataTransport.java:199)
    at org.openmetadata.transport.OpenMetadataTransport.emit(OpenMetadataTransport.java:132)
    at io.openlineage.client.OpenLineageClient.emit(OpenLineageClient.java:46)
    at io.openlineage.spark.agent.EventEmitter.emit(EventEmitter.java:62)
    at io.openlineage.spark.agent.lifecycle.SparkSQLExecutionContext.end(SparkSQLExecutionContext.java:136)
    at org.openmetadata.spark.agent.OpenMetadataSparkListener.sparkSQLExecEnd(OpenMetadataSparkListener.java:115)
    at org.openmetadata.spark.agent.OpenMetadataSparkListener.onOtherEvent(OpenMetadataSparkListener.java:101)
    at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100)
    at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
    at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
    at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
    at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
    at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
    at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
    at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
    at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
    at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1471)
    at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
Caused by: io.openlineage.client.OpenLineageClientException: code: 500, response: {"code":500,"message":"Search failed due to Elasticsearch exception [type=search_phase_execution_exception, reason=all shards failed]"}
    at org.openmetadata.transport.OpenMetadataTransport.throwOnHttpError(OpenMetadataTransport.java:499)
    at org.openmetadata.transport.OpenMetadataTransport.sendRequest(OpenMetadataTransport.java:306)
    at org.openmetadata.transport.OpenMetadataTransport.getTableEntity(OpenMetadataTransport.java:212)
    ... 22 more
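The trace shows the exception is raised inside the lineage listener's `getTableEntity` call, which issues the search request. The thread does not establish a cause. One hypothesis worth checking: Elasticsearch's `query_string` syntax reserves `/` (it wraps regular expressions) and `=`, and the failing lookup names contain them unescaped. The succeeding paths also contain `/`, so this is unconfirmed. The Python sketch below is a hypothetical helper, not part of OpenMetadata or the spark-lineage agent, that builds a backslash-escaped variant of the lookup query so the raw and escaped forms can be compared against the search API.

```python
# Hypothetical helper, not part of OpenMetadata or the spark-lineage agent.
# Characters reserved by Elasticsearch query_string syntax. '*' is deliberately
# omitted so the wildcard in the lookup pattern keeps working, and '<' and '>'
# are omitted because Elasticsearch documents that they cannot be escaped.
RESERVED = set('+-=&|!(){}[]^"~?:\\/')


def escape_query_string(text: str) -> str:
    """Prefix every reserved character in text with a backslash."""
    return "".join("\\" + ch if ch in RESERVED else ch for ch in text)


def escaped_lookup_query(name: str, prefix: str = "ometadata_db") -> str:
    # Same shape as the logged query, but with the table name escaped.
    return f"fullyQualifiedName:{prefix}.*{escape_query_string(name)}"


print(escaped_lookup_query("/data/group=db1/source_name=sp_entity"))
# fullyQualifiedName:ometadata_db.*\/data\/group\=db1\/source_name\=sp_entity
```

If the escaped query returns 200 where the raw one returns 500, that would point at escaping in the lookup. If both fail, the cause lies elsewhere, for example in the index itself, which is what the maintainer's reindexing suggestion targets.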

openmetadata_server logs:

2024-04-30 13:55:41 127.0.0.1 - - [30/Apr/2024:10:55:41 +0000] "GET /api/v1/system/config/jwks HTTP/1.1" 200 454 "-" "Java/17.0.10" 1
2024-04-30 13:55:41 INFO [2024-04-30 10:55:41,927] [dw-393 - PUT /api/v1/pipelines] o.o.s.j.EntityRepository - bd99ba85-8009-42db-b62c-ff618782674d 0.1->0.1 - Fields added [], updated [], deleted []
2024-04-30 13:55:41 INFO [2024-04-30 10:55:41,930] [ForkJoinPool-1-worker-13] o.o.s.e.ChangeEventHandler - Recording change event 1714401080838:bd99ba85-8009-42db-b62c-ff618782674d:entityNoChange:pipeline
2024-04-30 13:55:41 - [30/Apr/2024:10:55:41 +0000] "PUT /api/v1/pipelines HTTP/1.1" 200 718 "-" "Apache-HttpClient/4.5.14 (Java/1.8.0_412)" 14
2024-04-30 13:55:41 127.0.0.1 - - [30/Apr/2024:10:55:41 +0000] "GET /api/v1/system/config/jwks HTTP/1.1" 200 454 "-" "Java/17.0.10" 0
2024-04-30 13:55:41 - [30/Apr/2024:10:55:41 +0000] "GET /api/v1/search/query?q=fullyQualifiedName%3Aometadata_db.*stored_procedure_entity&size=10 HTTP/1.1" 200 1611 "-" "Apache-HttpClient/4.5.14 (Java/1.8.0_412)" 18
2024-04-30 13:55:41 127.0.0.1 - - [30/Apr/2024:10:55:41 +0000] "GET /api/v1/system/config/jwks HTTP/1.1" 200 454 "-" "Java/17.0.10" 0
2024-04-30 13:55:41 ERROR [2024-04-30 10:55:41,969] [dw-393 - GET /api/v1/search/query?q=fullyQualifiedName%3Aometadata_db.*%2Fdata%2Fdb%2Fsp_entity&size=10] o.o.s.e.CatalogGenericExceptionMapper - Error handling a request: 9d47b5219fbbc117
2024-04-30 13:55:41 org.openmetadata.sdk.exception.SearchException: Search failed due to Elasticsearch exception [type=search_phase_execution_exception, reason=all shards failed]
2024-04-30 13:55:41     at org.openmetadata.service.search.elasticsearch.ElasticSearchClient.search(ElasticSearchClient.java:467)
2024-04-30 13:55:41     at org.openmetadata.service.search.SearchRepository.search(SearchRepository.java:643)
2024-04-30 13:55:41     at org.openmetadata.service.resources.search.SearchResource.search(SearchResource.java:173)
2024-04-30 13:55:41     at jdk.internal.reflect.GeneratedMethodAccessor1283.invoke(Unknown Source)
2024-04-30 13:55:41     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2024-04-30 13:55:41     at java.base/java.lang.reflect.Method.invoke(Method.java:568)
2024-04-30 13:55:41     at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52)
2024-04-30 13:55:41     at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:146)
2024-04-30 13:55:41     at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:189)
2024-04-30 13:55:41     at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$ResponseOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:176)
2024-04-30 13:55:41     at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:93)
2024-04-30 13:55:41     at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:478)
2024-04-30 13:55:41     at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:400)
2024-04-30 13:55:41     at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81)
2024-04-30 13:55:41     at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:256)
2024-04-30 13:55:41     at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
2024-04-30 13:55:41     at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
2024-04-30 13:55:41     at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
2024-04-30 13:55:41     at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
2024-04-30 13:55:41     at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
2024-04-30 13:55:41     at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
2024-04-30 13:55:41     at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:235)
2024-04-30 13:55:41     at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:684)
2024-04-30 13:55:41     at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
2024-04-30 13:55:41     at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
2024-04-30 13:55:41     at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:358)
2024-04-30 13:55:41     at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:311)
2024-04-30 13:55:41     at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
2024-04-30 13:55:41     at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799)
2024-04-30 13:55:41     at org.eclipse.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1656)
2024-04-30 13:55:41     at io.dropwizard.servlets.ThreadNameFilter.doFilter(ThreadNameFilter.java:35)
2024-04-30 13:55:41     at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
2024-04-30 13:55:41     at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626)
2024-04-30 13:55:41     at io.dropwizard.jersey.filter.AllowedMethodsFilter.handle(AllowedMethodsFilter.java:47)
2024-04-30 13:55:41     at io.dropwizard.jersey.filter.AllowedMethodsFilter.doFilter(AllowedMethodsFilter.java:41)
2024-04-30 13:55:41     at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
2024-04-30 13:55:41     at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626)
2024-04-30 13:55:41     at org.eclipse.jetty.websocket.server.WebSocketUpgradeFilter.doFilter(WebSocketUpgradeFilter.java:292)
2024-04-30 13:55:41     at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
2024-04-30 13:55:41     at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626)
2024-04-30 13:55:41     at org.eclipse.jetty.servlets.HeaderFilter.doFilter(HeaderFilter.java:117)
2024-04-30 13:55:41     at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
2024-04-30 13:55:41     at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626)
2024-04-30 13:55:41     at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:552)
2024-04-30 13:55:41     at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
2024-04-30 13:55:41     at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1440)
2024-04-30 13:55:41     at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
2024-04-30 13:55:41     at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:505)
2024-04-30 13:55:41     at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
2024-04-30 13:55:41     at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1355)
2024-04-30 13:55:41     at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
2024-04-30 13:55:41     at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
2024-04-30 13:55:41     at com.codahale.metrics.jetty9.InstrumentedHandler.handle(InstrumentedHandler.java:318)
2024-04-30 13:55:41     at io.dropwizard.jetty.RoutingHandler.handle(RoutingHandler.java:52)
2024-04-30 13:55:41     at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:772)
2024-04-30 13:55:41     at io.dropwizard.jetty.ZipExceptionHandlingGzipHandler.handle(ZipExceptionHandlingGzipHandler.java:26)
2024-04-30 13:55:41     at org.eclipse.jetty.server.handler.RequestLogHandler.handle(RequestLogHandler.java:54)
2024-04-30 13:55:41     at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:181)
2024-04-30 13:55:41     at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
2024-04-30 13:55:41     at org.eclipse.jetty.server.Server.handle(Server.java:516)
2024-04-30 13:55:41     at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:487)
2024-04-30 13:55:41     at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:732)
2024-04-30 13:55:41     at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:479)
2024-04-30 13:55:41     at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277)
2024-04-30 13:55:41     at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
2024-04-30 13:55:41     at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)
2024-04-30 13:55:41     at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
2024-04-30 13:55:41     at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338)
2024-04-30 13:55:41     at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315)
2024-04-30 13:55:41     at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173)
2024-04-30 13:55:41     at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131)
2024-04-30 13:55:41     at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409)
2024-04-30 13:55:41     at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883)
2024-04-30 13:55:41     at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034)
2024-04-30 13:55:41     at java.base/java.lang.Thread.run(Thread.java:840)
2024-04-30 13:55:41 - [30/Apr/2024:10:55:41 +0000] "GET /api/v1/search/query?q=fullyQualifiedName%3Aometadata_db.*%2Fdata%2Fdb%2Fsp_entity&size=10 HTTP/1.1" 500 135 "-" "Apache-HttpClient/4.5.14 (Java/1.8.0_412)" 10
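To check the lookup without running Spark (for example before and after reindexing), the failing request can be replayed directly against the search endpoint seen in the logs. This standard-library Python sketch assumes a local deployment at `http://localhost:8585` that accepts a JWT bearer token; `build_search_url` and `search` are hypothetical names.

```python
import urllib.request
from urllib.parse import quote, urlencode


def build_search_url(host_port: str, q: str, size: int = 10) -> str:
    # Encode like the logged request: ':' -> %3A, '/' -> %2F, '*' kept literal.
    params = urlencode({"q": q, "size": size}, quote_via=quote, safe="*")
    return f"{host_port}/api/v1/search/query?{params}"


def search(host_port: str, jwt_token: str, q: str, size: int = 10) -> bytes:
    # Assumption: the server accepts a JWT bearer token (e.g. a bot token).
    request = urllib.request.Request(
        build_search_url(host_port, q, size),
        headers={"Authorization": f"Bearer {jwt_token}"},
    )
    # urlopen raises urllib.error.HTTPError on a non-2xx response such as the 500 above.
    with urllib.request.urlopen(request, timeout=30) as response:
        return response.read()
```

With the raw pattern `fullyQualifiedName:ometadata_db.*/data/db/sp_entity`, `build_search_url` reproduces the query string from the log exactly, so `search` should hit the same 500 while the problem persists.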

Expected behavior: The job runs without exceptions.

Version:

harshach commented 2 months ago

@ulixius9 can you take a look?

mykola-yesypchuk-inflection commented 1 month ago

Do we have any updates on that?

ulixius9 commented 6 days ago

@mykola-yesypchuk-inflection did you try reindexing?

mykola-yesypchuk-inflection commented 6 days ago

Hi @ulixius9. Thanks for your attention. No, I have not tried reindexing.

Can you reproduce the issue using my bug description? I hope it is not related to reindexing.

Best regards,
Mykola