concurrency-streams.html 19 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
<h4>Futures</h4>

<p>Ein <a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Runnable.html"><code>Runnable</code></a> hat die Einschränkung, dass seine Methode <code>run</code> kein Ergebnis zurückgibt. Falls in einem nebenläufigen Thread ein Ergebnis berechnet werden soll, bietet sich dafür das funktionale Interface <a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/Callable.html"><code>Callable&lt;V></code></a> an, das über die Methode <code>call</code> ein Ergebnis vom Typ <code>V</code> zurückgibt.

<code>Callables</code> werden häufig im Zusammenhang mit <a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/Future.html"><code>Futures</code></a> verwendet. Sowohl <code>Callables</code> als auch <code>Futures</code> wurden in Java 5 (2004) eingeführt. Ein <code>Future&lt;V></code> ist ein Platzhalter für ein Ergebnis vom Typ <code>V</code>, das noch nicht vollständig berechnet worden ist. Es verspricht, das Ergebnis einer nebenläufigen Berechnung zu speichern, sobald diese abgeschlossen ist (engl. <i>Promise</i>). Die Berechnung findet typischerweise in einem <code>Callable</code> statt, das durch einen <a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/ExecutorService.html"><code>ExecutorService</code></a> ausgeführt wird. Der Aufrufer der nebenläufigen Berechnung kann auf dem <code>Future</code> mittels der Methode <code>isDone</code> prüfen, ob die Berechnung bereits abgeschlossen ist, und mittels der Methode <code>get</code> blockierend auf das fertige Ergebnis warten. Das folgende Code-Beispiel illustriert das Warten auf ein Future in Zeile 20.</p>

<pre><code class="language-java line-numbers">import java.util.concurrent.Callable;
import java.util.concurrent.Future;
// ...

class MyFuture {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // task being executed concurrently
        Callable&lt;Integer> callable = () -> {
            Thread.sleep(1000);
            return 1;
        };

        ExecutorService exec = Executors.newSingleThreadExecutor();
        Future&lt;Integer> f = exec.submit(callable); // start execution of concurrent task

        System.out.println(f.isDone()); // prints 'false'

        System.out.println(f.get()); // waits until the task is done, then prints '1'
		
		exec.shutdown();
    }
}</code></pre> 

<p>In Java 8 (2014) wurden ergänzend <a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/CompletableFuture.html"><code>CompletableFutures</code></a> eingeführt. Ein <code>CompletableFuture</code> ist eine Weiterentwicklung eines regulären <code>Future</code> – inspiriert durch <a href="https://google.github.io/guava/releases/23.0/api/docs/com/google/common/util/concurrent/ListenableFuture.html"><code>ListenableFutures</code></a> in Google's <a href="https://github.com/google/guava">Guava</a>-Bibliothek. 

<code>CompletableFutures</code> sollen es ermöglichen, mehrere voneinander abhängige, nebenläufige Berechnungen in einer Verarbeitungskette hintereinander zu schalten. Der Aufrufer kann z.B. formulieren, dass in einem nebenläufigen Thread zunächst Aufgabe A bearbeitet werden soll und anschließend Aufgabe B, die auf das Ergebnis von Aufgabe A als Input angewiesen ist. Es folgt ein einfaches Code-Beispiel zur Veranschaulichung:</p>

<pre><code class="language-java line-numbers">import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.function.Function;
// ...

class MyCompletableFuture {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // task A being executed concurrently
        Supplier&lt;Integer> supplier = () -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) { }
            return 1;
        };

        // task B being executed concurrently, dependent on the result of task A
        Function&lt;Integer, Integer> plusOne = (x) -> x + 1;

        ExecutorService exec = Executors.newSingleThreadExecutor();

        CompletableFuture&lt;Integer> f = CompletableFuture.supplyAsync(supplier, exec); // start execution of task A
        System.out.println(f.isDone()); // prints 'false'

