[proposal] Task Resource Isolation #71

kaori-seasons opened 1 year ago

kaori-seasons commented 1 year ago

Hi, community. I am a developer new to rocketmq-eventbridge, and I have some ideas about resource consumption during event pushing. Below is the draft design I put together. Could you give me feedback and suggest additional application scenarios?

document: https://shimo.im/docs/25q5M9QVB8i80XqD/


Because a large volume of data flows from the event source to the event target, we may need to separate the logic that reads event-source data from a multi-threaded logic that pushes events to the event target. This would provide the concurrency needed under heavy consumption load, as well as observability when backpressure occurs.


The producer side continuously fetches source data and puts it into a blocking queue (BlockingQueue) with a bounded length. If the consumer side cannot take items out in time, the producer blocks and cannot store more data until space frees up. The producer's end condition is that the last fetched record is null, which means there is no new source data.

The consumer side takes source data from the blocking queue. Most of the time is spent in the actual processing, not in fetching the data, so the consumer side also uses a single thread: after taking a record, it hands the work to a thread pool executor, ThreadPoolTaskExecutor, which controls the execution of the specific tasks. ThreadPoolTaskExecutor has its own task queue; to keep that queue from growing too long and exhausting memory, its length is capped, and the consumer thread submits a new task only while the queue size is below the limit.
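A minimal, runnable sketch of the pipeline described above, under stated assumptions: `java.util.concurrent.ThreadPoolExecutor` stands in for the custom ThreadPoolTaskExecutor, and `source` is a hypothetical supplier that returns null once the event source is exhausted. Because BlockingQueue does not accept null, an explicit end marker is queued instead.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch: bounded queue + single consumer thread + capped executor queue.
class BoundedPipeline {
    // BlockingQueue rejects null, so this marker signals "no more source data"
    private static final Object END = new Object();

    static int run(Supplier<Object> source, int queueCapacity, int executorQueueLimit) {
        BlockingQueue<Object> taskQueue = new ArrayBlockingQueue<>(queueCapacity);
        // The executor's own queue is unbounded; the consumer thread below enforces the cap
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                4, 4, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        AtomicInteger pushed = new AtomicInteger();

        // Producer: put() blocks while taskQueue is full, throttling reads from the source
        Thread producer = new Thread(() -> {
            try {
                Object item;
                while ((item = source.get()) != null) {
                    taskQueue.put(item);
                }
                taskQueue.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Single consumer thread: submits only while the executor queue is under the limit
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    while (executor.getQueue().size() >= executorQueueLimit) {
                        Thread.sleep(10);
                    }
                    Object item = taskQueue.take();
                    if (item == END) {
                        return;
                    }
                    executor.execute(pushed::incrementAndGet); // stand-in for pushing `item`
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
            executor.shutdown();
            executor.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pushed.get();
    }
}
```

The two limits play different roles: the ArrayBlockingQueue capacity throttles the producer, while the executor-queue check stops the consumer thread from piling up pending push tasks in memory.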

Next, a task manager is needed to manage the production and consumption logic of EventTargetPusher, and to create a separate blocking queue for each kind of task (for example, high, medium, and low priority tasks).
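A hypothetical sketch of the task manager's queue layout, assuming three fixed priorities and one bounded queue per priority. `TaskQueueManager` and `Priority` are illustrative names, not existing eventbridge classes.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: one bounded blocking queue per task priority.
class TaskQueueManager {
    enum Priority { HIGH, MEDIUM, LOW }

    private final Map<Priority, BlockingQueue<Object>> queues = new EnumMap<>(Priority.class);

    TaskQueueManager(int capacityPerQueue) {
        for (Priority p : Priority.values()) {
            queues.put(p, new ArrayBlockingQueue<>(capacityPerQueue));
        }
    }

    // Non-blocking submit; returns false when that priority's queue is already full
    boolean offer(Priority priority, Object task) {
        return queues.get(priority).offer(task);
    }

    // Next task, checking HIGH before MEDIUM before LOW; null when every queue is empty
    Object poll() {
        for (Priority p : Priority.values()) {
            Object task = queues.get(p).poll();
            if (task != null) {
                return task;
            }
        }
        return null;
    }

    int size(Priority priority) {
        return queues.get(priority).size();
    }
}
```

Strict priority polling like this can starve LOW tasks under sustained HIGH load; a weighted round-robin over the queues is a common alternative if fairness matters.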


private DataServiceDispatch dataServiceDispatch;

public void doHandle(DataContextParam dataContextParam) {
    // Only run once the task has been started (the switch is turned on by /task/start)
    if (!DataUtils.switchOpen) {
        return;
    }
    // Production side
    new Thread(new Runnable() {
        public void run() {
            Object object = null;
            do {
                // Back off while the executor's queue is over its limit
                while (DataUtils.consumerExecutor.getCurrentQueueSize() > DataUtils.consumerCurrentQueueSizeLimit) {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                object = dataServiceDispatch.getDateService(dataContextParam.getBizType()).querySourceData();
                if (object != null) {
                    try {
                        // Blocks while taskQueue is full
                        DataUtils.taskQueue.put(object);
                    } catch (Exception e) {
                        logger.error("set queue error!", e);
                    }
                }
                // The task ends: turn the start switch off
                if (object == null) {
                    DataUtils.switchOpen = false;
                }
            } while (object != null);
        }
    }).start();
    // Consumer side
    new Thread(new Runnable() {
        public void run() {
            while (true) {
                Object object;
                try {
                    while (DataUtils.consumerExecutor.getCurrentQueueSize() > DataUtils.consumerCurrentQueueSizeLimit) {
                        Thread.sleep(100);
                    }
                    object = DataUtils.taskQueue.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e) {
                    logger.error("take queue error!", e);
                    continue;
                }
                DataUtils.consumerExecutor.execute(new ConsumerTask(object, dataContextParam));
            }
        }
    }).start();
}

// Consumer task
private class ConsumerTask implements Runnable {
    private final Object object;
    private final DataContextParam dataContextParam;

    public ConsumerTask(Object object, DataContextParam dataContextParam) {
        this.object = object;
        this.dataContextParam = dataContextParam;
    }

    public void run() {
        // Push the record to the event target
    }
}


import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class DataUtils {
    // Start switch; volatile because the controller, producer and consumer threads all read it
    public static volatile boolean switchOpen = false;
    // Number of source data records
    public static final AtomicInteger sourceDataSize = new AtomicInteger(0);
    // Number of records processed successfully
    public static final AtomicInteger handleSuccessSize = new AtomicInteger(0);
    // Number of records that failed processing
    public static final AtomicInteger handleFailSize = new AtomicInteger(0);
    // Task queue (bounded, so a slow consumer blocks the producer)
    public static final BlockingQueue<Object> taskQueue = new ArrayBlockingQueue<Object>(5);
    // Consumer thread executor (the custom ThreadPoolTaskExecutor)
    public static volatile ThreadPoolTaskExecutor consumerExecutor = null;
    // Upper limit of the consumer thread executor's queue length
    public static volatile int consumerCurrentQueueSizeLimit = 100;

    public static void setConsumerExecutor(ThreadPoolTaskExecutor consumerExecutor) {
        DataUtils.consumerExecutor = consumerExecutor;
    }

    public static void setConsumerCurrentQueueSizeLimit(int consumerCurrentQueueSizeLimit) {
        DataUtils.consumerCurrentQueueSizeLimit = consumerCurrentQueueSizeLimit;
    }

    public static void setCustomerMaxThreadSize(int maxThreadSize) {
        consumerExecutor.setMaxPoolSize(maxThreadSize);
    }

    public static void setCustomerCorePoolSize(int coreThreadSize) {
        consumerExecutor.setCorePoolSize(coreThreadSize);
    }

    // Reset the counters in place rather than replacing objects other threads may hold
    public static void resetRecordCount() {
        sourceDataSize.set(0);
        handleSuccessSize.set(0);
        handleFailSize.set(0);
    }
}

Custom thread pool ThreadPoolTaskExecutor

When thread-pool utilization is high, the following controller lets you start tasks for the corresponding blocking queues, adjust the pool parameters at runtime, and monitor the progress of push tasks.

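private BusinessControlManager businessControlManager;

/**
 * Start a task
 * url: http://localhost:8091/task/start?bizType=test
 */
@RequestMapping(value = "/task/start")
public String startTask(HttpServletRequest request, HttpServletResponse response) throws Exception {
    try {
        if (DataUtils.switchOpen) {
            return "Task already started; no need to start it again!";
        } else {
            DataUtils.switchOpen = true;
        }
        DataContextParam dataContextParam = new DataContextParam();
        String bizType = request.getParameter("bizType");
        dataContextParam.setBizType(bizType);
        businessControlManager.doHandle(dataContextParam);
    } catch (Exception e) {
        logger.error("[TaskController.startTask] error!", e);
        DataUtils.switchOpen = false;
        return "Failed to start task";
    }
    return "Task started successfully";
}

/**
 * Adjust system parameters
 * url: http://localhost:8091/task/adjust?consumerCurrentQueueSizeLimit=17&maxThreadSize=12&coreThreadSize=12
 */
@RequestMapping(value = "/task/adjust")
public Object paramAdjust(HttpServletRequest request, HttpServletResponse response) throws Exception {
    int queueSizeLimit = Integer.parseInt(request.getParameter("consumerCurrentQueueSizeLimit"));
    int coreThreadSize = Integer.parseInt(request.getParameter("coreThreadSize"));
    int maxThreadSize = Integer.parseInt(request.getParameter("maxThreadSize"));
    // Note: the core thread count must not exceed the maximum thread count; otherwise threads
    // are constantly created and destroyed, wasting system resources
    if (coreThreadSize > maxThreadSize) {
        coreThreadSize = maxThreadSize;
    }
    DataUtils.setConsumerCurrentQueueSizeLimit(queueSizeLimit);
    // Apply the sizes in an order that never leaves core > max, which ThreadPoolExecutor rejects
    if (maxThreadSize >= DataUtils.consumerExecutor.getCorePoolSize()) {
        DataUtils.setCustomerMaxThreadSize(maxThreadSize);
        DataUtils.setCustomerCorePoolSize(coreThreadSize);
    } else {
        DataUtils.setCustomerCorePoolSize(coreThreadSize);
        DataUtils.setCustomerMaxThreadSize(maxThreadSize);
    }
    return "System parameters adjusted successfully";
}

/**
 * Task processing progress
 * url: http://localhost:8091/task/process
 */
@RequestMapping(value = "/task/process")
public ProcessResult processResult(HttpServletRequest request, HttpServletResponse response) throws Exception {
    ProcessResult processResult = new ProcessResult();
    // Business info (e.g. the record counters in DataUtils)
    // System info (e.g. thread pool and queue state)
    return processResult;
}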
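The "system info" part of the progress endpoint could be filled from the standard `ThreadPoolExecutor` getters. Spring's ThreadPoolTaskExecutor exposes its underlying executor via `getThreadPoolExecutor()`. `PoolSnapshot` is a hypothetical type for illustration and is not the draft's `ProcessResult`.

```java
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical snapshot of thread pool state for a progress/monitoring endpoint.
final class PoolSnapshot {
    final int activeThreads;
    final int poolSize;
    final int maxPoolSize;
    final int queuedTasks;
    final long completedTasks;

    private PoolSnapshot(ThreadPoolExecutor e) {
        // These getters are approximate while tasks are running
        this.activeThreads = e.getActiveCount();
        this.poolSize = e.getPoolSize();
        this.maxPoolSize = e.getMaximumPoolSize();
        this.queuedTasks = e.getQueue().size();
        this.completedTasks = e.getCompletedTaskCount();
    }

    static PoolSnapshot of(ThreadPoolExecutor executor) {
        return new PoolSnapshot(executor);
    }

    // Fraction of the maximum thread count that is busy (0.0 to 1.0); max is always >= 1
    double utilization() {
        return (double) activeThreads / maxPoolSize;
    }
}
```

A high `utilization()` combined with a growing `queuedTasks` is the "thread pool occupancy is high" signal that the adjust endpoint is meant to react to.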


Disadvantages: messages are held only in an in-memory cache, so if there is network jitter or the producer crashes, the position of the last successfully consumed message is not saved and progress cannot be restored. Second, when task traffic is very large or latency is high, this design does not handle backpressure well.


The figure above shows the data transfer between two tasks:

If we refactor EventTargetPusher into a producer-consumer model, production and consumption tasks will necessarily involve batching. By setting a threshold on the batched record collection and flushing to the downstream event target once that threshold is reached, we can control the flush timing of each type of task. This avoids the blocking caused by flushing record by record.
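A minimal sketch of threshold-based flushing, assuming a size threshold only and a hypothetical `sink` callback that stands in for the push to the downstream event target. A real flush policy would likely also flush on a timer so a partially filled batch is not held indefinitely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: collect records and flush them as one batch at a threshold.
class RecordBatcher<T> {
    private final int threshold;
    private final Consumer<List<T>> sink;
    private List<T> buffer = new ArrayList<>();

    RecordBatcher(int threshold, Consumer<List<T>> sink) {
        if (threshold <= 0) {
            throw new IllegalArgumentException("threshold must be positive");
        }
        this.threshold = threshold;
        this.sink = sink;
    }

    // Adds one record and flushes automatically when the batch reaches the threshold
    synchronized void add(T record) {
        buffer.add(record);
        if (buffer.size() >= threshold) {
            flush();
        }
    }

    // Sends whatever is buffered (e.g. on shutdown or a timer tick); no-op when empty
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = buffer;
        buffer = new ArrayList<>();
        sink.accept(batch);
    }
}
```

Each task type can get its own `RecordBatcher` with its own threshold, which is what lets the flush timing be controlled per type.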

Take, as an example, how the flink-sql-gateway project (https://github.com/ververica/flink-sql-gateway.git) retrieves the results of an interactive SELECT query. In the following code, startRetrieval starts the retrieval thread, which reads query results through a SocketStreamIterator; the results are then returned asynchronously. Here startRetrieval corresponds to the production logic of EventTargetPusher, and the ResultRetrievalThread corresponds to the consumption logic. We can likewise keep a changeRecordBuffer in memory, bound it with maxBufferSize, and automatically push its contents to the downstream event target once it reaches that watermark.

public class ChangelogResult<C> extends AbstractResult<C, Tuple2<Boolean, Row>> {
    private final SocketStreamIterator<Tuple2<Boolean, Row>> iterator;
    private final CollectStreamTableSink collectTableSink;
    private final ResultRetrievalThread retrievalThread;
    private CompletableFuture<JobExecutionResult> jobExecutionResultFuture;
    private final Object resultLock;
    private AtomicReference<SqlExecutionException> executionException = new AtomicReference<>();
    private final List<Tuple2<Boolean, Row>> changeRecordBuffer;
    private final int maxBufferSize;

    public ChangelogResult(
            RowTypeInfo outputType,
            TableSchema tableSchema,
            ExecutionConfig config,
            InetAddress gatewayAddress,
            int gatewayPort,
            ClassLoader classLoader,
            int maxBufferSize) {
        resultLock = new Object();
        // create socket stream iterator (iterator and collectTableSink setup not shown)
        retrievalThread = new ResultRetrievalThread();
        // prepare for changelog
        changeRecordBuffer = new ArrayList<>();
        this.maxBufferSize = maxBufferSize;
    }

    public void startRetrieval(JobClient jobClient) {
        // start listener thread
        retrievalThread.start();
        jobExecutionResultFuture = CompletableFuture.completedFuture(jobClient)
            .thenCompose(client -> client.getJobExecutionResult(classLoader))
            .whenComplete((unused, throwable) -> {
                if (throwable != null) {
                    executionException.compareAndSet(
                        null,
                        new SqlExecutionException("Error while submitting job.", throwable));
                }
            });
    }

    public TypedResult<List<Tuple2<Boolean, Row>>> retrieveChanges() {
        synchronized (resultLock) {
            // retrieval thread is alive: return records if available,
            // but the program must not have failed
            if (isRetrieving() && executionException.get() == null) {
                if (changeRecordBuffer.isEmpty()) {
                    return TypedResult.empty();
                } else {
                    final List<Tuple2<Boolean, Row>> change = new ArrayList<>(changeRecordBuffer);
                    // drain the buffer and wake up the producer if it is waiting on a full buffer
                    changeRecordBuffer.clear();
                    resultLock.notify();
                    return TypedResult.payload(change);
                }
            }
            // retrieval thread is dead but there are still records to be delivered
            else if (!isRetrieving() && !changeRecordBuffer.isEmpty()) {
                final List<Tuple2<Boolean, Row>> change = new ArrayList<>(changeRecordBuffer);
                changeRecordBuffer.clear();
                return TypedResult.payload(change);
            }
            // no results can be returned anymore
            else {
                return handleMissingResult();
            }
        }
    }

    public void close() {
        retrievalThread.isRunning = false;
    }

    private boolean isRetrieving() {
        return retrievalThread.isRunning;
    }

    private void processRecord(Tuple2<Boolean, Row> change) {
        synchronized (resultLock) {
            // wait while the buffer is full (this is the backpressure point)
            while (changeRecordBuffer.size() >= maxBufferSize) {
                try {
                    resultLock.wait();
                } catch (InterruptedException e) {
                    // ignore
                }
            }
            // add only once there is room, so the record is not dropped
            changeRecordBuffer.add(change);
        }
    }

    private class ResultRetrievalThread extends Thread {
        public volatile boolean isRunning = true;

        @Override
        public void run() {
            try {
                while (isRunning && iterator.hasNext()) {
                    final Tuple2<Boolean, Row> change = iterator.next();
                    processRecord(change);
                }
            } catch (RuntimeException e) {
                // ignore socket exceptions
            }
            // no result anymore:
            // either the job is done or an error occurred
            isRunning = false;
        }
    }
}
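The buffering pattern in ChangelogResult can be distilled into a dependency-free sketch: the producer waits on a lock while the buffer is at maxBufferSize, and the consumer drains the whole buffer and calls notify. `BoundedChangeBuffer` and `demo` are hypothetical names, and `drain()` stands in for a push to the event target.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the ChangelogResult wait/notify buffer, without Flink dependencies.
class BoundedChangeBuffer<T> {
    private final Object lock = new Object();
    private final List<T> buffer = new ArrayList<>();
    private final int maxBufferSize;

    BoundedChangeBuffer(int maxBufferSize) {
        this.maxBufferSize = maxBufferSize;
    }

    // Producer side: blocks while the buffer is at its watermark (maxBufferSize)
    void put(T change) throws InterruptedException {
        synchronized (lock) {
            while (buffer.size() >= maxBufferSize) {
                lock.wait();
            }
            buffer.add(change);
        }
    }

    // Consumer side: takes everything buffered so far and wakes a blocked producer
    List<T> drain() {
        synchronized (lock) {
            List<T> batch = new ArrayList<>(buffer);
            buffer.clear();
            lock.notifyAll();
            return batch;
        }
    }

    // Runs one producer thread against a polling consumer; returns {total drained, largest batch}
    static int[] demo(int records, int maxBufferSize) {
        BoundedChangeBuffer<Integer> buf = new BoundedChangeBuffer<>(maxBufferSize);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < records; i++) {
                    buf.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int total = 0;
        int largestBatch = 0;
        while (total < records) {
            List<Integer> batch = buf.drain(); // stand-in for pushing to the event target
            total += batch.size();
            largestBatch = Math.max(largestBatch, batch.size());
            Thread.yield();
        }
        try {
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {total, largestBatch};
    }
}
```

The `while` around `wait()` matters: it re-checks the condition after every wake-up, and the record is added only once there is room, so nothing is lost when the buffer was full.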

Since neither approach above handles the producer or consumer machine going down, we could persist the task processing progress of the first solution, together with information about the currently consumed topic, whenever an exception is thrown. The current consumption topic information is as follows:

`consumer_group` varchar(128) NOT NULL DEFAULT '',
`message_id` varchar(255) NOT NULL DEFAULT '',
`topic_name` varchar(255) NOT NULL DEFAULT '',
`ctime` bigint(20) NOT NULL,
`queue_id` int(11) NOT NULL,
`offset` bigint(20) NOT NULL,
`broker_name` varchar(255) NOT NULL DEFAULT '',
`id` bigint(20) NOT NULL AUTO_INCREMENT,

The next time the task starts, it resumes from the saved information about the last failed task.
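A hypothetical sketch of the save/resume step, assuming an in-memory map stands in for the table above and that the saved `offset` is the position to resume from (for example, the offset of the message whose push threw). The key mirrors the `consumer_group`, `topic_name`, `broker_name`, and `queue_id` columns.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: consumption progress keyed by (consumer_group, topic_name, broker_name, queue_id).
class ConsumeProgressStore {
    private final Map<List<Object>, Long> offsets = new ConcurrentHashMap<>();

    // Arrays.asList gives a key with value-based equals/hashCode
    private static List<Object> key(String group, String topic, String broker, int queueId) {
        return Arrays.asList(group, topic, broker, queueId);
    }

    // Save the offset to resume from, e.g. when pushing the message at this offset throws
    void save(String group, String topic, String broker, int queueId, long offset) {
        offsets.put(key(group, topic, broker, queueId), offset);
    }

    // On restart: the saved offset, or the fallback when nothing was saved for this queue
    long resumeOffset(String group, String topic, String broker, int queueId, long fallback) {
        return offsets.getOrDefault(key(group, topic, broker, queueId), fallback);
    }
}
```

In a real deployment the map would be replaced by the persistent table, so the saved position survives a process restart.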

2011shenlin commented 1 year ago

How exactly would backpressure be achieved? Can you expand on this in detail?

2011shenlin commented 1 year ago

Persistence of the consumption offset can rely on the upstream Event BUS.

2011shenlin commented 1 year ago

How is isolation achieved here? How do we prevent some tasks from taking up too many resources and leaving other tasks unprocessed?

kaori-seasons commented 1 year ago


First, the producer-consumer code above shows how to set up blocking queues for the data synchronization tasks of different business lines. To be clear about the background: users can push tasks of different types. For example, an eventRecord in CirculatorContext can have different transform types, and each transform type maps to its own blocking queue. When eventbridge pushes events downstream, we can refactor this logic to support multi-threaded consumption. The solution is still being broken down, and this is not a final proposal.
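One way to get the isolation asked about above is a bulkhead: give each transform type its own small thread pool with a bounded queue, so a slow or overloaded type is rejected on its own while other types keep running. This is a sketch under that assumption. `TransformTypeBulkhead` and the string type keys are hypothetical, and the `Runnable` stands in for a push task.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical bulkhead sketch: one bounded pool per transform type.
class TransformTypeBulkhead {
    private final Map<String, ThreadPoolExecutor> pools = new ConcurrentHashMap<>();
    private final int threadsPerType;
    private final int queueCapacityPerType;

    TransformTypeBulkhead(int threadsPerType, int queueCapacityPerType) {
        this.threadsPerType = threadsPerType;
        this.queueCapacityPerType = queueCapacityPerType;
    }

    private ThreadPoolExecutor poolFor(String transformType) {
        return pools.computeIfAbsent(transformType, t -> new ThreadPoolExecutor(
                threadsPerType, threadsPerType, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacityPerType),
                new ThreadPoolExecutor.AbortPolicy()));
    }

    // Returns false when this type's threads and queue are all busy; other types are unaffected
    boolean trySubmit(String transformType, Runnable pushTask) {
        try {
            poolFor(transformType).execute(pushTask);
            return true;
        } catch (RejectedExecutionException e) {
            return false;
        }
    }

    void shutdown() {
        pools.values().forEach(ThreadPoolExecutor::shutdown);
    }
}
```

A `false` from `trySubmit` is the per-type backpressure signal: the caller can stop taking from that type's blocking queue without affecting the others.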

I will look at other event-driven products such as eventmesh, and compare them to work out the most appropriate way to introduce this feature.

Jashinck commented 1 year ago

We should consider not only the EventTargetPush module but also EventBusListener and EventRuleTransfer when dealing with the backpressure problem. Take the consumer's point of view, and design a watermark proposal for its producer.
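A watermark designed from the consumer's side could use a high and a low mark: the producer pauses when the consumer's pending count reaches the high watermark and resumes only after it falls to the low watermark, which avoids rapid pause/resume flapping. `WatermarkGate` and its thresholds are hypothetical. Where `produced()` and `consumed()` are called, for example in EventBusListener or EventRuleTransfer, is an open design choice.

```java
// Hypothetical high/low watermark gate, owned by the consumer and checked by its producer.
class WatermarkGate {
    private final int lowWatermark;
    private final int highWatermark;
    private int pending;
    private boolean paused;

    WatermarkGate(int lowWatermark, int highWatermark) {
        if (lowWatermark < 0 || lowWatermark >= highWatermark) {
            throw new IllegalArgumentException("require 0 <= low < high");
        }
        this.lowWatermark = lowWatermark;
        this.highWatermark = highWatermark;
    }

    // Called when the producer hands one record to the consumer
    synchronized void produced() {
        pending++;
        if (pending >= highWatermark) {
            paused = true;
        }
    }

    // Called when the consumer finishes one record
    synchronized void consumed() {
        if (pending > 0) {
            pending--;
        }
        if (paused && pending <= lowWatermark) {
            paused = false;
        }
    }

    // The producer checks this before sending more
    synchronized boolean isPaused() {
        return paused;
    }

    synchronized int pending() {
        return pending;
    }
}
```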

kaori-seasons commented 1 year ago

Event BUS

I am still getting familiar with eventbridge. Next week I will study the details of the project carefully and then continue the discussion.

kaori-seasons commented 1 year ago

The current production and consumption speeds can be calculated as follows:

Production speed = queues * PullBatchSize * 1000 / pullInterval
PullBatchSize: the number of messages pulled from a single queue in one pull. The maximum is generally 32; going beyond that requires adjusting server-side (broker) parameters.
pullInterval: the interval between two pulls in RocketMQ, in milliseconds (the factor 1000 converts the result to messages per second). The default is 0, meaning no interval; the formula is undefined for 0, so it only applies when pullInterval > 0.
queues: the number of queues, which depends on the number of brokers. For example, with 2 master brokers (each with slaves) and 8 queues per broker, there are 16 queues.

Consumption speed = InflightMessageCount / (ConsumerLag + InflightMessageCount + AvailableMessageCount)
Unconsumed messages: ConsumerLag = MaxOffset - ConsumerOffset
Messages being consumed: InflightMessageCount = PullOffset - ConsumerOffset
Messages waiting to be pulled: AvailableMessageCount = MaxOffset - PullOffset
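The quantities above can be computed directly. The sketch below implements the production formula and the three offset differences. The method names are hypothetical, and the offsets are plain numbers rather than values read from a RocketMQ API. It also shows that ConsumerLag = InflightMessageCount + AvailableMessageCount, since (PullOffset - ConsumerOffset) + (MaxOffset - PullOffset) = MaxOffset - ConsumerOffset.

```java
// Hypothetical helpers for the rate and offset formulas above.
class ConsumeRateMath {
    // queues * PullBatchSize * 1000 / pullInterval, in messages per second
    static double maxPullRatePerSecond(int queues, int pullBatchSize, long pullIntervalMs) {
        if (pullIntervalMs <= 0) {
            throw new IllegalArgumentException("formula requires pullInterval > 0");
        }
        return (double) queues * pullBatchSize * 1000.0 / pullIntervalMs;
    }

    // ConsumerLag = MaxOffset - ConsumerOffset
    static long consumerLag(long maxOffset, long consumerOffset) {
        return maxOffset - consumerOffset;
    }

    // InflightMessageCount = PullOffset - ConsumerOffset
    static long inflight(long pullOffset, long consumerOffset) {
        return pullOffset - consumerOffset;
    }

    // AvailableMessageCount = MaxOffset - PullOffset
    static long available(long maxOffset, long pullOffset) {
        return maxOffset - pullOffset;
    }
}
```

For example, with 16 queues, PullBatchSize = 32 and pullInterval = 100 ms, the production formula gives 16 * 32 * 1000 / 100 = 5120 messages per second.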

Based on these two speeds, I will decide when the upstream and downstream ConnectRecord records should be flushed.

Roiocam commented 1 year ago

This only covers the internal implementation within a single program. For backpressure between multiple applications, I believe there are two approaches:

PUSH Mode: In this mode, the consumer explicitly returns an ACK message. The producer delivers messages in an "ack-per-record" manner. To improve throughput, messages can be sent in batches, and the returned ACK carries the UUID of the last record in the batch. When the producer receives this ACK, it treats all preceding messages as consumed.
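A sketch of the producer side of PUSH mode, assuming a bounded window of unacknowledged records. The window cap is an addition that makes missing ACKs actually slow the producer down. `AckWindowProducer` is hypothetical, and the network send is omitted.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical PUSH-mode producer: bounded in-flight window plus cumulative (batch-end) ACKs.
class AckWindowProducer {
    private final int maxUnacked;
    private final Deque<String> unacked = new ArrayDeque<>();

    AckWindowProducer(int maxUnacked) {
        this.maxUnacked = maxUnacked;
    }

    // Returns false (caller must wait) when the window of unacknowledged records is full
    synchronized boolean trySend(String recordId) {
        if (unacked.size() >= maxUnacked) {
            return false;
        }
        unacked.addLast(recordId); // the actual network send is omitted
        return true;
    }

    // Cumulative ACK: the id of a batch's last record acknowledges it and every earlier record
    synchronized int onAck(String lastIdOfBatch) {
        if (!unacked.contains(lastIdOfBatch)) {
            return 0;
        }
        int released = 0;
        while (true) {
            String id = unacked.pollFirst();
            released++;
            if (id.equals(lastIdOfBatch)) {
                return released;
            }
        }
    }

    synchronized int unackedCount() {
        return unacked.size();
    }
}
```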

POLL Mode: In this mode, the consumer actively pulls data from the producer. It's evident that the traffic will never exceed the processing limit of the consumer.
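A minimal sketch of POLL mode, assuming a hypothetical producer-side `Source` that hands out records only when asked. The consumer requests at most as many records as it has free processing slots, so the inflow can never exceed its capacity.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical POLL-mode sketch: the consumer pulls only what it can process.
class PullConsumer {
    // Producer-side buffer that releases records only on request
    static class Source {
        private final Deque<Integer> records = new ArrayDeque<>();

        Source(int n) {
            for (int i = 0; i < n; i++) {
                records.add(i);
            }
        }

        synchronized List<Integer> pull(int max) {
            List<Integer> batch = new ArrayList<>();
            while (batch.size() < max && !records.isEmpty()) {
                batch.add(records.poll());
            }
            return batch;
        }
    }

    // Pulls at most `capacity` records per round; returns the size of each pulled batch
    static List<Integer> drain(Source source, int capacity) {
        List<Integer> batchSizes = new ArrayList<>();
        while (true) {
            // every slot is free again once the previous batch has been processed
            List<Integer> batch = source.pull(capacity);
            if (batch.isEmpty()) {
                return batchSizes;
            }
            batchSizes.add(batch.size());
            // process the batch here
        }
    }
}
```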