Kafka Streams in Action 

Kafka Streams in Action 

„WIESO SOLL ICH KAFKA STREAMS VERWENDEN? DAS FUNKTIONIERT DOCH MIT DER KLASSISCHEN PRODUCER-CONSUMER-API GENAUSO GUT, AUßERDEM IST DAS DOCH VIEL ZU KOMPLIZIERT“. 

Auf solche Aussagen stößt man immer wieder, wenn man versucht, das Thema Kafka Streams in Projekten anzusprechen. Ich möchte mit diesem Artikel gerne aufzeigen, dass das Arbeiten mit Kafka Streams überhaupt nicht kompliziert ist und zudem noch etliche Vorteile bietet. 

Dieser Beitrag richtet sich an alle Leser, die bereits Erfahrungen im Umgang mit klassischen Message Brokern und deren Consumer- / Producer-API haben, und mehr zum Thema von Kafka Streams erfahren möchten, oder aber auch an Leser, die sich für den Umgang mit größeren Datenmengen in Microservice-Architekturen interessieren. 

Apache Kafka ist heutzutage in vielen Unternehmen nicht mehr wegzudenken. Egal ob zur einfachen Entkopplung von Microservices oder als Event Streaming Plattform, gewinnt Apache Kafka immer mehr die Rolle eines zentralen Nervensystems für die digitalen Daten der Unternehmen. Vor allem durch seinen hohen Datendurchsatz und seine sehr gute Skalierbarkeit – auch zur Laufzeit – hat es Apache Kafka erreicht, sich in den großen Projekten zu etablieren. 

Consumer-Producer-API 

Im täglichen Einsatz kommt Apache Kafka hauptsächlich in seiner Funktion als Message Broker zum Einsatz, z.B. um eine lose Koppelung der unterschiedlichen Microservices zu erreichen. Dies geschieht dann mittels der Producer-Consumer-API und den bekannten Vor- und Nachteilen: 

  • Trennung der Verantwortlichkeiten zwischen dem Producer und dem Consumer 
  • Abarbeitung der Events entweder Nacheinander oder über Batch-Verarbeitung 
  • Keine Möglichkeit von paralleler Abarbeitung der Events innerhalb einer Partition 
  • Keine Möglichkeit von zustandsbehafteter Event-Verarbeitung 
  • Producer und Consumer müssen separat im Code erzeugt werden 

Wenn man sich nun Anwendungsszenarien vorstellt, in denen große Datenmengen an Events in möglichst kurzer Zeit verarbeitet werden sollen, und dabei auch noch die Events zueinander in Zusammenhang stehen, dann merkt man wie eine Applikation dabei schnell an ihre Grenzen kommt. 

Solche möglichen Szenarien könnten z.B. das Anbinden und Auswerten verschiedener Datenquellen mit unterschiedlichen Event-Strukturen in Echtzeit sein, was besonders oft im IoT-Bereich benötigt wird. Oder auch, wenn man im E-Commerce-Umfeld unterwegs ist, wäre die oft verwendete Produkt-Recommendation ein interessantes Szenario. 

Als möglichen Lösungsansatz dazu bietet sich dann Kafka Streams an.

Kafka Streams 

Was sind jetzt aber Kafka Streams bzw. was sind sie nicht? Kafka Streams sind auf jeden Fall nichts, was in irgendeiner Weise direkt auf den Kafka-Brokern läuft. Sie sind eine API, die auf den Kafka-Client-Bibliotheken aufsetzt und die man mittels normalen Dependency-Managements in seinen Code einbindet und dort auch ausprogrammiert. Wie auch bei der Producer-Consumer-API werden bei Kafka Streams die Datenströme zwischen den Topics verarbeitet bei denen natürlich auch Vor- und Nachteile zu beachten sind: 

  • Events können in parallelen Threads abgearbeitet werden 
  • Unterstützung von Stateless- und Stateful-Operationen 
  • Komplexe Muster können mit wenigen Zeilen Code geschrieben werden 
  • Keine Möglichkeit von Batch-Verarbeitung der Events 

Topologien 

Um mit den Events zu arbeiten, kommen bei Kafka Streams Topologien zum Einsatz. Diese sind gerichtete azyklische Graphen, mit denen man den Datenfluss der Events strukturiert und verarbeitet. Dabei können die Daten beliebig zwischen den einzelnen Verarbeitungsschritten fließen. 

