Entwickelt für Echtzeit: Big Data Messaging mit Apache Kafka, Teil 2

In der ersten Hälfte dieser JavaWorld-Einführung in Apache Kafka haben Sie mit Kafka einige kleine Hersteller- / Verbraucheranwendungen entwickelt. In diesen Übungen sollten Sie mit den Grundlagen des Apache Kafka-Nachrichtensystems vertraut sein. In dieser zweiten Hälfte lernen Sie, wie Sie Partitionen verwenden, um die Last zu verteilen und Ihre Anwendung horizontal zu skalieren und bis zu Millionen von Nachrichten pro Tag zu verarbeiten. Außerdem erfahren Sie, wie Kafka Nachrichten-Offsets verwendet, um die komplexe Nachrichtenverarbeitung zu verfolgen und zu verwalten, und wie Sie Ihr Apache Kafka-Nachrichtensystem vor einem Ausfall schützen, falls ein Verbraucher ausfällt. Wir werden die Beispielanwendung aus Teil 1 sowohl für Publish-Subscribe- als auch für Point-to-Point-Anwendungsfälle entwickeln.

Partitionen in Apache Kafka

Themen in Kafka können in Partitionen unterteilt werden. Wenn Sie beispielsweise ein Thema mit dem Namen Demo erstellen, können Sie es so konfigurieren, dass es drei Partitionen enthält. Der Server würde drei Protokolldateien erstellen, eine für jede der Demo-Partitionen. Wenn ein Produzent eine Nachricht zu dem Thema veröffentlicht, weist er dieser Nachricht eine Partitions-ID zu. Der Server hängt die Nachricht dann nur für diese Partition an die Protokolldatei an.

Wenn Sie dann zwei Consumer gestartet haben, weist der Server dem ersten Consumer möglicherweise die Partitionen 1 und 2 und dem zweiten Consumer die Partitionen 3 zu. Jeder Verbraucher würde nur von seinen zugewiesenen Partitionen lesen. Das für drei Partitionen konfigurierte Demo-Thema ist in Abbildung 1 dargestellt.

Stellen Sie sich zur Erweiterung des Szenarios einen Kafka-Cluster mit zwei Brokern vor, die in zwei Maschinen untergebracht sind. Wenn Sie das Demo-Thema partitioniert haben, haben Sie es so konfiguriert, dass es zwei Partitionen und zwei Replikate enthält. Bei dieser Art der Konfiguration weist der Kafka-Server die beiden Partitionen den beiden Brokern in Ihrem Cluster zu. Jeder Broker wäre der Anführer für eine der Partitionen.

Wenn ein Produzent eine Nachricht veröffentlichte, ging diese an den Partitionsleiter. Der Leiter würde die Nachricht nehmen und an die Protokolldatei auf dem lokalen Computer anhängen. Der zweite Broker würde dieses Festschreibungsprotokoll passiv auf seinen eigenen Computer replizieren. Wenn der Partitionsleiter ausfällt, wird der zweite Broker zum neuen Leiter und beginnt, Clientanforderungen zu bearbeiten. Auf die gleiche Weise würde diese Anforderung, wenn ein Verbraucher eine Anforderung an eine Partition sendet, zuerst an den Partitionsleiter gesendet, der die angeforderten Nachrichten zurückgibt.

Vorteile der Partitionierung

Berücksichtigen Sie die Vorteile der Partitionierung eines Kafka-basierten Messagingsystems:

  1. Skalierbarkeit : In einem System mit nur einer Partition werden für ein Thema veröffentlichte Nachrichten in einer Protokolldatei gespeichert, die auf einem einzelnen Computer vorhanden ist. Die Anzahl der Nachrichten für ein Thema muss in eine einzelne Festschreibungsprotokolldatei passen, und die Größe der gespeicherten Nachrichten darf niemals größer sein als der Speicherplatz des Computers. Durch das Partitionieren eines Themas können Sie Ihr System skalieren, indem Sie Nachrichten auf verschiedenen Computern in einem Cluster speichern. Wenn Sie beispielsweise 30 Gigabyte (GB) Nachrichten für das Demo-Thema speichern möchten, können Sie einen Kafka-Cluster mit drei Computern mit jeweils 10 GB Speicherplatz erstellen. Dann würden Sie das Thema so konfigurieren, dass es drei Partitionen hat.
  2. Serverlastausgleich : Mit mehreren Partitionen können Sie Nachrichtenanforderungen auf mehrere Broker verteilen. Wenn Sie beispielsweise ein Thema hatten, das 1 Million Nachrichten pro Sekunde verarbeitete, können Sie es in 100 Partitionen aufteilen und Ihrem Cluster 100 Broker hinzufügen. Jeder Broker ist führend bei einzelnen Partitionen und für die Beantwortung von nur 10.000 Client-Anfragen pro Sekunde verantwortlich.
  3. Consumer-Load-Balancing : Ähnlich wie beim Server-Load-Balancing können Sie durch das Hosten mehrerer Consumer auf verschiedenen Computern die Consumer-Last verteilen. Angenommen, Sie möchten 1 Million Nachrichten pro Sekunde aus einem Thema mit 100 Partitionen verbrauchen. Sie können 100 Konsumenten erstellen und parallel ausführen. Der Kafka-Server würde jedem Verbraucher eine Partition zuweisen, und jeder Verbraucher würde 10.000 Nachrichten parallel verarbeiten. Da Kafka jede Partition nur einem Verbraucher zuweist, wird innerhalb der Partition jede Nachricht der Reihe nach konsumiert.

Zwei Möglichkeiten zur Partitionierung

Der Produzent ist dafür verantwortlich, zu entscheiden, auf welche Partition eine Nachricht verschoben wird. Der Produzent hat zwei Möglichkeiten, diese Zuordnung zu steuern:

  • Benutzerdefinierter Partitionierer : Sie können eine Klasse erstellen, die die org.apache.kafka.clients.producer.PartitionerSchnittstelle implementiert . Dieser Benutzer Partitionerimplementiert die Geschäftslogik, um zu entscheiden, wohin Nachrichten gesendet werden.
  • DefaultPartitioner : Wenn Sie keine benutzerdefinierte Partitioniererklasse erstellen, wird standardmäßig die org.apache.kafka.clients.producer.internals.DefaultPartitionerKlasse verwendet. Der Standardpartitionierer ist in den meisten Fällen ausreichend und bietet drei Optionen:
    1. Manuell : Wenn Sie eine erstellen ProducerRecord, verwenden Sie den überladenen Konstruktor new ProducerRecord(topicName, partitionId,messageKey,message), um eine Partitions-ID anzugeben.
    2. Hashing (lokalitätsabhängig) : Wenn Sie a erstellen ProducerRecord, geben Sie a messageKeydurch Aufrufen an new ProducerRecord(topicName,messageKey,message). DefaultPartitionerverwendet den Hash des Schlüssels, um sicherzustellen, dass alle Nachrichten für denselben Schlüssel an denselben Produzenten gehen. Dies ist der einfachste und gebräuchlichste Ansatz.
    3. Sprühen (Random Load Balancing) : Wenn Sie nicht steuern möchten, an welche Partitionsnachrichten gesendet wird, rufen new ProducerRecord(topicName, message)Sie einfach an, um Ihre zu erstellen ProducerRecord. In diesem Fall sendet der Partitionierer im Round-Robin-Verfahren Nachrichten an alle Partitionen, um eine ausgeglichene Serverlast sicherzustellen.

Partitionieren einer Apache Kafka-Anwendung

Für das einfache Hersteller / Verbraucher-Beispiel in Teil 1 haben wir a verwendet DefaultPartitioner. Jetzt versuchen wir stattdessen, einen benutzerdefinierten Partitionierer zu erstellen. Nehmen wir für dieses Beispiel an, wir haben eine Einzelhandels-Website, auf der Verbraucher Produkte überall auf der Welt bestellen können. Aufgrund der Nutzung wissen wir, dass sich die meisten Verbraucher entweder in den USA oder in Indien befinden. Wir möchten unsere Anwendung so aufteilen, dass Bestellungen aus den USA oder Indien an die jeweiligen Verbraucher gesendet werden, während Bestellungen von anderen Orten an einen dritten Verbraucher gehen.

Zu Beginn erstellen wir eine CountryPartitioner, die die org.apache.kafka.clients.producer.PartitionerSchnittstelle implementiert . Wir müssen die folgenden Methoden implementieren:

  1. Kafka ruft configure () auf, wenn wir die PartitionerKlasse mit einer Mapder Konfigurationseigenschaften initialisieren . Diese Methode initialisiert Funktionen, die für die Geschäftslogik der Anwendung spezifisch sind, z. B. das Herstellen einer Verbindung zu einer Datenbank. In diesem Fall möchten wir einen ziemlich generischen Partitionierer, der countryNameals Eigenschaft verwendet wird. Wir können dann configProperties.put("partitions.0","USA")den Nachrichtenfluss Partitionen zuordnen. In Zukunft können wir dieses Format verwenden, um zu ändern, welche Länder ihre eigene Partition erhalten.
  2. Die ProducerAPI ruft partition () einmal für jede Nachricht auf. In diesem Fall verwenden wir es, um die Nachricht zu lesen und den Namen des Landes aus der Nachricht zu analysieren. Wenn der Name des Landes in der ist countryToPartitionMap, wird es partitionIdgespeichert in der zurückgegeben Map. Wenn nicht, wird der Wert des Landes gehasht und verwendet, um zu berechnen, zu welcher Partition es gehen soll.
  3. Wir rufen close () auf, um den Partitionierer herunterzufahren. Durch diese Methode wird sichergestellt, dass alle während der Initialisierung erworbenen Ressourcen beim Herunterfahren bereinigt werden.

Beachten Sie, dass configure()der Kafka-Produzent beim Aufrufen von Kafka alle Eigenschaften, die wir für den Produzenten konfiguriert haben, an die PartitionerKlasse übergibt. Es ist wichtig, dass wir nur die Eigenschaften lesen, die mit beginnen partitions., sie analysieren, um die zu erhalten partitionId, und die ID in speichern countryToPartitionMap.

Nachfolgend finden Sie unsere benutzerdefinierte Implementierung der PartitionerSchnittstelle.

Listing 1. CountryPartitioner

 public class CountryPartitioner implements Partitioner { private static Map countryToPartitionMap; public void configure(Map configs) { System.out.println("Inside CountryPartitioner.configure " + configs); countryToPartitionMap = new HashMap(); for(Map.Entry entry: configs.entrySet()){ if(entry.getKey().startsWith("partitions.")){ String keyName = entry.getKey(); String value = (String)entry.getValue(); System.out.println( keyName.substring(11)); int paritionId = Integer.parseInt(keyName.substring(11)); countryToPartitionMap.put(value,paritionId); } } } public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List partitions = cluster.availablePartitionsForTopic(topic); String valueStr = (String)value; String countryName = ((String) value).split(":")[0]; if(countryToPartitionMap.containsKey(countryName)){ //If the country is mapped to particular partition return it return countryToPartitionMap.get(countryName); }else { //If no country is mapped to particular partition distribute between remaining partitions int noOfPartitions = cluster.topics().size(); return value.hashCode()%noOfPartitions + countryToPartitionMap.size() ; } } public void close() {} } 

Die ProducerKlasse in Listing 2 (unten) ist unserem einfachen Produzenten aus Teil 1 sehr ähnlich, wobei zwei Änderungen fett markiert sind:

  1. Wir setzen eine Konfigurationseigenschaft mit einem Schlüssel gleich dem Wert von ProducerConfig.PARTITIONER_CLASS_CONFIG, der dem vollständig qualifizierten Namen unserer CountryPartitionerKlasse entspricht. Wir setzen auch countryNameauf partitionIdund ordnen so die Eigenschaften zu, an die wir übergeben möchten CountryPartitioner.
  2. We pass an instance of a class implementing the org.apache.kafka.clients.producer.Callback interface as a second argument to the producer.send() method. The Kafka client will call its onCompletion() method once a message is successfully published, attaching a RecordMetadata object. We'll be able to use this object to find out which partition a message was sent to, as well as the offset assigned to the published message.

Listing 2. A partitioned producer

 public class Producer { private static Scanner in; public static void main(String[] argv)throws Exception { if (argv.length != 1) { System.err.println("Please specify 1 parameters "); System.exit(-1); } String topicName = argv[0]; in = new Scanner(System.in); System.out.println("Enter message(type exit to quit)"); //Configure the Producer Properties configProperties = new Properties(); configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");  configProperties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,CountryPartitioner.class.getCanonicalName()); configProperties.put("partition.1","USA"); configProperties.put("partition.2","India");  org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties); String line = in.nextLine(); while(!line.equals("exit")) { ProducerRecord rec = new ProducerRecord(topicName, null, line); producer.send(rec, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { System.out.println("Message sent to topic ->" + metadata.topic()+ " ,parition->" + metadata.partition() +" stored at offset->" + metadata.offset()); ; } }); line = in.nextLine(); } in.close(); producer.close(); } } 

Assigning partitions to consumers

The Kafka server guarantees that a partition is assigned to only one consumer, thereby guaranteeing the order of message consumption. You can manually assign a partition or have it assigned automatically.

If your business logic demands more control, then you'll need to manually assign partitions. In this case you would use KafkaConsumer.assign() to pass a list of partitions that each consumer was interested in to the Kakfa server.

Having partitions assigned automatically is the default and most common choice. In this case, the Kafka server will assign a partition to each consumer, and will reassign partitions to scale for new consumers.

Angenommen, Sie erstellen ein neues Thema mit drei Partitionen. Wenn Sie den ersten Consumer für das neue Thema starten, weist Kafka alle drei Partitionen demselben Consumer zu. Wenn Sie dann einen zweiten Consumer starten, weist Kafka alle Partitionen neu zu und weist dem ersten Consumer eine Partition und dem zweiten Consumer die verbleibenden zwei Partitionen zu. Wenn Sie einen dritten Consumer hinzufügen, weist Kafka die Partitionen erneut zu, sodass jedem Consumer eine einzelne Partition zugewiesen wird. Wenn Sie den vierten und fünften Konsumenten starten, haben drei der Konsumenten eine zugewiesene Partition, die anderen erhalten jedoch keine Nachrichten. Wenn eine der ersten drei Partitionen ausfällt, verwendet Kafka dieselbe Partitionierungslogik, um die Partition dieses Verbrauchers einem der zusätzlichen Verbraucher neu zuzuweisen.