apache-kafka - rip tutorial · 2019-01-18 · kapitel 1: erste schritte mit apache-kafka...

33
apache-kafka #apache- kafka

Upload: others

Post on 15-Aug-2020

18 views

Category:

Documents


0 download

TRANSCRIPT

Page 1: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

apache-kafka

#apache-

kafka

Page 2: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

Inhaltsverzeichnis

Über 1

Kapitel 1: Erste Schritte mit Apache-Kafka 2

Bemerkungen 2

Examples 2

Installation oder Setup 2

Einführung 3

Was bedeutet 3

Es wird für zwei breite Anwendungsklassen verwendet: 3

Installation 4

Erstellen Sie ein Thema 4

Nachrichten senden und empfangen 4

Stoppen Sie Kafka 5

Starten Sie ein Multi-Broker-Cluster 5

Erstellen Sie ein repliziertes Thema 6

Testfehlertoleranz 6

Aufräumen 7

Kapitel 2: Benutzerdefinierter Serializer / Deserializer 8

Einführung 8

Syntax 8

Parameter 8

Bemerkungen 8

Examples 8

Gson (de) Serialisierer 9

Serializer 9

Code 9

Verwendungszweck 9

Deserialisierer 9

Code 10

Verwendungszweck 10

Page 3: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

Kapitel 3: Kafka-Konsolen-Tools 12

Einführung 12

Examples 12

kafka-themen 12

Kafka-Konsolenproduzent 13

Kafka-Konsolen-Verbraucher 13

kafka-simple-consumer-shell 13

Kafka-Verbrauchergruppen 14

Kapitel 4: Produzent / Verbraucher in Java 16

Einführung 16

Examples 16

SimpleConsumer (Kafka> = 0.9.0) 16

Konfiguration und Initialisierung 16

Erstellung von Verbrauchern und Themenabonnement 17

Grundlegende Umfrage 18

Der Code 18

Grundlegendes Beispiel 18

Laufbares Beispiel 19

SimpleProducer (kafka> = 0,9) 20

Konfiguration und Initialisierung 20

Nachrichten senden 22

Der Code 22

Kapitel 5: Verbrauchergruppen und Offset-Management 24

Parameter 24

Examples 24

Was ist eine Verbrauchergruppe? 24

Consumer Offset Management und Fehlertoleranz 25

Offsets festlegen 25

Semantik begangener Offsets 26

Bearbeitungsgarantien 26

Wie kann ich das Thema von Anfang an lesen? 27

Page 4: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

Starten Sie eine neue Consumer Group 27

Verwenden Sie dieselbe Gruppen-ID erneut 27

Verwenden Sie dieselbe Gruppen-ID und Commit erneut 28

Credits 29

Page 5: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

Über

You can share this PDF with anyone you feel could benefit from it, downloaded the latest version from: apache-kafka

It is an unofficial and free apache-kafka ebook created for educational purposes. All the content is extracted from Stack Overflow Documentation, which is written by many hardworking individuals at Stack Overflow. It is neither affiliated with Stack Overflow nor official apache-kafka.

The content is released under Creative Commons BY-SA, and the list of contributors to each chapter are provided in the credits section at the end of this book. Images may be copyright of their respective owners unless otherwise specified. All trademarks and registered trademarks are the property of their respective company owners.

Use the content presented in this book at your own risk; it is not guaranteed to be correct nor accurate, please send your feedback and corrections to [email protected]

https://riptutorial.com/de/home 1

Page 6: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

Kapitel 1: Erste Schritte mit Apache-Kafka

Bemerkungen

Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als verteilter, partitionierter und replizierter Commit-Protokolldienst implementiert ist.

Genommen von der offiziellen Kafka- Site

Schnell

Ein einzelner Kafka-Broker kann Hunderte von Megabytes an Lese- und Schreibvorgängen pro Sekunde von Tausenden von Clients verarbeiten.

Skalierbar

Kafka ist so konzipiert, dass ein einzelner Cluster als zentrales Datengerüst für eine große Organisation dienen kann. Sie kann ohne Ausfallzeiten elastisch und transparent erweitert werden. Datenströme werden partitioniert und auf einen Cluster von Maschinen verteilt, um Datenströme zu ermöglichen, die größer sind als die Kapazität einer einzelnen Maschine, und Cluster von koordinierten Verbrauchern

Dauerhaft

Nachrichten werden auf der Festplatte gespeichert und innerhalb des Clusters repliziert, um Datenverlust zu vermeiden. Jeder Broker kann Terabytes von Nachrichten ohne Auswirkungen auf die Leistung verarbeiten.

Verteilt nach Design

Kafka verfügt über ein modernes, clusterorientiertes Design, das eine hohe Haltbarkeit und Fehlertoleranz garantiert.

Examples

Installation oder Setup

Schritt 1 Installieren Sie Java 7 oder 8

Schritt 2 Laden Sie Apache Kafka herunter: http://kafka.apache.org/downloads.html

Zum Beispiel werden wir versuchen, Apache Kafka 0.10.0.0 herunterzuladen

Schritt 3 Extrahieren Sie die komprimierte Datei.

Unter Linux:

https://riptutorial.com/de/home 2

Page 7: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

tar -xzf kafka_2.11-0.10.0.0.tgz

Unter Fenster: Rechtsklick -> Hier extrahieren

Schritt 4 Starten Sie Zookeeper

cd kafka_2.11-0.10.0.0

Linux:

bin/zookeeper-server-start.sh config/zookeeper.properties

Windows:

bin/windows/zookeeper-server-start.bat config/zookeeper.properties

Schritt 5 Starten Sie den Kafka-Server

Linux:

bin/kafka-server-start.sh config/server.properties

Windows:

bin/windows/kafka-server-start.bat config/server.properties

Einführung

Apache Kafka ™ ist eine verteilte Streaming-Plattform.

Was bedeutet

Mit 1-It können Sie Streams von Datensätzen veröffentlichen und abonnieren. In dieser Hinsicht ähnelt es einer Nachrichtenwarteschlange oder einem Enterprise-Messagingsystem.

2-It ermöglicht das Speichern von Datensatz-Streams fehlertolerant.

3-It ermöglicht das Verarbeiten von Datensätzen, sobald diese auftreten.

Es wird für zwei breite Anwendungsklassen verwendet:

Echtzeit-Streaming-Datenpipelines, die Daten zuverlässig zwischen Systemen oder Anwendungen abrufen

2 - Erstellen von Echtzeit-Streaming-Anwendungen, die Datenströme transformieren oder darauf reagieren

https://riptutorial.com/de/home 3

Page 8: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

