apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: Side Input Singleton View throws an error when the impulse period is short: PCollection with more than one element accessed as a singleton view #26465

Open ballooncross opened 1 year ago

ballooncross commented 1 year ago

What happened?

I am writing a pipeline that consumes messages from Pub/Sub, validates and transforms them, and writes them to BigQuery.

I need to load some data from an external API call for use in the pipeline's validation stage, so I followed the "Slowly updating global window side inputs" pattern to load the config into a side input. However, I keep getting the following error, even though I already apply Latest.globally():

Caused by: java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view. Consider using Combine.globally().asSingleton() to combine the PCollection into a single value

Searching online resources didn't really help. However, I found that this happens only when the impulse period is short, e.g. < 5s: GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L))

Is this expected?

This bothers me because I am not sure whether the short period is the root cause, or whether the error will show up again once pipeline traffic becomes large, even with a one-minute impulse period in the production environment.

Does anyone have the same issue, or a suggestion?

Beam version: 2.46.0

Here is a simplified version of my code that reproduces the error:


import static com.applovin.array.silk.pipeline.common.Constants.CONFIG_DEV;
import static com.applovin.array.silk.pipeline.common.Constants.CONFIG_PROD;
import static org.apache.beam.sdk.options.SdkHarnessOptions.LogLevel;

import com.applovin.array.silk.pipeline.EventTransformer.EventTransform;
import com.applovin.array.silk.pipeline.EventTransformer.ExtractMessage;
import com.applovin.array.silk.pipeline.coders.FailureCoder;
import com.applovin.array.silk.pipeline.coders.JsonObjectCoder;
import com.applovin.array.silk.pipeline.coders.MessageContainerCoder;
import com.applovin.array.silk.pipeline.common.UncaughtExceptionLogger;
import com.applovin.array.silk.pipeline.models.Failure;
import com.applovin.array.silk.pipeline.models.MessageContainer;
import com.applovin.array.silk.pipeline.models.StartOptions;
import com.applovin.array.silk.pipeline.models.SystemConfigs;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Latest;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestSideInputPipeline {

    private static final Logger logger = LoggerFactory.getLogger(TestSideInputPipeline.class);
    private static final FailureCoder<MessageContainer> FAILURE_CODER = FailureCoder.of(
            MessageContainerCoder.of());
    private static final ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
    public static SystemConfigs systemConfigs;
    private static final CloseableHttpClient httpClient = HttpClients.createDefault();

    public static void main(String[] args) throws IOException {
        UncaughtExceptionLogger.register();

        StartOptions options = loadOptions(args);

        run(options);
    }

    public static StartOptions loadOptions(String[] args) throws IOException {
        StartOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(
                StartOptions.class);

        ClassLoader classLoader = TestSideInputPipeline.class.getClassLoader();
        InputStream configStream = classLoader.getResourceAsStream(
                "prod".equalsIgnoreCase(options.getEnv()) ? CONFIG_PROD : CONFIG_DEV);
        systemConfigs = mapper.readValue(configStream, SystemConfigs.class);

        options.setDefaultSdkHarnessLogLevel(LogLevel.valueOf(
                systemConfigs.getLogLevel().toUpperCase()
        ));

        return options;
    }

    public static void run(StartOptions options) {
        Pipeline pipeline = Pipeline.create(options);
        CoderRegistry coderRegistry = pipeline.getCoderRegistry();
        coderRegistry.registerCoderForClass(JsonObject.class, JsonObjectCoder.of());
        coderRegistry.registerCoderForType(FAILURE_CODER.getEncodedTypeDescriptor(), FAILURE_CODER);
        coderRegistry.registerCoderForClass(MessageContainer.class, MessageContainerCoder.of());

        // Pull Pub/Sub messages with PubsubIO, which automatically acknowledges messages after processing completes.
        // https://cloud.google.com/dataflow/docs/concepts/streaming-with-cloud-pubsub
        PCollection<PubsubMessage> messages = pipeline.apply("ReadPubSubMessages",
                PubsubIO.readMessagesWithAttributes()
                        .fromSubscription(systemConfigs.getIngestSubscription()));

//        // extract message from pubsub structure
//        ExtractMessage extractMessage = new ExtractMessage();
//        PCollectionTuple extractMessageResult = messages.apply("ExtractMessage",
//                ParDo.of(extractMessage).withOutputTags(extractMessage.getOutputTag(),
//                        TupleTagList.of(extractMessage.getFailuresTag())));
//
//        // do some transform
//        EventTransform eventTransform = new EventTransform();
//        PCollectionTuple transformResult = extractMessageResult.get(extractMessage.getOutputTag())
//                .apply("EventTransform", ParDo.of(eventTransform)
//                        .withOutputTags(eventTransform.getOutputTag(),
//                                TupleTagList.of(eventTransform.getFailuresTag())));
//
//        PCollection<MessageContainer> transformedEvents = transformResult.get(
//                eventTransform.getOutputTag());

        // get side input view (json schemas), and validate
        PCollectionView<Map<String, String>> map = pipeline.apply("Impulse",
                        GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)))
                .apply(ParDo.of(new DoFn<Long, Map<String, String>>() {
                    @ProcessElement
                    public void process(OutputReceiver<Map<String, String>> o) {
                        // try-with-resources closes the HTTP response once the body has been read.
                        try (CloseableHttpResponse response = httpClient.execute(
                                new HttpGet(systemConfigs.getSchemaUrl()))) {
                            String body = EntityUtils.toString(response.getEntity());
                            Map<String, String> schema = mapper.readValue(body, Map.class);
                            logger.info("Loader got schema: {}", schema);
                            o.output(schema);
                        } catch (Exception e) {
                            logger.error("Failed, {}", e.getMessage());
                            o.output(Map.of());
                        }
                    }
                }))
                .apply(Window.<Map<String, String>>into(new GlobalWindows())
                        .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                        .discardingFiredPanes())
                .apply(Latest.globally())
                .apply(View.asSingleton());

        PCollection<PubsubMessage> validateEventResult = messages.apply("ValidateEvent",
                ParDo.of(new DoFn<PubsubMessage, PubsubMessage>() {
                    @ProcessElement
                    public void processElement(ProcessContext ctx) {
                        PubsubMessage message = ctx.element();
                        Map<String, String> schemas = ctx.sideInput(map);
                        logger.info("Side input schemas: {}", schemas);

                        // validate message and save result.
                        ctx.output(message);
                    }
                }).withSideInputs(map));

        //more logic

        pipeline.run();
    }
}

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

github-actions[bot] commented 1 year ago

Label examples-java cannot be managed because it does not exist in the repo. Please check your spelling.

scwhittle commented 11 months ago

The issue here is that Latest.globally() is a separate triggered global Combine from the global Combine contained in View.asSingleton. Thus, if Latest triggers multiple times, the subsequent singleton combine may observe more than one element when it fires, and fail.

See https://www.mail-archive.com/user@beam.apache.org/msg02129.html

A workaround is to use View.asIterable and take the last element of the iterable when consuming the side input. There will be more than 1 element only if Latest triggers multiple times before the side input combine processes the output.
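
The consuming side of this workaround can be sketched in plain Java. This is only an illustration, not part of Beam: SideInputValues and lastOrDefault are hypothetical names. It assumes the side input was built with View.asIterable(), so that ctx.sideInput(view) returns an Iterable, and that the last element of that iterable is the one wanted, as suggested above.

```java
/** Hypothetical helper (not a Beam API) for reading an iterable side input. */
final class SideInputValues {
    private SideInputValues() {}

    /**
     * Returns the last element of {@code values}, or {@code fallback} when it is empty.
     * Assumption: the last element is the value the caller wants (e.g. the newest config).
     */
    static <T> T lastOrDefault(Iterable<T> values, T fallback) {
        T last = fallback;
        for (T value : values) {
            last = value; // keep overwriting until the final element remains
        }
        return last;
    }
}
```

In the ValidateEvent DoFn from the original report, the view would then be declared as PCollectionView<Iterable<Map<String, String>>> and read with SideInputValues.lastOrDefault(ctx.sideInput(map), Map.of()) instead of a direct singleton read.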

@kennknowles What do you think about adding a new transform View.asLatest() that is logically the same as Latest.globally() + View.asSingleton() but is implemented with a single combine and thus the side input view will always be a single latest value each time it is calculated?
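
What a single-combine "latest" might compute can be sketched in plain Java. This is a toy model under stated assumptions, not Beam code: View.asLatest() is only a proposal in this thread, and LatestCombine and Timestamped are hypothetical names. The four methods mirror the shape of Beam's CombineFn (createAccumulator, addInput, mergeAccumulators, extractOutput) and keep only the value with the greatest timestamp, so one combine yields one value however many inputs arrive.

```java
/** Toy model of a single "latest" combine. Method names mirror CombineFn; this is not Beam code. */
final class LatestCombine {
    private LatestCombine() {}

    /** A value paired with the timestamp (epoch millis) at which it was produced. */
    static final class Timestamped<T> {
        final T value;
        final long timestampMillis;

        Timestamped(T value, long timestampMillis) {
            this.value = value;
            this.timestampMillis = timestampMillis;
        }
    }

    /** The empty accumulator is modelled as null: no value has been seen yet. */
    static <T> Timestamped<T> createAccumulator() {
        return null;
    }

    /** Keeps whichever of the accumulator and the input is newer; the input wins ties. */
    static <T> Timestamped<T> addInput(Timestamped<T> accumulator, Timestamped<T> input) {
        if (accumulator == null || input.timestampMillis >= accumulator.timestampMillis) {
            return input;
        }
        return accumulator;
    }

    /** Merges partial results by keeping the newest value among them. */
    static <T> Timestamped<T> mergeAccumulators(Iterable<Timestamped<T>> accumulators) {
        Timestamped<T> merged = null;
        for (Timestamped<T> accumulator : accumulators) {
            if (accumulator != null) {
                merged = addInput(merged, accumulator);
            }
        }
        return merged;
    }

    /** Returns the single latest value, or the default when nothing was combined. */
    static <T> T extractOutput(Timestamped<T> accumulator, T defaultValue) {
        return accumulator == null ? defaultValue : accumulator.value;
    }
}
```

Because the "latest" selection and the reduction to one value happen in the same accumulator, there is no second combine downstream that could observe two firings.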

kennknowles commented 11 months ago

Just skimming this last comment, it sounds like this is related to the impetus for https://s.apache.org/beam-triggered-side-inputs

Right now we have a combination of

  1. "definition is whatever happens when you run it" situation with side inputs, always precarious
  2. side inputs pretty much unaware of triggers (as most of the model is) so they don't intelligently respect panes (they see two firings as just two separate elements rather than a revision to the element)
  3. no clear way to really manage deltas versus full replacement values on triggering; presumably side inputs would do this intelligently transparently
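
The difference between "two separate elements" and "a revision to the element" in point 2 can be illustrated with a toy model. This is a sketch for intuition only, not how any Beam runner is implemented: PaneModel and its methods are hypothetical names, and the singleton check only mimics the error message reported in this issue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

/** Toy model of how a side input might store trigger firings; not Beam's implementation. */
final class PaneModel {
    private PaneModel() {}

    /** Element semantics: every firing is stored as one more element. */
    static <T> List<T> storeAsElements(List<T> firings) {
        return new ArrayList<>(firings);
    }

    /** Revision semantics: every firing replaces whatever was stored before. */
    static <T> List<T> storeAsRevisions(List<T> firings) {
        List<T> stored = new ArrayList<>();
        for (T firing : firings) {
            stored.clear();
            stored.add(firing);
        }
        return stored;
    }

    /** Mimics the singleton check behind the error reported in this issue. */
    static <T> T asSingleton(List<T> stored) {
        if (stored.size() > 1) {
            throw new IllegalArgumentException(
                    "PCollection with more than one element accessed as a singleton view.");
        }
        if (stored.isEmpty()) {
            throw new NoSuchElementException("Empty PCollection accessed as a singleton view.");
        }
        return stored.get(0);
    }
}
```

With two firings "v1" and "v2", asSingleton(storeAsRevisions(...)) returns "v2", while asSingleton(storeAsElements(...)) throws the IllegalArgumentException.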

I am totally happy with a View.latest(). In my doc I propose that as the semantics for View.asSingleton :-)

If you can implement it without runner changes, or have the bandwidth to make the necessary runner changes, I would just replace View.asSingleton with that. Otherwise, having it as a stopgap until View.asSingleton can be adjusted is great.

https://s.apache.org/beam-side-inputs-1-pager

yelianevich commented 7 months ago

@ballooncross Looking at your code, I can suggest an alternative approach: a custom global combiner that replaces the consecutive Latest.globally() and View.asSingleton() and does everything in one step.

PCollectionView<Map<String, String>> map = pipeline.apply("Impulse",
                        GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)))
                // The impulse elements are Longs, so the window is typed on Long.
                .apply(Window.<Long>into(new GlobalWindows())
                        .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                        .discardingFiredPanes())
                .apply(Combine.globally(new MaxImpulseFn<>(functionToRead, coderOfFunctionOutput))
                        .withoutDefaults()
                        .asSingletonView());

MaxImpulseFn is a custom (though quite simple) combiner that finds the max impulse and reads the external data in extractOutput:

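private static class MaxImpulseFn<T> extends CombineWithContext.CombineFnWithContext<Long, Long, T> {

    // The functionToRead and coderOfFunctionOutput fields, the constructor, and the
    // createAccumulator, addInput, and mergeAccumulators overrides are not shown in this snippet.

    @Override
    public T extractOutput(Long accumulator, Context context) {
        // Read the external data here, using the max impulse accumulated so far.
        return functionToRead.apply(accumulator, context.getPipelineOptions());
    }

    @Override
    public Coder<T> getDefaultOutputCoder(CoderRegistry registry, Coder<Long> inputCoder) {
        return coderOfFunctionOutput;
    }
}

IvanFroehlich commented 5 months ago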

Hi, is there a plan to continue working on this? Similarly to this issue, we are trying to use the side input pattern described in the documentation at https://beam.apache.org/documentation/patterns/side-inputs/ on Google Dataflow, and we get the same "java.lang.IllegalArgumentException: ...." after Dataflow has been running for some time, in the transform that uses the side input. Confusingly, it occurs only in our Dataflow jobs that use several workers in parallel.

Can someone clarify what the plans are to make this usable?

Kind regards, Ivan Fröhlich SAP SE

liferoad commented 5 months ago

@IvanFroehlich are the workarounds working for you?

IvanFroehlich commented 5 months ago

@liferoad No, that's the issue. In the meantime we have tried the following:

v2: replace Latest.globally() with a custom combine function

...
pipeline
            .apply(GenerateSequence.from(0)
                .withRate(1, Duration.standardSeconds(envConfig.getConfigTtlInSeconds())))
            .apply(ParDo.of(doFunction)) //here we read SideInputDestinations(external Data)
            .apply(
                Window.<SideInputDestinations>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                    .discardingFiredPanes())
            .apply(Combine.globally(new CombineSerializableFunction()))
            .apply(View.asSingleton());
...
  static class CombineSerializableFunction implements
      SerializableFunction<Iterable<SideInputDestinations>, SideInputDestinations> {

    @Override
    public @UnknownKeyFor @Nullable @Initialized SideInputDestinations apply(
        Iterable<SideInputDestinations> input) {
      SideInputDestinations last = null;
      Iterator<SideInputDestinations> iterator = input.iterator();
      while (iterator.hasNext()) {
        last = iterator.next();
        log.info("SideInput: processing iteration for object {}", last.hashCode());
      }
      return last;
    }
  }

=> same Exception

v3: replace the View.asSingleton() with asSingletonView():

...
pipeline
            .apply(GenerateSequence.from(0)
                .withRate(1, Duration.standardSeconds(envConfig.getConfigTtlInSeconds())))
            .apply(ParDo.of(doFunction)) //here we read SideInputDestinations(external Data)
            .apply(
                Window.<SideInputDestinations>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                    .discardingFiredPanes())
            .apply(Combine.globally(new CombineSerializableFunction()).asSingletonView());
...
  static class CombineSerializableFunction implements
      SerializableFunction<Iterable<SideInputDestinations>, SideInputDestinations> {

    @Override
    public @UnknownKeyFor @Nullable @Initialized SideInputDestinations apply(
        Iterable<SideInputDestinations> input) {
      SideInputDestinations last = null;
      Iterator<SideInputDestinations> iterator = input.iterator();
      while (iterator.hasNext()) {
        last = iterator.next();
        log.info("SideInput: processing iteration for object {}", last.hashCode());
      }
      return last;
    }
  }

=> same exception, in the ParDo that uses the side input:

      Caused by: java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view.

Should we try another version?

Kind Regards, Ivan

kennknowles commented 5 months ago

I believe the workaround is "use View.asIterable and take the last element of the iterable when consuming the side input."

yelianevich commented 5 months ago

Hi @IvanFroehlich. Have you tried the solution I posted above?