Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Python script use while loop to keep updating job scripts and multiprocess the tasks in queue

I am trying to write a Python script that scans a folder, collects updated SQL scripts, and then automatically pulls data for each SQL script. In the code, a while loop scans for new SQL files and sends them to a data pull function. I am having trouble understanding how to make a dynamic queue with the while loop while also having multiprocessing run the tasks in the queue.

The following code has a problem: a while loop iteration will keep working on a long job before it moves to the next iteration and collects other jobs to fill the vacant processors.

Update:

  1. Thanks to @pbacterio for catching the bug; the error message is now gone. After changing the code, the Python script can take all the job scripts during one iteration and distribute them to four processors. However, it gets hung up by a long job before going to the next iteration to scan and submit the newly added job scripts. Any idea how to restructure the code?

  2. I finally figured out the solution see answer below. It turned out what I was looking for is

    the_queue = Queue()
    the_pool = Pool(4, worker_main,(the_queue,))

  3. For those who stumble on a similar idea, the following is the whole architecture of this automation script, which converts a shared drive into a 'server for SQL pulling' or any other job-queue 'server'.

    a. The python script auto_data_pull.py as shown in the answer. You need to add your own job function.

    b. A 'batch script' with following:

    start C:\Anaconda2\python.exe C:\Users\bin\auto_data_pull.py

    c. Add a task triggered by start computer, run the 'batch script' That's all. It works.

Python Code:

from glob import glob
import os, time
import sys
import CSV
import re
import subprocess
import pandas as PD
import pypyodbc
from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(task_queue, result_queue):
    """Worker-process loop: execute queued jobs until the 'STOP' sentinel.

    task_queue   -- queue of (func, args) tuples to run
    result_queue -- queue receiving one status string per finished job

    Parameters renamed from input/output, which shadowed Python builtins;
    all callers in this file pass them positionally.
    """
    # iter(get, 'STOP') keeps pulling jobs until the sentinel arrives.
    for func, args in iter(task_queue.get, 'STOP'):
        result = compute(func, args)
        result_queue.put(result)

#
# Function used to compute result
#

def compute(func, args):
    """Apply *func* to *args* and report the outcome.

    Returns a human-readable line naming the worker process, the function,
    its argument, and the computed result.
    """
    outcome = func(args)
    return "{0} says that {1}{2} = {3}".format(
        current_process().name, func.__name__, args, outcome)


def query_sql(sql_file): #test func
    """Run the data pull for *sql_file* (placeholder implementation).

    Writes a sibling ``.csv`` marker file next to *sql_file* and returns a
    short status string.  Replace the body with the real JSL/SQL querying.
    """
    fo_name = os.path.splitext(sql_file)[0] + '.csv'
    # with-block guarantees the handle is closed (the original leaked it)
    with open(fo_name, 'w') as fo:
        print(sql_file)
        fo.write("sql_file {0} is done\n".format(sql_file))
    # bug fix: the original format string had no {0} placeholder, so the
    # file name was silently dropped from the returned status message
    return "Query is done for {0}\n".format(sql_file)


def check_files(path):
    """Snapshot the monitored job scripts under *path*.

    arguments -- root path to monitor
    returns   -- dictionary of {file: timestamp, ...} for every ``.jsl``
                 file found below any ``<path>/*/IDABox/`` directory
    """
    files_dict = {}
    for sql_query_dir in glob(path + "/*/IDABox/"):
        for root, dirs, filenames in os.walk(sql_query_dir):
            for filename in filenames:
                if filename.endswith('.jsl'):
                    # bug fix: the original used root + filename without a
                    # separator, which builds wrong paths for subdirectories
                    full_path = os.path.join(root, filename)
                    files_dict[full_path] = os.path.getmtime(full_path)
    return files_dict


##### working in single thread
def single_thread():
    path = "Y:/"

    before = check_files(path)
    sql_queue  = [] 

    while True:
        time.sleep(3)
        after = check_files(path)
        added = [f for f in after if not f in before]
        deleted = [f for f in before if not f in after]
        overlapped = list(set(list(after)) & set(list(before)))
        updated = [f for f in overlapped if before[f] < after[f]]  

        before = after

        sql_queue = added + updated
        # print sql_queue
        for sql_file in sql_queue:
            try:
                query_sql(sql_file)
            except:
                pass


##### not working in queue
def multiple_thread():

    NUMBER_OF_PROCESSES = 4
    path = "Y:/"

    sql_queue  = [] 
    before = check_files(path) # get the current dictionary of sql_files
    task_queue = Queue()
    done_queue = Queue()

    while True:         #while loop to check the changes of the files
        time.sleep(5)
        after = check_files(path)
        added = [f for f in after if not f in before]
        deleted = [f for f in before if not f in after]
        overlapped = list(set(list(after)) & set(list(before)))
        updated = [f for f in overlapped if before[f] < after[f]]  

        before = after  
        sql_queue = added + updated   

        TASKS = [(query_sql, sql_file) for sql_file in sql_queue]
        # Create queues

        #submit task
        for task in TASKS:
            task_queue.put(task)

        for i in range(NUMBER_OF_PROCESSES):
                p = Process(target=worker, args=(task_queue, done_queue)).start()          
            # try:
            #     p = Process(target=worker, args=(task_queue))
            #     p.start()

            # except:
            #     pass 

        # Get and print results
        print 'Unordered results:'
        for i in range(len(TASKS)):
            print '\t', done_queue.get()
        # Tell child processes to stop
        for i in range(NUMBER_OF_PROCESSES):
            task_queue.put('STOP')        

