© VectorPot/Shutterstock.com
TPL Dataflow vs. Go Channels

Asynchronität mit C# und Go


Wie können wir asynchron Nachrichten zwischen Verarbeitungsschritten austauschen? Dazu existieren in C# und Go verschiedene Ansätze – ein guter Anlass, einen Blick über den Tellerrand zu wagen und sich die Programmiersprache Go einmal näher anzusehen. Bevor wir damit starten, rufen wir uns zunächst die TPL Dataflow Library in Erinnerung.

Nebenläufige Programmierung ist heutzutage die Regel, nicht die Ausnahme. Daten werden gelesen oder empfangen, asynchron innerhalb des Prozesses verarbeitet und die Ergebnisse werden danach ausgegeben oder an einen Empfänger gesendet. Natürlich lässt sich die Kommunikation zwischen den Threads eines Prozesses über Puffer im Speicher und Synchronisationsobjekte wie Locks oder Semaphore lösen. Dieses Programmiermodell ist jedoch fehleranfällig und führt nicht selten zu ineffizienten Algorithmen.

Die meisten Programmierplattformen, die ich kenne, enthalten auf Programmiersprachen- oder Framework-Ebene alternative Mechanismen, um asynchron Messages zwischen Verarbeitungsschritten auszutauschen. In diesem Artikel vergleiche ich die Ansätze von C# und Go, wobei der Schwerpunkt auf den Besonderheiten von Go liegt. Der Message-Austausch zwischen den sogenannten Goroutines über sogenannte Channels ist eine charakteristische Eigenschaft der Sprache Go. Dieser Artikel soll C#-Entwicklerinnen und Entwicklern einen Blick über den Tellerrand bieten und vielleicht sogar Lust darauf machen, im einen oder anderen Projekt Go auszuprobieren.

TPL Dataflow Library

Bevor wir auf Go und seine Channels zu sprechen kommen, möchte ich kurz in Erinnerung rufen, was C# und .NET in Sachen asynchronem Message-Austausch innerhalb von Prozessen eingebaut hat: die TPL (Task Parallel Library) Dataflow Library [1]. Diese Bibliothek basiert auf den folgenden Grundprinzipien:

  • Source Blocks (ISourceBlock<T>) sind Datenquellen. Man kann Messages von ihnen lesen.

  • Target Blocks (ITargetBlock<T>) empfangen Messages. Man kann Messages auf sie schreiben.

  • Propagator Blocks (IPropagatorBlock<TIn, TOut>) sind gleichzeitig Source und Target Blocks. Sie verarbeiten Daten in irgendeiner Form (Projizieren, Filtern, Gruppieren etc.).

Die Blöcke kann man flexibel zu Pipelines kombinieren. .NET kommt mit vielen vordefinierten Blöcken (Namespace: System.Threading.Tasks.Dataflow), die man je nach Anwendungsfall zu einer Pipeline zusammenstellt.

Listing 1 zeigt exemplarisch, wie eine Datenverarbeitung mit der TPL Dataflow Library programmiert wird. Ein Producer erzeugt Daten. In der Praxis würden diese beispielsweise aus Dateien oder Datenbanken geladen beziehungsweise über das Netzwerk empfangen. Im Beispiel werden Zufallsdaten in einen BufferBlock geschrieben. Er kann je nach Anwendungsfall als Source-, Propagator- oder Target-Block eingesetzt werden. Der Transformer verarbeitet die Daten und gibt das Verarbeitungsergebnis an den nächsten Schritt der Pipeline weiter. Im Beispiel wird der Mittelwert aus Zahlen in einem Array berechnet. In der Praxis können hier beliebig komplexe Logiken zur Verarbeitung ablaufen. Eine Besonderheit der TPL Dataflow Library ist, dass sie sich automatisch um die Parallelisierung der Message-Verarbeitung kümmert, wenn der jeweilige Algorithmus das erlaubt. Im Beispiel wird MaxDegreeOfParallelism angegeben, wodurch mehrere Transformer parallel laufen. Die Verarbeitung kann dadurch entsprechend beschleunigt werden. Der Consumer ist im Beispiel der letzte Schritt der Pipeline. Er erwartet das Eintreffen von Messages und verarbeitet sie.

Listing 1: TPL Dataflow Pipeline

using System; using static System.Console; using System.Linq; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; // Create a buffer (=target block) into which we can write messages var buffer = new BufferBlock<byte[]>(); // Create a transformer (=source and target block) that processes data. // For demo purposes, we specify a degree of parallelism. This allows // .NET to run multiple transformers concurrently. var transform = new TransformBlock<byte[], double>(Transform, new() { MaxDegreeOfParallelism = 5 }); // Link buffer with transformer buffer.LinkTo(transform); // Start asynchronous consumer var consumerTask = ConsumeAsync(transform); // Start producer Produce(buffer); // Producer is done, we can mark transformer as completed. // This will stop the consumer after it will have been finished // consuming buffered messages. transform.Complete(); // Wait for consumer to finish and print number of processed messages var bytesProcessed = await consumerTask; WriteLine($"Processed {bytesProcessed} messages."); /// <summary> /// Produces values and writes them into <c>target</c> /// </summary> static void Produce(ITargetBlock<byte[]> target) {  // Here we generate random bytes. In practice, the producer  // would e.g. read data from disk, receive data over the network,  // get data from a database, etc. var rand = new Random(); for (int i = 0; i < 100; ++i) { var buffer = new byte[1024]; rand.NextBytes(buffer);  // Send message into target block WriteLine("Sending message"); target.Post(buffer); }  // Mark as completed target.Complete(); } /// <summary> /// Transforms incoming message (byte array -> average value) /// </summary> static double Transform(byte[] bytes) {  // For debug purposes, we print the thread id. If you run the program,  // you will see that transformers run in parallel on multiple threads. WriteLine($"Transforming message on thread ${Thread.CurrentThread.ManagedThreadId}"); return bytes.Average(val => (double)val); } /// <summary> /// Consumes message /// <...

Neugierig geworden? Wir haben diese Angebote für dich:

Angebote für Gewinner-Teams

Wir bieten Lizenz-Lösungen für Teams jeder Größe: Finden Sie heraus, welche Lösung am besten zu Ihnen passt.

Das Library-Modell:
IP-Zugang

Das Company-Modell:
Domain-Zugang