Die einzelnen Verarbeitungsschritte nennt man Prozessoren. Diese erhalten Ihre Daten durch einen Upstream und geben nach der Verarbeitung ihre Daten durch einen Downstream wieder ab. Dabei gibt es 3 unterschiedliche Arten von Prozessoren: 

  • Source Prozessor – dieser erhält seinen Upstream direkt aus einem definierten Topic 
  • Sink Prozessor – der Downstream wird direkt an ein definiertes Topic geleitet 
  • Standard-Prozessor – erhält die Daten über einen oder mehrere Upstreams und gibt sie über einen Downstream weiter 

[[ IMAGE: topologiebaum ]] 

Kafka Stream DSL vs. Kafka Processor API 

Um nun mit Kafka Streams zu arbeiten und die entsprechenden Topologien aufzubauen, gibt es 2 unterschiedliche Möglichkeiten. Der erste und auch hauptsächlich gewählte Weg ist über die Kafka Stream DSL. Mit dieser leichtgewichtigen Fluent-API kann man mit relativ wenig Code bereits große Teile der Business-Logik abbilden, da sie die höhere Komplexität der Processor API abkapselt. Jeder der bereits Erfahrung mit der Java Stream API hat wird sich darin auch schnell zurechtfinden. Um den Einstieg leichter zu machen, werden alle Code-Beispiele in diesem Artikel auch mittels der Kafka Stream DSL sein. 

Das Gegenstück dazu ist dann die Kafka Processor API. Sie ist eher als eine Low-Level-API anzusehen. Man muss, wenn man damit arbeiten möchte, zwar wesentlich mehr Code schreiben, hat aber dafür mehr Flexibilität und Kontrolle beim Datenzugriff. Dies kann wichtig werden, wenn man mit der Kafka Stream DSL nicht mehr seine Prozessoren entsprechend anpassen kann, z.B. bei bestimmten zeitbasierten Operationen oder auch der Verarbeitung der Header-Daten von Events. 

Stateless- vs. Stateful-Verarbeitung

Generell handelt es sich ja bei Streaming erstmal um einen Strom einzelner Events, die nichts miteinander zu tun haben. Sie werden vom Consumer verarbeitet und danach wieder vergessen, sie werden also stateless behandelt. Was für Möglichkeiten haben wir aber, wenn wir die Events zueinander in Verbindung bringen möchten? 

[[ IMAGE: shopbestellung-events ]] 

Als Beispiel kann man sich eine Reihe Events von Shopbestellungen denken, aus denen man vielleicht ermitteln möchte, welche Produkte dabei am häufigsten bestellt wurden. Mit einem klassischem Consumer wäre dies ohne eine Form der Datenhaltung im Microservice selbst nicht möglich. 

Bei der Verwendung von Kafka Streams hat man die Möglichkeit, mit den sogenannten Local State Stores zu arbeiten, um Daten zu halten. Diese State Stores können entweder eine RocksDB, eine In-Memory-Hash-Map oder eine andere geeignete Datenstruktur sein. Auch die Möglichkeit des Zugriffs auf die einzelnen State Stores aus parallelen Kafka Stream Tasks ist sichergestellt. Ein netter Nebeneffekt bei der Verwendung von der Kafka Stream DSL ist auch, dass man bei vielen Stateful-Operationen sich nicht explizit um die Erstellung und Management des Local State Stores kümmern muss, sondern dass sich die API automatisch darum kümmert. 

Serdes 

Kafka Events werden in den Broker als ByteArrays geschrieben. Damit man die Events mit Kafka Streams verarbeiten kann, müssen die Events noch serialisiert bzw. deserialisiert werden. Dazu werden die sogenannten Serdes (SERializer/DESerializer) verwendet. Für viele einfache Datentypen werden bereits fertige Serdes mitgeliefert. Für die eigenen Events müssen diese Serdes noch erstellt werden. 

Code-Beispiele 

Da im folgenden nur Auszüge aus den Code-Bespielen gezeigt werden, besteht die Möglichkeit sich das komplette Beispiel-Projekt herunterzuladen, unter folgender URL: https://gitlab.com/sidion/demo/2023/javamagazin/kafka-stream-in-action 

Kafka Streaming Setup 

Um nun mit Kafka Streams zu arbeiten werden erstmal nur eine Dependency benötig 

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka-streams</artifactId> 
    <version>3.5.0</version> 
</dependency>