61
        CompletableFuture&lt;Integer> f2 = f.thenApplyAsync(plusOne, exec); // start task B when task A is done >> chaining of tasks
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
        System.out.println(f2.get()); // waits until both tasks are done, then prints '2'

        exec.shutdown();
    }
}</code></pre>

<h4>Parallele Streams</h4>

<p>Ein <a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/stream/Stream.html"><code>Stream</code></a> wird in Java per Default sequentiell in einem Thread ausgeführt. Streams stehen seit Java 8 (2014) zur Verfügung. Im folgenden Code-Beispiel wird eine Liste von 64 Zahlen in einem Stream verarbeitet, um die größte der Zahlen zu ermitteln (Zeile 3).</p>

<pre><code class="language-java line-numbers">List&lt;Integer> numbers = IntStream.rangeClosed(1, 64).boxed().collect(Collectors.toList());
Collections.shuffle(numbers);

int max = numbers.stream().reduce(Integer.MIN_VALUE, Math::max);</code></pre>

<p>Durch den Aufruf der Methode <code>parallel</code> kann der Stream parallelisiert werden, so dass seine Verarbeitung auf mehrere nebenläufige Threads aufgeteilt wird. Per Default werden so viele Threads genutzt, wie Prozessorkerne gemäß <code>Runtime.getRuntime().availableProcessors()</code> zur Verfügung stehen. Für den Entwickler ist die Parallelisierung von Streams damit sehr einfach gestaltet.</p>

<pre><code class="language-java line-numbers">int max = numbers.stream().parallel().reduce(Integer.MIN_VALUE, Math::max);</code></pre>

Jens Ehlers's avatar
Jens Ehlers committed
81
<p>Die grundsätzliche Erwartung ist, dass durch die Nutzung mehrerer parallel arbeitender Threads der parallelisierte Stream schneller fertig wird als der sequentielle Stream. Das lässt sich mit einem ausreichend großen Berechnungsaufwand für den Stream und entsprechender Multicore-Hardware auch demonstrieren (<a href="/concurrency/src/main/java/streams/ParallelStreamProcessingTime.java" class="repo-link">siehe hier</a>).</p>
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104

<p>Grundsätzlich ist die Annahme, dass ein sequentieller und ein paralleler Stream stets zum gleichen Ergebnis führen. Es gibt aber Ausnahmefälle, die entsprechend dokumentiert sind, z.B. die häufig genutzte Methode <a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/stream/Stream.html#forEach(java.util.function.Consumer)"><code>forEach</code></a>, mit der über die Elemente eines Streams iteriert werden kann. Die Methode <code>forEach</code> ist wie folgt dokumentiert: <i>"The behavior of this operation is explicitly nondeterministic. For parallel stream pipelines, this operation does not guarantee to respect the encounter order of the stream, as doing so would sacrifice the benefit of parallelism. For any given element, the action may be performed at whatever time and in whatever thread the library chooses. If the action accesses shared state, it is responsible for providing the required synchronization."</i>  Die folgende Methode <code>log</code> gibt die Elemente eines parallelen Streams daher nicht unbedingt in der richtigen Reihenfolge aus.</p>

<pre><code class="language-java line-numbers">void log(Stream&lt;String> stream) { stream.forEach(System.out::println); }</code></pre>

<p>Über die Methode <code>isParallel</code> kann für einen Stream ermittelt werden, ob es sich um einen sequentiellen oder parallelen Stream handelt. Eine Fallunterscheidung in der Methode <code>log</code> sorgt dafür, dass sie sich deterministisch verhält.</p>

<pre><code class="language-java line-numbers">void log(Stream&lt;String> stream) {
	if (!stream.isParallel()) stream.forEach(System.out::println);
	else stream.forEachOrdered(System.out::println);
}</code></pre>

<p>Tatsächlich könnte auch in beiden Fällen die Methode <code>forEachOrdered</code> genutzt werden, was die obige Fallunterscheidung wieder überflüssig macht.</p>

