Skip to content

Instantly share code, notes, and snippets.

@zhanxw
Forked from jcheng5/create_forked_task.R
Created June 26, 2019 16:48
Show Gist options
  • Select an option

  • Save zhanxw/0b6920e7b5b335517fd00a86c8db896f to your computer and use it in GitHub Desktop.

Select an option

Save zhanxw/0b6920e7b5b335517fd00a86c8db896f to your computer and use it in GitHub Desktop.

Revisions

  1. @jcheng5 jcheng5 revised this gist Dec 8, 2016. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion create_forked_task.R
    Original file line number Diff line number Diff line change
    @@ -1,7 +1,7 @@
    library(shiny)
    # Also uses parallel, shinyjs, tools

    # Create a long-running task, executed in a forked process.
    # Create a long-running task, executed in a forked process. (Doesn't work on Windows)
    #
    # The return value is a promise-like object with three
    # methods:
  2. @jcheng5 jcheng5 revised this gist Dec 8, 2016. 1 changed file with 6 additions and 2 deletions.
    8 changes: 6 additions & 2 deletions create_forked_task.R
    Original file line number Diff line number Diff line change
    @@ -22,20 +22,23 @@ create_forked_task <- function(expr) {

    result <- NULL

    # Launch the task in a forked process. This always returns
    # immediately, and we get back a handle we can use to monitor
    # or kill the job.
    task_handle <- parallel::mcparallel({
    force(expr)
    })

    # Poll every 100 milliseconds until the job completes
    o <- observe({
    res <- parallel::mccollect(task_handle, wait = FALSE)
    if (is.null(res)) {
    invalidateLater(100)
    } else {
    o$destroy()
    str(res)
    if (!is.list(res) || length(res) != 1 || !inherits(res[[1]], "try-error")) {
    state <<- "success"
    result <<- res
    result <<- res[[1]]
    } else {
    state <<- "error"
    result <<- attr(res[[1]], "condition", exact = TRUE)
    @@ -64,6 +67,7 @@ create_forked_task <- function(expr) {
    cancel = function() {
    if (state == "running") {
    state <<- "cancel"
    o$destroy()
    tools::pskill(task_handle$pid, tools::SIGTERM)
    tools::pskill(-task_handle$pid, tools::SIGTERM)
    parallel::mccollect(task_handle, wait = FALSE)
  3. @jcheng5 jcheng5 revised this gist Dec 8, 2016. 1 changed file with 19 additions and 9 deletions.
    28 changes: 19 additions & 9 deletions example.R
    Original file line number Diff line number Diff line change
    @@ -3,47 +3,57 @@ library(shiny)
    source("create_forked_task.R")

    ui <- fluidPage(
    shinyjs::useShinyjs(),
    shinyjs::useShinyjs(), # Initialize shinyjs library

    # Buttons to control job
    actionButton("start", "Start"),
    shinyjs::disabled(actionButton("stop", "Stop")),

    # This will display the job output
    tableOutput("out")
    )

    server <- function(input, output, session) {
    # Make "task" behave like a reactive value
    makeReactiveBinding("task")
    task <- NULL

    output$out <- renderTable({
    # The task starts out NULL but is required. The req() takes
    # care of ensuring that we only proceed if it's non-NULL.
    req(task)$result()
    })

    observeEvent(input$start, {
    shinyjs::enable("stop")
    shinyjs::disable("start")

    task <<- create_forked_task({
    # Pretend this takes a long time
    Sys.sleep(5)
    cars[sample(nrow(cars), 10),]
    })

    # Show progress message during task start
    prog <- Progress$new(session)
    prog$set(message = "Executing task, please wait...")
    oProg <- observe({

    o <- observe({
    # Only proceed when the task is completed (this could mean success,
    # failure, or cancellation)
    req(task$completed())

    oProg$destroy()

    # This observer only runs once
    o$destroy()

    # Close the progress indicator and update button state
    prog$close()
    shinyjs::disable("stop")
    shinyjs::enable("start")
    })
    })

    observeEvent(input$stop, {
    shinyjs::enable("start")
    shinyjs::disable("stop")

    task$cancel()
    })
    }
  4. @jcheng5 jcheng5 created this gist Dec 8, 2016.
    73 changes: 73 additions & 0 deletions create_forked_task.R
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,73 @@
    library(shiny)
    # Also uses parallel, shinyjs, tools

    # Create a long-running task, executed in a forked process.
    #
    # The return value is a promise-like object with three
    # methods:
    # - completed(): FALSE initially, then TRUE if the task succeeds,
    # fails, or is cancelled. Reactive, so when the state changes
    # any reactive readers will invalidate.
    # - result(): Use this to get the return value. While execution is
    # in progress, performs a req(FALSE). If task succeeded, returns
    # the return value. If failed, throws error. Reactive, so when
    # the state changes any reactive readers will invalidate.
    # - cancel(): Call this to prematurely terminate the task.
    create_forked_task <- function(expr) {
    makeReactiveBinding("state")
    state <- factor("running",
    levels = c("running", "success", "error", "cancel"),
    ordered = TRUE
    )

    result <- NULL

    task_handle <- parallel::mcparallel({
    force(expr)
    })

    o <- observe({
    res <- parallel::mccollect(task_handle, wait = FALSE)
    if (is.null(res)) {
    invalidateLater(100)
    } else {
    o$destroy()
    str(res)
    if (!is.list(res) || length(res) != 1 || !inherits(res[[1]], "try-error")) {
    state <<- "success"
    result <<- res
    } else {
    state <<- "error"
    result <<- attr(res[[1]], "condition", exact = TRUE)
    }
    }
    })

    list(
    completed = function() {
    state != "running"
    },
    result = function() {
    if (state == "running") {
    # If running, abort the current context silently.
    # We've taken a reactive dependency on "state" so if
    # the state changes the context will invalidate.
    req(FALSE)
    } else if (state == "success") {
    return(result)
    } else if (state == "error") {
    stop(result)
    } else if (state == "cancel") {
    validate(need(FALSE, "The operation was cancelled"))
    }
    },
    cancel = function() {
    if (state == "running") {
    state <<- "cancel"
    tools::pskill(task_handle$pid, tools::SIGTERM)
    tools::pskill(-task_handle$pid, tools::SIGTERM)
    parallel::mccollect(task_handle, wait = FALSE)
    }
    }
    )
    }
    51 changes: 51 additions & 0 deletions example.R
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,51 @@
    library(shiny)

    source("create_forked_task.R")

    ui <- fluidPage(
    shinyjs::useShinyjs(),
    actionButton("start", "Start"),
    shinyjs::disabled(actionButton("stop", "Stop")),
    tableOutput("out")
    )

    server <- function(input, output, session) {
    makeReactiveBinding("task")
    task <- NULL

    output$out <- renderTable({
    req(task)$result()
    })

    observeEvent(input$start, {
    shinyjs::enable("stop")
    shinyjs::disable("start")

    task <<- create_forked_task({
    # Pretend this takes a long time
    Sys.sleep(5)
    cars[sample(nrow(cars), 10),]
    })

    prog <- Progress$new(session)
    prog$set(message = "Executing task, please wait...")
    oProg <- observe({
    req(task$completed())

    oProg$destroy()

    prog$close()
    shinyjs::disable("stop")
    shinyjs::enable("start")
    })
    })

    observeEvent(input$stop, {
    shinyjs::enable("start")
    shinyjs::disable("stop")

    task$cancel()
    })
    }

    shinyApp(ui, server)