# single_thread()  # serial fallback, handy for debugging without processes
if __name__ == '__main__':
    # freeze_support() is required when this script is frozen into a
    # Windows executable and harmless otherwise (see update item b above).
    freeze_support()
    multiple_thread()

Reference:

  1. monitor file changes with python script: http://timgolden.me.uk/python/win32_how_do_i/watch_directory_for_changes.html
  2. Multiprocessing:
    https://docs.python.org/2/library/multiprocessing.html
like image 780
ju. Avatar asked Sep 14 '17 15:09

ju.


3 Answers

Where did you define sql_file in multiple_thread() in

multiprocessing.Process(target=query_sql, args=(sql_file)).start()

You have not defined sql_file in the method and moreover you have used that variable in a for loop. The variable's scope is only confined to the for loop.

like image 145
Bharadwaj Yarlagadda Avatar answered Oct 08 '22 17:10

Bharadwaj Yarlagadda


Try replacing this:

result = func(*args)

by this:

result = func(args)
like image 31
pbacterio Avatar answered Oct 08 '22 17:10

pbacterio


I have figured this out. Thank you for the responses that inspired the thought. Now the script can run a while loop to monitor the folder for newly updated/added SQL scripts, and then distribute the data pulling to multiple processes. The solution comes from queue.get() and queue.put(). I assume the queue object takes care of the communication by itself.

This is the final code --

from glob import glob
import os, time
import sys
import pypyodbc
from multiprocessing import Process, Queue, Event, Pool, current_process, freeze_support

def query_sql(sql_file): #test func
    """Run the data pull for *sql_file* (placeholder implementation).

    Writes a sibling ``.csv`` marker file next to *sql_file* and returns a
    short status string.  Replace the body with the real JSL/SQL processing.
    """
    fo_name = os.path.splitext(sql_file)[0] + '.csv'
    # with-block guarantees the handle is closed (the original leaked it)
    with open(fo_name, 'w') as fo:
        print(sql_file)
        fo.write("sql_file {0} is done\n".format(sql_file))
    # bug fix: the original format string had no {0} placeholder, so the
    # file name was silently dropped from the returned status message
    return "Query is done for {0}\n".format(sql_file)


def check_files(path):
    """
    arguments -- root path to monitor
    returns   -- dictionary of {file: timestamp, ...} for every ``.jsl``
                 file found below any ``<path>/*/IDABox/`` directory
    """
    files_dict = {}
    for sql_query_dir in glob(path + "/*/IDABox/"):
        for root, dirs, filenames in os.walk(sql_query_dir):
            for filename in filenames:
                if not filename.endswith('.jsl'):
                    continue
                # bug fix: the original used root + filename without a
                # separator, which builds wrong paths for subdirectories
                full_path = os.path.join(root, filename)
                try:
                    files_dict[full_path] = os.path.getmtime(full_path)
                except OSError:
                    # File vanished between walk() and getmtime(); skip it.
                    # (The original wrapped the whole scan in a bare except,
                    # hiding real errors and discarding the partial result.)
                    continue
    return files_dict


def worker_main(queue):
    """Pool-worker entry point: consume job-script paths from *queue* forever.

    queue -- multiprocessing.Queue of ``.jsl`` file paths to process
    """
    print("{0} working".format(os.getpid()))
    while True:
        item = queue.get(True)  # block until a job arrives
        try:
            query_sql(item)
        except Exception:
            # robustness fix: a failing script previously killed the whole
            # worker process; swallow it so the worker keeps serving jobs
            pass

def main(path="Y:/", pool_size=4, poll_seconds=5):
    """Start the worker pool and poll *path* for job scripts forever.

    path         -- root folder to monitor (defaults to the shared drive)
    pool_size    -- number of worker processes pulling jobs from the queue
    poll_seconds -- delay between folder scans
    """
    the_queue = Queue()
    # Keep a reference so the pool is not garbage-collected; each worker
    # runs worker_main(the_queue) and pulls jobs for the life of the program.
    the_pool = Pool(pool_size, worker_main, (the_queue,))

    before = check_files(path)  # current snapshot of job scripts
    while True:  # poll for added/updated scripts
        time.sleep(poll_seconds)
        after = check_files(path)
        # A file is "new work" when it appeared or its mtime advanced.
        added = [f for f in after if f not in before]
        updated = [f for f in after if f in before and before[f] < after[f]]

        before = after
        for jsl_file in added + updated:
            try:
                the_queue.put(jsl_file)
            except Exception:
                print("{0} failed with error {1}. \n".format(
                    jsl_file, str(sys.exc_info()[0])))

# Entry point: start the worker pool and the folder-watch loop.
if __name__ == "__main__":
    main()  
like image 20
ju. Avatar answered Oct 08 '22 16:10

ju.