<p>Die interne Implementierung paralleler Streams basiert auf dem Fork-Join-Framework, das in Java 7 (2011) eingeführt worden ist. 
Das Fork-Join-Framework agiert nach dem Prinzip des Divide & Conquer-Algorithmus. Die grundlegende Idee besteht dabei darin, dass ein Ausgangsproblem so in Teilprobleme zerlegt wird, dass die später zusammengeführten Ergebnisse der Teilprobleme das Ausgangsproblem lösen. Ein <i>Fork</i> ist dabei das Zerlegen in Teilprobleme und ein <i>Join</i> das Zusammenfassen der Teilergebnisse zum Gesamtergebnis des Ausgangsproblems. Der Algorithmus lässt sich rekursiv über mehrere Hierarchiestufen anwenden, d.h. dass die Teilprobleme der ersten Stufen sukzessive in noch kleinere, eigene Teilprobleme zerlegt werden können. Die folgende Abbildung visualisiert das Prinzip des Divide & Conquer-Algorithmus anhand des Ausgangsproblems die größte Zahl im Indexbereich 0..63 eines Arrays oder einer Liste zu finden.</p>

<img src="media/concurrency_forkjoin.png" style="width:900px">
<label>Divide & Conquer-Prinzip des Fork-Join-Framework</label>

<p>Beim ersten Fork wird der Indexbereich in die beiden Teilbereiche 0..31 und 32..63 aufgeteilt. Damit sind 2 Teilaufgaben entstanden: (a) Finden der größten Zahl im Indexbereich 0..31 sowie (b) finden der größten Zahl im Indexbereich 32..63.
Diese Teilaufgaben können in der Fork-Phase rekursiv in noch kleinere Teilaufgaben zerlegt werden: Finden der größten Zahl in den Indexbereichen 0..15, 16..31, 32..47 und 48..63. Entscheidend ist, dass jede Teilaufgabe unabhängig von den anderen in einem nebenläufigen Thread gelöst werden kann. Die Methode <a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/stream/Stream.html#reduce(T,java.util.function.BinaryOperator)"><code>reduce</code></a> (s. oben) führt dazu die Elemente eines Streams paarweise zusammen. In diesem Fall wird der Methode <code>reduce</code> dazu die Methode <code>Math.max</code> als <code>BinaryOperator</code> übergeben, um jeweils die größere von zwei Zahlen zurückzugeben. Angenommen es liegt eine Quadcore-Hardware zugrunde, würden in der Fork-Phase 4 Threads genutzt, die unabhängig voneinander die Teilaufgaben bearbeiten. In der anschließenden Join-Phase werden die Ergebnisse der Teilaufgaben aggregiert, indem sie paarweise als Argumente dem <code>BinaryOperator</code> (hier <code>Math:max</code>) übergeben werden, der in der Methode <code>reduce</code> ausgeführt wird.</p>

Jens Ehlers's avatar
Jens Ehlers committed
105
<p>Wenn N die Anzahl der verfügbaren Prozessorkerne ist, wird das Ausgangsproblem tatsächlich nicht nur in N Teilaufgaben zerlegt sondern in etwa 4·N. Die Anzahl der erzeugten Teilaufgaben entspricht der Zweierpotenz, die größer als 4·(N-1) ist. Auf einer Quadcore-Hardware werden also 16 Teilaufgaben erzeugt, die nacheinander von 4 Threads bearbeitet werden. Dieses Verhalten lässt sich auch beobachten (<a href="/concurrency/src/main/java/streams/ParallelStreamCountForks.java" class="repo-link">siehe hier</a>). Der per Default genutzte Thread-Pool namens <i>ForkJoinPool.common</i> lässt sich bei Bedarf verändern oder austauschen. Über die folgende Anweisung wird z.B. die Anzahl der Threads in diesem Thread-Pool auf 8 erhöht.</p>
106
107
108
109
110
111
112
113
114

<pre><code class="language-java line-numbers">System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");</code></pre>

<p>Das Fork-Join-Framework unterscheidet also zwischen der Anzahl an Teilaufgaben (= <i>Tasks</i>), die abgearbeitet werden müssen, und der Anzahl an Threads, die dazu zur Verfügung stehen. Das folgende UML-Klassendiagramm visualisiert die Beziehungen zwischen den beteiligten Interfaces und Klassen.</p>

