Type Parameters:
K - type of record key
V - type of record value
S - type of state store (note: state stores always have key/value types <Bytes,byte[]>)

public class Materialized<K,V,S extends StateStore> extends Object

Used to describe how a StateStore should be materialized. You can either provide a custom StateStore backend through one of the provided methods accepting a supplier, or use the default RocksDB backends by providing just a store name. For example, you can read a topic as a KTable and force a state store materialization to access the content via the Interactive Queries API:
StreamsBuilder builder = new StreamsBuilder();
KTable<Integer, Integer> table = builder.table(
"topicName",
Materialized.as("queryable-store-name"));
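
The store name passed to as(String storeName) is restricted to ASCII alphanumerics, '.', '_' and '-', as the parameter description on this page states. The following is a minimal sketch of that character rule only, using nothing beyond the JDK. The class name StoreNameSketch and its method are hypothetical helpers, not part of Kafka Streams, and rejecting empty names is an assumption; the library may enforce further constraints.

```java
import java.util.regex.Pattern;

// Hypothetical helper (not a Kafka Streams class): checks only the
// character rule documented for as(String storeName).
final class StoreNameSketch {

    // ASCII alphanumerics, '.', '_' and '-'; '+' rejects the empty string,
    // which is an assumption of this sketch.
    private static final Pattern VALID_CHARACTERS = Pattern.compile("[a-zA-Z0-9._-]+");

    static boolean hasOnlyValidCharacters(String storeName) {
        return storeName != null && VALID_CHARACTERS.matcher(storeName).matches();
    }

    private StoreNameSketch() { }
}
```

With this sketch, the name used in the example above, "queryable-store-name", passes the check, while a name containing a space or a slash does not.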
See Also: Stores

| Modifier and Type | Class and Description |
|---|---|
static class | Materialized.StoreType |
| Modifier and Type | Field and Description |
|---|---|
Materialized.StoreType | storeType |
| Modifier and Type | Method and Description |
|---|---|
static <K,V> Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> | as(KeyValueBytesStoreSupplier supplier): Materialize a KeyValueStore using the provided KeyValueBytesStoreSupplier. |
static <K,V,S extends StateStore> Materialized<K,V,S> | as(Materialized.StoreType storeType): Materialize a StateStore with the given Materialized.StoreType. |
static <K,V> Materialized<K,V,SessionStore<org.apache.kafka.common.utils.Bytes,byte[]>> | as(SessionBytesStoreSupplier supplier): Materialize a SessionStore using the provided SessionBytesStoreSupplier. |
static <K,V,S extends StateStore> Materialized<K,V,S> | as(String storeName): Materialize a StateStore with the given name. |
static <K,V> Materialized<K,V,WindowStore<org.apache.kafka.common.utils.Bytes,byte[]>> | as(WindowBytesStoreSupplier supplier): Materialize a WindowStore using the provided WindowBytesStoreSupplier. |
static <K,V,S extends StateStore> Materialized<K,V,S> | with(org.apache.kafka.common.serialization.Serde<K> keySerde, org.apache.kafka.common.serialization.Serde<V> valueSerde): Materialize a StateStore with the provided key and value Serdes. |
Materialized<K,V,S> | withCachingDisabled(): Disable caching for the materialized StateStore. |
Materialized<K,V,S> | withCachingEnabled(): Enable caching for the materialized StateStore. |
Materialized<K,V,S> | withKeySerde(org.apache.kafka.common.serialization.Serde<K> keySerde): Set the keySerde the materialized StateStore will use. |
Materialized<K,V,S> | withLoggingDisabled(): Disable change logging for the materialized StateStore. |
Materialized<K,V,S> | withLoggingEnabled(Map<String,String> config): Indicates that a changelog should be created for the store. |
Materialized<K,V,S> | withRetention(Duration retention): Configure retention period for window and session stores. |
Materialized<K,V,S> | withStoreType(Materialized.StoreType storeType): Set the type of the materialized StateStore. |
Materialized<K,V,S> | withValueSerde(org.apache.kafka.common.serialization.Serde<V> valueSerde): Set the valueSerde the materialized StateStore will use. |
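
The configuration methods summarized above each return a Materialized, so they can be chained. The sketch below shows one way this might look for a key/value store. It assumes the kafka-streams library is on the classpath; the class name MaterializedConfigSketch, the topic name "counts-topic", the store name "counts-store" and the changelog setting are illustrative assumptions, not values taken from this page.

```java
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

// Illustrative sketch; all names and values here are assumptions.
final class MaterializedConfigSketch {

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Start from a named store, then chain the optional settings.
        Materialized<String, Long, KeyValueStore<Bytes, byte[]>> materialized =
            Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long())
                .withCachingDisabled()
                // Configs applied to the changelog topic (assumed value).
                .withLoggingEnabled(Map.of("min.insync.replicas", "2"));

        // Read the topic as a KTable backed by the configured store.
        KTable<String, Long> table = builder.table("counts-topic", materialized);

        return builder.build();
    }

    private MaterializedConfigSketch() { }
}
```

Building the topology does not contact a broker, so the sketch can be inspected with Topology.describe() before it is handed to a KafkaStreams instance.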
public Materialized.StoreType storeType
public static <K,V,S extends StateStore> Materialized<K,V,S> as(Materialized.StoreType storeType)
Materialize a StateStore with the given Materialized.StoreType.

Type Parameters:
K - key type of the store
V - value type of the store
S - type of the StateStore
Parameters:
storeType - the type of the state store
Returns:
a new Materialized instance with the given storeType

public static <K,V,S extends StateStore> Materialized<K,V,S> as(String storeName)
Materialize a StateStore with the given name.

Type Parameters:
K - key type of the store
V - value type of the store
S - type of the StateStore
Parameters:
storeName - the name of the underlying KTable state store; valid characters are ASCII alphanumerics, '.', '_' and '-'.
Returns:
a new Materialized instance with the given storeName

public static <K,V> Materialized<K,V,WindowStore<org.apache.kafka.common.utils.Bytes,byte[]>> as(WindowBytesStoreSupplier supplier)
Materialize a WindowStore using the provided WindowBytesStoreSupplier. Important: Custom subclasses are allowed here, but they should respect the retention contract: window stores are required to retain windows at least as long as (window size + window grace period). Stores constructed via Stores already satisfy this contract.

Type Parameters:
K - key type of the store
V - value type of the store
Parameters:
supplier - the WindowBytesStoreSupplier used to materialize the store
Returns:
a new Materialized instance with the given supplier

public static <K,V> Materialized<K,V,SessionStore<org.apache.kafka.common.utils.Bytes,byte[]>> as(SessionBytesStoreSupplier supplier)
Materialize a SessionStore using the provided SessionBytesStoreSupplier. Important: Custom subclasses are allowed here, but they should respect the retention contract: session stores are required to retain windows at least as long as (session inactivity gap + session grace period). Stores constructed via Stores already satisfy this contract.

Type Parameters:
K - key type of the store
V - value type of the store
Parameters:
supplier - the SessionBytesStoreSupplier used to materialize the store
Returns:
a new Materialized instance with the given supplier

public static <K,V> Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> as(KeyValueBytesStoreSupplier supplier)
Materialize a KeyValueStore using the provided KeyValueBytesStoreSupplier.

Type Parameters:
K - key type of the store
V - value type of the store
Parameters:
supplier - the KeyValueBytesStoreSupplier used to materialize the store
Returns:
a new Materialized instance with the given supplier

public static <K,V,S extends StateStore> Materialized<K,V,S> with(org.apache.kafka.common.serialization.Serde<K> keySerde, org.apache.kafka.common.serialization.Serde<V> valueSerde)
Materialize a StateStore with the provided key and value Serdes. An internal name will be used for the store.

Type Parameters:
K - key type
V - value type
S - store type
Parameters:
keySerde - the key Serde to use. If the Serde is null, then the default key serde from configs will be used
valueSerde - the value Serde to use. If the Serde is null, then the default value serde from configs will be used
Returns:
a new Materialized instance with the given key and value serdes

public Materialized<K,V,S> withValueSerde(org.apache.kafka.common.serialization.Serde<V> valueSerde)
Set the valueSerde the materialized StateStore will use.

Parameters:
valueSerde - the value Serde to use. If the Serde is null, then the default value serde from configs will be used. If the serialized bytes are null for put operations, the put is treated as a delete operation

public Materialized<K,V,S> withKeySerde(org.apache.kafka.common.serialization.Serde<K> keySerde)
Set the keySerde the materialized StateStore will use.

Parameters:
keySerde - the key Serde to use. If the Serde is null, then the default key serde from configs will be used

public Materialized<K,V,S> withLoggingEnabled(Map<String,String> config)
Indicates that a changelog should be created for the store.

Note: Any unrecognized configs will be ignored.

Parameters:
config - any configs that should be applied to the changelog

public Materialized<K,V,S> withLoggingDisabled()
Disable change logging for the materialized StateStore.

public Materialized<K,V,S> withCachingEnabled()
Enable caching for the materialized StateStore.

public Materialized<K,V,S> withCachingDisabled()
Disable caching for the materialized StateStore.

public Materialized<K,V,S> withRetention(Duration retention) throws IllegalArgumentException
Configure retention period for window and session stores. Ignored for key/value stores. Overridden by pre-configured store suppliers (as(SessionBytesStoreSupplier) or as(WindowBytesStoreSupplier)). Note that the retention period must be at least long enough to contain the windowed data's entire life cycle, from window-start through window-end, and for the entire grace period. If not specified, the retention period is set to the window length (from window-start through window-end) plus the grace period.

Parameters:
retention - the retention time
Throws:
IllegalArgumentException - if retention is negative or can't be represented as long milliseconds

public Materialized<K,V,S> withStoreType(Materialized.StoreType storeType) throws IllegalArgumentException
Set the type of the materialized StateStore.

Parameters:
storeType - the store type Materialized.StoreType to use.
Throws:
IllegalArgumentException - if store supplier is also pre-configured
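
The retention rule described for withRetention(Duration retention) comes down to simple arithmetic on durations: the default is the window length plus the grace period, and a custom value must be non-negative, representable as long milliseconds, and at least that default. The sketch below works through that arithmetic with only the JDK. RetentionSketch and its methods are hypothetical helpers written for illustration; they are not the library's implementation, and where exactly Kafka Streams performs each check is not covered here.

```java
import java.time.Duration;

// Hypothetical helper (not a Kafka Streams class) that mirrors the
// retention rules described for withRetention(Duration).
final class RetentionSketch {

    // Default retention as documented: window length plus grace period.
    static Duration defaultRetention(Duration windowSize, Duration grace) {
        return windowSize.plus(grace);
    }

    // Returns the retention unchanged if it satisfies the documented rules,
    // otherwise throws IllegalArgumentException.
    static Duration validate(Duration retention, Duration windowSize, Duration grace) {
        final long millis;
        try {
            // Duration.toMillis() throws ArithmeticException on overflow.
            millis = retention.toMillis();
        } catch (ArithmeticException e) {
            throw new IllegalArgumentException(
                "retention can't be represented as long milliseconds", e);
        }
        if (millis < 0) {
            throw new IllegalArgumentException("retention must not be negative");
        }
        if (retention.compareTo(defaultRetention(windowSize, grace)) < 0) {
            throw new IllegalArgumentException(
                "retention must cover the window length plus the grace period");
        }
        return retention;
    }

    private RetentionSketch() { }
}
```

For a 5-minute window with a 1-minute grace period, the default under this sketch is 6 minutes, so a 10-minute retention is accepted and a 3-minute retention is rejected.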