Streams コードのテスト

テストユーティリティのインポート

Kafka Streams アプリケーションをテストするために、Apache Kafka® には、テストコードベースに通常の依存関係として追加できる test-utils アーティファクトが用意されています。

以下は、Maven を使用する場合の pom.xml のスニペットの例です。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
    <version>7.1.1-ccs</version>
    <scope>test</scope>
</dependency>

Streams アプリケーションのテスト

Streams アプリケーションの作成時には、StreamsBuilder DSL か低レベルの Processor API のどちらかを使用して、Topology を作成します。通常、トポロジーを実行するには KafkaStreams クラスを使用します。このクラスはブローカーに接続し、start() が呼び出されたときに処理を開始します。しかし、テストの目的でブローカーを実行し、テスト間で確実にステートをクリーンアップしようとすると、複雑さと所要時間が大幅に増加します。

Streams では、KafkaStreams クラスと差し替えることのできる TopologyTestDriverkafka-streams-test-utils パッケージに含まれています。これには外部システムとの依存関係はありません。また、入力は同期的に処理されるため、入力を指定するとすぐに結果を検証できます。出力トピックに送信されたデータは、フックを使用して検証できます。また、テスト中のアプリケーションが管理しているステートストアに対してクエリを実行することもできます。

以下に設定方法を示します。

// Processor API
Topology topology = new Topology();
topology.addSource("sourceProcessor", "input-topic");
topology.addProcessor("processor", ..., "sourceProcessor");
topology.addSink("sinkProcessor", "output-topic", "processor");
// or
// using DSL
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic").filter(...).to("output-topic");
Topology topology = builder.build();

// setup test driver
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);

レコードをテストドライバにパイプするには、トポロジーの入力トピックごとに TestInputTopic を作成します。

TestInputTopic<String, Integer> inputTopic = testDriver.createInputTopic(
    "input-topic",
    new StringSerializer(),
    new IntegerSerializer());
inputTopic.pipeInput("key", 42);

結果を検証するには、まず TestOutputTopic を作成します。

TestOutputTopic<String, Long> outputTopic = testDriver.createOutputTopic(
    "result-topic",
    new StringDeserializer(),
    new LongDeserializer());

レコード値だけを検証するか、キーと値を検証するか、またはタイムスタンプとヘッダー情報を含む TestRecord 全体を検証できます。TestOutputTopic#readKeyValuesToMap() を使用して、出力トピックを "テーブル" として消費することもできます。

// user assertion library of your choice to verify output
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));

TopologyTestDriver では区切りもサポートされます。処理されたレコードのタイムスタンプに基づいて、イベント時刻で区切りが自動的にトリガーされます。テストドライバの実際の時刻を進めると、実際の時刻で区切りをトリガーすることもできます。ドライバの実際の時刻は手動で進める必要があります(これはテストの安定性を保つためです)。

testDriver.advanceWallClockTime(Duration.ofMillis(20L));
// supposing that a scheduled punctuator would emit this record...
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("triggered-key", "triggered-value")));

また、テストの前後にテストドライバからステートストアにアクセスできます。テスト前のストアへのアクセスは、ストアに事前に初期値を設定するために役立ちます。データが処理された後は、ストアが想定どおりに更新されたかどうかを検証できます。

KeyValueStore store = testDriver.getKeyValueStore("store-name");
assertEquals("some value", store.get("some key"));

最後には必ずテストドライバを終了して、すべてのリソースが正しく解放されるようにしてください。

testDriver.close();

以下の例では、テストドライバとヘルパークラスの使用方法を示します。この例では、キーと値のストアを使用して、キーごとの最大値を計算するトポロジーを作成します。処理中に出力が生成されることはなく、ストアだけが更新されます。出力は、イベント時刻および実際の時間の区切りに基づいてダウンストリームにのみ送信されます。

