Entwickelt für Echtzeit: Big Data Messaging mit Apache Kafka, Teil 1

Als die Big-Data-Bewegung begann, konzentrierte sie sich hauptsächlich auf die Stapelverarbeitung. Verteilte Datenspeicherungs- und Abfragetools wie MapReduce, Hive und Pig wurden entwickelt, um Daten nicht kontinuierlich, sondern stapelweise zu verarbeiten. Unternehmen führten jede Nacht mehrere Jobs aus, um Daten aus einer Datenbank zu extrahieren, sie dann zu analysieren, zu transformieren und schließlich zu speichern. In jüngerer Zeit haben Unternehmen die Macht der Analyse und Verarbeitung von Daten und Ereignissen entdeckt , wie sie geschehen , nicht nur einmal alle paar Stunden. Die meisten herkömmlichen Messagingsysteme lassen sich jedoch nicht für die Verarbeitung von Big Data in Echtzeit skalieren. Daher haben die Ingenieure von LinkedIn Apache Kafka entwickelt und als Open-Source-Lösung bereitgestellt: ein verteiltes Messaging-Framework, das die Anforderungen von Big Data durch Skalierung auf Standardhardware erfüllt.

In den letzten Jahren hat sich Apache Kafka herausgebildet, um eine Vielzahl von Anwendungsfällen zu lösen. Im einfachsten Fall kann es sich um einen einfachen Puffer zum Speichern von Anwendungsprotokollen handeln. In Kombination mit einer Technologie wie Spark Streaming können Datenänderungen verfolgt und Maßnahmen für diese Daten ergriffen werden, bevor sie an einem endgültigen Ziel gespeichert werden. Der Vorhersagemodus von Kafka macht es zu einem leistungsstarken Tool zum Erkennen von Betrug, z. B. zum Überprüfen der Gültigkeit einer Kreditkartentransaktion, wenn dies geschieht, und zum Warten auf die Stapelverarbeitung Stunden später.

Dieses zweiteilige Tutorial stellt Kafka vor und beginnt mit der Installation und Ausführung in Ihrer Entwicklungsumgebung. Sie erhalten einen Überblick über die Architektur von Kafka, gefolgt von einer Einführung in die Entwicklung eines sofort einsatzbereiten Apache Kafka-Messagingsystems. Schließlich erstellen Sie eine benutzerdefinierte Produzenten- / Konsumentenanwendung, die Nachrichten über einen Kafka-Server sendet und konsumiert. In der zweiten Hälfte des Tutorials erfahren Sie, wie Sie Nachrichten partitionieren und gruppieren und steuern, welche Nachrichten ein Kafka-Verbraucher verwendet.

Was ist Apache Kafka?

Apache Kafka ist ein Messaging-System, das speziell für Big Data entwickelt wurde. Ähnlich wie bei Apache ActiveMQ oder RabbitMq ermöglicht Kafka Anwendungen, die auf verschiedenen Plattformen basieren, die Kommunikation über asynchrone Nachrichtenübermittlung. Kafka unterscheidet sich jedoch in wesentlichen Punkten von diesen traditionelleren Messagingsystemen:

  • Es ist so konzipiert, dass es horizontal skaliert, indem mehr Commodity-Server hinzugefügt werden.
  • Es bietet einen viel höheren Durchsatz sowohl für Hersteller- als auch für Verbraucherprozesse.
  • Es kann verwendet werden, um sowohl Batch- als auch Echtzeit-Anwendungsfälle zu unterstützen.
  • JMS, die nachrichtenorientierte Middleware-API von Java, wird nicht unterstützt.

Apache Kafkas Architektur

Bevor wir uns mit Kafkas Architektur befassen, sollten Sie die grundlegende Terminologie kennen:

  • Ein Produzent ist ein Prozess, der eine Nachricht zu einem Thema veröffentlichen kann.
  • Ein Verbraucher ist ein Prozess, der ein oder mehrere Themen abonnieren und zu Themen veröffentlichte Nachrichten verwenden kann.
  • Eine Themenkategorie ist der Name des Feeds, in dem Nachrichten veröffentlicht werden.
  • Ein Broker ist ein Prozess, der auf einem einzelnen Computer ausgeführt wird.
  • Ein Cluster ist eine Gruppe von Brokern, die zusammenarbeiten.

Die Architektur von Apache Kafka ist sehr einfach, was in einigen Systemen zu einer besseren Leistung und einem besseren Durchsatz führen kann. Jedes Thema in Kafka ist wie eine einfache Protokolldatei. Wenn ein Produzent eine Nachricht veröffentlicht, hängt der Kafka-Server sie an das Ende der Protokolldatei für das angegebene Thema an. Der Server weist auch einen Versatz zu , bei dem es sich um eine Nummer handelt, mit der jede Nachricht dauerhaft identifiziert wird. Wenn die Anzahl der Nachrichten zunimmt, nimmt der Wert jedes Versatzes zu; Wenn der Produzent beispielsweise drei Nachrichten veröffentlicht, erhält die erste möglicherweise einen Versatz von 1, die zweite einen Versatz von 2 und die dritte einen Versatz von 3.

