Java - Erkennung und Handhabung hängender Threads

Von Alex. C. Punnen

Architekt - Nokia Siemens Networks

Bangalore

Hängende Threads sind eine häufige Herausforderung bei der Entwicklung von Software, die über proprietäre oder standardisierte Schnittstellen wie SNMP, Q3 oder Telnet mit proprietären Geräten verbunden werden muss. Dieses Problem ist nicht auf die Netzwerkverwaltung beschränkt, sondern tritt in einer Vielzahl von Bereichen auf, z. B. Webservern, Prozessen, die Remoteprozeduraufrufe aufrufen, usw.

Ein Thread, der eine Anforderung an ein Gerät initiiert, benötigt einen Mechanismus, um zu erkennen, ob das Gerät nicht oder nur teilweise reagiert. In einigen Fällen, in denen ein solcher Hang erkannt wird, muss eine bestimmte Aktion ausgeführt werden. Die spezifische Aktion kann entweder ein erneuter Versuch sein oder den Endbenutzer über den Fehler der Aufgabe oder eine andere Wiederherstellungsoption informieren. In einigen Fällen, in denen eine große Anzahl von Aufgaben von einer Komponente an eine große Anzahl von Netzwerkelementen gesendet werden muss, ist die Erkennung hängender Threads wichtig, damit sie nicht zu einem Engpass für die Verarbeitung anderer Aufgaben wird. Die Verwaltung hängender Threads hat also zwei Aspekte: Leistung und Benachrichtigung .

Für den Benachrichtigungsaspekt können wir das Java Observer-Muster so anpassen, dass es in die Multithread-Welt passt.

Anpassen des Java Observer-Musters an Multithread-Systeme

Aufgrund hängender Aufgaben ist die Verwendung der Java- ThreadPoolKlasse mit einer Anpassungsstrategie die erste Lösung, die in den Sinn kommt. Die Verwendung von Java ThreadPoolim Kontext einiger Threads, die über einen bestimmten Zeitraum zufällig hängen, führt jedoch zu unerwünschtem Verhalten, das auf der jeweils verwendeten Strategie basiert, wie z. B. Thread-Hunger im Fall einer Strategie mit festem Thread-Pool. Dies ist hauptsächlich auf die Tatsache zurückzuführen, dass Java ThreadPoolkeinen Mechanismus zum Erkennen eines Thread-Hangs hat.

Wir könnten einen zwischengespeicherten Thread-Pool ausprobieren, aber er hat auch Probleme. Wenn eine hohe Rate an Aufgaben ausgelöst wird und einige Threads hängen bleiben, kann die Anzahl der Threads in die Höhe schießen und schließlich zu Ressourcenmangel und Ausnahmen aufgrund von Speichermangel führen. Oder wir könnten eine benutzerdefinierte ThreadPoolStrategie verwenden, die a aufruft CallerRunsPolicy. Auch in diesem Fall kann ein Thread-Hang dazu führen, dass alle Threads irgendwann hängen bleiben. (Der Haupt-Thread sollte niemals der Aufrufer sein, da die Möglichkeit besteht, dass eine an den Haupt-Thread übergebene Aufgabe hängen bleibt und alles zum Stillstand kommt.)

Also, was ist die Lösung? Ich werde ein nicht so einfaches ThreadPool-Muster demonstrieren, das die Poolgröße entsprechend der Aufgabenrate und basierend auf der Anzahl der hängenden Threads anpasst. Kommen wir zunächst zum Problem der Erkennung hängender Fäden.

Hängende Fäden erkennen

Abbildung 1 zeigt eine Abstraktion des Musters:

Hier gibt es zwei wichtige Klassen: ThreadManagerund ManagedThread. Beide erstrecken sich von der Java- ThreadKlasse. Das ThreadManagerhält einen Behälter, der das hält ManagedThreads. Wenn ein neues ManagedThreaderstellt wird, fügt es sich diesem Container hinzu.

 ThreadHangTester testthread = new ThreadHangTester("threadhangertest",2000,false); testthread.start(); thrdManger.manage(testthread, ThreadManager.RESTART_THREAD, 10); thrdManger.start(); 

