Verwendung von Redis für die Echtzeit-Stream-Verarbeitung

Roshan Kumar ist Senior Product Manager bei Redis Labs.

Echtzeit-Streaming-Datenaufnahme ist eine häufige Anforderung für viele Big-Data-Anwendungsfälle. In Bereichen wie IoT, E-Commerce, Sicherheit, Kommunikation, Unterhaltung, Finanzen und Einzelhandel, in denen so viel von einer zeitnahen und genauen datengesteuerten Entscheidungsfindung abhängt, ist die Erfassung und Analyse von Daten in Echtzeit der Kern des Geschäfts.

Das Sammeln, Speichern und Verarbeiten von Streaming-Daten in großen Mengen und mit hoher Geschwindigkeit stellt jedoch architektonische Herausforderungen dar. Ein wichtiger erster Schritt bei der Bereitstellung einer Echtzeit-Datenanalyse besteht darin, sicherzustellen, dass ausreichende Netzwerk-, Rechen-, Speicher- und Speicherressourcen verfügbar sind, um schnelle Datenströme zu erfassen. Der Software-Stack eines Unternehmens muss jedoch der Leistung seiner physischen Infrastruktur entsprechen. Andernfalls werden Unternehmen mit einem massiven Datenstau oder schlimmer noch mit fehlenden oder unvollständigen Daten konfrontiert sein.

Redis ist eine beliebte Wahl für solche schnellen Datenerfassungsszenarien geworden. Redis ist eine leichte In-Memory-Datenbankplattform, die einen Durchsatz von Millionen von Vorgängen pro Sekunde mit einer Latenz von weniger als einer Millisekunde erzielt und dabei nur minimale Ressourcen benötigt. Es bietet auch einfache Implementierungen, die durch seine vielfältigen Datenstrukturen und Funktionen ermöglicht werden.

In diesem Artikel werde ich zeigen, wie Redis Enterprise allgemeine Herausforderungen lösen kann, die mit der Aufnahme und Verarbeitung großer Mengen von Hochgeschwindigkeitsdaten verbunden sind. Wir werden drei verschiedene Ansätze (einschließlich Code) durchlaufen, um einen Twitter-Feed in Echtzeit zu verarbeiten, wobei Redis Pub / Sub, Redis Lists und Redis Sorted Sets verwendet werden. Wie wir sehen werden, spielen alle drei Methoden je nach Anwendungsfall eine Rolle bei der schnellen Datenaufnahme.

Herausforderungen bei der Entwicklung schneller Datenerfassungslösungen

Die schnelle Datenaufnahme ist häufig mit verschiedenen Arten von Komplexität verbunden:

  • Große Datenmengen kommen manchmal in Bursts an. Bursty-Daten erfordern eine Lösung, die große Datenmengen mit minimaler Latenz verarbeiten kann. Idealerweise sollte es in der Lage sein, Millionen von Schreibvorgängen pro Sekunde mit einer Latenz von weniger als einer Millisekunde und mit minimalen Ressourcen auszuführen.
  • Daten aus mehreren Quellen. Datenerfassungslösungen müssen flexibel genug sein, um Daten in vielen verschiedenen Formaten zu verarbeiten, bei Bedarf die Quellidentität beizubehalten und in Echtzeit zu transformieren oder zu normalisieren.
  • Daten, die gefiltert, analysiert oder weitergeleitet werden müssen. Die meisten Datenerfassungslösungen haben einen oder mehrere Abonnenten, die die Daten verwenden. Dies sind häufig unterschiedliche Anwendungen, die an denselben oder unterschiedlichen Standorten mit unterschiedlichen Annahmen funktionieren. In solchen Fällen muss die Datenbank nicht nur die Daten transformieren, sondern auch filtern oder aggregieren, je nach den Anforderungen der konsumierenden Anwendungen.
  • Daten stammen aus geografisch verteilten Quellen. In diesem Szenario ist es häufig praktisch, die Datenerfassungsknoten so zu verteilen, dass sie in der Nähe der Quellen platziert werden. Die Knoten selbst werden Teil der Lösung für die schnelle Datenaufnahme, um Daten aufzunehmen, zu verarbeiten, weiterzuleiten oder umzuleiten.

Umgang mit schneller Datenaufnahme in Redis

