Parallel Programming (TPL Dataflow)

2022. 2. 16. 00:00 CSharp/Advance

TPL Dataflow

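With TPL Dataflow you build a program out of blocks. Each block owns its own buffers and processes its messages on tasks scheduled by a task scheduler (the thread pool by default), handling one message at a time unless configured otherwise.

Because data moves between blocks as messages rather than through shared state, you usually no longer need locks or semaphores for synchronization.

The library provides a foundation for message passing and parallelization in applications that need high throughput and low latency and that are CPU- and I/O-intensive.

It is useful when several operations must communicate with one another asynchronously, or when you want to process data as soon as it becomes available.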
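To make the "no locks" point concrete, here is a minimal sketch. The class name LockFreeCounterDemo and the range 1..1000 are made up for illustration. Many threads post into one ActionBlock<int>, and because the block runs its delegate for one message at a time by default, the running total is updated without a lock.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; not part of the original post.
public static class LockFreeCounterDemo
{
    public static async Task<int> RunAsync()
    {
        int total = 0; // only ever touched inside the block's delegate

        // Default MaxDegreeOfParallelism is 1, so the delegate never runs
        // concurrently with itself and needs no lock.
        var adder = new ActionBlock<int>(n => total += n);

        // Many thread-pool threads post at the same time; Post itself is thread-safe.
        Parallel.For(1, 1001, i => adder.Post(i));

        adder.Complete();       // no more input
        await adder.Completion; // wait until every posted message has been processed
        return total;
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync()); // 500500 (sum of 1..1000)
    }
}
```

The synchronization has not disappeared; it has moved into the block's input queue, which is the library's responsibility rather than yours.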

Writing and reading synchronously (BufferBlock<T>)

var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++) bufferBlock.Post(i);

// Receive the messages back from the block.
for (int i = 0; i < 3; i++) Console.WriteLine(bufferBlock.Receive());

// You can also read with TryReceive.
//while (bufferBlock.TryReceive(out int value)) Console.WriteLine(value);

// Output:
//   0
//   1
//   2

Reading and writing concurrently with Task

var bufferBlock = new BufferBlock<int>();
List<Task> tasks = new List<Task>();
tasks.Add(Task.Run(() =>
{
    bufferBlock.Post(0);
    bufferBlock.Post(1);
}));
tasks.Add(Task.Run(() =>
{
    for (int i = 0; i < 3; i++) Console.WriteLine(bufferBlock.Receive());
}));
tasks.Add(Task.Run(() => bufferBlock.Post(2)));
await Task.WhenAll(tasks);
// Typical output (the Post calls run on different tasks, so the order can vary):
//   0
//   1
//   2

Reading and writing asynchronously

var bufferBlock = new BufferBlock<int>();
// Send several messages to the block asynchronously.
for (int i = 0; i < 3; i++) await bufferBlock.SendAsync(i);
// Asynchronously receive the messages back from the block.
for (int i = 0; i < 3; i++) Console.WriteLine(await bufferBlock.ReceiveAsync());

// Output:
//   0
//   1
//   2
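The difference between Post and SendAsync only becomes visible once a block can refuse input. The sketch below shows this with a bounded buffer. The class name BoundedBufferDemo and the capacity of 1 are assumptions chosen for illustration. When the buffer is full, Post returns false immediately, while SendAsync returns a task that completes once the block has room.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; not part of the original post.
public static class BoundedBufferDemo
{
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();

        // Capacity 1 (illustrative value): the block holds at most one message.
        var bufferBlock = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = 1 });

        log.Add($"Post(0): {bufferBlock.Post(0)}"); // accepted
        log.Add($"Post(1): {bufferBlock.Post(1)}"); // declined, buffer is full

        // SendAsync does not give up; the message is postponed until there is room.
        Task<bool> pending = bufferBlock.SendAsync(2);
        log.Add($"SendAsync(2) completed: {pending.IsCompleted}");

        log.Add($"Received: {await bufferBlock.ReceiveAsync()}"); // frees the slot
        log.Add($"SendAsync(2): {await pending}");                // now accepted
        log.Add($"Received: {await bufferBlock.ReceiveAsync()}");
        return log;
    }

    public static async Task Main()
    {
        foreach (string line in await RunAsync()) Console.WriteLine(line);
        // Output:
        //   Post(0): True
        //   Post(1): False
        //   SendAsync(2) completed: False
        //   Received: 0
        //   SendAsync(2): True
        //   Received: 2
    }
}
```

With the default unbounded capacity used in the snippets above, Post and SendAsync behave the same for practical purposes. The distinction matters as soon as you set BoundedCapacity to apply back-pressure.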

Producer and Consumer Model

// Producer
private void Produce(ITargetBlock<byte[]> target)
{
    var rand = new Random();

    for (int i = 0; i < 100; ++i)
    {
        var buffer = new byte[1024];
        rand.NextBytes(buffer);
        target.Post(buffer);
    }

    // Tell the target block that no more data will be posted.
    // Once the buffered items are drained, OutputAvailableAsync in ConsumeAsync returns false and the consumer loop ends.
    target.Complete();
}

// Consumer
private async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
{
    int bytesProcessed = 0;
    // OutputAvailableAsync returns true while data is available, and false once Produce has called target.Complete() and the buffer is empty.
    while (await source.OutputAvailableAsync())
    {
        byte[] data = await source.ReceiveAsync();
        bytesProcessed += data.Length;
    }

    return bytesProcessed;
}

// With multiple consumers, read with TryReceive: it simply returns false when another consumer has already taken the item.
private async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
    int bytesProcessed = 0;
    while (await source.OutputAvailableAsync())
    {
        while (source.TryReceive(out byte[] data))
        {
            bytesProcessed += data.Length;
        }
    }
    return bytesProcessed;
}

// output
Processed 102,400 bytes.
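The methods above do not show how the producer and consumer are connected. A minimal sketch of the wiring, assuming a BufferBlock<byte[]> sits between them, could look like this. The class name ProducerConsumerDemo and the RunAsync helper are made up for illustration, and the two methods are repeated so the example is self-contained.

```csharp
using System;
using System.Globalization;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; not part of the original post.
public static class ProducerConsumerDemo
{
    // Producer: posts 100 buffers of 1024 random bytes, then signals completion.
    static void Produce(ITargetBlock<byte[]> target)
    {
        var rand = new Random();
        for (int i = 0; i < 100; ++i)
        {
            var buffer = new byte[1024];
            rand.NextBytes(buffer);
            target.Post(buffer);
        }
        target.Complete();
    }

    // Consumer: drains the source until it is completed and empty.
    static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
    {
        int bytesProcessed = 0;
        while (await source.OutputAvailableAsync())
        {
            while (source.TryReceive(out byte[] data))
            {
                bytesProcessed += data.Length;
            }
        }
        return bytesProcessed;
    }

    public static async Task<int> RunAsync()
    {
        // The BufferBlock is both the producer's target and the consumer's source.
        var buffer = new BufferBlock<byte[]>();

        // Start the consumer first so it is already waiting when data arrives.
        Task<int> consumerTask = ConsumeAsync(buffer);
        Produce(buffer);

        return await consumerTask;
    }

    public static async Task Main()
    {
        int bytesProcessed = await RunAsync();
        Console.WriteLine("Processed {0} bytes.",
            bytesProcessed.ToString("N0", CultureInfo.InvariantCulture));
    }
}
```

Starting the consumer before the producer is not required for correctness here, since the buffer is unbounded and keeps everything until it is read, but it lets consumption overlap with production.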

Parallel processing with dataflow blocks and a pipeline

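// A single HttpClient is shared by all downloads instead of creating one per message.
var httpClient = new HttpClient(new HttpClientHandler { AutomaticDecompression = System.Net.DecompressionMethods.GZip });

// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
    Console.WriteLine("Downloading '{0}'...", uri);
    return await httpClient.GetStringAsync(uri);
});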

// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
    Console.WriteLine("Creating word list...");
    // Remove common punctuation by replacing all non-letter characters
    // with a space character.
    char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
    text = new string(tokens);

    // Separate the text into an array of words.
    return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});

// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
    Console.WriteLine("Filtering word list...");
    return words
       .Where(word => word.Length > 3)
       .Distinct()
       .ToArray();
});

// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
    Console.WriteLine("Finding reversed words...");
    var wordsSet = new HashSet<string>(words);
    return from word in words.AsParallel()
           let reverse = new string(word.Reverse().ToArray())
           where word != reverse && wordsSet.Contains(reverse)
           select word;
});

// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
    Console.WriteLine("Found reversed words {0}/{1}",
       reversedWord, new string(reversedWord.Reverse().ToArray()));
});

//
// Connect the dataflow blocks to form a pipeline.
//

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);

// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");

// Mark the head of the pipeline as complete.
downloadString.Complete();

// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();

// output
Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
Creating word list...
Filtering word list...
Finding reversed words...
Found reversed words pool/loop
Found reversed words live/evil
Found reversed words swam/maws
Found reversed words flow/wolf
Found reversed words strap/parts
Found reversed words reed/deer
Found reversed words loop/pool
Found reversed words deer/reed
Found reversed words spot/tops
Found reversed words wolf/flow
Found reversed words ward/draw
Found reversed words speed/deeps
Found reversed words deeps/speed
Found reversed words moor/room
Found reversed words meed/deem
Found reversed words parts/strap
Found reversed words draw/ward
Found reversed words deem/meed
Found reversed words tops/spot
Found reversed words teem/meet
Found reversed words evil/live
Found reversed words maws/swam
Found reversed words meet/teem
Found reversed words room/moor

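In short, working with TPL Dataflow means creating dataflow blocks, linking them into a pipeline, and running data through it.

Along the way you combine different kinds of blocks (TransformBlock, ActionBlock, BufferBlock, etc.) and delegate parallel processing and synchronization to the dataflow library, which keeps the program simple.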
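As a compact illustration of that summary, the sketch below links two blocks and lets the library handle both the parallelism and the hand-off. It runs offline, unlike the download pipeline. The class name MiniPipelineDemo, the squaring step, and the parallelism of 4 are assumptions chosen for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; not part of the original post.
public static class MiniPipelineDemo
{
    public static async Task<List<int>> RunAsync()
    {
        var results = new List<int>();

        // Stage 1: square each input, processing up to 4 messages in parallel.
        // TransformBlock still emits results in input order by default.
        var square = new TransformBlock<int, int>(
            n => n * n,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        // Stage 2: default parallelism of 1, so List<int>.Add needs no lock.
        var collect = new ActionBlock<int>(n => results.Add(n));

        // PropagateCompletion forwards Complete() from stage 1 to stage 2.
        square.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 1; i <= 5; i++) square.Post(i);
        square.Complete();

        await collect.Completion; // the whole pipeline has drained
        return results;
    }

    public static async Task Main()
    {
        Console.WriteLine(string.Join(", ", await RunAsync()));
        // Output: 1, 4, 9, 16, 25
    }
}
```

The only tuning knob touched here is MaxDegreeOfParallelism on the first stage. The threading, queuing, and ordering between stages are left to the library.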

Related video

https://youtu.be/6PS6BSjrMpI
