numaproj / numaflow-java

Numaflow Java SDK
Apache License 2.0

Numaflow reducer doesn't output grouped message right after fixed time window #81

Open borkox opened 11 months ago

borkox commented 11 months ago

I have a Numaflow v0.10.1 pipeline with a reduce vertex built on numaflow-java 0.5.5. I am using keyed messages with size one and 2 partitions. After several messages enter the reduce vertex, I expect it to output one grouped message as soon as the fixed time window expires. Instead, my method is called many minutes later, or only when new messages arrive. If no messages arrive, whatever the vertex holds in memory is never output.

    - imagePullSecrets:
        - name: gitlab-registry
      name: group-urls
      partitions: 2
      scale:
        min: 1
      udf:
        container:
          args:
            - '-cp'
            - numaflow-app.jar
            - mypackage.GroupUrlReduceFactory
          command:
            - /usr/bin/java
          image: >-
            myimage
          imagePullPolicy: Always
          resources: {}
        groupBy:
          keyed: true
          storage:
            emptyDir: {}
          window:
            fixed:
              length: 300s

My group handler looks like this (I deleted the imports and the actual code to keep it anonymous):

package mypackage;

@Slf4j
@NoArgsConstructor
public class GroupUrlReduceFactory extends ReducerFactory<GroupUrlReduceFactory.GroupUrlHandler> {

    public static void main(String[] args) throws Exception {
        new Server(new GroupUrlReduceFactory()).start();
    }

    @Override
    public GroupUrlHandler createReducer() {
        return new GroupUrlHandler();
    }

    public static class GroupUrlHandler extends Reducer {

        @Override
        public void addMessage(String[] strings, Datum datum, Metadata metadata) {
            // accumulation logic omitted
        }

        @Override
        public MessageList getOutput(String[] strings, Metadata md) {
            // output logic omitted; the real method returns a MessageList
        }

    }
}

The method getOutput(...) is not invoked by the Akka actors unless new messages arrive at the vertex. I think this is a bug.

Sometimes getOutput(...) is invoked after 20 minutes, sometimes after 2 hours. Sometimes it is not invoked for a whole day (when no new messages arrive at the vertex).

yhl25 commented 11 months ago

Hi @borkox, the behaviour you're experiencing is due to the way the watermark progresses in the current implementation. The watermark, which is used to determine when a window has expired, stops progressing when the source is idling. This means that if no new messages are coming into the vertex, the watermark will not advance and thus the window will not expire, causing getOutput() not to be invoked.

This is not an issue with the SDK itself, but rather a behaviour that needs to be addressed on the numaflow side. We are aware of this and are actively working on a solution. You can track the progress of this issue at https://github.com/numaproj/numaflow/issues/633. It's a work in progress and we will update as soon as we have a fix available.
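The behaviour described above can be illustrated with a small toy model. This is only a sketch under stated assumptions, not Numaflow's or the SDK's actual implementation: the class `IdleWatermarkSketch` and its methods are hypothetical, the watermark is simplified to "highest event time seen so far", and there is a single window starting at 0.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model: a fixed window that closes only when a data-driven watermark passes its end. */
class IdleWatermarkSketch {
    private final long windowEndMillis;
    private long watermarkMillis = -1L;   // -1 means "no data seen yet"
    private final List<String> buffered = new ArrayList<>();
    private boolean closed = false;

    IdleWatermarkSketch(long windowEndMillis) {
        this.windowEndMillis = windowEndMillis;
    }

    /** The only place the watermark moves: a message has to arrive. */
    void onMessage(String payload, long eventTimeMillis) {
        watermarkMillis = Math.max(watermarkMillis, eventTimeMillis);
        if (!closed && eventTimeMillis < windowEndMillis) {
            buffered.add(payload);
        }
        if (watermarkMillis >= windowEndMillis) {
            closed = true;   // the point at which getOutput() would be invoked
        }
    }

    /** Wall-clock time passing is deliberately a no-op in this model. */
    void onWallClockTick(long nowMillis) {
        // intentionally empty: clock time does not advance the watermark
    }

    boolean isClosed() {
        return closed;
    }

    List<String> buffered() {
        return buffered;
    }

    public static void main(String[] args) {
        IdleWatermarkSketch window = new IdleWatermarkSketch(300_000L); // window [0s, 300s)
        window.onMessage("a", 10_000L);
        window.onMessage("b", 20_000L);
        window.onWallClockTick(7_200_000L);   // two hours pass while the source idles
        System.out.println("closed after idle period: " + window.isClosed());
        window.onMessage("c", 310_000L);      // next message, event time past the window end
        System.out.println("closed after new message: " + window.isClosed());
        System.out.println("grouped payloads: " + window.buffered());
    }
}
```

In this model the window stays open for as long as the source is idle and closes only when a later message pushes the watermark past the window end, which matches the symptom reported in this issue.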

borkox commented 11 months ago

Thanks for the explanation. Why can't this be addressed in this project, by periodically checking for expired time windows and firing getOutput() accordingly?

borkox commented 11 months ago

Another question: how can I work around this problem? If I generate a message towards my grouping vertex, does it have to be keyed, given that I have 2 partitions at the moment? How would a random message without keys help if it goes to the wrong pod? I assume the other pod would stay silent?

borkox commented 11 months ago

I also ran a few more tests, putting the same message into the pipeline again and again. The reducer still seems to keep old data and doesn't fire getOutput() even after the 2-minute window expires. It seems like random behavior. Any ideas?

2023-11-08 17:03:08.818 [reduce-akka.actor.default-dispatcher-10] INFO  PageErrorsReduceFactory  - 1397898547 Creating reducer with capacity: 100000
2023-11-08 17:03:08.818 [reduce-akka.actor.default-dispatcher-11] INFO  PageErrorsReduceFactory  - 1344905083 Creating reducer with capacity: 100000
2023-11-08 17:03:08.854 [reduce-akka.actor.default-dispatcher-8] INFO  PageErrorsReduceFactory  - 1344905083 Handle incoming message: id: 1695653350502, orgId: 986c6b48-9954-48ef-b6dd-fee4f5c48cab, projectId: d16b5d0e-6a20-4003-a73f-07bc2b43f064
2023-11-08 17:03:08.854 [reduce-akka.actor.default-dispatcher-9] INFO  PageErrorsReduceFactory  - 1397898547 Handle incoming message: id: 1695653350502, orgId: 986c6b48-9954-48ef-b6dd-fee4f5c48cab, projectId: d16b5d0e-6a20-4003-a73f-07bc2b43f064
2023-11-08 17:07:43.322 [reduce-akka.actor.default-dispatcher-15] INFO  PageErrorsReduceFactory  - 606067806 Creating reducer with capacity: 100000
2023-11-08 17:07:43.323 [reduce-akka.actor.default-dispatcher-14] INFO  PageErrorsReduceFactory  - 606067806 Handle incoming message: id: 1695653350502, orgId: 986c6b48-9954-48ef-b6dd-fee4f5c48cab, projectId: d16b5d0e-6a20-4003-a73f-07bc2b43f064
2023-11-08 17:09:24.391 [reduce-akka.actor.default-dispatcher-15] INFO  PageErrorsReduceFactory  - 1344905083 Output message for window - 2023-11-08T17:00:00Z 2023-11-08T17:02:00Z, keys: [d16b5d0e-6a20-4003-a73f-07bc2b43f064]
2023-11-08 17:09:24.392 [reduce-akka.actor.default-dispatcher-17] INFO  PageErrorsReduceFactory  - 1397898547 Output message for window - 2023-11-08T17:02:00Z 2023-11-08T17:04:00Z, keys: [d16b5d0e-6a20-4003-a73f-07bc2b43f064]
2023-11-08 17:09:24.401 [reduce-akka.actor.default-dispatcher-15] INFO  PageErrorsReduceFactory  - 1344905083 Outputting 1 group messages
2023-11-08 17:09:24.403 [reduce-akka.actor.default-dispatcher-17] INFO  PageErrorsReduceFactory  - 1397898547 Outputting 1 group messages

These logs show that after the incoming message, about 2 more minutes still need to pass (from 17:07:43 to 17:09:24), in addition to the initial 2 minutes of the fixed window. To me this seems like random behavior.

Another problem, visible in the first two rows, is that 2 messages trigger the creation of 2 different reducers. The number after "PageErrorsReduceFactory - " is the hashCode of the reducer created by the factory.
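For reference, the window boundaries in the log (17:00:00Z to 17:02:00Z, 17:02:00Z to 17:04:00Z) are consistent with fixed windows aligned to the epoch. The sketch below shows that assignment rule. It is an assumption made for illustration, not code from Numaflow; `FixedWindowSketch` is a hypothetical name, and the event time in `main` is simply borrowed from a log timestamp, which need not equal the message's real event time.

```java
import java.time.Duration;
import java.time.Instant;

/** Assigns an event time to an epoch-aligned fixed window [start, start + length). */
class FixedWindowSketch {

    static Instant windowStart(Instant eventTime, Duration length) {
        long lengthMillis = length.toMillis();
        long eventMillis = eventTime.toEpochMilli();
        // floorMod keeps the result correct for timestamps before the epoch as well
        return Instant.ofEpochMilli(eventMillis - Math.floorMod(eventMillis, lengthMillis));
    }

    static Instant windowEnd(Instant eventTime, Duration length) {
        return windowStart(eventTime, length).plus(length);
    }

    public static void main(String[] args) {
        Duration length = Duration.ofMinutes(2);
        Instant eventTime = Instant.parse("2023-11-08T17:03:08.854Z"); // hypothetical event time
        System.out.println("window start: " + windowStart(eventTime, length));
        System.out.println("window end:   " + windowEnd(eventTime, length));
    }
}
```

Under this rule, two messages whose event times fall on opposite sides of a boundary such as 17:02:00Z belong to different windows, even if they are processed at the same wall-clock moment.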

yhl25 commented 11 months ago

In Numaflow the progression of time is data-driven rather than being based on the clock time. This means that the watermark progression or window closure is not dependent on clock-time but rather on the incoming data flow. Is the data coming in at a very high rate? If so, could you please provide some insights into the transactions per second (TPS) you're currently experiencing?

vigith commented 11 months ago

@borkox With idle source watermark, you should be able to fix this (Numaflow 1.1 release). The ideal generic solution will be through early triggers, which will be possible via a streaming-reduce concept, which should be available in the Numaflow 1.2 release.

borkox commented 11 months ago

In Numaflow the progression of time is data-driven rather than being based on the clock time. This means that the watermark progression or window closure is not dependent on clock-time but rather on the incoming data flow. Is the data coming in at a very high rate? If so, could you please provide some insights into the transactions per second (TPS) you're currently experiencing?

We are still in testing mode, so I am testing with 3 to 4 messages over two hours.

borkox commented 11 months ago

@borkox With idle source watermark, you should be able to fix this (Numaflow 1.1 release). The ideal generic solution will be through early triggers, which will be possible via a streaming-reduce concept, which should be available in the Numaflow 1.2 release.

I tried to attach a generator to the reducer as another source, but it seems I cannot control the watermark. The watermark on messages produced by the user-defined generator is always "-1", which is "1969-12-31T23:59:59.999Z".

whynowy commented 11 months ago

@borkox With idle source watermark, you should be able to fix this (Numaflow 1.1 release). The ideal generic solution will be through early triggers, which will be possible via a streaming-reduce concept, which should be available in the Numaflow 1.2 release.

I tried to attach a generator to the reducer as another source, but it seems I cannot control the watermark. The watermark on messages produced by the user-defined generator is always "-1", which is "1969-12-31T23:59:59.999Z".

@borkox - attaching another source won't help: when we calculate the watermark, we need to check all the sources. Until v1.1 is available, if you do not have continuous streaming data coming into the pipeline, one hacky workaround is to send some dummy data to the pipeline and filter it out before the reduce; the watermark will then keep moving, so the windows can be closed.
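The dummy-data workaround can be sketched roughly as follows. This is a toy model and not Numaflow code: `HeartbeatFilterSketch`, `Msg`, and the `__heartbeat__` key are hypothetical names, the watermark is simplified to the highest event time seen, and the filter and the reduce are collapsed into one loop for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Toy model: dummy messages move the watermark but never reach the reducer. */
class HeartbeatFilterSketch {
    static final String HEARTBEAT_KEY = "__heartbeat__"; // hypothetical marker for dummy data

    static final class Msg {
        final String key;
        final String payload;
        final long eventTimeMillis;

        Msg(String key, String payload, long eventTimeMillis) {
            this.key = key;
            this.payload = payload;
            this.eventTimeMillis = eventTimeMillis;
        }
    }

    /** Returns the payloads grouped in window [0, windowEndMillis), or empty if it never closes. */
    static Optional<List<String>> run(List<Msg> input, long windowEndMillis) {
        long watermark = -1L;
        List<String> buffered = new ArrayList<>();
        for (Msg m : input) {
            // every message, dummy or real, advances the watermark
            watermark = Math.max(watermark, m.eventTimeMillis);
            if (watermark >= windowEndMillis) {
                return Optional.of(buffered);   // window closes, grouped output is emitted
            }
            if (!HEARTBEAT_KEY.equals(m.key)) {
                buffered.add(m.payload);        // dummy data is filtered out before the reduce
            }
        }
        return Optional.empty();                // input ended before the watermark passed the window
    }

    public static void main(String[] args) {
        List<Msg> idle = List.of(
                new Msg("k1", "url-1", 10_000L),
                new Msg("k1", "url-2", 20_000L));
        System.out.println("without heartbeat: " + run(idle, 300_000L));

        List<Msg> withHeartbeat = new ArrayList<>(idle);
        withHeartbeat.add(new Msg(HEARTBEAT_KEY, "", 301_000L));
        System.out.println("with heartbeat:    " + run(withHeartbeat, 300_000L));
    }
}
```

In the second run the heartbeat closes the window, and the grouped output contains only the real payloads.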

syayi commented 11 months ago

@borkox thank you for being an early adopter of Numaflow. As we work through upcoming releases, we love to learn about our community use cases and take feedback, wanted to see if you'd be interested to chat with Numaflow team?

borkox commented 11 months ago

@borkox thank you for being an early adopter of Numaflow. As we work through upcoming releases, we love to learn about our community use cases and take feedback, wanted to see if you'd be interested to chat with Numaflow team?

I would love to chat, whether as a discussion or a video call. My email is borkox (at) gmail.com. I am in the Sofia time zone (EET), so any working time from 9 AM to 6 PM is fine for me.

borkox commented 11 months ago

@borkox thank you for being an early adopter of Numaflow. As we work through upcoming releases, we love to learn about our community use cases and take feedback, wanted to see if you'd be interested to chat with Numaflow team?

I tried several examples, such as summing odd and even numbers. I think reduce didn't work as expected no matter what I did. The only case in which it worked reasonably well was with a transformer that sets the time of the message, which is then magically converted to a watermark. [image] [image]

borkox commented 11 months ago

Things around this watermark are getting weirder and weirder; nothing seems to work as expected around the reducer. I have 2 messages with the number 32, which go to a transformer that sets a new time (hoping it is converted to a watermark). This is the log from the "in" transformer: [image]

My pipeline is this: [image]

So the system never sees message "32" before "2023-11-14 20:26:31.499". Here is how the message is received in the "reducer": [image]

I restarted the pod, so the messages were reposted. The watermark is "2023-11-14T14:50:33.227Z" when I expected "2023-11-14 20:26:31.499", which is off by several hours. Why is my watermark so far in the past? The "32" messages were never presented to the pipeline at 14:50, only at 20:26.

syayi commented 11 months ago

@borkox thank you for being an early adopter of Numaflow. As we work through upcoming releases, we love to learn about our community use cases and take feedback, wanted to see if you'd be interested to chat with Numaflow team?

I would love to chat, whether as a discussion or a video call. My email is borkox (at) gmail.com. I am in the Sofia time zone (EET), so any working time from 9 AM to 6 PM is fine for me.

Perfect, we will schedule a call soon. Meanwhile, one of us will help you with some of the queries above.

whynowy commented 11 months ago

Things around this watermark are getting weirder and weirder; nothing seems to work as expected around the reducer. I have 2 messages with the number 32, which go to a transformer that sets a new time (hoping it is converted to a watermark). This is the log from the "in" transformer: [image]

My pipeline is this: [image]

So the system never sees message "32" before "2023-11-14 20:26:31.499". Here is how the message is received in the "reducer": [image]

I restarted the pod, so the messages were reposted. The watermark is "2023-11-14T14:50:33.227Z" when I expected "2023-11-14 20:26:31.499", which is off by several hours. Why is my watermark so far in the past? The "32" messages were never presented to the pipeline at 14:50, only at 20:26.

The watermark is correct, as expected.

A watermark is not the same as the event time in the message; it is an approximate timestamp that guarantees no data earlier than that timestamp will be seen in the pipeline. Here is a video that explains event time and watermark in Flink. That is not the way we calculate the watermark, but it might help you understand what a watermark is.
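To make the distinction concrete, here is a generic sketch of one common way a watermark can be derived: as the minimum, across all sources, of the latest event time each source has produced. This is not how Numaflow calculates its watermark. The class `WatermarkSketch`, the source names, and the timestamps (borrowed from the values quoted in this thread and treated as UTC) are assumptions for illustration only.

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

/** Generic illustration: watermark = minimum over sources of the latest event time seen. */
class WatermarkSketch {
    private final Map<String, Instant> latestEventTimePerSource = new HashMap<>();

    /** Records an event time for a source, keeping only the latest one per source. */
    void observe(String source, Instant eventTime) {
        latestEventTimePerSource.merge(source, eventTime, (a, b) -> a.isAfter(b) ? a : b);
    }

    /** Returns epoch millisecond -1 when nothing has been observed yet. */
    Instant watermark() {
        return latestEventTimePerSource.values().stream()
                .min(Comparator.naturalOrder())
                .orElse(Instant.ofEpochMilli(-1L));
    }

    public static void main(String[] args) {
        WatermarkSketch sketch = new WatermarkSketch();
        System.out.println("before any data: " + sketch.watermark());

        // hypothetical: one source lags behind, the other carries the fresh "32" messages
        sketch.observe("source-a", Instant.parse("2023-11-14T14:50:33.227Z"));
        sketch.observe("source-b", Instant.parse("2023-11-14T20:26:31.499Z"));
        System.out.println("after both sources: " + sketch.watermark());
    }
}
```

In this model a message with a 20:26 event time can be delivered while the watermark is still at 14:50, because the slowest source bounds the watermark. The watermark only promises that nothing earlier than it will arrive; it says nothing about the event time of the message currently being processed.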