Kafka-Konsolenskripts unterscheiden sich für Unix-basierte und Windows-Plattformen. In den Beispielen müssen Sie möglicherweise die Erweiterung entsprechend Ihrer Plattform hinzufügen. Linux: Skripte in bin/ mit der Erweiterung .sh . Windows: Skripts in bin\windows\ und mit der Erweiterung .bat .

Installation

Schritt 1: Laden Sie den Code herunter und entpacken Sie ihn:

tar -xzf kafka_2.11-0.10.1.0.tgz cd kafka_2.11-0.10.1.0

Schritt 2: Starten Sie den Server.

server.properties Themen später löschen zu können, öffnen Sie server.properties und setzen Sie delete.topic.enable auf true.

Kafka ist stark auf den Zoowärter angewiesen, daher müssen Sie ihn zuerst starten. Wenn Sie es nicht installiert haben, können Sie das mit kafka gelieferte Komfortskript verwenden, um eine schnelle und schmutzige ZooKeeper-Instanz mit einem Knoten zu erhalten.

zookeeper-server-start config/zookeeper.properties kafka-server-start config/server.properties

Schritt 3: Stellen Sie sicher, dass alles läuft

Sie sollten jetzt einen Zookeeper haben, der localhost:2181 hört, und einen einzelnen Kafka-Broker auf localhost:6667 .

Erstellen Sie ein Thema

Wir haben nur einen Broker, also erstellen wir ein Thema ohne Replikationsfaktor und nur eine Partition:

kafka-topics --zookeeper localhost:2181 \ --create \ --replication-factor 1 \ --partitions 1 \ --topic test-topic

Überprüfen Sie Ihr Thema:

kafka-topics --zookeeper localhost:2181 --list test-topic kafka-topics --zookeeper localhost:2181 --describe --topic test-topic Topic:test-topic PartitionCount:1 ReplicationFactor:1 Configs: Topic: test-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0

https://riptutorial.com/de/home 4

Page 9: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

Nachrichten senden und empfangen

Einen Verbraucher starten:

kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic

Starten Sie an einem anderen Terminal einen Produzenten und senden Sie einige Nachrichten. Standardmäßig sendet das Tool jede Zeile als separate Nachricht ohne besondere Codierung an den Broker. Schreiben Sie einige Zeilen und beenden Sie mit STRG + D oder STRG + C:

kafka-console-producer --broker-list localhost:9092 --topic test-topic a message another message ^D

Die Meldungen sollten im Verbraucher-Therminal erscheinen.

Stoppen Sie Kafka

kafka-server-stop

Starten Sie ein Multi-Broker-Cluster

Die obigen Beispiele verwenden nur einen Broker. Um einen echten Cluster einzurichten, müssen Sie lediglich mehrere Kafka-Server starten. Sie koordinieren sich automatisch.

Schritt 1: Um eine Kollision zu vermeiden, erstellen wir für jeden Broker eine Datei server.properties und ändern die Konfigurationseigenschaften für id , port und logfile .

Kopieren:

cp config/server.properties config/server-1.properties cp config/server.properties config/server-2.properties

Bearbeiten Sie die Eigenschaften für jede Datei, zum Beispiel:

vim config/server-1.properties broker.id=1 listeners=PLAINTEXT://:9093 log.dirs=/usr/local/var/lib/kafka-logs-1 vim config/server-2.properties broker.id=2 listeners=PLAINTEXT://:9094 log.dirs=/usr/local/var/lib/kafka-logs-2

https://riptutorial.com/de/home 5

Page 10: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

Schritt 2: Starten Sie die drei Broker:

kafka-server-start config/server.properties & kafka-server-start config/server-1.properties & kafka-server-start config/server-2.properties &

Erstellen Sie ein repliziertes Thema

kafka-topics --zookeeper localhost:2181 --create --replication-factor 3 --partitions 1 --topic replicated-topic kafka-topics --zookeeper localhost:2181 --describe --topic replicated-topic Topic:replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0

Dieses Mal gibt es mehr Informationen:

"Leader" ist der Knoten, der für alle Lese- und Schreibvorgänge für die angegebene Partition verantwortlich ist. Jeder Knoten ist der Anführer für einen zufällig ausgewählten Teil der Partitionen.

"Replikate" ist die Liste der Knoten, die das Protokoll für diese Partition replizieren, unabhängig davon, ob sie der Anführer sind oder ob sie gerade aktiv sind.

"isr" ist die Menge von "In-Sync" -Replikaten. Dies ist die Untergruppe der Replikatenliste, die derzeit aktiv ist und vom Leader erfasst wird.

Beachten Sie, dass das zuvor erstellte Thema unverändert bleibt.

Testfehlertoleranz

Veröffentlichen Sie eine Nachricht zum neuen Thema:

kafka-console-producer --broker-list localhost:9092 --topic replicated-topic hello 1 hello 2 ^C

Töte den Anführer (1 in unserem Beispiel). Unter Linux:

ps aux | grep server-1.properties kill -9 <PID>

Unter Windows:

wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties" taskkill /pid <PID> /f

Sehen Sie, was passiert ist:

https://riptutorial.com/de/home 6

Page 11: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

kafka-topics --zookeeper localhost:2181 --describe --topic replicated-topic Topic:replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0

Die Führung hat auf Makler 2 und "1" in nicht mehr synchronisiert. Die Nachrichten sind jedoch immer noch vorhanden (verwenden Sie den Verbraucher, um sich selbst auszuchecken).

Aufräumen

Löschen Sie die beiden Themen mit:

kafka-topics --zookeeper localhost:2181 --delete --topic test-topic kafka-topics --zookeeper localhost:2181 --delete --topic replicated-topic

Erste Schritte mit Apache-Kafka online lesen: https://riptutorial.com/de/apache-kafka/topic/1986/erste-schritte-mit-apache-kafka

https://riptutorial.com/de/home 7

Page 12: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

Kapitel 2: Benutzerdefinierter Serializer / Deserializer

Einführung

Kafka speichert und transportiert Byte-Arrays in seiner Warteschlange. Die (De) Serializer sind für die Übersetzung zwischen dem von Kafka und POJO bereitgestellten Byte-Array verantwortlich.

Syntax

public void configure (Map <String,?> config, boolean isKey);•public T deserialize (String-Thema, Byte [] Bytes);•public byte [] serialize (String-Thema, T obj);•

Parameter

Parameter Einzelheiten

Konfig

die Konfigurationseigenschaften ( Properties ), die bei der Erstellung als Producer oder Producer an den Producer oder Consumer werden Es enthält reguläre Kafka-Konfigurationen, kann jedoch auch mit benutzerdefinierten Konfigurationen erweitert werden. Dies ist der beste Weg, um Argumente an den (De) Serializer zu übergeben.

isKeyBenutzerdefinierte (De) Serialisierer können für Schlüssel und / oder Werte verwendet werden. Dieser Parameter gibt an, mit welcher der beiden Instanzen diese Instanz umgehen wird.

Themadas Thema der aktuellen Nachricht. Auf diese Weise können Sie eine benutzerdefinierte Logik basierend auf dem Quell- / Zielthema definieren.

Bytes Die rohe Botschaft zur Deserialisierung

objDie Nachricht, die serialisiert werden soll. Die tatsächliche Klasse hängt von Ihrem Serializer ab.

Bemerkungen

Vor der Version 0.9.0.0 verwendete die Kafka Java API Encoders und Decoders . Sie wurden in der neuen API durch Serializer und Deserializer ersetzt.

Examples

https://riptutorial.com/de/home 8

Page 13: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

Gson (de) Serialisierer

In diesem Beispiel wird die Gson- Bibliothek verwendet, um Java-Objekte Json-Strings zuzuordnen . Die (De) Serialisierer sind generisch, müssen aber nicht immer!

Serializer

Code

public class GsonSerializer<T> implements Serializer<T> { private Gson gson = new GsonBuilder().create(); @Override public void configure(Map<String, ?> config, boolean isKey) { // this is called right after construction // use it for initialisation } @Override public byte[] serialize(String s, T t) { return gson.toJson(t).getBytes(); } @Override public void close() { // this is called right before destruction } }

Verwendungszweck

Serialisierer werden über die erforderlichen key.serializer und value.serializer definiert.

Angenommen, wir haben eine POJO-Klasse mit dem Namen SensorValue und möchten, dass Nachrichten ohne Schlüssel erzeugt werden (Schlüssel sind auf null ):

Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); // ... other producer properties ... props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", GsonSerializer.class.getName()); Producer<String, SensorValue> producer = new KafkaProducer<>(properties); // ... produce messages ... producer.close();

( key.serializer ist eine erforderliche Konfiguration. Da wir keine Meldungsschlüssel angeben, wird der StringSerializer mit kafka ausgeliefert, der mit null umgehen kann).

https://riptutorial.com/de/home 9

Page 14: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

Deserialisierer

Code

public class GsonDeserializer<T> implements Deserializer<T> { public static final String CONFIG_VALUE_CLASS = "value.deserializer.class"; public static final String CONFIG_KEY_CLASS = "key.deserializer.class"; private Class<T> cls; private Gson gson = new GsonBuilder().create(); @Override public void configure(Map<String, ?> config, boolean isKey) { String configKey = isKey ? CONFIG_KEY_CLASS : CONFIG_VALUE_CLASS; String clsName = String.valueOf(config.get(configKey)); try { cls = (Class<T>) Class.forName(clsName); } catch (ClassNotFoundException e) { System.err.printf("Failed to configure GsonDeserializer. " + "Did you forget to specify the '%s' property ?%n", configKey); } } @Override public T deserialize(String topic, byte[] bytes) { return (T) gson.fromJson(new String(bytes), cls); } @Override public void close() {} }

Verwendungszweck

Deserialisierer werden durch die erforderlichen Konsumenteneigenschaften key.deserializer und value.deserializer definiert.

Angenommen, wir haben eine POJO-Klasse mit dem Namen SensorValue und möchten, dass Nachrichten ohne Schlüssel erzeugt werden (Schlüssel sind auf null ):

Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); // ... other consumer properties ... props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", GsonDeserializer.class.getName()); props.put(GsonDeserializer.CONFIG_VALUE_CLASS, SensorValue.class.getName()); try (KafkaConsumer<String, SensorValue> consumer = new KafkaConsumer<>(props)) {

https://riptutorial.com/de/home 10

Page 15: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

// ... consume messages ... }

Hier fügen wir der Consumer-Konfiguration eine benutzerdefinierte Eigenschaft hinzu, nämlich CONFIG_VALUE_CLASS . Der GsonDeserializer verwendet sie in der configure() Methode, um zu bestimmen, welche POJO-Klasse behandelt werden soll (alle Eigenschaften, die zu props hinzugefügt werden, werden in Form einer Map an die configure Methode übergeben).

Benutzerdefinierter Serializer / Deserializer online lesen: https://riptutorial.com/de/apache-kafka/topic/8992/benutzerdefinierter-serializer---deserializer

https://riptutorial.com/de/home 11

Page 16: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

Kapitel 3: Kafka-Konsolen-Tools

Einführung

Kafka bietet Befehlszeilen-Tools zum Verwalten von Themen und Verbrauchergruppen, zum Konsumieren und Veröffentlichen von Nachrichten usw.

Wichtig : Kafka-Konsolenskripts unterscheiden sich für Unix-basierte und Windows-Plattformen. In den Beispielen müssen Sie möglicherweise die Erweiterung entsprechend Ihrer Plattform hinzufügen.

Linux : Skripte in bin/ mit der Erweiterung .sh .

Windows : Skripts in bin\windows\ und mit der Erweiterung .bat .

Examples

kafka-themen

Mit diesem Tool können Sie Themen auflisten, erstellen, ändern und beschreiben.

Themen auflisten:

kafka-topics --zookeeper localhost:2181 --list

Erstellen Sie ein Thema:

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

erstellt ein Thema mit einer Partition und keiner Replikation.

Beschreiben Sie ein Thema:

kafka-topics --zookeeper localhost:2181 --describe --topic test

Ändern Sie ein Thema:

# change configuration kafka-topics --zookeeper localhost:2181 --alter --topic test --config max.message.bytes=128000 # add a partition kafka-topics --zookeeper localhost:2181 --alter --topic test --partitions 2

(Achtung: Kafka unterstützt nicht die Reduzierung der Anzahl der Partitionen eines Themas.) (Siehe Liste der Konfigurationseigenschaften ).

https://riptutorial.com/de/home 12

Page 17: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

Kafka-Konsolenproduzent

Mit diesem Tool können Sie Meldungen über die Befehlszeile erstellen.

Senden Sie einfache Stringnachrichten an ein Thema:

kafka-console-producer --broker-list localhost:9092 --topic test here is a message here is another message ^D

(jede neue Zeile ist eine neue Nachricht, geben Sie zum Beenden Strg + D oder Strg + C ein)

Nachrichten mit Schlüssel senden:

kafka-console-producer --broker-list localhost:9092 --topic test-topic \ --property parse.key=true \ --property key.separator=, key 1, message 1 key 2, message 2 null, message 3 ^D

Nachrichten aus einer Datei senden:

kafka-console-producer --broker-list localhost:9092 --topic test_topic < file.log

