apache / incubator-streampark

Make stream processing easier! Easy-to-use streaming application development framework and operation platform.
https://streampark.apache.org/
Apache License 2.0
3.91k stars 1.01k forks source link

[Improve] About the use optimization of AsyncEventBus #2620

Closed senlizishi closed 1 year ago

senlizishi commented 1 year ago

Search before asking

Description

I found that although asyncEventBus was created in ChangeEventBus, the thread pool parameter setting is relatively high, but the subscriber lacks the @AllowConcurrentEvents annotation, which will cause asynchronous multi-threading to be ineffective.

class ChangeEventBus {

  private val execPool = new ThreadPoolExecutor(
    Runtime.getRuntime.availableProcessors * 10,
    Runtime.getRuntime.availableProcessors * 20,
    60L,
    TimeUnit.SECONDS,
    new LinkedBlockingQueue[Runnable](2048))

  private[kubernetes] val asyncEventBus = new AsyncEventBus("[StreamPark][flink-k8s]AsyncEventBus", execPool)

  private[kubernetes] val syncEventBus = new EventBus("[StreamPark][flink-k8s]SyncEventBus")
}

In com.google.common.eventbus.Subscriber Subscriber is returned if there is @AllowConcurrentEvents concern on the subscriber method, otherwise SynchronizedSubscriber is returned.

static Subscriber create(EventBus bus, Object listener, Method method) {
        return (Subscriber)(isDeclaredThreadSafe(method) ? new Subscriber(bus, listener, method) : new Subscriber.SynchronizedSubscriber(bus, listener, method));
    }

private static boolean isDeclaredThreadSafe(Method method) {
        return method.getAnnotation(AllowConcurrentEvents.class) != null;
    }

I made a simple example to verify the effect of this annotation EventBusTest . As a result, as mentioned above, no @AllowConcurrentEvents is created for synchronous subscribers, and there is no asynchronous effect.

Asynchronous if required. The way to modify is to add @AllowConcurrentEvents to the method that needs asynchronous. At the same time, adjust the thread pool parameters to make it more reasonable.

 private val execPool = new ThreadPoolExecutor(
    Runtime.getRuntime.availableProcessors * 2,
    Runtime.getRuntime.availableProcessors * 3,
    60L,
    TimeUnit.SECONDS,
    new LinkedBlockingQueue[Runnable](1024))

The methods I have found to receive AsyncEvent are FlinkK8sChangeEventListener.subscribeMetricsChange, FlinkK8sChangeEventListener.subscribeCheckpointChange, BuildInEventListener.subscribeFlinkJobStateEvent. This requires further discussion and confirmation.

Usage Scenario

No response

Related issues

No response

Are you willing to submit a PR?

Code of Conduct

senlizishi commented 1 year ago

@Al-assad @MonsterChenzhuo cc

Al-assad commented 1 year ago

Wow, thank you for the reminder. It's our negligence that all asynchronous calls for Eventbus should indeed be marked with @AllowConcurrentEvents. Can you submit a corresponding PR?