
Custom Scheduler to have sequential + semi-sequential scripts with timeouts/kill switches?

Below is a big section of my code. If you scroll down to the execute_subscripts() function, you can see I've got two scripts running via execfile, which work beautifully: they show prints and they save traceback errors to an error file.

I'm trying to turn the second script into one that the main script doesn't wait on, so that it moves on to the next script without waiting for the second one to finish.

As you can see I have attempted to use subprocess with Popen to launch a silent, hidden window... however it doesn't seem to run and I have no idea how to use the p.communicate() function correctly to retrieve tracebacks and/or prints.

I also need help creating some sort of timeout/kill switch, so that if a subscript (via either Popen or the execfile route) doesn't complete within 5 minutes, it is skipped for that loop, or retried once and then skipped if it immediately fails again.

I understand that I probably shouldn't be using strftime for the times.... however that part works fine for me so I don't see the need to change it.

from datetime import date, timedelta
from sched import scheduler
from time import time, sleep, strftime
import random
import traceback
import subprocess

s = scheduler(time, sleep)
random.seed()

def periodically(runtime, intsmall, intlarge, function):

    ## Get current time
    currenttime = strftime('%H:%M:%S')

    ## If currenttime is anywhere between 23:40 and 23:50 then...
    if currenttime > '23:40:00' and currenttime < '23:50:00':

        ## Open the error logging file as the variable "errors"
        errors = open('MISC/ERROR(S).txt', 'a')

        ## Try to...
        try:
            ## Call the clear subscript.
            execfile("SUBSCRIPTS/CLEAR.py", {})
        ## On exception (fail)...
        except Exception:
            ## Write the entire traceback error to file...
            errors.write(traceback.format_exc() + '\n')
            errors.write("\n\n")

        ## Close and exit the error logging file. 
        errors.close()

        ## Update time
        currenttime = strftime('%H:%M:%S')

    ## Idle time
    while currenttime >= '23:40:00' and currenttime <= '23:59:59' or currenttime >= '00:00:00' and currenttime <= '11:30:00':

        ## Update time
        currenttime = strftime('%H:%M:%S')
        print currenttime, "Idling..."
        sleep(10)

        ## Update time
        currenttime = strftime('%H:%M:%S')

    ## Initiate the scheduler.
    runtime += random.randrange(intsmall, intlarge)
    s.enter(runtime, 1, function, ())
    s.run()

def execute_subscripts():

    st = time()
    print "Running..."
    errors = open('MISC/ERROR(S).txt', 'a')

    try: 
        execfile("SUBSCRIPTS/TESTSCRIPT.py", {})
    except Exception:
        errors.write(traceback.format_exc() + '\n')
        errors.write("\n\n")

    try: 
        execfile("SUBSCRIPTS/TEST.py", {})
    except Exception:
        errors.write(traceback.format_exc() + '\n')
        errors.write("\n\n")
##    subprocess.Popen(["pythonw", "SUBSCRIPTS/TEST.py", "0"], shell=True)

    try: 
        execfile("SUBSCRIPTS/TESTSCRIPTTest.py", {})
    except Exception:
        errors.write(traceback.format_exc() + '\n')
        errors.write("\n\n")

    try: 
        execfile("SUBSCRIPTS/TESTTESTTEST.py", {})
    except Exception:
        errors.write(traceback.format_exc() + '\n')
        errors.write("\n\n")

    errors.close()
    print """The whole routine took %.3f seconds""" % (time() - st)

while True:
    periodically(50, -25, +90, execute_subscripts)

Any ideas would be much appreciated

Added a bounty, hopefully someone knows how to achieve this.

Thanks in advance
Hyflex

Example of what I want the script to be able to do...

  1. Subscript 1 - Run in background, send prints and errors from subscript1.py to main.py, don't wait for it to finish, go to subscript 2, timeout after 10 seconds (or as close to 10 seconds as we can, or timeout after all subscripts have been called.)

  2. Subscript 2 - Run in background, send prints and errors from subscript2.py to main.py, wait for it to finish before going onto subscript 3, timeout after 10 seconds (or as close to 10 seconds as we can, or timeout after all subscripts have been called.)

  3. Subscript 3 - Run in background, send prints and errors from subscript3.py to main.py, wait for it to finish before going onto subscript 4, timeout after 10 seconds (or as close to 10 seconds as we can, or timeout after all subscripts have been called.)

  4. Subscript 4 - Run in background, send prints and errors from subscript4.py to main.py, don't wait for it to finish, go to subscript 5, timeout after 10 seconds (or as close to 10 seconds as we can, or timeout after all subscripts have been called.)

  5. Subscript 5 - Run in background, send prints and errors from subscript5.py to main.py, wait for it to finish before going onto next subscript (or in this case, end of loop), timeout after 10 seconds (or as close to 10 seconds as we can, or timeout after all subscripts have been called.)

Prints and Traceback for shx2

[pid=9940] main running command: C:\Python27\python.exe SUB/subscript1.py (is_bg=False)
[pid=9940] main running command: C:\Python27\python.exe SUB/subscript1.py (is_bg=True)

Traceback (most recent call last):
  File "C:\Test\main.py", line 21, in <module>
    bg_proc1 = run_subscript(cmd, is_bg = True)
  File "C:\Test\main.py", line 10, in run_subscript
    return (cmd > sys.stdout) & BG  # run in background
  File "C:\Python27\lib\site-packages\plumbum\commands\modifiers.py", line 81, in __rand__
    return Future(cmd.popen(), self.retcode)
  File "C:\Python27\lib\site-packages\plumbum\commands\base.py", line 317, in popen
    return self.cmd.popen(args, **kwargs)
  File "C:\Python27\lib\site-packages\plumbum\commands\base.py", line 233, in popen
    return self.cmd.popen(self.args + list(args), **kwargs)
  File "C:\Python27\lib\site-packages\plumbum\machines\local.py", line 104, in popen
    **kwargs)
  File "C:\Python27\lib\site-packages\plumbum\machines\local.py", line 253, in _popen
    stderr = stderr, cwd = str(cwd), env = env, **kwargs)  # bufsize = 4096
  File "C:\Python27\lib\subprocess.py", line 703, in __init__
    errread, errwrite) = self._get_handles(stdin, stdout, stderr)
  File "C:\Python27\lib\subprocess.py", line 851, in _get_handles
    c2pwrite = msvcrt.get_osfhandle(stdout.fileno())
UnsupportedOperation: fileno

EDIT: http://i.imgur.com/rmXtrOq.png

             | --> # Sub 1.py # --> Sequential with timeout --> Started: 11:30.00 --> Estimated Completion: 11:30.01 (1 Second) --> Timeout at 11:30:10 (10 Seconds) --> # Sub 2.py # --> Sequential with timeout --> Started: 11:30.02 (or after time Sub 1.py's timeout) --> Estimated Completion: 11:30.03 (1 Second) --> Timeout at 11:30:13 (10 Seconds) --> # Sub 3.py # --> Sequential with timeout --> Started: 11:30.04 (or after time Sub 2.py's timeout) --> Estimated Completion: 11:30.08 (3 Seconds) --> Timeout at 11:30:18 (10 Seconds)
             |                                                                                                                                                  ^                                                                                                                                                                             ^
             |                                                                                                                                                  |                                                                                                                                                                             |
             | --------------------------------------------------------------------------------------------------------------------------------------------------                                                                                                                                                                             |
             | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
             |
Scheduler -->|
             | --> Sub 4.py --> Nonsequential with timeout --> Started: 11:30.00 --> Estimated Completion: 11:30.05 (5 Seconds) --> Timeout at 11:30:10 (15 Seconds)
             |
             | --> Sub 5.py --> Nonsequential with timeout --> Started: 11:30.00 --> Estimated Completion: 11:30.02 (2 Seconds) --> Timeout at 11:30:10 (10 Seconds)
             |
             | --> Sub 6.py --> Nonsequential with timeout --> Started: 11:30.00 --> Estimated Completion: 11:30.10 (10 Seconds) --> Timeout at 11:30:10 (25 Seconds)

Hopefully this helps with a visual representation of what I'm trying to achieve.

Ryflex asked Oct 29 '13 04:10


2 Answers

As you already indicated in your question, you are actually asking two different questions (running in background, and enforcing a timeout). Fortunately, the short answer for both is one and the same:

Use Plumbum!

Plumbum greatly simplifies shell-scripting-like elements of your python script, and among other things, provides clean interfaces for running commands in the background, and for enforcing timeouts.

Below is an example using plumbum for this.

In this example, the subprocesses will all run the same script -- subscript1.py. It does some printing, some sleeping, and it sometimes fails, randomly.

subscript1.py

import os, sys, time, random
print '[pid=%s] STARTING %s' % (os.getpid(), sys.argv[0])
for i in range(3):
    t = random.randint(1,5)
    print '[pid=%s] sleeping for %s seconds' % (os.getpid(), t)
    time.sleep(t)
# fail randomly
if t == 5:
    raise RuntimeError('random error...')
print '[pid=%s] DONE %s' % (os.getpid(), sys.argv[0])


Now, the main script below, main.py, demonstrates how to run subprocesses, in the foreground and background, with and without a timeout, wait for background processes to finish, and handle subprocess errors and timeouts.

main.py

import os, sys, time
from plumbum import FG, BG, ProcessExecutionError, ProcessTimedOut
from plumbum.cmd import python

cmd = python['subscript1.py']  # create the command to run (several times)

def run_subscript(cmd, is_bg = False):
    print '[pid=%s] main running command: %s (is_bg=%s)' % (os.getpid(), cmd, is_bg)
    if is_bg:
        return (cmd > sys.stdout) & BG  # run in background
    else:
        try:
            return cmd & FG  # run in foreground
        except ProcessExecutionError, e:
            print >>sys.stderr, e

# run a process in the foreground        
run_subscript(cmd, is_bg = False)

# run two processes in the background, and one in the foreground
bg_proc1 = run_subscript(cmd, is_bg = True)
time.sleep(1)
bg_proc2 = run_subscript(cmd, is_bg = True)
time.sleep(1)
run_subscript(cmd, is_bg = False)

# wait for the background processes to finish
for bg_proc in ( bg_proc1, bg_proc2 ):
    try:
        bg_proc.wait()
    except ProcessExecutionError, e:
        print >>sys.stderr, e

# run a foreground process, which will time out
print '[pid=%s] main running command: %s (will time out)' % (os.getpid(), cmd)
try:
    cmd.run(timeout = 2)
except ProcessTimedOut, e:
    # command timed out
    print >>sys.stderr, e
except ProcessExecutionError, e:
    # command failed (but did not time out)
    print >>sys.stderr, e

Output:

% python main.py
[pid=77311] main running command: /usr/local/bin/python subscript1.py (is_bg=False)
[pid=77314] STARTING subscript1.py
[pid=77314] sleeping for 1 seconds
[pid=77314] sleeping for 5 seconds
[pid=77314] sleeping for 3 seconds
[pid=77314] DONE subscript1.py
[pid=77311] main running command: /usr/local/bin/python subscript1.py (is_bg=True)
[pid=77316] STARTING subscript1.py
[pid=77316] sleeping for 5 seconds
[pid=77311] main running command: /usr/local/bin/python subscript1.py (is_bg=True)
[pid=77317] STARTING subscript1.py
[pid=77317] sleeping for 1 seconds
[pid=77311] main running command: /usr/local/bin/python subscript1.py (is_bg=False)
[pid=77317] sleeping for 5 seconds
[pid=77318] STARTING subscript1.py
[pid=77318] sleeping for 5 seconds
[pid=77316] sleeping for 2 seconds
[pid=77316] sleeping for 4 seconds
[pid=77317] sleeping for 5 seconds
[pid=77318] sleeping for 2 seconds
[pid=77318] sleeping for 3 seconds
[pid=77316] DONE subscript1.py
[pid=77318] DONE subscript1.py
Command line: ['/usr/local/bin/python', 'subscript1.py']
Exit code: 1
Stderr:  | Traceback (most recent call last):
         |   File "subscript1.py", line 13, in <module>
         |     raise RuntimeError('random error...')
         | RuntimeError: random error...
[pid=77311] main running command: /usr/local/bin/python subscript1.py (will time out)
('Process did not terminate within 2 seconds', ['/usr/local/bin/python', 'subscript1.py'])

EDIT:

I now realize my sample code does not demonstrate running a command in the background and enforcing a timeout on it. For that, simply use cmd.bgrun(...) instead of cmd.run(...).
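For readers without plumbum, the same "start in the background, enforce a timeout later" idea can be sketched with the standard library alone. This is only a minimal sketch, assuming Python 3.3+ (for communicate(timeout=...)); the helper names start_in_background and collect are made up for illustration and are not part of plumbum or of the code above. The timeout is counted from the moment the process was started.

```python
import subprocess
import sys
import time

def start_in_background(cmd):
    """Start cmd without waiting for it; return (process, start time)."""
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    return proc, time.monotonic()

def collect(job, timeout_seconds):
    """Wait for a background job, killing it once its timeout has passed.

    Returns (status, output) where status is 'ok', 'failed' or 'timeout'.
    """
    proc, started = job
    # Time already spent running counts against the timeout.
    remaining = max(0.0, timeout_seconds - (time.monotonic() - started))
    try:
        out, _ = proc.communicate(timeout=remaining)
    except subprocess.TimeoutExpired:
        proc.kill()
        out, _ = proc.communicate()  # reap the process, keep partial output
        return "timeout", out.decode(errors="replace")
    status = "ok" if proc.returncode == 0 else "failed"
    return status, out.decode(errors="replace")
```

A caller would start several jobs first and only then call collect() on each, so the scripts overlap in time while every one still has its own deadline.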

The error you are getting is about the redirection, and must be related to the fact you are running on Windows. This is either a compatibility problem of plumbum on Windows, or my code might not be perfect, i.e. there may be another way to use plumbum to make it work. Unfortunately, I don't have a Windows machine to test it on...
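The traceback in the question ends with stdout.fileno() raising UnsupportedOperation, which is what subprocess reports when the stream it is given is not backed by a real file descriptor. One situation where that happens is a shell or IDE that replaces sys.stdout with its own object; whether that is the cause here is an assumption. A possible workaround, sketched with plain subprocess rather than plumbum and assuming Python 3, is to test for a usable descriptor and fall back to a pipe. The names stdout_target and run_and_echo are hypothetical.

```python
import io
import subprocess
import sys

def stdout_target(stream):
    """Return stream if a child process can inherit it, else subprocess.PIPE."""
    try:
        stream.fileno()
    except (AttributeError, ValueError, io.UnsupportedOperation):
        return subprocess.PIPE
    return stream

def run_and_echo(cmd):
    """Run cmd so that its output ends up on our sys.stdout either way."""
    sys.stdout.flush()  # keep our own earlier prints ahead of the child's
    target = stdout_target(sys.stdout)
    proc = subprocess.Popen(cmd, stdout=target, stderr=subprocess.STDOUT)
    out, _ = proc.communicate()
    if out is not None:  # output was piped, so forward it ourselves
        sys.stdout.write(out.decode(errors="replace"))
    return proc.returncode
```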

I hope this helps.

shx2 answered Oct 31 '22 17:10


If I understood what you are trying to do, subprocess.Popen() is the way to go. Here's a simple class which I think can provide all the functionality you want:

from time import sleep
import subprocess
import datetime
import os

class Worker:

    def __init__(self, cmd):

        print datetime.datetime.now(), ":: starting subprocess :: %s"%cmd
        self.cmd = cmd
        self.log = "[running :: %s]\n"%cmd
        self.subp = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self.start_time = datetime.datetime.now()

    def wait_to_finish(self, timeout_seconds = None):

        while True:
            retcode = self.subp.poll()
            if retcode is not None:
                self.get_process_output()
                self.log += "\n[subprocess finished, return code: %d]\n"%retcode
                print datetime.datetime.now(), ":: subprocess %s exited, retcode=%d"%(self.cmd, retcode)
                return
            else:
                # process hasn't finished yet
                sleep(1)
                if timeout_seconds is not None:
                    cur_time = datetime.datetime.now()
                    if (cur_time - self.start_time).seconds > timeout_seconds:
                        print datetime.datetime.now(), ":: subprocess %s :: killing after %d seconds"%(self.cmd, timeout_seconds)
                        self.kill()
                        return

    def still_running(self):
        return (self.subp.poll() is None)

    def kill(self):
        self.subp.terminate()
        self.get_process_output()
        self.log += "\n[subprocess killed by explicit request]\n"
        return

    def get_process_output(self):
        out, err = self.subp.communicate()
        self.log += out
        self.log += err

You give the command and the class starts it in the background. You can then wait for it to finish, with an optional timeout (counted from the time the process was started). You can get the process output and, if needed, explicitly kill the process.
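One caveat when polling a process whose stdout and stderr are pipes: nothing reads the pipes until communicate() is called, so a subscript that prints more than the OS pipe buffer holds will block on its write and look as if it hung until the timeout kills it. A hedged alternative is to send the output to a temporary file and read it back afterwards. The sketch below assumes Python 3, and run_with_logfile is a hypothetical helper, not part of the Worker class.

```python
import subprocess
import sys
import tempfile
import time

def run_with_logfile(cmd, timeout_seconds, poll_interval=0.05):
    """Run cmd with output going to a temp file; kill it after the timeout.

    Returns (return code, captured output as text).
    """
    with tempfile.TemporaryFile() as log:
        proc = subprocess.Popen(cmd, stdout=log, stderr=subprocess.STDOUT)
        deadline = time.monotonic() + timeout_seconds
        while proc.poll() is None:
            if time.monotonic() > deadline:
                proc.kill()
                proc.wait()  # reap the killed process
                break
            time.sleep(poll_interval)
        log.seek(0)  # the child wrote through the same file object
        return proc.returncode, log.read().decode(errors="replace")
```

Because the file never fills up the way a pipe does, the child can print as much as it likes while the parent only polls.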

Here's just a quick example showing its functionality:

# Start two subprocesses in the background
worker1 = Worker([r'c:\python26\python.exe', 'sub1.py'])
worker2 = Worker([r'c:\python26\python.exe', 'sub2.py'])

# Wait for both to finish, kill after 10 seconds timeout
worker1.wait_to_finish(timeout_seconds = 10)
worker2.wait_to_finish(timeout_seconds = 10)

# Start another subprocess giving it 5 seconds to finish
worker3 = Worker([r'c:\python26\python.exe', 'sub3.py'])
worker3.wait_to_finish(timeout_seconds = 5)

print "----LOG1----\n" + worker1.log
print "----LOG2----\n" + worker2.log
print "----LOG3----\n" + worker3.log

sub1.py:

from time import sleep
print "sub1 output: start"
sleep(5)
print "sub1 output: finish"

sub2.py:

print "sub2 output: start"
erroneous_command()

sub3.py:

from time import sleep
import sys
print "sub3 output: start, sleeping 15 sec"
sys.stdout.flush()
sleep(15)
print "sub3 output: finish"

Here's the output:

2013-11-06 15:31:17.296000 :: starting subprocess :: ['c:\\python26\\python.exe', 'sub1.py']
2013-11-06 15:31:17.300000 :: starting subprocess :: ['c:\\python26\\python.exe', 'sub2.py']
2013-11-06 15:31:23.306000 :: subprocess ['c:\\python26\\python.exe', 'sub1.py'] exited, retcode=0
2013-11-06 15:31:23.309000 :: subprocess ['c:\\python26\\python.exe', 'sub2.py'] exited, retcode=1
2013-11-06 15:31:23.310000 :: starting subprocess :: ['c:\\python26\\python.exe', 'sub3.py']
2013-11-06 15:31:29.314000 :: subprocess ['c:\\python26\\python.exe', 'sub3.py'] :: killing after 5 seconds
----LOG1----
[running :: ['c:\\python26\\python.exe', 'sub1.py']]
sub1 output: start
sub1 output: finish

[subprocess finished, return code: 0]

----LOG2----
[running :: ['c:\\python26\\python.exe', 'sub2.py']]
sub2 output: start
Traceback (most recent call last):
  File "sub2.py", line 2, in <module>
    erroneous_command()
NameError: name 'erroneous_command' is not defined

[subprocess finished, return code: 1]

----LOG3----
[running :: ['c:\\python26\\python.exe', 'sub3.py']]
sub3 output: start, sleeping 15 sec

[subprocess killed by explicit request]

As far as implementing the scheduling goes, I can suggest a couple of options, but the choice really depends on what your task is:

1) If you can specify the precise scheduling at any point in time, then you can implement a fully synchronous scheduler:

while True:
    # check time
    # check currently running processes :: workerX.still_running()
    #   -> if some are past their timeout, kill them workerX.kill()
    # start new subprocesses according to your scheduling logic
    sleep(1)

2) If you have several well-defined sequences of scripts which you just want to "fire-and-forget" every 10 seconds, then put each sequence in its own .py script (with 'import Worker'), and start all sequences every 10 seconds, also periodically checking which sequences have exited to collect their logs.

3) If your sequences are defined dynamically and you prefer a "fire-and-forget" approach, then threads would be the best approach.
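Option 1 can be fleshed out roughly as follows. This is only a sketch, assuming Python 3 and that each subscript is launched as its own process; the Job class and run_round function are hypothetical names, not part of the Worker class above. It starts the non-sequential jobs right away, runs the sequential chain one script at a time, and kills anything that passes its own deadline.

```python
import subprocess
import sys
import time

class Job:
    """One subscript plus its timeout; output goes to the parent's console."""

    def __init__(self, cmd, timeout_seconds):
        self.cmd = cmd
        self.timeout_seconds = timeout_seconds
        self.proc = None
        self.deadline = None
        self.status = "pending"  # pending, running, ok, failed or timeout

    def start(self):
        self.proc = subprocess.Popen(self.cmd)
        self.deadline = time.monotonic() + self.timeout_seconds
        self.status = "running"

    def update(self):
        """Refresh status; kill the process if it is past its deadline."""
        if self.status != "running":
            return
        retcode = self.proc.poll()
        if retcode is not None:
            self.status = "ok" if retcode == 0 else "failed"
        elif time.monotonic() > self.deadline:
            self.proc.kill()
            self.proc.wait()
            self.status = "timeout"

def run_round(chain, parallel, poll_interval=0.1):
    """Run `parallel` jobs concurrently and `chain` jobs one after another."""
    for job in parallel:
        job.start()
    pending_chain = list(chain)
    current = None
    while True:
        if current is None and pending_chain:
            current = pending_chain.pop(0)  # next sequential script
            current.start()
        for job in parallel:
            job.update()
        if current is not None:
            current.update()
            if current.status != "running":
                current = None  # finished or killed; move on next pass
        still_running = [j for j in parallel if j.status == "running"]
        if current is None and not pending_chain and not still_running:
            return
        time.sleep(poll_interval)
```

With the question's layout this might be called as run_round([Job([sys.executable, "SUBSCRIPTS/TESTSCRIPT.py"], 300)], [Job([sys.executable, "SUBSCRIPTS/TEST.py"], 300)]), after which each job's status attribute says whether it finished, failed or timed out.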

glexey answered Oct 31 '22 17:10