wlandau / crew

A distributed worker launcher
https://wlandau.github.io/crew/

Integration with ExtendedTask #157

Closed wlandau closed 6 months ago

wlandau commented 6 months ago

@jcheng5, it was great to talk with you today. I went ahead and made quick improvements to crew based on the end of the call:

The app is much more complicated than I think it needs to be, and ExtendedTask may help a lot. Thanks for the pointer to https://shiny.posit.co/py/docs/nonblocking.html#true-non-blocking-behavior-with-extendedtask. I will have a look.

wlandau commented 6 months ago

Wow, extended tasks work so well and are super easy to use! It only took a quick minute on a Friday afternoon to dramatically simplify the example app. See below for a rendition that uses an extended task. I will migrate this to crew's documentation when the next production version of Shiny is released.

```r
library(crew)
library(shiny)
library(ggplot2)
library(aRtsy)

run_task <- function() {
  Sys.sleep(5)
  canvas_phyllotaxis(
    colors = colorPalette(name = "random", n = 3),
    iterations = 1000,
    angle = runif(n = 1, min = -2 * pi, max = 2 * pi),
    size = 1,
    p = 1
  )
}

status_message <- function(n) {
  if (n > 0) {
    paste(format(Sys.time()), "tasks in progress:", n)
  } else {
    "All tasks completed."
  }
}

ui <- fluidPage(
  actionButton("task", "Submit a task (5 seconds)"),
  textOutput("status"),
  plotOutput("result")
)

server <- function(input, output, session) {
  # reactive values and outputs
  reactive_status <- reactiveVal("No task submitted yet.")
  output$status <- renderText(reactive_status())
  output$result <- renderPlot(task$result()$result[[1L]], width = 500)

  # crew controller
  controller <- crew_controller_local(workers = 4, seconds_idle = 10)
  controller$start()
  onStop(function() controller$terminate())

  # extended task to get completed results from the controller
  task <- ExtendedTask$new(function() controller$promise(mode = "one"))

  # button to submit a crew task
  observeEvent(input$task, {
    controller$push(
      command = run_task(),
      data = list(run_task = run_task),
      packages = "aRtsy"
    )
    task$invoke()
    reactive_status(status_message(n = length(controller$tasks)))
  })

  # Refresh the status every second and when a task starts or completes.
  observe({
    invalidateLater(millis = 1000)
    reactive_status(status_message(n = length(controller$tasks)))
  })
}

shinyApp(ui = ui, server = server)
```

wlandau commented 6 months ago

On second thought, updating the docs now would help prepare for conferences. I will update the example app and publish a new release of crew today.

wlandau commented 3 months ago

I have actually noticed that reactive expressions with extended tasks intermittently stop invalidating under heavy load: https://github.com/shikokuchuo/mirai/discussions/118. From crew's perspective, I think it makes sense to emphasize the polling version of the coin flip app and label the promise-based functionality as experimental.
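
For readers comparing the two approaches, here is a minimal sketch of what a polling-based server could look like. It is not the actual coin flip app from crew's documentation: `flip_coin()`, the `latest` reactive value, the output ID, and the 500 ms interval are all hypothetical names and settings chosen for illustration. The sketch assumes `controller$pop()` returns `NULL` when no task has finished and otherwise a one-row data frame whose `result` column holds the value, which matches how the app above reads `$result[[1L]]`.

```r
library(crew)
library(shiny)

# Hypothetical task for illustration: one simulated coin flip.
flip_coin <- function() {
  Sys.sleep(1)
  sample(c("heads", "tails"), size = 1)
}

ui <- fluidPage(
  actionButton("task", "Flip a coin"),
  textOutput("latest")
)

server <- function(input, output, session) {
  # Holds the most recent completed result.
  latest <- reactiveVal("No result yet.")
  output$latest <- renderText(latest())

  # crew controller, set up the same way as in the app above
  controller <- crew_controller_local(workers = 2, seconds_idle = 10)
  controller$start()
  onStop(function() controller$terminate())

  # Submit a task without creating a promise or an extended task.
  observeEvent(input$task, {
    controller$push(
      command = flip_coin(),
      data = list(flip_coin = flip_coin)
    )
  })

  # Poll on a timer: re-run every 500 ms (an assumed interval)
  # and collect at most one finished task per pass.
  observe({
    invalidateLater(millis = 500)
    out <- controller$pop()
    if (!is.null(out)) latest(out$result[[1L]])
  })
}

# Build the app object; call runApp(app) to launch it.
app <- shinyApp(ui = ui, server = server)
```

The polling observer does not depend on promise resolution to trigger invalidation, so it sidesteps the intermittent behavior described above at the cost of a fixed polling delay.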