So erstellen Sie Stateful-Streaming-Anwendungen mit Apache Flink

Fabian Hueske ist Committer und PMC-Mitglied des Apache Flink-Projekts und Mitbegründer von Data Artisans.

Apache Flink ist ein Framework zum Implementieren von Stateful Stream-Verarbeitungsanwendungen und zum Ausführen dieser in großem Maßstab auf einem Computercluster. In einem früheren Artikel haben wir untersucht, was Stateful Stream Processing ist, welche Anwendungsfälle darin behandelt werden und warum Sie Ihre Streaming-Anwendungen mit Apache Flink implementieren und ausführen sollten.

In diesem Artikel werde ich Beispiele für zwei häufige Anwendungsfälle der Stateful Stream-Verarbeitung vorstellen und erläutern, wie sie mit Flink implementiert werden können. Der erste Anwendungsfall sind ereignisgesteuerte Anwendungen, dh Anwendungen, die kontinuierliche Ereignisströme aufnehmen und eine Geschäftslogik auf diese Ereignisse anwenden. Der zweite ist der Anwendungsfall für Streaming-Analysen, in dem ich zwei analytische Abfragen vorstelle, die mit der SQL-API von Flink implementiert wurden und Streaming-Daten in Echtzeit aggregieren. Wir bei Data Artisans stellen den Quellcode aller unserer Beispiele in einem öffentlichen GitHub-Repository bereit.

Bevor wir uns mit den Details der Beispiele befassen, werde ich den Ereignisstrom vorstellen, der von den Beispielanwendungen aufgenommen wird, und erläutern, wie Sie den von uns bereitgestellten Code ausführen können.

Ein Strom von Taxifahrten

Unsere Beispielanwendungen basieren auf einem öffentlichen Datensatz über Taxifahrten, die 2013 in New York City stattfanden. Die Organisatoren der Grand Challenge 2015 der DEBS (ACM International Conference on Distributed Event-Based Systems) haben den ursprünglichen Datensatz neu angeordnet und in einen solchen konvertiert Eine einzelne CSV-Datei, aus der wir die folgenden neun Felder lesen.

  • Medaillon - eine MD5-Summen-ID des Taxis
  • Hack_license - eine MD5-Summen-ID der Taxilizenz
  • Pickup_datetime - die Zeit, zu der Passagiere abgeholt wurden
  • Dropoff_datetime - die Zeit, zu der Passagiere abgesetzt wurden
  • Pickup_longitude - der Längengrad des Abholorts
  • Pickup_latitude - der Breitengrad des Abholorts
  • Dropoff_longitude - Der Längengrad der Abgabestelle
  • Dropoff_latitude - der Breitengrad der Abgabestelle
  • Total_amount - Gesamtbetrag in US-Dollar

In der CSV-Datei werden die Datensätze in aufsteigender Reihenfolge ihres Attributs für die Abgabezeit gespeichert. Daher kann die Datei als geordnetes Protokoll von Ereignissen behandelt werden, die zum Ende einer Reise veröffentlicht wurden. Um die Beispiele auszuführen, die wir auf GitHub bereitstellen, müssen Sie den Datensatz der DEBS-Challenge von Google Drive herunterladen.

Alle Beispielanwendungen lesen nacheinander die CSV-Datei und nehmen sie als Stream von Taxifahrten auf. Von da an verarbeiten die Anwendungen die Ereignisse wie jeden anderen Stream, dh wie einen Stream, der von einem protokollbasierten Publish-Subscribe-System wie Apache Kafka oder Kinesis aufgenommen wird. Das Lesen einer Datei (oder einer anderen Art von persistierten Daten) und das Behandeln als Stream ist ein Eckpfeiler des Ansatzes von Flink zur Vereinheitlichung der Stapel- und Stream-Verarbeitung.

Ausführen der Flink-Beispiele

Wie bereits erwähnt, haben wir den Quellcode unserer Beispielanwendungen in einem GitHub-Repository veröffentlicht. Wir empfehlen Ihnen, das Repository zu teilen und zu klonen. Die Beispiele können einfach in der IDE Ihrer Wahl ausgeführt werden. Sie müssen keinen Flink-Cluster einrichten und konfigurieren, um sie auszuführen. Importieren Sie zunächst den Quellcode der Beispiele als Maven-Projekt. Führen Sie dann die Hauptklasse einer Anwendung aus und geben Sie den Speicherort der Datendatei (siehe oben für den Link zum Herunterladen der Daten) als Programmparameter an.

