
Combining Python watchdog with multiprocessing or threading

I'm using Python's Watchdog to monitor a given directory for new files being created. When a file is created, some code runs that spawns a subprocess shell command to run different code to process this file. This should run for every new file that is created. I've tested this out when one file is created, and things work great, but I'm having trouble getting it to work when multiple files are created, either at the same time or one after another.

My current problem is this: the processing code run in the shell takes a while and will not finish before a new file is created in the directory. There's nothing I can do about that. While this code is running, watchdog does not recognize that a new file has been created, and will not proceed with the code.

So I think I need to spawn a new process for each new file, or do something to get things to run concurrently, rather than waiting until one file is done before processing the next one.

So my questions are:

1.) In reality I will have 4 files, in different series, created at the same time, in one directory. What's the best way to get watchdog to run the code on file creation for all 4 files at once?

2.) When the code is running for one file, how do I get watchdog to begin processing the next file in the same series without waiting until processing for the previous file has completed? This is necessary because the files depend on one another: I need to pause the processing of one file until another file is finished, but the order in which they are created may vary.

Do I need to combine my watchdog with multiprocessing or threading somehow? Or do I need to implement multiple observers? I'm kind of at a loss. Thanks for any help.

import os

from watchdog.events import FileSystemEventHandler

import dosWatch  # the module that does the actual file processing


class MonitorFiles(FileSystemEventHandler):
    '''Sub-class of watchdog event handler'''

    def __init__(self, config=None, log=None):
        self.log = log
        self.config = config

    def on_created(self, event):
        self.log.info('Created file {0}'.format(event.src_path))
        # dosWatch.go eventually calls subprocess.Popen on a shell command,
        # and does not return until that processing finishes
        dosWatch.go(event.src_path, self.config, self.log)

    def on_modified(self, event):
        file = os.path.basename(event.src_path)
        ext = os.path.splitext(file)[1]
        if ext == '.fits':
            self.log.warning('Modifying a FITS file is not allowed')
            return

    def on_deleted(self, event):
        self.log.critical('Nothing should ever be deleted from here!')
        return

Main Monitoring

import time

from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler


def monitor(config, log):
    '''Uses the Watchdog package to monitor the data directory for new files.
    See the MonitorFiles class in dosClasses for actual monitoring code'''

    event_handler = dosclass.MonitorFiles(config, log)

    # add logging to the event handler
    log_handler = LoggingEventHandler()

    # set up observer
    observer = Observer()
    observer.schedule(event_handler, path=config.fitsDir, recursive=False)
    observer.schedule(log_handler, config.fitsDir, recursive=False)
    observer.start()
    log.info('Begin MaNGA DOS!')
    log.info('Start watching directory {0} for new files ...'.format(config.fitsDir))

    # monitor
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.unschedule_all()
        observer.stop()
        log.info('Stop watching directory ...')
        log.info('End MaNGA DOS!')
        log.info('--------------------------')
        log.info('')
    observer.join() 

In the above, my monitor method sets up watchdog to monitor the main directory. The MonitorFiles class defines what happens when a file is created: it calls the dosWatch.go method, which eventually calls subprocess.Popen to run a shell command.
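For illustration, here is a minimal sketch (not the asker's actual code) of how the handler could be made non-blocking: each on_created callback hands the file off to a daemon thread, so the observer thread is immediately free to dispatch the next event. The handler name and the process_file.sh command are hypothetical.

import subprocess
import threading

from watchdog.events import FileSystemEventHandler


class NonBlockingHandler(FileSystemEventHandler):
    '''Hypothetical variant of MonitorFiles that does not block the observer'''

    def on_created(self, event):
        # hand the long-running work to a worker thread; this callback
        # returns immediately, so watchdog keeps delivering events
        worker = threading.Thread(target=self.process, args=(event.src_path,))
        worker.daemon = True
        worker.start()

    def process(self, path):
        # Popen returns without waiting; wait() here only blocks this
        # worker thread, not the observer thread
        proc = subprocess.Popen(['process_file.sh', path])  # hypothetical command
        proc.wait()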

asked Feb 19 '14 by havok2063


2 Answers

Here's what I ended up doing, which solved my problem. I used multiprocessing to start a separate watchdog monitoring process for each camera. Watchdog already queues up new files for me, which works fine here.

As for point 2 above, I needed, e.g., a file2 to be processed before a file1, even though file1 was created first. So during file1's processing I check for the output of the file2 processing. If it's there, it goes ahead and processes file1; if not, it exits. During file2's processing, I check whether file1 was already created, and if so, process file1 then. (Code for this is not shown; a hypothetical sketch follows.)
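As a rough illustration of that ordering check (the outDir attribute and file2_result.fits name are made up, since the real code is not shown):

import os


def process_file1(path, config, log):
    '''Hypothetical sketch of the ordering check described above'''
    # file1 may only be processed once file2's output exists
    file2_output = os.path.join(config.outDir, 'file2_result.fits')  # assumed name
    if not os.path.exists(file2_output):
        log.info('file2 output not found yet; deferring {0}'.format(path))
        return  # exit; file2's handler will trigger file1 processing later
    # ... run the actual processing for file1 here ...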

Main Monitoring of Cameras

import time
import multiprocessing as mp

from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler


def monitorCam(camera, config, mainlog):
    '''Uses the Watchdog package to monitor the data directory for new files.
    See the MonitorFiles class in dosClasses for actual monitoring code.  Monitors each camera.'''

    mainlog.info('Process Name, PID: {0},{1}'.format(mp.current_process().name, mp.current_process().pid))

    # init camera log
    camlog = initLogger(config, filename='manga_dos_{0}'.format(camera))
    camlog.info('Camera {0}, PID {1}'.format(camera, mp.current_process().pid))
    config.camera = camera

    event_handler = dosclass.MonitorFiles(config, camlog, mainlog)

    # add logging to the event handler
    log_handler = LoggingEventHandler()

    # set up observer
    observer = Observer()
    observer.schedule(event_handler, path=config.fitsDir, recursive=False)
    observer.schedule(log_handler, config.fitsDir, recursive=False)
    observer.daemon = True
    observer.start()
    camlog.info('Begin MaNGA DOS!')
    camlog.info('Watching directory {0} for new files from camera {1}'.format(config.fitsDir, camera))

    # monitor
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.unschedule_all()
        observer.stop()
        camlog.info('Stop watching directory ...')
        camlog.info('End MaNGA DOS!')
        camlog.info('--------------------------')
        camlog.info('')
    #observer.join()

    if observer.is_alive():
        camlog.info('still alive')
    else:
        camlog.info('thread ending')    

Start of Multiple Camera Processes

def startProcess(camera, config, log):
    '''Uses multiprocessing module to start 4 different camera monitoring processes'''

    jobs = []

    for cam in camera:
        log.info('Starting to monitor camera {0}'.format(cam))
        print('Starting to monitor camera {0}'.format(cam))
        try:
            p = mp.Process(target=monitorCam, args=(cam, config, log), name=cam)
            p.daemon = True
            jobs.append(p)
            p.start()
        except KeyboardInterrupt:
            log.info('Ending process: {0} for camera {1}'.format(mp.current_process().pid, cam))
            p.terminate()
            log.info('Terminated: {0}, {1}'.format(p, p.is_alive()))

    for job in jobs:
        job.join()

    return
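A hypothetical invocation might look like this (loadConfig is a stand-in for however the config object is built; initLogger is the helper used above; the camera names are assumed):

if __name__ == '__main__':
    cameras = ['b1', 'b2', 'r1', 'r2']  # assumed camera names
    config = loadConfig()               # hypothetical config loader
    log = initLogger(config, filename='manga_dos_main')
    startProcess(cameras, config, log)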
answered by havok2063

I'm not sure it would make much sense to do a thread per file. The GIL will probably eliminate any advantage you'd see from doing that, and it might even hurt performance pretty badly and lead to some unexpected behavior. I haven't personally found watchdog to be very reliable. You might consider implementing your own file watcher, which can be done fairly easily, as in the Django framework (see here), by creating a dict with the modified timestamp for each file; a minimal sketch follows.
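For example, a polling watcher along those lines could look like this (an illustrative sketch, not the Django implementation; the watch name and interval parameter are made up):

import os
import time


def watch(directory, on_created, interval=1.0):
    '''Minimal polling watcher: track each file's mtime in a dict and
    fire a callback for paths not seen before'''
    seen = {}
    while True:
        for name in os.listdir(directory):
            path = os.path.join(directory, name)
            try:
                mtime = os.path.getmtime(path)
            except OSError:
                continue  # file disappeared between listdir and stat
            if path not in seen:
                on_created(path)  # new file detected
            seen[path] = mtime
        time.sleep(interval)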

answered by David Sanders