I am trying to create a script that will be executed every 10 minutes. Each time I have to check if there are new files in specific folder in my computer and if yes, there are some functions that would run on this file in order to get some values. These values will be written to excel file. The problem is that every time this function will be executed, the variables that contain the path to all the files will be generated again, and the program will go over all the files. How can I handle this problem? Thanks
You can use the modification time by replacing st_ctime with st_mtime . You can pass this newly created file new_file to your function, and assign the new path that is generated by this function to the variable path . Show activity on this post.
Start by initializing variables:
savedSet=set()
mypath=… #YOUR PATH HERE
At the end of each cycle, save a set of file names, creation times and sizes in tuple format to another variable. When retrieving files, do the following:
-Retrieve a set of file paths
nameSet=set()
for file in os.listdir(path):
fullpath=os.path.join(mypath, file)
if os.path.isfile(fullpath):
nameSet.add(file)
-Create tuples
retrievedSet=set()
for name in nameSet:
stat=os.stat(os.path.join(mypath, name))
time=ST_CTIME
#size=stat.ST_SIZE If you add this, you will be able to detect file size changes as well.
#Also consider using ST_MTIME to detect last time modified
retrievedSet.add((name,time))
-Compare set with saved set to find new files
newSet=retrievedSet-savedSet
-Compare set with saved set to find removed files
deletedSet=savedSet-retrievedSet
-Run your functions on files with names from newSet -Update saved set
savedSet=newSet
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class MyHandler(FileSystemEventHandler):
def on_any_event(self, event):
print(event.event_type, event.src_path)
def on_created(self, event):
print("on_created", event.src_path)
print(event.src_path.strip())
if((event.src_path).strip() == ".\test.xml"):
print("Execute your logic here!")
event_handler = MyHandler()
observer = Observer()
observer.schedule(event_handler, path='.', recursive=False)
observer.start()
while True:
try:
pass
except KeyboardInterrupt:
observer.stop()
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With