Viele Lösungen, die heutzutage eine schnelle Datenaufnahme unterstützen, sind komplex, funktionsreich und für einfache Anforderungen überentwickelt. Redis hingegen ist extrem leicht, schnell und einfach zu bedienen. Mit Clients, die in mehr als 60 Sprachen verfügbar sind, kann Redis problemlos in die gängigen Software-Stacks integriert werden.

Redis bietet Datenstrukturen wie Listen, Sets, sortierte Sets und Hashes, die eine einfache und vielseitige Datenverarbeitung bieten. Redis liefert mehr als eine Million Lese- / Schreibvorgänge pro Sekunde mit einer Latenz von weniger als einer Millisekunde auf einer bescheidenen Commodity-Cloud-Instanz, was es für große Datenmengen äußerst ressourceneffizient macht. Redis unterstützt auch Messaging-Dienste und Client-Bibliotheken in allen gängigen Programmiersprachen und eignet sich daher gut für die Kombination von Hochgeschwindigkeitsdatenerfassung und Echtzeitanalyse. Mit Redis Pub / Sub-Befehlen kann es die Rolle eines Nachrichtenbrokers zwischen Herausgebern und Abonnenten spielen, eine Funktion, die häufig zum Senden von Benachrichtigungen oder Nachrichten zwischen verteilten Datenaufnahmeknoten verwendet wird.

Redis Enterprise erweitert Redis durch nahtlose Skalierung, ständige Verfügbarkeit, automatisierte Bereitstellung und die Möglichkeit, kostengünstigen Flash-Speicher als RAM-Extender zu verwenden, sodass die Verarbeitung großer Datenmengen kostengünstig durchgeführt werden kann.

In den folgenden Abschnitten werde ich erläutern, wie Sie mit Redis Enterprise allgemeine Probleme bei der Datenaufnahme lösen können.

Redis mit der Geschwindigkeit von Twitter

Um die Einfachheit von Redis zu veranschaulichen, werden wir eine Beispiellösung für die schnelle Datenerfassung untersuchen, die Nachrichten aus einem Twitter-Feed sammelt. Das Ziel dieser Lösung ist es, Tweets in Echtzeit zu verarbeiten und sie während der Verarbeitung in die Pipe zu schieben.

Von der Lösung aufgenommene Twitter-Daten werden dann von mehreren Prozessoren auf der ganzen Linie verwendet. Wie in Abbildung 1 dargestellt, handelt es sich in diesem Beispiel um zwei Prozessoren - den englischen Tweet-Prozessor und den Influencer-Prozessor. Jeder Prozessor filtert die Tweets und leitet sie über seine jeweiligen Kanäle an andere Verbraucher weiter. Diese Kette kann so weit gehen, wie es die Lösung erfordert. In unserem Beispiel hören wir jedoch auf der dritten Ebene auf, wo wir populäre Diskussionen unter englischsprachigen und Top-Influencern zusammenfassen.

Redis Labs

Beachten Sie, dass wir das Beispiel der Verarbeitung von Twitter-Feeds verwenden, da die Daten schnell eintreffen und einfach sind. Beachten Sie auch, dass Twitter-Daten unsere schnelle Datenaufnahme über einen einzigen Kanal erreichen. In vielen Fällen, z. B. beim Internet der Dinge (IoT), können mehrere Datenquellen Daten an den Hauptempfänger senden.

Es gibt drei Möglichkeiten, diese Lösung mit Redis zu implementieren: Aufnahme mit Redis Pub / Sub, Aufnahme mit der List-Datenstruktur oder Aufnahme mit der Sorted Set-Datenstruktur. Lassen Sie uns jede dieser Optionen untersuchen.

Mit Redis Pub / Sub einnehmen

Dies ist die einfachste Implementierung einer schnellen Datenaufnahme. Diese Lösung verwendet die Pub / Sub-Funktion von Redis, mit der Anwendungen Nachrichten veröffentlichen und abonnieren können. Wie in Abbildung 2 dargestellt, verarbeitet jede Stufe die Daten und veröffentlicht sie auf einem Kanal. Die nachfolgende Stufe abonniert den Kanal und empfängt die Nachrichten zur weiteren Verarbeitung oder Filterung.

Redis Labs

Vorteile

  • Einfach zu implementieren.
  • Funktioniert gut, wenn die Datenquellen und Prozessoren geografisch verteilt sind.