Wenn der Kafka-Consumer zum ersten Mal gestartet wird, sendet er eine Pull-Anforderung an den Server und fordert Sie auf, Nachrichten für ein bestimmtes Thema mit einem Versatzwert über 0 abzurufen. Der Server überprüft die Protokolldatei für dieses Thema und gibt die drei neuen Nachrichten zurück . Der Verbraucher verarbeitet die Nachrichten und sendet dann eine Anforderung für Nachrichten mit einem Versatz von mehr als 3 usw.

In Kafka ist der Client dafür verantwortlich, sich die Offset-Anzahl zu merken und Nachrichten abzurufen. Der Kafka-Server verfolgt oder verwaltet den Nachrichtenverbrauch nicht. Standardmäßig speichert ein Kafka-Server eine Nachricht sieben Tage lang. Ein Hintergrundthread auf dem Server überprüft und löscht Nachrichten, die sieben Tage oder älter sind. Ein Verbraucher kann auf Nachrichten zugreifen, solange sie sich auf dem Server befinden. Es kann eine Nachricht mehrmals lesen und sogar Nachrichten in umgekehrter Reihenfolge des Empfangs lesen. Wenn der Verbraucher die Nachricht jedoch nicht vor Ablauf der sieben Tage abruft, wird diese Nachricht übersehen.

Kafka Benchmarks

Die Verwendung der Produktion durch LinkedIn und andere Unternehmen hat gezeigt, dass Apache Kafka bei richtiger Konfiguration täglich Hunderte von Gigabyte an Daten verarbeiten kann. Im Jahr 2011 verwendeten drei LinkedIn-Ingenieure Benchmark-Tests, um zu zeigen, dass Kafka einen viel höheren Durchsatz als ActiveMQ und RabbitMQ erzielen kann.

Apache Kafka schnelles Setup und Demo

In diesem Lernprogramm wird eine benutzerdefinierte Anwendung erstellt. Beginnen wir jedoch mit der Installation und dem Testen einer Kafka-Instanz bei einem sofort einsatzbereiten Hersteller und Verbraucher.

  1. Besuchen Sie die Kafka-Download-Seite, um die neueste Version zu installieren (0.9 zum jetzigen Zeitpunkt).
  2. Extrahieren Sie die Binärdateien in einen software/kafkaOrdner. Für die aktuelle Version ist es software/kafka_2.11-0.9.0.0.
  3. Ändern Sie Ihr aktuelles Verzeichnis so, dass es auf den neuen Ordner verweist.
  4. Starten Sie den Zookeeper-Server, indem Sie den folgenden Befehl ausführen : bin/zookeeper-server-start.sh config/zookeeper.properties.
  5. Starten Sie den Kafka-Server, indem Sie Folgendes ausführen : bin/kafka-server-start.sh config/server.properties.
  6. Erstellen Sie ein Testthema, das Sie zum Testen verwenden können : bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic javaworld.
  7. Starten Sie eine einfache Konsolen Verbraucher , die zu einem bestimmten Thema veröffentlicht Nachrichten konsumieren können, wie zum Beispiel javaworld: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic javaworld --from-beginning.
  8. Starten Sie eine einfache Produzenten-Konsole, die Nachrichten zum Testthema veröffentlichen kann : bin/kafka-console-producer.sh --broker-list localhost:9092 --topic javaworld.
  9. Geben Sie ein oder zwei Nachrichten in die Produzenten-Konsole ein. Ihre Nachrichten sollten in der Verbraucherkonsole angezeigt werden.

Beispielanwendung mit Apache Kafka

Sie haben gesehen, wie Apache Kafka sofort funktioniert. Als nächstes entwickeln wir eine benutzerdefinierte Produzenten- / Konsumentenanwendung. Der Produzent ruft Benutzereingaben von der Konsole ab und sendet jede neue Zeile als Nachricht an einen Kafka-Server. Der Verbraucher ruft Nachrichten für ein bestimmtes Thema ab und druckt sie auf der Konsole aus. Die Hersteller- und Verbraucherkomponenten in diesem Fall sind Ihre eigenen Implementierungen von kafka-console-producer.shund kafka-console-consumer.sh.

Beginnen wir mit der Erstellung einer Producer.javaKlasse. Diese Client-Klasse enthält Logik zum Lesen von Benutzereingaben von der Konsole und zum Senden dieser Eingabe als Nachricht an den Kafka-Server.