Kafka-Konsolen-Verbraucher

Mit diesem Tool können Sie Nachrichten eines Themas verwenden.

--bootstrap-server die alte Consumer-Implementierung zu verwenden, ersetzen Sie --bootstrap-server durch --zookeeper .

Einfache Nachrichten anzeigen:

kafka-console-consumer --bootstrap-server localhost:9092 --topic test

Alte Nachrichten verbrauchen:

Um ältere Nachrichten --from-beginning , können Sie die Option --from-beginning .

Schlüsselwertmeldungen anzeigen :

kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic \ --property print.key=true \ --property key.separator=,

kafka-simple-consumer-shell

https://riptutorial.com/de/home 13

Page 18: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

Dieser Consumer ist ein einfaches Tool, mit dem Sie Nachrichten von bestimmten Partitionen, Offsets und Repliken verwenden können.

Nützliche Parameter:

parition : die bestimmte Partition, von der konsumiert werden soll (Standardeinstellung für alle)

offset : der Anfangsversatz. Verwenden Sie -2 , um Nachrichten von Anfang an zu verbrauchen, und -1 , um vom Ende an zu verbrauchen.

max-messages : Anzahl der zu druckenden Nachrichten•replica : die Replik, Standard für den Broker-Leader (-1)•

Beispiel:

kafka-simple-consumer-shell \ --broker-list localhost:9092 \ --partition 1 \ --offset 4 \ --max-messages 3 \ --topic test-topic

Zeigt 3 Meldungen von Partition 1 an, beginnend mit Offset 4 aus dem Testthema.

Kafka-Verbrauchergruppen

Mit diesem Tool können Sie Verbrauchergruppen auflisten, beschreiben oder löschen. Weitere Informationen zu Verbrauchergruppen finden Sie in diesem Artikel .

Wenn Sie noch die alte Consumer-Implementierung verwenden, ersetzen Sie --bootstrap-server durch --zookeeper .

Verbrauchergruppen auflisten:

kafka-consumer-groups --bootstrap-server localhost:9092 --list octopus

Beschreiben Sie eine Verbrauchergruppe:

kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group octopus GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER octopus test-topic 0 15 15 0 octopus-1/127.0.0.1 octopus test-topic 1 14 15 1 octopus-2_/127.0.0.1

Bemerkungen : in der Ausgabe oben

current-offset ist der zuletzt festgelegte Versatz der Verbraucherinstanz,•log-end-offset ist der höchste Offset der Partition (das Summieren dieser Spalte gibt die Gesamtzahl der Nachrichten für das Thema an.)

lag ist die Differenz zwischen dem aktuellen Offset des Verbrauchers und dem höchsten •

https://riptutorial.com/de/home 14

Page 19: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

Offset, also wie weit hinter dem Verbraucher liegt.owner ist die client.id des Verbrauchers (wenn nicht angegeben, wird eine Standard- client.id angezeigt).

Eine Verbrauchergruppe löschen:

Das Löschen ist nur verfügbar, wenn die Gruppenmetadaten im zookeeper (alte Consumer-API) gespeichert sind. Mit der neuen Consumer-API übernimmt der Broker alles, einschließlich Löschen von Metadaten: Die Gruppe wird automatisch gelöscht, wenn der letzte festgeschriebene Offset für die Gruppe abläuft.

kafka-consumer-groups --bootstrap-server localhost:9092 --delete --group octopus

Kafka-Konsolen-Tools online lesen: https://riptutorial.com/de/apache-kafka/topic/8990/kafka-konsolen-tools

https://riptutorial.com/de/home 15

Page 20: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

Kapitel 4: Produzent / Verbraucher in Java

Einführung

In diesem Thema wird gezeigt, wie Datensätze in Java erstellt und verwendet werden.

Examples

SimpleConsumer (Kafka> = 0.9.0)

Mit der 0.9-Version von Kafka wurde ein komplett neues Design des Kafka-Konsumenten eingeführt. Wenn Sie sich für den alten SimpleConsumer (0.8.X) interessieren, SimpleConsumer Sie diese Seite . Wenn Ihre Kafka-Installation neuer als 0.8.X ist, sollten die folgenden Codes sofort funktionieren.

Konfiguration und Initialisierung

Kafka 0.9 unterstützt Java 6 oder Scala 2.9 nicht mehr. Wenn Sie noch Java 6 verwenden, sollten Sie ein Upgrade auf eine unterstützte Version in Betracht ziehen.

Erstellen Sie zunächst ein Maven-Projekt und fügen Sie Ihrem Pom folgende Abhängigkeit hinzu:

<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.1</version> </dependency> </dependencies>

Hinweis : Vergessen Sie nicht, das Versionsfeld für die neuesten Versionen (jetzt> 0.10) zu aktualisieren.

Der Consumer wird mit einem Properties Objekt initialisiert. Es gibt viele Eigenschaften, mit denen Sie das Verbraucherverhalten genau einstellen können. Unten ist die minimale Konfiguration erforderlich:

Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "consumer-tutorial"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName());

Bei den bootstrap-servers handelt es sich um eine erste Liste von Brokern, die der Konsument den Rest des Clusters ermitteln kann. Dies müssen nicht alle Server im Cluster sein: Der Client ermittelt die vollständige Anzahl der aktiven Broker aus den Brokern in dieser Liste.

https://riptutorial.com/de/home 16

Page 21: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

Der deserializer teilt dem Benutzer mit, wie er die Nachrichtenschlüssel und -werte interpretieren / deserialisieren soll. Hier verwenden wir den integrierten StringDeserializer .

Schließlich entspricht die group.id der Verbrauchergruppe dieses Clients. Denken Sie daran: Alle Verbraucher einer Verbrauchergruppe teilen Nachrichten auf (kafka, die sich wie eine Nachrichtenwarteschlange verhalten), während Verbraucher aus verschiedenen Verbrauchergruppen dieselben Meldungen erhalten (kafka wie ein Publish-Subscribe-System).

Andere nützliche Eigenschaften sind:

auto.offset.reset : auto.offset.reset was zu tun ist, wenn der in Zookeeper gespeicherte Offset fehlt oder außerhalb des Bereichs liegt. Mögliche Werte sind latest und earliest . Alles andere wird eine Ausnahme auslösen.

