airbytehq / airbyte

[destination-postgres] java.lang.RuntimeException: Unable to deserialize PartialAirbyteMessage. #41991

Closed misza80 closed 1 month ago

misza80 commented 1 month ago

Connector Name

destination-postgres

Connector Version

v2.0.12

What step the error happened?

During the sync

Relevant information

Source: airbyte/source-mssql:v4.0.30
Destination: airbyte/destination-postgres:2.0.12
Platform: 0.63.1
Issue: When attempting to ingest all fields, the process fails with the following error:

2024-07-15 14:47:23 destination > ERROR main i.a.c.i.b.AirbyteExceptionHandler(uncaughtException):31 Something went wrong in the connector. See the logs for more details. java.lang.RuntimeException: Unable to deserialize PartialAirbyteMessage.
        at io.airbyte.cdk.integrations.destination.async.deser.AirbyteMessageDeserializer.deserializeAirbyteMessage$lambda$0(AirbyteMessageDeserializer.kt:33) ~[airbyte-cdk-core-0.35.15.jar:?]
        at java.base/java.util.Optional.orElseThrow(Optional.java:403) ~[?:?]
        at io.airbyte.cdk.integrations.destination.async.deser.AirbyteMessageDeserializer.deserializeAirbyteMessage(AirbyteMessageDeserializer.kt:32) ~[airbyte-cdk-core-0.35.15.jar:?]
        at io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer.accept(AsyncStreamConsumer.kt:128) ~[airbyte-cdk-core-0.35.15.jar:?]
        at io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer$Companion$appendOnClose$1.accept(SerializedAirbyteMessageConsumer.kt:65) ~[airbyte-cdk-core-0.35.15.jar:?]
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion$consumeWriteStream$2$1.invoke(IntegrationRunner.kt:428) ~[airbyte-cdk-core-0.35.15.jar:?]
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion$consumeWriteStream$2$1.invoke(IntegrationRunner.kt:426) ~[airbyte-cdk-core-0.35.15.jar:?]
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.consumeWriteStream$lambda$1$lambda$0(IntegrationRunner.kt:426) ~[airbyte-cdk-core-0.35.15.jar:?]
        at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133) ~[?:?]
        at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1939) ~[?:?]
        at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762) ~[?:?]
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.consumeWriteStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_core(IntegrationRunner.kt:426) ~[airbyte-cdk-core-0.35.15.jar:?]
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.consumeWriteStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_core$default(IntegrationRunner.kt:418) ~[airbyte-cdk-core-0.35.15.jar:?]
        at io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.kt:209) ~[airbyte-cdk-core-0.35.15.jar:?]
        at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt:116) ~[airbyte-cdk-core-0.35.15.jar:?]
        at io.airbyte.integrations.destination.postgres.PostgresDestination$Companion.main(PostgresDestination.kt:211) ~[io.airbyte.airbyte-integrations.connectors-destination-postgres.jar:?]
        at io.airbyte.integrations.destination.postgres.PostgresDestination.main(PostgresDestination.kt) ~[io.airbyte.airbyte-integrations.connectors-destination-postgres.jar:?]
Stack Trace: java.lang.RuntimeException: Unable to deserialize PartialAirbyteMessage.

Relevant log output

at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion$consumeWriteStream$2$1.invoke(IntegrationRunner.kt:426) ~[airbyte-cdk-core-0.35.15.jar:?]
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.consumeWriteStream$lambda$1$lambda$0(IntegrationRunner.kt:426) ~[airbyte-cdk-core-0.35.15.jar:?]
        at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133) ~[?:?]
        at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1939) ~[?:?]
        at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762) ~[?:?]
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.consumeWriteStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_core(IntegrationRunner.kt:426) ~[airbyte-cdk-core-0.35.15.jar:?]
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.consumeWriteStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_core$default(IntegrationRunner.kt:418) ~[airbyte-cdk-core-0.35.15.jar:?]
        at io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.kt:209) ~[airbyte-cdk-core-0.35.15.jar:?]
        at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt:116) ~[airbyte-cdk-core-0.35.15.jar:?]
        at io.airbyte.integrations.destination.postgres.PostgresDestination$Companion.main(PostgresDestination.kt:211) ~[io.airbyte.airbyte-integrations.connectors-destination-postgres.jar:?]
        at io.airbyte.integrations.destination.postgres.PostgresDestination.main(PostgresDestination.kt) ~[io.airbyte.airbyte-integrations.connectors-destination-postgres.jar:?]
Stack Trace: java.lang.RuntimeException: Unable to deserialize PartialAirbyteMessage.
        at io.airbyte.cdk.integrations.destination.async.deser.AirbyteMessageDeserializer.deserializeAirbyteMessage$lambda$0(AirbyteMessageDeserializer.kt:33)
        at java.base/java.util.Optional.orElseThrow(Optional.java:403)
        at io.airbyte.cdk.integrations.destination.async.deser.AirbyteMessageDeserializer.deserializeAirbyteMessage(AirbyteMessageDeserializer.kt:32)
        at io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer.accept(AsyncStreamConsumer.kt:128)
        at io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer$Companion$appendOnClose$1.accept(SerializedAirbyteMessageConsumer.kt:65)
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion$consumeWriteStream$2$1.invoke(IntegrationRunner.kt:428)
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion$consumeWriteStream$2$1.invoke(IntegrationRunner.kt:426)
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.consumeWriteStream$lambda$1$lambda$0(IntegrationRunner.kt:426)
        at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
        at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1939)
        at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.consumeWriteStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_core(IntegrationRunner.kt:426)
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.consumeWriteStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_core$default(IntegrationRunner.kt:418)
        at io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.kt:209)
        at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt:116)
        at io.airbyte.integrations.destination.postgres.PostgresDestination$Companion.main(PostgresDestination.kt:211)
        at io.airbyte.integrations.destination.postgres.PostgresDestination.main(PostgresDestination.kt)
2024-07-15 14:47:23 platform > readFromDestination: exception caught
io.airbyte.workers.internal.exception.DestinationException: Destination process exited with non-zero exit code 1
        at io.airbyte.workers.general.BufferedReplicationWorker.readFromDestination(BufferedReplicationWorker.java:493) ~[io.airbyte-airbyte-commons-worker-0.63.1.jar:?]
        at io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsync$2(BufferedReplicationWorker.java:235) ~[io.airbyte-airbyte-commons-worker-0.63.1.jar:?]
        at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
        at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
2024-07-15 14:47:23 platform > writeToDestination: exception caught
java.io.IOException: Broken pipe

marcosmarxm commented 1 month ago

@misza80, is this a new sync, or did you upgrade the platform and connectors?

misza80 commented 1 month ago

> @misza80, is this a new sync, or did you upgrade the platform and connectors?

This is a new sync; I only started with Airbyte a few weeks back. The connectors may have been upgraded at some point, though. I could have hit "upgrade all" on the connectors page, and I can't say whether the MSSQL or Postgres connectors were upgraded at that time.

marcosmarxm commented 1 month ago

@misza80 was there a successful sync previously? Can you share the logs for that? Also, please upload the complete log file.

misza80 commented 1 month ago

@marcosmarxm I was not able to perform a successful sync. Please see the attached log. Thank you for looking into this.

airbytelog.txt

https://github.com/user-attachments/assets/21b1bece-0ba6-4619-9b7d-db39e1a72312

marcosmarxm commented 1 month ago

@airbytehq/destinations can someone take a look at this issue? Thanks!

evantahler commented 1 month ago

What we believe is happening here is that your source is sending a record that is either huge (>20 MB) or contains some unexpected break characters (e.g. lots of \n) that we aren't handling properly. To solve this, we need to see an example of the content your source is sending.

If you are running Airbyte via docker-compose, can you please set the environment variable LOG_CONNECTOR_MESSAGES=true, e.g. LOG_CONNECTOR_MESSAGES=true docker-compose up? This will add a log line for every record to your sync logs, and the last one before the crash should be what we need. This will make your sync logs huge, so if you could isolate your sync to just a single stream that reproduces the issue, that would help.
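
Once the raw connector messages have been captured, a quick check along the following lines might help narrow things down. This is only a sketch under stated assumptions: it expects the messages saved one per line with any log prefix already stripped (Airbyte connectors exchange newline-delimited JSON), and `inspect_messages` and `SIZE_LIMIT_BYTES` are hypothetical names. The threshold is borrowed from the ">20mb" figure above, not from any connector setting.

```python
import json

# Hypothetical threshold, taken from the ">20mb" figure mentioned in the thread.
SIZE_LIMIT_BYTES = 20 * 1024 * 1024


def inspect_messages(raw_output, size_limit=SIZE_LIMIT_BYTES):
    """Return (line_number, problem) pairs for lines that look suspicious."""
    problems = []
    for number, line in enumerate(raw_output.split("\n"), start=1):
        if not line.strip():
            continue  # blank lines carry no message
        size = len(line.encode("utf-8"))
        if size > size_limit:
            problems.append((number, f"oversized: {size} bytes"))
        try:
            json.loads(line)
        except json.JSONDecodeError:
            # A record split by a stray raw newline leaves fragments
            # that are not valid JSON on their own.
            problems.append((number, "not valid JSON on its own"))
    return problems
```

A record whose string values contain properly escaped newlines stays on one line and passes; a record that was broken by a raw newline shows up as two consecutive invalid lines.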

ogirardot commented 1 month ago

Reproduced using source postgres:3.4.23 and destination postgres:2.0.15.

evantahler commented 1 month ago

@ogirardot if you could share the content (e.g. a SQL dump) of the data that produced this error in your source, that would be very helpful.

ogirardot commented 1 month ago

I've tried to narrow down the possibilities (columns and rows). For the columns, in my use case (pg -> pg), the failure seems to occur when handling one specific jsonb field (not all jsonb fields). I'll dig further into the data to find the problematic column, but one specific issue I'm seeing is that the data type is lost through Airbyte: jsonb becomes character varying.

ogirardot commented 1 month ago

Considering that the serde code for PartialAirbyteMessage has changed quite a bit in the last "hours/week", I tried upgrading the source to 3.6.2 (released 8 h ago), with of course the same result.

@evantahler, would it be possible to release a new version of destination-postgres to get a more recent stack trace (or to check whether the issue is already fixed)? I'll try to build it as a new connector on my own, but it would be great to check whether the new serialization/deserialization code changes anything.

ogirardot commented 1 month ago

Diving into existing issues, this seems to be a common problem related to the upcoming support_refreshes feature; cf. https://github.com/airbytehq/airbyte/pull/39473 , https://github.com/airbytehq/airbyte/issues/38641 and https://github.com/airbytehq/airbyte/issues/37621

I tried to build and deploy a custom destination-postgres, bumping the platform version to 0.63.8 (helm chart 0.293.4) and bumping the CDK version for destination-postgres:

-    cdkVersionRequired = '0.35.15'
+    cdkVersionRequired = '0.41.2'

But as the connector does not support refreshes, the deployment fails with an issue like this one: https://github.com/airbytehq/airbyte/discussions/40606 . The suggested fix of setting support_refreshes to true makes the connector fail with a missing _airbyte_generation_id field...

So I guess backward compatibility got broken at some point while developing the support for refreshes? Do we need sources and destinations to use the same cdkVersionRequired?

If I take into account OP's original issue and my own, the specs are:

  • destination/postgres 2.0.15 : cdkVersionRequired = '0.35.15'
  • source/mssql 4.0.30 : cdkVersionRequired = '0.40.1'
  • source/postgres 3.6.2 : cdkVersionRequired = '0.42.2'

Do we need to align those?

stephane-airbyte commented 1 month ago

The CDK versions of the connectors shouldn't have any impact on compatibility. The only compatibility error we're aware of with respect to the refreshes feature is between the platform and the destination: you need to upgrade the platform to 0.63 before you upgrade your destination to a version that supports refreshes.

There's a new destination-postgres coming very soon that introduces support for refreshes.

ogirardot commented 1 month ago

Hello @stephane-airbyte, I've seen the support for refreshes come and go. Sorry to ask so bluntly, but should we wait? Is it still under development?

misza80 commented 1 month ago

@evantahler Apologies for the late reply, I was away on annual leave. I have isolated the stream that was causing the error and set the logging as per your request. Unfortunately, each time I try to retrieve the logs I receive an error (see the attached image).

evantahler commented 1 month ago

@misza80 are you able to see the logs from your Airbyte server when that error occurs and get a stack trace?

stephane-airbyte commented 1 month ago

@ogirardot no need to apologize. I understand that the support for refreshes in Postgres was pulled without warning. We found critical issues that warranted a rollback, and we're still very much working on releasing the feature for good.

evantahler commented 1 month ago

With https://github.com/airbytehq/airbyte/pull/42540, destination-postgres version 2.3.0 is out, which should fix this bug, and re-enable support for refreshes.

misza80 commented 1 month ago

After updating the connectors to the latest version, I am getting the following error:

java.lang.RuntimeException: Could not deserialize PartialAirbyteMessage: class com.fasterxml.jackson.databind.JsonMappingException
        at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:402)
        at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:361)
        at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.wrapAndThrow(BeanDeserializerBase.java:1853)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:316)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:177)
        at com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:129)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:314)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:177)
        at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323)
        at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4825)
        at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3772)
        at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3740)
        at io.airbyte.commons.json.Jsons.deserializeExactUnchecked(Jsons.kt:147)
        at io.airbyte.cdk.integrations.destination.async.deser.AirbyteMessageDeserializer.deserializeAirbyteMessage(AirbyteMessageDeserializer.kt:48)
        at io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer.accept(AsyncStreamConsumer.kt:107)
        at io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer$Companion$appendOnClose$1.accept(SerializedAirbyteMessageConsumer.kt:65)
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion$consumeWriteStream$2$1.invoke(IntegrationRunner.kt:385)
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion$consumeWriteStream$2$1.invoke(IntegrationRunner.kt:383)
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.consumeWriteStream$lambda$1$lambda$0(IntegrationRunner.kt:383)
        at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
        at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1939)
        at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.consumeWriteStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_core(IntegrationRunner.kt:383)
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.consumeWriteStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_core$default(IntegrationRunner.kt:375)
        at io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.kt:215)
        at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt:119)
        at io.airbyte.cdk.integrations.base.IntegrationRunner.run$default(IntegrationRunner.kt:113)
        at io.airbyte.integrations.destination.postgres.PostgresDestination$Companion.main(PostgresDestination.kt:216)
        at io.airbyte.integrations.destination.postgres.PostgresDestination.main(PostgresDestination.kt)
Caused by class com.fasterxml.jackson.core.exc.StreamConstraintsException
        at com.fasterxml.jackson.core.StreamReadConstraints.validateStringLength(StreamReadConstraints.java:324)
        at com.fasterxml.jackson.core.util.ReadConstrainedTextBuffer.validateStringLength(ReadConstrainedTextBuffer.java:27)
        at com.fasterxml.jackson.core.util.TextBuffer.finishCurrentSegment(TextBuffer.java:939)
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._finishString2(ReaderBasedJsonParser.java:2240)
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._finishString(ReaderBasedJsonParser.java:2206)
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.getText(ReaderBasedJsonParser.java:323)
        at com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer._deserializeContainerNoRecursion(JsonNodeDeserializer.java:572)
        at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:100)
        at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:25)
        at com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:129)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:314)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:177)
        at com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:129)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:314)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:177)
        at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323)
        at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4825)
        at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3772)
        at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3740)
        at io.airbyte.commons.json.Jsons.deserializeExactUnchecked(Jsons.kt:147)
        at io.airbyte.cdk.integrations.destination.async.deser.AirbyteMessageDeserializer.deserializeAirbyteMessage(AirbyteMessageDeserializer.kt:48)
        at io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer.accept(AsyncStreamConsumer.kt:107)
        at io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer$Companion$appendOnClose$1.accept(SerializedAirbyteMessageConsumer.kt:65)
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion$consumeWriteStream$2$1.invoke(IntegrationRunner.kt:385)
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion$consumeWriteStream$2$1.invoke(IntegrationRunner.kt:383)
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.consumeWriteStream$lambda$1$lambda$0(IntegrationRunner.kt:383)
        at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
        at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1939)
        at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.consumeWriteStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_core(IntegrationRunner.kt:383)
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.consumeWriteStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_core$default(IntegrationRunner.kt:375)
        at io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.kt:215)
        at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt:119)
        at io.airbyte.cdk.integrations.base.IntegrationRunner.run$default(IntegrationRunner.kt:113)
        at io.airbyte.integrations.destination.postgres.PostgresDestination$Companion.main(PostgresDestination.kt:216)
        at io.airbyte.integrations.destination.postgres.PostgresDestination.main(PostgresDestination.kt)

Screenshot 2024-08-02 120622
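
The cause in this trace is a StreamConstraintsException raised from StreamReadConstraints.validateStringLength, which is Jackson's cap on the length of a single string value. A rough way to check whether a source record contains such a value is sketched below. The 20,000,000-character figure is an assumption (it is the default in recent Jackson releases, and the connector may configure a different limit), and `longest_string` and `exceeds_limit` are hypothetical helper names. Python counts code points, so the lengths are only approximately comparable to what Jackson measures.

```python
import json

# Assumed limit: the default in recent Jackson releases. The value a given
# connector build actually configures may differ.
ASSUMED_MAX_STRING_LENGTH = 20_000_000


def longest_string(value, path="$"):
    """Return (length, path) of the longest string inside a parsed JSON value."""
    if isinstance(value, str):
        return len(value), path
    best = (0, path)
    if isinstance(value, dict):
        children = ((f"{path}.{key}", child) for key, child in value.items())
    elif isinstance(value, list):
        children = ((f"{path}[{index}]", child) for index, child in enumerate(value))
    else:
        children = ()  # numbers, booleans and null contain no strings
    for child_path, child in children:
        candidate = longest_string(child, child_path)
        if candidate[0] > best[0]:
            best = candidate
    return best


def exceeds_limit(record_json, limit=ASSUMED_MAX_STRING_LENGTH):
    """Return (over_limit, length, path) for one serialized record."""
    length, path = longest_string(json.loads(record_json))
    return length > limit, length, path
```

Running `exceeds_limit` over the rows of a suspect table (serialized as JSON) would point at the column holding the oversized value, if there is one.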

evantahler commented 4 weeks ago

@misza80 it would be immensely helpful if you could provide an example record that causes this crash. Can you enable your streams one by one to find the offending content? Can you run your Airbyte platform with the environment variable LOG_CONNECTOR_MESSAGES=true (if on docker compose), or dump your source data and share it with us?

misza80 commented 4 weeks ago

@evantahler Is there any other way I can export logs for the given job, besides the GUI?

evantahler commented 3 weeks ago

It depends on how you are running Airbyte. If you are using docker compose, the job logs will be the combined stdout of the source, destination, and orchestrator containers during the sync. You could grab them from docker, or from the Airbyte persistent volume. If you run syncs with abctl, there should be a log volume, and if you deploy with K8s, you should have an S3/GCS bucket which holds your sync logs.

A reminder: we care less about your sync logs, and more about the actual record content in the source that is causing the crash. For example, can you dump the table (or part of it) that is causing the problem?

misza80 commented 3 weeks ago

@evantahler Please find attached two entries from the DB I'm trying to sync. File: export.csv. The first line is the last entry that was successfully synced from SQL to Postgres. The second line is the next row in that table. Unfortunately, I'm unable to upload a full table dump.

evantahler commented 3 days ago

@misza80 it looks like you've got some rather large records, but not that large (i.e. under the 50 MB parsing limit we have set). I'm curious whether there are some line breaks or other "secret" break characters in that content that aren't serializing properly.
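
One way to look for such characters in an exported row is sketched below. It is only an illustration: `find_break_characters` is a hypothetical helper, and the character set is simply the one Python's `str.splitlines()` treats as line boundaries, which may or may not match what the connector trips over.

```python
import unicodedata

# Characters that Python's str.splitlines() treats as line boundaries.
LINE_BREAK_CHARS = "\n\r\v\f\x1c\x1d\x1e\x85\u2028\u2029"


def find_break_characters(text):
    """Return (offset, code point, name) for each line-break-like character."""
    found = []
    for offset, char in enumerate(text):
        if char in LINE_BREAK_CHARS:
            # Control characters have no Unicode name, so fall back to a label.
            name = unicodedata.name(char, "CONTROL")
            found.append((offset, f"U+{ord(char):04X}", name))
    return found
```

For example, reading export.csv as UTF-8 text and passing each field to `find_break_characters` would list the offsets of any U+2028 or U+0085 characters that look invisible in an editor.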