zephyrproject-rtos / zephyr

Primary Git Repository for the Zephyr Project. Zephyr is a new generation, scalable, optimized, secure RTOS for multiple hardware architectures.
https://docs.zephyrproject.org
Apache License 2.0
9.99k stars 6.15k forks source link

kernel: k_msgq_peek Add timeout functionalty #73769

Open lopeznegrete opened 3 weeks ago

lopeznegrete commented 3 weeks ago

Is your enhancement proposal related to a problem? Please describe. A consumer thread requires to wait forever for a message to be queued. Once queued, it would be sent to a third process. This third process might fail to process the message, then the message should be returned to the queue for a retrial. At the moment, that can be done only with k_msgq_get and K_FOREVER. The consumer gets the message and, if the third process is unsuccessful it can queue the message again. The problem is that if a producer thread has added another message in the meantime, the message will be returned at the end of the queue, missing the message order.

Describe the solution you'd like Add a timeout functionality in k_msgq_peek. The consumer thread can wait forever to peek at the message. If the message is successfully processed by the third process, then the consumer thread will do a k_msgq_get to clean the message. If not, the message would still be first on the queue for a wait and retrial

github-actions[bot] commented 3 weeks ago

Hi @lopeznegrete! We appreciate you submitting your first issue for our open-source project. 🌟

Even though I'm a bot, I can assure you that the whole community is genuinely grateful for your time and effort. 🤖💙

bjarki-andreasen commented 3 weeks ago

From you description, it sounds like you are describing a synchronous pipeline. but you have one node in your pipeline which tries to add work to its own pipe, which is typically not ideal.

The work passed from producers go to your consumer, and the work has to be processed in order (fifo style)

producer 0   -+->  k_msgq -> consumer
producer 1   -+

Your consumer uses a third process to actually process the work, and the consumer needs to retry processing the work some times before giving up and moving on to the next message. A simple solution to the issue is to have a while loop within your consumer

        while (1) {
                k_msgq_get()
                for (size_t i = 0; i < retries; i++) {
                        if (process_foo()) {
                                /* success */
                                break;
                        }
                }
        }

The work is handled in order, retried in order, and dropped if it failed.

This design is also scalable, as you can send the work further down the pipeline in order, lets add another consumer which uses your consumer as producer:

producer 0   -+->  k_msgq -> consumer -> consumer 2
producer 1   -+

and update first consumer to submit the work, if it succeeds:

        while (1) {
                k_msgq_get(&consumer_msgq)
                for (size_t i = 0; i < retries; i++) {
                        if (process_foo()) {
                                /* success */
                                k_msgq_put(&consumer_2_msgq)
                                break;
                        }
                }
        }

in summary, each node (consumer, consumer 2 etc.) in a pipeline is responsible for processing work internally, passed to it through its pipe (message queue, lifo, fifo), and submitting it further down the pipeline to the next node's pipe. A node should not be aware of any other node in the pipeline, and may only submit work to an "out-pipe", not its own "in-pipe".