K - Type of keys
V - Type of values

public interface BranchedKStream<K,V>
Branches are defined with the branch(Predicate, Branched) or defaultBranch(Branched) methods. Each record is evaluated against the predicates passed to branch(Predicate, Branched), in the order in which the branches were defined, and is routed to the first branch whose predicate evaluates to true. If a record does not match any predicate, it is routed to the default branch, or dropped if no default branch was created.

Each branch (which is a KStream instance) can then be processed by either a Function or a Consumer provided via a Branched parameter. Unless a Consumer is provided or the Function returns null, the branch can also be accessed from the Map returned by an optional defaultBranch(Branched) or noDefaultBranch() method call (see the usage examples).

The branching happens on a first-match basis: a record in the original stream is assigned to the result stream of the first predicate that evaluates to true, and to that stream only. If you need to route a record to multiple streams, apply multiple KStream.filter(Predicate) operators to the same KStream instance, one for each predicate, instead of branching.

Routing the records to different branches is a stateless, record-by-record operation.
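The first-match rule can be illustrated without Kafka Streams at all. The following is a minimal plain-Java sketch, not the library's implementation: `FirstMatchRouter`, `route`, and `matchAll` are hypothetical names introduced only for this illustration. `route` mimics branching (at most one destination per record), while `matchAll` mimics applying one independent filter per predicate (possibly several destinations).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of first-match routing; this is not Kafka Streams code.
class FirstMatchRouter {

    // Returns the 1-based position of the first predicate the value satisfies,
    // 0 if nothing matches and a default branch exists,
    // or -1 if nothing matches and the record would be dropped.
    static <V> int route(V value, List<Predicate<V>> predicates, boolean hasDefaultBranch) {
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(value)) {
                return i + 1; // later predicates are not evaluated
            }
        }
        return hasDefaultBranch ? 0 : -1;
    }

    // Contrast: one independent filter per predicate, so a value may match several.
    static <V> List<Integer> matchAll(V value, List<Predicate<V>> predicates) {
        List<Integer> matches = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(value)) {
                matches.add(i + 1);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<Predicate<Integer>> predicates = List.of(v -> v > 10, v -> v > 5);
        System.out.println(route(42, predicates, true));  // 1: first predicate wins
        System.out.println(route(7, predicates, true));   // 2
        System.out.println(route(3, predicates, true));   // 0: default branch
        System.out.println(route(3, predicates, false));  // -1: dropped
        System.out.println(matchAll(42, predicates));     // [1, 2]
    }
}
```

The value 42 satisfies both predicates, yet `route` assigns it to the first one only, which is why the filter-based approach is the one to use when a record must reach several streams.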
The keys of the Map<String, KStream<K, V>> entries returned by defaultBranch(Branched) or noDefaultBranch() are defined by the following rules:
- If a Named parameter was provided for KStream.split(Named), its value is used as a prefix for each key. By default, no prefix is used.
- If a branch name is provided in branch(Predicate, Branched) via the Branched parameter, its value is appended to the prefix to form the Map key.
- If a name is not provided for the branch, the key defaults to prefix + position of the branch as a decimal number, starting from "1".
- If a name is not provided for the defaultBranch(), the key defaults to prefix + "0".
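The key rules above can be summarized as a single function. This is a hedged sketch in plain Java: `BranchKeys` and `keyFor` are hypothetical names, not part of the Kafka Streams API, and the sketch assumes that an absent prefix is represented by an empty string and an absent branch name by `null`.

```java
// Hypothetical helper that models the key rules; not part of the Kafka Streams API.
class BranchKeys {

    // prefix:   the value given to split(Named), or "" if none was given (assumption)
    // name:     the branch name given via Branched, or null if none was given (assumption)
    // position: 1-based position of the branch, or 0 for the default branch
    static String keyFor(String prefix, String name, int position) {
        String suffix = (name != null) ? name : Integer.toString(position);
        return prefix + suffix;
    }

    public static void main(String[] args) {
        System.out.println(keyFor("foo-", "bar", 1)); // foo-bar
        System.out.println(keyFor("foo-", null, 5));  // foo-5
        System.out.println(keyFor("foo-", null, 0));  // foo-0
        System.out.println(keyFor("", null, 2));      // 2
    }
}
```

Note that the position counts every branch definition, including branches that contribute no map entry.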
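The values of the respective Map<String, KStream<K, V>> entries are formed as follows:
- If no chain function or consumer is provided in branch(Predicate, Branched) via the Branched parameter, then the branch itself is added to the Map.
- If a chain function is provided and it returns a non-null value for a given branch, then the value is the result returned by this function.
- If a chain function returns null for a given branch, then no entry is added to the map.
- If a consumer is provided for a given branch, then no entry is added to the map.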
For example:

    Map<String, KStream<..., ...>> result =
        source.split(Named.as("foo-"))
            .branch(predicate1, Branched.as("bar"))                     // "foo-bar"
            .branch(predicate2, Branched.withConsumer(ks -> ks.to("A"))) // no entry: a Consumer is provided
            .branch(predicate3, Branched.withFunction(ks -> null))      // no entry: chain function returns null
            .branch(predicate4, Branched.withFunction(ks -> ks))        // "foo-4": chain function returns non-null value
            .branch(predicate5)                                         // "foo-5": name defaults to the branch position
            .defaultBranch();                                           // "foo-0": "0" is the default name for the default branch
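The rules for map values can also be read as a small decision procedure. The sketch below models it in plain Java under stated assumptions: `BranchValues` and `mapValue` are hypothetical names, the type parameter `S` stands in for `KStream<K, V>`, and "not provided" is represented by `null`. It is an illustration of the documented rules, not the library's code.

```java
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model of the value rules; S stands in for KStream<K, V>.
class BranchValues {

    // Returns the value stored in the map for one branch,
    // or an empty Optional if no entry is added.
    static <S> Optional<S> mapValue(S branch, Function<S, S> chainFunction, Consumer<S> chainConsumer) {
        if (chainConsumer != null) {
            chainConsumer.accept(branch);
            return Optional.empty();                                  // consumer provided: no entry
        }
        if (chainFunction != null) {
            return Optional.ofNullable(chainFunction.apply(branch));  // null result: no entry
        }
        return Optional.of(branch);                                   // neither provided: the branch itself
    }

    public static void main(String[] args) {
        System.out.println(mapValue("stream", null, null).isPresent());                     // true
        System.out.println(BranchValues.<String>mapValue("stream", s -> null, null).isPresent()); // false
    }
}
```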
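When each branch is processed independently of the others, a single scope for all the branches is not needed, and consuming lambdas or method references can be passed in the Branched parameter:

    source.split()
        .branch(predicate1, Branched.withConsumer(ks -> ks.to("A")))
        .branch(predicate2, Branched.withConsumer(ks -> ks.to("B")))
        .defaultBranch(Branched.withConsumer(ks -> ks.to("C")));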
When branches need to be combined again after splitting, the map returned by the defaultBranch() or noDefaultBranch() methods provides access to all the branches in the same scope:

    Map<String, KStream<String, String>> branches = source.split(Named.as("split-"))
        .branch((key, value) -> value == null, Branched.withFunction(s -> s.mapValues(v -> "NULL"), "null"))
        .defaultBranch(Branched.as("non-null"));

    KStream<String, String> merged = branches.get("split-non-null").merge(branches.get("split-null"));
Branches can also be created dynamically, for example one per enum value:

    BranchedKStream branched = stream.split();
    for (RecordType recordType : RecordType.values()) {
        branched.branch((k, v) -> v.getRecType() == recordType,
            Branched.withConsumer(recordType::processRecords));
    }
See also: KStream
Modifier and Type | Method and Description |
---|---|
BranchedKStream<K,V> | branch(Predicate<? super K,? super V> predicate): Define a branch for records that match the predicate. |
BranchedKStream<K,V> | branch(Predicate<? super K,? super V> predicate, Branched<K,V> branched): Define a branch for records that match the predicate. |
Map<String,KStream<K,V>> | defaultBranch(): Finalize the construction of branches and define the default branch for the records not intercepted by other branches. |
Map<String,KStream<K,V>> | defaultBranch(Branched<K,V> branched): Finalize the construction of branches and define the default branch for the records not intercepted by other branches. |
Map<String,KStream<K,V>> | noDefaultBranch(): Finalize the construction of branches without forming a default branch. |
BranchedKStream<K,V> branch(Predicate<? super K,? super V> predicate)

Define a branch for records that match the predicate.

Parameters:
predicate - A Predicate instance, against which each record will be evaluated. If this predicate returns true for a given record, the record will be routed to the current branch and will not be evaluated against the predicates for the remaining branches.

Returns:
this, to facilitate method chaining

BranchedKStream<K,V> branch(Predicate<? super K,? super V> predicate, Branched<K,V> branched)
Define a branch for records that match the predicate.

Parameters:
predicate - A Predicate instance, against which each record will be evaluated. If this predicate returns true for a given record, the record will be routed to the current branch and will not be evaluated against the predicates for the remaining branches.
branched - A Branched parameter that allows defining a branch name, an in-place branch consumer, or a branch mapper (see the code examples for BranchedKStream)

Returns:
this, to facilitate method chaining

Map<String,KStream<K,V>> defaultBranch()
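Finalize the construction of branches and define the default branch for the records not intercepted by other branches. Calling defaultBranch() or noDefaultBranch() is optional.

Returns:
Map of named branches. For the rules of forming the resulting map, see the BranchedKStream description.

Map<String,KStream<K,V>> defaultBranch(Branched<K,V> branched)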
Finalize the construction of branches and define the default branch for the records not intercepted by other branches. Calling defaultBranch() or noDefaultBranch() is optional.

Parameters:
branched - A Branched parameter that allows defining a branch name, an in-place branch consumer, or a branch mapper (see the code examples for BranchedKStream)

Returns:
Map of named branches. For the rules of forming the resulting map, see the BranchedKStream description.

Map<String,KStream<K,V>> noDefaultBranch()
Finalize the construction of branches without forming a default branch. Calling noDefaultBranch() or defaultBranch() is optional.

Returns:
Map of named branches. For the rules of forming the resulting map, see the BranchedKStream description.