public interface ConnectedStoreProvider
StoreBuilder
s that will be automatically added to the topology and connected to the
associated processor.
Implementing this interface is recommended when the associated processor wants to encapsulate its usage of its state stores, rather than exposing them to the user building the topology.
In the event that separate but related processors may want to share the same store, different ConnectedStoreProvider
s
may provide the same instance of StoreBuilder
, as shown below.
class StateSharingProcessors {
StoreBuilder<KeyValueStore<String, String>> storeBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myStore"), Serdes.String(), Serdes.String());
class SupplierA implements ProcessorSupplier<String, Integer> {
Processor<String, Integer> get() {
return new Processor() {
private StateStore store;
void init(ProcessorContext context) {
this.store = context.getStateStore("myStore");
}
void process(String key, Integer value) {
// can access this.store
}
void close() {
// can access this.store
}
}
}
Set<StoreBuilder<?>> stores() {
return Collections.singleton(storeBuilder);
}
}
class SupplierB implements ProcessorSupplier<String, String> {
Processor<String, String> get() {
return new Processor() {
private StateStore store;
void init(ProcessorContext context) {
this.store = context.getStateStore("myStore");
}
void process(String key, String value) {
// can access this.store
}
void close() {
// can access this.store
}
}
}
Set<StoreBuilder<?>> stores() {
return Collections.singleton(storeBuilder);
}
}
}
Topology.addProcessor(String, org.apache.kafka.streams.processor.api.ProcessorSupplier, String...)
,
KStream.process(org.apache.kafka.streams.processor.api.ProcessorSupplier, String...)
,
KStream.process(org.apache.kafka.streams.processor.api.ProcessorSupplier, Named, String...)
,
KStream.transform(TransformerSupplier, String...)
,
KStream.transform(TransformerSupplier, Named, String...)
,
KStream.transformValues(ValueTransformerSupplier, String...)
,
KStream.transformValues(ValueTransformerSupplier, Named, String...)
,
KStream.transformValues(ValueTransformerWithKeySupplier, String...)
,
KStream.transformValues(ValueTransformerWithKeySupplier, Named, String...)
,
KStream.flatTransform(TransformerSupplier, String...)
,
KStream.flatTransform(TransformerSupplier, Named, String...)
,
KStream.flatTransformValues(ValueTransformerSupplier, String...)
,
KStream.flatTransformValues(ValueTransformerSupplier, Named, String...)
,
KStream.flatTransformValues(ValueTransformerWithKeySupplier, String...)
,
KStream.flatTransformValues(ValueTransformerWithKeySupplier, Named, String...)
Modifier and Type | Method and Description |
---|---|
default Set<StoreBuilder<?>> |
stores() |
default Set<StoreBuilder<?>> stores()