
How can I create jobs that yield results as they complete?

The Problem

Suppose you have four machines:

  • Machine A is slow.
  • Machine B is medium speed.
  • Machine C is fast.
  • LocalHost is ultra-fast.

On each remote machine, you want to sum the first 1 million prime numbers. You can do this from the local host with:

$servers = @("MachineA","MachineB","MachineC")
Invoke-Command -ComputerName $servers -ScriptBlock {
    Sum-FirstMillionPrimes
}

As this is written, results will not be displayed (yielded) until the slowest machine is finished.

To get results sooner, you try running the command as a job:

$servers = @("MachineA","MachineB","MachineC")
Invoke-Command -ComputerName $servers -ScriptBlock {
    Sum-FirstMillionPrimes
} -AsJob

while ($null -ne (Get-Job)) {
    $doneChildJob = Get-Job | Wait-Job -Any
    $processResult = $doneChildJob | Receive-Job -AutoRemoveJob -Wait
    $processResult
}

This still has the same problem because, according to the Invoke-Command documentation (example 8):

The command uses the AsJob parameter to run the command as a background
job. This command returns a job object that contains two child job
objects, one for each of the jobs run on the two remote computers.

For us, this means that three child jobs are running, but the parent job does not finish until all of its child jobs have completed.
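One way to look at this parent/child structure without any remote machines is the sketch below. It assumes a local Start-Job as a stand-in for Invoke-Command -AsJob (Start-Job also wraps its work in a child job), and the variable names are purely illustrative.

```powershell
# Local stand-in for Invoke-Command -AsJob: Start-Job returns a parent job
# that wraps the actual work in a child job.
$parent     = Start-Job -ScriptBlock { 'done' }
$childCount = $parent.ChildJobs.Count
$childId    = $parent.ChildJobs[0].Id

$null = $parent | Wait-Job

# Get-Job lists only top-level (parent) jobs unless -IncludeChildJob is given.
$listedByDefault  = @(Get-Job | Where-Object { $_.Id -eq $childId }).Count
$listedWithSwitch = @(Get-Job -IncludeChildJob | Where-Object { $_.Id -eq $childId }).Count

"Parent state: $($parent.State); child state: $($parent.ChildJobs[0].State)"
"Child listed by default: $listedByDefault; with -IncludeChildJob: $listedWithSwitch"

$parent | Remove-Job
```

With Invoke-Command -AsJob against several computers, the same ChildJobs property would hold one child per computer.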

How can you write this in a way that the results from the child jobs will be yielded back as they finish?

What I've Tried

We have come up with a solution that appears to work, but this problem seems common enough that there should be an idiomatic PowerShell way to handle it.

# Create a HashSet of jobs that have already been processed. This is important
# because child jobs cannot be removed via Remove-Job. There doesn't seem to be
# a way to determine if the job has been received
[System.Collections.Generic.HashSet[int]]$processedJobIds = @()
while ($null -ne (Get-Job)) {
    # We only want to attempt to process jobs that have no children that we
    # haven't seen. The -IncludeChildJob parameter allows us to see the nested
    # children jobs from Invoke-Command -AsJob. Because we can't determine if a
    # child job has already been received, we filter based on our above hashset.
    $doneChildJob = Get-Job -IncludeChildJob | Where-Object { $_.ChildJobs.Count -eq 0 -and (-not ($processedJobIds.Contains($_.Id))) } | Wait-Job -Any
    if ($null -eq $doneChildJob) {
        #   The $doneChildJob filter will exclude the parent job created by
        # Invoke-Command -AsJob. However, we still need to eventually remove
        # this job, otherwise we'd hit an infinite loop.
        #   The assumption is that the only way that $doneChildJob will evaluate to
        # $null is if all child jobs have completed. If all child jobs are
        # completed, the remaining job(s) should be safe to remove as they are
        # expected to be parent jobs.
        Get-Job | Remove-Job
    }
    else {
        # We need to process the child jobs
        $processResult = $doneChildJob | Receive-Job -Wait
        $processResult
        $processedJobIds.Add($doneChildJob.Id) | Out-Null
        # By default, Get-Job does not return child jobs, so every job it
        # returns is a parent that Remove-Job can remove. If $processedJobIds
        # contains any of these jobs, they are safe to remove, and should
        # also be removed from $processedJobIds.
        Get-Job | Where-Object { $processedJobIds.Contains($_.Id) } | ForEach-Object {
            $processedJobIds.Remove($_.Id) | Out-Null
            Remove-Job $_
        }
    }
}

We have run the code above against the following examples, and it appears to work:

Import-Module ThreadJob

$servers = @("MachineA", "MachineB", "MachineC")
$sessions = New-PSSession -ComputerName $servers

Invoke-Command -Session $sessions -ScriptBlock {
    $computerName = [System.Net.Dns]::GetHostName()
    $firstMillionPrimes = Sum-FirstMillionPrimes
    Write-Output "$computerName - $firstMillionPrimes"
} -AsJob | Out-Null

# It should also handle when one of the child jobs fails but not all
Invoke-Command -ComputerName $servers -ScriptBlock {
    $computerName = [System.Net.Dns]::GetHostName()
    if ($computerName -eq "MachineA") {
        Throw "This is a remote invoke FAILURE on $computerName"
    }
    else {
        $firstMillionPrimes = Sum-FirstMillionPrimes
        Write-Output "$computerName - $firstMillionPrimes"
    }
} -AsJob | Out-Null

# In addition to the jobs started on multiple sessions, this also needs
# to be robust enough to handle other jobs running locally.
Start-Job -ScriptBlock { Sum-FirstMillionPrimes } | Out-Null

# It also needs to handle jobs created by Start-ThreadJob
Start-ThreadJob -ScriptBlock { Sum-FirstMillionPrimes } | Out-Null

# It also needs to handle jobs that have a state of Failed
Start-ThreadJob -ScriptBlock { throw "My job State will be Failed" } | Out-Null

# It should handle nested jobs that are successful
Start-Job -ScriptBlock { Start-ThreadJob -ScriptBlock { Sum-FirstMillionPrimes } | Receive-Job -Wait} | Out-Null
Start-Job -ScriptBlock { Start-Job -ScriptBlock { Sum-FirstMillionPrimes } | Receive-Job -Wait} | Out-Null
Start-ThreadJob -ScriptBlock { Start-ThreadJob -ScriptBlock { Sum-FirstMillionPrimes } | Receive-Job -Wait} | Out-Null

# It should handle nested jobs that are failures
Start-Job -ScriptBlock { Start-ThreadJob -ScriptBlock { throw "Handles nested thread jobs that fail" } | Receive-Job -Wait} | Out-Null
Start-Job -ScriptBlock { Start-Job -ScriptBlock { throw "Handles nested jobs that fail" } | Receive-Job -Wait} | Out-Null
Start-ThreadJob -ScriptBlock { Start-ThreadJob -ScriptBlock { throw "Handles nested thread jobs in thread jobs that fail" } | Receive-Job -Wait} | Out-Null

Expected output (simulated). Results are yielded back to the terminal as processing finishes: exceptions appear almost instantaneously, while the results of long calculations may be interspersed as they complete:

This is a remote invoke FAILURE on MachineA
    + CategoryInfo          : OperationStopped: (This is a remote invoke FAILURE on MachineA:String) [], RuntimeException
    + FullyQualifiedErrorId : This is a remote invoke FAILURE on MachineA
    + PSComputerName        : MachineA
My job State will be Failed
    + CategoryInfo          : InvalidResult: (:) [], RuntimeException
    + FullyQualifiedErrorId : JobStateFailed
Handles nested thread jobs that fail
    + CategoryInfo          : InvalidResult: (:) [], RuntimeException
    + FullyQualifiedErrorId : JobStateFailed
Handles nested jobs that fail
    + CategoryInfo          : InvalidResult: (:) [], RuntimeException
    + FullyQualifiedErrorId : JobStateFailed
Handles nested thread jobs in thread jobs that fail
    + CategoryInfo          : InvalidResult: (:) [], RuntimeException
    + FullyQualifiedErrorId : JobStateFailed
Localhost - (FirstMillionPrimes)
MachineC - (FirstMillionPrimes)
Localhost - (FirstMillionPrimes)
Localhost - (FirstMillionPrimes)
MachineC - (FirstMillionPrimes)
Localhost - (FirstMillionPrimes)
MachineB - (FirstMillionPrimes)
Localhost - (FirstMillionPrimes)
MachineB - (FirstMillionPrimes)
MachineA - (FirstMillionPrimes)

The solution we've come up with appears to work, but it seems really heavy-handed. Is there a better way or pattern in PowerShell to yield the results as they complete?

asked Nov 06 '22 01:11 by Nick Kimbrough


1 Answer

Sounds like the PSRemotingJob.StateChanged Event might work for you. Something like this:

$global:results = [System.Collections.ArrayList]::new()

# create action scriptblock for eventhandling
$onJobFinish = {
    # only run action if job has terminated
    if ($Event.Sender.State -in @('Completed', 'Failed', 'Stopped', 'Suspended', 'Disconnected')) {
        $localResults = $Event.Sender | Receive-Job

        # immediately send output to screen
        $localResults | Out-Host

        # also add output to collection to work with later
        $global:results.Add($localResults) | Out-Null
    }
}

Invoke-Command -Session $sessions -ScriptBlock {
    $computerName = [System.Net.Dns]::GetHostName()
    $firstMillionPrimes = Sum-FirstMillionPrimes
    Write-Output "$computerName - $firstMillionPrimes"
} -AsJob |
    Select-Object -ExpandProperty ChildJobs | ForEach-Object {
        # Register our action to run whenever a child job's state changes
        Register-ObjectEvent -InputObject $_ -EventName 'StateChanged' -Action $onJobFinish
    }

Start-Job -ScriptBlock { Sum-FirstMillionPrimes } | Select-Object -ExpandProperty ChildJobs | ForEach-Object {
    # Register our action to run whenever a child job's state changes
    Register-ObjectEvent -InputObject $_ -EventName 'StateChanged' -Action $onJobFinish
}

# access all results that have been received thus far
$global:results | Format-Table

Update

Alternatively, you can add all the jobs to a single collection and loop while any of them is still running or has data. This way you can output data as it becomes available instead of waiting for the jobs to complete.

# $sb, $sb2, $sb3 and $computers are placeholders for your own script blocks and computer names
$jobs = @()
$jobs += Invoke-Command -ScriptBlock $sb -ComputerName $computers -AsJob
$jobs += Start-Job -ScriptBlock $sb2
$jobs += Start-ThreadJob -ScriptBlock $sb3

$results = [System.Collections.ArrayList]::new()

while ($jobs | Where-Object { 
        $_.State -notin @('Completed', 'Failed', 'Stopped', 'Suspended', 'Disconnected') 
    }) {
    $localData = $jobs | Receive-Job
    if ($null -ne $localData) {
        # Skip empty polls so $results holds only real output
        $localData | Format-Table
        $results.Add($localData) | Out-Null
    }

    Start-Sleep -Seconds 1
}
# Receive once more to pick up output produced after the last poll
$localData = $jobs | Receive-Job
if ($null -ne $localData) {
    $localData | Format-Table
    $results.Add($localData) | Out-Null
}
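
If you would rather block until the next job finishes than poll once a second, Wait-Job -Any over a shrinking list of jobs is another option. This is a minimal sketch under stated assumptions, not something tested against real remote sessions: local Start-Job jobs with different sleep times stand in for the slow, medium and fast machines, and names such as $pending, $delays and $completionOrder are made up for illustration.

```powershell
# Hypothetical stand-ins for the three machines: each local job sleeps for a
# different number of seconds and then returns a label.
$delays  = [ordered]@{ Slow = 6; Medium = 3; Fast = 1 }
$pending = [System.Collections.Generic.List[System.Management.Automation.Job]]::new()

foreach ($name in $delays.Keys) {
    $job = Start-Job -Name $name -ArgumentList $name, $delays[$name] -ScriptBlock {
        param($label, $seconds)
        Start-Sleep -Seconds $seconds
        "$label finished"
    }
    $pending.Add($job)
}

$completionOrder = [System.Collections.Generic.List[string]]::new()

while ($pending.Count -gt 0) {
    # Blocks until at least one of the pending jobs has finished
    $finished = @($pending | Wait-Job -Any)

    foreach ($job in $finished) {
        # Emit this job's output right away, then stop tracking the job
        $job | Receive-Job
        $completionOrder.Add($job.Name)
        [void]$pending.Remove($job)
        Remove-Job -Job $job
    }
}

"Completion order: $($completionOrder -join ', ')"
```

With these delays the fast job would normally be reported first even though it was started last, but the exact order depends on how long each job takes to start. For a job created by Invoke-Command -AsJob, the parent's ChildJobs could presumably be added to $pending instead of the parent itself, as the loop in the question does.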

answered Nov 15 '22 13:11 by Daniel