Das Grundgerüst für Kafka Streams ist mit wenigen Zeilen geschrieben: 

// Kafka-Properties 
Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "streaming-client01"; 
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 

// Aufbau der Topologie mittels des StreamBuilders 
StreamsBuilder builder = new StreamsBuilder(); 

// Topologie aufbauen 
...... 

// Start streaming... 
Topology topology = builder.build(); 
KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfiguration); 
kafkaStreams.start();

Um nun einen Event Stream in einen SourceProcessor einzulesen, gibt es auf dem StreamsBuilder die Möglichkeit, über die stream()- bzw. table()-Methode die einlaufenden Events in einem KStream bzw. einer KTable zu verarbeiten. 

// Bestellungen in Stream einlesen 
KStream<String, BestellungEvent> bestellungen = builder.stream("bestellungen", Consumed.with(Serdes.String(), SerdeFactory.serdeForBestellungEvent())); 

Um einen KStream nach erfolgreicher Verarbeitung wieder mittels eines SinkProcessors wieder in ein Topic zu schreiben, geschieht dies über die to()-Methode auf dem KStream. 

// Ausgabe auf Topic 
filteredBestellEvents.to("topbestellungen", Produced.with(Serdes.String(), SerdeFactory.serdeForBestellungEvent()));

Stateless Operationen 

Ähnlich wie bei Java Streams können bei Kafka Streams die Events mittels verschiedenen stateless Operationen verarbeitet und transformiert werden. Die dabei am häufigsten verwendeten sind Methoden wie filter(), map(), flatMap() oder branch(). 

Um beispielsweise Bestellungen herauszufinden, die eine Gesamtsumme größer 300 EUR haben, kann dies mit einem einfachen Filtern geschehen. 

// Filtern aller Bestellungen mit Gesammtsummer > 300 
KStream<String, BestellungEvent> filteredBestellEvents = bestellungen.filter((key, val) -> val.getGesamtSumme() > 300);

Die filter()-Methode ist also ein neuer Prozessor der die relevanten Bestellungen als Downstream zurückgibt. Dieser Stream kann jetzt als neuer Upstream für einen weiteren Prozessor genutzt werden. 

Um einen Upstream in verschiedene Downstreams zu filtern, gibt es auch die Möglichkeit der branch()-Methode. Dazu wird auf dem Upstream die split()-Methode aufgerufen und anschließend die einzelnen Downstreams mittels branch() erstellt: 

// eingehende Artikel anhand ihrer Kategorie aufsplitten 
Map<String, KStream<String, ArtikelEvent>> branchedArtikelEventMap = artikelEvents.split(Named.as("branch-")) 
.branch((key, val) -> val.getKategorie().equals("Computer & Büro"), Branched.as("computer")) 
.branch((key, val) -> val.getKategorie().equals("Smartphones"), Branched.as("smartphones")) 
.branch((key, val) -> val.getKategorie().equals("Fotografie"), Branched.as("fotografie")) 
.defaultBranch();

Auch das Konvertieren von Events geht mittels map() bzw. flatMap() sehr einfach. Um beispielsweise anhand der eingehenden Bestellevents neue Events für die Lagerverwaltung zu generieren, können diese neuen Events mittels flatMap() generiert werden. 

// aus Bestellpositionen mittels flatMap die LagerEvents erstellen 
KStream<String, LagerEvent> lagerEventStream = bestellungenStream.flatMap((key, val) -> { 
List<KeyValue<String, LagerEvent>> lagerEvents = new ArrayList<>(); 
val.getBestellPositionen().forEach(bp -> lagerEvents.add(KeyValue.pair( 
bp.getIdx(), 
LagerEvent.builder() 
.anzahl(bp.getAnzahlArtikel()) 
.artikelIdx(bp.getArtikelEvent().getIdx()).build())) 
); 

return lagerEvents; 

});

Stateful Operationen 

Eine der großen Stärken von Kafka Streams liegt jedoch darin, Events miteinander in Verbindung zu bringen. Dies geschieht hauptsächlich beim Aggregieren oder Joinen von Events. Seite 6, Druckdatum: 18.12.2023, 15:13 Uhr 

Damit eine Aggregation ausgeführt werden kann, müssen die entsprechenden Events zuerst gruppiert und danach aggregiert werden. 

Für das Gruppieren der Events bietet sich dabei die groupBy()- bzw. die groupByKey()-Methode dafür an, welche dann als KGroupedStream mit der entsprechenden aggregate()-Methode weiterverarbeitet werden kann. 