Nachteile 

  • Die Lösung erfordert, dass die Herausgeber und Abonnenten ständig auf dem Laufenden sind. Abonnenten verlieren Daten, wenn sie gestoppt werden oder wenn die Verbindung unterbrochen wird.
  • Es erfordert mehr Verbindungen. Ein Programm kann nicht dieselbe Verbindung veröffentlichen und abonnieren. Daher benötigt jeder Zwischendatenprozessor zwei Verbindungen - eine zum Abonnieren und eine zum Veröffentlichen. Wenn Sie Redis auf einer DBaaS-Plattform ausführen, ist es wichtig zu überprüfen, ob Ihr Paket oder Ihr Servicelevel die Anzahl der Verbindungen begrenzt.

Ein Hinweis zu Verbindungen

Wenn mehr als ein Client einen Kanal abonniert, überträgt Redis die Daten linear nacheinander an jeden Client. Große Datennutzdaten und viele Verbindungen können zu einer Latenz zwischen einem Herausgeber und seinen Abonnenten führen. Obwohl das Standard-Hard-Limit für die maximale Anzahl von Verbindungen 10.000 beträgt, müssen Sie testen und bewerten, wie viele Verbindungen für Ihre Nutzlast geeignet sind.

Redis verwaltet für jeden Client einen Client-Ausgabepuffer. Die Standardgrenzwerte für den Client-Ausgabepuffer für Pub / Sub sind wie folgt festgelegt:

Client-Output-Buffer-Limit-Pubsub 32 MB 8 MB 60

Mit dieser Einstellung zwingt Redis Clients, die Verbindung unter zwei Bedingungen zu trennen: Wenn der Ausgabepuffer über 32 MB hinaus wächst oder wenn der Ausgabepuffer 60 Sekunden lang konsistent 8 MB Daten enthält.

Dies sind Hinweise darauf, dass Clients die Daten langsamer verbrauchen als veröffentlicht. Sollte eine solche Situation auftreten, versuchen Sie zunächst, die Verbraucher so zu optimieren, dass sie beim Konsumieren der Daten keine Latenz hinzufügen. Wenn Sie feststellen, dass Ihre Clients immer noch nicht verbunden sind, können Sie die Grenzwerte für die client-output-buffer-limit pubsubEigenschaft in redis.conf erhöhen. Beachten Sie bitte, dass Änderungen an den Einstellungen die Latenz zwischen Herausgeber und Abonnent erhöhen können. Änderungen müssen gründlich getestet und verifiziert werden.

Code-Design für die Redis Pub / Sub-Lösung

Redis Labs

Dies ist die einfachste der drei in diesem Dokument beschriebenen Lösungen. Hier sind die wichtigen Java-Klassen aufgeführt, die für diese Lösung implementiert wurden. Laden Sie den Quellcode mit vollständiger Implementierung hier herunter: //github.com/redislabsdemo/IngestPubSub.

Die SubscriberKlasse ist die Kernklasse dieses Entwurfs. Jedes SubscriberObjekt unterhält eine neue Verbindung zu Redis.

Klasse Subscriber erweitert JedisPubSub implementiert Runnable {

       privater Stringname;

       private RedisConnection conn = null;

       private Jedis jedis = null;

       private String subscriberChannel;

       public Subscriber (StringcriberName, String channelName) löst eine Ausnahme aus {

              Name = Abonnentenname;

              SubscriberChannel = ChannelName;

              Thread t = neuer Thread (dies);

              t.start ();

       }}

       @Override

       public void run () {

              Versuchen{

                      conn = RedisConnection.getRedisConnection ();

                      jedis = conn.getJedis ();

                      while (wahr) {

                             jedis.subscribe (this, this.subscriberChannel);

                      }}

              } catch (Ausnahme e) {

                      e.printStackTrace ();

              }}

       }}

       @Override

       public void onMessage (String-Kanal, String-Nachricht) {

              super.onMessage (Kanal, Nachricht);

       }}

}}

Die PublisherKlasse unterhält eine separate Verbindung zu Redis, um Nachrichten auf einem Kanal zu veröffentlichen.

public class Publisher {

       RedisConnection conn = null;

       Jedis jedis = null;

       privater String-Kanal;

       public Publisher (String channelName) löst eine Ausnahme aus {

              channel = channelName;

              conn = RedisConnection.getRedisConnection ();

              jedis = conn.getJedis ();

       }}

       public void Publish (String msg) löst eine Ausnahme aus {

              jedis.publish (Kanal, Nachricht);

       }}

}}