private TopologyTestDriver testDriver;
private TestInputTopic<String, Long> inputTopic;
private TestOutputTopic<String, Long> outputTopic;
private KeyValueStore<String, Long> store;

private Serde<String> stringSerde = new Serdes.StringSerde();
private Serde<Long> longSerde = new Serdes.LongSerde();

@Before
public void setup() {
    final Topology topology = new Topology();
    topology.addSource("sourceProcessor", "input-topic");
    topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor");
    topology.addStateStore(
        Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("aggStore"),
            Serdes.String(),
            Serdes.Long()).withLoggingDisabled(), // need to disable logging to allow store pre-populating
        "aggregator");
    topology.addSink("sinkProcessor", "result-topic", "aggregator");

    // setup test driver
    final Properties props = new Properties();
    props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "maxAggregation");
    props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
    props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
    testDriver = new TopologyTestDriver(topology, props);

    // setup test topics
    inputTopic = testDriver.createInputTopic("input-topic", stringSerde.serializer(), longSerde.serializer());
    outputTopic = testDriver.createOutputTopic("result-topic", stringSerde.deserializer(), longSerde.deserializer());

    // pre-populate store
    store = testDriver.getKeyValueStore("aggStore");
    store.put("a", 21L);
}

@After
public void tearDown() {
    testDriver.close();
}

@Test
public void shouldFlushStoreForFirstInput() {
    inputTopic.pipeInput("a", 1L);
    assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));
    assertThat(outputTopic.isEmpty(), is(true));
}

@Test
public void shouldNotUpdateStoreForSmallerValue() {
    inputTopic.pipeInput("a", 1L);
    assertThat(store.get("a"), equalTo(21L));
    assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));
    assertThat(outputTopic.isEmpty(), is(true));
}

@Test
public void shouldNotUpdateStoreForLargerValue() {
    inputTopic.pipeInput("a", 42L);
    assertThat(store.get("a"), equalTo(42L));
    assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 42L)));
    assertThat(outputTopic.isEmpty(), is(true));
}

@Test
public void shouldUpdateStoreForNewKey() {
    inputTopic.pipeInput("b", 21L);
    assertThat(store.get("b"), equalTo(21L));
    assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));
    assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("b", 21L)));
    assertThat(outputTopic.isEmpty(), is(true));
}

@Test
public void shouldPunctuateIfEvenTimeAdvances() {
    final Instant recordTime = Instant.now();
    inputTopic.pipeInput("a", 1L, recordTime);
    assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));

    inputTopic.pipeInput("a", 1L, recordTime);
    assertThat(outputTopic.isEmpty(), is(true));

    inputTopic.pipeInput("a", 1L, recordTime.plusSeconds(10L));
    assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));
    assertThat(outputTopic.isEmpty(), is(true));
}

@Test
public void shouldPunctuateIfWallClockTimeAdvances() {
    testDriver.advanceWallClockTime(Duration.ofSeconds(60));
    assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));
    assertThat(outputTopic.isEmpty(), is(true));
}

public static class CustomMaxAggregatorSupplier implements ProcessorSupplier<String, Long> {
    @Override
    public Processor<String, Long> get() {
        return new CustomMaxAggregator();
    }
}

public static class CustomMaxAggregator implements Processor<String, Long> {
    ProcessorContext context;
    private KeyValueStore<String, Long> store;

    @SuppressWarnings("unchecked")
    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
        context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, time -> flushStore());
        context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, time -> flushStore());
        store = (KeyValueStore<String, Long>) context.getStateStore("aggStore");
    }

    @Override
    public void process(final String key, final Long value) {
        final Long oldValue = store.get(key);
        if (oldValue == null || value > oldValue) {
            store.put(key, value);
        }
    }

    private void flushStore() {
        final KeyValueIterator<String, Long> it = store.all();
        while (it.hasNext()) {
            final KeyValue<String, Long> next = it.next();
            context.forward(next.key, next.value);
        }
    }

    @Override
    public void close() {}
}

