Lets go full parallel with bash

Submitted by-badmin- ondim 11/09/2016 - 17:59

It's sunday, it's a nice day to go for a walk, but well... "Computers".

Let's assume the following problem:

You have a folder containing 100 files, let's name it /path. Each file has to be given as argument to a command line program we'll simply call "tool". The tool can be anything, an image converter, a cryptographic program, whatever.

The simplest instruction I can come up with is

for i in /path/*; do tool $i; done

Now there could be some caveats with subdirectories treated like files, there could be issues with files containing spaces, etc.

So first, let's make our command more robust

find ./path -type f -print0 | xargs -0 -I {} tool "{}"

This version should work in most cases, but still could be improved. What if the tool is not threaded ? It could run on one core only of your fresh 24 core server ! What a waste of time. There's indeed  --max-procs option to xargs that would have multiple processes executed at once. We could also have GNU parallel do the job

find ./path -type f | parallel tool

Now let's imagine that you need a bit more control over the commands you need to run. For example you need to get each return code, or you need to interrupt the execution if it takes more than 2 hours.

Also, we might want a fancy spinner in order to see if the script runs in shell, and a regular log message so we know what happened when reading log files.

We might even want to handle (it's actually more bypassing) zombie or uninterruptible processes, meaning that they can be ignored.

Let's code a bash function that does allow running parallel commands while keeping control on the script.
It would take the following arguments

ParallelExec [1] [2] [3] [4] [5] [6] [7] [8] [9] [10]

1: Integer: number of simultaneous processes to run

2: Command list variable, separated by semicolons, or path to file containing commands, one per lne

3: Boolean: set to false if command list variable given, set to true if path to file given

4: Integer: After how much seconds does the function trigger a warning message

5: Integer: After how much seconds does the function forcefully stop execution and triggers a warning message

6: Real number: How much time (in seconds) between function checks for processes

7: Integer: Every X seconds log a message so we know the function is still alive

8: Boolean: set to true in order to count seconds since the beginning of function, set to false in order to count seconds since the beginning of script

9: Boolean: set to true to show a spinner, set to false to hide spinner

10: Boolean: set to true in order to disable error logging, set to false to keep error logging

 

The following one would run 4 simultaneous sleep commands as long as there are commands to run. It would stop execution forcefully after 1800 seconds, check execution every 0.5 seconds, log a status message every 300 seconds.

commands="sleep 10;sleep 10;sleep 5; sleep 7; sleep 10"

ParallelExec 4 "$commands" false 0 1800 .5 300 true true false

The following example shows how to get output from the commands

RUN_DIR=/tmp

function test {

    echo "find ./" >> ./command_file

    echo "du ./" >> ./command_file

    echo "sleep 10" >> ./command_file

    ParallelExec 4 "./command_file" true 0 1800 .5 300 true true false

}

test

echo /tmp.ParallelExec.test

Actual output will be put in file /tmp.ParallelExec.test (function name & caller function name)

 

Now here's the actual source of ParallelExec. Some light changes are done in order to use the function out of context.

Functions Logger, JoinString and Spinner are stripped down versions of actual functions in order to make 

 

_OFUNCTIONS_SPINNER="|/-\\"
function Spinner {
    if [ $_LOGGER_SILENT == true ] || [ "$_LOGGER_ERR_ONLY" == true ]; then
        return 0
    else
        printf " [%c]  \b\b\b\b\b\b" "$_OFUNCTIONS_SPINNER"
        #printf "\b\b\b\b\b\b"
        _OFUNCTIONS_SPINNER=${_OFUNCTIONS_SPINNER#?}${_OFUNCTIONS_SPINNER%%???}
        return 0
    fi
}


function joinString {
    local IFS="$1"; shift; echo "$*";
}

function Logger {

    echo "$2: $1"

}

function ParallelExec {
    local numberOfProcesses="${1}"         # Number of simultaneous commands to run
    local commandsArg="${2}"         # Semi-colon separated list of commands, or path to file containing one command per line
    local readFromFile="${3:-false}"     # commandsArg is a file (true), or a string (false)
    local softMaxTime="${4:-0}"        # If process(es) with pid(s) $pids take longer than $softMaxTime seconds, will log a warning, unless $softMaxTime equals 0.
    local hardMaxTime="${5:-0}"        # If process(es) with pid(s) $pids take longer than $hardMaxTime seconds, will stop execution, unless $hardMaxTime equals 0.
    local sleepTime="${6:-.05}"        # Seconds between each state check, the shorter this value, the snappier it will be, but as a tradeoff cpu power will be used (general values between .05 and 1).
    local keepLogging="${7:-0}"        # Every keepLogging seconds, an alive log message is send. Setting this value to zero disables any alive logging.
    local counting="${8:-true}"        # Count time since function has been launched (true), or since script has been launched (false)
    local spinner="${9:-false}"        # Show spinner (true), don't show spinner (false)
    local noErrorLog="${10:-false}"        # Log errors when reaching soft / hard max time (false), don't log errors on those triggers (true)

    local callerName="${FUNCNAME[1]}"

    local log_ttime=0 # local time instance for comparaison

    local seconds_begin=$SECONDS # Seconds since the beginning of the script
    local exec_time=0 # Seconds since the beginning of this function

    local commandCount
    local command
    local pid
    local counter=0
    local commandsArray
    local pidsArray
    local newPidsArray
    local retval
    local errorCount=0
    local pidState
    local commandsArrayPid

    local hasPids=false # Are any valable pids given to function ?        #__WITH_PARANOIA_DEBUG

    if [ $counting == true ]; then     # If counting == false _SOFT_ALERT should be a global value so no more than one soft alert is shown
        local _SOFT_ALERT=false # Does a soft alert need to be triggered, if yes, send an alert once
    fi

    if [ $readFromFile == true ];then
        if [ -f "$commandsArg" ]; then
            commandCount=$(wc -l < "$commandsArg")
        else
            commandCount=0
        fi
    else
        IFS=';' read -r -a commandsArray <<< "$commandsArg"
        commandCount=${#commandsArray[@]}
    fi

    Logger "Runnning $commandCount commands in $numberOfProcesses simultaneous processes." "DEBUG"

    while [ $counter -lt "$commandCount" ] || [ ${#pidsArray[@]} -gt 0 ]; do

        if [ $spinner == true ]; then
            Spinner
        fi

        if [ $counting == true ]; then
            exec_time=$(($SECONDS - $seconds_begin))
        else
            exec_time=$SECONDS
        fi

        if [ $keepLogging -ne 0 ]; then
            if [ $((($exec_time + 1) % $keepLogging)) -eq 0 ]; then
                if [ $log_ttime -ne $exec_time ]; then # Fix when sleep time lower than 1s
                    log_ttime=$exec_time
                    Logger "Current tasks still running with pids [$(joinString , ${pidsArray[@]})]." "NOTICE"
                fi
            fi
        fi

        if [ $exec_time -gt $softMaxTime ]; then
            if [ "$_SOFT_ALERT" != true ] && [ $softMaxTime -ne 0 ] && [ $noErrorLog != true ]; then
                Logger "Max soft execution time exceeded for task [$callerName] with pids [$(joinString , ${pidsArray[@]})]." "WARN"
                _SOFT_ALERT=true
                Logger "Alert message" "WARN"
            fi
        fi
        if [ $exec_time -gt $hardMaxTime ] && [ $hardMaxTime -ne 0 ]; then
            if [ $noErrorLog != true ]; then
                Logger "Max hard execution time exceeded for task [$callerName] with pids [$(joinString , ${pidsArray[@]})]. Stopping task execution." "ERROR"
            fi
            for pid in "${pidsArray[@]}"; do
                KillChilds $pid true
                if [ $? == 0 ]; then
                    Logger "Task with pid [$pid] stopped successfully." "NOTICE"
                else
                    Logger "Could not stop task with pid [$pid]." "ERROR"
                fi
            done
            if [ $noErrorLog != true ]; then
                SendAlert true
            fi
            # Return the number of commands that haven't run / finished run
            return $(($commandCount - $counter + ${#pidsArray[@]}))
        fi

        while [ $counter -lt "$commandCount" ] && [ ${#pidsArray[@]} -lt $numberOfProcesses ]; do
            if [ $readFromFile == true ]; then
                command=$(awk 'NR == num_line {print; exit}' num_line=$((counter+1)) "$commandsArg")
            else
                command="${commandsArray[$counter]}"
            fi
            Logger "Running command [$command]." "DEBUG"
            eval "$command" >> "$RUN_DIR/${FUNCNAME[0]}.$callerName" 2>&1 &
            pid=$!
            pidsArray+=($pid)
            commandsArrayPid[$pid]="$command"
            counter=$((counter+1))
        done


        newPidsArray=()
        for pid in "${pidsArray[@]}"; do
            if [ $(IsInteger $pid) -eq 1 ]; then
                # Handle uninterruptible sleep state or zombies by ommiting them from running process array (How to kill that is already dead ? :)
                if kill -0 $pid > /dev/null 2>&1; then
                    #pidState=$(ps -p$pid -o state= 2 > /dev/null
                    if [ "$pidState" != "D" ] && [ "$pidState" != "Z" ]; then
                        newPidsArray+=($pid)
                    fi
                else
                    # pid is dead, get it's exit code from wait command
                    wait $pid
                    retval=$?
                    if [ $retval -ne 0 ]; then
                        Logger "Command [${commandsArrayPid[$pid]}] failed with exit code [$retval]." "ERROR"
                        errorCount=$((errorCount+1))
                    fi
                fi
            fi
        done
        pidsArray=("${newPidsArray[@]}")

        # Trivial wait time for bash to not eat up all CPU
        sleep $sleepTime
    done

    return $errorCount
}