// Bestellungen gruppieren anhand KundenIdx 
KGroupedStream<String, BestellungEvent> groupedOrderStream = bestellungen.groupByKey(Grouped.with(Serdes.String(), SerdeFactory.serdeForBestellungEvent())); 

// Bestellungen aggregieren 
KTable<String, BestellungEvent> aggregatedBestellungenByKunde = groupedOrderStream.aggregate(() -> 
new BestellungEvent(), 
(key, value, bestellungEvent) -> { 
bestellungEvent.setGesamtSumme(bestellungEvent.getGesamtSumme() + value.getGesamtSumme()); 
return bestellungEvent; 
}, 

Materialized.with(Serdes.String(), SerdeFactory.serdeForBestellungEvent()));

Natürlich ist es auch wichtig, Events unterschiedlichen Typs miteinander in Verbindung zu bringen. Dazu werden zwei Streams mit der join() Operation miteinander verbunden. Um z.B. zu ermitteln welche Gesamtbestellsumme pro Kunde vorliegt, können wir die eingehenden Bestellungen mit den Kundendaten joinen. Wichtig ist dabei zu wissen, dass man nur Events des gleichen Keys miteinander joinen kann. Daher ist oft zuerst nötig bei den entsprechenden Streams die Keys der Events „on the fly“ korrekt zu setzten. 

Im aktuellen Fall würde es also Sinn machen, den eingehenden Bestell-Events als Key die entsprechende Kunden-ID zu geben. Eine weit verbreitete Möglichkeit dazu besteht mit der selectKey()-Methode. 

// Bestellungen mit Key=Kunden-ID versehen 
KStream<String, BestellungEvent> bestellungenWithNewKey = 

bestellungenStream.selectKey((key, val) -> val.getKundenIdx());

Somit haben wir einen neuen Stream der Bestellungen, die jetzt als Key die entsprechende Kunden-ID haben. Diesen Stream können wir jetzt mit den Kundendaten joinen. Dazu wird ein entsprechendes ValueJoiner-Objekt benötigt, welches den eigentlichen Join durchführt. 

// Kunden in KTable einlesen 

KTable<String, KundeEvent> kundenTable = builder.table("kunden", Consumed.with(Serdes.String(), SerdeFactory.serdeForKundeEvent())); 

// ValueJoiner für Bestellung <--> Kunde 

ValueJoiner<BestellungEvent, KundeEvent, KundeEvent> valueJoiner = 

new ValueJoiner<>() { 

@Override 

public KundeEvent apply(BestellungEvent bestellungEvent, KundeEvent kundeEvent) { 

kundeEvent.setGesammtBestellSumme(bestellungEvent.getGesamtSumme()); 

return kundeEvent; 

} 

}; 

// Bestellungen mit Kunden joinen 

KStream<String, KundeEvent> joinedBestellungen = 

bestellungenWithNewKey.join(kundenTable, valueJoiner, Joined.with(Serdes.String(), SerdeFactory.serdeForBestellungEvent(), SerdeFactory.serdeForKundeEvent()));

Als Ergebnis bekommen wir einen Downstream mit den Informationen bzgl. der Gesamtbestellsumme pro Kunde. 

Stream-Table-Duality 

Wie in dem obigen Beispiel zu sehen, wurden die eingehenden Kunden-Events nicht als Stream, sondern als KTable eingelesen. Dies macht vor allem dann Sinn, wenn man mit Daten arbeitet die nur im neuesten Zustand benötigt werden. 

Bei einer KTable erzeugt jedes Event mit einem gleichen Key ein Update auf den bestehenden Record. Im Gegensatz dazu kann man den entsprechenden KStream als Changelog sehen, um die KTable zu erzeugen. Man spricht daher von der sogenannten Stream-Table-Duality. 

Gehalten werden die Daten für eine KTable standardmäßig im Local State Store für jeden Stream Prozess. Als Sonderfall dazu findet man noch die GlobalKTable. Hier werden die Daten zu jedem Node repliziert, was natürlich ein sehr teurer Prozess ist. Daher sollten man GlobalKTables auch nur sehr sparsam einsetzen. Also hauptsächlich nur bei Daten die sich kaum verändern wie z.B. Lookup-Tabellen. Seite 8, Druckdatum: 18.12.2023, 15:13 Uhr 

