GoogleCloudPlatform / DataflowTemplates

Cloud Dataflow Google-provided templates for solving in-Cloud data tasks
https://cloud.google.com/dataflow/docs/guides/templates/provided-templates
Apache License 2.0

[Bug]: Mongo to Bigquery fails if Mongo document contains BSON timestamp #1716

Open ggprod opened 6 days ago

ggprod commented 6 days ago

Related Template(s)

mongodb_to_bigquery

Template Version

2024-06-18-00_rc01

What happened?

When the template is run against a MongoDB collection whose documents contain a BSON timestamp field, the job fails with the error below.

Relevant log output

Error message from worker: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Unable to encode element 'Document{{_id=664ca1a214e2486dcb269ff9, historyDoc=664ca1a214e2486dcb269ff3, opType=update, opDatedAt=Timestamp{value=7371444407255433218, seconds=1716298146, inc=2}, changedBy=62a757fe19bda3004761fdaf, changedAt=Tue May 21 13:29:06 UTC 2024, change=Document{{newValue=Document{{isTrusted=true, changedBy=62a757fe19bda3004761fdaf, changedAt=Tue May 21 13:29:06 UTC 2024}}}}, facilityNurse=664bbab3077870b0be2551ce, facility=62cd8ec60918180034f3fe51, nurse=5d8292819a5bca002ffc43f1, __v=0}}' with coder 'SerializableCoder(org.bson.Document)'.
    org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
    org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1794)
    org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
    org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2219)
    org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn.processElement(Read.java:322)
    org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1100)
    org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:143)
    org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:659)
    org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:654)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
    org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
    org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:158)
    org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:537)
    org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
    org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
    java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
    java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: Unable to encode element 'Document{{_id=664ca1a214e2486dcb269ff9, historyDoc=664ca1a214e2486dcb269ff3, opType=update, opDatedAt=Timestamp{value=7371444407255433218, seconds=1716298146, inc=2}, changedBy=62a757fe19bda3004761fdaf, changedAt=Tue May 21 13:29:06 UTC 2024, change=Document{{newValue=Document{{isTrusted=true, changedBy=62a757fe19bda3004761fdaf, changedAt=Tue May 21 13:29:06 UTC 2024}}}}, facilityNurse=664bbab3077870b0be2551ce, facility=62cd8ec60918180034f3fe51, nurse=5d8292819a5bca002ffc43f1, __v=0}}' with coder 'SerializableCoder(org.bson.Document)'.
    org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
    org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$SampleByteSizeDistribution.tryUpdate(PCollectionConsumerRegistry.java:509)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:336)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
Caused by: java.io.NotSerializableException: org.bson.BsonTimestamp
    java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
    java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:345)
    java.base/java.util.LinkedHashMap.internalWriteEntries(LinkedHashMap.java:333)
    java.base/java.util.HashMap.writeObject(HashMap.java:1412)
    java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    java.base/java.lang.reflect.Method.invoke(Method.java:566)
    java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1016)
    java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1487)
    java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
    java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
    java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1543)
    java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1500)
    java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
    java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
    java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:345)
    org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:192)
    org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:57)
    org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:297)
    org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$SampleByteSizeDistribution.tryUpdate(PCollectionConsumerRegistry.java:509)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:336)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
    org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1792)
    org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
    org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2219)
    org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn.processElement(Read.java:322)
    org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1100)
    org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:143)
    org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:659)
    org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:654)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
    org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
    org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
    org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:158)
    org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:537)
    org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
    org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
    java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
    java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    java.base/java.lang.Thread.run(Thread.java:829)
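
To illustrate the root cause shown in the trace above: Java serialization of a map fails as soon as one value does not implement java.io.Serializable, and the exception message names the offending class. The following is a minimal, stdlib-only sketch of that behavior. FakeTimestamp is a hypothetical stand-in for org.bson.BsonTimestamp, and a LinkedHashMap stands in for org.bson.Document; neither is the real driver class, and this is not the template's code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

public class SerializationRepro {
    // Hypothetical stand-in for org.bson.BsonTimestamp (an assumption for
    // illustration): a value class that does NOT implement Serializable.
    public static final class FakeTimestamp {
        final long value;
        public FakeTimestamp(long value) { this.value = value; }
    }

    // Writes the object with ObjectOutputStream, the same mechanism the trace
    // shows under SerializableCoder.encode. Returns null on success, or the
    // class name carried by the NotSerializableException on failure.
    public static String firstNonSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for a document: a serializable map of field name to value.
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("opType", "update");
        System.out.println(firstNonSerializable(doc)); // prints "null"

        // One non-serializable value is enough to make the whole map fail.
        doc.put("opDatedAt", new FakeTimestamp(7371444407255433218L));
        System.out.println(firstNonSerializable(doc)); // prints "SerializationRepro$FakeTimestamp"
    }
}
```

The sketch only demonstrates the serialization rule; it does not say where in the pipeline the fix belongs.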

ggprod commented 5 days ago

Should this actually be logged as a bug against MongoDbIO in the Beam GitHub repository? Should MongoDbIO strip out BSON timestamps, or convert them to a serializable format?
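
As a rough idea of what "converting to a serializable format" could look like, here is a stdlib-only sketch that unpacks the 64-bit value from the log (value=7371444407255433218, seconds=1716298146, inc=2) into plain serializable fields. It relies on the BSON timestamp layout of seconds in the high 32 bits and the increment in the low 32 bits. The class and method names are hypothetical, and where such a conversion would hook into MongoDbIO or the template is an open question, not something this sketch answers.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;

public class BsonTimestampSketch {
    // High 32 bits of the packed value: seconds since the Unix epoch.
    public static long seconds(long packed) {
        return packed >>> 32;
    }

    // Low 32 bits of the packed value: ordinal within that second.
    public static int inc(long packed) {
        return (int) packed;
    }

    // Hypothetical target shape (an assumption): a map holding only String and
    // Integer values, both of which implement java.io.Serializable.
    public static Map<String, Object> toSerializable(long packed) {
        Map<String, Object> out = new LinkedHashMap<>();
        out.put("time", Instant.ofEpochSecond(seconds(packed)).toString());
        out.put("inc", inc(packed));
        return out;
    }

    public static void main(String[] args) {
        // Packed value taken from the opDatedAt field in the log output.
        long packed = 7371444407255433218L;
        System.out.println(toSerializable(packed));
        // prints {time=2024-05-21T13:29:06Z, inc=2}
    }
}
```

The decoded time matches the changedAt value in the same document (Tue May 21 13:29:06 UTC 2024). With the real driver class, the same two components should be available through BsonTimestamp.getTime() and BsonTimestamp.getInc() rather than manual bit shifting.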

ggprod commented 5 days ago

I found this similar error described by a Spark user: https://www.mongodb.com/community/forums/t/timestamp-giving-error/229311