Sobald Sie eine Anwendung gestartet haben, startet sie eine lokale, eingebettete Flink-Instanz im JVM-Prozess der Anwendung und sendet die Anwendung, um sie auszuführen. Sie sehen eine Reihe von Protokollanweisungen, während Flink gestartet wird und die Aufgaben des Jobs geplant werden. Sobald die Anwendung ausgeführt wird, wird ihre Ausgabe in die Standardausgabe geschrieben.

Erstellen einer ereignisgesteuerten Anwendung in Flink

Lassen Sie uns nun unseren ersten Anwendungsfall diskutieren, bei dem es sich um eine ereignisgesteuerte Anwendung handelt. Ereignisgesteuerte Anwendungen nehmen Ereignisströme auf, führen Berechnungen durch, wenn die Ereignisse empfangen werden, und können neue Ereignisse ausgeben oder externe Aktionen auslösen. Mehrere ereignisgesteuerte Anwendungen können zusammengesetzt werden, indem sie über Ereignisprotokollsysteme miteinander verbunden werden, ähnlich wie große Systeme aus Microservices zusammengesetzt werden können. Ereignisgesteuerte Anwendungen, Ereignisprotokolle und Anwendungsstatus-Snapshots (in Flink als Sicherungspunkte bezeichnet) stellen ein sehr leistungsfähiges Entwurfsmuster dar, da Sie ihren Status zurücksetzen und ihre Eingaben wiedergeben können, um einen Fehler zu beheben, einen Fehler zu beheben oder einen zu migrieren Anwendung auf einen anderen Cluster.

In diesem Artikel untersuchen wir eine ereignisgesteuerte Anwendung, die einen Dienst unterstützt, der die Arbeitszeiten von Taxifahrern überwacht. Im Jahr 2016 hat die NYC Taxi and Limousine Commission beschlossen, die Arbeitszeit der Taxifahrer auf 12-Stunden-Schichten zu beschränken und eine Pause von mindestens acht Stunden vor Beginn der nächsten Schicht einzulegen. Eine Schicht beginnt mit dem Beginn der ersten Fahrt. Von da an kann ein Fahrer innerhalb eines Zeitfensters von 12 Stunden neue Fahrten beginnen. Unsere Anwendung verfolgt die Fahrten von Fahrern, markiert die Endzeit ihres 12-Stunden-Fensters (dh die Zeit, zu der sie möglicherweise die letzte Fahrt beginnen) und kennzeichnet Fahrten, die gegen die Vorschriften verstoßen. Den vollständigen Quellcode dieses Beispiels finden Sie in unserem GitHub-Repository.

Unsere Anwendung wird mit der DataStream-API von Flink und a implementiert KeyedProcessFunction. Die DataStream-API ist eine funktionale API und basiert auf dem Konzept typisierter Datenströme. A DataStreamist die logische Darstellung eines Stroms von Ereignissen vom Typ T. Ein Stream wird verarbeitet, indem eine Funktion auf ihn angewendet wird, die einen anderen Datenstrom erzeugt, möglicherweise von einem anderen Typ. Flink verarbeitet Streams parallel, indem Ereignisse auf Stream-Partitionen verteilt und unterschiedliche Instanzen von Funktionen auf jede Partition angewendet werden.

Das folgende Codefragment zeigt den allgemeinen Ablauf unserer Überwachungsanwendung.

// Strom von Taxifahrten aufnehmen.

DataStream Fahrten = TaxiRides.getRides (env, inputPath);

Datenstrom Benachrichtigungen = Fahrten

   // Partitionsstrom durch die Führerschein-ID

   .keyBy (r -> r.licenseId)

   // Fahrereignisse überwachen und Benachrichtigungen generieren

   .process (neues MonitorWorkTime ());

// Benachrichtigungen drucken

notifications.print ();

Die Anwendung beginnt mit der Aufnahme eines Streams von Taxifahrten. In unserem Beispiel werden die Ereignisse aus einer Textdatei gelesen, analysiert und in TaxiRidePOJO-Objekten gespeichert . Eine reale Anwendung nimmt die Ereignisse normalerweise aus einer Nachrichtenwarteschlange oder einem Ereignisprotokoll auf, z. B. Apache Kafka oder Pravega. Der nächste Schritt besteht darin, die TaxiRideEreignisse licenseIddes Fahrers einzugeben. Die keyByOperation partitioniert den Stream in das deklarierte Feld, sodass alle Ereignisse mit demselben Schlüssel von derselben parallelen Instanz der folgenden Funktion verarbeitet werden. In unserem Fall partitionieren wir auf dem licenseIdFeld, weil wir die Arbeitszeit jedes einzelnen Fahrers überwachen möchten.

Als nächstes wenden wir die MonitorWorkTimeFunktion auf die partitionierten TaxiRideEreignisse an. Die Funktion verfolgt die Fahrten pro Fahrer und überwacht deren Schicht- und Pausenzeiten. Es gibt Ereignisse vom Typ aus Tuple2, wobei jedes Tupel eine Benachrichtigung darstellt, die aus der Lizenz-ID des Treibers und einer Nachricht besteht. Schließlich sendet unsere Anwendung die Nachrichten, indem sie sie auf die Standardausgabe druckt. Eine reale Anwendung würde die Benachrichtigungen in ein externes Nachrichten- oder Speichersystem wie Apache Kafka, HDFS oder ein Datenbanksystem schreiben oder einen externen Anruf auslösen, um sie sofort weiterzuleiten.

Nachdem wir den Gesamtfluss der Anwendung besprochen haben, werfen wir einen Blick auf die MonitorWorkTimeFunktion, die den größten Teil der tatsächlichen Geschäftslogik der Anwendung enthält. Die MonitorWorkTimeFunktion ist ein KeyedProcessFunctionStatus, der TaxiRideEreignisse aufnimmt und Tuple2Datensätze ausgibt . Die KeyedProcessFunctionSchnittstelle bietet zwei Methoden zum Verarbeiten von Daten: processElement()und onTimer(). Die processElement()Methode wird für jedes ankommende Ereignis aufgerufen. Die onTimer()Methode wird aufgerufen, wenn ein zuvor registrierter Timer ausgelöst wird. Das folgende Snippet zeigt das Grundgerüst der MonitorWorkTimeFunktion und alles, was außerhalb der Verarbeitungsmethoden deklariert ist.

öffentliche statische Klasse MonitorWorkTime

    erweitert KeyedProcessFunction {

  // Zeitkonstanten in Millisekunden

  private static final long ALLOWED_WORK_TIME = 12 * 60 * 60 * 1000; // 12 Stunden

  private static final long REQ_BREAK_TIME = 8 * 60 * 60 * 1000; // 8 Stunden

  private static final long CLEAN_UP_INTERVAL = 28 * 60 * 60 * 1000; // 24 Stunden

 privater transienter DateTimeFormatter-Formatierer;

  // Statushandle zum Speichern der Startzeit einer Schicht

  ValueState shiftStart;

  @Override

  public void open (Konfiguration conf) {

    // Statushandle registrieren

    shiftStart = getRuntimeContext (). getState (

      neuer ValueStateDescriptor ("shiftStart", Types.LONG));

    // Zeitformatierer initialisieren

    this.formatter = DateTimeFormat.forPattern ("JJJJ-MM-TT HH: MM: SS");

  }}

  // processElement () und onTimer () werden unten ausführlich erläutert.

}}

Die Funktion deklariert einige Konstanten für Zeitintervalle in Millisekunden, einen Zeitformatierer und ein Statushandle für den von Flink verwalteten Schlüsselstatus. Der verwaltete Status wird regelmäßig überprüft und im Fehlerfall automatisch wiederhergestellt. Der Schlüsselstatus wird pro Schlüssel organisiert, was bedeutet, dass eine Funktion einen Wert pro Handle und Schlüssel beibehält. In unserem Fall MonitorWorkTimebehält die Funktion einen LongWert für jede Taste bei, dh für jede licenseId. Der shiftStartStaat speichert die Startzeit einer Fahrerschicht. Das Statushandle wird in der open()Methode initialisiert , die einmal aufgerufen wird, bevor das erste Ereignis verarbeitet wird.

Schauen wir uns nun die processElement()Methode an.

@Override