Wir konfigurieren den Produzenten, indem wir ein Objekt aus der java.util.PropertiesKlasse erstellen und seine Eigenschaften festlegen. Die ProducerConfig-Klasse definiert alle verfügbaren Eigenschaften, aber die Standardwerte von Kafka sind für die meisten Anwendungen ausreichend. Für die Standardkonfiguration müssen nur drei obligatorische Eigenschaften festgelegt werden:

  • BOOTSTRAP_SERVERS_CONFIG
  • KEY_SERIALIZER_CLASS_CONFIG
  • VALUE_SERIALIZER_CLASS_CONFIG

BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers) sets a list of host:port pairs used for establishing the initial connections to the Kakfa cluster in the host1:port1,host2:port2,... format. Even if we have more than one broker in our Kafka cluster, we only need to specify the value of the first broker's host:port. The Kafka client will use this value to make a discover call on the broker, which will return a list of all the brokers in the cluster. It's a good idea to specify more than one broker in the BOOTSTRAP_SERVERS_CONFIG, so that if that first broker is down the client will be able to try other brokers.

The Kafka server expects messages in byte[] key, byte[] value format. Rather than converting every key and value, Kafka's client-side library permits us to use friendlier types like String and int for sending messages. The library will convert these to the appropriate type. For example, the sample app doesn't have a message-specific key, so we'll use null for the key. For the value we'll use a String, which is the data entered by the user on the console.

To configure the message key, we set a value of KEY_SERIALIZER_CLASS_CONFIG on the org.apache.kafka.common.serialization.ByteArraySerializer. This works because null doesn't need to be converted into byte[]. For the message value, we set VALUE_SERIALIZER_CLASS_CONFIG on the org.apache.kafka.common.serialization.StringSerializer, because that class knows how to convert a String into a byte[].

Custom key/value objects

Similar to StringSerializer, Kafka provides serializers for other primitives such as int and long. In order to use a custom object for our key or value, we would need to create a class implementing org.apache.kafka.common.serialization.Serializer. We could then add logic to serialize the class into byte[]. We would also have to use a corresponding deserializer in our consumer code.

The Kafka producer

After filling the Properties class with the necessary configuration properties, we can use it to create an object of KafkaProducer. Whenever we want to send a message to the Kafka server after that, we'll create an object of ProducerRecord and call the KafkaProducer's send() method with that record to send the message. The ProducerRecord takes two parameters: the name of the topic to which message should be published, and the actual message. Don't forget to call the Producer.close() method when you're done using the producer:

Listing 1. KafkaProducer

 public class Producer { private static Scanner in; public static void main(String[] argv)throws Exception { if (argv.length != 1) { System.err.println("Please specify 1 parameters "); System.exit(-1); } String topicName = argv[0]; in = new Scanner(System.in); System.out.println("Enter message(type exit to quit)"); //Configure the Producer Properties configProperties = new Properties(); configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties); String line = in.nextLine(); while(!line.equals("exit")) { ProducerRecord rec = new ProducerRecord(topicName, line); producer.send(rec); line = in.nextLine(); } in.close(); producer.close(); } } 

Configuring the message consumer

Next we'll create a simple consumer that subscribes to a topic. Whenever a new message is published to the topic, it will read that message and print it to the console. The consumer code is quite similar to the producer code. We start by creating an object of java.util.Properties, setting its consumer-specific properties, and then using it to create a new object of KafkaConsumer. The ConsumerConfig class defines all the properties that we can set. There are just four mandatory properties:

  • BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers)
  • KEY_DESERIALIZER_CLASS_CONFIG (key.deserializer)
  • VALUE_DESERIALIZER_CLASS_CONFIG (value.deserializer)
  • GROUP_ID_CONFIG (bootstrap.servers)

Just as we did for the producer class, we'll use BOOTSTRAP_SERVERS_CONFIG to configure the host/port pairs for the consumer class. This config lets us establish the initial connections to the Kakfa cluster in the host1:port1,host2:port2,... format.

As I previously noted, the Kafka server expects messages in byte[] key and byte[] value formats, and has its own implementation for serializing different types into byte[]. Just as we did with the producer, on the consumer side we'll have to use a custom deserializer to convert byte[] back into the appropriate type.

Im Fall der Beispielanwendung wissen wir, dass der Produzent ByteArraySerializerfür den Schlüssel und StringSerializerfür den Wert verwendet. Auf der Client-Seite müssen wir daher org.apache.kafka.common.serialization.ByteArrayDeserializerfür den Schlüssel und org.apache.kafka.common.serialization.StringDeserializerfür den Wert verwenden. Durch Festlegen dieser Klassen als Werte für KEY_DESERIALIZER_CLASS_CONFIGund VALUE_DESERIALIZER_CLASS_CONFIGermöglicht es dem Verbraucher, byte[]vom Hersteller gesendete codierte Typen zu deserialisieren .

Schließlich müssen wir den Wert von einstellen GROUP_ID_CONFIG. Dies sollte ein Gruppenname im Zeichenfolgenformat sein. Ich werde gleich mehr über diese Konfiguration erklären. Schauen Sie sich zunächst den Kafka-Verbraucher mit den vier obligatorischen Eigenschaften an: