© Swill Klitch/Shutterstock.com
Teil 2: Asynchrone Kommunikation mit Channels und Co.

Koroutinen in Kotlin: Parallelität


Koroutinen ermöglichen es, nichtblockierende sequenzielle Abläufe einfach umzusetzen. Sobald allerdings mehrere Koroutinen nebenläufig mit denselben Daten arbeiten, gibt es die gleichen Probleme wie bei Threads. Asynchrone Kommunikationsmuster, basierend auf Channels, können auch bei Koroutinen die Erstellung von nebenläufigen Anwendungen vereinfachen.

Video: Kotlin: Innovative Sprachfeatures unter der Lupe

Ein kurzer Überblick: Dies ist der zweite Artikel unserer Serie über Koroutinen. Im ersten Teil [1] wurden die Grundlagen behandelt und ein Beispiel-Service zum Erstellen von Collagen entwickelt. Da wir dieses Beispiel weiterverwenden wollen, ist im nachfolgenden Quellcode die entwickelte Koroutine zu sehen. Der vollständige Code zu diesem und den weiteren Beispielen ist unter [2] zu finden.

Im folgenden Beispielcode wird anhand eines Suchbegriffs sowie einer vorgegebenen Anzahl von Bildern ein neues Bild zusammengestellt. Hierfür wird zunächst eine Liste von Bild-URLs ermittelt. Anschließend werden die einzelnen Bilder sequenziell geladen und zum Schluss zu einer Collage kombiniert.

suspend fun createCollage(query: String, count: Int): BufferedImage { val urls = requestImageUrls(query, count) val images = urls.map { requestImageData(it) } return combineImages(images) }

Im Unterschied zu normalen Funktionen wird diese Koroutine suspendiert, sobald man auf asynchrone Ereignisse wartet. Der Ablauf der Koroutine bleibt dabei sequentiell, d. h., es gibt keine Nebenläufigkeit zwischen den Statements.

Probleme mit geteilten Daten

Nebenläufigkeit muss explizit angestoßen werden – gut in Listing 1 zu erkennen. Dort wird für jeden URL mit der Funktion launch eine neue Koroutine gestartet. Innerhalb der Koroutinen wird jeweils ein Bild geladen und der Liste images hinzugefügt. Wenn der umschließende CoroutineScope mit einen Threadpool arbeitet, werden die Koroutinen tatsächlich nebenläufig ausgeführt. Allerdings ist der parallele Zugriff auf die Ergebnisliste dadurch problematisch, dass diese nicht threadsafe ist. Es kann zu Race Conditions innerhalb der add-Funktion kommen und im Ergebnis können Bilder fehlen oder die Datenstruktur der Liste kann ungültige Zustände annehmen.

Listing 1

suspend fun createCollage(query: String, count: Int) = coroutineScope { val urls = requestImageUrls(query, count) val images = mutableListOf<BufferedImage>() urls.forEach { url -> launch { val image = requestImageData(url) images.add(image) } } combineImages(images) }

Sobald Koroutinen nebenläufig ausgeführt werden und auf gemeinsamen Daten arbeiten, muss der parallele Zugriff auf diese Daten synchronisiert werden; das ist nicht anders als bei Threads. Eine Lösung ist die Arbeit mit Locks: Alle Zugriffe auf den gemeinsamen Zustand werden abgesichert (Listing 2). Wird alternativ eine threadsafe Datenstruktur benutzt, findet die Synchronisierung von parallelen Zugriffen innerhalb der Datenstruktur statt.

Listing 2

val images = mutableListOf<BufferedImage>() val imagesLock = ReentrantLock() urls.forEach { launch { val image = requestImageData(it) imagesLock.withLock { images.add(image) } } }

Eine andere und meist stabilere Alternative ist der vollkommene Verzicht auf den veränderbaren geteilten Zustand. Wie eine solche Lösung aussieht, haben wir bereits im ersten Teil der Serie gesehen. Listing 3 zeigt nochmals die Koroutine, die mit async/awaitAll alle Bilder parallel lädt. In dem Beispiel wird von nur einer Koroutine auf die Ergebnisliste zugegriffen. Allerdings wird die Variable url von der äußeren an die innere Koroutine übergeben. Da Strings allerdings unveränderbar (immutable) sind, kann niemand die Daten gleichzeitig ändern und ungültige Zustände erzeugen.

Anders sieht es mit dem Return-Wert der Funktion requestImageData aus, dieser ist ein BufferedImage und veränderbar. Allerdings wird das Bild erst am Ende der inneren Koroutine an die äußere Koroutine übergeben. Es gibt also keinen Zeitpunkt, an dem beide Koroutinen gleichzeitigen Zugriff auf die Daten haben.

Listing 3

suspend fun createCollage (query: String, count: Int) = coroutineScope { val urls = requestImageUrls(query, count) val deferredImages: List<Deferred<BufferedImage>> = urls.map { url -> async { requestImageData(url) } } val images: List<BufferedImage> = deferredImages.awaitAll() combineImages(images) }

Kommunikation über Channels

Ein anderes bekanntes, asynchrones Kommunikationsmuster, um geteilten Zustand zu vermeiden, ist der Einsatz von Channels (auch Queues oder Kanäle genannt).

Die Kommunikation über Channels beruht auf der Theorie der Communicating Sequential Processes (CSP) von Tony Hoare [3]. Die Idee dabei ist, dass Prozesse, in unserem Fall Koroutinen, Daten lediglich über Channels austauschen. Auf der einen Seite gibt es einen oder mehrere Sender, die Daten in einen Channel schreiben, auf der Gegenseite gibt es einen oder mehrere Empfänger, die Daten lesen wollen. Im besten Fall werden dabei nur immutable Daten ausgetauscht.

Listing 4 zeigt ein einfaches Beispiel mit Channels. Ein Channel ist so ähnlich wie eine getypte Liste. Beim Anlegen kann als Konstruktorparameter die Kapazität, also die Anzahl der zu puffernden Elemente angegeben werden. Mit der send-Funktion können Daten vom passenden Typ gesendet werden. Dabei ist send eine Koroutine und diese suspendiert sich, sobald die Kapazität des Channel erreicht ist. Das heißt, entnimmt kein Empfänger die Daten aus dem Channel, wird irgendwann der Sender blockiert. Das Konzept, den Sender auszubremsen, sollte der Empfänger nicht hinterherkommen, ist in der reaktiven Welt unter dem Begriff Back Pressure bekannt. Back Pressure sorgt dafür, dass ein System nicht unter der Last der Sender zusammenbricht. Ein Channel kann geschlossen (close) werden, wenn keine weiteren Daten mehr gesendet werden sollen. Auf der Empfängerseite kann ein Channel wie eine Liste benutzt werden, d. h., mittels Iterator oder For-Schleife können die enthaltenen Nachrichten empfangen werden. Dabei i...

Neugierig geworden? Wir haben diese Angebote für dich:

Angebote für Teams

Für Firmen haben wir individuelle Teamlizenzen. Wir erstellen Ihnen gerne ein passendes Angebot.

Das Library-Modell:
IP-Zugang

Das Company-Modell:
Domain-Zugang