starlake-ai / starlake

Declarative text based tool for data analysts and engineers to extract, load, transform and orchestrate their data pipelines.
http://starlake.ai/
Apache License 2.0
57 stars, 22 forks

fix: gcp log struct serialization #1025

Closed: tiboun closed this 2 months ago

tiboun commented 2 months ago

When Spark is used as the load engine with GCP Cloud Logging as the sink, the output (here, rejectedRecord) could not be serialized.

We get the following stack trace:

java.lang.IllegalArgumentException: Unsupported protobuf value 2024-09-20 11:36:59.753828
    at com.google.cloud.Structs.objectToValue(Structs.java:147)
    at shade.com.google.common.collect.Maps$9.transformEntry(Maps.java:2117)
    at shade.com.google.common.collect.Maps$12.getValue(Maps.java:2165)
    at shade.com.google.protobuf.Struct$Builder.putAllFields(Struct.java:623)
    at com.google.cloud.Structs.newStruct(Structs.java:101)
    at com.google.cloud.logging.Payload$JsonPayload.of(Payload.java:121)
    at ai.starlake.utils.GcpUtils$.sinkToGcpCloudLogging(GcpUtils.scala:53)
    at ai.starlake.job.ingest.IngestionUtil$.$anonfun$sinkRejected$4(IngestionUtil.scala:76)
    at ai.starlake.job.ingest.IngestionUtil$.$anonfun$sinkRejected$4$adapted(IngestionUtil.scala:75)
    at scala.collection.ArrayOps$.map$extension(ArrayOps.scala:929)
    at ai.starlake.job.ingest.IngestionUtil$.sinkRejected(IngestionUtil.scala:75)
    at ai.starlake.job.ingest.IngestionJob.saveRejected(IngestionJob.scala:876)
    at ai.starlake.job.ingest.IngestionJob.saveRejected$(IngestionJob.scala:843)
    at ai.starlake.job.ingest.JsonIngestionJob.saveRejected(JsonIngestionJob.scala:49)
    at ai.starlake.job.ingest.JsonIngestionJob.ingest(JsonIngestionJob.scala:175)
    at ai.starlake.job.ingest.IngestionJob.$anonfun$ingestWithSpark$1(IngestionJob.scala:410)
    at scala.util.Try$.apply(Try.scala:210)
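As far as we understand the google-cloud-core sources, JsonPayload.of builds a protobuf Struct through Structs.newStruct, and Structs.objectToValue roughly accepts only null, String, Number, Boolean, enums, java.util.Map, java.lang.Iterable and arrays, throwing "Unsupported protobuf value" for anything else. The value in the trace looks like the string form of a java.sql.Timestamp, which Spark returns for timestamp columns by default (an assumption about this particular record). The sketch below is an illustrative model of that check, not the library code; StructCheck, isSupported and offendingKeys are hypothetical names, and Scala 2.13 is assumed.

```scala
import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._

// Illustrative model (an assumption, not the google-cloud-core source) of the
// per-value check that Structs.objectToValue appears to perform.
object StructCheck {
  def isSupported(v: Any): Boolean = v match {
    case null                                         => true
    case _: String | _: Number | _: java.lang.Boolean => true
    case _: java.lang.Enum[_]                         => true
    // Nested java.util.Map values must themselves be supported.
    case m: JMap[_, _] =>
      m.asInstanceOf[JMap[Any, Any]].values.asScala.forall(isSupported)
    // java.lang.Iterable (not Scala collections) is walked element by element.
    case it: java.lang.Iterable[_] =>
      it.asInstanceOf[java.lang.Iterable[Any]].asScala.forall(isSupported)
    case a: Array[_] => a.forall(isSupported)
    // Anything else, e.g. java.sql.Timestamp, would be rejected.
    case _ => false
  }

  /** Top-level keys of a payload whose values would be rejected, sorted. */
  def offendingKeys(payload: JMap[String, Any]): List[String] =
    payload.asScala.collect { case (k, v) if !isSupported(v) => k }.toList.sorted
}
```

Under this model, Scala collections nested in the payload would be flagged as well, since they are not java.util types; that is worth keeping in mind when the map is built from a Spark Row.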

This PR adapts the map before passing it to JsonPayload.of, so that every value can be converted to a protobuf Value.
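The exact conversion merged here is not shown in this description. A minimal sketch of what adapting the map can look like, assuming Scala 2.13: the hypothetical StructAdapter.adapt helper (not part of starlake or google-cloud-logging) recursively rewrites every value into null, String, Number, Boolean, java.util.Map or java.util.List, rendering java.sql.Timestamp as an ISO-8601 UTC instant, java.sql.Date as yyyy-MM-dd, other java.time values via toString, and falling back to toString for anything unknown.

```scala
import java.sql.{Date => SqlDate, Timestamp}
import java.time.temporal.TemporalAccessor
import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._

// Hypothetical helper, not the code merged in this PR: rewrites a payload so
// that every value is of a type Structs can encode (assumption).
object StructAdapter {
  def adapt(value: Any): AnyRef = value match {
    case null                 => null
    case s: String            => s
    case n: Number            => n
    case b: java.lang.Boolean => b
    // Date/time values become strings; Timestamp is rendered as a UTC instant.
    case ts: Timestamp        => ts.toInstant.toString
    case d: SqlDate           => d.toLocalDate.toString
    case t: TemporalAccessor  => t.toString
    // Scala and Java maps become java.util.Map[String, AnyRef], recursively.
    case m: scala.collection.Map[_, _] =>
      adaptEntries(m.iterator.map { case (k, v) => (k.toString, v) })
    case m: JMap[_, _] =>
      adaptEntries(m.asInstanceOf[JMap[Any, Any]].asScala.iterator.map { case (k, v) => (k.toString, v) })
    // Scala/Java collections and arrays become java.util.List[AnyRef], recursively.
    case it: Iterable[_] => it.iterator.map(adapt).toList.asJava
    case it: java.lang.Iterable[_] =>
      it.asInstanceOf[java.lang.Iterable[Any]].asScala.iterator.map(adapt).toList.asJava
    case a: Array[_] => a.iterator.map(adapt).toList.asJava
    // Anything else falls back to its string form.
    case other => other.toString
  }

  private def adaptEntries(entries: Iterator[(String, Any)]): JMap[String, AnyRef] =
    entries.map { case (k, v) => k -> adapt(v) }.toMap.asJava
}
```

The adapted map could then be handed to JsonPayload.of. Turning timestamps into ISO strings rather than epoch numbers keeps log entries readable in the Cloud Logging console; that is a design choice of this sketch, not necessarily of the PR.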