server <- function(input, output, session) {
  out1_rows <- reactiveVal()
  observeEvent(input$run1, {
    prog <- Progress$new(session)
    prog$set(message = "Analysis in progress",
             detail = "This may take a while...",
             value = NULL)
    fut1 = future({
      system(paste("Command1", input$file, ">", "out1.txt"))
      system(paste("Command2", "out1.txt", ">", "out2.txt"))
      head_rows <- read.delim("out2.txt")
      return(head_rows)
    }) %...>%
      out1_rows() %>%
      finally( ~ prog$close())
    NULL
  })
  observeEvent(req(out1_rows()), {
    output$out_table <-
      DT::renderDataTable(DT::datatable(
        out1_rows()
      ))
  })
  observeEvent(input$cancel, {
    async_pid <- fut1$job$pid ## this is empty
    # async_pid <- Sys.getpid() ## this returns the PID of the main process and kills "/opt/shiny-server/R/SockJSAdapter.R", but not the subprocesses inside future()
    system(paste("kill -15", async_pid))
  })
}
Here, I need to kill the process that runs the commands inside future(). I tried the approach above to fetch the PID of the future() process and kill it when input$cancel is triggered. However, fut1$job$pid does not return a PID, so the kill fails.
This link from the future vignettes shows how to fetch the PID for future() jobs. However, in my case I cannot use Sys.getpid() inside future(), because I am not sure how to store the PID when the future is already returning the output of my system commands.
This page from the future Git repository shows an alternative, "External Kill", with the syntax fut1$job$pid. But this fails to fetch the PID.
I couldn't figure this out after trying different approaches; perhaps I am blind to the syntax. Could someone hint at how to do this?
Can you please provide us with a full reproducible example?
You might want to have a look at library(future.callr). Using plan(callr), you can get the PID and kill the process like this:
library(future)
library(promises)
library(future.callr)
plan(callr)
myFuture <- future({
  Sys.sleep(5)
  return(runif(1))
})
myFuture$process$get_pid()
myFuture$process$is_alive()
# myFuture$process$kill()
# myFuture$process$is_alive()
then(myFuture, onFulfilled = function(value){
  print(value)
}, onRejected = NULL)
Edit - adapted from your code:
library(shiny)
library(DT)
library(future)
library(promises)
library(future.callr)
library(shinyjs)
library(V8)
plan(callr)
ui <- fluidPage(
  useShinyjs(),
  titlePanel("Trigger & kill future"),
  sidebarLayout(
    sidebarPanel(
      actionButton(inputId="run1", label="run future"),
      actionButton(inputId="cancel", label="kill future")
    ),
    mainPanel(
      dataTableOutput('out_table')
    )
  )
)
server <- function(input, output, session) {
  disable("cancel")
  out1 <- reactiveValues(rows=NULL)
  observeEvent(input$run1, {
    disable("run1")
    enable("cancel")
    out1$rows <- NULL
    prog <- Progress$new(session)
    prog$set(message = "Analysis in progress",
             detail = "This may take a while...",
             value = NULL)
    fut1 <<- future({
      # system(paste("Command1" , input$file, ">", "out1.txt"))
      # system(paste("Command2" , out1.txt, ">", "out2.txt"))
      # head_rows <- read.delim("out2.txt")
      head_rows <- data.frame(replicate(5, sample(runif(20, 0, 1), 20, rep=TRUE)))
      Sys.sleep(5)
      return(head_rows)
    })
    print(paste("Running async process with PID:", fut1$process$get_pid()))
    then(fut1, onFulfilled = function(value){
      out1$rows <<- value
    }, onRejected = function(error){NULL})
    finally(fut1, function(){
      prog$close()
      disable("cancel")
      enable("run1")
    })
    return(NULL)
  }, ignoreInit = TRUE)
  observeEvent(req(out1$rows), {
    output$out_table <- DT::renderDataTable(DT::datatable(out1$rows))
  })
  observeEvent(input$cancel, {
    async_pid <- fut1$process$get_pid()
    print(paste("Killing PID:", async_pid))
    # system(paste("kill -9", async_pid)) # Linux - kill
    # system(paste("taskkill /f /pid", async_pid)) # Windows - kill
    fut1$process$kill() # library(future.callr) - kill
    out1$rows <- NULL
    disable("cancel")
    enable("run1")
  }, ignoreInit = TRUE)
}
shinyApp(ui = ui, server = server)
Errors occurring occasionally:
Unhandled promise error: callr failed, could not start R, exited with non-zero status, has crashed or was killed
Warning: Error in : callr failed, could not start R, exited with non-zero status, has crashed or was killed
95: <Anonymous>
Maybe this statement from @HenrikB tells us what we are running into:
However, to get this working properly you probably also need to make your future expression / future code interrupt aware using withCallingHandlers() etc. It'll also unknown to me what happens if you signal too many interrupts in a row - it might be that you manage to interrupt the main R-loop of the worker, which then will cause the R worker to terminate. That'll result in a missing R worker and you've got that problem you mention at the beginning.
The error is also mentioned here, but in the future.callr context I currently don't know how to work around it.
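For illustration only, the error can at least be caught at the point where the value of a killed future is collected. The sketch below is a synchronous, non-Shiny example and assumes the future and future.callr packages are installed. It does not address the sporadic failures described above; it only shows where the error surfaces and how tryCatch() can intercept it.

```r
library(future)
library(future.callr)
plan(callr)

# Start a long-running future, then kill its worker process.
f <- future({
  Sys.sleep(30)
  "done"
})
f$process$kill()

# Collecting the value of a killed future signals an error; catch it
# here instead of letting it propagate to the caller.
res <- tryCatch(
  value(f),
  error = function(e) paste("future failed:", conditionMessage(e))
)
print(res)
```

In a Shiny app the same idea would apply to the promise chain: every promise derived from the future, including the one returned by finally(), would need its own rejection handler.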
2nd Edit: I have since received further feedback from Henrik Bengtsson. He once again mentions that the core future API currently doesn't support terminating futures. So, in the end, no matter which backend we use, we might run into problems.
Disregarding this, I'd have another look at the library(ipc) vignette, which provides two examples on the topic "Killing a long running process". But I already pointed to this here, which probably led to this question.
In the end, all of this might be useless for your scenario, because you are using system() calls, which create subprocesses of their own (each with its own PID). So why not use wait = FALSE in your system command (as I already mentioned in the comments here) to get your async behaviour, and catch the PID using something like myCommand & echo $! (see this)? This way you don't need any futures.
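A minimal sketch of that last suggestion, assuming a Unix-like shell. Here "sleep 30" is only a stand-in for the real command, and the variable names are illustrative. The trailing & puts the command in the background, echo $! prints its PID, and the command's output is redirected so that system(intern = TRUE) returns as soon as the PID has been printed.

```r
# Assumption: a Unix-like shell; "sleep 30" stands in for the real command.
# The redirect keeps system(intern = TRUE) from waiting on the background
# process's output before returning the PID.
cmd <- "sleep 30 > /dev/null 2>&1 & echo $!"
pid <- as.integer(system(cmd, intern = TRUE))

# Later, e.g. inside the observer for input$cancel:
# send SIGTERM to the stored PID.
killed <- tools::pskill(pid, tools::SIGTERM)
```

In the app, the PID would be stored somewhere both observers can reach it (for example a reactiveVal), and the result file would be read once the command has finished.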