Die ThreadManageriteriert durch diese Liste und ruft die ManagedThread‚s - isHung()Methode. Dies ist im Grunde eine Zeitstempelprüflogik.

 if(System.currentTimeMillis() - lastprocessingtime.get() > maxprocessingtime ) { logger.debug("Thread is hung"); return true; } 

Wenn festgestellt wird, dass ein Thread in eine Task-Schleife geraten ist und seine Ergebnisse nie aktualisiert hat, wird ein Wiederherstellungsmechanismus verwendet, wie vom ManageThread.

 while(isRunning) { for (Iterator iterator = managedThreads.iterator(); iterator.hasNext();) { ManagedThreadData thrddata = (ManagedThreadData) iterator.next(); if(thrddata.getManagedThread().isHung()) { logger.warn("Thread Hang detected for ThreadName=" + thrddata.getManagedThread().getName() ); switch (thrddata.getManagedAction()) { case RESTART_THREAD: // The action here is to restart the the thread //remove from the manager iterator.remove(); //stop the processing of this thread if possible thrddata.getManagedThread().stopProcessing(); if(thrddata.getManagedThread().getClass() == ThreadHangTester.class) //To know which type of thread to create { ThreadHangTester newThread =new ThreadHangTester("restarted_ThrdHangTest",5000,true); //Create a new thread newThread.start(); //add it back to be managed manage(newThread, thrddata.getManagedAction(), thrddata.getThreadChecktime()); } break; ......... 

Damit ein neues ManagedThreaderstellt und anstelle des aufgehängten verwendet werden kann, sollte es keinen Zustand oder Container enthalten. Hierzu soll der Container, auf dem die ManagedThreadAkte abgetrennt werden sollen. Hier verwenden wir das ENUM-basierte Singleton-Muster, um die Aufgabenliste zu speichern. Der Container mit den Aufgaben ist also unabhängig von dem Thread, der die Aufgaben verarbeitet. Klicken Sie auf den folgenden Link, um die Quelle für das beschriebene Muster herunterzuladen: Java Thread Manager Source.

Hängende Threads und Java ThreadPool-Strategien

Java ThreadPoolverfügt nicht über einen Mechanismus zum Erkennen hängender Threads. Die Verwendung einer Strategie wie Fixed Threadpool ( Executors.newFixedThreadPool()) funktioniert nicht, da sich einige Threads möglicherweise im Stillstand befinden, wenn einige Aufgaben im Laufe der Zeit hängen bleiben. Eine andere Option ist die Verwendung einer zwischengespeicherten ThreadPool-Richtlinie (Executors.newCachedThreadPool()). Dies könnte sicherstellen, dass immer Threads zur Verarbeitung einer Aufgabe verfügbar sind, die nur durch VM-Speicher-, CPU- und Thread-Beschränkungen eingeschränkt sind. Mit dieser Richtlinie gibt es jedoch keine Kontrolle über die Anzahl der Threads, die erstellt werden. Unabhängig davon, ob ein Verarbeitungsthread hängt oder nicht, führt die Verwendung dieser Richtlinie bei hoher Taskrate dazu, dass eine große Anzahl von Threads erstellt wird. Wenn Sie nicht bald über genügend Ressourcen für die JVM verfügen, erreichen Sie die maximale Speicherschwelle oder die hohe CPU. Es ist ziemlich üblich, dass die Anzahl der Threads Hunderte oder Tausende erreicht. Obwohl sie nach der Verarbeitung der Aufgabe freigegeben werden, überfordert die hohe Anzahl von Threads manchmal während der Burst-Verarbeitung die Systemressourcen.

Eine dritte Option ist die Verwendung benutzerdefinierter Strategien oder Richtlinien. Eine solche Option besteht darin, einen Thread-Pool zu haben, der von 0 bis zu einer maximalen Anzahl skaliert. Selbst wenn ein Thread hängen würde, würde ein neuer Thread erstellt, solange die maximale Threadanzahl erreicht ist:

 execexec = new ThreadPoolExecutor(0, 3, 60, TimeUnit.SECONDS, new SynchronousQueue()); 

Hier ist 3 die maximale Thread-Anzahl und die Keep-Alive-Zeit ist auf 60 Sekunden eingestellt, da dies ein aufgabenintensiver Prozess ist. Wenn wir eine ausreichend hohe maximale Thread-Anzahl angeben, ist dies mehr oder weniger eine vernünftige Richtlinie, die im Zusammenhang mit hängenden Aufgaben verwendet werden kann. Das einzige Problem ist, dass, wenn die hängenden Fäden schließlich nicht freigegeben werden, eine geringe Wahrscheinlichkeit besteht, dass alle Fäden irgendwann hängen bleiben. Wenn die maximale Anzahl an Threads ausreichend hoch ist und davon ausgegangen wird, dass ein Task-Hang ein seltenes Phänomen ist, passt diese Richtlinie in die Rechnung.

Es wäre süß gewesen, wenn der ThreadPoolauch einen steckbaren Mechanismus zum Erkennen von hängenden Fäden hätte. Ich werde später auf ein solches Design eingehen. Wenn alle Threads eingefroren sind, können Sie natürlich die Richtlinie für abgelehnte Aufgaben des Thread-Pools konfigurieren und verwenden. Wenn Sie die Aufgaben nicht verwerfen möchten, müssen Sie Folgendes verwenden CallerRunsPolicy:

 execexec = new ThreadPoolExecutor(0, 20, 20, TimeUnit.MILLISECONDS, new SynchronousQueue() new ThreadPoolExecutor.CallerRunsPolicy()); 

In diesem Fall würde diese Aufgabe an den aufzurufenden aufrufenden Thread übergeben, wenn ein Thread-Hang dazu führte, dass eine Aufgabe abgelehnt wurde. Es besteht immer die Möglichkeit, dass diese Aufgabe zu hängen bleibt. In diesem Fall würde der gesamte Prozess einfrieren. Es ist daher besser, in diesem Zusammenhang keine solche Richtlinie hinzuzufügen.

 public class NotificationProcessor implements Runnable { private final NotificationOriginator notificationOrginator; boolean isRunning = true; private final ExecutorService execexec; AlarmNotificationProcessor(NotificationOriginator norginator) { //ctor // execexec = Executors.newCachedThreadPool();// Too many threads // execexec = Executors.newFixedThreadPool(2);//, no hang tasks detection execexec = new ThreadPoolExecutor(0, 4, 250, TimeUnit.MILLISECONDS, new SynchronousQueue(), new ThreadPoolExecutor.CallerRunsPolicy()); } public void run() { while (isRunning) { try { final Task task = TaskQueue.INSTANCE.getTask(); Runnable thisTrap= new Runnable() { public void run() { ++alarmid; notificaionOrginator.notify(new OctetString(), // Task processing nbialarmnew.getOID(), nbialarmnew.createVariableBindingPayload()); É........}} ; execexec.execute(thisTrap); } 

Ein benutzerdefinierter ThreadPool mit Hang-Erkennung

Eine Thread-Pool-Bibliothek mit der Fähigkeit zur Erkennung und Handhabung von Task-Hänge wäre großartig. Ich habe eine entwickelt und werde sie unten demonstrieren. Dies ist eigentlich ein Port aus einem C ++ - Thread-Pool, den ich vor einiger Zeit entworfen und verwendet habe (siehe Referenzen). Grundsätzlich verwendet diese Lösung das Befehlsmuster und das Muster der Verantwortungskette. Das Implementieren des Befehlsmusters in Java ohne die Unterstützung von Funktionsobjekten ist jedoch etwas schwierig. Dafür musste ich die Implementierung leicht ändern, um Java Reflection zu verwenden. Beachten Sie, dass in dem Kontext, in dem dieses Muster entworfen wurde, ein Thread-Pool eingebaut / eingesteckt werden musste, ohne eine der vorhandenen Klassen zu ändern.(Ich glaube, der einzige große Vorteil der objektorientierten Programmierung besteht darin, dass wir damit Klassen entwerfen können, um das Open Closed-Prinzip effektiv zu nutzen. Dies gilt insbesondere für komplexen alten Legacy-Code und ist möglicherweise weniger relevant für Entwicklung neuer Produkte.) Daher habe ich Reflexion verwendet, anstatt eine Schnittstelle zur Implementierung des Befehlsmusters zu verwenden. Der Rest des Codes kann ohne größere Änderungen portiert werden, da fast alle Thread-Synchronisations- und Signalisierungsprimitive ab Java 1.5 verfügbar sind.Der Rest des Codes kann ohne größere Änderungen portiert werden, da fast alle Thread-Synchronisations- und Signalisierungsprimitive ab Java 1.5 verfügbar sind.Der Rest des Codes kann ohne größere Änderungen portiert werden, da fast alle Thread-Synchronisations- und Signalisierungsprimitive ab Java 1.5 verfügbar sind.

 public class Command { private Object[ ]argParameter; ........ //Ctor for a method with two args Command(T pObj, String methodName, long timeout, String key, int arg1, int arg2) { m_objptr = pObj; m_methodName = mthodName; m_timeout = timeout; m_key = key; argParameter = new Object[2]; argParameter[0] = arg1; argParameter[1] = arg2; } // Calls the method of the object void execute() { Class klass = m_objptr.getClass(); Class[] paramTypes = new Class[]{int.class, int.class}; try { Method methodName = klass.getMethod(m_methodName, paramTypes); //System.out.println("Found the method--> " + methodName); if (argParameter.length == 2) { methodName.invoke(m_objptr, (Object) argParameter[0], (Object) argParameter[1]); } 

Anwendungsbeispiel für dieses Muster:

 public class CTask {.. public int DoSomething(int a, int b) {...} } 

Command cmd4 = new Command(task4, "DoMultiplication", 1, "key2",2,5);

Jetzt haben wir hier zwei weitere wichtige Klassen. Eine ist die ThreadChainKlasse, die das Muster der Verantwortungskette implementiert:

 public class ThreadChain implements Runnable { public ThreadChain(ThreadChain p, ThreadPool pool, String name) { AddRef(); deleteMe = false; busy = false; //--> very important next = p; //set the thread chain - note this is like a linked list impl threadpool = pool; //set the thread pool - Root of the threadpool ........ threadId = ++ThreadId; ...... // start the thread thisThread = new Thread(this, name + inttid.toString()); thisThread.start(); } 

Diese Klasse hat zwei Hauptmethoden. Einer ist der Boolesche Wert, CanHandle()der von der ThreadPoolKlasse initiiert wird und dann rekursiv fortgesetzt wird. Dadurch wird überprüft, ob der aktuelle Thread (aktuelle ThreadChainInstanz) für die Ausführung der Aufgabe frei ist. Wenn es bereits eine Aufgabe bearbeitet, ruft es die nächste in der Kette auf.

 public Boolean canHandle() { if (!busy) { //If not busy System.out.println("Can Handle This Event in id=" + threadId); // todo signal an event try { condLock.lock(); condWait.signal(); //Signal the HandleRequest which is waiting for this in the run method ......................................... return true; } ......................................... ///Else see if the next object in the chain is free /// to handle the request return next.canHandle(); 

Beachten Sie, dass dies HandleRequesteine Methode ThreadChainist, die von der Thread run()Methode aufgerufen wird und auf das Signal von der canHandleMethode wartet . Beachten Sie auch, wie die Aufgabe über das Befehlsmuster behandelt wird.