Dirk Richter
Software-Entwicklung und Architektur

Hintergrundwissen Threads, Threadpools und Tasks


Inhaltsverzeichnis

Zusammenfassung

  • Der Text gibt einen umfassenden Überblick über
    • Threading-Grundlagen in .NET,
    • insbesondere zu Threads, Threadpools, Tasks und async/await.
  • Er erklärt die Unterschiede zwischen
    • Threadpool-Threads, dedizierten Threads und asynchroner Programmierung,
    • zeigt verschiedene Möglichkeiten zur Ausführung und Parallelisierung von Tasks
    • und beschreibt, wie die Anzahl paralleler Aufgaben begrenzt werden kann.
  • Das Producer-Consumer-Pattern wird vorgestellt, inklusive Beispielcode mit BlockingCollection und Channels. Abschließend wird das Synch-over-Async-Antipattern erläutert, das zu Performance-Problemen führen kann, wenn asynchrone Methoden synchron aufgerufen werden.

Historie

Im Laufe der letzten 20 Jahren hat sich das Threading permanent weiter entwickelt. Die Historie soll hier einen Überblick geben.

  • C# 1-3 (200x): Objekte bzgl. NET threading befinden sich von Anfang in Systems.Threading.Thread Namespace.
  • C# 4 (2010): Einführung - unter anderem - von thread-safe collections im System.Collections.Concurrent Namespace. Hier ist vor allem die BlockingCollection zu nennen.
  • C# 5 (2012): Einführung der TPL (Task Parallel Library) und von async and await mit der Task Klasse im System.Threading.Tasks Namspace.
  • C# 8 (2019): Einführung von Channels im System.Threading.Channels Namespace.

Threadpools, Threads und Async/Await

  1. Threadpools ..
    • eignen sich für kurze Aufgaben.
    • werden vom Laufzeitsystem verwaltet und sind effizienter für einfache Operationen.
    • eignen sich für Aufgaben, die schnell abgeschlossen werden und nicht lange den Thread blockieren.
    • werden z.B. für die Verarbeitung kurzer HTTP-Anfragen, Datenbankabfragen oder andere I/O-gebundene Operationen verwendet.
    • werden pro Prozess einmalig erzeugt.
    • eignen sich nicht für lang laufende Aufgaben. Hier kann es zur sogenannten Threadpool Starvation kommen, wenn es keine freien Threads auf dem Pool mehr gibt und abeschlossene Aufgaben keinen freien Thread für den Rücksprung erhalten können.
  2. Dedizierte Threads..
    • werden für lang laufende Aufgaben benötigt oder wenn spezifische Eigenschaften eines Threads festgelegt werden sollen (z. B. Priorität, Vordergrund, Kultur).
    • gehören nicht zum Threadpool und konkurrieren nicht mit anderen Aufgaben.
    • ist besser geeignet, wenn eine Aufgabe voraussichtlich lange den Thread blockiert (z. B. durch Warten auf eine Sperre oder langsame I/O).
  3. async/await:
    • wird bei asynchroner Programmierung verwendet.
    • das Entwickeln von nicht blockierendem Code, ohne Threads explizit zu verwalten.
    • ist besonders nützlich für I/O-gebundene Operationen (z. B. Dateien lesen, Netzwerkanfragen), bei denen Threads freigegeben werden können, während auf das Ende der Operation gewartet wird.
    • ist eine höhere Abstraktion, die asynchronen Code vereinfacht.

Tasks

Tasks

  1. sind Aufgaben, die z.B. an einem Threadpool zur Ausführung übergeben werden.
  2. können Aufgaben sein, die nicht lange dauern und von der CPU ausgeführt werden
  3. können Aufgaben sein, die länger dauern und I/O Operationen oder Aufrufe an andere Systeme darstellen.

Tasks können auf unterschiedliche Art ausgeführt werden:


Ausführen von Tasks

Threadpool

  1. Run.Task():
    ist ein Spezialfall von Task.Factory.StartNew. Bei diesem Aufruf werden alle verschachtelten Tasks ‘unwrapped’ und einzeln in die Queue übergeben. Für lang laufende Aufgaben sollte diese Möglichkeit nicht genutzt werden, wie oben beschrieben wurde.
  2. Task.Factory.StartNew()
    ist eine komplexere Methode, um Tasks zu verarbeiten.
    Nur wenn z.B. die Option TaskCreationOptions.LongRunning übergeben wird, weiß die Factory, dass der Task in einem zusätzlichen Thread ausgelagert werden könnte.
  3. Benutzung von async/await
    Hier entscheidet das System selbst, ob ein Task in einen eigenständigen Thread ausgelagert wird oder ob der Task innerhalb des Threadpools ausgelagert wird. Es ist auch möglich, dass kein Thread blockiert wird, wenn die I/O Operation ‘awaited’ wird. In der Regel sollte diese Methode bevorzugt werden.

