
How to timeout a long running program using rxpython?

Say I have a long-running Python function that looks something like this:

import random
import time
from rx import Observable
def intns(x):
    y = random.randint(5,10)
    print(y)
    print('begin')
    time.sleep(y)
    print('end')
    return x

I want to be able to set a timeout of 1000ms.

So I'm doing something like this: creating an observable and mapping it through the intense calculation above.

a = Observable.repeat(1).map(lambda x: intns(x))

Now, for each value emitted, if it takes more than 1000ms I want to end the observable as soon as the 1000ms is reached, using on_error or on_completed.

a.timeout(1000).subscribe(lambda x: print(x), lambda x: print(x))

The statement above does time out and calls on_error, but it still runs the intense calculation to completion and only then moves on to the next statements. Is there a better way of doing this?

The last statement prints the following

8 # no of seconds to sleep
begin # begins sleeping, trying to emit the first value
Timeout # operation times out, and calls on_error
end # thread waits till the function ends

The idea is that if a particular function times out, I want to be able to continue with my program and ignore the result.

I was wondering whether running the intns function on a separate thread would help. I assume the main thread would continue execution after the timeout, but I would still want to stop the intns computation on that thread, or kill it somehow.

syllogismos asked Jul 20 '17 22:07



1 Answer

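The following class can be used as a context manager, as in with timeout():

If the block under the with statement runs for longer than the specified time, a TimeoutError is raised. It relies on signal.SIGALRM, so it works only on Unix and only in the main thread.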

import signal

class timeout:
    # Default value is 1 second (1000ms)
    def __init__(self, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
    def handle_timeout(self, signum, frame):
        raise TimeoutError(self.error_message)
    def __enter__(self):
        signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.seconds)
    def __exit__(self, type, value, traceback):
        signal.alarm(0)

# example usage
with timeout():
    # infinite while loop so the timeout is reached
    while True:
        pass
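
Since the question asks for a 1000ms limit, it may help to note that signal.alarm only accepts whole seconds. The following is a minimal sketch of a variant, not part of the original answer, that uses signal.setitimer for fractional timeouts and restores the previous SIGALRM handler on exit. The name timeout_after is hypothetical, and the same Unix and main-thread restrictions are assumed.

```python
import signal
import time
from contextlib import contextmanager


@contextmanager
def timeout_after(seconds, error_message='Timeout'):
    """Raise TimeoutError if the with-block runs longer than `seconds`.

    Hypothetical helper: assumes Unix (SIGALRM) and the main thread.
    """
    def handle_timeout(signum, frame):
        raise TimeoutError(error_message)

    # Install our handler and remember the one that was there before.
    previous = signal.signal(signal.SIGALRM, handle_timeout)
    # setitimer accepts a float, so sub-second timeouts are possible.
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        yield
    finally:
        # Cancel the timer and put the previous handler back.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, previous)


# Usage: the sleep is interrupted and the program carries on.
start = time.monotonic()
try:
    with timeout_after(0.2):
        time.sleep(5)
    outcome = 'finished'
except TimeoutError:
    outcome = 'timed out'
elapsed = time.monotonic() - start
print(outcome)
```

Catching the TimeoutError outside the with block is what lets the caller ignore the result and continue.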

If I'm understanding your function, here's what your implementation would look like:

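def intns(x):
    y = random.randint(5,10)
    print(y)
    print('begin')
    # Raises TimeoutError after 1 second, interrupting the sleep
    with timeout():
        time.sleep(y)
    # Not reached on timeout: the TimeoutError propagates to the caller
    print('end')
    return x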
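
The signal-based approach only works in the main thread, and Python offers no supported way to kill a running thread. If the function has to be stopped from elsewhere, one alternative (a different technique from the class above) is to run it in a separate process and terminate that process on timeout. The sketch below assumes a Unix system where the "fork" start method is available; call_with_timeout, _worker, slow and fast are hypothetical names used for illustration.

```python
import multiprocessing
import queue
import time

# Assumption: Unix, where the "fork" start method is available.
_ctx = multiprocessing.get_context('fork')


def _worker(func, args, result_queue):
    # Runs in the child process and sends the result back to the parent.
    result_queue.put(func(*args))


def call_with_timeout(func, args=(), seconds=1.0, default=None):
    """Run func(*args) in a child process; return `default` on timeout."""
    result_queue = _ctx.Queue()
    proc = _ctx.Process(target=_worker, args=(func, args, result_queue))
    proc.start()
    try:
        # Wait up to `seconds` for the child to deliver a result.
        result = result_queue.get(timeout=seconds)
    except queue.Empty:
        # No result in time: stop the child and ignore its work.
        proc.terminate()
        result = default
    proc.join()
    return result


def slow(x):
    time.sleep(5)
    return x


def fast(x):
    return x * 2


start = time.monotonic()
slow_result = call_with_timeout(slow, (1,), seconds=0.2, default='ignored')
elapsed = time.monotonic() - start
fast_result = call_with_timeout(fast, (21,), seconds=2.0)
print(slow_result, fast_result)
```

In this sketch an exception raised inside the child is not reported back, so the caller simply sees the default value once the timeout expires.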
Sam answered Oct 07 '22 00:10