Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Subprocess memory usage in python

How can one measure (benchmark) the maximum memory usage of a subprocess launched from Python?

like image 389
Realz Slaw Avatar asked Jan 14 '23 21:01

Realz Slaw


2 Answers

I made a little utility class that demonstrates how to do this with the psutil library:

import psutil
import subprocess

class ProcessTimer:
  """Run a command as a subprocess and record its peak memory use and wall time.

  Call execute() once, then call poll() repeatedly until it returns False.
  Each poll samples the summed RSS and VMS of the subprocess and all of its
  descendants and keeps the largest totals seen in max_rss_memory and
  max_vms_memory. Peaks that occur between two polls are not observed.

  Uses the psutil >= 2.0 API (Process.children, Process.memory_info,
  psutil.NoSuchProcess).

  Attributes:
    command: argument list passed to subprocess.Popen.
    p: the Popen object, or None before execute().
    t0, t1: time.time() at start and at the most recent poll / detected exit.
    max_rss_memory, max_vms_memory: largest summed values sampled so far.
  """

  def __init__(self,command):
    self.command = command
    self.execution_state = False

    #defined up front so that close() and the result attributes are usable
    # even when execute() was never called or failed to start the process
    self.p = None
    self.t0 = None
    self.t1 = None
    self.max_vms_memory = 0
    self.max_rss_memory = 0

  def execute(self):
    """Start the subprocess and reset the recorded peaks and timestamps."""
    self.max_vms_memory = 0
    self.max_rss_memory = 0

    self.t1 = None
    self.t0 = time.time()
    self.p = subprocess.Popen(self.command,shell=False)
    self.execution_state = True

  def poll(self):
    """Sample memory usage once.

    Returns True while the subprocess is still running, False once it has
    finished (or if execute() has not been called).
    """
    if not self.check_execution_state():
      return False

    self.t1 = time.time()

    try:
      pp = psutil.Process(self.p.pid)

      #obtain a list of the subprocess and all its descendants
      descendants = pp.children(recursive=True)
      descendants.append(pp)

      rss_memory = 0
      vms_memory = 0

      #calculate and sum up the memory of the subprocess and all its descendants
      for descendant in descendants:
        try:
          mem_info = descendant.memory_info()
        except psutil.NoSuchProcess:
          #sometimes a subprocess descendant will have terminated between the time
          # we obtain a list of descendants, and the time we actually poll this
          # descendant's memory usage.
          continue
        rss_memory += mem_info.rss
        vms_memory += mem_info.vms

      self.max_vms_memory = max(self.max_vms_memory,vms_memory)
      self.max_rss_memory = max(self.max_rss_memory,rss_memory)
    except psutil.NoSuchProcess:
      #the subprocess itself vanished while we were sampling it
      pass

    return self.check_execution_state()

  def is_running(self):
    """Return True if the subprocess has not exited yet."""
    #poll() goes first so that the return code is always collected once the
    # child has exited, even if its pid is already gone from the process table
    return self.p.poll() is None and psutil.pid_exists(self.p.pid)

  def check_execution_state(self):
    """Return True while running; on the first detected exit, record t1."""
    if not self.execution_state:
      return False
    if self.is_running():
      return True
    self.execution_state = False
    self.t1 = time.time()
    return False

  def close(self,kill=False):
    """Stop the subprocess if it is still running.

    Sends terminate() by default, or kill() when kill is True. Does nothing
    if the process was never started or has already exited.
    """
    #never signal by pid after the child has exited: the pid may have been
    # reused by an unrelated process
    if self.p is None or self.p.poll() is not None:
      return

    try:
      pp = psutil.Process(self.p.pid)
      if kill:
        pp.kill()
      else:
        pp.terminate()
    except psutil.NoSuchProcess:
      pass

You then use it like so:

import time

#I am executing "make target" here
ptimer = ProcessTimer(['make','target'])

try:
  ptimer.execute()
  #poll as often as possible; otherwise the subprocess might
  # "sneak" in some extra memory usage while you aren't looking
  while ptimer.poll():
    time.sleep(.5)
finally:
  #make sure that we don't leave the process dangling
  ptimer.close()

#single-argument print(...) produces the same output on Python 2 and Python 3
print('return code: %s' % ptimer.p.returncode)
print('time: %s' % (ptimer.t1 - ptimer.t0))
print('max_vms_memory: %s' % ptimer.max_vms_memory)
print('max_rss_memory: %s' % ptimer.max_rss_memory)
like image 124
Realz Slaw Avatar answered Jan 21 '23 14:01

