apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: PulsarIO write() complains of missing ClientUrl even when provided with withClientUrl #30152

Open ton3r opened 7 months ago

ton3r commented 7 months ago

What happened?

I want to write to a Pulsar topic from an Apache Beam pipeline using:

pCollection.apply("Send to Pulsar", PulsarIO.write().withClientUrl(pulsarClientUrl).withTopic(pulsarTopic));

But this throws 'IllegalStateException: Missing required properties: clientUrl'.

While debugging, I saw that PulsarIO.write() calls the private AutoValue_PulsarIO_Write.Builder().build(), and that build() checks clientUrl:

PulsarIO.Write build() {
    if (this.clientUrl == null) {
        String missing = " clientUrl";
        throw new IllegalStateException("Missing required properties:" + missing);
    } else {
        return new AutoValue_PulsarIO_Write(this.topic, this.clientUrl);
    }
}

But I can only set 'clientUrl' after calling 'PulsarIO.write()', and that call already checks clientUrl.

So it does not seem possible to create a PulsarIO.write() at all.
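The ordering problem can be reproduced outside Beam. The following is a minimal sketch, not PulsarIO's actual code: Write, Builder, brokenWrite(), fixedWrite() and describe() are hypothetical stand-ins, assuming the factory calls build() before any with... method can run. It also shows one possible fix, assuming clientUrl may stay unset at build time and is validated later, when the transform is used.

```java
import java.util.Objects;

// Hypothetical stand-in for an AutoValue-style transform; NOT Beam's PulsarIO.
final class Write {
    private final String topic;
    private final String clientUrl;

    private Write(String topic, String clientUrl) {
        this.topic = topic;
        this.clientUrl = clientUrl;
    }

    // Broken factory: builds immediately, so the required-property check
    // fires before the caller can ever reach withClientUrl().
    static Write brokenWrite() {
        return new Builder(true).build();
    }

    // Possible fix (assumption): allow clientUrl to be null at build time
    // and validate it later, at the point of use.
    static Write fixedWrite() {
        return new Builder(false).build();
    }

    Write withClientUrl(String url) {
        return new Write(topic, Objects.requireNonNull(url));
    }

    Write withTopic(String newTopic) {
        return new Write(Objects.requireNonNull(newTopic), clientUrl);
    }

    // Deferred validation: complain only when the value is actually needed.
    String describe() {
        if (clientUrl == null) {
            throw new IllegalStateException("Missing required properties: clientUrl");
        }
        return clientUrl + "/" + topic;
    }

    static final class Builder {
        private final boolean clientUrlRequired;
        private String topic;
        private String clientUrl;

        Builder(boolean clientUrlRequired) {
            this.clientUrlRequired = clientUrlRequired;
        }

        Write build() {
            if (clientUrlRequired && clientUrl == null) {
                throw new IllegalStateException("Missing required properties: clientUrl");
            }
            return new Write(topic, clientUrl);
        }
    }
}

class EagerBuildDemo {
    public static void main(String[] args) {
        try {
            // Fails inside brokenWrite(); withClientUrl() is never reached.
            Write.brokenWrite().withClientUrl("pulsar://localhost:6650");
        } catch (IllegalStateException e) {
            System.out.println("broken: " + e.getMessage());
        }
        Write ok = Write.fixedWrite()
            .withClientUrl("pulsar://localhost:6650")
            .withTopic("my-topic");
        System.out.println("fixed: " + ok.describe());
    }
}
```

In the broken variant the exception is raised inside the factory itself, which matches the observation that no chain of with... calls can help.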

Issue Priority

Priority: 3 (minor)

Issue Components

ton3r commented 7 months ago

I was able to "bend the spoon" either by using Java reflection to make the package-private members accessible, or by writing my own CustomPulsarWriter in the package org.apache.beam.sdk.io.pulsar so that it can extend the abstract classes. But now I get:


Exception in thread "main" java.lang.IllegalArgumentException: org.apache.beam.sdk.io.pulsar.WriteToPulsarDoFn: Non-splittable, but annotated as @UnboundedPerElement
    at org.apache.beam.sdk.transforms.reflect.DoFnSignatures$ErrorReporter.throwIllegalArgument(DoFnSignatures.java:2409)
    at org.apache.beam.sdk.transforms.reflect.DoFnSignatures$ErrorReporter.checkArgument(DoFnSignatures.java:2415)
    at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.inferBoundedness(DoFnSignatures.java:917)
    at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.parseSignature(DoFnSignatures.java:848)
    at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1737)
    at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.getSignature(DoFnSignatures.java:301)
    at org.apache.beam.sdk.transforms.ParDo.validate(ParDo.java:614)
    at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:403)
    at org.apache.beam.sdk.io.pulsar.PulsarIO$Write.expand(PulsarIO.java:180)
    at org.apache.beam.sdk.io.pulsar.PulsarIO$Write.expand(PulsarIO.java:151)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:482)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:360)
    at de.spx.bucketdataflow.PubSubToGcs.main(PubSubToGcs.java:103)
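
The message in this trace is raised during DoFn signature validation (DoFnSignatures.inferBoundedness, called from ParDo.of). As a rough illustration only, assuming the rule is "a DoFn annotated @UnboundedPerElement must also be splittable", the check can be sketched with plain reflection. Nothing below is Beam's API or implementation: UnboundedPerElement, ProcessElement and RestrictionTracker are local stand-ins, PlainWriteFn and SplittableReadFn are hypothetical, and splittability is approximated as "the process method takes a RestrictionTracker parameter".

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Arrays;

// Local stand-ins for illustration; these are NOT Beam's annotations or types.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface UnboundedPerElement {}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface ProcessElement {}

interface RestrictionTracker {}

final class BoundednessCheck {
    // Assumption: a fn counts as splittable if its @ProcessElement method
    // declares a RestrictionTracker parameter.
    static boolean isSplittable(Class<?> fn) {
        for (Method m : fn.getDeclaredMethods()) {
            if (m.isAnnotationPresent(ProcessElement.class)
                    && Arrays.asList(m.getParameterTypes()).contains(RestrictionTracker.class)) {
                return true;
            }
        }
        return false;
    }

    // Rejects the combination reported in the stack trace.
    static void validate(Class<?> fn) {
        if (fn.isAnnotationPresent(UnboundedPerElement.class) && !isSplittable(fn)) {
            throw new IllegalArgumentException(
                fn.getName() + ": Non-splittable, but annotated as @UnboundedPerElement");
        }
    }
}

// Mirrors the failing shape: annotated as unbounded, but an ordinary process method.
@UnboundedPerElement
class PlainWriteFn {
    @ProcessElement
    void process(byte[] element) {}
}

// Passes the sketch's check because it takes a tracker.
@UnboundedPerElement
class SplittableReadFn {
    @ProcessElement
    void process(byte[] element, RestrictionTracker tracker) {}
}

class BoundednessDemo {
    public static void main(String[] args) {
        BoundednessCheck.validate(SplittableReadFn.class);
        try {
            BoundednessCheck.validate(PlainWriteFn.class);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If this reading is right, the annotation on WriteToPulsarDoFn itself is what triggers the failure, so it would not be fixable from the calling pipeline.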