lensesio / stream-reactor

A collection of open source Apache 2.0 Kafka Connector maintained by Lenses.io.
https://lenses.io
Apache License 2.0
1.01k stars 366 forks source link

GCS Source connector: Reached the end of stream with xx bytes left to read #1378

Closed replikeit closed 1 month ago

replikeit commented 1 month ago

What version of the Stream Reactor are you reporting this issue for?

Release 8.1.4

Are you running the correct version of Kafka/Confluent for the Stream Reactor release?

I am running on Aiven Apache Kafka 3.8.0. My Kafka Connect is deployed using Strimzi on Kubernetes.

Do you have a supported version of the data source/sink .i.e Cassandra 3.0.9?

Yes, I am using GCS (Google Cloud Storage) as the data source and Kafka as the sink.

Have you read the docs?

Yes, I have read the documentation.

What is the expected behaviour?

I expect the connector to transfer Parquet files from GCS to a Kafka topic.

What was observed?

I encountered the following error:
java.io.EOFException: Reached the end of stream with 8861 bytes left to read

What is your Connect cluster configuration (connect-avro-distributed.properties)?

group.id: test-cluster
auto.create.topics.enable: true
offset.storage.topic: test-cluster-offsets
config.storage.topic: test-cluster-configs
status.storage.topic: test-cluster-status
config.storage.replication.factor: 3
offset.storage.replication.factor: 3
status.storage.replication.factor: 3
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable: false
value.converter.schemas.enable: false
publication.autocreate.mode: "filtered"
config.providers: env
config.providers.env.class: io.strimzi.kafka.EnvVarConfigProvider

What is your connector properties configuration (my-connector.properties)?

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: eth-etl-gcs-source-parquet-connector0
  labels:
    strimzi.io/cluster: lambda-kafka-connect
spec:
  class: io.lenses.streamreactor.connect.gcp.storage.source.GCPStorageSourceConnector
  tasksMax: 1
  config:
    connect.gcpstorage.kcql: "
    insert into eth-etl-tokens-from-parquet select * from data-lambda-ethereum-etl-tokens:tokens_parquet2 BATCH=10 STOREAS `Parquet` LIMIT 100;
    "
    topics: "eth-etl-tokens-from-parquet"
    connect.gcpstorage.gcp.auth.mode: "File"
    connect.gcpstorage.gcp.file: "${env:GOOGLE_APPLICATION_CREDENTIALS}"
    connect.gcpstorage.gcp.project.id: "p2p-data-lambda"
    connect.gcpstorage.error.policy: "THROW"
    connect.gcpstorage.http.socket.timeout: 300000
    connect.gcpstorage.source.extension.includes: "parquet"
    connect.gcpstorage.source.partition.search.continuous: true
    connect.gcpstorage.source.partition.search.interval: 300000
    connect.gcpstorage.source.partition.search.recurse.levels: 0
    errors.log.enable: "true"
    errors.log.include.messages: "true"
    log4j.logger.io.lenses.streamreactor.connect: "DEBUG"
    log4j.logger.org.apache.parquet: "DEBUG"
    log4j.logger.org.apache.hadoop: "DEBUG"
    log4j.logger.io.lenses.streamreactor.connect.cloud.common: "DEBUG"
    log4j.logger.io.lenses.streamreactor.connect.cloud.common.formats.reader: "DEBUG"
    log4j.logger.com.google.cloud: "DEBUG"
    log4j.logger.com.google.auth: "DEBUG"
    log4j.logger.org.apache.kafka.connect.runtime.WorkerSourceTask: "DEBUG"
    log4j.logger.org.apache.kafka.connect.runtime.WorkerTask: "DEBUG"

Please provide full log files (redact and sensitive information)