Die EnglishTweetFilter, InfluencerTweetFilter, HashTagCollectorund InfluencerCollectorFilter erweitern Subscriber, die sie an die Eingangskanäle hören können. Da Sie zum Abonnieren und Veröffentlichen separate Redis-Verbindungen benötigen, verfügt jede Filterklasse über ein eigenes RedisConnectionObjekt. Filter hören die neuen Nachrichten in ihren Kanälen in einer Schleife ab. Hier ist der Beispielcode der EnglishTweetFilterKlasse:

öffentliche Klasse EnglishTweetFilter erweitert Abonnenten

{

       private RedisConnection conn = null;

       private Jedis jedis = null; 

       private String publisherChannel = null;

public EnglishTweetFilter (String name, String subscriberChannel, String publisherChannel) löst eine Ausnahme aus {

              super (Name, AbonnentKanal);

              this.publisherChannel = publisherChannel;

              conn = RedisConnection.getRedisConnection ();

              jedis = conn.getJedis ();           

       }}

       @Override

       public void onMessage (StringcriberChannel, String message) {

              JsonParser jsonParser = neuer JsonParser ();

              JsonElement jsonElement = jsonParser.parse (Nachricht);

              JsonObject jsonObject = jsonElement.getAsJsonObject ();

              // Nachrichten filtern: Nur englische Tweets veröffentlichen           

if (jsonObject.get ("lang")! = null &&

       jsonObject.get ("lang"). getAsString (). equals ("en")) {

                      jedis.publish (publisherChannel, message);

              }}

       }}

}}

Die PublisherKlasse verfügt über eine Veröffentlichungsmethode, mit der Nachrichten auf dem erforderlichen Kanal veröffentlicht werden.

public class Publisher {

.

.     

       public void Publish (String msg) löst eine Ausnahme aus {

              jedis.publish (Kanal, Nachricht);

       }}

.

}}

Die Hauptklasse liest Daten aus dem Aufnahmestream und sendet sie an den AllDataKanal. Die Hauptmethode dieser Klasse startet alle Filterobjekte.

öffentliche Klasse IngestPubSub

{

.

       public void start () löst eine Ausnahme aus {

       .

       .

              Publisher = neuer Publisher ("AllData");

              englishFilter = neuer EnglishTweetFilter ("English Filter", "AllData",

                                           "EnglishTweets");

              influencerFilter = neuer InfluencerTweetFilter („Influencer Filter“,

                                           "AllData", "InfluencerTweets");

              hashtagCollector = neuer HashTagCollector ("Hashtag Collector", 

                                           "EnglishTweets");

              influencerCollector = neuer InfluencerCollector („Influencer Collector“,

                                           "InfluencerTweets");

       .

       .

}}

Mit Redis-Listen einnehmen

Die Listendatenstruktur in Redis macht die Implementierung einer Warteschlangenlösung einfach und unkompliziert. Bei dieser Lösung schiebt der Produzent jede Nachricht in den hinteren Bereich der Warteschlange, und der Abonnent fragt die Warteschlange ab und zieht neue Nachrichten vom anderen Ende ab.

Redis Labs

Vorteile

  • Diese Methode ist bei Verbindungsverlust zuverlässig. Sobald Daten in die Listen verschoben wurden, bleiben sie dort erhalten, bis die Abonnenten sie lesen. Dies gilt auch dann, wenn die Teilnehmer gestoppt sind oder ihre Verbindung zum Redis-Server verlieren.
  • Produzenten und Konsumenten benötigen keine Verbindung zwischen ihnen.

Nachteile

  • Sobald Daten aus der Liste abgerufen wurden, werden sie entfernt und können nicht mehr abgerufen werden. Sofern die Verbraucher die Daten nicht beibehalten, gehen sie verloren, sobald sie verbraucht werden.
  • Jeder Verbraucher benötigt eine separate Warteschlange, in der mehrere Kopien der Daten gespeichert werden müssen.

Code-Design für die Redis Lists-Lösung

Redis Labs

Sie können den Quellcode für die Redis Lists-Lösung hier herunterladen: //github.com/redislabsdemo/IngestList. Die Hauptklassen dieser Lösung werden unten erläutert.

MessageListbettet die Redis List-Datenstruktur ein. Die push()Methode verschiebt die neue Nachricht links von der Warteschlange und pop()wartet auf eine neue Nachricht von rechts, wenn die Warteschlange leer ist.

