 

Running each element in an array in parallel in a bash script

Tags:

bash

shell

Let's say I have a bash script that looks like this:

array=( 1 2 3 4 5 6 )

for each in "${array[@]}"
do
  echo "$each"

  command --arg1 $each

done

If I want to run everything in the loop in parallel, I could just change command --arg1 $each to command --arg1 $each &.
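
For reference, the backgrounded version would look like this; a final wait keeps the script from exiting before the background jobs finish:

array=( 1 2 3 4 5 6 )

for each in "${array[@]}"
do
  echo "$each"

  command --arg1 $each &

done

wait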

But now let's say I want to take the results of command --arg1 $each and do something with them, like this:

array=( 1 2 3 4 5 6 )
for each in "${array[@]}"
do
  echo "$each"

  lags=($(command --arg1 $each))

  lngth_lags=${#lags[*]}

  for (( i=1; i<=$(( $lngth_lags -1 )); i++))
  do

    result=${lags[$i]}
    echo -e "$timestamp\t$result" >> $log_file
    echo "result piped"

  done

done

If I just add a & to the end of command --arg1 $each, everything after it will run without waiting for command --arg1 $each to finish first. How do I prevent that from happening? And how do I limit the number of threads the loop can occupy?

Essentially, this block should run in parallel for 1, 2, 3, 4, 5, 6:

  echo "$each"

  lags=($(command --arg1 $each))

  lngth_lags=${#lags[*]}

  for (( i=1; i<=$(( $lngth_lags -1 )); i++))
  do

    result=${lags[$i]}
    echo -e "$timestamp\t$result" >> $log_file
    echo "result piped"

  done

-----EDIT--------

Here is the original code:

#!/bin/bash
export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/etc/kafka/kafka.client.jaas.conf"
IFS=$'\n'
array=($(kafka-consumer-groups --bootstrap-server kafka1:9092 --list --command-config /etc/kafka/client.properties --new-consumer))

lngth=${#array[*]}

echo "array length: " $lngth

timestamp=$(($(date +%s%N)/1000000))

log_time=`date +%Y-%m-%d:%H`

echo "log time: " $log_time

log_file="/home/ec2-user/laglogs/laglog.$log_time.log"

echo "log file: " $log_file

echo "timestamp: " $timestamp

get_lags () {

  echo "$1"

  lags=($(kafka-consumer-groups --bootstrap-server kafka1:9092 --describe  --group $1 --command-config /etc/kafka/client.properties --new-consumer))

  lngth_lags=${#lags[*]}

  for (( i=1; i<=$(( $lngth_lags -1 )); i++))
  do

    result=${lags[$i]}
    echo -e "$timestamp\t$result" >> $log_file
    echo "result piped"

  done
}

for each in "${array[@]}"
do 

   get_lags $each &

done

------EDIT 2-----------

Trying with the answer below:

#!/bin/bash
export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/etc/kafka/kafka.client.jaas.conf"
IFS=$'\n'
array=($(kafka-consumer-groups --bootstrap-server kafka1:9092 --list --command-config /etc/kafka/client.properties --new-consumer))

lngth=${#array[*]}

echo "array length: " $lngth

timestamp=$(($(date +%s%N)/1000000))

log_time=`date +%Y-%m-%d:%H`

echo "log time: " $log_time

log_file="/home/ec2-user/laglogs/laglog.$log_time.log"

echo "log file: " $log_file

echo "timestamp: " $timestamp

max_proc_count=8

run_for_each() {
  local each=$1
  echo "Processing: $each" >&2
  IFS=$'\n' read -r -d '' -a lags < <(kafka-consumer-groups --bootstrap-server kafka1:9092 --describe --command-config /etc/kafka/client.properties --new-consumer --group "$each" && printf '\0')
  for result in "${lags[@]}"; do
    printf '%(%Y-%m-%dT%H:%M:%S)T\t%s\t%s\n' -1 "$each" "$result"
  done >>"$log_file"
}

export -f run_for_each
export log_file # make log_file visible to subprocesses

printf '%s\0' "${array[@]}" |
  xargs -P "$max_proc_count" -n 1 -0 bash -c 'run_for_each "$@"'
asked Apr 09 '17 by lightweight


2 Answers

The convenient thing to do is to push your background code into a separate script, or an exported function. That way xargs can create a new shell and access the function from its parent. (Be sure to export any other variables that need to be available in the child as well.)

array=( 1 2 3 4 5 6 )
max_proc_count=8
log_file=out.txt

run_for_each() {
  local each=$1
  echo "Processing: $each" >&2
  IFS=$' \t\n' read -r -d '' -a lags < <(yourcommand --arg1 "$each" && printf '\0')
  for result in "${lags[@]}"; do
    printf '%(%Y-%m-%dT%H:%M:%S)T\t%s\t%s\n' -1 "$each" "$result"
  done >>"$log_file"
}

export -f run_for_each
export log_file # make log_file visible to subprocesses

printf '%s\0' "${array[@]}" |
  xargs -P "$max_proc_count" -n 1 -0 bash -c 'run_for_each "$@"'

Some notes:

  • Using echo -e is bad form. See the APPLICATION USAGE and RATIONALE sections of the POSIX spec for echo, which explicitly advise using printf instead (the spec defines no -e option, and explicitly states that echo must not accept any options other than -n).
  • We're including the each value in the log file so it can be extracted from there later.
  • You haven't specified whether the output of yourcommand is space-delimited, tab-delimited, line-delimited, or otherwise. I'm thus accepting all these for now; modify the value of IFS passed to the read to taste.
  • printf '%(...)T' to get a timestamp without external tools such as date requires bash 4.2 or newer. Replace with your own code if you see fit.
  • read -r -a arrayname < <(...) is much more robust than arrayname=( $(...) ). In particular, it avoids treating emitted values as globs -- replacing *s with a list of files in the current directory, or Foo[Bar] with FooB should any file by that name exist (or, if the failglob or nullglob options are set, triggering a failure or emitting no value at all in that case). A short demonstration follows these notes.
  • Redirecting stdout to your log_file once for the entire loop is somewhat more efficient than reopening it for each individual printf. Note that having multiple processes write to the same file at the same time is only safe if all of them opened it with O_APPEND (which >> does), and if they write in chunks small enough to each complete as a single syscall (which is probably the case unless the individual lags values are quite large).
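
To make these notes concrete, here is a short demonstration covering the printf replacement for echo -e, a pre-4.2 timestamp fallback, and the globbing hazard (the filenames and values are hypothetical):

# printf instead of echo -e: portable and unambiguous
printf '%s\t%s\n' "$timestamp" "$result"

# pre-bash-4.2 fallback for the %(...)T timestamp, using external date
printf '%s\t%s\n' "$(date +%Y-%m-%dT%H:%M:%S)" "$result"

# the globbing hazard with an unquoted command substitution:
touch FooB                      # hypothetical file in the current directory
lags=( $(echo 'Foo[Bar] *') )   # unsafe: Foo[Bar] glob-matches FooB,
                                # and * expands to every file in .

# the read-based version word-splits but performs no glob expansion:
IFS=$' \t\n' read -r -d '' -a lags < <(printf 'Foo[Bar] *' && printf '\0')
printf '%s\n' "${lags[@]}"      # prints Foo[Bar] and * literally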
answered Sep 24 '22 by Charles Duffy


A lot of lengthy and theoretical answers here; I'll try to keep it simple: what about using | (pipe) to connect the commands as usual? ;) (And GNU parallel, which excels at this type of task.)

seq 6 | parallel -j4 "command --arg1 {} | command2 > results/{}"

The -j4 will limit the number of threads (jobs) as requested. You DON'T want to write to a single file from multiple jobs; output one file per job and join them after the parallel processing has finished, as sketched below.
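
For example, a minimal sketch of the one-file-per-job pattern, reusing the placeholder command from above with a hypothetical results/ directory:

mkdir -p results

# one output file per job; parallel substitutes each input value for {}
seq 6 | parallel -j4 "command --arg1 {} > results/{}.out"

# join the per-job files once all jobs have finished
cat results/*.out > combined.log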

answered Sep 25 '22 by liborm