Realz Slaw


I found a solution; you must first install pywin32 (the package that provides win32api).

After the process ends, you will have its peak memory usage. Below is an example:

"""Functions for getting memory usage of Windows processes."""

# Export only names this module actually defines; listing undefined names
# makes `from <module> import *` raise AttributeError.
__all__ = ['get_memory_info']

import os
import sys
import ctypes
import json
from ctypes import wintypes
import subprocess
import win32api
from subprocess import Popen, PIPE
from win32process import GetProcessMemoryInfo
from win32con import PROCESS_QUERY_INFORMATION, PROCESS_VM_READ

# Bind kernel32's GetCurrentProcess through ctypes: it is declared as taking
# no arguments and returning a HANDLE.
GetCurrentProcess = ctypes.windll.kernel32.GetCurrentProcess
GetCurrentProcess.argtypes = []
GetCurrentProcess.restype = wintypes.HANDLE

# ctypes type used for the size_t-typed members of PROCESS_MEMORY_COUNTERS_EX.
SIZE_T = ctypes.c_size_t

class PROCESS_MEMORY_COUNTERS_EX(ctypes.Structure):
    """ctypes layout of the counters structure filled in by GetProcessMemoryInfo.

    Two DWORD members ('cb', 'PageFaultCount') followed by nine SIZE_T
    counters, in the order listed below.
    """
    _fields_ = [
        ('cb', wintypes.DWORD),
        ('PageFaultCount', wintypes.DWORD),
    ] + list(
        # every remaining member shares the SIZE_T type
        (counter, SIZE_T)
        for counter in (
            'PeakWorkingSetSize',
            'WorkingSetSize',
            'QuotaPeakPagedPoolUsage',
            'QuotaPagedPoolUsage',
            'QuotaPeakNonPagedPoolUsage',
            'QuotaNonPagedPoolUsage',
            'PagefileUsage',
            'PeakPagefileUsage',
            'PrivateUsage',
        )
    )

# Bind psapi's GetProcessMemoryInfo through ctypes. This assignment rebinds
# the GetProcessMemoryInfo name that was imported from win32process above.
GetProcessMemoryInfo = ctypes.windll.psapi.GetProcessMemoryInfo
# Arguments: process handle, pointer to the counters structure to fill,
# and a DWORD (get_memory_info passes the structure's size in bytes).
GetProcessMemoryInfo.argtypes = [
    wintypes.HANDLE,
    ctypes.POINTER(PROCESS_MEMORY_COUNTERS_EX),
    wintypes.DWORD,
]
# BOOL result: a false value is treated as failure by get_memory_info.
GetProcessMemoryInfo.restype = wintypes.BOOL

def get_memory_info(process=None):
    """Return Win32 process memory counters structure as a dict.

    Args:
        process: handle of the process to query (for example one opened with
            PROCESS_QUERY_INFORMATION | PROCESS_VM_READ). When omitted, the
            current process is queried via GetCurrentProcess().

    Returns:
        A dict mapping each PROCESS_MEMORY_COUNTERS_EX field name to the
        value reported for the process.

    Raises:
        WindowsError: built by ctypes.WinError() when GetProcessMemoryInfo
            reports failure.
    """
    if process is None:
        process = GetCurrentProcess()

    counters = PROCESS_MEMORY_COUNTERS_EX()
    ret = GetProcessMemoryInfo(process, ctypes.byref(counters),
                               ctypes.sizeof(counters))
    if not ret:
        raise ctypes.WinError()

    return dict((name, getattr(counters, name))
                for name, _ in counters._fields_)

if __name__ == '__main__':
    # Raw string so the backslashes are taken literally. The original literal
    # began with an invisible U+202A character, which made the path invalid.
    cmd = r"C:\Program Files\Mozilla Firefox\firefox.exe"
    proc = subprocess.Popen(cmd)

    # Open our own query handle while the child is alive; the handle keeps
    # the process counters readable after the child has exited.
    handle = win32api.OpenProcess(
        PROCESS_QUERY_INFORMATION | PROCESS_VM_READ, False, proc.pid)
    try:
        # The peak counters are only final once the process has ended.
        proc.wait()
        result = get_memory_info(handle.handle)
    finally:
        handle.Close()

    # Convert the dict to JSON for readable output; single-argument print(...)
    # works on both Python 2 and Python 3.
    print(json.dumps(result, indent=4))
like image 42
Nathan Avatar answered Jan 21 '23 14:01

Nathan