<img src="media/concurrency_forkjoin_classes.png" style="width:540px">
<label>Grundlegende Klassen des Fork-Join-Framework</label>

<ul>
115
116
<li><b>Tasks</b>: <a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/ForkJoinTask.html"><code>ForkJoinTasks</code></a> sind abgeschlossene Arbeitseinheiten/Teilprobleme, die nicht auf andere Tasks warten müssen. Sie sind nicht an einen bestimmten Thread zur Ausführung gebunden und implementieren das <code>Future</code>-Interface, versprechen also i.d.R. ein Ergebnis bestimmten Typs (= <a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/RecursiveTask.html"><code>RecursiveTask</code></a>). Tasks ohne Ergebnis heißen <a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/RecursiveAction.html"><code>RecursiveAction</code></a>.</li>
<li><b>Threads</b>: In einem <a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/ForkJoinPool.html"><code>ForkJoinPool</code></a> werden spezielle Threads, nämlich <a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/ForkJoinWorkerThread.html"><code>ForkJoinWorkerThreads</code></a> verwaltet, die <code>ForkJoinTasks</code> ausführen. Ein <code>ForkJoinPool</code> unterscheidet sich von anderen <code>ExecutorServices</code> durch sogenanntes <i>Work Stealing</i>, d.h. alle Threads im Pool versuchen aktiv, in den Pool gesendete Tasks zu ergreifen und auszuführen. Dies ermöglicht eine effiziente Verarbeitung, insbesondere wenn die Tasks ihrerseits neue Tasks erzeugen – dadurch dass ein <code>ForkJoinTask</code> typischerweise eine Teilaufgabe rekursiv in kleinere Teilaufgaben zerlegt und diese wieder an den Pool sendet.</li>
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
</ul>

<p>Das Fork-Join-Framework kann auch manuell ohne parallele Streams eingesetzt werden. Das folgende Code-Beispiel demonstriert wie die Berechnung einer bestimmmten Fibonacchi-Zahl über eine <code>RecursiveTask</code> parallelisiert werden kann. Das Beispiel ist aus der Dokumentation der <a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/RecursiveTask.html"><code>RecursiveTask</code></a>-Klasse übernommen. Die Implementierung ist nicht effizient, da die gebildeten Teilaufgaben zu klein sind. Stattdessen ist – wie bei allen Fork-Join-Anwendungen – eine sinnvolle, minimale Granularität zu wählen, ab der eine Aufgabe sequentiell gelöst wird, anstatt sie zu unterteilen. Die Methode <code>fork</code> in Zeile 9 erzeugt eine Teilaufgabe, die nebenläufig über den <code>ForkJoinPool</code> abgearbeitet wird. Über die Methode <code>join</code> in Zeile 11 wird im aufrufenden Thread auf das Ergebnis dieser Teilaufgabe gewartet.</p>

<pre><code class="language-java line-numbers">class FibonacciTask extends RecursiveTask&lt;Integer> {
    int n;
    FibonacciTask(int n) { this.n = n; }

	@Override
    protected Integer compute() {
        if (n &lt;= 1) return n;
        FibonacciTask f1 = new FibonacciTask(n - 1);
        f1.fork(); // fork: execute this task concurrently in the same pool the current task is running in
        FibonacciTask f2 = new FibonacciTask(n - 2);
        return f2.compute() + f1.join(); // join: wait for the concurrent task to be done
    }

    public static void main(String[] args) {
        System.out.println(new FibonacciTask(7).compute()); // 7-th Fibonacchi number is 13
    }
}</code></pre>

