Type Parameters:
K - type of record key
V - type of record value
S - type of state store (note: state stores always have key/value types <Bytes,byte[]>)
public class Materialized<K,V,S extends StateStore> extends Object
Used to describe how a StateStore should be materialized.
You can either provide a custom StateStore backend through one of the provided methods accepting a supplier, or use the default RocksDB backends by providing just a store name.
For example, you can read a topic as a KTable and force a state store materialization to access the content via the Interactive Queries API:
StreamsBuilder builder = new StreamsBuilder();
KTable<Integer, Integer> table = builder.table(
"topicName",
Materialized.as("queryable-store-name"));
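The store name passed to as(String storeName) in the example above may contain only ASCII alphanumerics, '.', '_' and '-'. The following is a minimal sketch of that character rule using only the JDK. StoreNameCheck and isValidStoreName are hypothetical names introduced here for illustration and are not part of Kafka Streams, and rejecting empty or null names is an assumption of this sketch rather than something this page states.

```java
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of Kafka Streams.
final class StoreNameCheck {
    // ASCII alphanumerics, '.', '_' and '-'.
    // Assumption of this sketch: at least one character is required.
    private static final Pattern VALID = Pattern.compile("[A-Za-z0-9._-]+");

    // Returns true only if every character of the name is in the allowed set.
    static boolean isValidStoreName(String name) {
        return name != null && VALID.matcher(name).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValidStoreName("queryable-store-name")); // prints true
        System.out.println(isValidStoreName("store name"));           // prints false (contains a space)
    }
}
```

Checking a name like this before building the topology can surface a bad store name early, but the library's own validation remains authoritative.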
See Also: Stores
Modifier and Type | Class and Description |
---|---|
static class | Materialized.StoreType |
Modifier and Type | Method and Description |
---|---|
static <K,V,S extends StateStore> Materialized<K,V,S> | as(DslStoreSuppliers storeSuppliers): Materialize a StateStore with the given DslStoreSuppliers. |
static <K,V> Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> | as(KeyValueBytesStoreSupplier supplier): Materialize a KeyValueStore using the provided KeyValueBytesStoreSupplier. |
static <K,V> Materialized<K,V,SessionStore<org.apache.kafka.common.utils.Bytes,byte[]>> | as(SessionBytesStoreSupplier supplier): Materialize a SessionStore using the provided SessionBytesStoreSupplier. |
static <K,V,S extends StateStore> Materialized<K,V,S> | as(String storeName): Materialize a StateStore with the given name. |
static <K,V> Materialized<K,V,WindowStore<org.apache.kafka.common.utils.Bytes,byte[]>> | as(WindowBytesStoreSupplier supplier): Materialize a WindowStore using the provided WindowBytesStoreSupplier. |
static <K,V,S extends StateStore> Materialized<K,V,S> | with(org.apache.kafka.common.serialization.Serde<K> keySerde, org.apache.kafka.common.serialization.Serde<V> valueSerde): Materialize a StateStore with the provided key and value Serdes. |
Materialized<K,V,S> | withCachingDisabled(): Disable caching for the materialized StateStore. |
Materialized<K,V,S> | withCachingEnabled(): Enable caching for the materialized StateStore. |
Materialized<K,V,S> | withKeySerde(org.apache.kafka.common.serialization.Serde<K> keySerde): Set the keySerde the materialized StateStore will use. |
Materialized<K,V,S> | withLoggingDisabled(): Disable change logging for the materialized StateStore. |
Materialized<K,V,S> | withLoggingEnabled(Map<String,String> config): Indicates that a changelog should be created for the store. |
Materialized<K,V,S> | withRetention(Duration retention): Configure retention period for window and session stores. |
Materialized<K,V,S> | withStoreType(DslStoreSuppliers storeSuppliers): Set the type of the materialized StateStore. |
Materialized<K,V,S> | withValueSerde(org.apache.kafka.common.serialization.Serde<V> valueSerde): Set the valueSerde the materialized StateStore will use. |
public static <K,V,S extends StateStore> Materialized<K,V,S> as(DslStoreSuppliers storeSuppliers)
Materialize a StateStore with the given DslStoreSuppliers.
Type Parameters:
K - key type of the store
V - value type of the store
S - type of the StateStore
Parameters:
storeSuppliers - the type of the state store
Returns:
a new Materialized instance with the given DslStoreSuppliers

public static <K,V,S extends StateStore> Materialized<K,V,S> as(String storeName)
Materialize a StateStore with the given name.
Type Parameters:
K - key type of the store
V - value type of the store
S - type of the StateStore
Parameters:
storeName - the name of the underlying KTable state store; valid characters are ASCII alphanumerics, '.', '_' and '-'.
Returns:
a new Materialized instance with the given storeName

public static <K,V> Materialized<K,V,WindowStore<org.apache.kafka.common.utils.Bytes,byte[]>> as(WindowBytesStoreSupplier supplier)
Materialize a WindowStore using the provided WindowBytesStoreSupplier.
Important: Custom subclasses are allowed here, but they should respect the retention contract: window stores are required to retain windows at least as long as (window size + window grace period). Stores constructed via Stores already satisfy this contract.
Type Parameters:
K - key type of the store
V - value type of the store
Parameters:
supplier - the WindowBytesStoreSupplier used to materialize the store
Returns:
a new Materialized instance with the given supplier

public static <K,V> Materialized<K,V,SessionStore<org.apache.kafka.common.utils.Bytes,byte[]>> as(SessionBytesStoreSupplier supplier)
Materialize a SessionStore using the provided SessionBytesStoreSupplier.
Important: Custom subclasses are allowed here, but they should respect the retention contract: session stores are required to retain windows at least as long as (session inactivity gap + session grace period). Stores constructed via Stores already satisfy this contract.
Type Parameters:
K - key type of the store
V - value type of the store
Parameters:
supplier - the SessionBytesStoreSupplier used to materialize the store
Returns:
a new Materialized instance with the given supplier

public static <K,V> Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> as(KeyValueBytesStoreSupplier supplier)
Materialize a KeyValueStore using the provided KeyValueBytesStoreSupplier.
Type Parameters:
K - key type of the store
V - value type of the store
Parameters:
supplier - the KeyValueBytesStoreSupplier used to materialize the store
Returns:
a new Materialized instance with the given supplier

public static <K,V,S extends StateStore> Materialized<K,V,S> with(org.apache.kafka.common.serialization.Serde<K> keySerde, org.apache.kafka.common.serialization.Serde<V> valueSerde)
Materialize a StateStore with the provided key and value Serdes. An internal name will be used for the store.
Type Parameters:
K - key type
V - value type
S - store type
Parameters:
keySerde - the key Serde to use. If the Serde is null, then the default key serde from configs will be used
valueSerde - the value Serde to use. If the Serde is null, then the default value serde from configs will be used
Returns:
a new Materialized instance with the given key and value serdes

public Materialized<K,V,S> withValueSerde(org.apache.kafka.common.serialization.Serde<V> valueSerde)
Set the valueSerde the materialized StateStore will use.
Parameters:
valueSerde - the value Serde to use. If the Serde is null, then the default value serde from configs will be used. If the serialized bytes are null for a put operation, it is treated as a delete operation

public Materialized<K,V,S> withKeySerde(org.apache.kafka.common.serialization.Serde<K> keySerde)
Set the keySerde the materialized StateStore will use.
Parameters:
keySerde - the key Serde to use. If the Serde is null, then the default key serde from configs will be used

public Materialized<K,V,S> withLoggingEnabled(Map<String,String> config)
Indicates that a changelog should be created for the store.
Note: Any unrecognized configs will be ignored.
Parameters:
config - any configs that should be applied to the changelog

public Materialized<K,V,S> withLoggingDisabled()
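Disable change logging for the materialized StateStore.

public Materialized<K,V,S> withCachingEnabled()
Enable caching for the materialized StateStore.

public Materialized<K,V,S> withCachingDisabled()
Disable caching for the materialized StateStore.

public Materialized<K,V,S> withRetention(Duration retention) throws IllegalArgumentException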
Configure retention period for window and session stores. Ignored for key/value stores. Overridden by pre-configured store suppliers (as(SessionBytesStoreSupplier) or as(WindowBytesStoreSupplier)).
Note that the retention period must be at least long enough to contain the windowed data's entire life cycle, from window-start through window-end, and for the entire grace period. If not specified, the retention period is set to the window length (from window-start through window-end) plus the grace period.
Parameters:
retention - the retention time
Throws:
IllegalArgumentException - if retention is negative or can't be represented as long milliseconds
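
The retention rules for withRetention(Duration) can be worked through with plain java.time arithmetic. The sketch below does not call Kafka Streams: RetentionSketch, defaultRetention and checkRetention are hypothetical names introduced for illustration. Rejecting a retention shorter than window length plus grace inside the same check is an assumption of this sketch, based on the "must be at least long enough" note above. It is not a claim about where or how the library enforces that rule.

```java
import java.time.Duration;

// Hypothetical helper for illustration only; not part of Kafka Streams.
final class RetentionSketch {
    // Default described above: window length plus grace period.
    static Duration defaultRetention(Duration windowSize, Duration grace) {
        return windowSize.plus(grace);
    }

    // Returns the retention unchanged if it passes all checks.
    static Duration checkRetention(Duration retention, Duration windowSize, Duration grace) {
        final long retentionMs;
        try {
            // Duration.toMillis() throws ArithmeticException on overflow.
            retentionMs = retention.toMillis();
        } catch (ArithmeticException e) {
            throw new IllegalArgumentException(
                "retention cannot be represented as long milliseconds", e);
        }
        if (retentionMs < 0) {
            throw new IllegalArgumentException("retention must not be negative");
        }
        // Assumption of this sketch: enforce the minimum in the same place.
        if (retention.compareTo(defaultRetention(windowSize, grace)) < 0) {
            throw new IllegalArgumentException(
                "retention is shorter than window length plus grace period");
        }
        return retention;
    }

    public static void main(String[] args) {
        Duration windowSize = Duration.ofMinutes(5);
        Duration grace = Duration.ofMinutes(1);
        System.out.println(defaultRetention(windowSize, grace)); // prints PT6M
        System.out.println(checkRetention(Duration.ofMinutes(10), windowSize, grace)); // prints PT10M
    }
}
```

With a 5-minute window and a 1-minute grace period, any retention of 6 minutes or more passes this check, while 5 minutes would be rejected.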
public Materialized<K,V,S> withStoreType(DslStoreSuppliers storeSuppliers) throws IllegalArgumentException
Set the type of the materialized StateStore.
Parameters:
storeSuppliers - the store type Materialized.StoreType to use.
Throws:
IllegalArgumentException - if store supplier is also pre-configured