Testing Streams Code
Importing the Test Utilities
To test a Kafka Streams application, Kafka provides a test-utils artifact that can be added as a regular dependency to your test code base. Here is an example pom.xml snippet for Maven:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
    <version>1.1.1-cp2</version>
    <scope>test</scope>
</dependency>
Testing a Streams Application
When you create a Streams application, you create a Topology, either with the StreamsBuilder DSL or with the low-level Processor API. Normally, you run the topology using the KafkaStreams class, which connects to your broker and begins processing when you call start().
For testing, though, running a broker and making sure to clean up state between tests adds a lot of complexity and time. Streams provides the TopologyTestDriver in the kafka-streams-test-utils package as a drop-in replacement for the KafkaStreams class. It has no external system dependencies and it processes input synchronously, so you can verify the results immediately after providing input. There are hooks for verifying data sent to output topics, and you can also query state stores maintained by your application under test. To set it up:
// Processor API
Topology topology = new Topology();
topology.addSource("sourceProcessor", "input-topic");
topology.addProcessor("processor", ..., "sourceProcessor");
topology.addSink("sinkProcessor", "output-topic", "processor");
// or
// using DSL
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic").filter(...).to("output-topic");
Topology topology = builder.build();
// setup test driver
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);
The test driver accepts ConsumerRecords with key and value type byte[]. Because byte[] types can be awkward to work with, you can use the ConsumerRecordFactory to generate records by providing regular Java types for keys and values together with the corresponding serializers.
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
    "input-topic",
    new StringSerializer(),
    new IntegerSerializer()
);
testDriver.pipeInput(factory.create("key", 42));
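If your topology relies on record timestamps, for example to trigger event-time punctuations, the factory can also attach an explicit timestamp to the records it creates, as the example at the end of this page does. A minimal sketch, reusing the factory from above (the timestamp value 9999L is an arbitrary illustration):
// pipe a record with an explicit timestamp (in milliseconds)
testDriver.pipeInput(factory.create("input-topic", "key", 42, 9999L));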
For result verification, you can specify corresponding deserializers when reading the output record from the driver, so that you don't have to deal with the default byte[] types.
ProducerRecord<String, Integer> outputRecord = testDriver.readOutput(
    "output-topic",
    new StringDeserializer(),
    new IntegerDeserializer()
);
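If there is no (more) record on the given output topic, readOutput returns null; the example at the end of this page relies on this to assert that no further output was produced. A short sketch, assuming JUnit's Assert as used in that example:
// returns null once all output on the topic has been consumed
Assert.assertNull(testDriver.readOutput("output-topic", new StringDeserializer(), new IntegerDeserializer()));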
ProducerRecord contains all the record metadata in addition to the key and value, which can make it awkward to use with test frameworks' equality checks. Instead, consider using the OutputVerifier methods to compare only selected parts of the records.
// throws AssertionError if key or value does not match
OutputVerifier.compareKeyValue(outputRecord, "key", 42);
TopologyTestDriver supports punctuations, too. Event-time punctuations are triggered automatically based on the timestamps of the processed records. Wall-clock-time punctuations can also be triggered, but only by advancing the test driver's wall-clock time: the driver mocks wall-clock time rather than following system time, so it must be advanced manually, which keeps tests fast and deterministic.
// advance the mocked wall-clock time by 20 milliseconds
testDriver.advanceWallClockTime(20L);

// supposing that a scheduled punctuator would emit this record...
ProducerRecord<String, String> outputRecord = testDriver.readOutput(
    "output-topic",
    new StringDeserializer(),
    new StringDeserializer()
);
OutputVerifier.compareKeyValue(outputRecord, "triggered-key", "triggered-value");
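Event-time (stream-time) punctuations, in contrast, need no explicit driver call: piping a record whose timestamp advances stream time far enough triggers any scheduled stream-time punctuator, as the example at the end of this page demonstrates. A minimal sketch, reusing the factory from above with arbitrary timestamps:
// the first record establishes the initial stream time
testDriver.pipeInput(factory.create("input-topic", "key", 1, 9999L));
// a record with a later timestamp advances stream time and can fire a scheduled stream-time punctuation
testDriver.pipeInput(factory.create("input-topic", "key", 2, 10000L));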
Additionally, state stores are accessible via the test driver before or after a test. Accessing stores before a test is useful to pre-populate a store with some initial values. After data has been processed, the expected updates to the store can be verified.
KeyValueStore<String, String> store = testDriver.getKeyValueStore("store-name");
assertEquals("some value", store.get("some key"));
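To pre-populate a store, simply write to it through the same handle before piping any input; the setup method in the example below does exactly this. A brief sketch using the store handle from above:
// seed the store with an initial value before any input is processed
store.put("some key", "some value");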
Always close the test driver at the end to make sure all resources are released properly.
testDriver.close();
Example
The following example demonstrates how to use the test driver and the helper classes. The example creates a topology that computes the maximum value per key using a key-value store. While records are processed, no output is generated; only the store is updated. Output is sent downstream only through event-time and wall-clock-time punctuations.
private TopologyTestDriver testDriver;
private KeyValueStore<String, Long> store;

private StringDeserializer stringDeserializer = new StringDeserializer();
private LongDeserializer longDeserializer = new LongDeserializer();

private ConsumerRecordFactory<String, Long> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new LongSerializer());

@Before
public void setup() {
    Topology topology = new Topology();
    topology.addSource("sourceProcessor", "input-topic");
    topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor");
    topology.addStateStore(
        Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("aggStore"),
            Serdes.String(),
            Serdes.Long()).withLoggingDisabled(), // need to disable logging to allow store pre-populating
        "aggregator");
    topology.addSink("sinkProcessor", "result-topic", "aggregator");

    // setup test driver
    Properties config = new Properties();
    config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "maxAggregation");
    config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
    config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
    testDriver = new TopologyTestDriver(topology, config);

    // pre-populate store
    store = testDriver.getKeyValueStore("aggStore");
    store.put("a", 21L);
}

@After
public void tearDown() {
    testDriver.close();
}
@Test
public void shouldFlushStoreForFirstInput() {
    testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
    Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
}

@Test
public void shouldNotUpdateStoreForSmallerValue() {
    testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
    Assert.assertThat(store.get("a"), equalTo(21L));
    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
    Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
}

@Test
public void shouldUpdateStoreForLargerValue() {
    testDriver.pipeInput(recordFactory.create("input-topic", "a", 42L, 9999L));
    Assert.assertThat(store.get("a"), equalTo(42L));
    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 42L);
    Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
}

@Test
public void shouldUpdateStoreForNewKey() {
    testDriver.pipeInput(recordFactory.create("input-topic", "b", 21L, 9999L));
    Assert.assertThat(store.get("b"), equalTo(21L));
    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "b", 21L);
    Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
}

@Test
public void shouldPunctuateIfEventTimeAdvances() {
    testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);

    testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
    Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));

    testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 10000L));
    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
    Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
}

@Test
public void shouldPunctuateIfWallClockTimeAdvances() {
    testDriver.advanceWallClockTime(60000);
    OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
    Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
}
public class CustomMaxAggregatorSupplier implements ProcessorSupplier<String, Long> {
    @Override
    public Processor<String, Long> get() {
        return new CustomMaxAggregator();
    }
}

public class CustomMaxAggregator implements Processor<String, Long> {
    ProcessorContext context;
    private KeyValueStore<String, Long> store;

    @SuppressWarnings("unchecked")
    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        context.schedule(60000, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
            @Override
            public void punctuate(long timestamp) {
                flushStore();
            }
        });
        context.schedule(10000, PunctuationType.STREAM_TIME, new Punctuator() {
            @Override
            public void punctuate(long timestamp) {
                flushStore();
            }
        });
        store = (KeyValueStore<String, Long>) context.getStateStore("aggStore");
    }

    @Override
    public void process(String key, Long value) {
        Long oldValue = store.get(key);
        if (oldValue == null || value > oldValue) {
            store.put(key, value);
        }
    }

    private void flushStore() {
        KeyValueIterator<String, Long> it = store.all();
        while (it.hasNext()) {
            KeyValue<String, Long> next = it.next();
            context.forward(next.key, next.value);
        }
        it.close(); // release the iterator's resources once the store has been flushed
    }

    @Override
    public void punctuate(long timestamp) {} // deprecated; not used

    @Override
    public void close() {}
}