Dedizierter Thread

  1. new Thread() erzeugt einen neuen eigenständigen Thread. Die Erzeugung benötigt mehr Ressourcen als die Verlagerung auf den ThreadPool, blockiert jedoch keinen anderen Threads.

async/await

  1. task = await FooAsync() Der Thread wird nicht explizit erzeugt, sondern über das TPL Framework verwaltet. Das Framework merkt sich den SynchronizationContext, um die Aufgabe später zurückgeben zu können. Diese Operation verwendet in der Regel keinen Thread während auf den Abschluss der I/O Operation gewartet wird. In der Folge gibt es auch kein ‘Thread-Switching’, sodass die abgeschlossene Operation später im selben Thread weiterlaufen kann.

Ausführen paralleler Tasks

Task.WhenAll()

Für das Ausführen mehrerer paralleler Tasks bietet sich die Methode Task.WhenAll() an. Die Methode ist asynchron und blockiert nicht den aufrufenden Thread. Alle Tasks werden parallel verarbeitet.

Es werden auch alle Exceptions als AggregateExceptions korrekt in der InnerException von Task.WhenAll() angezeigt.

 1public async Task RunMultipleTasksAsync()
 2{
 3    Task task1 = ThrowExceptionAfterDelay(2000);
 4    Task task2 = ThrowExceptionAfterDelay(3000);
 5
 6    try
 7    {
 8        await Task.WhenAll(task1, task2);
 9    }
10    catch(AggregateException ae)
11    {
12        foreach(var ex in ae.InnerExceptions)
13        {
14            Console.WriteLine(ex.Message);
15        }
16    }
17}
18
19public Task ThrowExceptionAfterDelay(int delay)
20{
21    return Task.Run(async () =>
22    {
23        await Task.Delay(delay);
24        throw new InvalidOperationException($"Exception after {delay}");
25    });
26}

Beschränkung der Anzahl der Threads

Task.WhenAll()

Bei der Benutzung von Task.WhenAll() lassen sich die Anzahl der Threads über die Nutzung von SemaporeSlim reduzieren.

 1public static async Task ProcessTasksAsync(IEnumerable<Func<Task>> taskFactories, int degreeOfParallelism)
 2{
 3    var tasks = new List<Task>();
 4    var semaphore = new SemaphoreSlim(degreeOfParallelism);
 5
 6    foreach (var taskFactory in taskFactories)
 7    {
 8        Task t = ProcessTaskAsync(semaphore, taskFactory());
 9        tasks.Add(t);
10    }
11
12    await Task.WhenAll(tasks);
13}
14
15private static async Task ProcessTaskAsync(SemaphoreSlim semaphore, Task task)
16{
17    await semaphore.WaitAsync();
18
19    try
20    {
21        await task;
22    }
23    finally
24    {
25        semaphore.Release();
26    }
27}

Parallel.ForEachAsync()

Es gibt auch die Möglichkeit über Parallel.ForEachAsync() Tasks parallel in einer beschränkten Anzahl von Threads durchzuführen.

1IEnumerable<MyTask> tasks = ... // Collection of tasks
2
3var options = new ParallelOptions() { MaxDegreeOfParallelism = 4 }; // concurrency level
4
5await Parallel.ForEachAsync(tasks, options, async (task, cancellationToken) =>
6{
7    await task.ExecuteAsync(); // Assuming ExecuteAsync is method to perform the I/O operation
8});

Hier kann eine zuvor erstellte Liste von Tasks parallel ausgeführt werden.


Verarbeitung kontinuierlich neuer Tasks

Falls die Anzahl der zu verarbeitenden Tasks nicht beschränkt ist, sondern kontinuierlich neue Tasks ’entstehen’, so kann das sogenannten Producer-Consumer-Pattern angewendet werden.

Producer-Consumer Problem

