apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [Connectors-v2] MongoDB BSON conversion throws exception #8042

Open hawk9821 opened 1 week ago

hawk9821 commented 1 week ago

Search before asking

What happened

MongoDB BSON conversion throws an exception. Both connector-mongodb and connector-cdc-mongodb report errors.

MongoDB initialization script:

db.getCollection('orders').insertOne({"_id": ObjectId("100000000000000000000101"), "order_number": 102482, "order_date": "2023-11-12", "quantity": 2, "product_id": ObjectId("100000000000000000000101")});
db.getCollection('orders').insertOne({"_id": ObjectId("100000000000000000000102"), "order_number": 102483, "order_date": "2023-11-13", "quantity": 5, "product_id": ObjectId("100000000000000000000102")});
db.getCollection('orders').insertOne({"_id": ObjectId("100000000000000000000103"), "order_number": 102484, "order_date": "2023-11-14", "quantity": 6, "product_id": ObjectId("100000000000000000000103")});
db.getCollection('orders').insertOne({"_id": ObjectId("100000000000000000000104"), "order_number": 102485, "order_date": "2023-11-15", "quantity": 9, "product_id": ObjectId("100000000000000000000104")});
db.getCollection('orders').insertOne({"_id": ObjectId("100000000000000000000105"), "order_number": 102486, "order_date": "2023-11-16", "quantity": 8, "product_id": ObjectId("100000000000000000000105")});

SeaTunnel Version

2.3.9-SNAPSHOT

SeaTunnel Config

env {
  parallelism = 1
  job.mode = "BATCH"
  #spark config
  spark.app.name = "SeaTunnel"
  spark.executor.instances = 1
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  spark.master = local
}

source {
  MongoDB {
    uri = "mongodb://localhost:27018/inventory"
    database = "inventory"
    collection = "orders"
    result_table_name = "mongodb_null_table"
    cursor.no-timeout = true
    fetch.size = 1000
    max.time-min = 100
    schema = {
      table = "inventory.orders"
      fields {
        "order_number" : int,
        "order_date" : string,
        "quantity" : int,
        "product_id" : string
      }
    }
  }
}

sink {
  Console {}
}

Running Command

```shell
./seatunnel.sh --config /tmp/mongo_to_console.conf
```

Error Exception

Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException: ErrorCode:[COMMON-07], ErrorDescription:[Unsupported data type] - Unable to convert to integer from unexpected value 'BsonDouble{value=102491.0}' of type DOUBLE
    at org.apache.seatunnel.connectors.seatunnel.mongodb.serde.BsonToRowDataConverters.convertToInt(BsonToRowDataConverters.java:376)
    at org.apache.seatunnel.connectors.seatunnel.mongodb.serde.BsonToRowDataConverters.access$400(BsonToRowDataConverters.java:54)
    at org.apache.seatunnel.connectors.seatunnel.mongodb.serde.BsonToRowDataConverters$6.apply(BsonToRowDataConverters.java:142)
    at org.apache.seatunnel.connectors.seatunnel.mongodb.serde.BsonToRowDataConverters$6.apply(BsonToRowDataConverters.java:137)
    at org.apache.seatunnel.connectors.seatunnel.mongodb.serde.BsonToRowDataConverters$2.apply(BsonToRowDataConverters.java:91)
    at org.apache.seatunnel.connectors.seatunnel.mongodb.serde.BsonToRowDataConverters$2.apply(BsonToRowDataConverters.java:83)
    at org.apache.seatunnel.connectors.seatunnel.mongodb.serde.BsonToRowDataConverters$1.convert(BsonToRowDataConverters.java:71)
    at org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentRowDataDeserializer.deserialize(DocumentRowDataDeserializer.java:71)
    at org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentRowDataDeserializer.deserialize(DocumentRowDataDeserializer.java:32)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
    at org.apache.seatunnel.connectors.seatunnel.mongodb.source.reader.MongodbReader.pollNext(MongodbReader.java:99)
    at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)
    at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
    at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
    at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:693)
    at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1018)
    at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
    at java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:218)
    ... 2 more
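
The trace shows the failure in `convertToInt` when a field declared as `int` in the schema arrives as a BSON DOUBLE (`102491.0`). This can happen, for example, when documents are written from the legacy `mongo` shell, which stores plain numeric literals as doubles unless `NumberInt(...)` is used. One possible direction for a fix is to accept a double when it holds an integral value that fits in an `int`, and to keep rejecting everything else. The following is only a minimal sketch of that idea in plain Java, not the connector's actual code: the class `LenientIntConversion` and the method `doubleToInt` are hypothetical names, and the real converter works on `BsonValue` objects rather than raw doubles.

```java
public final class LenientIntConversion {

    private LenientIntConversion() {}

    /**
     * Hypothetical helper: converts a double to an int only when no
     * information would be lost by the conversion.
     */
    public static int doubleToInt(double value) {
        // NaN and infinities have no integer representation.
        if (Double.isNaN(value) || Double.isInfinite(value)) {
            throw new IllegalArgumentException(
                    "Unable to convert non-finite value to int: " + value);
        }
        // Reject values with a fractional part, e.g. 102491.5.
        if (value != Math.rint(value)) {
            throw new IllegalArgumentException(
                    "Unable to convert fractional value to int: " + value);
        }
        // Reject integral values outside the 32-bit signed range.
        if (value < Integer.MIN_VALUE || value > Integer.MAX_VALUE) {
            throw new IllegalArgumentException(
                    "Value out of int range: " + value);
        }
        return (int) value;
    }

    public static void main(String[] args) {
        // The value from the error message converts without loss.
        System.out.println(doubleToInt(102491.0)); // prints 102491

        // A fractional value is still rejected instead of being truncated.
        try {
            doubleToInt(102491.5);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Rejecting fractional and out-of-range values keeps the conversion lossless, so a schema mismatch that would silently corrupt data still surfaces as an error.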

Zeta or Flink or Spark Version

No response

Java or Scala Version

java

Screenshots

image

Are you willing to submit PR?

Code of Conduct