SagiK-Repository / C_Sharp_Deep_Learn

C#을 깊게 배웁니다.
0 stars 1 forks source link

책 Concurrency in C# Cookbook (C# 동시성 프로그래밍 2/e) - 심화 #3

Open SAgiKPJH opened 6 months ago

SAgiKPJH commented 6 months ago

dotnet source

SAgiKPJH commented 6 months ago

핵심 키워드

SAgiKPJH commented 6 months ago

2

SAgiKPJH commented 6 months ago

3

IAsyncEnumerable Code Running 흐름

class Program
{
    static async Task Main(string[] args)
    {
        async IAsyncEnumerable<int> GetAsyncNumbers()
        {
            for (int i = 0; i < 10; i++)
            {
                await Task.Delay(100); // 비동기 작업을 가정
                yield return i;
            }
        }

        // 사용 예
        await foreach (var number in GetAsyncNumbers())
        {
            Console.WriteLine(number);
        }
    }
}

https://github.com/SagiK-Repository/C_Sharp_Deep_Learn/assets/66783849/e960b08f-9d58-4e07-af37-d59aeea5657c


IAsyncEnumerable Code Running 흐름 2

    async IAsyncEnumerable<int> GetAsyncNumbers()
    {
      for (int i = 0; i < 10; i++)
      {
        await Task.Delay(100); // 비동기 작업을 가정
        yield return i;
        yield return i + 10;
      }
    }

    // 사용 예
    await foreach (var number in GetAsyncNumbers())
    {
      Console.WriteLine(number);
    }

https://github.com/SagiK-Repository/C_Sharp_Deep_Learn/assets/66783849/12581c88-fd59-4b8d-a790-51deb15374ec


yield return

IEnumerable<int> GetNumbers()
{
    for (int i = 0; i < 5; i++)
    {
        // 여기서 i의 값을 반환하고, 다음 호출까지 실행을 중지합니다.
        yield return i;
    }
}

foreach (var number in GetNumbers())
{
    Console.WriteLine(number); // 0, 1, 2, 3, 4를 출력합니다.
}

https://github.com/SagiK-Repository/C_Sharp_Deep_Learn/assets/66783849/83042c00-f3bf-4629-9a84-51832fe7ad43


IEnumerator

class MyList : IEnumerable, IEnumerator
{
    private int[] array;
    int position = -1;  // 컬렉션의 현재 위치를 다루는 변수, 첫 번째 반복 때 0이 되어야 하므로 

    public MyList()
    {
        array = new int[3];
    }

    public int this[int index]
    {
        get { return array[index]; }

        set
        {
            if (index >= array.Length)
            {
                Array.Resize<int>(ref array, index + 1);
                Console.WriteLine($"Array Resized : {array.Length}");
            }
            array[index] = value;
        }
    }

    // IEnumerator 멤버
    public object Current
    {  // 현재 위치의 요소 반환
        get { return array[position]; }
    }

    // IEnumerator 멤버
    public bool MoveNext()
    {  // 다음 위치의 요소로 이동
        if (position == array.Length - 1)
        {
            Reset();
            return false;
        }
        position++;
        return (position < array.Length);
    }

    // IEnumerator 멤버
    public void Reset()
    {  // IEnumerator로부터 상속받은 Reset()메소드, 요소의 위치를 첫 요소의 "앞"으로 옮김
        position = -1;
    }

    // IEnumerable 멤버
    public IEnumerator GetEnumerator()
    {
        return this;
    }
}

class MainApp
{
    static void Main(string[] args)
    {
        MyList list = new MyList();
        for (int i = 0; i < 5; i++)
            list[i] = i;

        foreach (int e in list)
            Console.WriteLine(e);
    }
}
Array Resized : 4
Array Resized : 5
0
1
2
3
4


IAsyncEnumerator

public class MyListAsync : IAsyncEnumerable<int>
{
    private int[] array;

    public MyListAsync()
    {
        array = new int[3];
    }

