I want to use Python to copy a local file up to several remote hosts in parallel. I'm trying to do that with asyncio
and Paramiko, since I'm already using these libraries for other purposes in my program.
I'm using BaseEventLoop.run_in_executor()
and the default ThreadPoolExecutor
, which is effectively a new interface to the old threading
library, along with Paramiko's SFTP feature to do the copying.
Here's a simplified example of how.
import sys
import asyncio
import paramiko
import functools
def copy_file_node(
*,
user: str,
host: str,
identity_file: str,
local_path: str,
remote_path: str):
ssh_client = paramiko.client.SSHClient()
ssh_client.load_system_host_keys()
ssh_client.set_missing_host_key_policy(paramiko.client.AutoAddPolicy())
ssh_client.connect(
username=user,
hostname=host,
key_filename=identity_file,
timeout=3)
with ssh_client:
with ssh_client.open_sftp() as sftp:
print("[{h}] Copying file...".format(h=host))
sftp.put(localpath=local_path, remotepath=remote_path)
print("[{h}] Copy complete.".format(h=host))
loop = asyncio.get_event_loop()
tasks = []
# NOTE: You'll have to update the values being passed in to
# `functools.partial(copy_file_node, ...)`
# to get this working on on your machine.
for host in ['10.0.0.1', '10.0.0.2']:
task = loop.run_in_executor(
None,
functools.partial(
copy_file_node,
user='user',
host=host,
identity_file='/path/to/identity_file',
local_path='/path/to/local/file',
remote_path='/path/to/remote/file'))
tasks.append(task)
try:
loop.run_until_complete(asyncio.gather(*tasks))
except Exception as e:
print("At least one node raised an error:", e, file=sys.stderr)
sys.exit(1)
loop.close()
The problem I'm seeing is that the file gets copied up serially to the hosts instead of in parallel. So if the copy takes 5 seconds for a single host, it takes 10 seconds for two hosts, and so on.
I've tried various other approaches, including ditching SFTP and piping the file to dd
on each of the remote hosts via exec_command()
, but the copies always happen serially.
I'm probably misunderstanding some basic idea here. What's keeping the different threads from copying the file in parallel?
From my testing, it appears that the holdup happens on the remote write, not on reading the local file. But why would that be, since we are attempting network I/O against independent remote hosts?
There is nothing wrong with your usage of asyncio.
To prove it, let's try a simplified version of your script - no paramiko, just pure Python.
import asyncio, functools, sys, time
START_TIME = time.monotonic()
def log(msg):
print('{:>7.3f} {}'.format(time.monotonic() - START_TIME, msg))
def dummy(thread_id):
log('Thread {} started'.format(thread_id))
time.sleep(1)
log('Thread {} finished'.format(thread_id))
loop = asyncio.get_event_loop()
tasks = []
for i in range(0, int(sys.argv[1])):
task = loop.run_in_executor(None, functools.partial(dummy, thread_id=i))
tasks.append(task)
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
With two threads, this will print:
$ python3 async.py 2
0.001 Thread 0 started
0.002 Thread 1 started <-- 2 tasks are executed concurrently
1.003 Thread 0 finished
1.003 Thread 1 finished <-- Total time is 1 second
This concurrency scales up to 5 threads:
$ python3 async.py 5
0.001 Thread 0 started
...
0.003 Thread 4 started <-- 5 tasks are executed concurrently
1.002 Thread 0 finished
...
1.005 Thread 4 finished <-- Total time is still 1 second
If we add one more thread, we hit the thread pool limit:
$ python3 async.py 6
0.001 Thread 0 started
0.001 Thread 1 started
0.002 Thread 2 started
0.003 Thread 3 started
0.003 Thread 4 started <-- 5 tasks are executed concurrently
1.002 Thread 0 finished
1.003 Thread 5 started <-- 6th task is executed after 1 second
1.003 Thread 1 finished
1.004 Thread 2 finished
1.004 Thread 3 finished
1.004 Thread 4 finished <-- 5 task are completed after 1 second
2.005 Thread 5 finished <-- 6th task is completed after 2 seconds
Everything goes as expected, and overall time grows by 1 second for every 5 items. Magic number 5 is documented in ThreadPoolExecutor docs:
Changed in version 3.5: If max_workers is
None
or not given, it will default to the number of processors on the machine, multiplied by5
, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.
How can a third-party library block my ThreadPoolExecutor?
Library uses some kind of global lock. It means that library does not support multi-threading. Try using ProcessPoolExecutor, but with caution: library may contain other anti-patterns, such as using the same hardcoded temporary file name.
Function executes for a long time and doesn't release GIL. It may indicate a bug in C extension code, but the most popular reason to holding the GIL is doing some CPU-intensive computations. Again, you can try ProcessPoolExecutor, as it isn't affected by GIL.
None of these is expected to happen with a library like paramiko.
How can a third-party library block my ProcessPoolExecutor?
It usually can't. Your tasks are executed in separate processes. If you see that two tasks in ProcessPoolExecutor take twice as much time, suspect resource bottleneck (such as consuming 100% of the network bandwidth).
I'm not sure this is the best way to approach it, but it works for me
#start
from multiprocessing import Process
#omitted
tasks = []
for host in hosts:
p = Process(
None,
functools.partial(
copy_file_node,
user=user,
host=host,
identity_file=identity_file,
local_path=local_path,
remote_path=remote_path))
tasks.append(p)
[t.start() for t in tasks]
[t.join() for t in tasks]
based on comment, added a datestamp and captured the output from multiprocessing and got this:
2015-10-24 03:06:08.749683[vagrant1] Copying file...
2015-10-24 03:06:08.751826[basement] Copying file...
2015-10-24 03:06:08.757040[upstairs] Copying file...
2015-10-24 03:06:16.222416[vagrant1] Copy complete.
2015-10-24 03:06:18.094373[upstairs] Copy complete.
2015-10-24 03:06:22.478711[basement] Copy complete.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With