streamnative / pulsar-io-cloud-storage

Cloud Storage Connector integrates Apache Pulsar with cloud storage.
Apache License 2.0

[BUG] error while writing data in Parquet format using JsonSchema as schema format #439

Open grandimk opened 2 years ago

grandimk commented 2 years ago

Describe the bug I was using the Cloud Storage Sink to collect data from Pulsar and write it to AWS S3 in Parquet format. Messages were produced using a JsonSchema format. The Sink fails as soon as it tries to convert the collected data into org.apache.avro.generic.GenericRecord (within the convertGenericRecord function).

I tried producing messages both from Python and from Java; both fail, but with different stack traces.

Note: if the formatType specified in the configuration is json everything works fine.

To Reproduce Use this template configuration for the pulsar-io-cloud-storage v2.9.3.6:

tenant: "<theTenant>"
namespace: "schema-registry"
name: "cloud-storage-sink"
inputs: 
  - "persistent://<theTenant>/<theNamespace>/<theTopic>"
  - <otherTopicUrl>
archive: "connectors/pulsar-io-cloud-storage-2.9.3.6.nar"
parallelism: 1

configs:
  provider: "aws-s3"
  accessKeyId: "<yourAccessKeyId>"
  secretAccessKey: "<yourSecretAccessKey>"
  bucket: "<yourS3Bucket>"
  region: "<yourRegion>"
  pathPrefix: "cloud_storage_sink_parquet/"
  formatType: "parquet"
  partitionerType: "time"
  timePartitionPattern: "yyyy-MM-dd"
  timePartitionDuration: "1d"
  batchSize: 100
  batchTimeMs: 600000
  withMetadata: false
  withTopicPartitionNumber: false

Then produce messages in JsonSchema format. Here is the code for a minimal Python producer:

import pulsar
from pulsar.schema import *

class YourMessageClass(Record):
    ...

def generate_message() -> YourMessageClass:
    ...

if __name__ == '__main__':
    client = pulsar.Client('pulsar://host.docker.internal:6650')

    # Create the producer from the client instance
    producer = client.create_producer(
        topic='persistent://<theTenant>/<theNamespace>/<theTopic>',
        producer_name='python_producer',
        schema=pulsar.schema.JsonSchema(YourMessageClass)
    )

    for i in range(100):
        msg = generate_message()
        producer.send(msg)

    client.close()

Expected behavior A chunk of data containing a list of collected messages, written to the specified AWS S3 prefix in Parquet format.

Screenshots None

Additional context The tests were done on my laptop, using an Apache Pulsar Docker container where the schema registry was properly configured (the schema definition of the messages has been uploaded) and the pulsar-io-cloud-storage-2.9.3.6.nar archive was loaded.

This is the error that occurred while writing data produced with the Python producer:

[pulsar-io-cloud-storage-sink-flush-0] ERROR org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Encountered unknown error writing to blob cloud_storage_sink_parquet/<THE_NAMESPACE>/<THE_TOPIC>/2022-09-02/180925500946.parquet
java.lang.NullPointerException: null
    at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:220) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
    at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:169) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
    at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:209) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
    at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:169) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
    at org.apache.pulsar.io.jcloud.format.ParquetFormat.recordWriterBuf(ParquetFormat.java:263) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
    at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.bindValue(BlobStoreAbstractSink.java:283) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
    at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.unsafeFlush(BlobStoreAbstractSink.java:243) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
    at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.flush(BlobStoreAbstractSink.java:209) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]

This is the error that occurred while writing data produced with the Java producer:

[pulsar-io-cloud-storage-sink-flush-0] ERROR org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Encountered unknown error writing to blob cloud_storage_sink_parquet/<THE_NAMESPACE>/<THE_TOPIC>/2022-09-08/64156074046.parquet
java.util.NoSuchElementException: No value present
    at java.util.Optional.get(Optional.java:148) ~[?:?]
    at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:207) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
    at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:169) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
    at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:209) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
    at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:169) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
    at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:209) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
    at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:169) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
    at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:209) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
    at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:169) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
    at org.apache.pulsar.io.jcloud.format.ParquetFormat.recordWriterBuf(ParquetFormat.java:263) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
    at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.bindValue(BlobStoreAbstractSink.java:283) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
    at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.unsafeFlush(BlobStoreAbstractSink.java:243) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
    at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.flush(BlobStoreAbstractSink.java:209) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
alpreu commented 2 years ago

Hi @grandimk, thanks for opening this issue. Can you please provide more information about your record schema? It looks like this is where your conversion fails: https://github.com/streamnative/pulsar-io-cloud-storage/blob/branch-2.9.3.6/src/main/java/org/apache/pulsar/io/jcloud/util/AvroRecordUtil.java#L204-L207
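
For anyone following along, here is a rough, speculative Python analogue of the failing lookup (the real code is the Java linked above; the type-name mapping here is purely illustrative): the converter tries to pick the union branch matching an incoming value, and when no branch matches, Optional.get() throws the "No value present" NoSuchElementException seen in the Java stack trace.

```python
# Speculative sketch of the linked Java lookup: given an Avro union type,
# find the branch matching an incoming value. When no branch matches, the
# Java code's Optional.get() throws NoSuchElementException ("No value
# present"); here we raise LookupError to mimic that failure mode.
def resolve_union_branch(union, value):
    # Map Python value types onto Avro primitive type names (illustrative,
    # not the sink's actual mapping).
    avro_name = {str: "string", bool: "boolean", int: "long",
                 type(None): "null"}.get(type(value))
    match = next((b for b in union if b == avro_name), None)
    if match is None:
        raise LookupError("No value present")  # mirrors Optional.get()
    return match

print(resolve_union_branch(["null", "string"], "abc"))  # -> string
print(resolve_union_branch(["null", "string"], None))   # -> null
```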

grandimk commented 2 years ago

Hi @alpreu, I looked at the code in the AvroRecordUtil.java file but couldn't figure out why the conversion fails. This is the schema_info of the record I used for my tests:

{
  "type": "record",
  "name": "Quiz",
  "fields": [
    {
      "name": "audit",
      "type": {
        "type": "record",
        "name": "Audit",
        "fields": [
          {
            "name": "actor",
            "type": {
              "type": "record",
              "name": "Actor",
              "fields": [
                { "name": "actorId", "type": "string" },
                {
                  "name": "actorType",
                  "type": {
                    "type": "enum",
                    "name": "ActorType",
                    "symbols": ["person", "service"]
                  }
                },
                { "name": "ip", "type": ["null", "string"], "default": null }
              ]
            }
          },
          {
            "name": "producer",
            "type": {
              "type": "record",
              "name": "Producer",
              "fields": [
                { "name": "code", "type": "string" },
                {
                  "name": "instanceId",
                  "type": ["null", "string"],
                  "default": null
                },
                { "name": "producerType", "type": "string" },
                {
                  "name": "version",
                  "type": ["null", "string"],
                  "default": null
                }
              ]
            }
          }
        ]
      }
    },
    {
      "name": "metadata",
      "type": {
        "type": "record",
        "name": "Metadata",
        "fields": [
          { "name": "eventId", "type": "string" },
          { "name": "eventTimestamp", "type": "long" },
          { "name": "eventType", "type": ["null", "string"], "default": null }
        ]
      }
    },
    {
      "name": "payload",
      "type": {
        "type": "record",
        "name": "QuizPayload",
        "fields": [
          {
            "name": "categoryTreeNodesIds",
            "type": ["null", { "type": "array", "items": "long" }],
            "default": null
          },
          { "name": "id", "type": "string" },
          { "name": "isHidden", "type": "boolean" },
          {
            "name": "knowledgeGraphNodesIds",
            "type": ["null", { "type": "array", "items": "long" }],
            "default": null
          },
          {
            "name": "properties",
            "type": {
              "type": "record",
              "name": "ContentProperties",
              "fields": [
                {
                  "name": "authorId",
                  "type": ["null", "string"],
                  "default": null
                },
                {
                  "name": "copiedFrom",
                  "type": ["null", "string"],
                  "default": null
                },
                {
                  "name": "createdAt",
                  "type": ["null", "string"],
                  "default": null
                },
                {
                  "name": "creatorId",
                  "type": ["null", "string"],
                  "default": null
                },
                {
                  "name": "ownerId",
                  "type": ["null", "string"],
                  "default": null
                },
                {
                  "name": "permission",
                  "type": {
                    "type": "record",
                    "name": "ContentPermission",
                    "fields": [
                      {
                        "name": "teams",
                        "type": [
                          "null",
                          { "type": "array", "items": "string" }
                        ],
                        "default": null
                      }
                    ]
                  }
                },
                {
                  "name": "version",
                  "type": ["null", "string"],
                  "default": null
                }
              ]
            }
          },
          {
            "name": "questions",
            "type": { "type": "array", "items": "string" }
          },
          {
            "name": "questionsNumber",
            "type": ["null", "long"],
            "default": null
          },
          { "name": "quizId", "type": ["null", "string"], "default": null },
          {
            "name": "showDetailedFeedback",
            "type": ["null", "boolean"],
            "default": null
          },
          { "name": "slug", "type": ["null", "string"], "default": null },
          { "name": "title", "type": "string" }
        ]
      }
    }
  ]
}
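
For reference, the constructs I suspect matter here are the nullable unions and the enum nested inside records. A short stdlib-only Python walker (abridged to the audit branch of the schema above; it only inspects the schema JSON and does not mirror the sink's conversion code) flags those fields:

```python
import json

# The "audit" branch of the Quiz schema above, abridged for brevity.
# All names come from the schema in this comment; nothing is invented.
SCHEMA = json.loads("""
{
  "type": "record",
  "name": "Quiz",
  "fields": [
    {"name": "audit", "type": {
      "type": "record", "name": "Audit", "fields": [
        {"name": "actor", "type": {
          "type": "record", "name": "Actor", "fields": [
            {"name": "actorId", "type": "string"},
            {"name": "actorType", "type": {
              "type": "enum", "name": "ActorType",
              "symbols": ["person", "service"]}},
            {"name": "ip", "type": ["null", "string"], "default": null}
          ]}}
      ]}}
  ]
}
""")

def walk(schema, path=""):
    """Yield (path, kind) for union and enum fields, the constructs
    most likely to need special handling during Avro conversion."""
    if isinstance(schema, dict) and schema.get("type") == "record":
        for field in schema["fields"]:
            ftype = field["type"]
            fpath = f"{path}.{field['name']}".lstrip(".")
            if isinstance(ftype, list):
                yield fpath, "union"
            elif isinstance(ftype, dict) and ftype.get("type") == "enum":
                yield fpath, "enum"
            else:
                yield from walk(ftype, fpath)

suspects = dict(walk(SCHEMA))
print(suspects)  # -> {'audit.actor.actorType': 'enum', 'audit.actor.ip': 'union'}
```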

As an additional note, I want to point out that we define our events using JSON Schema and then generate both the related Python and Java classes from it.

alpreu commented 2 years ago

@grandimk Thanks for providing the record schema. I had another look but I cannot see an immediate issue either. Would you be able to create a unit test in ParquetFormatTest that recreates the issue from your generated Java record/schema?