    public int this[int index]
    {
        get => array[index];
        set
        {
            if (index >= array.Length)
            {
                Array.Resize(ref array, index + 1);
                Console.WriteLine($"Array Resized : {array.Length}");
            }
            array[index] = value;
        }
    }

    public IAsyncEnumerator<int> GetAsyncEnumerator(CancellationToken cancellationToken = default)
    {
        return new AsyncEnumerator(array);
    }

    private class AsyncEnumerator : IAsyncEnumerator<int>
    {
        private readonly int[] _array;
        private int _position = -1;

        public AsyncEnumerator(int[] array)
        {
            _array = array;
        }

        public int Current => _array[_position];

        public ValueTask<bool> MoveNextAsync()
        {
            if (_position < _array.Length - 1)
            {
                _position++;
                return new ValueTask<bool>(true);
            }
            else
            {
                return new ValueTask<bool>(false);
            }
        }

        public ValueTask DisposeAsync()
        {
            // 여기에 필요한 정리 작업을 수행합니다.
            return new ValueTask();
        }
    }

}

class MainApp
{
    static async Task Main(string[] args)
    {
        MyListAsync list = new MyListAsync();
        for (int i = 0; i < 5; i++)
            list[i] = i;

        await foreach (int e in list)
            Console.WriteLine(e);
    }
}
Array Resized : 4
Array Resized : 5
0
1
2
3
4
SAgiKPJH commented 6 months ago

4 병렬 처리의 기초

Parallel의 용어



Parallel.ForEach 속성

(번외) Func와 Action의 차이

// Func Func fa = delegate { return _state == 0; }; Func fb = () => _state == 0;

// Predicate


* 델리게이트(delegate)?
  * 대리자
  *  "값"이 아닌 "코드" 자체를 매개변수에 넘김
