public final class Stores extends Object
When using the high-level DSL, i.e., StreamsBuilder, users create StoreSuppliers that can be further customized via Materialized. For example, a topic read as KTable can be materialized into an in-memory store with custom key/value serdes and caching disabled:
StreamsBuilder builder = new StreamsBuilder();
KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("queryable-store-name");
KTable<Long,String> table = builder.table(
"topicName",
Materialized.<Long,String>as(storeSupplier)
.withKeySerde(Serdes.Long())
.withValueSerde(Serdes.String())
.withCachingDisabled());
When using the Processor API, i.e., Topology, users create StoreBuilders that can be attached to Processors. For example, you can create a windowed RocksDB store with custom changelog topic configuration like:
Topology topology = new Topology();
topology.addProcessor("processorName", ...);
Map<String,String> topicConfig = new HashMap<>();
StoreBuilder<WindowStore<Integer, Long>> storeBuilder = Stores
.windowStoreBuilder(
Stores.persistentWindowStore("queryable-store-name", ...),
Serdes.Integer(),
Serdes.Long())
.withLoggingEnabled(topicConfig);
topology.addStateStore(storeBuilder, "processorName");
| Constructor and Description |
|---|
Stores() |
| Modifier and Type | Method and Description |
|---|---|
static KeyValueBytesStoreSupplier | inMemoryKeyValueStore(String name)Create an in-memory KeyValueBytesStoreSupplier. |
static SessionBytesStoreSupplier | inMemorySessionStore(String name, Duration retentionPeriod)Create an in-memory SessionBytesStoreSupplier. |
static WindowBytesStoreSupplier | inMemoryWindowStore(String name, Duration retentionPeriod, Duration windowSize, boolean retainDuplicates)Create an in-memory WindowBytesStoreSupplier. |
static <K,V> StoreBuilder<KeyValueStore<K,V>> | keyValueStoreBuilder(KeyValueBytesStoreSupplier supplier, org.apache.kafka.common.serialization.Serde<K> keySerde, org.apache.kafka.common.serialization.Serde<V> valueSerde)Creates a StoreBuilder that can be used to build a KeyValueStore. |
static KeyValueBytesStoreSupplier | lruMap(String name, int maxCacheSize)Create a LRU Map KeyValueBytesStoreSupplier. |
static KeyValueBytesStoreSupplier | persistentKeyValueStore(String name)Create a persistent KeyValueBytesStoreSupplier. |
static SessionBytesStoreSupplier | persistentSessionStore(String name, Duration retentionPeriod)Create a persistent SessionBytesStoreSupplier. |
static KeyValueBytesStoreSupplier | persistentTimestampedKeyValueStore(String name)Create a persistent KeyValueBytesStoreSupplier. |
static WindowBytesStoreSupplier | persistentTimestampedWindowStore(String name, Duration retentionPeriod, Duration windowSize, boolean retainDuplicates)Create a persistent WindowBytesStoreSupplier. |
static VersionedBytesStoreSupplier | persistentVersionedKeyValueStore(String name, Duration historyRetention)Create a persistent versioned key-value store VersionedBytesStoreSupplier. |
static VersionedBytesStoreSupplier | persistentVersionedKeyValueStore(String name, Duration historyRetention, Duration segmentInterval)Create a persistent versioned key-value store VersionedBytesStoreSupplier. |
static WindowBytesStoreSupplier | persistentWindowStore(String name, Duration retentionPeriod, Duration windowSize, boolean retainDuplicates)Create a persistent WindowBytesStoreSupplier. |
static <K,V> StoreBuilder<SessionStore<K,V>> | sessionStoreBuilder(SessionBytesStoreSupplier supplier, org.apache.kafka.common.serialization.Serde<K> keySerde, org.apache.kafka.common.serialization.Serde<V> valueSerde)Creates a StoreBuilder that can be used to build a SessionStore. |
static <K,V> StoreBuilder<TimestampedKeyValueStore<K,V>> | timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier supplier, org.apache.kafka.common.serialization.Serde<K> keySerde, org.apache.kafka.common.serialization.Serde<V> valueSerde)Creates a StoreBuilder that can be used to build a TimestampedKeyValueStore. |
static <K,V> StoreBuilder<TimestampedWindowStore<K,V>> | timestampedWindowStoreBuilder(WindowBytesStoreSupplier supplier, org.apache.kafka.common.serialization.Serde<K> keySerde, org.apache.kafka.common.serialization.Serde<V> valueSerde)Creates a StoreBuilder that can be used to build a TimestampedWindowStore. |
static <K,V> StoreBuilder<VersionedKeyValueStore<K,V>> | versionedKeyValueStoreBuilder(VersionedBytesStoreSupplier supplier, org.apache.kafka.common.serialization.Serde<K> keySerde, org.apache.kafka.common.serialization.Serde<V> valueSerde)Creates a StoreBuilder that can be used to build a VersionedKeyValueStore. |
static <K,V> StoreBuilder<WindowStore<K,V>> | windowStoreBuilder(WindowBytesStoreSupplier supplier, org.apache.kafka.common.serialization.Serde<K> keySerde, org.apache.kafka.common.serialization.Serde<V> valueSerde)Creates a StoreBuilder that can be used to build a WindowStore. |
public static KeyValueBytesStoreSupplier persistentKeyValueStore(String name)
KeyValueBytesStoreSupplier. This store supplier can be passed into a keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde). If you want to create a TimestampedKeyValueStore or VersionedKeyValueStore you should use persistentTimestampedKeyValueStore(String) or persistentVersionedKeyValueStore(String, Duration), respectively, to create a store supplier instead.
name - name of the store (cannot be null)KeyValueBytesStoreSupplier that can be used to build a persistent key-value storepublic static KeyValueBytesStoreSupplier persistentTimestampedKeyValueStore(String name)
KeyValueBytesStoreSupplier. This store supplier can be passed into a timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde). If you want to create a KeyValueStore or a VersionedKeyValueStore you should use persistentKeyValueStore(String) or persistentVersionedKeyValueStore(String, Duration), respectively, to create a store supplier instead.
name - name of the store (cannot be null)KeyValueBytesStoreSupplier that can be used to build a persistent key-(timestamp/value) storepublic static VersionedBytesStoreSupplier persistentVersionedKeyValueStore(String name, Duration historyRetention)
VersionedBytesStoreSupplier. This store supplier can be passed into a versionedKeyValueStoreBuilder(VersionedBytesStoreSupplier, Serde, Serde).
Note that it is not safe to change the value of historyRetention between application restarts without clearing local state from application instances, as this may cause incorrect values to be read from the state store if it impacts the underlying storage format.
name - name of the store (cannot be null)historyRetention - length of time that old record versions are available for query (cannot be negative). If a timestamp bound provided to VersionedKeyValueStore.get(Object, long) is older than this specified history retention, then the get operation will not return data. This parameter also determines the "grace period" after which out-of-order writes will no longer be accepted.VersionedBytesStoreSupplierIllegalArgumentException - if historyRetention can't be represented as long millisecondspublic static VersionedBytesStoreSupplier persistentVersionedKeyValueStore(String name, Duration historyRetention, Duration segmentInterval)
VersionedBytesStoreSupplier. This store supplier can be passed into a versionedKeyValueStoreBuilder(VersionedBytesStoreSupplier, Serde, Serde).
Note that it is not safe to change the value of segmentInterval between application restarts without clearing local state from application instances, as this may cause incorrect values to be read from the state store otherwise.
name - name of the store (cannot be null)historyRetention - length of time that old record versions are available for query (cannot be negative). If a timestamp bound provided to VersionedKeyValueStore.get(Object, long) is older than this specified history retention, then the get operation will not return data. This parameter also determines the "grace period" after which out-of-order writes will no longer be accepted.segmentInterval - size of segments for storing old record versions (must be positive). Old record versions for the same key in a single segment are stored (updated and accessed) together. The only impact of this parameter is performance. If segments are large and a workload results in many record versions for the same key being collected in a single segment, performance may degrade as a result. On the other hand, historical reads (which access older segments) and out-of-order writes may slow down if there are too many segments.VersionedBytesStoreSupplierIllegalArgumentException - if historyRetention or segmentInterval can't be represented as long millisecondspublic static KeyValueBytesStoreSupplier inMemoryKeyValueStore(String name)
KeyValueBytesStoreSupplier. This store supplier can be passed into a keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde) or timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde).
name - name of the store (cannot be null)KeyValueBytesStoreSupplier than can be used to build an in-memory storepublic static KeyValueBytesStoreSupplier lruMap(String name, int maxCacheSize)
KeyValueBytesStoreSupplier. This store supplier can be passed into a keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde) or timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde).
name - name of the store (cannot be null)maxCacheSize - maximum number of items in the LRU (cannot be negative)KeyValueBytesStoreSupplier that can be used to build an LRU Map based storeIllegalArgumentException - if maxCacheSize is negativepublic static WindowBytesStoreSupplier persistentWindowStore(String name, Duration retentionPeriod, Duration windowSize, boolean retainDuplicates) throws IllegalArgumentException
WindowBytesStoreSupplier. This store supplier can be passed into a windowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde). If you want to create a TimestampedWindowStore you should use persistentTimestampedWindowStore(String, Duration, Duration, boolean) to create a store supplier instead.
Note that it is not safe to change the value of retentionPeriod between application restarts without clearing local state from application instances, as this may cause incorrect values to be read from the state store if it impacts the underlying storage format.
name - name of the store (cannot be null)retentionPeriod - length of time to retain data in the store (cannot be negative) (note that the retention period must be at least long enough to contain the windowed data's entire life cycle, from window-start through window-end, and for the entire grace period)windowSize - size of the windows (cannot be negative)retainDuplicates - whether or not to retain duplicates. Turning this on will automatically disable caching and means that null values will be ignored.WindowBytesStoreSupplierIllegalArgumentException - if retentionPeriod or windowSize can't be represented as long millisecondsIllegalArgumentException - if retentionPeriod is smaller than windowSizepublic static WindowBytesStoreSupplier persistentTimestampedWindowStore(String name, Duration retentionPeriod, Duration windowSize, boolean retainDuplicates) throws IllegalArgumentException
WindowBytesStoreSupplier. This store supplier can be passed into a timestampedWindowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde). If you want to create a WindowStore you should use persistentWindowStore(String, Duration, Duration, boolean) to create a store supplier instead.
Note that it is not safe to change the value of retentionPeriod between application restarts without clearing local state from application instances, as this may cause incorrect values to be read from the state store if it impacts the underlying storage format.
name - name of the store (cannot be null)retentionPeriod - length of time to retain data in the store (cannot be negative) (note that the retention period must be at least long enough to contain the windowed data's entire life cycle, from window-start through window-end, and for the entire grace period)windowSize - size of the windows (cannot be negative)retainDuplicates - whether or not to retain duplicates. Turning this on will automatically disable caching and means that null values will be ignored.WindowBytesStoreSupplierIllegalArgumentException - if retentionPeriod or windowSize can't be represented as long millisecondsIllegalArgumentException - if retentionPeriod is smaller than windowSizepublic static WindowBytesStoreSupplier inMemoryWindowStore(String name, Duration retentionPeriod, Duration windowSize, boolean retainDuplicates) throws IllegalArgumentException
WindowBytesStoreSupplier. This store supplier can be passed into a windowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde) or timestampedWindowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde).
name - name of the store (cannot be null)retentionPeriod - length of time to retain data in the store (cannot be negative) Note that the retention period must be at least long enough to contain the windowed data's entire life cycle, from window-start through window-end, and for the entire grace period.windowSize - size of the windows (cannot be negative)retainDuplicates - whether or not to retain duplicates. Turning this on will automatically disable caching and means that null values will be ignored.WindowBytesStoreSupplierIllegalArgumentException - if retentionPeriod or windowSize can't be represented as long millisecondsIllegalArgumentException - if retentionPeriod is smaller than windowSizepublic static SessionBytesStoreSupplier persistentSessionStore(String name, Duration retentionPeriod)
SessionBytesStoreSupplier. Note that it is not safe to change the value of retentionPeriod between application restarts without clearing local state from application instances, as this may cause incorrect values to be read from the state store if it impacts the underlying storage format.
name - name of the store (cannot be null)retentionPeriod - length of time to retain data in the store (cannot be negative) (note that the retention period must be at least as long enough to contain the inactivity gap of the session and the entire grace period.)SessionBytesStoreSupplierpublic static SessionBytesStoreSupplier inMemorySessionStore(String name, Duration retentionPeriod)
SessionBytesStoreSupplier.name - name of the store (cannot be null)retentionPeriod - length of time to retain data in the store (cannot be negative) (note that the retention period must be at least as long enough to contain the inactivity gap of the session and the entire grace period.)SessionBytesStoreSupplierpublic static <K,V> StoreBuilder<KeyValueStore<K,V>> keyValueStoreBuilder(KeyValueBytesStoreSupplier supplier, org.apache.kafka.common.serialization.Serde<K> keySerde, org.apache.kafka.common.serialization.Serde<V> valueSerde)
StoreBuilder that can be used to build a KeyValueStore. The provided supplier should not be a supplier for TimestampedKeyValueStores.
K - key typeV - value typesupplier - a KeyValueBytesStoreSupplier (cannot be null)keySerde - the key serde to usevalueSerde - the value serde to use; if the serialized bytes is null for put operations, it is treated as deleteStoreBuilder that can build a KeyValueStorepublic static <K,V> StoreBuilder<TimestampedKeyValueStore<K,V>> timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier supplier, org.apache.kafka.common.serialization.Serde<K> keySerde, org.apache.kafka.common.serialization.Serde<V> valueSerde)
StoreBuilder that can be used to build a TimestampedKeyValueStore. The provided supplier should not be a supplier for KeyValueStores. For this case, passed in timestamps will be dropped and not stored in the key-value-store. On read, no valid timestamp but a dummy timestamp will be returned.
K - key typeV - value typesupplier - a KeyValueBytesStoreSupplier (cannot be null)keySerde - the key serde to usevalueSerde - the value serde to use; if the serialized bytes is null for put operations, it is treated as deleteStoreBuilder that can build a KeyValueStorepublic static <K,V> StoreBuilder<VersionedKeyValueStore<K,V>> versionedKeyValueStoreBuilder(VersionedBytesStoreSupplier supplier, org.apache.kafka.common.serialization.Serde<K> keySerde, org.apache.kafka.common.serialization.Serde<V> valueSerde)
StoreBuilder that can be used to build a VersionedKeyValueStore.K - key typeV - value typesupplier - a VersionedBytesStoreSupplier (cannot be null)keySerde - the key serde to usevalueSerde - the value serde to use; if the serialized bytes is null for put operations, it is treated as a deletionStoreBuilder that can build a VersionedKeyValueStorepublic static <K,V> StoreBuilder<WindowStore<K,V>> windowStoreBuilder(WindowBytesStoreSupplier supplier, org.apache.kafka.common.serialization.Serde<K> keySerde, org.apache.kafka.common.serialization.Serde<V> valueSerde)
StoreBuilder that can be used to build a WindowStore. The provided supplier should not be a supplier for TimestampedWindowStores.
K - key typeV - value typesupplier - a WindowBytesStoreSupplier (cannot be null)keySerde - the key serde to usevalueSerde - the value serde to use; if the serialized bytes is null for put operations, it is treated as deleteStoreBuilder than can build a WindowStorepublic static <K,V> StoreBuilder<TimestampedWindowStore<K,V>> timestampedWindowStoreBuilder(WindowBytesStoreSupplier supplier, org.apache.kafka.common.serialization.Serde<K> keySerde, org.apache.kafka.common.serialization.Serde<V> valueSerde)
StoreBuilder that can be used to build a TimestampedWindowStore. The provided supplier should not be a supplier for WindowStores. For this case, passed in timestamps will be dropped and not stored in the window-store. On read, no valid timestamp but a dummy timestamp will be returned.
K - key typeV - value typesupplier - a WindowBytesStoreSupplier (cannot be null)keySerde - the key serde to usevalueSerde - the value serde to use; if the serialized bytes is null for put operations, it is treated as deleteStoreBuilder that can build a TimestampedWindowStorepublic static <K,V> StoreBuilder<SessionStore<K,V>> sessionStoreBuilder(SessionBytesStoreSupplier supplier, org.apache.kafka.common.serialization.Serde<K> keySerde, org.apache.kafka.common.serialization.Serde<V> valueSerde)
StoreBuilder that can be used to build a SessionStore.K - key typeV - value typesupplier - a SessionBytesStoreSupplier (cannot be null)keySerde - the key serde to usevalueSerde - the value serde to use; if the serialized bytes is null for put operations, it is treated as deleteStoreBuilder than can build a SessionStore