Parallel Programming (TPL Dataflow)
2022. 2. 16. 00:00ㆍCSharp/Advance
TPL Dataflow
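We build blocks, and each block keeps its own local data and processes its messages on tasks that the library schedules for it (by default on the thread pool, rather than on a dedicated private thread).
As long as state stays inside a block, locks and semaphores are usually no longer needed for synchronization.
TPL Dataflow provides a foundation for message passing and parallelization in CPU-intensive and I/O-intensive applications
that need high throughput and low latency.
It is useful when you have multiple operations that must communicate with each other asynchronously,
or when you want to process data as soon as it becomes available.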
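As a rough illustration of the point about not needing locks, the sketch below lets several producer tasks post numbers to one ActionBlock<int> that adds them to a running total. The class name LockFreeSumSketch and the method SumAsync are hypothetical names chosen for this example, and the sketch assumes the System.Threading.Tasks.Dataflow library is available. It relies on the default MaxDegreeOfParallelism of 1, so the block's delegate never runs concurrently with itself.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper for illustration only; not part of the original post.
public static class LockFreeSumSketch
{
    // Posts 1..perProducer from each of `producers` tasks and returns the total.
    public static async Task<long> SumAsync(int producers, int perProducer)
    {
        long total = 0; // only ever touched inside the block's delegate

        // With the default options the block handles one message at a time,
        // so the unsynchronized += is not racing with itself.
        var adder = new ActionBlock<int>(value => total += value);

        // Several tasks post concurrently; Post itself is thread-safe.
        Task[] producerTasks = Enumerable.Range(0, producers)
            .Select(_ => Task.Run(() =>
            {
                for (int i = 1; i <= perProducer; i++) adder.Post(i);
            }))
            .ToArray();

        await Task.WhenAll(producerTasks);

        // No more input; wait until every queued message has been processed.
        adder.Complete();
        await adder.Completion;
        return total;
    }
}
```

Calling `await LockFreeSumSketch.SumAsync(4, 1000)` should return 2002000 (four times the sum of 1 through 1000). If MaxDegreeOfParallelism were raised above 1, the delegate could run concurrently and the total would need synchronization again.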
Writing and reading synchronously (BufferBlock<T>)
var bufferBlock = new BufferBlock<int>();
// Post several messages to the block.
for (int i = 0; i < 3; i++) bufferBlock.Post(i);
// Receive the messages back from the block.
for (int i = 0; i < 3; i++) Console.WriteLine(bufferBlock.Receive());
// You can also read the messages with TryReceive.
//while (bufferBlock.TryReceive(out int value)) Console.WriteLine(value);
// Output:
// 0
// 1
// 2
Reading and writing concurrently using Tasks
var bufferBlock = new BufferBlock<int>();
List<Task> tasks = new List<Task>();
tasks.Add(Task.Run(() =>
{
    bufferBlock.Post(0);
    bufferBlock.Post(1);
}));
tasks.Add(Task.Run(() =>
{
    for (int i = 0; i < 3; i++) Console.WriteLine(bufferBlock.Receive());
}));
tasks.Add(Task.Run(() => bufferBlock.Post(2)));
await Task.WhenAll(tasks);
// Possible output (0 always precedes 1, but 2 is posted from another task and may appear anywhere):
// 0
// 1
// 2
Reading and writing asynchronously
var bufferBlock = new BufferBlock<int>();
// Post more messages to the block asynchronously.
for (int i = 0; i < 3; i++) await bufferBlock.SendAsync(i);
// Asynchronously receive the messages back from the block.
for (int i = 0; i < 3; i++) Console.WriteLine(await bufferBlock.ReceiveAsync());
// Output:
// 0
// 1
// 2
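One case where the difference between Post and SendAsync becomes visible is a block with a bounded capacity. The sketch below is an illustration under assumptions: BoundedBufferSketch and RunAsync are hypothetical names, and the capacity of 1 is chosen only to make the buffer fill up immediately. Post returns false right away when the block cannot accept the message, while SendAsync returns a task that completes once the block has accepted (or permanently declined) it.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper for illustration only; not part of the original post.
public static class BoundedBufferSketch
{
    public static async Task<(bool firstPost, bool secondPost, int[] received)> RunAsync()
    {
        // A buffer that holds at most one message at a time.
        var block = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });

        bool firstPost = block.Post(1);  // accepted: the buffer has room
        bool secondPost = block.Post(2); // not accepted: the buffer is full and Post does not wait

        // SendAsync offers the message and waits asynchronously for room.
        Task<bool> pendingSend = block.SendAsync(3);

        int first = await block.ReceiveAsync();  // frees the single slot
        await pendingSend;                       // the block has now taken message 3
        int second = await block.ReceiveAsync();

        return (firstPost, secondPost, new[] { first, second });
    }
}
```

In this sketch the message 2 is simply lost because the return value of Post is the only signal that it was not accepted. When messages must not be dropped, checking that return value or using SendAsync is the safer choice.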
Producer and Consumer Model
// Producer
private void Produce(ITargetBlock<byte[]> target)
{
    var rand = new Random();
    for (int i = 0; i < 100; ++i)
    {
        var buffer = new byte[1024];
        rand.NextBytes(buffer);
        target.Post(buffer);
    }
    // Tell the target block that no more data will be posted.
    // Once the remaining messages have been consumed, OutputAvailableAsync
    // in ConsumeAsync returns false and the consumer loop ends.
    target.Complete();
}
// Consumer
private async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
{
    int bytesProcessed = 0;
    // OutputAvailableAsync returns true while data is available. After
    // target.Complete() in Produce and an empty buffer, it returns false.
    while (await source.OutputAvailableAsync())
    {
        byte[] data = await source.ReceiveAsync();
        bytesProcessed += data.Length;
    }
    return bytesProcessed;
}
// With several consumers, TryReceive is the safer way to read: another consumer
// may take the message after OutputAvailableAsync returned true, in which case
// TryReceive just returns false.
private async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
    int bytesProcessed = 0;
    while (await source.OutputAvailableAsync())
    {
        while (source.TryReceive(out byte[] data))
        {
            bytesProcessed += data.Length;
        }
    }
    return bytesProcessed;
}
// output
Processed 102,400 bytes.
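The listing above shows the producer and the consumer but not the code that connects them. A minimal sketch of that wiring might look like the following, assuming a BufferBlock<byte[]> sits between the two. The class ProducerConsumerSketch and the method RunAsync are hypothetical names, and the producer and consumer are repeated as static methods only so that the block is self-contained.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical wrapper for illustration only; not part of the original post.
public static class ProducerConsumerSketch
{
    // Posts `messages` random buffers of `size` bytes, then signals completion.
    private static void Produce(ITargetBlock<byte[]> target, int messages, int size)
    {
        var rand = new Random();
        for (int i = 0; i < messages; ++i)
        {
            var buffer = new byte[size];
            rand.NextBytes(buffer);
            target.Post(buffer);
        }
        target.Complete();
    }

    // Counts the bytes received until the source is completed and empty.
    private static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
    {
        int bytesProcessed = 0;
        while (await source.OutputAvailableAsync())
        {
            while (source.TryReceive(out byte[] data))
            {
                bytesProcessed += data.Length;
            }
        }
        return bytesProcessed;
    }

    public static async Task<int> RunAsync(int messages = 100, int size = 1024)
    {
        // The BufferBlock is both the producer's target and the consumer's source.
        var buffer = new BufferBlock<byte[]>();

        // Start the consumer first so it is already waiting for data.
        Task<int> consumer = ConsumeAsync(buffer);

        Produce(buffer, messages, size);

        // Completes after Produce calls Complete and the buffer is drained.
        return await consumer;
    }
}
```

With the default arguments, `int bytes = await ProducerConsumerSketch.RunAsync();` yields 100 x 1024 = 102400, and `Console.WriteLine("Processed {0:#,#} bytes.", bytes);` prints the line shown above on a culture that uses a comma as the group separator.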
Parallel processing with dataflow blocks and a pipeline
// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
    Console.WriteLine("Downloading '{0}'...", uri);
    return await new HttpClient(new HttpClientHandler { AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});
// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
    Console.WriteLine("Creating word list...");
    // Remove common punctuation by replacing all non-letter characters
    // with a space character.
    char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
    text = new string(tokens);
    // Separate the text into an array of words.
    return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});
// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
    Console.WriteLine("Filtering word list...");
    return words
        .Where(word => word.Length > 3)
        .Distinct()
        .ToArray();
});
// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
    Console.WriteLine("Finding reversed words...");
    var wordsSet = new HashSet<string>(words);
    return from word in words.AsParallel()
           let reverse = new string(word.Reverse().ToArray())
           where word != reverse && wordsSet.Contains(reverse)
           select word;
});
// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
    Console.WriteLine("Found reversed words {0}/{1}",
        reversedWord, new string(reversedWord.Reverse().ToArray()));
});
//
// Connect the dataflow blocks to form a pipeline.
//
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
// Mark the head of the pipeline as complete.
downloadString.Complete();
// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
// output
Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
Creating word list...
Filtering word list...
Finding reversed words...
Found reversed words pool/loop
Found reversed words live/evil
Found reversed words swam/maws
Found reversed words flow/wolf
Found reversed words strap/parts
Found reversed words reed/deer
Found reversed words loop/pool
Found reversed words deer/reed
Found reversed words spot/tops
Found reversed words wolf/flow
Found reversed words ward/draw
Found reversed words speed/deeps
Found reversed words deeps/speed
Found reversed words moor/room
Found reversed words meed/deem
Found reversed words parts/strap
Found reversed words draw/ward
Found reversed words deem/meed
Found reversed words tops/spot
Found reversed words teem/meet
Found reversed words evil/live
Found reversed words maws/swam
Found reversed words meet/teem
Found reversed words room/moor
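In short, working with TPL Dataflow means creating dataflow blocks, linking them into a pipeline, and running it.
Along the way you use the various block types (TransformBlock, ActionBlock, BufferBlock, and so on)
and delegate the parallel processing and synchronization to the dataflow library, which keeps the program simple.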
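To make that summary concrete without a network download, here is a small sketch of a two-stage pipeline. MiniPipelineSketch and SquareAllAsync are hypothetical names, and the MaxDegreeOfParallelism value of 4 is an arbitrary choice for illustration. The first block squares numbers in parallel; the second collects them one at a time, so the list needs no lock.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper for illustration only; not part of the original post.
public static class MiniPipelineSketch
{
    public static async Task<List<int>> SquareAllAsync(IEnumerable<int> inputs)
    {
        var results = new List<int>();

        // Stage 1: may process up to four messages at once.
        var square = new TransformBlock<int, int>(
            n => n * n,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        // Stage 2: default parallelism of 1, so List<int>.Add is never called concurrently.
        var collect = new ActionBlock<int>(n => results.Add(n));

        // Completion of the first block flows on to the second.
        square.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (int n in inputs) square.Post(n);

        // Mark the head as complete and wait for the tail to finish.
        square.Complete();
        await collect.Completion;
        return results;
    }
}
```

By default a TransformBlock emits its outputs in the order the inputs arrived, even when it processes them in parallel, so `await MiniPipelineSketch.SquareAllAsync(new[] { 1, 2, 3, 4, 5 })` should return 1, 4, 9, 16, 25 in that order. The sketch awaits Completion instead of calling Wait() so that it does not block the calling thread.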