enable.auto.commit : Wenn true (Standard), wird der Verbraucherversatz periodisch (siehe auto.commit.interval.ms ) im Hintergrund gespeichert. Wenn Sie den auto.offset.reset=earliest auf false und auto.offset.reset=earliest , können Sie festlegen, von wo aus der Consumer starten soll, falls keine festgeschriebenen Offset-Informationen gefunden werden. earliest bedeutet vom Beginn der zugewiesenen Themenpartition. latest Mittelwert aus der höchsten Anzahl verfügbarer festgeschriebener Offsets für die Partition. Der Kafka-Consumer wird jedoch immer vom letzten festgeschriebenen Offset auto.offset.reset solange ein gültiger Offset-Datensatz gefunden wird (dh auto.offset.reset ignoriert. Das beste Beispiel ist, wenn eine brandneue Consumer-Gruppe ein Thema abonniert. Dies ist der auto.offset.reset in dem sie verwendet wird auto.offset.reset , um zu bestimmen, ob am Anfang (frühesten) oder am Ende (neuesten) des Themas begonnen werden soll.

session.timeout.ms : Durch ein Sitzungszeitlimit wird sichergestellt, dass die Sperre session.timeout.ms wird, wenn der Verbraucher abstürzt oder wenn eine Netzwerkpartition den Verbraucher vom Koordinator isoliert. Tatsächlich:

Wenn er zu einer Verbrauchergruppe gehört, wird jedem Verbraucher eine Teilmenge der Partitionen aus Themen zugewiesen, die er abonniert hat. Dies ist im Grunde eine Gruppensperre für diese Partitionen. Solange die Sperre gehalten wird, kann kein anderes Mitglied der Gruppe von ihnen lesen. Wenn Ihr Verbraucher gesund ist, ist dies genau das, was Sie wollen. Nur so können Sie doppelten Verbrauch vermeiden. Wenn der Consumer jedoch aufgrund eines Maschinen- oder Anwendungsfehlers stirbt, müssen Sie die Sperre aufheben, damit die Partitionen einem fehlerhaften Mitglied zugewiesen werden können. Quelle

Die vollständige Liste der Eigenschaften finden Sie hier http://kafka.apache.org/090/documentation.html#newconsumerconfigs .

Erstellung von Verbrauchern und Themenabonnement

https://riptutorial.com/de/home 17

Page 22: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

Sobald wir die Eigenschaften haben, ist das Erstellen eines Verbrauchers einfach:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props ); consumer.subscribe( Collections.singletonList( "topic-example" ) );

Nach dem Abonnieren kann der Consumer mit dem Rest der Gruppe koordinieren, um die Partitionszuordnung zu erhalten. Dies wird automatisch erledigt, wenn Sie mit dem Datenverbrauch beginnen.

Grundlegende Umfrage

Der Verbraucher muss in der Lage sein, Daten parallel abzurufen, möglicherweise aus vielen Partitionen für viele Themen, die sich wahrscheinlich auf viele Broker erstrecken. Glücklicherweise wird dies alles automatisch erledigt, wenn Sie mit dem Datenverbrauch beginnen. Dazu müssen Sie poll eine poll in einer Schleife aufrufen, und der Verbraucher kümmert sich um den Rest.

poll gibt einen (möglicherweise leeren) Satz von Nachrichten aus den zugewiesenen Partitionen zurück.

while( true ){ ConsumerRecords<String, String> records = consumer.poll( 100 ); if( !records.isEmpty() ){ StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println ); } }

Der Code

Grundlegendes Beispiel

Dies ist der einfachste Code, mit dem Sie Nachrichten aus einem Kafka-Thema abrufen können.