öffentliche Klasse MessageList {

       protected String name = "MyList"; // Name

.

.     

       public void push (String msg) löst eine Ausnahme aus {

              jedis.lpush (Name, Nachricht); // Links drücken

       }}

       public String pop () löst eine Ausnahme aus {

              return jedis.brpop (0, name) .toString ();

       }}

.

.

}}

MessageListenerist eine abstrakte Klasse, die Listener- und Publisher-Logik implementiert. Ein MessageListenerObjekt hört nur eine Liste ab, kann jedoch auf mehreren Kanälen ( MessageFilterObjekten) veröffentlichen. Diese Lösung erfordert ein separates MessageFilterObjekt für jeden Teilnehmer in der Pipe.

Klasse MessageListener implementiert Runnable {

       privater String name = null;

       private MessageList inboundList = null;

       Map outBoundMsgFilters = new HashMap ();

.

.     

       public void registerOutBoundMessageList (MessageFilter msgFilter) {

              if (msgFilter! = null) {

                      if (outBoundMsgFilters.get (msgFilter.name) == null) {

                             outBoundMsgFilters.put (msgFilter.name, msgFilter);

                      }}

              }}

       }}

.

.

       @Override

       public void run () {

.

                      while (wahr) {

                             String msg = inboundList.pop ();

                             processMessage (msg);

                      }}                                  

.

       }}

.

       protected void pushMessage (String msg) löst eine Ausnahme aus {

              Set outBoundMsgNames = outBoundMsgFilters.keySet ();

              for (String name: outBoundMsgNames) {

                      MessageFilter msgList = outBoundMsgFilters.get (Name);

                      msgList.filterAndPush (msg);

              }}

       }}

}}

MessageFilterist eine Elternklasse, die die filterAndPush()Methode erleichtert . Während Daten durch das Aufnahmesystem fließen, werden sie häufig gefiltert oder transformiert, bevor sie zur nächsten Stufe gesendet werden. Klassen, die die MessageFilterKlasse erweitern, überschreiben die filterAndPush()Methode und implementieren ihre eigene Logik, um die gefilterte Nachricht in die nächste Liste zu verschieben.

öffentliche Klasse MessageFilter {

       MessageList messageList = null;

.

.

       public void filterAndPush (String msg) löst eine Ausnahme aus {

              messageList.push (msg);

       }}

.

.     

}}

AllTweetsListenerist eine Beispielimplementierung einer MessageListenerKlasse. Dadurch werden alle Tweets auf dem AllDataKanal abgehört und die Daten an EnglishTweetsFilterund veröffentlicht InfluencerFilter.

öffentliche Klasse AllTweetsListener erweitert MessageListener {

.

.     

       public static void main (String [] args) löst eine Ausnahme aus {

              MessageListener allTweetsProcessor = AllTweetsListener.getInstance ();

allTweetsProcessor.registerOutBoundMessageList (neu

              EnglishTweetsFilter ("EnglishTweetsFilter", "EnglishTweets"));

              allTweetsProcessor.registerOutBoundMessageList (neu

                             InfluencerFilter ("InfluencerFilter", "Influencer"));

              allTweetsProcessor.start ();

       }}

.

.

}}

EnglishTweetsFiltererstreckt sich MessageFilter. Diese Klasse implementiert Logik, um nur die Tweets auszuwählen, die als englische Tweets markiert sind. Der Filter verwirft nicht englische Tweets und schiebt englische Tweets zur nächsten Liste.

öffentliche Klasse EnglishTweetsFilter erweitert MessageFilter {

       public EnglishTweetsFilter (String name, String listName) löst eine Ausnahme aus {

              super (name, listName);

       }}

       @Override

       public void filterAndPush (String message) löst eine Ausnahme aus {

              JsonParser jsonParser = neuer JsonParser ();

              JsonElement jsonElement = jsonParser.parse (Nachricht);

              JsonArray jsonArray = jsonElement.getAsJsonArray ();

              JsonObject jsonObject = jsonArray.get (1) .getAsJsonObject ();

              if (jsonObject.get ("lang")! = null &&

jsonObject.get ("lang"). getAsString (). equals ("en")) {

                             Jedis jedis = super.getJedisInstance ();

                             if (jedis! = null) {

                                    jedis.lpush (super.name, jsonObject.toString ());

                             }}

              }}

       }}

}}