プロセッサーの単体テスト

Processor API を使用すると、カスタムの ProcessorTransformer、または ValueTransformer の実装を定義できます。

これらのクラスは結果を返さずに ProcessorContext に転送するため、単体テストでは、転送されたデータを検査用にキャプチャできる疑似的なコンテキストが必要になります。

この目的で、Streams の kafka-streams-test-utils には MockProcessorContext が用意されています。

まず、プロセッサーをインスタンス化し、モックコンテキストで初期化します。

final Processor processorUnderTest = ...;
final MockProcessorContext context = new MockProcessorContext();
processorUnderTest.init(context);

プロセッサーに構成を渡したり、デフォルトの serde を設定したりする必要がある場合は、構成を指定してモックを作成できます。

final Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "unit-test");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
config.put("some.other.config", "some config value");
final MockProcessorContext context = new MockProcessorContext(config);

モックは、プロセッサーが転送するすべての値をキャプチャします。これらについてアサーションを作成できます。

processorUnderTest.process("key", "value");

final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
assertEquals(forwarded.next().keyValue(), new KeyValue<>(..., ...));
assertFalse(forwarded.hasNext());

// you can reset forwards to clear the captured data. This may be helpful in constructing longer scenarios.
context.resetForwards();

assertEquals(context.forwarded().size(), 0);

プロセッサーから特定の子プロセッサーに転送している場合は、キャプチャされたデータのコンテキストを子の名前で照会できます。

final List<CapturedForward> captures = context.forwarded("childProcessorName");

モックでは、プロセッサーがコンテキストの commit() を呼び出したかどうかもキャプチャします。

assertTrue(context.committed());

// commit captures can also be reset.
context.resetCommit();

assertFalse(context.committed());

プロセッサーのロジックがレコードのメタデータ(トピック、パーティション、オフセット、またはタイムスタンプ)に依存している場合は、それらをコンテキストに設定できます。すべてを一度に設定することも、個別に設定することもできます。

context.setRecordMetadata("topicName", /*partition*/ 0, /*offset*/ 0L, /*timestamp*/ 0L);
context.setTopic("topicName");
context.setPartition(0);
context.setOffset(0L);
context.setTimestamp(0L);

いったん値が設定されると、コンテキストは、新しい値が設定されるまで同じ値を返し続けます。


パンクチュエーターがステートフルな場合は、モックコンテキストにステートストアを登録できます。モックコンテキストは changelog やステートディレクトリなどを "管理しない" ため、適切な種類(KeyValueWindowed、または Session )のシンプルなインメモリーストアを使用することをお勧めします。

final KeyValueStore<String, Integer> store =
    Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("myStore"),
            Serdes.String(),
            Serdes.Integer()
        )
        .withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
        .build();
store.init(context, store);
context.register(store, /*parameter unused in mock*/ null);

プロセッサーは、定期的なタスクを処理するためにパンクチュエーターをスケジュールできます。モックコンテキストでは、パンクチュエーターが自動的に実行されることはありませんが、スケジュールの呼び出しはキャプチャされるため、パンクチュエーターのスケジュール動作の単体テストを手動で行うことができます。

final MockProcessorContext.CapturedPunctuator capturedPunctuator = context.scheduledPunctuators().get(0);
final long interval = capturedPunctuator.getIntervalMs();
final PunctuationType type = capturedPunctuator.getType();
final boolean cancelled = capturedPunctuator.cancelled();
final Punctuator punctuator = capturedPunctuator.getPunctuator();
punctuator.punctuate(/*timestamp*/ 0L);

スケジュールされたパンクチュエーターの自動呼び出しを行うテストを作成する必要がある場合は、そのプロセッサーを含む単純なトポロジーを用意して TopologyTestDriver を使用する必要があります。

おすすめのリソース

注釈

このウェブサイトには、Apache License v2 の条件に基づいて Apache Software Foundation で開発されたコンテンツが含まれています。