Copying one file to multiple remote hosts in parallel over SFTP

I want to use Python to copy a local file up to several remote hosts in parallel. I'm trying to do that with asyncio and Paramiko, since I'm already using these libraries for other purposes in my program.

I'm using BaseEventLoop.run_in_executor() with the default ThreadPoolExecutor (effectively a newer interface to the old threading library), along with Paramiko's SFTP support, to do the copying.

Here's a simplified example of how I'm doing it:

import sys
import asyncio
import paramiko
import functools


def copy_file_node(
        *,
        user: str,
        host: str,
        identity_file: str,
        local_path: str,
        remote_path: str):
    ssh_client = paramiko.client.SSHClient()
    ssh_client.load_system_host_keys()
    ssh_client.set_missing_host_key_policy(paramiko.client.AutoAddPolicy())

    ssh_client.connect(
        username=user,
        hostname=host,
        key_filename=identity_file,
        timeout=3)

    with ssh_client:
        with ssh_client.open_sftp() as sftp:
            print("[{h}] Copying file...".format(h=host))
            sftp.put(localpath=local_path, remotepath=remote_path)
            print("[{h}] Copy complete.".format(h=host))


loop = asyncio.get_event_loop()

tasks = []

# NOTE: You'll have to update the values being passed in to
#      `functools.partial(copy_file_node, ...)`
#       to get this working on your machine.
for host in ['10.0.0.1', '10.0.0.2']:
    task = loop.run_in_executor(
        None,
        functools.partial(
            copy_file_node,
            user='user',
            host=host,
            identity_file='/path/to/identity_file',
            local_path='/path/to/local/file',
            remote_path='/path/to/remote/file'))
    tasks.append(task)

try:
    loop.run_until_complete(asyncio.gather(*tasks))
except Exception as e:
    print("At least one node raised an error:", e, file=sys.stderr)
    sys.exit(1)

loop.close()

The problem I'm seeing is that the file gets copied up serially to the hosts instead of in parallel. So if the copy takes 5 seconds for a single host, it takes 10 seconds for two hosts, and so on.

I've tried various other approaches, including ditching SFTP and piping the file to dd on each of the remote hosts via exec_command(), but the copies always happen serially.

I'm probably misunderstanding some basic idea here. What's keeping the different threads from copying the file in parallel?

From my testing, it appears that the holdup happens on the remote write, not on reading the local file. But why would that be, since we are attempting network I/O against independent remote hosts?

asked Oct 24 '15 by Nick Chammas

2 Answers

There is nothing wrong with your usage of asyncio.

To prove it, let's try a simplified version of your script - no paramiko, just pure Python.

import asyncio, functools, sys, time

START_TIME = time.monotonic()

def log(msg):
    print('{:>7.3f} {}'.format(time.monotonic() - START_TIME, msg))

def dummy(thread_id):
    log('Thread {} started'.format(thread_id))
    time.sleep(1)
    log('Thread {} finished'.format(thread_id))

loop = asyncio.get_event_loop()
tasks = []
for i in range(0, int(sys.argv[1])):
    task = loop.run_in_executor(None, functools.partial(dummy, thread_id=i))
    tasks.append(task)
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

With two threads, this will print:

$ python3 async.py 2
  0.001 Thread 0 started
  0.002 Thread 1 started       <-- 2 tasks are executed concurrently
  1.003 Thread 0 finished
  1.003 Thread 1 finished      <-- Total time is 1 second

This concurrency scales up to 5 threads:

$ python3 async.py 5
  0.001 Thread 0 started
  ...
  0.003 Thread 4 started       <-- 5 tasks are executed concurrently
  1.002 Thread 0 finished
  ...
  1.005 Thread 4 finished      <-- Total time is still 1 second

If we add one more thread, we hit the thread pool limit:

$ python3 async.py 6
  0.001 Thread 0 started
  0.001 Thread 1 started
  0.002 Thread 2 started
  0.003 Thread 3 started
  0.003 Thread 4 started       <-- 5 tasks are executed concurrently
  1.002 Thread 0 finished
  1.003 Thread 5 started       <-- 6th task is executed after 1 second
  1.003 Thread 1 finished
  1.004 Thread 2 finished
  1.004 Thread 3 finished
  1.004 Thread 4 finished      <-- 5 tasks are completed after 1 second
  2.005 Thread 5 finished      <-- 6th task is completed after 2 seconds

Everything goes as expected, and the overall time grows by 1 second for every 5 items. The magic number 5 is documented in the ThreadPoolExecutor docs:

Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.
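
If the default pool is too small for your workload, you don't have to accept it: run_in_executor() takes any concurrent.futures executor as its first argument (the script above passes None, which selects the default). A minimal sketch against the test script above; the worker count of 20 is an arbitrary choice, and dummy() is the function defined earlier:

import asyncio
import concurrent.futures
import functools
import sys

# dummy() and START_TIME are as defined in the test script above.

# Build an explicit pool larger than the 5-per-CPU default; 20 workers is an arbitrary pick.
executor = concurrent.futures.ThreadPoolExecutor(max_workers=20)

loop = asyncio.get_event_loop()
tasks = [loop.run_in_executor(executor, functools.partial(dummy, thread_id=i))
         for i in range(0, int(sys.argv[1]))]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
executor.shutdown()

With this change, python3 async.py 6 should show all six tasks starting at once instead of the sixth waiting a second.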

How can a third-party library block my ThreadPoolExecutor?

  • The library uses some kind of global lock, which means it does not support multi-threading. Try ProcessPoolExecutor instead, but with caution: the library may contain other anti-patterns, such as using the same hardcoded temporary file name.

  • The function executes for a long time without releasing the GIL. This may indicate a bug in C extension code, but the most common reason for holding the GIL is CPU-intensive computation. Again, you can try ProcessPoolExecutor, as it isn't affected by the GIL (see the sketch below).

None of these is expected to happen with a library like paramiko.
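
Still, ruling them out is cheap, because run_in_executor() accepts a ProcessPoolExecutor just as well as a ThreadPoolExecutor. A sketch against the question's code, assuming copy_file_node() is defined at module level (so it can be pickled) and using the question's placeholder hosts and paths:

import asyncio
import concurrent.futures
import functools

# copy_file_node() is the module-level function from the question, so it is picklable.

executor = concurrent.futures.ProcessPoolExecutor()  # defaults to one worker per CPU

loop = asyncio.get_event_loop()
tasks = [
    loop.run_in_executor(
        executor,
        functools.partial(
            copy_file_node,
            user='user',
            host=host,
            identity_file='/path/to/identity_file',
            local_path='/path/to/local/file',
            remote_path='/path/to/remote/file'))
    for host in ['10.0.0.1', '10.0.0.2']]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
executor.shutdown()

On platforms that spawn worker processes instead of forking (e.g. Windows), this needs to run under an if __name__ == '__main__': guard.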

How can a third-party library block my ProcessPoolExecutor?

It usually can't. Your tasks are executed in separate processes. If you see that two tasks in ProcessPoolExecutor take twice as much time, suspect a resource bottleneck (such as consuming 100% of the network bandwidth).

answered Nov 18 '22 by alexanderlukanin13


I'm not sure this is the best way to approach it, but it works for me:

from multiprocessing import Process
import functools

# omitted

tasks = []
for host in hosts:
    p = Process(
        target=functools.partial(
            copy_file_node,
            user=user,
            host=host,
            identity_file=identity_file,
            local_path=local_path,
            remote_path=remote_path))
    tasks.append(p)

for t in tasks:
    t.start()
for t in tasks:
    t.join()
Based on a comment, I added a timestamp and captured the output from the multiprocessing run, and got this:

2015-10-24 03:06:08.749683[vagrant1] Copying file...
2015-10-24 03:06:08.751826[basement] Copying file...
2015-10-24 03:06:08.757040[upstairs] Copying file...
2015-10-24 03:06:16.222416[vagrant1] Copy complete.
2015-10-24 03:06:18.094373[upstairs] Copy complete.
2015-10-24 03:06:22.478711[basement] Copy complete.
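
One caveat with multiprocessing.Process: an exception raised in a child doesn't propagate to the parent the way asyncio.gather() propagates it. If the script should notice a failed copy, a minimal check after the join loop (assuming sys is imported) could be:

failed = [t for t in tasks if t.exitcode != 0]
if failed:
    print("{} of {} copies failed".format(len(failed), len(tasks)), file=sys.stderr)
    sys.exit(1)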

answered Nov 18 '22 by tlastowka