java.io.EOFException: Reached the end of stream with 8861 bytes left to read
    at org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:104)
    at org.apache.parquet.io.DelegatingSeekableInputStream.readFullyHeapBuffer(DelegatingSeekableInputStream.java:126)
    at org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:91)
    at io.lenses.streamreactor.connect.cloud.common.formats.reader.parquet.ParquetSeekableInputStream.readFully(ParquetSeekableInputStream.scala:79)
    at org.apache.parquet.hadoop.ParquetFileReader$ConsecutivePartList.readAll(ParquetFileReader.java:2165)
    at org.apache.parquet.hadoop.ParquetFileReader.readAllPartsVectoredOrNormal(ParquetFileReader.java:1199)
    at org.apache.parquet.hadoop.ParquetFileReader.internalReadRowGroup(ParquetFileReader.java:1101)
    at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:1051)
    at org.apache.parquet.hadoop.ParquetFileReader.readNextFilteredRowGroup(ParquetFileReader.java:1296)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:140)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:245)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:140)
    at io.lenses.streamreactor.connect.cloud.common.formats.reader.ParquetReaderIteratorAdaptor.<init>(ParquetReaderIteratorAdaptor.scala:25)
    at io.lenses.streamreactor.connect.cloud.common.formats.reader.ParquetStreamReader.<init>(ParquetStreamReader.scala:35)
    at io.lenses.streamreactor.connect.cloud.common.formats.reader.ParquetStreamReader$.apply(ParquetStreamReader.scala:76)
    at io.lenses.streamreactor.connect.cloud.common.config.ParquetFormatSelection$.toStreamReader(FormatSelection.scala:196)
    at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$11(ResultReader.scala:100)
    at scala.util.Either.flatMap(Either.scala:360)
    at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$9(ResultReader.scala:86)
    at scala.util.Either.flatMap(Either.scala:360)
    at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$7(ResultReader.scala:82)
    at scala.util.Either.flatMap(Either.scala:360)
    at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$5(ResultReader.scala:81)
    at scala.util.Either.flatMap(Either.scala:360)
    at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$3(ResultReader.scala:80)
    at scala.util.Either.flatMap(Either.scala:360)
    at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$1(ResultReader.scala:79)
    at io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$9(ReaderManager.scala:55)
    at delay @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$7(ReaderManager.scala:53)
    at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$7(ReaderManager.scala:52)
    at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$2(ReaderManager.scala:48)
    at delay @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.closeAndLog(ReaderManager.scala:111)
    at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$1(ReaderManager.scala:45)
    at getAndSet @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.fromNexFile$1(ReaderManager.scala:44)
    at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.fromNexFile$1(ReaderManager.scala:44)
    at get @ io.lenses.streamreactor.connect.cloud.common.source.CloudSourceTask.$anonfun$make$11(CloudSourceTask.scala:155)
    at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.acc$1(ReaderManager.scala:74)
    at traverse @ io.lenses.streamreactor.connect.cloud.common.source.distribution.CloudPartitionSearcher.find(CloudPartitionSearcher.scala:53)
    at traverse @ io.lenses.streamreactor.connect.cloud.common.source.distribution.CloudPartitionSearcher.find(CloudPartitionSearcher.scala:53)
    at traverse @ io.lenses.streamreactor.connect.cloud.common.source.state.CloudSourceTaskState.$anonfun$poll$1(CloudSourceTaskState.scala:36)
    at map @ io.lenses.streamreactor.connect.cloud.common.source.state.CloudSourceTaskState.$anonfun$poll$1(CloudSourceTaskState.scala:36)
    at get @ io.lenses.streamreactor.connect.cloud.common.source.CloudSourceTask.$anonfun$make$11(CloudSourceTask.scala:155)
    at map @ io.lenses.streamreactor.connect.cloud.common.source.CloudSourceTask.$anonfun$make$11(CloudSourceTask.scala:155)
    at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.state.CloudSourceTaskState.poll(CloudSourceTaskState.scala:35)
replikeit commented 1 month ago

Parquet example:

❯ parquet meta ~/Downloads/tokens_parquet2_ethereum_tokens_000000000000.parquet                                                                        20:23:49 

File path:  /Users/alinaglumova/Downloads/tokens_parquet2_ethereum_tokens_000000000000.parquet
Created by: parquet-cpp-arrow version 13.0.0
Properties: (none)
Schema:
message schema {
  required binary address (STRING);
  optional binary symbol (STRING);
  optional binary name (STRING);
  optional binary decimals (STRING);
  optional binary total_supply (STRING);
  required int64 block_timestamp (TIMESTAMP(MICROS,false));
  required int64 block_number;
  required binary block_hash (STRING);
}

Row group 0:  count: 1068  159.64 B records  start: 4  total(compressed): 166.502 kB total(uncompressed):166.502 kB 
--------------------------------------------------------------------------------
                 type      encodings count     avg size   nulls   min / max
address          BINARY    _ _ R     1068      47.51 B    0       "0x005c97569a24303e9ba6de6..." / "0xffffe5b9cb42b4996997c92..."
symbol           BINARY    _ _ R     1068      6.21 B     10      "" / "��"
name             BINARY    _ _ R     1068      10.27 B    10      "" / "����������"
decimals         BINARY    _ _ R     1068      0.47 B     65      "0" / "9"
total_supply     BINARY    _ _ R     1068      6.24 B     9       "0" / "9999999999999999999900000..."
block_timestamp  INT64     _ _ R     1068      9.32 B     0       "2024-02-08T15:50:47.000000" / "2024-09-11T06:57:23.000000"
block_number     INT64     _ _ R     1068      9.32 B     0       "19184445" / "20725728"
block_hash       BINARY    _ _ R     1068      70.31 B    0       "0x0002376d87ff1bbe5310679..." / "0xffae2542617a1ee9204fb27..."
davidsloan commented 1 month ago

Hi there,

We've released a new version today, v8.1.10. We made some major changes to the parquet handling in order to resolve this issue with large parquet files.

https://github.com/lensesio/stream-reactor/releases/tag/8.1.10

Please would you give it a go, if this is still something that is useful to you, and report back?

Kind regards

David.