flux-framework / flux-core

core services for the Flux resource management framework
GNU Lesser General Public License v3.0
167 stars 50 forks source link

libsubprocess: support stdin flow control #6291

Closed chu11 closed 2 weeks ago

chu11 commented 1 month ago

From #4572

Rather than adding write responses, some kind of credit based flow control scheme seems appropriate here. For example (just brainstorming):

  • change the sender so that it only sends up to credits bytes per channel before pausing (stopping for example the stdin fd watcher in > - flux-exec. This probably requires a callback to expose credits to the user? Like on_credit (int bytes)?
  • add a new exec response type for issuing credits for a given stream
  • each stream's receive buffer sends a credit type response initially for the full buffer size
  • then each time some data is taken out of the buffer, send some credits
  • enhancement: only send credit when a low water mark is reached
chu11 commented 1 month ago

hmmmm,

hiccup 1. We can't guarantee how fast each subprocess launched by flux-exec reads data out of its buffer. Some may read from their stdin buffer faster than others. Simple solution is determine the minimum amount that any of the subprocesses can take and only send that much to each one. But now we're bottle necked by the slowest subprocess, and it could be a hang if one of them is stuck.

        p = zlist_first (subprocesses);                                                                                                     
        while (p) {                                                                                                                         
            if (flux_subprocess_state (p) == FLUX_SUBPROCESS_INIT                                                                           
                || flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING) {                                                                  
                if (flux_subprocess_write (p, "stdin", ptr, lenp) < 0)                                                                      
                    log_err_exit ("flux_subprocess_write");                                                                                 
            }                                                                                                                               
            p = zlist_next (subprocesses);                                                                                                  
        }                      

sounds acceptable? As I ponder this, it seems that the pros simply outweigh the cons in this scenario.

garlick commented 1 month ago

That seems expected to me since the consumer is in charge of the rate of date transfer.

garlick commented 1 month ago

See flux-framework/rfc#427 for a proposed protocol.