<p>Eine beliebte Übungsaufgabe ist es, die grundlegenden Sortieralgorithmen <a href="https://de.wikipedia.org/wiki/Quicksort">QuickSort</a> und <a href="https://de.wikipedia.org/wiki/Mergesort">MergeSort</a>, die ebenfalls auf dem Divide & Conquer-Prinzip basieren, über das Fork-Join-Framework zu parallelisieren. Probiere es aus! Vergleiche die Laufzeiten einer sequentiellen und einer parallelisierten Implementierung der Sortieralgorithmen! Hilfreich ist hierbei die Methode <a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/ForkJoinTask.html#invokeAll(java.util.Collection)"><code>invokeAll</code></a>, die auf einem <code>ForkJoinTask</code> ausgeführt werden kann, um mit einer Anweisung eine Menge von Teilaufgaben zu erzeugen (<code>fork</code>) und direkt auf alle Teilergebnisse zu warten (<code>join</code>).</p>

<p>In 2004 prägte eine wissenschaftliche Veröffentlichung von Google Research maßgeblich den Begriff <a href="https://ai.google/research/pubs/pub62">MapReduce</a>, dem wir insbesondere im Umfeld der Massendatenverarbeitung in verteilten Systemen (z.B. mittels <a href="https://hadoop.apache.org/">Hadoop</a> und <a href="https://spark.apache.org/">Spark</a>) und allgemein in vielen APIs zur funktionalen Programmierung begegnen. MapReduce beschreibt ein Programmiermodell, das ebenfalls auf dem Divide & Conquer-Prinzip aufbaut, und daher dem Fork-Join-Framework sehr ähnlich ist, wobei die <i>Map</i>-Phase der Fork-Phase entspricht und die <i>Reduce</i>-Phase der Join-Phase. Die Ausführung der parallelen Threads ist bei MapReduce-Anwendungen aber i.d.R. nicht auf eine Maschine beschränkt, sondern verteilt sich auf ein ganzes Cluster von Maschinen.</p>

143
<h4 class="page-break">Reaktive Streams</h4>
144
145

<p>In Java sind reaktive Streams seit Java 9 (2017) durch die sogenannte <a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/Flow.html">Flow API</a> standardisiert. Es werden hier insbesondere die Interfaces <a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/Flow.Publisher.html"><code>Publisher</code></a> und <a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/Flow.Subscriber.html"><code>Subscriber</code></a> bereitgestellt, über die eine asynchrone Stream-Verarbeitung mit nicht-blockierenden Gegendruck (engl. <i>Backpressure</i>) realisiert werden kann. Eine Datenquelle (<i>Publisher</i>) informiert seine Beobachter (<i>Subscriber</i>) über neue Nachrichten bzw. Zustandsänderungen, wobei sämtliche Publisher und Subscriber in nebenläufigen Threads ausgeführt werden. Der Gegendruck entsteht für einen Subscriber dadurch, dass er auch bei hoher Frequenz neu eingehender Nachrichten vom Publisher, diese verarbeiten können muss. Der max. mögliche Durchsatz zur Nachrichtenverarbeitung auf der Seite des Subscriber muss bestimmt werden und schränkt ggf. ein, wie viele eingehende Nachrichten aus den beobachteten Datenquellen akzeptiert werden können. Ein Code-Beispiel für die Verwendung des <code>Publisher</code>- und <code>Subscriber</code>-Interface ist bereits am Ende des Kapitels zum <a href="#unit-observer" class="navigate">Entwurfsmuster Beobachter</a> dargestellt worden. Bemerkenswert ist, dass die Flow API einen Konsens darstellt, der im Rahmen der Initiative <a href="http://www.reactive-streams.org/">Reactive Streams for the JVM</a> entstanden ist. Die Interfaces der Flow API (<code>Publisher</code>, <code>Subscriber</code>, <code>Subscription</code> und <code>Processor</code>) werden von verschiedenen beliebten Stream-Bibliotheken wie <a href="https://github.com/ReactiveX/RxJava/wiki/Reactive-Streams">RxJava</a>, <a href="https://akka.io/docs/">Akka</a> und <a href="http://vertx.io/">Vert.x</a> implementiert. Dadurch können Datenströme, die auf Basis dieser Bibliotheken entwickelt worden sind, unter Aufrechterhaltung des Gegendrucks miteinander verbunden werden.</p>