public void processElement (

    Taxifahrt,

    Kontext ctx,

    Kollektor out) wirft Exception {

  // Startzeit der letzten Schicht nachschlagen

  Long startTs = shiftStart.value ();

  if (startTs == null ||

    startTs <ritt.pickUpTime - (ALLOWED_WORK_TIME + REQ_BREAK_TIME)) {

    // Dies ist die erste Fahrt einer neuen Schicht.

    startTs =ritt.pickUpTime;

    shiftStart.update (startTs);

    lange endTs = startTs + ALLOWED_WORK_TIME;

    out.collect (Tuple2.of (ritt.licenseId,

      „Sie dürfen neue Passagiere aufnehmen, bis“ + formatter.print (endTs)));

    // Timer registrieren, um den Status in 24 Stunden zu bereinigen

    ctx.timerService (). registerEventTimeTimer (startTs + CLEAN_UP_INTERVAL);

  } else if (startTs <ritt.pickUpTime - ALLOWED_WORK_TIME) {

    // Diese Fahrt begann nach Ablauf der zulässigen Arbeitszeit.

    // es ist ein Verstoß gegen die Vorschriften!

    out.collect (Tuple2.of (ritt.licenseId,

      "Diese Fahrt verstieß gegen die Arbeitszeitbestimmungen."));

  }}

}}

Die processElement()Methode wird für jedes TaxiRideEreignis aufgerufen . Zunächst ruft das Verfahren die Startzeit der Fahrerverschiebung vom Statushandle ab. Wenn der Status keine Startzeit enthält ( startTs == null) oder wenn die letzte Schicht mehr als 20 Stunden ( ALLOWED_WORK_TIME + REQ_BREAK_TIME) früher als die aktuelle Fahrt begonnen hat, ist die aktuelle Fahrt die erste Fahrt einer neuen Schicht. In beiden Fällen startet die Funktion eine neue Schicht, indem sie die Startzeit der Schicht auf die Startzeit der aktuellen Fahrt aktualisiert, eine Nachricht mit der Endzeit der neuen Schicht an den Fahrer sendet und einen Timer zum Bereinigen der Schicht registriert Zustand in 24 Stunden.

Wenn die aktuelle Fahrt nicht die erste Fahrt einer neuen Schicht ist, prüft die Funktion, ob sie gegen die Arbeitszeitregelung verstößt, dh ob sie mehr als 12 Stunden später als der Beginn der aktuellen Schicht des Fahrers gestartet wurde. In diesem Fall sendet die Funktion eine Nachricht, um den Fahrer über den Verstoß zu informieren.

Die processElement()Methode der MonitorWorkTimeFunktion registriert einen Timer, um den Zustand 24 Stunden nach Beginn einer Schicht zu bereinigen. Das Entfernen eines nicht mehr benötigten Zustands ist wichtig, um zu verhindern, dass der Zustand aufgrund eines undichten Zustands wächst. Ein Timer wird ausgelöst, wenn die Zeit der Anwendung den Zeitstempel des Timers überschreitet. Zu diesem Zeitpunkt wird die onTimer()Methode aufgerufen. Ähnlich wie bei state werden Timer pro Schlüssel verwaltet und die Funktion wird vor dem onTimer()Aufruf der Methode in den Kontext des zugehörigen Schlüssels gestellt . Daher wird der gesamte Statuszugriff auf den Schlüssel geleitet, der bei der Registrierung des Timers aktiv war.

Werfen wir einen Blick auf die onTimer()Methode von MonitorWorkTime.

@Override

public void onTimer (

    lange timerTs,

    OnTimerContext ctx,

    Kollektor out) wirft Exception {

  // den Schichtzustand entfernen, wenn noch keine neue Schicht gestartet wurde.

  Long startTs = shiftStart.value ();

  if (startTs == timerTs - CLEAN_UP_INTERVAL) {

    shiftStart.clear ();

  }}

}}

Die processElement()Methode registriert Timer für 24 Stunden, nachdem eine Schicht begonnen hat, den nicht mehr benötigten Zustand zu bereinigen. Das Bereinigen des Status ist die einzige Logik, die die onTimer()Methode implementiert. Wenn ein Timer ausgelöst wird, prüfen wir, ob der Fahrer in der Zwischenzeit eine neue Schicht gestartet hat, dh ob sich die Schichtstartzeit geändert hat. Ist dies nicht der Fall, löschen wir den Schaltzustand für den Fahrer.