Java 101: Java-Parallelität ohne Schmerzen, Teil 2

Zurück 1 2 3 4 Page 3 Weiter Seite 3 von 4

Atomvariablen

Multithread-Anwendungen, die auf Multicore-Prozessoren oder Multiprozessorsystemen ausgeführt werden, können eine gute Hardwareauslastung erzielen und sind hoch skalierbar. Sie können diese Ziele erreichen, indem ihre Threads die meiste Zeit mit der Ausführung von Arbeiten verbringen, anstatt auf die Ausführung der Arbeit zu warten oder auf den Erwerb von Sperren zu warten, um auf gemeinsam genutzte Datenstrukturen zuzugreifen.

Der traditionelle Synchronisationsmechanismus von Java, der den gegenseitigen Ausschluss erzwingt (der Thread, der die Sperre hält, die eine Reihe von Variablen schützt, hat exklusiven Zugriff darauf) und die Sichtbarkeit (Änderungen an den geschützten Variablen werden für andere Threads sichtbar, die anschließend die Sperre erhalten), wirkt sich jedoch aus Hardware-Auslastung und Skalierbarkeit wie folgt:

  • Die konkurrierende Synchronisation (mehrere Threads konkurrieren ständig um eine Sperre) ist teuer und der Durchsatz leidet darunter. Ein Hauptgrund für die Kosten ist der häufige Kontextwechsel; Eine Kontextwechseloperation kann viele Prozessorzyklen dauern. Im Gegensatz dazu ist eine unkontrollierte Synchronisation bei modernen JVMs kostengünstig.
  • Wenn ein Thread, der eine Sperre hält, verzögert wird (z. B. aufgrund einer Planungsverzögerung), macht kein Thread, der diese Sperre erfordert, Fortschritte, und die Hardware wird nicht so gut verwendet, wie es sonst der Fall wäre.

Sie könnten denken, dass Sie volatileals Synchronisationsalternative verwenden können. volatileVariablen lösen jedoch nur das Sichtbarkeitsproblem. Sie können nicht verwendet werden, um die atomaren Lese-, Änderungs- und Schreibsequenzen sicher zu implementieren, die für die sichere Implementierung von Zählern und anderen Entitäten erforderlich sind, die einen gegenseitigen Ausschluss erfordern.

Java 5 führte eine Synchronisationsalternative ein, die gegenseitigen Ausschluss in Kombination mit der Leistung von bietet volatile. Diese atomare Variablenalternative basiert auf dem Vergleichs- und Austauschbefehl eines Mikroprozessors und besteht größtenteils aus den Typen im java.util.concurrent.atomicPaket.

Vergleichen und Tauschen verstehen

Der CAS- Befehl (compare-and-swap) ist ein unterbrechungsfreier Befehl, der einen Speicherort liest, den Lesewert mit einem erwarteten Wert vergleicht und einen neuen Wert im Speicherort speichert, wenn der Lesewert mit dem erwarteten Wert übereinstimmt. Ansonsten wird nichts getan. Der tatsächliche Mikroprozessorbefehl kann sich etwas unterscheiden (z. B. true zurückgeben, wenn CAS erfolgreich war, oder andernfalls false anstelle des gelesenen Werts).

Mikroprozessor-CAS-Anweisungen

Moderne Mikroprozessoren bieten eine Art CAS-Anweisung. Beispielsweise bieten Intel-Mikroprozessoren die cmpxchgBefehlsfamilie, während PowerPC-Mikroprozessoren Load-Link- (z. B. lwarx) und speicherbedingte (z. B. stwcx) Anweisungen für denselben Zweck anbieten .

CAS ermöglicht die Unterstützung atomarer Lese-, Änderungs- und Schreibsequenzen. Normalerweise verwenden Sie CAS wie folgt:

  1. Lesen Sie den Wert v von Adresse X ab.
  2. Führen Sie eine mehrstufige Berechnung durch, um einen neuen Wert v2 abzuleiten.
  3. Verwenden Sie CAS, um den Wert von X von v in v2 zu ändern. CAS ist erfolgreich, wenn sich der Wert von X während dieser Schritte nicht geändert hat.

Betrachten Sie ein Zählerbeispiel, mit dem Sie den aktuellen Wert lesen und den Zähler erhöhen können, um zu sehen, wie CAS eine bessere Leistung (und Skalierbarkeit) gegenüber der Synchronisation bietet. Die folgende Klasse implementiert einen Zähler basierend auf synchronized:

Listing 4. Counter.java (Version 1)

public class Counter { private int value; public synchronized int getValue() { return value; } public synchronized int increment() { return ++value; } }

Hohe Konflikte um die Monitorsperre führen zu einer übermäßigen Kontextumschaltung, die alle Threads verzögern und zu einer Anwendung führen kann, die nicht gut skaliert werden kann.