public class ConsumerExample09{ public static void main( String[] args ){ Properties props = new Properties(); props.put( "bootstrap.servers", "localhost:9092" ); props.put( "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" ); props.put( "value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" ); props.put( "auto.offset.reset", "earliest" ); props.put( "enable.auto.commit", "false" ); props.put( "group.id", "octopus" ); try( KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props ) ){ consumer.subscribe( Collections.singletonList( "test-topic" ) );

https://riptutorial.com/de/home 18

Page 23: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

while( true ){ // poll with a 100 ms timeout ConsumerRecords<String, String> records = consumer.poll( 100 ); if( records.isEmpty() ) continue; StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println ); } } } }

Laufbares Beispiel

Der Consumer ist dafür ausgelegt, in einem eigenen Thread ausgeführt zu werden. Es ist nicht sicher für die Verwendung von Multithreading ohne externe Synchronisierung, und es ist wahrscheinlich keine gute Idee, es auszuprobieren.

Nachfolgend finden Sie eine einfache ausführbare Task, die den Verbraucher initialisiert, eine Liste von Themen abonniert und die Abfrageschleife unbegrenzt ausführt, bis sie extern heruntergefahren wird.

public class ConsumerLoop implements Runnable{ private final KafkaConsumer<String, String> consumer; private final List<String> topics; private final int id; public ConsumerLoop( int id, String groupId, List<String> topics ){ this.id = id; this.topics = topics; Properties props = new Properties(); props.put( "bootstrap.servers", "localhost:9092"); props.put( "group.id", groupId ); props.put( "auto.offset.reset", "earliest" ); props.put( "key.deserializer", StringDeserializer.class.getName() ); props.put( "value.deserializer", StringDeserializer.class.getName() ); this.consumer = new KafkaConsumer<>( props ); } @Override public void run(){ try{ consumer.subscribe( topics ); while( true ){ ConsumerRecords<String, String> records = consumer.poll( Long.MAX_VALUE ); StreamSupport.stream( records.spliterator(), false ).forEach( System.out::println ); } }catch( WakeupException e ){ // ignore for shutdown }finally{ consumer.close(); } } public void shutdown(){

https://riptutorial.com/de/home 19

Page 24: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

consumer.wakeup(); } }

Beachten Sie, dass wir während des Long.MAX_VALUE ein Timeout von Long.MAX_VALUE Es wird also unbegrenzt auf eine neue Nachricht gewartet. Um den Consumer ordnungsgemäß zu schließen, muss vor dem Beenden der Anwendung die Methode shutdown() aufgerufen werden.

Ein Treiber könnte es so verwenden:

public static void main( String[] args ){ int numConsumers = 3; String groupId = "octopus"; List<String> topics = Arrays.asList( "test-topic" ); ExecutorService executor = Executors.newFixedThreadPool( numConsumers ); final List<ConsumerLoop> consumers = new ArrayList<>(); for( int i = 0; i < numConsumers; i++ ){ ConsumerLoop consumer = new ConsumerLoop( i, groupId, topics ); consumers.add( consumer ); executor.submit( consumer ); } Runtime.getRuntime().addShutdownHook( new Thread(){ @Override public void run(){ for( ConsumerLoop consumer : consumers ){ consumer.shutdown(); } executor.shutdown(); try{ executor.awaitTermination( 5000, TimeUnit.MILLISECONDS ); }catch( InterruptedException e ){ e.printStackTrace(); } } } ); }

SimpleProducer (kafka> = 0,9)

Konfiguration und Initialisierung

Erstellen Sie zunächst ein Maven-Projekt und fügen Sie Ihrem Pom folgende Abhängigkeit hinzu:

<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.1</version> </dependency> </dependencies>

https://riptutorial.com/de/home 20

Page 25: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

Der Produzent wird mit einem Properties initialisiert. Es gibt viele Eigenschaften, mit denen Sie das Verhalten des Produzenten fein abstimmen können. Unten ist die minimale Konfiguration erforderlich:

Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("client.id", "simple-producer-XX");

Bei den bootstrap-servers handelt es sich um eine erste Liste von einem oder mehreren Brokern, damit der Hersteller den Rest des Clusters ermitteln kann. Die Eigenschaften des serializer Kafka an, wie der Nachrichtenschlüssel und der Wert codiert werden sollen. Hier werden wir String-Nachrichten senden. Obwohl dies nicht erforderlich ist, wird das Festlegen einer client.id since immer empfohlen: Auf diese Weise können Sie Anforderungen auf dem Broker problemlos mit der client.id korrelieren, von der sie erstellt wurde.

Andere interessante Eigenschaften sind:

props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432);

Sie können die Haltbarkeit von Nachrichten, die an Kafka geschrieben werden, über die Einstellung " acks . Der Standardwert "1" erfordert eine explizite Bestätigung des Partitionsführers, dass das Schreiben erfolgreich war. Die stärkste Garantie, die Kafka bietet, ist acks=all garantiert, dass der Partitionsleiter nicht nur den Schreibvorgang akzeptiert, sondern auch erfolgreich auf alle in-sync-Replikate repliziert wurde. Sie können auch den Wert "0" verwenden, um den Durchsatz zu maximieren. Sie können jedoch nicht garantieren, dass die Nachricht erfolgreich in das Broker-Protokoll geschrieben wurde, da der Broker in diesem Fall nicht einmal eine Antwort sendet.

retries (Standardeinstellung> 0) legt fest, ob der Produzent versucht, die Nachricht nach einem Fehler erneut zu senden. Beachten Sie, dass bei Neuversuchen> 0 eine Neuordnung der Nachrichten auftreten kann, da der Neuversuch nach einem nachfolgenden Schreibvorgang erfolgen kann.

Kafka-Hersteller versuchen, gesendete Nachrichten in Batch zu sammeln, um den Durchsatz zu verbessern. Mit dem Java-Client können Sie batch.size , um die maximale Größe jedes Nachrichtenstapels in Byte zu steuern. Um mehr Zeit für das Füllen der linger.ms zu haben, können Sie linger.ms , damit der Produzent das Senden verzögert. Schließlich kann die compression.type mit der Einstellung Komprimierungstyp aktiviert werden.

Verwenden Sie buffer.memory , um den für den Java-Client verfügbaren Gesamtspeicher für das Sammeln nicht gesendeter Nachrichten zu begrenzen. Wenn dieses Limit erreicht ist, blockiert der Produzent zusätzliche Sends für max.block.ms bevor eine Ausnahme max.block.ms . Um zu vermeiden, dass Datensätze unbegrenzt in der Warteschlange request.timeout.ms werden, können Sie mit request.timeout.ms ein Timeout request.timeout.ms .

https://riptutorial.com/de/home 21

Page 26: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

Die vollständige Liste der Eigenschaften finden Sie hier . Ich schlage vor, diesen Artikel von Confluent für weitere Details zu lesen.

Nachrichten senden

Die send() -Methode ist asynchron. Beim Aufruf fügt er den Datensatz zu einem Puffer mit ausstehenden Datensatzübertragungen hinzu und kehrt sofort zurück. Dies ermöglicht es dem Hersteller, einzelne Datensätze für die Effizienz zusammenzufassen.

Das Ergebnis von send ist ein RecordMetadata , das die Partition angibt, an die der Datensatz gesendet wurde, und den Offset, dem er zugewiesen wurde. Da der Sendeaufruf asynchron ist, gibt er eine Future für die RecordMetadata zurück, die diesem Datensatz zugewiesen wird. Um die Metadaten abzurufen, können Sie entweder get() aufrufen, das blockiert, bis die Anforderung abgeschlossen ist, oder einen Rückruf verwenden.

// synchronous call with get() RecordMetadata recordMetadata = producer.send( message ).get(); // callback with a lambda producer.send( message, ( recordMetadata, error ) -> System.out.println(recordMetadata) );

Der Code

public class SimpleProducer{ public static void main( String[] args ) throws ExecutionException, InterruptedException{ Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put( "client.id", "octopus" ); String topic = "test-topic"; Producer<String, String> producer = new KafkaProducer<>( props ); for( int i = 0; i < 10; i++ ){ ProducerRecord<String, String> message = new ProducerRecord<>( topic, "this is message " + i ); producer.send( message ); System.out.println("message sent."); } producer.close(); // don't forget this } }

https://riptutorial.com/de/home 22

Page 27: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

Produzent / Verbraucher in Java online lesen: https://riptutorial.com/de/apache-kafka/topic/8974/produzent---verbraucher-in-java

https://riptutorial.com/de/home 23

Page 28: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

Kapitel 5: Verbrauchergruppen und Offset-Management

Parameter

Parameter Beschreibung

Gruppen-ID Der Name der Verbrauchergruppe.

enable.auto.commit Offsets automatisch festschreiben; Standardeinstellung: true

auto.commit.interval.msDie minimale Verzögerung in Millisekunden zwischen to-Commits (erfordert enable.auto.commit=true ); Standardeinstellung: 5000 .

auto.offset.resetWas ist zu tun, wenn kein gültiger festgeschriebener Offset gefunden wurde? voreingestellt: neueste (+)

(+) Mögliche Werte Beschreibung

früheste Setzt den Offset automatisch auf den frühesten Offset zurück.

neuesteSetzen Sie den Versatz automatisch auf den neuesten Versatz zurück.

keinerAusnahme für den Verbraucher auslösen, wenn für die Gruppe des Verbrauchers kein vorheriger Offset gefunden wurde.

noch etwas Ausnahme für den Verbraucher werfen.

Examples

Was ist eine Verbrauchergruppe?

Ab Kafka 0.9 steht der neue KafkaConsumer- Client auf hohem Niveau zur Verfügung. Es nutzt ein neues integriertes Kafka-Protokoll , das die Kombination mehrerer Verbraucher in einer sogenannten Consumer-Gruppe ermöglicht . Eine Consumer Group kann als ein einzelner logischer Consumer bezeichnet werden, der eine Reihe von Themen abonniert. Die Teilbereiche über alle Themen werden den physischen Verbrauchern innerhalb der Gruppe zugeordnet, so dass jedes Patent genau einem Verbraucher zugewiesen wird (ein einzelner Verbraucher kann mehrere Teiltiteln erhalten). Die einzelnen Konsumenten, die zu derselben Gruppe gehören, können auf verschiedenen Hosts verteilt ausgeführt werden.

Verbrauchergruppen werden über ihre group.id identifiziert. Um eine bestimmte group.id zu einer Consumer Group zu machen, genügt es, die Gruppen group.id diesem Client über die

https://riptutorial.com/de/home 24

Page 29: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

Konfiguration des Clients zuzuweisen:

Properties props = new Properties(); props.put("group.id", "groupName"); // ...some more properties required new KafkaConsumer<K, V>(config);

Daher bilden alle Verbraucher, die sich mit demselben Kafka-Cluster verbinden und dieselbe group.id , eine Verbrauchergruppe. Verbraucher können jederzeit eine Gruppe verlassen, und neue Verbraucher können jederzeit einer Gruppe beitreten. In beiden Fällen wird eine sogenannte Neuverteilung ausgelöst, und Partitionen werden bei der Consumer-Gruppe neu zugewiesen, um sicherzustellen, dass jede Partition von genau einem Consumer innerhalb der Gruppe verarbeitet wird.

KafkaConsumer , dass auch ein einzelner KafkaConsumer mit sich selbst als Einzelmitglied eine Consumer Group bildet.

Consumer Offset Management und Fehlertoleranz

KafkaConsumers fordern über einen Aufruf von poll() Nachrichten von einem Kafka-Broker an poll() und ihr Fortschritt wird über Offsets verfolgt . Jeder Nachricht in jeder Partition jedes Themas ist ein sogenannter Versatz zugewiesen, dessen logische Folgenummer in der Partition. Ein KafkaConsumer verfolgt seinen aktuellen Versatz für jede ihm zugewiesene Partition. Beachten Sie, dass die Kafka-Broker die aktuellen Offsets der Verbraucher nicht kennen. Daher muss der Verbraucher bei poll() seine aktuellen Offsets an den Broker senden, sodass der Broker die entsprechenden Nachrichten zurücksenden kann. Nachrichten mit einem größeren nachfolgenden Offset. Nehmen wir zum Beispiel an, dass wir ein einzelnes Partitionsthema und einen einzelnen Consumer mit aktuellem Offset 5 haben. Bei poll() sendet der Consumer, wenn ein Offset an den Broker gesendet wird, und der Broker sendet Nachrichten für die Offsets 6,7,8, ...

Da Verbraucher ihre Offsets selbst verfolgen, können diese Informationen verloren gehen, wenn ein Verbraucher ausfällt. Offsets müssen daher zuverlässig gespeichert werden, so dass ein Verbraucher beim Neustart seinen alten Offset aufheben und dort weiterspielen kann, wo er sich befindet. In Kafka ist dies durch Offset-Commits integriert . Der neue KafkaConsumer kann seinen aktuellen Offset an Kafka übergeben, und Kafka speichert diese Offsets in einem speziellen Thema namens __consumer_offsets . Das Speichern der Offsets innerhalb eines Kafka-Themas ist nicht nur fehlertolerant, sondern ermöglicht auch die Neuzuordnung von Partitionen während einer Neuverteilung. Da alle Benutzer einer Consumer-Gruppe auf alle festgeschriebenen Offsets aller Partitionen zugreifen können, liest ein __consumer_offsets , der eine neue Partition zugewiesen bekommt, bei der __consumer_offsets nur den festgeschriebenen Offset dieser Partition aus dem Thema __consumer_offsets und setzt dort fort, wo der alte Consumer __consumer_offsets .

Offsets festlegen

KafkaConsumers können Offsets automatisch im Hintergrund enable.auto.commit = true (Konfigurationsparameter enable.auto.commit = true ). enable.auto.commit = true ist die Standardeinstellung. Diese Auto-Commits werden innerhalb von poll() ( das normalerweise in einer Schleife aufgerufen wird ) ausgeführt. Wie häufig Offsets ausgeführt werden sollen, kann

https://riptutorial.com/de/home 25

Page 30: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

über auto.commit.interval.ms konfiguriert werden. Da Auto-Commits in poll() eingebettet sind und poll() vom Benutzercode aufgerufen wird, definiert dieser Parameter eine Untergrenze für das Inter-Commit-Intervall.

Als Alternative zum automatischen Festschreiben können Offsets auch manuell verwaltet werden. Dafür sollte das automatische enable.auto.commit = false deaktiviert werden ( enable.auto.commit = false ). Für das manuelle KafkaConsumers bietet KafkaConsumers zwei Methoden an, nämlich commitSync () und commitAsync () . Wie der Name schon sagt, ist commitSync() ein blockierender Aufruf, der zurückkehrt, nachdem Offsets erfolgreich festgeschrieben wurden, und commitAsync() kehrt sofort zurück. Wenn Sie wissen möchten, ob ein Commit erfolgreich war oder nicht, können Sie einen Methodenrückrufhandler ( OffsetCommitCallback ) OffsetCommitCallback . Beachten Sie, dass der Verbraucher bei beiden Commit-Aufrufen die Offsets des letzten poll() Aufrufs festlegt. Zum Beispiel. Nehmen wir an, ein einzelnes Partitionsthema mit einem einzelnen Consumer und der letzte Aufruf von poll() Nachrichten mit Offsets 4,5,6 zurück. Beim Commit wird Offset 6 festgeschrieben, da dies der letzte vom Kundenclient verfolgte Offset ist. Gleichzeitig ermöglichen commitSync() und commitAsync() mehr Kontrolle darüber, welchen Offset Sie festlegen möchten: Wenn Sie die entsprechenden Überladungen verwenden, mit denen Sie eine Map<TopicPartition, OffsetAndMetadata> angeben Map<TopicPartition, OffsetAndMetadata> der Map<TopicPartition, OffsetAndMetadata> nur die angegebenen Offsets (dh die Karte kann eine beliebige Teilmenge der zugewiesenen Partitionen enthalten, und der angegebene Versatz kann einen beliebigen Wert haben).

Semantik begangener Offsets

Ein festgeschriebener Offset zeigt an, dass alle Nachrichten bis zu diesem Offset bereits verarbeitet wurden. Da Offsets aufeinanderfolgende Zahlen sind, werden durch das Festlegen des Offsets X implizit alle Offsets kleiner als X . Daher ist es nicht notwendig, jeden Versatz einzeln festzulegen, und mehrere Versätze gleichzeitig zu begehen, dies geschieht jedoch nur der größte Versatz.

Beachten Sie, dass es konstruktionsbedingt auch möglich ist, einen kleineren Offset als den zuletzt festgeschriebenen Offset festzulegen. Dies ist möglich, wenn Nachrichten ein zweites Mal gelesen werden sollen.

Bearbeitungsgarantien

Die Verwendung des automatischen Festschreibens ermöglicht die Verarbeitung von Semantiken. Die zugrunde liegende Annahme ist, dass poll() nur aufgerufen wird, nachdem alle zuvor gelieferten Nachrichten erfolgreich verarbeitet wurden. Dadurch wird sichergestellt, dass keine Nachricht verloren geht, da nach der Verarbeitung ein Commit ausgeführt wird. Wenn ein Consumer vor einem Commit ausfällt, werden alle Nachrichten nach dem letzten Commit von Kafka empfangen und erneut verarbeitet. Dieser Wiederholungsversuch kann jedoch zu Duplikaten führen, da möglicherweise eine Nachricht des letzten poll() Aufrufs verarbeitet wurde, der Fehler jedoch unmittelbar vor dem Auto-Commit-Aufruf aufgetreten ist.

Wenn eine Bearbeitungssemantik erforderlich ist, muss das automatische commitSync() deaktiviert

https://riptutorial.com/de/home 26

Page 31: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

werden, und ein commitSync() direkt nach dem commitSync() poll() sollte ausgeführt werden. Danach werden Nachrichten verarbeitet. Dadurch wird sichergestellt, dass Nachrichten festgeschrieben werden, bevor sie verarbeitet werden, und somit niemals ein zweites Mal gelesen werden. Natürlich kann eine Nachricht im Fehlerfall verloren gehen.

Wie kann ich das Thema von Anfang an lesen?

Es gibt mehrere Strategien, um ein Thema von Anfang an zu lesen. Um dies zu erklären, müssen wir zuerst verstehen, was beim Start des Verbrauchers geschieht. Beim Start eines Verbrauchers geschieht Folgendes:

treten Sie der konfigurierten Verbrauchergruppe bei, die eine Neuverteilung auslöst und dem Verbraucher Partitionen zuweist

1.

nach festgeschriebenen Offsets suchen (für alle Partitionen, die dem Verbraucher zugewiesen wurden)

2.

Für alle Partitionen mit gültigem Offset setzen Sie diesen Offset fort3. auto.offset.reset für alle Partitionen mit ungültigem Offset den Start-Offset gemäß dem Konfigurationsparameter auto.offset.reset

4.

Starten Sie eine neue Consumer Group

Wenn Sie ein Thema von Anfang an group.id möchten, können Sie einfach eine neue Verbrauchergruppe starten (dh eine nicht verwendete group.id ) und auto.offset.reset = earliest . Da für eine neue Gruppe keine festgeschriebenen Offsets vorhanden sind, wird das automatische Offset-Reset ausgelöst, und das Thema wird von Anfang an verwendet. group.id Sie, dass beim Neustart des Verbrauchers, wenn Sie dieselbe group.id erneut verwenden, das Thema nicht von group.id wieder gelesen wird, sondern dort group.id wird, wo es group.id . Für diese Strategie müssen Sie group.id jedes Mal, wenn Sie ein Thema lesen möchten, eine neue group.id zuweisen.

Verwenden Sie dieselbe Gruppen-ID erneut

Um zu vermeiden, group.id jedes Mal, wenn Sie ein Thema von Anfang an lesen möchten, eine neue group.id , können Sie die automatische enable.auto.commit = false deaktivieren (über enable.auto.commit = false ), bevor Sie den Consumer zum ersten Mal starten (mithilfe einer nicht verwendeten group.id und Einstellung auto.offset.reset = earliest ). Darüber hinaus sollten Sie keine Offsets manuell festlegen. Da Offsets niemals mit dieser Strategie festgelegt werden, liest der Verbraucher das Thema beim Neustart erneut von Anfang an.

Diese Strategie hat jedoch zwei Nachteile:

es ist nicht fehlertolerant1. Gruppengleichgewicht funktioniert nicht wie beabsichtigt2.

(1) Da Offsets niemals festgeschrieben werden, werden ein fehlerhafter und ein gestoppter Consumer beim Neustart auf dieselbe Weise behandelt. In beiden Fällen wird das Thema von Anfang an verwendet. (2) Da Offset niemals festgeschrieben wird, werden neu zugewiesene

https://riptutorial.com/de/home 27

Page 32: apache-kafka - RIP Tutorial · 2019-01-18 · Kapitel 1: Erste Schritte mit Apache-Kafka Bemerkungen Kafka ist ein Publish-Subscribe-Messaging-System mit hohem Durchsatz, das als

Partitionen bei der Neuverteilung von Anfang an Verbraucher.

Daher funktioniert diese Strategie nur für Verbrauchergruppen mit einem einzelnen Verbraucher und sollte nur für Entwicklungszwecke verwendet werden.

Verwenden Sie dieselbe Gruppen-ID und Commit erneut

Wenn Sie fehlertolerant sein möchten und / oder mehrere Verbraucher in Ihrer Verbrauchergruppe verwenden möchten, müssen Sie Offsets festlegen. Wenn Sie also ein Thema von Anfang an lesen möchten, müssen Sie festgelegte Offsets beim Start des Konsumenten manipulieren. Dafür bietet KafkaConsumer drei Methoden seek() , seekToBeginning() und seekToEnd() . Während seek() verwendet werden kann, um einen beliebigen Versatz festzulegen, können die zweite und dritte Methode verwendet werden, um den Anfang bzw. das Ende einer Partition zu suchen. Bei einem Ausfall und bei einem Neustart des Verbrauchers würde somit das Suchen entfallen, und der Verbraucher kann dort fortfahren, wo er aufgehört hat. Für Consumer-stop-and- seekToBeginning() -from-Beginn wird seekToBeginning() explizit aufgerufen, bevor Sie Ihre poll() Schleife eingeben. Beachten Sie, dass seekXXX() nur verwendet werden kann, nachdem ein Verbraucher einer Gruppe seekXXX() . seekXXX() vor der Verwendung von seekXXX() eine "Dummy-Abfrage" durchgeführt werden. Der Gesamtcode würde ungefähr so aussehen:

if (consumer-stop-and-restart-from-beginning) { consumer.poll(0); // dummy poll() to join consumer group consumer.seekToBeginning(...); } // now you can start your poll() loop while (isRunning) { for (ConsumerRecord record : consumer.poll(0)) { // process a record } }

Verbrauchergruppen und Offset-Management online lesen: https://riptutorial.com/de/apache-kafka/topic/5449/verbrauchergruppen-und-offset-management

https://riptutorial.com/de/home 28