
Start process with low priority Popen

I am looking for a way to start multiple processes efficiently with low priority on Windows. I tried:

def run(command):
    # command['Program.exe args1 args2','output_file']
    try :
        p = subprocess.Popen(command[0] , stdout = command[1])
        psutil.Process(p.pid).nice(psutil.BELOW_NORMAL_PRIORITY_CLASS)
        p.wait()
    except Exception as e:
        print(e)
        raise SystemExit

The problem is that the low priority is not set immediately, so I get some freezes at the beginning. When I look closer in the process window, I can see the priority of the application starting at high priority and then switching to low priority.

I would like to start immediately at low priority, or find another way to limit the CPU usage (100% right now).

Then I use the run command inside a multiprocessing pool (a few seconds for each run).

def safe_run(args):
    """Call run(), catch exceptions."""
    try: 
        run(args)
    except Exception as e:
        print(args[0])
        print(e)


def parallel(commands,nb_proc):
    # populate files
    # start processes
    if len(commands) < 10:
        nb_proc = 1
    print('Use of {} cpus\n'.format(nb_proc))
    pool = mp.Pool(nb_proc)
    pool.map(safe_run, commands, chunksize=1) 

UPDATE

Test.exe is a Fortran program:

    integer function NumArguments()
        integer :: IARGC
        NumArguments = IARGC()
    end function

    subroutine GetArgument(Idx,Argument)
      integer, intent(in) :: Idx
      character(LEN=*), intent(out) :: Argument
      call GETARG(Idx,Argument)
   end subroutine

    program Console
    implicit none
    integer, parameter :: INTEG = SELECTED_INT_KIND(9)
    integer(INTEG), parameter :: MAX_STRING_LEN = 1024_INTEG
    character(LEN=MAX_STRING_LEN) :: FileName
    integer(INTEG) :: i


    call GetArgument(1,FileName)

    ! Body of Console
    !print *, 'Hello World'
    !print *, FileName

    call sleep(5)
    open(unit=1, file=FileName,status='new')
     Do i=1,1000,1
         write(1,*) i
     Enddo
     close(unit=1)

    end program Console

Full code :

# -*- coding: utf-8 -*-
"""

"""

###############################################################################
###############################################################################
#
#                     IMPORT & INIT                 
# 
###############################################################################
###############################################################################
import psutil
import subprocess
import time
import multiprocessing.dummy as mp
import os

TEST_EXE  = "Console.exe"
nb_proc      =   4



###############################################################################
###############################################################################
#
#                     FUNCTION                 
# 
###############################################################################
###############################################################################

def run(command):
    try :
        print(command[0])
        psutil.Process().nice(psutil.BELOW_NORMAL_PRIORITY_CLASS) # lower priority
        p = subprocess.Popen(command[0] , stdout = command[1])
        psutil.Process().nice(psutil.BELOW_NORMAL_PRIORITY_CLASS) # lower priority
        p.wait()
    except:
        print('Point {} fail'.format(command[0]))  # 'point' was never defined
        raise SystemExit

def safe_run(args):
    """Call run(), catch exceptions."""
    try: 
        run(args)
    except Exception as e:
        print('{} error'.format(args[0]))


def parallel(commands,nb_proc):
    print('Use of {} cpus\n'.format(nb_proc))
    pool = mp.Pool(nb_proc) 
    pool.map(safe_run, commands, chunksize=1)


###############################################################################
###############################################################################
#
#                     MAIN SCRIPT                 
# 
###############################################################################
###############################################################################

current_dir = os.path.abspath('')
print('\nCurrent directory {}'.format(current_dir))  
t1 = time.time()

logfiles = list()        
commands = list()
logfiles_obj = list()
for step in range(100):
    logfile = open(os.path.join(current_dir,'logfile_'+ str(step) + '.out'), 'w')
    args = TEST_EXE + ' ' + os.path.join(current_dir,'output_'+str(step) + '.txt')
    temp = (args,logfile)
    commands.append(temp)
    logfiles_obj.append(logfile)  # keep the handle so it is closed after the run

# run in parallel
print("Calculation running ...\n")
parallel(commands,nb_proc)


for log in logfiles_obj:
    log.close()

# time for running all the point and complete
t2 = time.time()
print("\n ########## Overall time : %5.2f seconds ##########" % (t2 - t1))
print("\n ##########       Correct ending       ##########")

Coolpix asked Oct 20 '17 12:10


2 Answers

The normal way on a POSIX system would be to use the preexec_fn parameter of subprocess.Popen to call a function before starting the command (detailed later in this answer). Unfortunately, this is intended to run between the fork and exec system calls, and Windows does not create processes that way.

On Windows, the underlying (WinAPI) system call used to create sub-processes is CreateProcess. The page on MSDN says:

BOOL WINAPI CreateProcess(
  ...
  _In_        DWORD                 dwCreationFlags,
  ...
);


dwCreationFlags [in]
The flags that control the priority class and the creation of the process... This parameter also controls the new process's priority class, which is used to determine the scheduling priorities of the process's threads.

Unfortunately, the Python interface has no provision for setting the child priority, because the documentation explicitly says:

creationflags, if given, can be CREATE_NEW_CONSOLE or CREATE_NEW_PROCESS_GROUP. (Windows only)

But the documentation for dwCreationFlags on the MSDN also says:

... If none of the priority class flags is specified, the priority class defaults to NORMAL_PRIORITY_CLASS unless the priority class of the creating process is IDLE_PRIORITY_CLASS or BELOW_NORMAL_PRIORITY_CLASS. In this case, the child process receives the default priority class of the calling process.

That means that the priority can simply be inherited, so the Windows way of controlling the child priority from Python is to set the priority of the current process before starting the subprocess, and reset it immediately after:

def run(command):
    # command['Program.exe args1 args2','output_file']
    try :
        psutil.Process().nice(psutil.BELOW_NORMAL_PRIORITY_CLASS) # lower priority
        p = subprocess.Popen(command[0] , stdout = command[1])    # start child at low priority
        psutil.Process().nice(psutil.NORMAL_PRIORITY_CLASS)  # reset current priority
        p.wait()
    except Exception as e:
        print(e)
        raise SystemExit

The remaining part of this answer is relevant on a POSIX system such as Linux or Unix.

The preexec_fn parameter of Popen is what you need. It lets you call a callable object (for example a function) between the creation of the child process and the execution of the command. You could do:

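import os

def set_low_pri():
    # Runs in the child, so only the child's priority changes.
    # psutil's *_PRIORITY_CLASS constants exist only on Windows;
    # on POSIX, raise the niceness instead (10 is an arbitrary example value).
    os.nice(10)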

and then use it to start a child at a low priority:

def run(command):
    # command['Program.exe args1 args2','output_file']
    try :
        p = subprocess.Popen(command[0] , stdout = command[1], preexec_fn=set_low_pri)
        p.wait()
    except Exception as e:
        print(e)
        raise SystemExit

That way, Python ensures that the low priority is set before your command is executed.
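
As a quick check of the POSIX route, here is a minimal sketch, assuming a Linux or Unix host. The helper names `lower_priority` and `child_niceness` and the increment of 10 are illustrative choices, not part of the original answer. The child is a small Python one-liner that reports its own niceness, so you can see that the value was already raised when the command started.

```python
import os
import subprocess
import sys

def lower_priority():
    # Runs in the child after fork() and before exec(); POSIX only.
    os.nice(10)  # raise niceness by 10, i.e. lower the scheduling priority

def child_niceness():
    """Start a child with preexec_fn and return the niceness it reports."""
    out = subprocess.check_output(
        [sys.executable, "-c", "import os; print(os.nice(0))"],
        preexec_fn=lower_priority,
    )
    return int(out)

if __name__ == "__main__":
    print("parent niceness:", os.nice(0))  # os.nice(0) reads without changing
    print("child niceness: ", child_niceness())
```

The parent's own niceness is untouched, because the callable only runs inside the forked child.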


Ref.: the documentation for the subprocess module states:

17.5.1.2. Popen Constructor
...

class subprocess.Popen(args, bufsize=-1, executable=None, stdin=None, stdout=None,
      stderr=None, preexec_fn=None, close_fds=True, shell=False, cwd=None, env=None,
      universal_newlines=False, startupinfo=None, creationflags=0, restore_signals=True,
      start_new_session=False, pass_fds=(), *, encoding=None, errors=None) 

...
If preexec_fn is set to a callable object, this object will be called in the child process just before the child is executed. (POSIX only)


But the Windows method shown earlier (lower the priority, start the child, reset the priority) is not thread safe! If two threads run concurrently, we could get into the following race condition:

  • thread A lowers priority
  • thread A starts its child (at low priority)
  • thread B lowers priority (a no-op)
  • thread A resets normal priority
  • thread B starts its child at normal priority
  • thread B resets normal priority (a no-op)
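
The interleaving above can be replayed deterministically with a small simulation. This is only a sketch: `FakeProcess` is a hypothetical stand-in for the parent process (it does not touch real priorities), and inheritance is simplified to "the child copies whatever the parent has at spawn time".

```python
class FakeProcess:
    """Stand-in for the parent process: one priority shared by all its threads."""

    def __init__(self):
        self.priority = "normal"
        self.children = {}

    def lower(self):
        self.priority = "low"

    def reset(self):
        self.priority = "normal"

    def spawn(self, name):
        # The child inherits whatever priority the parent has at this instant.
        self.children[name] = self.priority


def replay_race():
    """Replay the six steps of the race in the order listed above."""
    parent = FakeProcess()
    parent.lower()      # thread A lowers priority
    parent.spawn("A")   # thread A starts its child (low)
    parent.lower()      # thread B lowers priority (no-op)
    parent.reset()      # thread A resets normal priority
    parent.spawn("B")   # thread B starts its child (normal!)
    parent.reset()      # thread B resets normal priority (no-op)
    return parent.children


print(replay_race())  # {'A': 'low', 'B': 'normal'}
```

The priority belongs to the process, not to a thread, which is why one thread's reset leaks into another thread's spawn.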

The problem is that multiprocessing.dummy is a wrapper around threading. The Python Standard Library documentation (3.6) says in section 17.2.2.13, "The multiprocessing.dummy module":

multiprocessing.dummy replicates the API of multiprocessing but is no more than a wrapper around the threading module.

Once the problem is identified, the fix is trivial: just use a Lock to protect the critical section:

lock = mp.Lock()

def run(command):
    try:
        print(command[0])
        with lock:  # only one thread at a time may change the priority and spawn
            psutil.Process().nice(psutil.BELOW_NORMAL_PRIORITY_CLASS)  # lower priority
            try:
                p = subprocess.Popen(command[0], stdout=command[1])
            finally:
                # back to normal priority, even if Popen fails
                psutil.Process().nice(psutil.NORMAL_PRIORITY_CLASS)
        p.wait()  # wait outside the lock so the children still run in parallel
    except Exception:
        print('Point {} fail'.format(command[0]))
        raise SystemExit

Serge Ballesta answered Oct 22 '22 19:10


I'm surprised no one has suggested it, but just because the subprocess module doesn't expose the constants needed doesn't mean we can't pass them to the module to set the priority:

import subprocess

ABOVE_NORMAL_PRIORITY_CLASS = 0x00008000
BELOW_NORMAL_PRIORITY_CLASS = 0x00004000
HIGH_PRIORITY_CLASS         = 0x00000080
IDLE_PRIORITY_CLASS         = 0x00000040
NORMAL_PRIORITY_CLASS       = 0x00000020
REALTIME_PRIORITY_CLASS     = 0x00000100

p = subprocess.Popen(["notepad.exe"], creationflags=BELOW_NORMAL_PRIORITY_CLASS)

p.wait()

This sets the creation flags correctly and starts the process with the given priority. To expose it properly, the _winapi and subprocess modules would both need to be patched (to make the constants part of the module rather than defining them in the calling script).
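
Applied to the question's `run()` function, this approach might look like the sketch below. The helper name `low_priority_flags` and the `(command, logfile)` signature are assumptions made for illustration. Since Python 3.7 the subprocess module does expose `BELOW_NORMAL_PRIORITY_CLASS` on Windows, so the sketch uses it when present and falls back to the raw WinAPI value otherwise. On non-Windows platforms it passes 0, because `creationflags` is only supported on Windows.

```python
import subprocess
import sys

def low_priority_flags():
    """Return creation flags for a below-normal-priority child (0 off Windows)."""
    if sys.platform != "win32":
        return 0  # creationflags must stay 0 on POSIX
    # Python 3.7+ exposes the constant; otherwise use the raw WinAPI value.
    return getattr(subprocess, "BELOW_NORMAL_PRIORITY_CLASS", 0x00004000)

def run(command, logfile):
    """Start `command`, send its stdout to `logfile`, wait, return the exit code."""
    p = subprocess.Popen(
        command,
        stdout=logfile,
        creationflags=low_priority_flags(),
    )
    return p.wait()
```

Because the priority is passed per child at creation time, the parent's own priority never changes, so no lock is needed when this is called from several threads.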

James Kent answered Oct 22 '22 20:10