```cs
// 1. Compare 대리자를 선언
delegate int Compare( int a, int b );
// 2. Compare 대리자가 참조할 비교 메소드를 작성
delegate int AscendCompare( int a, int b ) {
  if ( a > b ) return 1;
  else if ( a == b ) return 0;
  else return -1;
}
// 3. 정렬 메소드를 작성한다. 이때 매개변수로는 정렬할 배열과 비교할 메소드를 참조하는 대리자를 입력받는다.
static void BubbleSort(int[] DataSet, Compare Comparer){

  int i = 0, j = 0, temp = 0;

  for( i = 0; i < DataSet.length - 1; i++){
    for( j = 0; j < DataSet.length - ( i + 1 ); j++){

      if( Comparer( Dataset[j], DataSet[j+1] ) > 0){
        temp = DataSet[j+1];
        DataSet[j+1] = DataSet[j];
        DataSet[j] = temp;
      }

    }
  }

}
// 4. 메소드를 호출하면 우리가 원하던 대로 정렬 방식이 분리된 정렬 코드를 얻을 수 있다.
int [] array = { 3, 7, 4, 2, 10 };
BubbleSort( array, new Compare( AscendCompare ) ); // array는 { 2 3 4 7 10 }이 된다.
delegate void TherIsAFire( string location );

void Call119( string location ){ Console.WriteLine("소방서죠? 불났어요! 주소는 {0}", location ); }

void ShotOut( string location ){ Console.WriteLine("피하세요! {0}에 불이 났어요!",location ); }

void Escape( string location ){ Console.WriteLine("{0}에서 나갑시다!", location ); }
//...//
ThereIsAFire Fire = new ThereIsAFire( Call119 );
Fire += new ThereIsAFire( ShotOut );
Fire += new ThereIsAFire( Escape );
Fire( "우리집 " );
// 소방서죠? 불났어요! 주소는 우리집
// 피하세요! 우리집에 불이 났어요!
// 우리집에서 나갑시다!
// event와 delegate
  // Step 1 대리자를 선언한다. 이 대리자는 클래스 밖에 선언해도 되고 안에 선언해도 된다.
  delegate void EventHandler( string message );

  class MyNotifier
  {
      // Step 2 클래스 내에 step 1 에서 선언한 대리자의 인스턴스를 event 한정자로 수식해서 선언한다.
      public event EventHandler SomethingHappened;
      public void DoSomething( int number )
      {
          int temp = number % 10;

          // number가 3, 6, 9로 끝나는 값이 될 때마다 이벤트 발생
          if ( temp != 0 && temp % 3 == 0 )
          {
              SomethingHappened( String.Format( "{0} : 짝", number ) );
          }
      }
  }

  class MainApp
  {
      // Step 3 이벤트 핸들러를 작성한다. 이벤트 핸들러는 step 1 에서 선언한 대리자와 일치하는 메소드로 한다.
      static public void MyHandler( string message )
      {
          Console.WriteLine( message );
      }

      static void Main( string[] args )
      {
          // Step 4 클래스의 인스턴스를 생성하고 이 객체의 이벤트에 step 3 에서 작성한 이벤트 핸들러를 등록한다.
          MyNotifier notifier = new MyNotifier();
          notifier.SomethingHappened += new EventHandler(MyHandler);

          for (int i = 1; i < 30; i++)
          {
              notifier.DoSomething(i);  // Step 5 : 이벤트가 발생하면 이벤트 핸들러가 호출된다.
          }
      }
  }



Parallel.ForEach의 localInit 사용방법

int parallelsum(IEnumerable<int> values)
{
    object mutex = new object();
    int result = 0;
    Parallel.ForEach(
        source: values,
        localInit: () =>
        {
            return 0; // 10이면 36
        },
        body: (item, state, loclValue) =>
        {
            return loclValue + item;
        },
        localFinally: localValue =>
        {
            lock (mutex)
                result += localValue;
        });
    return result;
}

https://github.com/SagiK-Repository/C_Sharp_Deep_Learn/assets/66783849/463557e0-0916-484d-95f8-239e75d6bb40


Parallel.Aggregate

int ParallelSum(IEnumerable<int> values)
{
  return values.AsParallel().Aggregate(
    seed : 0,
    func : (sum, item) => sum + item
  );
}
internal TOutput Aggregate()
        {
            Debug.Assert(_finalReduce != null);
            Debug.Assert(_resultSelector != null);

            TIntermediate accumulator = default(TIntermediate)!;
            bool hadElements = false;

            // Because the final reduction is typically much cheaper than the intermediate
            // reductions over the individual partitions, and because each parallel partition
            // will do a lot of work to produce a single output element, we prefer to turn off
            // pipelining, and process the final reductions serially.
            using (IEnumerator<TIntermediate> enumerator = GetEnumerator(ParallelMergeOptions.FullyBuffered, true))
            {
                // We just reduce the elements in each output partition. If the operation is associative,
                // this will yield the correct answer. If not, we should never be calling this routine.
                while (enumerator.MoveNext())
                {
                    if (hadElements)
                    {
                        // Accumulate results by passing the current accumulation and current element to
                        // the reduction operation.
                        try
                        {
                            accumulator = _finalReduce(accumulator, enumerator.Current);
                        }
                        catch (Exception ex)
                        {
                            // We need to wrap all exceptions into an aggregate.
                            throw new AggregateException(ex);
                        }
                    }
                    else
                    {
                        // This is the first element. Just set the accumulator to the first element.
                        accumulator = enumerator.Current;
                        hadElements = true;
                    }
                }

                // If there were no elements, we must throw an exception.
                if (!hadElements)
                {
                    if (_throwIfEmpty)
                    {
                        throw new InvalidOperationException(SR.NoElements);
                    }
                    else
                    {
                        accumulator = _seedFactory == null ? _seed : _seedFactory();
                    }
                }
            }

            // Finally, run the selection routine to yield the final element.
            try
            {
                return _resultSelector(accumulator);
            }
            catch (Exception ex)
            {
                // We need to wrap all exceptions into an aggregate.
                throw new AggregateException(ex);
            }
        }


TryCatch


Parallel.Invoke


AsOrdered

Parallel.ForEach

public static ParallelLoopResult ForEach<TSource, TLocal>( OrderablePartitioner source, ParallelOptions parallelOptions, Func localInit, Func<TSource, ParallelLoopState, long, TLocal, TLocal> body, Action localFinally) { // ...

        return PartitionerForEachWorker<TSource, TLocal>(source, parallelOptions, null, null, null, null, body, localInit, localFinally);

}

private static ParallelLoopResult PartitionerForEachWorker<TSource, TLocal>(/.../) {

// ...

TaskReplicator.Run( (ref IEnumerator partitionState, long timeout, out bool replicationDelegateYieldedBeforeCompletion) => {

IEnumerator<KeyValuePair<long, TSource>>? myPartition = partitionState as IEnumerator<KeyValuePair<long, TSource>>; if (myPartition == null) { myPartition = orderablePartitionerSource!.GetEnumerator(); partitionState = myPartition; }

                                if (myPartition == null)
                                    throw new InvalidOperationException(SR.Parallel_ForEach_NullEnumerator);

                                while (myPartition.MoveNext())
                                {
                                    KeyValuePair<long, TSource> kvp = myPartition.Current;
                                    long index = kvp.Key;
                                    TSource value = kvp.Value;

                                    // Update our iteration index
                                    if (state != null) state.CurrentIteration = index;

                                    if (simpleBody != null)
                                        simpleBody(value);
                                    else if (bodyWithState != null)
                                        bodyWithState(value, state!);
                                    else if (bodyWithStateAndIndex != null)
                                        bodyWithStateAndIndex(value, state!, index);
                                    else if (bodyWithStateAndLocal != null)
                                        localValue = bodyWithStateAndLocal(value, state!, localValue);
                                    else
                                        localValue = bodyWithEverything!(value, state!, index, localValue);

                                    if (sharedPStateFlags.ShouldExitLoop(index)) break;

                                    // Cooperative multitasking:
                                    // Check if allowed loop time is exceeded, if so save current state and return.
                                    // The task replicator will queue up a replacement task. Note that we don't do this on the root task.
                                    if (CheckTimeoutReached(loopTimeout))
                                    {
                                        replicationDelegateYieldedBeforeCompletion = true;
                                        break;
                                    }
                                }
                            }
SAgiKPJH commented 6 months ago

5장 데이터 흐름의 기초

블록연결

메시지 전달

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine("n = {0}", n);
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
   Console.WriteLine("The status of the completion task is '{0}'.",
      task.Status);
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}",
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/

BufferBlock

// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(bufferBlock.Receive());
}

/* Output:
   0
   1
   2
 */
// Write to and read from the message block concurrently.
var post01 = Task.Run(() =>
{
    bufferBlock.Post(0);
    bufferBlock.Post(1);
});
var receive = Task.Run(() =>
{
    for (int i = 0; i < 3; i++)
    {
        Console.WriteLine(bufferBlock.Receive());
    }
});
var post2 = Task.Run(() =>
{
    bufferBlock.Post(2);
});

await Task.WhenAll(post01, receive, post2);

// Output:
//   0
//   1
//   2

ActionBlock

// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   actionBlock.Post(i * 10);
}

// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();

/* Output:
   0
   10
   20
 */

TransformBlock

// Post several messages to the block. transformBlock.Post(10); transformBlock.Post(20); transformBlock.Post(30);

// Read the output messages from the block. for (int i = 0; i < 3; i++) { Console.WriteLine(transformBlock.Receive()); }

/ Output: 3.16227766016838 4.47213595499958 5.47722557505166 /


### TransformManyBlock
```cs
// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
   s => s.ToCharArray());

// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");

// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
   Console.WriteLine(transformManyBlock.Receive());
}

/* Output:
   H
   e
   l
   l
   o
   W
   o
   r
   l
   d
 */

BufferBlock Async

// Post more messages to the block asynchronously.
for (int i = 0; i < 3; i++)
{
    await bufferBlock.SendAsync(i);
}

// Asynchronously receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
    Console.WriteLine(await bufferBlock.ReceiveAsync());
}

// Output:
//   0
//   1
//   2
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class DataflowProducerConsumer
{
    static void Produce(ITargetBlock<byte[]> target)
    {
        var rand = new Random();

        for (int i = 0; i < 100; ++ i)
        {
            var buffer = new byte[1024];
            rand.NextBytes(buffer);
            target.Post(buffer);
        }

        target.Complete();
    }

    static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
    {
        int bytesProcessed = 0;

        while (source.TryReceive(out byte[] data))  // 사용 가능한 데이터가 없을 때 False 반환
        // while (await source.OutputAvailableAsync())   // 사용 가능한 데이터가 없을 때 False 반환
        {
            byte[] data = await source.ReceiveAsync();
            bytesProcessed += data.Length;
        }

        return bytesProcessed;
    }

    static async Task Main()
    {
        var buffer = new BufferBlock<byte[]>();
        var consumerTask = ConsumeAsync(buffer);
        Produce(buffer);

        var bytesProcessed = await consumerTask;

        Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
    }
}