Die CAS-Alternative erfordert eine Implementierung des Vergleichs- und Austauschbefehls. Die folgende Klasse emuliert CAS. Es verwendet synchronizedanstelle der eigentlichen Hardwareanweisung, um den Code zu vereinfachen:

Listing 5. EmulatedCAS.java

public class EmulatedCAS { private int value; public synchronized int getValue() { return value; } public synchronized int compareAndSwap(int expectedValue, int newValue) { int readValue = value; if (readValue == expectedValue) value = newValue; return readValue; } }

Hier valueidentifiziert eine Speicherstelle, die durch abgerufen werden kann getValue(). Außerdem compareAndSwap()implementiert die CAS - Algorithmus.

Die folgende Klasse verwendet EmulatedCAS, um einen Nichtzähler zu implementieren synchronized(so zu tun, als wäre EmulatedCASdies nicht erforderlich synchronized):

Listing 6. Counter.java (Version 2)

public class Counter { private EmulatedCAS value = new EmulatedCAS(); public int getValue() { return value.getValue(); } public int increment() { int readValue = value.getValue(); while (value.compareAndSwap(readValue, readValue+1) != readValue) readValue = value.getValue(); return readValue+1; } }

Counterkapselt eine EmulatedCASInstanz und deklariert Methoden zum Abrufen und Inkrementieren eines Zählerwerts mit Hilfe dieser Instanz. getValue()Ruft den "aktuellen Zählerwert" der Instanz ab und increment()erhöht den Zählerwert sicher.

increment()wird wiederholt aufgerufen, compareAndSwap()bis sich readValueder Wert nicht mehr ändert. Es ist dann frei, diesen Wert zu ändern. Wenn keine Sperre beteiligt ist, werden Konflikte und übermäßige Kontextwechsel vermieden. Die Leistung verbessert sich und der Code ist skalierbarer.

ReentrantLock und CAS

Sie haben zuvor erfahren, dass dies ReentrantLockeine bessere Leistung bietet als synchronizedbei hohen Thread-Konflikten. Um die Leistung zu steigern, wird ReentrantLockdie Synchronisation von einer Unterklasse der abstrakten java.util.concurrent.locks.AbstractQueuedSynchronizerKlasse verwaltet. Diese Klasse nutzt wiederum die undokumentierte sun.misc.UnsafeKlasse und ihre compareAndSwapInt()CAS-Methode.

Erkundung des atomaren Variablenpakets

Sie müssen nicht compareAndSwap()über die nicht portierbare Java Native Interface implementieren . Stattdessen bietet Java 5 diese Unterstützung über java.util.concurrent.atomic: ein Toolkit von Klassen, die für die sperrfreie, threadsichere Programmierung einzelner Variablen verwendet werden.

Laut java.util.concurrent.atomicJavadoc sind diese Klassen

Erweitern Sie den Begriff der volatileWerte, Felder und Array-Elemente auf diejenigen, die auch eine atomare bedingte Aktualisierungsoperation des Formulars bereitstellen boolean compareAndSet(expectedValue, updateValue). Diese Methode (die sich in den Argumenttypen in verschiedenen Klassen unterscheidet) setzt eine Variable atomar auf die, updateValuewenn sie derzeit die enthält expectedValue, und gibt an, ob sie erfolgreich ist.

Dieses Paket bietet Klassen für die Typen Boolean ( AtomicBoolean), integer ( AtomicInteger), long integer ( AtomicLong) und reference ( AtomicReference). Es bietet auch Array - Versionen integer, long integer und Referenz ( AtomicIntegerArray, AtomicLongArray, und AtomicReferenceArray), beschreib- und gestanzten Referenzkurse für atomar ein Wertepaar zu aktualisieren ( AtomicMarkableReferenceund AtomicStampedReference) und vieles mehr.

Implementierung von compareAndSet ()

Java implementiert compareAndSet()über das schnellste verfügbare native Konstrukt (z. B. cmpxchgLoad-Link / Store-Conditional) oder (im schlimmsten Fall) Spin-Locks .

Überlegen Sie AtomicInteger, wie Sie einen intWert atomar aktualisieren können. Mit dieser Klasse können wir den in Listing 6 gezeigten Zähler implementieren. Listing 7 enthält den entsprechenden Quellcode.

Listing 7. Counter.java (Version 3)

import java.util.concurrent.atomic.AtomicInteger; public class Counter { private AtomicInteger value = new AtomicInteger(); public int getValue() { return value.get(); } public int increment() { int readValue = value.get(); while (!value.compareAndSet(readValue, readValue+1)) readValue = value.get(); return readValue+1; } }

