public class StreamsBuilder extends Object

StreamsBuilder provides the high-level Kafka Streams DSL to specify a Kafka Streams topology.

It is a requirement that the processing logic (Topology) be defined in a deterministic way: the order in which all operators are added must be predictable and the same across all application instances. Topologies are only identical if all operators are added in the same order. If different KafkaStreams instances of the same application build different topologies, the result may be incompatible runtime code and unexpected results or errors.
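As a quick orientation, here is a minimal sketch of typical usage; the topic names, application id, and broker address are illustrative assumptions and not part of this class's contract:

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class UppercaseApp {
    public static void main(final String[] args) {
        // Define the processing logic; operators must be added in the same
        // order on every application instance.
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> input = builder.stream("input-topic");   // assumed topic name
        input.mapValues(value -> value.toUpperCase())
             .to("output-topic", Produced.with(Serdes.String(), Serdes.String())); // assumed topic name
        final Topology topology = builder.build();

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-app");         // assumed application id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // assumed broker address
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```

The same builder instance is used for all operators, and build() is typically called once after the whole processing logic has been defined.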
See Also:
Topology, KStream, KTable, GlobalKTable
Field Summary

Modifier and Type | Field and Description
---|---
protected org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder | internalStreamsBuilder
protected org.apache.kafka.streams.processor.internals.InternalTopologyBuilder | internalTopologyBuilder: The topology's internal builder.
protected Topology | topology: The actual topology that is constructed by this StreamsBuilder.
Constructor Summary

StreamsBuilder()
StreamsBuilder(TopologyConfig topologyConfigs): Create a StreamsBuilder instance.
Method Summary

Modifier and Type | Method and Description
---|---
<KIn,VIn> StreamsBuilder | addGlobalStore(StoreBuilder<?> storeBuilder, String topic, Consumed<KIn,VIn> consumed, ProcessorSupplier<KIn,VIn,Void,Void> stateUpdateSupplier): Adds a global StateStore to the topology.
<K,V> StreamsBuilder | addGlobalStore(StoreBuilder<?> storeBuilder, String topic, Consumed<K,V> consumed, ProcessorSupplier<K,V> stateUpdateSupplier): Deprecated. Since 2.7.0; use addGlobalStore(StoreBuilder, String, Consumed, ProcessorSupplier) instead.
StreamsBuilder | addStateStore(StoreBuilder<?> builder): Adds a state store to the underlying Topology.
Topology | build(): Returns the Topology that represents the specified processing logic.
Topology | build(Properties props): Returns the Topology that represents the specified processing logic and accepts a Properties instance used to indicate whether to optimize the topology or not.
protected Topology | getNewTopology(TopologyConfig topologyConfigs)
<K,V> GlobalKTable<K,V> | globalTable(String topic): Create a GlobalKTable for the specified topic.
<K,V> GlobalKTable<K,V> | globalTable(String topic, Consumed<K,V> consumed): Create a GlobalKTable for the specified topic.
<K,V> GlobalKTable<K,V> | globalTable(String topic, Consumed<K,V> consumed, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized): Create a GlobalKTable for the specified topic.
<K,V> GlobalKTable<K,V> | globalTable(String topic, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized): Create a GlobalKTable for the specified topic.
<K,V> KStream<K,V> | stream(Collection<String> topics): Create a KStream from the specified topics.
<K,V> KStream<K,V> | stream(Collection<String> topics, Consumed<K,V> consumed): Create a KStream from the specified topics.
<K,V> KStream<K,V> | stream(Pattern topicPattern): Create a KStream from the specified topic pattern.
<K,V> KStream<K,V> | stream(Pattern topicPattern, Consumed<K,V> consumed): Create a KStream from the specified topic pattern.
<K,V> KStream<K,V> | stream(String topic): Create a KStream from the specified topic.
<K,V> KStream<K,V> | stream(String topic, Consumed<K,V> consumed): Create a KStream from the specified topic.
<K,V> KTable<K,V> | table(String topic): Create a KTable for the specified topic.
<K,V> KTable<K,V> | table(String topic, Consumed<K,V> consumed): Create a KTable for the specified topic.
<K,V> KTable<K,V> | table(String topic, Consumed<K,V> consumed, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized): Create a KTable for the specified topic.
<K,V> KTable<K,V> | table(String topic, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized): Create a KTable for the specified topic.
Field Detail

protected final Topology topology
protected final org.apache.kafka.streams.processor.internals.InternalTopologyBuilder internalTopologyBuilder
protected final org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder internalStreamsBuilder
Constructor Detail

public StreamsBuilder()

public StreamsBuilder(TopologyConfig topologyConfigs)
Create a StreamsBuilder instance.
Parameters:
topologyConfigs - the streams configs that apply at the topology level. Please refer to TopologyConfig for more detail.

Method Detail

protected Topology getNewTopology(TopologyConfig topologyConfigs)
public <K,V> KStream<K,V> stream(String topic)
Create a KStream from the specified topic.
The default "auto.offset.reset" strategy, default TimestampExtractor, and default key and value deserializers as specified in the config are used.
Note that the specified input topic must be partitioned by key. If this is not the case it is the user's responsibility to repartition the data before any key-based operation (like aggregation or join) is applied to the returned KStream.
Parameters:
topic - the topic name; cannot be null
Returns:
a KStream for the specified topic

public <K,V> KStream<K,V> stream(String topic, Consumed<K,V> consumed)
Create a KStream from the specified topic.
The "auto.offset.reset" strategy, TimestampExtractor, and key and value deserializers are defined by the options in the Consumed instance.
Note that the specified input topic must be partitioned by key. If this is not the case it is the user's responsibility to repartition the data before any key-based operation (like aggregation or join) is applied to the returned KStream.
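For instance, a minimal sketch that overrides the defaults through Consumed; the topic name, serdes, and reset policy are illustrative assumptions:

```java
StreamsBuilder builder = new StreamsBuilder();
// Explicit serdes and an explicit reset policy override the defaults
// from the application config for this source only.
KStream<String, Long> clicks = builder.stream(
    "clicks",                                                   // assumed topic name
    Consumed.with(Serdes.String(), Serdes.Long())
            .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST));
```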
public <K,V> KStream<K,V> stream(Collection<String> topics)
Create a KStream from the specified topics.
The default "auto.offset.reset" strategy, default TimestampExtractor, and default key and value deserializers as specified in the config are used.
If multiple topics are specified there is no ordering guarantee for records from different topics.
Note that the specified input topics must be partitioned by key. If this is not the case it is the user's responsibility to repartition the data before any key-based operation (like aggregation or join) is applied to the returned KStream.
Parameters:
topics - the topic names; must contain at least one topic name
Returns:
a KStream for the specified topics

public <K,V> KStream<K,V> stream(Collection<String> topics, Consumed<K,V> consumed)
Create a KStream from the specified topics.
The "auto.offset.reset" strategy, TimestampExtractor, and key and value deserializers are defined by the options in the Consumed instance.
If multiple topics are specified there is no ordering guarantee for records from different topics.
Note that the specified input topics must be partitioned by key. If this is not the case it is the user's responsibility to repartition the data before any key-based operation (like aggregation or join) is applied to the returned KStream.
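For instance, a minimal sketch reading two topics into a single stream; the topic names and serdes are illustrative assumptions:

```java
StreamsBuilder builder = new StreamsBuilder();
// Records from both topics are merged into one KStream; there is no
// ordering guarantee between records that originate from different topics.
KStream<String, String> events = builder.stream(
    List.of("events-eu", "events-us"),                          // assumed topic names
    Consumed.with(Serdes.String(), Serdes.String()));
```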
public <K,V> KStream<K,V> stream(Pattern topicPattern)
Create a KStream from the specified topic pattern.
The default "auto.offset.reset" strategy, default TimestampExtractor, and default key and value deserializers as specified in the config are used.
If multiple topics are matched by the specified pattern, the created KStream will read data from all of them and there is no ordering guarantee between records from different topics. This also means that the work will not be parallelized for multiple topics, and the number of tasks will scale with the maximum partition count of any matching topic rather than the total number of partitions across all topics.
Note that the specified input topics must be partitioned by key. If this is not the case it is the user's responsibility to repartition the data before any key-based operation (like aggregation or join) is applied to the returned KStream.
Parameters:
topicPattern - the pattern to match for topic names
Returns:
a KStream for topics matching the regex pattern

public <K,V> KStream<K,V> stream(Pattern topicPattern, Consumed<K,V> consumed)
Create a KStream from the specified topic pattern.
The "auto.offset.reset" strategy, TimestampExtractor, and key and value deserializers are defined by the options in the Consumed instance.
If multiple topics are matched by the specified pattern, the created KStream will read data from all of them and there is no ordering guarantee between records from different topics. This also means that the work will not be parallelized for multiple topics, and the number of tasks will scale with the maximum partition count of any matching topic rather than the total number of partitions across all topics.
Note that the specified input topics must be partitioned by key. If this is not the case it is the user's responsibility to repartition the data before any key-based operation (like aggregation or join) is applied to the returned KStream.
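For instance, a minimal sketch subscribing by pattern; the topic naming scheme and serdes are illustrative assumptions:

```java
StreamsBuilder builder = new StreamsBuilder();
// All existing and future topics whose names match the regex are consumed
// into a single KStream.
KStream<String, String> metrics = builder.stream(
    Pattern.compile("metrics-.*"),                              // assumed topic naming scheme
    Consumed.with(Serdes.String(), Serdes.String()));
```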
public <K,V> KTable<K,V> table(String topic, Consumed<K,V> consumed, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a KTable for the specified topic.
The "auto.offset.reset" strategy, TimestampExtractor, and key and value deserializers are defined by the options in the Consumed instance.
Input records with null key will be dropped.
Note that the specified input topic must be partitioned by key. If this is not the case the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore using the given Materialized instance.
An internal changelog topic is created by default. Because the source topic can be used for recovery, you can avoid creating the changelog topic by setting the "topology.optimization" config to "all" in the StreamsConfig.
You should only specify serdes in the Consumed instance as these will also be used to overwrite the serdes in Materialized, i.e.,
streamBuilder.table(topic, Consumed.with(Serdes.String(), Serdes.String()), Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName));
To query the local ReadOnlyKeyValueStore it must be obtained via KafkaStreams#store(...):
KafkaStreams streams = ...
ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
K key = "some-key";
ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.metadataForAllStreamsClients() to query the value of the key on a parallel running instance of your Kafka Streams application.
Parameters:
topic - the topic name; cannot be null
consumed - the instance of Consumed used to define optional parameters; cannot be null
materialized - the instance of Materialized used to materialize a state store; cannot be null
Returns:
a KTable for the specified topic

public <K,V> KTable<K,V> table(String topic)
Create a KTable for the specified topic.
The default "auto.offset.reset" strategy and default key and value deserializers as specified in the config are used.
Input records with null key will be dropped.
Note that the specified input topic must be partitioned by key. If this is not the case the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore with an internal store name. Note that the store name may not be queryable through Interactive Queries.
An internal changelog topic is created by default. Because the source topic can be used for recovery, you can avoid creating the changelog topic by setting the "topology.optimization" config to "all" in the StreamsConfig.
Parameters:
topic - the topic name; cannot be null
Returns:
a KTable for the specified topic

public <K,V> KTable<K,V> table(String topic, Consumed<K,V> consumed)
Create a KTable for the specified topic.
The "auto.offset.reset" strategy, TimestampExtractor, and key and value deserializers are defined by the options in the Consumed instance.
Input records with null key will be dropped.
Note that the specified input topic must be partitioned by key. If this is not the case the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore with an internal store name. Note that the store name may not be queryable through Interactive Queries.
An internal changelog topic is created by default. Because the source topic can be used for recovery, you can avoid creating the changelog topic by setting the "topology.optimization" config to "all" in the StreamsConfig.
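For instance, a minimal sketch; the topic name and serdes are illustrative assumptions:

```java
StreamsBuilder builder = new StreamsBuilder();
// The topic is read as a changelog: each record replaces the table entry
// for its key, and a null value deletes the entry.
KTable<String, String> userProfiles = builder.table(
    "user-profiles",                                            // assumed compacted topic
    Consumed.with(Serdes.String(), Serdes.String()));
```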
public <K,V> KTable<K,V> table(String topic, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a KTable for the specified topic.
The default "auto.offset.reset" strategy as specified in the config is used.
Key and value deserializers as defined by the options in Materialized are used.
Input records with null key will be dropped.
Note that the specified input topic must be partitioned by key. If this is not the case the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore using the given Materialized instance.
An internal changelog topic is created by default. Because the source topic can be used for recovery, you can avoid creating the changelog topic by setting the "topology.optimization" config to "all" in the StreamsConfig.
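For instance, a minimal sketch that materializes the table under a queryable store name; the topic name, store name, and serdes are illustrative assumptions:

```java
StreamsBuilder builder = new StreamsBuilder();
// Since no Consumed is given here, the serdes are supplied on Materialized.
KTable<String, Long> prices = builder.table(
    "prices",                                                   // assumed topic name
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("prices-store") // assumed store name
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long()));
```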
Parameters:
topic - the topic name; cannot be null
materialized - the instance of Materialized used to materialize a state store; cannot be null
Returns:
a KTable for the specified topic

public <K,V> GlobalKTable<K,V> globalTable(String topic, Consumed<K,V> consumed)
Create a GlobalKTable for the specified topic.
Input records with null key will be dropped.
The resulting GlobalKTable will be materialized in a local KeyValueStore with an internal store name. Note that the store name may not be queryable through Interactive Queries.
No internal changelog topic is created since the original input topic can be used for recovery (cf. methods of KGroupedStream and KGroupedTable that return a KTable).
Note that GlobalKTable always applies the "auto.offset.reset" strategy "earliest" regardless of the specified value in StreamsConfig or Consumed.
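For instance, a minimal sketch; the topic name and serdes are illustrative assumptions:

```java
StreamsBuilder builder = new StreamsBuilder();
// Every Kafka Streams instance keeps a full local copy of this topic,
// read from the earliest offset regardless of the configured reset policy.
GlobalKTable<String, String> countryNames = builder.globalTable(
    "country-codes",                                            // assumed compacted topic
    Consumed.with(Serdes.String(), Serdes.String()));
```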
Parameters:
topic - the topic name; cannot be null
consumed - the instance of Consumed used to define optional parameters
Returns:
a GlobalKTable for the specified topic

public <K,V> GlobalKTable<K,V> globalTable(String topic)
Create a GlobalKTable for the specified topic.
The default key and value deserializers as specified in the config are used.
Input records with null key will be dropped.
The resulting GlobalKTable will be materialized in a local KeyValueStore with an internal store name. Note that the store name may not be queryable through Interactive Queries.
No internal changelog topic is created since the original input topic can be used for recovery (cf. methods of KGroupedStream and KGroupedTable that return a KTable).
Note that GlobalKTable always applies the "auto.offset.reset" strategy "earliest" regardless of the specified value in StreamsConfig.
Parameters:
topic - the topic name; cannot be null
Returns:
a GlobalKTable for the specified topic

public <K,V> GlobalKTable<K,V> globalTable(String topic, Consumed<K,V> consumed, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a GlobalKTable for the specified topic.
Input KeyValue pairs with null key will be dropped.
The resulting GlobalKTable will be materialized in a local KeyValueStore configured with the provided instance of Materialized.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf. methods of KGroupedStream and KGroupedTable that return a KTable).
You should only specify serdes in the Consumed instance as these will also be used to overwrite the serdes in Materialized, i.e.,
streamBuilder.globalTable(topic, Consumed.with(Serdes.String(), Serdes.String()), Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName));
To query the local ReadOnlyKeyValueStore it must be obtained via KafkaStreams#store(...):
KafkaStreams streams = ...
ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
K key = "some-key";
ValueAndTimestamp<V> valueForKey = localStore.get(key);
Note that GlobalKTable always applies the "auto.offset.reset" strategy "earliest" regardless of the specified value in StreamsConfig or Consumed.
Parameters:
topic - the topic name; cannot be null
consumed - the instance of Consumed used to define optional parameters; cannot be null
materialized - the instance of Materialized used to materialize a state store; cannot be null
Returns:
a GlobalKTable for the specified topic

public <K,V> GlobalKTable<K,V> globalTable(String topic, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a GlobalKTable for the specified topic.
Input KeyValue pairs with null key will be dropped.
The resulting GlobalKTable will be materialized in a local KeyValueStore configured with the provided instance of Materialized.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf. methods of KGroupedStream and KGroupedTable that return a KTable).
To query the local ReadOnlyKeyValueStore it must be obtained via KafkaStreams#store(...):
KafkaStreams streams = ...
ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
K key = "some-key";
ValueAndTimestamp<V> valueForKey = localStore.get(key);
Note that GlobalKTable always applies the "auto.offset.reset" strategy "earliest" regardless of the specified value in StreamsConfig.
Parameters:
topic - the topic name; cannot be null
materialized - the instance of Materialized used to materialize a state store; cannot be null
Returns:
a GlobalKTable for the specified topic

public StreamsBuilder addStateStore(StoreBuilder<?> builder)
Adds a state store to the underlying Topology.
It is required to connect state stores to Processors, Transformers, or ValueTransformers before they can be used.
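As an illustration, here is a sketch that registers a key-value store and connects it to a processor; the store name, topic name, and counting logic are assumptions, and it presumes a Kafka Streams version whose KStream#process accepts the typed org.apache.kafka.streams.processor.api.ProcessorSupplier:

```java
StreamsBuilder builder = new StreamsBuilder();

// Register a persistent key-value store under an assumed name.
builder.addStateStore(
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("counts-store"),
        Serdes.String(),
        Serdes.Long()));

// A processor that counts records per key using the registered store.
final ProcessorSupplier<String, String, Void, Void> countSupplier = () ->
    new Processor<String, String, Void, Void>() {
        private KeyValueStore<String, Long> store;

        @Override
        public void init(final ProcessorContext<Void, Void> context) {
            store = context.getStateStore("counts-store");
        }

        @Override
        public void process(final Record<String, String> record) {
            final Long previous = store.get(record.key());
            store.put(record.key(), previous == null ? 1L : previous + 1L);
        }
    };

// Connect the store to the processor by passing the store name to process(...).
builder.<String, String>stream("words")                          // assumed topic name
    .process(countSupplier, "counts-store");
```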
Parameters:
builder - the builder used to obtain this state store StateStore instance
Throws:
TopologyException - if state store supplier is already added

@Deprecated
public <K,V> StreamsBuilder addGlobalStore(StoreBuilder<?> storeBuilder, String topic, Consumed<K,V> consumed, ProcessorSupplier<K,V> stateUpdateSupplier)
Deprecated. Since 2.7.0; use addGlobalStore(StoreBuilder, String, Consumed, ProcessorSupplier) instead.
Adds a global StateStore to the topology.
The StateStore sources its data from all partitions of the provided input topic. There will be exactly one instance of this StateStore per Kafka Streams instance.
A SourceNode with the provided sourceName will be added to consume the data arriving from the partitions of the input topic.
The provided ProcessorSupplier will be used to create a ProcessorNode that will receive all records forwarded from the SourceNode. NOTE: you should not use the Processor to insert transformed records into the global state store. This store uses the source topic as changelog and during restore will insert records directly from the source.
This ProcessorNode should be used to keep the StateStore up-to-date.
The default TimestampExtractor as specified in the config is used.
It is not required to connect a global store to Processors, Transformers, or ValueTransformers; those have read-only access to all global stores by default.
The supplier should always generate a new instance each time ProcessorSupplier.get() gets called. Creating a single Processor object and returning the same object reference in ProcessorSupplier.get() would be a violation of the supplier pattern and leads to runtime exceptions.
Parameters:
storeBuilder - user defined StoreBuilder; cannot be null
topic - the topic to source the data from
consumed - the instance of Consumed used to define optional parameters; cannot be null
stateUpdateSupplier - the instance of ProcessorSupplier
Throws:
TopologyException - if the processor of state is already registered

public <KIn,VIn> StreamsBuilder addGlobalStore(StoreBuilder<?> storeBuilder, String topic, Consumed<KIn,VIn> consumed, ProcessorSupplier<KIn,VIn,Void,Void> stateUpdateSupplier)
Adds a global StateStore to the topology.
The StateStore sources its data from all partitions of the provided input topic. There will be exactly one instance of this StateStore per Kafka Streams instance.
A SourceNode with the provided sourceName will be added to consume the data arriving from the partitions of the input topic.
The provided ProcessorSupplier will be used to create a Processor that will receive all records forwarded from the SourceNode.
The supplier should always generate a new instance. Creating a single Processor object and returning the same object reference in ProcessorSupplier.get() is a violation of the supplier pattern and leads to runtime exceptions.
NOTE: you should not use the Processor to insert transformed records into the global state store. This store uses the source topic as changelog and during restore will insert records directly from the source.
This Processor should be used to keep the StateStore up-to-date.
The default TimestampExtractor as specified in the config is used.
It is not required to connect a global store to Processors, Transformers, or ValueTransformers; those have read-only access to all global stores by default.
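As an illustration, here is a sketch of a global store whose processor copies records into the store unchanged, as required by the note above; the store name, topic name, and serdes are assumptions:

```java
StreamsBuilder builder = new StreamsBuilder();

builder.addGlobalStore(
    // Global stores use the source topic as their changelog, so logging is disabled.
    Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("config-store"),     // assumed store name
            Serdes.String(),
            Serdes.String())
        .withLoggingDisabled(),
    "config-topic",                                             // assumed topic name
    Consumed.with(Serdes.String(), Serdes.String()),
    // The supplier returns a new Processor instance on every get() call.
    () -> new Processor<String, String, Void, Void>() {
        private KeyValueStore<String, String> store;

        @Override
        public void init(final ProcessorContext<Void, Void> context) {
            store = context.getStateStore("config-store");
        }

        @Override
        public void process(final Record<String, String> record) {
            // Keep the global store up-to-date; do not transform the records.
            store.put(record.key(), record.value());
        }
    });
```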
Parameters:
storeBuilder - user defined StoreBuilder; cannot be null
topic - the topic to source the data from
consumed - the instance of Consumed used to define optional parameters; cannot be null
stateUpdateSupplier - the instance of ProcessorSupplier
Throws:
TopologyException - if the processor of state is already registered

public Topology build()
Returns the Topology that represents the specified processing logic.
Note that using this method means no optimizations are performed.
Returns:
the Topology that represents the specified processing logic

public Topology build(Properties props)
Returns the Topology that represents the specified processing logic and accepts a Properties instance used to indicate whether to optimize the topology or not.
Parameters:
props - the Properties used for building a possibly optimized topology
Returns:
the Topology that represents the specified processing logic
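For instance, a minimal sketch that opts into topology optimization; builder is assumed to be an already populated StreamsBuilder, and the application id and broker address are illustrative:

```java
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");          // assumed application id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // assumed broker address
// "topology.optimization" = "all" enables optimizations such as reusing the
// source topic as the changelog for source KTables.
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);

Topology optimizedTopology = builder.build(props);
```

The same properties are typically passed to the KafkaStreams constructor as well, so that the build step and the runtime agree on the optimization setting.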