lensesio / stream-reactor

A collection of open source Apache 2.0 Kafka Connectors maintained by Lenses.io.
https://lenses.io
Apache License 2.0

S3 source connector: Reached the end of stream with xx bytes left to read #1255

Open JKCai opened 1 month ago

JKCai commented 1 month ago

What version of the Stream Reactor are you reporting this issue for?

Build from the master branch on May 26, 2024 by myself.

Are you running the correct version of Kafka/Confluent for the Stream reactor release?

Running Kafka cluster (MSK) on AWS, under version 2.8.2.tiered

Do you have a supported version of the data source/sink, i.e. Cassandra 3.0.9?

Have you read the docs?

Yes

What is the expected behaviour?

Restore the backup files into the Kafka topic. All of the messages should be restored into the topic without any errors.

What was observed?

java.io.EOFException: Reached the end of stream with 8388608 bytes left to read

What is your Connect cluster configuration (connect-avro-distributed.properties)?

value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
exactly.once.source.support=enabled

What is your connector properties configuration (my-connector.properties)?

connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
connect.s3.kcql=INSERT INTO test_topic SELECT * FROM test-bucket:aaaaa/tiered_3_partition_10GiB_format_test STOREAS `Parquet` LIMIT 1000 PROPERTIES('store.envelope'=true)
aws.region=ap-southeast-2
tasks.max=3
connect.s3.aws.region=ap-southeast-2
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
errors.log.enable=true
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter

Please provide full log files (redact any sensitive information)

2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] java.io.EOFException: Reached the end of stream with 8388608 bytes left to read
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:104)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at org.apache.parquet.io.DelegatingSeekableInputStream.readFullyHeapBuffer(DelegatingSeekableInputStream.java:127)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:91)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at io.lenses.streamreactor.connect.cloud.common.formats.reader.parquet.ParquetSeekableInputStream.readFully(ParquetSeekableInputStream.scala:79)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at org.apache.parquet.hadoop.ParquetFileReader$ConsecutivePartList.readAll(ParquetFileReader.java:1850)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at org.apache.parquet.hadoop.ParquetFileReader.internalReadRowGroup(ParquetFileReader.java:990)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:940)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at org.apache.parquet.hadoop.ParquetFileReader.readNextFilteredRowGroup(ParquetFileReader.java:1082)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:130)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:230)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at io.lenses.streamreactor.connect.cloud.common.formats.reader.ParquetReaderIteratorAdaptor.next(ParquetReaderIteratorAdaptor.scala:33)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at io.lenses.streamreactor.connect.cloud.common.formats.reader.ParquetStreamReader.next(ParquetStreamReader.scala:45)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at io.lenses.streamreactor.connect.cloud.common.formats.reader.ParquetStreamReader.next(ParquetStreamReader.scala:31)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at io.lenses.streamreactor.connect.cloud.common.formats.reader.DelegateIteratorCloudStreamReader.next(CloudStreamReader.scala:53)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at io.lenses.streamreactor.connect.cloud.common.formats.reader.DelegateIteratorCloudStreamReader.next(CloudStreamReader.scala:34)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader.accumulate(ResultReader.scala:56)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader.retrieveResults(ResultReader.scala:45)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$12(ReaderManager.scala:78)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at get @ io.lenses.streamreactor.connect.cloud.common.source.CloudSourceTask.$anonfun$make$11(CloudSourceTask.scala:155)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.acc$1(ReaderManager.scala:74)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at set @ io.lenses.streamreactor.connect.cloud.common.source.reader.PartitionDiscovery$.$anonfun$run$9(PartitionDiscovery.scala:56)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$10(ReaderManager.scala:56)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$9(ReaderManager.scala:55)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at delay @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$7(ReaderManager.scala:53)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$7(ReaderManager.scala:52)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$2(ReaderManager.scala:48)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at delay @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.closeAndLog(ReaderManager.scala:111)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$1(ReaderManager.scala:45)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at getAndSet @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.fromNexFile$1(ReaderManager.scala:44)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.fromNexFile$1(ReaderManager.scala:44)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at get @ io.lenses.streamreactor.connect.cloud.common.source.CloudSourceTask.$anonfun$make$11(CloudSourceTask.scala:155)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.acc$1(ReaderManager.scala:74)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at traverse @ io.lenses.streamreactor.connect.cloud.common.source.distribution.CloudPartitionSearcher.find(CloudPartitionSearcher.scala:53)
2024-06-03T20:23:47.000+10:00   [Worker-0cca08d722bc3578f] at traverse @ io.lenses.streamreactor.connect.cloud.common.source.distribution.CloudPartitionSearcher.find(CloudPartitionSearcher.scala:53)
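For context, the exception comes out of Parquet's readFully helper, which (going by the stack trace) keeps reading from the delegated stream until the requested buffer is full and gives up as soon as the stream signals EOF early. A rough paraphrase of that loop, just to show where the "8388608 bytes left to read" message originates (a sketch of the contract, not the exact parquet-mr source):

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

// Paraphrase of the readFully loop in org.apache.parquet.io.DelegatingSeekableInputStream:
// drain the underlying stream into the buffer; if the stream returns EOF before the
// buffer is filled, fail with the "bytes left to read" EOFException seen in the log above.
class ReadFullySketch {
  static void readFully(InputStream in, byte[] bytes, int start, int len) throws IOException {
    int offset = start;
    int remaining = len;
    while (remaining > 0) {
      int bytesRead = in.read(bytes, offset, remaining);
      if (bytesRead < 0) {
        throw new EOFException("Reached the end of stream with " + remaining + " bytes left to read");
      }
      remaining -= bytesRead;
      offset += bytesRead;
    }
  }
}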
JKCai commented 1 month ago

The Parquet file generated by the S3 sink connector seems valid to us. See below for a few validations:

# Full scan of file
% parquet scan 000000049999_1717383243638_1717383465319.parquet
Scanned 50000 records from 1 file(s)
Time: 0.27 s

# Metadata

% parquet meta 000000049999_1717383243638_1717383465319.parquet

File path:  000000049999_1717383243638_1717383465319.parquet
Created by: parquet-mr version 1.13.1 (build db4183109d5b734ec5930d870cdae161e408ddba)
Properties:
  parquet.avro.schema: {"type":"record","name":"ConnectDefault","namespace":"lshaded.confluent.connect.avro","fields":[{"name":"key","type":["null","bytes"],"default":null},{"name":"value","type":["null","bytes"],"default":null},{"name":"metadata","type":["null",{"type":"record","name":"ConnectDefault2","fields":[{"name":"timestamp","type":"long"},{"name":"topic","type":"string"},{"name":"partition","type":"int"},{"name":"offset","type":"long"}]}],"default":null}]}
    writer.model.name: avro
Schema:
message lshaded.confluent.connect.avro.ConnectDefault {
  optional binary key;
  optional binary value;
  optional group metadata {
    required int64 timestamp;
    required binary topic (STRING);
    required int32 partition;
    required int64 offset;
  }
}

Row group 0:  count: 25886  5.052 kB records  start: 4  total(compressed): 127.699 MB total(uncompressed):127.699 MB
--------------------------------------------------------------------------------
                    type      encodings count     avg size   nulls   min / max
key                 BINARY    _ _ R     25886     0.64 B     0       "0x6B65792D31" / "0x6B65792D3936"
value               BINARY    _   _     25886     5.035 kB
metadata.timestamp  INT64     _   _     25886     8.00 B     0       "1717383243638" / "1717383358522"
metadata.topic      BINARY    _ _ R     25886     0.01 B     0       "tiered_3_partition_10GiB_..." / "tiered_3_partition_10GiB_..."
metadata.partition  INT32     _ _ R     25886     0.00 B     0       "0" / "0"
metadata.offset     INT64     _   _     25886     8.00 B     0       "0" / "25885"

Row group 1:  count: 24114  5.052 kB records  start: 133901686  total(compressed): 118.959 MB total(uncompressed):118.959 MB
--------------------------------------------------------------------------------
                    type      encodings count     avg size   nulls   min / max
key                 BINARY    _ _ R     24114     0.64 B     0       "0x6B65792D31" / "0x6B65792D3936"
value               BINARY    _   _     24114     5.035 kB
metadata.timestamp  INT64     _   _     24114     8.00 B     0       "1717383358524" / "1717383465319"
metadata.topic      BINARY    _ _ R     24114     0.01 B     0       "tiered_3_partition_10GiB_..." / "tiered_3_partition_10GiB_..."
metadata.partition  INT32     _ _ R     24114     0.00 B     0       "0" / "0"
metadata.offset     INT64     _   _     24114     8.00 B     0       "25886" / "49999"

# Show schema
% parquet schema 000000049999_1717383243638_1717383465319.parquet
{
"type" : "record",
"name" : "ConnectDefault",
"namespace" : "lshaded.confluent.connect.avro",
"fields" : [ {
  "name" : "key",
  "type" : [ "null", "bytes" ],
  "default" : null
}, {
  "name" : "value",
  "type" : [ "null", "bytes" ],
  "default" : null
}, {
  "name" : "metadata",
  "type" : [ "null", {
    "type" : "record",
    "name" : "ConnectDefault2",
    "fields" : [ {
      "name" : "timestamp",
      "type" : "long"
    }, {
      "name" : "topic",
      "type" : "string"
    }, {
      "name" : "partition",
      "type" : "int"
    }, {
      "name" : "offset",
      "type" : "long"
    } ]
  } ],
  "default" : null
} ]
}

# Cat/print first message in the Parquet file.
% parquet cat 000000049999_1717383243638_1717383465319.parquet -n 1
{"key": "key-53", "value": "\u0003\u0000\u009D['åÿ\u0089Iÿ\u008F\u009Fì\u009A<¤T#\u0016\u0080(ufefxaamxdewurnjl[...]kedqceuespwsba\u0000\u0010\u0096Dð\u0001\u0000", "metadata": {"timestamp": 1717383243638, "topic": "tiered_3_partition_10GiB_format_test", "partition": 0, "offset": 0}}
JKCai commented 1 month ago

The sink connector config that I used:

connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
connect.s3.kcql=INSERT INTO test-bucket:aaaaa SELECT * FROM tiered_3_partition_10GiB_format_test STOREAS `JSON` PROPERTIES('flush.count'=50000,'flush.interval'=600,'store.envelope'=true)
aws.region=ap-southeast-2
tasks.max=3
topics=tiered_3_partition_10GiB_format_test
connect.s3.aws.region=ap-southeast-2
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
errors.log.enable=true
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter