haoch / flink-siddhi

A CEP library to run Siddhi within Apache Flink™ Streaming Application (Not maintained)
Apache License 2.0
243 stars 96 forks

Add support for flink 1.9 #52

Closed salvalcantara closed 4 years ago

salvalcantara commented 4 years ago

I have been running some experiments with flink-siddhi lately and noticed that Flink 1.9 is not supported. Is supporting Flink 1.9 on the near-future roadmap? If so, I might be interested in collaborating on this.

haoch commented 4 years ago

Hi @salvalcantara, great to know that you are interested in collaborating on Flink 1.9; your contribution is very welcome.

salvalcantara commented 4 years ago

@haoch Thanks for assigning this to me. I will hopefully start working on it next week.

haoch commented 4 years ago

Good to know; I look forward to your PR @salvalcantara 👍

salvalcantara commented 4 years ago

Hey @haoch, just to let you know that I've been busier than expected... I will finally start working on this next week. Also (off-topic), have you considered offering a UDF as part of your library (for instance, a coprocess function)? I have been experimenting with this approach lately and it seems a plausible alternative to relying on the lower-level Operator API.
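For reference, the coprocess-function idea mentioned above could look roughly like the following sketch. This is purely illustrative and not part of flink-siddhi: `ControlAwareFunction` and its forwarding logic are hypothetical names; only `ControlEvent` is taken from the library, and the control stream is assumed to be `broadcast()` before being connected to the data stream.

```java
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical sketch: a CoProcessFunction that consumes a (broadcast)
// control stream alongside the data stream, as a higher-level alternative
// to implementing a custom StreamOperator/StreamPartitioner.
public class ControlAwareFunction extends CoProcessFunction<Object, ControlEvent, Object> {

    // Latest control instruction seen by this parallel instance; for fault
    // tolerance this would normally live in Flink managed (broadcast) state.
    private transient ControlEvent currentControl;

    @Override
    public void processElement1(Object event, Context ctx, Collector<Object> out) {
        // Forward data events, applying whatever the current control dictates
        // (here: simply gate forwarding on having received any control event).
        if (currentControl != null) {
            out.collect(event);
        }
    }

    @Override
    public void processElement2(ControlEvent control, Context ctx, Collector<Object> out) {
        // Update the control instruction; because the control stream is
        // broadcast, every parallel instance sees every control event.
        currentControl = control;
    }
}
```

Since Flink 1.5 the broadcast state pattern (`KeyedBroadcastProcessFunction`) is the more idiomatic way to make such control state fault tolerant, but the plain `CoProcessFunction` above illustrates the shape of the approach.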

salvalcantara commented 4 years ago

Hi @haoch, the offending lines when trying to run on Flink 1.9 come from the DynamicPartitioner class, which implements the StreamPartitioner interface. The issue is that, in Flink 1.9, the selectChannels method has been replaced by selectChannel, as illustrated below:

    // Original: working for Flink 1.7
    // Note: returnChannels is a pre-allocated int[] instance field of DynamicPartitioner.
    //@Override
    public int[] selectChannels(SerializationDelegate<StreamRecord<Tuple2<StreamRoute, Object>>> streamRecordSerializationDelegate,
                                int numberOfOutputChannels) {
        Tuple2<StreamRoute, Object> value = streamRecordSerializationDelegate.getInstance().getValue();
        if (value.f0.isBroadCastPartitioning()) {
            // send to all channels
            int[] channels = new int[numberOfOutputChannels];
            for (int i = 0; i < numberOfOutputChannels; ++i) {
                channels[i] = i;
            }
            return channels;
        } else if (value.f0.getPartitionKey() == -1) {
            // random partition
            returnChannels[0] = random.nextInt(numberOfOutputChannels);
        } else {
            returnChannels[0] = partitioner.partition(value.f0.getPartitionKey(), numberOfOutputChannels);
        }
        return returnChannels;
    }

    // New: required by Flink 1.9
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<Tuple2<StreamRoute, Object>>> streamRecordSerializationDelegate) {
        Tuple2<StreamRoute, Object> value = streamRecordSerializationDelegate.getInstance().getValue();
        if (value.f0.isBroadCastPartitioning()) {
            /*
            It is illegal to call this method for broadcast channel selectors;
            broadcast is handled by the runtime in Flink 1.9, so we signal
            misuse here instead of leaving the branch unimplemented.
            */
            throw new UnsupportedOperationException(
                "Broadcast partitioner does not support select channels.");
        } else if (value.f0.getPartitionKey() == -1) {
            // random partition
            returnChannels[0] = random.nextInt(numberOfChannels);
        } else {
            returnChannels[0] = partitioner.partition(value.f0.getPartitionKey(), numberOfChannels);
        }
        return returnChannels[0];
    }

The dynamic partitioner is used like this:

public ExecutionSiddhiStream cql(DataStream<ControlEvent> controlStream) {
    DataStream<Tuple2<StreamRoute, Object>> unionStream = controlStream
        .map(new NamedControlStream(ControlEvent.DEFAULT_INTERNAL_CONTROL_STREAM))
        .broadcast()
        .union(this.toDataStream())
        .transform("add route transform",
             SiddhiTypeFactory.getStreamTupleTypeInformation(TypeInformation.of(Object.class)),
             new AddRouteOperator(getCepEnvironment().getDataStreamSchemas()));

    DataStream<Tuple2<StreamRoute, Object>> partitionedStream = new DataStream<>(
        unionStream.getExecutionEnvironment(),
        new PartitionTransformation<>(unionStream.getTransformation(), new DynamicPartitioner())
    );

    return new ExecutionSiddhiStream(partitionedStream, null, getCepEnvironment());
}

salvalcantara commented 4 years ago

@haoch Closing the ticket since support was already added in #59.