markekraus / ConcurrentPowerShellProducerConsumer

Concurrent Programming in PowerShell with the Producer Consumer Pattern
MIT License

Topics

Who Is Mark Kraus

markekraus

What is Concurrent Computing?

Concurrent computing is a form of computing in which several computations are executed during overlapping time periods—concurrently—instead of sequentially (one completing before the next starts). (Wikipedia)

Serial vs Concurrent vs Parallel

Terms

Serial

Task 1: +
Task 2: =

Core 1: +++++++++++++++++++++++++++===========================

Parallel

Task 1: +
Task 2: =

Core 1:   +++++++++++++++++++++++++++
Thread 1: +++++++++++++++++++++++++++

Core 2:   ===========================
Thread 1: ===========================

Concurrent

Task 1: +
Task 2: =

Core 1:   +++===++++++======+++===++++++======+++++++++=========
Thread 1: +++++++++++++++++++++++++++
Thread 2: ===========================

PowerShell Concurrency Options

A quick recap of the available concurrent programming methods in PowerShell.

Producer-Consumer Pattern

What is the Producer-Consumer Pattern?

Get-Job | Wait-Job | Receive-Job
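One plausible reading of the pipeline above is as a chain of producers and consumers: each command consumes the objects emitted by the command before it and produces objects for the next one. The following is a minimal sketch of that idea, assuming background jobs started with `Start-Job`. The squaring workload and the variable names are hypothetical, and the jobs are piped from `$Jobs` rather than `Get-Job` so that unrelated jobs in the session are not picked up.

```powershell
# Producer stage: start three background jobs (hypothetical squaring workload).
$Jobs = 1..3 | ForEach-Object {
    Start-Job -ScriptBlock { param($Number) $Number * $Number } -ArgumentList $_
}

# Each pipeline stage consumes what the previous stage emits:
# Wait-Job consumes job objects and emits them once they have finished,
# Receive-Job consumes finished jobs and emits their output.
$Squares = $Jobs | Wait-Job | Receive-Job

# Clean up the finished jobs and show the collected output.
$Jobs | Remove-Job
$Squares
```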

Producer-Consumer in Concurrent Programming

Widget Factory Analogy

The Widget Factory turns monads into widgets.

Receiving:

Manufacturing:

Shipping:

Back to Programming

We want all our workers 100% utilized at all times, unless there is no work to be done!

Real World Example: Inbox Rules

Gist

Secret Ingredients

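using namespace System.Collections.Concurrent

# Thread-safe FIFO queue with blocking semantics, shared between threads
$Queue = [BlockingCollection[PSObject]]::new(
    [ConcurrentQueue[PSObject]]::new()
)

# Pool of 1 to 4 runspaces for the workers to run in
$RunspacePool = [runspacefactory]::CreateRunspacePool(1, 4)
$RunspacePool.Open()

# $ScriptBlock holds the worker code (defined elsewhere); it receives $Queue as its first argument
$Runspace = [PowerShell]::Create()
$Runspace.RunspacePool = $RunspacePool
$null = $Runspace.AddScript($ScriptBlock).AddArgument($Queue)
# Start the worker asynchronously; keep the handle for a later EndInvoke()
$Handle = $Runspace.BeginInvoke()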
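The ingredients above can be combined into a small end-to-end run. This is a sketch under stated assumptions, not the code from the talk: the consumer script block (which simply doubles each number), the `$Worker`, `$Handle`, and `$Results` names, and the 1..5 workload are all hypothetical. Full type names are used instead of `using namespace` so the snippet can be pasted anywhere in a script.

```powershell
# Shared FIFO queue; BlockingCollection adds blocking semantics on top of ConcurrentQueue.
$Queue = [System.Collections.Concurrent.BlockingCollection[PSObject]]::new(
    [System.Collections.Concurrent.ConcurrentQueue[PSObject]]::new()
)

# Hypothetical consumer: doubles every number it takes from the queue.
$ScriptBlock = {
    param($Queue)
    foreach ($Item in $Queue.GetConsumingEnumerable()) {
        $Item * 2
    }
}

$RunspacePool = [runspacefactory]::CreateRunspacePool(1, 4)
$RunspacePool.Open()

# Start the consumer in the pool; it blocks until work arrives.
$Worker = [PowerShell]::Create()
$Worker.RunspacePool = $RunspacePool
$null = $Worker.AddScript($ScriptBlock).AddArgument($Queue)
$Handle = $Worker.BeginInvoke()

# Producer: add work, then signal that no more items are coming.
1..5 | ForEach-Object { $Queue.Add($_) }
$Queue.CompleteAdding()

# EndInvoke waits for the consumer to finish and returns its output.
$Results = @($Worker.EndInvoke($Handle))
$Results

$Worker.Dispose()
$RunspacePool.Dispose()
```

With a single consumer reading from a FIFO queue, the output order follows the order in which the items were added.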

Blocking

Thread 1:

foreach ($LogEntry in $LogQueue.GetConsumingEnumerable()) {
    # do stuff
}

Thread 2:

$LogQueue.Add($Message1)
$LogQueue.Add($Message2)
$LogQueue.Add($Message3)
$LogQueue.CompleteAdding()
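The life cycle of the queue in the two snippets above can be observed on a single thread. The sketch below assumes `$LogQueue` is a `BlockingCollection` backed by a `ConcurrentQueue`, as in the earlier section; the message strings and the `$BeforeDrain`, `$AfterDrain`, `$Drained`, and `$AddRejected` variables are hypothetical names used only for illustration.

```powershell
$LogQueue = [System.Collections.Concurrent.BlockingCollection[PSObject]]::new(
    [System.Collections.Concurrent.ConcurrentQueue[PSObject]]::new()
)

$LogQueue.Add('first')
$LogQueue.Add('second')
$LogQueue.CompleteAdding()

# Adding is closed, but two items are still queued, so the collection is not completed yet.
$BeforeDrain = $LogQueue.IsCompleted

# Because CompleteAdding() was already called, this loop ends once the queue is empty
# instead of blocking while it waits for more items.
$Drained = @(foreach ($LogEntry in $LogQueue.GetConsumingEnumerable()) { $LogEntry })

# Closed for adding and empty: the collection now reports completion.
$AfterDrain = $LogQueue.IsCompleted

# Adding after CompleteAdding() throws, which try/catch can observe.
$AddRejected = $false
try { $LogQueue.Add('late') } catch { $AddRejected = $true }
```

Without the call to `CompleteAdding()`, the `foreach` loop would keep waiting for new items, which is what lets a consumer thread sit idle until a producer hands it work.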

Stacks for Thread Tracking

# Register this thread as an active consumer
$FileConsumerStack.Add(
    [System.Threading.Thread]::CurrentThread.ManagedThreadId
)
# do stuff
# Remove a thread from the stack
$null = $FileConsumerStack.Take()
# Close LogQueue if this is the last thread
if ($FileConsumerStack.Count -lt 1) {
    $FileConsumerStack.CompleteAdding()
    $LogQueue.CompleteAdding()
}
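The snippet above can be placed in context with a sketch in which several consumers share one upstream queue and the last one to finish closes the downstream queue. Everything beyond the original lines is an assumption: the `$FileQueue` name, the file names, the consumer count, and the `"Processed ..."` log text are hypothetical, and `$FileConsumerStack` is assumed to be a `BlockingCollection[int]` backed by a `ConcurrentStack`. The main thread waits until every consumer has registered before producing, which avoids one consumer closing `$LogQueue` before another has started.

```powershell
$NewQueue = {
    [System.Collections.Concurrent.BlockingCollection[PSObject]]::new(
        [System.Collections.Concurrent.ConcurrentQueue[PSObject]]::new())
}
$FileQueue = & $NewQueue
$LogQueue  = & $NewQueue
$FileConsumerStack = [System.Collections.Concurrent.BlockingCollection[int]]::new(
    [System.Collections.Concurrent.ConcurrentStack[int]]::new())

# Hypothetical file consumer: logs each file, then deregisters itself.
$FileConsumer = {
    param($FileQueue, $LogQueue, $FileConsumerStack)
    $FileConsumerStack.Add([System.Threading.Thread]::CurrentThread.ManagedThreadId)
    foreach ($File in $FileQueue.GetConsumingEnumerable()) {
        $LogQueue.Add("Processed $File")
    }
    $null = $FileConsumerStack.Take()
    # The last consumer to leave closes the downstream queue.
    if ($FileConsumerStack.Count -lt 1) {
        $FileConsumerStack.CompleteAdding()
        $LogQueue.CompleteAdding()
    }
}

$ConsumerCount = 3
$RunspacePool = [runspacefactory]::CreateRunspacePool($ConsumerCount, $ConsumerCount)
$RunspacePool.Open()
$Workers = foreach ($i in 1..$ConsumerCount) {
    $Worker = [PowerShell]::Create()
    $Worker.RunspacePool = $RunspacePool
    $null = $Worker.AddScript($FileConsumer).AddArgument($FileQueue).
        AddArgument($LogQueue).AddArgument($FileConsumerStack)
    [PSCustomObject]@{ Worker = $Worker; Handle = $Worker.BeginInvoke() }
}

# Wait until every consumer has registered before producing any work.
$Deadline = (Get-Date).AddSeconds(30)
while ($FileConsumerStack.Count -lt $ConsumerCount) {
    if ((Get-Date) -gt $Deadline) { throw 'Consumers did not register in time.' }
    Start-Sleep -Milliseconds 10
}

'a.txt', 'b.txt', 'c.txt', 'd.txt' | ForEach-Object { $FileQueue.Add($_) }
$FileQueue.CompleteAdding()

# Blocks until the last consumer calls $LogQueue.CompleteAdding().
$LogEntries = @(foreach ($LogEntry in $LogQueue.GetConsumingEnumerable()) { $LogEntry })

foreach ($Entry in $Workers) {
    $null = $Entry.Worker.EndInvoke($Entry.Handle)
    $Entry.Worker.Dispose()
}
$RunspacePool.Dispose()
```

The stack is used here purely as a thread-safe counter: `Take()` removes whichever ID is on top, not necessarily the calling thread's own, and calling `CompleteAdding()` more than once is harmless if two consumers see an empty stack at the same moment.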

Demo

Main Demo

Demo Notes

Links