Testen der Streaming-Applikation 

Jeder, der mit Message Brokern arbeitet, kennt die Problematik, seine Anwendung sinnvoll testen zu können. Eigentlich benötigt man, um integrativ testen zu können, einen lauffähigen Broker oder zu mindestens einen ebenbürtigen Mock, wie z.B. die Kafka Testcontainers. Und da man bei Kafka Streams auch mit einem Kafka Broker arbeitet, besteht auch hier das Problem. 

Kafka Streams bietet für diesen Zweck eine eigene Test-Dependency an, welche den Event-Fluss simuliert. Somit können die einzelnen Topologien als Unit-Test, also ohne laufenden Kafka Broker getestet werden. 

<dependency> 

<groupId>org.apache.kafka</groupId> 

<artifactId>kafka-streams-test-utils</artifactId> 

<version>3.5.0</version> 

<scope>test</scope> 

</dependency>

Mit dieser Dependency hat man die Möglichkeit über einen TopolgyTestDriver, den entsprechenden Input- und Output-Topics die Business-Logik seiner Topologien zu testen. Ähnlich wie beim Grundgerüst für die Kafka Streams wird auch hier der StreamBuilder mit der zu testenden Topologie hochgezogen. 

@BeforeEach 

void setUp() { 

// Topologie aufbauen 

StreamsBuilder builder = new StreamsBuilder(); 

FilterStreamingTopology filterStreamingTopology = new FilterStreamingTopology(); 

filterStreamingTopology.createTopology(builder); 

Topology topology = builder.build(); 

// TestDriver und TestTopics initialisieren 

testDriver = new TopologyTestDriver(topology, new Properties()); 

inputTopic = testDriver.createInputTopic("bestellungen", Serdes.String().serializer(), SerdeFactory.serdeForBestellungEvent().serializer()); 

outputTopic = testDriver.createOutputTopic("topbestellungen", Serdes.String().deserializer(), 

SerdeFactory.serdeForBestellungEvent().deserializer()); 

}

Der eigentliche Unit-Test erzeugt dann die einzelnen Events, schiebt diese in die Input-Topics und vergleicht dann die entsprechenden Events im Output-Topic. 

@Test 

void testFilterTopology() { 

// given 

BestellungEvent b01 = BestellungEvent.builder().idx("b01").gesamtSumme(249).build(); 

BestellungEvent b02 = BestellungEvent.builder().idx("b02").gesamtSumme(350).build(); 

BestellungEvent b03 = BestellungEvent.builder().idx("b03").gesamtSumme(420).build(); 

BestellungEvent b04 = BestellungEvent.builder().idx("b04").gesamtSumme(270).build(); 

inputTopic.pipeInput(b01.getIdx(), b01); 

inputTopic.pipeInput(b02.getIdx(), b02); 

inputTopic.pipeInput(b03.getIdx(), b03); 

inputTopic.pipeInput(b04.getIdx(), b04); 

//when 

final Map<String, BestellungEvent> result = outputTopic.readKeyValuesToMap(); 

//then 

Assertions.assertEquals(2, result.size()); 

Assertions.assertEquals(350, result.get(b02.getIdx()).getGesamtSumme()); 

Assertions.assertEquals(420, result.get(b03.getIdx()).getGesamtSumme()); 

}

Durch die Verwendung der kafka-streams-test-util Bibliothek ist es also ohne weitere Implementierungen möglich, seine Streaming-Topologien ohne den Einsatz eines vorhandenen Message Broker zu testen. 

Fazit 

Kafka Streams ist eine leistungsstarke Stream-Verarbeitungsplattform, die dabei unterstützt, Datenströme effizient zu verarbeiten. Mit seiner einfachen Integration, Skalierbarkeit und Fehlertoleranz sowie der Unterstützung von Stateful Stream-Verarbeitung bietet Kafka Streams eine robuste Lösung für die Entwicklung von Streaming-Anwendungen. 

Für alle, die noch tiefer in die Materie jetzt eintauchen möchten, habe ich noch einen Buchtipp, der das Thema Kafka Streams umfassend behandelt.

William P. Bejeck Jr. Autor(en) William P. Bejeck Jr. 
Titel Kafka Streams In Action 
Untertitel Real-time apps and microservices with the Kafka Streams API 
Seiten 257 
Preis 29,99 $ 
Verlag Manning Verlag 
Jahr 2018 
ISBN 9781617294471