Listing 7 is very similar to Listing 6 except that it replaces EmulatedCAS with AtomicInteger. Incidentally, you can simplify increment() because AtomicInteger supplies its own int getAndIncrement() method (and similar methods).

Fork/Join framework

Computer hardware has evolved significantly since Java's debut in 1995. Back in the day, single-processor systems dominated the computing landscape and Java's synchronization primitives, such as synchronized and volatile, as well as its threading library (the Thread class, for example) were generally adequate.

Multiprocessor systems became cheaper and developers found themselves needing to create Java applications that effectively exploited the hardware parallelism that these systems offered. However, they soon discovered that Java's low-level threading primitives and library were very difficult to use in this context, and the resulting solutions were often riddled with errors.

What is parallelism?

Parallelism is the simultaneous execution of multiple threads/tasks via some combination of multiple processors and processor cores.

The Java Concurrency Utilities framework simplifies the development of these applications; however, the utilities offered by this framework do not scale to thousands of processors or processor cores. In our many-core era, we need a solution for achieving a finer-grained parallelism, or we risk keeping processors idle even when there is lots of work for them to handle.

Professor Doug Lea presented a solution to this problem in his paper introducing the idea for a Java-based fork/join framework. Lea describes a framework that supports "a style of parallel programming in which problems are solved by (recursively) splitting them into subtasks that are solved in parallel." The Fork/Join framework was eventually included in Java 7.

Overview of the Fork/Join framework

The Fork/Join framework is based on a special executor service for running a special kind of task. It consists of the following types that are located in the java.util.concurrent package:

  • ForkJoinPool: an ExecutorService implementation that runs ForkJoinTasks. ForkJoinPool provides task-submission methods, such as void execute(ForkJoinTask task), along with management and monitoring methods, such as int getParallelism() and long getStealCount().
  • ForkJoinTask: an abstract base class for tasks that run within a ForkJoinPool context. ForkJoinTask describes thread-like entities that have a much lighter weight than normal threads. Many tasks and subtasks can be hosted by very few actual threads in a ForkJoinPool instance.
  • ForkJoinWorkerThread: a class that describes a thread managed by a ForkJoinPool instance. ForkJoinWorkerThread is responsible for executing ForkJoinTasks.
  • RecursiveAction: an abstract class that describes a recursive resultless ForkJoinTask.
  • RecursiveTask: an abstract class that describes a recursive result-bearing ForkJoinTask.

The ForkJoinPool executor service is the entry-point for submitting tasks that are typically described by subclasses of RecursiveAction or RecursiveTask. Behind the scenes, the task is divided into smaller tasks that are forked (distributed among different threads for execution) from the pool. A task waits until joined (its subtasks finish so that results can be combined).

ForkJoinPool manages a pool of worker threads, where each worker thread has its own double-ended work queue (deque). When a task forks a new subtask, the thread pushes the subtask onto the head of its deque. When a task tries to join with another task that hasn't finished, the thread pops another task off the head of its deque and executes the task. If the thread's deque is empty, it tries to steal another task from the tail of another thread's deque. This work stealing behavior maximizes throughput while minimizing contention.

Using the Fork/Join framework

Fork/Join was designed to efficiently execute divide-and-conquer algorithms, which recursively divide problems into sub-problems until they are simple enough to solve directly; for example, a merge sort. The solutions to these sub-problems are combined to provide a solution to the original problem. Each sub-problem can be executed independently on a different processor or core.

Lea's paper presents the following pseudocode to describe the divide-and-conquer behavior:

Result solve(Problem problem) { if (problem is small) directly solve problem else { split problem into independent parts fork new subtasks to solve each part join all subtasks compose result from subresults } }

The pseudocode presents a solve method that's called with some problem to solve and which returns a Result that contains the problem's solution. If the problem is too small to solve via parallelism, it's solved directly. (The overhead of using parallelism on a small problem exceeds any gained benefit.) Otherwise, the problem is divided into subtasks: each subtask independently focuses on part of the problem.

Operation fork launches a new fork/join subtask that will execute in parallel with other subtasks. Operation join delays the current task until the forked subtask finishes. At some point, the problem will be small enough to be executed sequentially, and its result will be combined along with other subresults to achieve an overall solution that's returned to the caller.

The Javadoc for the RecursiveAction and RecursiveTask classes presents several divide-and-conquer algorithm examples implemented as fork/join tasks. For RecursiveAction the examples sort an array of long integers, increment each element in an array, and sum the squares of each element in an array of doubles. RecursiveTask's solitary example computes a Fibonacci number.

Listing 8 zeigt eine Anwendung, die das Sortierbeispiel in Nicht-Fork / Join- sowie Fork / Join-Kontexten demonstriert. Es werden auch einige Zeitinformationen angezeigt, um die Sortiergeschwindigkeiten gegenüberzustellen.