public interface StateStore
If the store is implemented as a persistent store, it must use the store name as directory name and write all data into this store directory. The store directory must be created within the state directory. The state directory can be obtained via #stateDir() using the ProcessorContext provided via init(...).
Using nested store directories within the state directory isolates different state stores. If a state store wrote into the state directory directly, it might conflict with other state stores, and thus data might get corrupted and/or Streams might fail with an error. Furthermore, Kafka Streams relies on the store name being used as the store directory name to perform internal cleanup tasks.
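The directory layout described above can be sketched with plain java.nio calls. This is a minimal illustration, not Kafka Streams code: the class name StoreDirectoryLayout, the helper storeDirectory, and the store names are hypothetical, and a temporary directory stands in for the state directory that a real store would obtain from its context.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

class StoreDirectoryLayout {

    // Creates <stateDir>/<storeName> if it does not exist yet and returns it.
    static Path storeDirectory(Path stateDir, String storeName) throws IOException {
        Path storeDir = stateDir.resolve(storeName);
        Files.createDirectories(storeDir);
        return storeDir;
    }

    public static void main(String[] args) throws IOException {
        // Assumption: a temp directory stands in for the state directory
        // that a real store would obtain from its context.
        Path stateDir = Files.createTempDirectory("state-dir");

        Path counts = storeDirectory(stateDir, "counts-store");
        Path totals = storeDirectory(stateDir, "totals-store");

        // Both stores can use the same file name without clashing,
        // because each one writes only inside its own directory.
        Files.write(counts.resolve("segment-0"), new byte[] {1});
        Files.write(totals.resolve("segment-0"), new byte[] {2});

        System.out.println(counts.getFileName());
        System.out.println(totals.getFileName());
    }
}
```

Because every store writes only below its own directory, removing one store's data amounts to deleting a single directory named after the store.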
This interface does not specify any query capabilities, which, of course, would be query engine specific. Instead, it just specifies the minimum functionality required to reload a storage engine from its changelog as well as basic lifecycle management.
Modifier and Type | Method and Description |
---|---|
void | close() Close the storage engine. |
void | flush() Flush any cached data. |
default Position | getPosition() Returns the position the state store is at with respect to the input topic/partitions. |
void | init(ProcessorContext context, StateStore root) Deprecated. Since 2.7.0. Callers should invoke init(StateStoreContext, StateStore) instead. Implementers may choose to implement this method for backward compatibility or to throw an informative exception instead. |
default void | init(StateStoreContext context, StateStore root) Initializes this state store. |
boolean | isOpen() Is this store open for reading and writing. |
String | name() The name of this store. |
boolean | persistent() Return if the storage is persistent or not. |
default <R> QueryResult<R> | query(Query<R> query, PositionBound positionBound, QueryConfig config) Execute a query. |
String name()
@Deprecated void init(ProcessorContext context, StateStore root)
Deprecated. Since 2.7.0. Callers should invoke init(StateStoreContext, StateStore) instead. Implementers may choose to implement this method for backward compatibility or to throw an informative exception instead.
The implementation of this function must register the root store in the context via the ProcessorContext.register(StateStore, StateRestoreCallback) function, where the first StateStore parameter should always be the passed-in root object, and the second parameter should be an object of the user's implementation of the StateRestoreCallback interface used for restoring the state store from the changelog.
Note that if the state store engine itself supports bulk writes, users can implement another interface, BatchingStateRestoreCallback, which extends StateRestoreCallback, to provide bulk-load restoration logic instead of restoring one record at a time.
This method is not called if init(StateStoreContext, StateStore) is implemented.
IllegalStateException - if the store gets registered after initialization has already finished
StreamsException - if the store's changelog does not contain the partition
default void init(StateStoreContext context, StateStore root)
The implementation of this function must register the root store in the context via the StateStoreContext.register(StateStore, StateRestoreCallback, CommitCallback) function, where the first StateStore parameter should always be the passed-in root object, and the second parameter should be an object of the user's implementation of the StateRestoreCallback interface used for restoring the state store from the changelog.
Note that if the state store engine itself supports bulk writes, users can implement another interface, BatchingStateRestoreCallback, which extends StateRestoreCallback, to provide bulk-load restoration logic instead of restoring one record at a time.
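The registration contract above can be illustrated with a small, dependency-free sketch. To keep it runnable without Kafka Streams on the classpath, it declares simplified stand-in types (Store, Context, RestoreCallback, BatchingRestoreCallback). These are hypothetical and are not the Kafka Streams interfaces; in particular, their method signatures are assumptions made for this illustration. The sketch shows two things: the wrapped store registers the passed-in root rather than itself, and a batching callback applies a whole batch of changelog records in one bulk write.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class InitRegistrationSketch {

    // Hypothetical stand-in for a per-record restore callback.
    interface RestoreCallback {
        void restore(byte[] key, byte[] value);
    }

    // Hypothetical stand-in for a callback that can also restore in bulk.
    interface BatchingRestoreCallback extends RestoreCallback {
        void restoreAll(List<Map.Entry<byte[], byte[]>> records);
    }

    // Hypothetical stand-in for a state store, reduced to two methods.
    interface Store {
        String name();
        void init(Context context, Store root);
    }

    // Hypothetical stand-in for the context: it only remembers what was registered.
    static class Context {
        Store registeredRoot;
        RestoreCallback registeredCallback;

        void register(Store root, RestoreCallback callback) {
            this.registeredRoot = root;
            this.registeredCallback = callback;
        }
    }

    static class InMemoryStore implements Store {
        final Map<String, String> data = new HashMap<>();
        private final String name;

        InMemoryStore(String name) { this.name = name; }

        @Override public String name() { return name; }

        @Override
        public void init(Context context, Store root) {
            // Register the passed-in root, not "this": the store may be wrapped.
            context.register(root, new BatchingRestoreCallback() {
                @Override
                public void restore(byte[] key, byte[] value) {
                    data.put(text(key), text(value)); // one record at a time
                }

                @Override
                public void restoreAll(List<Map.Entry<byte[], byte[]>> records) {
                    Map<String, String> batch = new HashMap<>();
                    for (Map.Entry<byte[], byte[]> record : records) {
                        batch.put(text(record.getKey()), text(record.getValue()));
                    }
                    data.putAll(batch); // single bulk write for the whole batch
                }
            });
        }
    }

    // A wrapper that passes the root through unchanged to the inner store.
    static class Wrapper implements Store {
        private final Store inner;

        Wrapper(Store inner) { this.inner = inner; }

        @Override public String name() { return inner.name(); }

        @Override
        public void init(Context context, Store root) {
            inner.init(context, root);
        }
    }

    static byte[] bytes(String s) { return s.getBytes(StandardCharsets.UTF_8); }

    static String text(byte[] b) { return new String(b, StandardCharsets.UTF_8); }

    public static void main(String[] args) {
        InMemoryStore inner = new InMemoryStore("counts-store");
        Store outer = new Wrapper(inner);
        Context context = new Context();
        outer.init(context, outer);

        List<Map.Entry<byte[], byte[]>> changelog = new ArrayList<>();
        changelog.add(new SimpleEntry<>(bytes("a"), bytes("1")));
        changelog.add(new SimpleEntry<>(bytes("b"), bytes("2")));
        ((BatchingRestoreCallback) context.registeredCallback).restoreAll(changelog);

        System.out.println(context.registeredRoot == outer);
        System.out.println(inner.data.size());
    }
}
```

Registering the root matters when stores are layered: the context then tracks the outermost store, while the restore logic still writes into the innermost one.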
IllegalStateException - if the store gets registered after initialization has already finished
StreamsException - if the store's changelog does not contain the partition
void flush()
void close()
Users only need to implement this function but should NEVER need to call this API explicitly, as the library calls it automatically when necessary.
boolean persistent()
true if the storage is persistent, false otherwise
boolean isOpen()
true if the store is open
@InterfaceStability.Evolving default <R> QueryResult<R> query(Query<R> query, PositionBound positionBound, QueryConfig config)
If the store doesn't know how to handle the given query, the result shall be a FailureReason.UNKNOWN_QUERY_TYPE.
If the store couldn't satisfy the given position bound, the result shall be a FailureReason.NOT_UP_TO_BOUND.
Note to store implementers: if your store does not support position tracking, you can correctly respond FailureReason.NOT_UP_TO_BOUND if the argument is anything but PositionBound.unbounded(). Be sure to explain in the failure message that bounded positions are not supported.
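The two failure rules above can be sketched as a small decision routine. The types below (Query, KeyLookup, RangeScan, Result, SimpleStore, and a local FailureReason enum) are hypothetical stand-ins declared only so the example runs on its own; they are not the Kafka Streams classes. The boolean parameter is an assumption that stands in for checking whether the caller passed PositionBound.unbounded(), and the order of the two checks is a choice made for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

class QueryFailureSketch {

    // Local stand-in mirroring the two failure reasons named in the text.
    enum FailureReason { UNKNOWN_QUERY_TYPE, NOT_UP_TO_BOUND }

    // Hypothetical query types: the store understands KeyLookup only.
    interface Query { }

    static final class KeyLookup implements Query {
        final String key;
        KeyLookup(String key) { this.key = key; }
    }

    static final class RangeScan implements Query { }

    // Hypothetical result type: either a value or a failure with a message.
    static final class Result {
        final String value;
        final FailureReason failureReason;
        final String failureMessage;

        private Result(String value, FailureReason reason, String message) {
            this.value = value;
            this.failureReason = reason;
            this.failureMessage = message;
        }

        static Result success(String value) {
            return new Result(value, null, null);
        }

        static Result failure(FailureReason reason, String message) {
            return new Result(null, reason, message);
        }

        boolean isSuccess() { return failureReason == null; }
    }

    // A store that does not track positions.
    static final class SimpleStore {
        final Map<String, String> data = new HashMap<>();

        Result query(Query query, boolean boundIsUnbounded) {
            if (!(query instanceof KeyLookup)) {
                return Result.failure(FailureReason.UNKNOWN_QUERY_TYPE,
                        "Unsupported query type: " + query.getClass().getSimpleName());
            }
            if (!boundIsUnbounded) {
                // No position tracking: any bounded request cannot be satisfied.
                return Result.failure(FailureReason.NOT_UP_TO_BOUND,
                        "This store does not track positions; bounded positions are not supported.");
            }
            return Result.success(data.get(((KeyLookup) query).key));
        }
    }

    public static void main(String[] args) {
        SimpleStore store = new SimpleStore();
        store.data.put("a", "1");

        System.out.println(store.query(new KeyLookup("a"), true).value);
        System.out.println(store.query(new RangeScan(), true).failureReason);
        System.out.println(store.query(new KeyLookup("a"), false).failureMessage);
    }
}
```

Spelling out in the failure message that bounded positions are unsupported lets a caller tell a permanent limitation apart from a store that is merely lagging.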
R - The result type
query - The query to execute
positionBound - The position the store must be at or past
config - Per-query configuration parameters, such as whether the store should collect detailed execution info for the query
@InterfaceStability.Evolving default Position getPosition()