Der oben beschriebene Prozess wird als Producer-Consumer Problem bezeichnet.

  1. Es gibt einen (oder mehrere) Producer, der Aufträge erzeugt
  2. Es gibt einen (oder mehrere) Consumer, der Aufträge konsumieren.
  3. Sowohl Producer als auch Consumer teilen sich einen Bereich mit einer bestimmten Größe
  4. Der Producer schickt seine Auträge in den gemeinsamen Bereich
  5. Der oder die Consumer holen sich die Aufträge aus dem gemeinsamen Bereich und führen sie aus
Producer-Consumer Problem *(Producer-Consumer Pattern)*

Für dieses Problem gibt Lösungen, z.B. über die Benutzung von BlockingCollection. Seit 2019 (Core 3.1) wird hierbei die Verwendung von Channels empfohlen.

 1var channel = Channel.CreateUnbounded<int>();
 2
 3// Producer
 4Task producer = Task.Factory.StartNew(async () =>
 5{
 6    for (int i = 0; i < 10; i++)
 7    {
 8        await channel.Writer.WriteAsync(i);
 9    }
10    channel.Writer.Complete();
11}, TaskCreationOptions.LongRunning);
12
13// Consumer
14Task consumer = Task.Factory.StartNew(async () =>
15{
16    await foreach (var item in channel.Reader.ReadAllAsync())
17    {
18        Console.WriteLine($"Consumed: {item}");
19    }
20}, TaskCreationOptions.LongRunning);
21
22// Do some other work...
23
24// Wait for tasks to complete
25await Task.WhenAll(producer, consumer);

Konkretes Beispiel für longrunning Tasks

 1using System;
 2using System.Collections.Concurrent;
 3using System.Threading;
 4using System.Threading.Tasks;
 5
 6class Program
 7{
 8    static void Main(string[] args)
 9    {
10        var degreesOfParallelism = 10;
11        // BlockingCollection as the shared resource.
12        BlockingCollection<int> queue = new BlockingCollection<int>(degreesOfParallelism);
13        CancellationTokenSource cts = new CancellationTokenSource();
14
15        // Setting up the producer as a long-running task.
16        Task producer = Task.Factory.StartNew(() =>
17        {
18            int count = 0;
19            while (!cts.Token.IsCancellationRequested)
20            {
21                queue.Add(count);
22                Console.WriteLine("Produced: " + count);
23                count++;
24                Thread.Sleep(1000);  // Simulate time-consuming work
25            }
26        }, TaskCreationOptions.LongRunning);
27
28        // Setting up the consumer as a long-running task.
29        Task consumer = Task.Factory.StartNew(() =>
30        {
31            while (!queue.IsCompleted && !cts.Token.IsCancellationRequested)
32            {
33                int item;
34                if (queue.TryTake(out int item, TimeSpan.FromSeconds(1)))
35                {
36                    Console.WriteLine("Consumed: " + item);
37                    Thread.Sleep(2000);  // Simulate time-consuming work
38                }                
39            }
40        }, TaskCreationOptions.LongRunning);
41
42        Console.WriteLine("Press enter to end tasks...");
43        Console.ReadLine();
44        cts.Cancel();  // Let's signal the tasks to end 
45
46        // Wait for both tasks to complete.
47        await Task.WhenAll(producer, consumer);
48    }
49}

Hinweise:

  • Die BlockingCollection ist ’threadsafe’
  • Der Degree Of Parallelism wird auf 10 gesetzt

Synch-over-Async Antipattern

Das Synch-over-Async Antipattern beschreibt das Problem, wenn eine asynchrone Methode in eine blockierende synchrone Operation mit GetAwaiter().GetResult() umgewandelt wird, weil die aufrufende Methode selbst synchron ist.

 1public async Task<string> FetchDataAsync()
 2{
 3    var webClient = new WebClient();
 4    var result = await webClient.DownloadStringTaskAsync(new Uri("https://example.com"));
 5    return result;
 6}
 7
 8public string FetchData()
 9{
10    return FetchDataAsync().GetAwaiter().GetResult();  // Sync-over-async
11}

Dies führt zu Performance Problemen und im Extremfall zu Deadlocks.

Es ist möglich, dieses Problem durch die Verlagerung auf den ThreadPool zu umgehen, jedoch kann man hierdurch die Vorteile des Async/Await Patterns nicht nutzen und ‘belastet’ womöglich unnötig den ThreadPool.

1 Task.Run(() => FetchDataAsync().GetAwaiter().GetResult()))

#Threads #Cleancode #Technicaldebt