// Sample  output:
//     Processed 102,400 bytes.

CancelToken

using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to unlink dataflow blocks.
class DataflowReceiveAny
{
   // Receives the value from the first provided source that has
   // a message.
   public static T ReceiveFromAny<T>(params ISourceBlock<T>[] sources)
   {
      // Create a WriteOnceBlock<T> object and link it to each source block.
      var writeOnceBlock = new WriteOnceBlock<T>(e => e);
      foreach (var source in sources)
      {
         // Setting MaxMessages to one instructs
         // the source block to unlink from the WriteOnceBlock<T> object
         // after offering the WriteOnceBlock<T> object one message.
         source.LinkTo(writeOnceBlock, new DataflowLinkOptions { MaxMessages = 1 });
      }
      // Return the first value that is offered to the WriteOnceBlock object.
      return writeOnceBlock.Receive();
   }

   // Demonstrates a function that takes several seconds to produce a result.
   static int TrySolution(int n, CancellationToken ct)
   {
      // Simulate a lengthy operation that completes within three seconds
      // or when the provided CancellationToken object is cancelled.
      SpinWait.SpinUntil(() => ct.IsCancellationRequested,
         new Random().Next(3000));

      // Return a value.
      return n + 42;
   }

   static void Main(string[] args)
   {
      // Create a shared CancellationTokenSource object to enable the
      // TrySolution method to be cancelled.
      var cts = new CancellationTokenSource();

      // Create three TransformBlock<int, int> objects.
      // Each TransformBlock<int, int> object calls the TrySolution method.
      Func<int, int> action = n => TrySolution(n, cts.Token);
      var trySolution1 = new TransformBlock<int, int>(action);
      var trySolution2 = new TransformBlock<int, int>(action);
      var trySolution3 = new TransformBlock<int, int>(action);

      // Post data to each TransformBlock<int, int> object.
      trySolution1.Post(11);
      trySolution2.Post(21);
      trySolution3.Post(31);

      // Call the ReceiveFromAny<T> method to receive the result from the
      // first TransformBlock<int, int> object to finish.
      int result = ReceiveFromAny(trySolution1, trySolution2, trySolution3);

      // Cancel all calls to TrySolution that are still active.
      cts.Cancel();

      // Print the result to the console.
      Console.WriteLine("The solution is {0}.", result);

      cts.Dispose();
   }
}

/* Sample output:
The solution is 53.
*/
SAgiKPJH commented 6 months ago

5장 데이터 흐름의 기초 (분석)

BatchBlock 및 BatchedJoinBlock을 사용하여 효율성 향상

SAgiKPJH commented 4 months ago

10장 취소

CancellationToken

CancellationTokenSource

volatile 변수

CancellationTokenSource의 cancel 동작

image