public final class TaskAssignmentUtils extends Object
TaskAssignor
Modifier and Type | Class and Description |
---|---|
static interface | TaskAssignmentUtils.MoveStandbyTaskPredicate |
static class | TaskAssignmentUtils.RackAwareOptimizationParams: A simple config container for necessary parameters and optional overrides to apply when running the active or standby task rack-aware optimizations. |
Modifier and Type | Method and Description |
---|---|
static void | defaultStandbyTaskAssignment(ApplicationState applicationState, Map<ProcessId,KafkaStreamsAssignment> kafkaStreamsAssignments): Assign standby tasks to KafkaStreams clients according to the default logic. |
static Map<ProcessId,KafkaStreamsAssignment> | identityAssignment(ApplicationState applicationState): Return a "no-op" assignment that just copies the previous assignment of tasks to KafkaStreams clients. |
static void | optimizeRackAwareActiveTasks(TaskAssignmentUtils.RackAwareOptimizationParams optimizationParams, Map<ProcessId,KafkaStreamsAssignment> kafkaStreamsAssignments): Optimize active task assignment for rack awareness. |
static void | optimizeRackAwareStandbyTasks(TaskAssignmentUtils.RackAwareOptimizationParams optimizationParams, Map<ProcessId,KafkaStreamsAssignment> kafkaStreamsAssignments): Optimize standby task assignment for rack awareness. |
static TaskAssignor.AssignmentError | validateTaskAssignment(ApplicationState applicationState, TaskAssignor.TaskAssignment taskAssignment): Validate the passed-in TaskAssignor.TaskAssignment and return a TaskAssignor.AssignmentError representing the first error detected in the assignment, or TaskAssignor.AssignmentError.NONE if the assignment passes the verification check. |
public static TaskAssignor.AssignmentError validateTaskAssignment(ApplicationState applicationState, TaskAssignor.TaskAssignment taskAssignment)
Validate the passed-in TaskAssignor.TaskAssignment and return a TaskAssignor.AssignmentError representing the first error detected in the assignment, or TaskAssignor.AssignmentError.NONE if the assignment passes the verification check.
Note: this verification is performed automatically by the StreamsPartitionAssignor on the assignment returned by the TaskAssignor, and the error is returned to the assignor via the TaskAssignor.onAssignmentComputed(GroupAssignment, GroupSubscription, AssignmentError) callback. It is therefore not required to call this method manually from TaskAssignor.assign(ApplicationState).
However, returning an invalid assignment fails the rebalance and kills the thread, so it can be useful to call this method in an assignor to verify the assignment before returning it, and to fix any errors it finds.
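The validate-before-return pattern described above can be sketched without the Kafka classes. Everything in this block is a hypothetical stand-in: the `Error` enum only mimics TaskAssignor.AssignmentError (only `NONE` is named in this page), the duplicate-task rule is an assumed example of a validity check, and falling back to the previous assignment is just one possible way to "fix" an invalid candidate. It is not the library's validation logic.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ValidateBeforeReturnSketch {

    /** Hypothetical stand-in for TaskAssignor.AssignmentError; DUPLICATE_ACTIVE_TASK is a made-up value. */
    public enum Error { NONE, DUPLICATE_ACTIVE_TASK }

    /** Toy check (an assumption): an active task may appear on at most one client. */
    public static Error validate(Map<String, List<String>> activeTasksByClient) {
        Set<String> seen = new HashSet<>();
        for (List<String> tasks : activeTasksByClient.values()) {
            for (String task : tasks) {
                if (!seen.add(task)) {
                    return Error.DUPLICATE_ACTIVE_TASK; // first error detected
                }
            }
        }
        return Error.NONE;
    }

    /** Return the candidate if it validates, otherwise fall back to the previous assignment. */
    public static Map<String, List<String>> assign(Map<String, List<String>> previous,
                                                   Map<String, List<String>> candidate) {
        return validate(candidate) == Error.NONE ? candidate : previous;
    }

    public static void main(String[] args) {
        Map<String, List<String>> previous =
            Map.of("client-1", List.of("0_0"), "client-2", List.of("0_1"));
        // Task 0_1 is active on both clients, so this candidate is invalid.
        Map<String, List<String>> broken =
            Map.of("client-1", List.of("0_0", "0_1"), "client-2", List.of("0_1"));

        System.out.println("error: " + validate(broken));
        System.out.println("fell back to previous: " + assign(previous, broken).equals(previous));
    }
}
```

In a real assignor the same shape would presumably use validateTaskAssignment for the check and something like identityAssignment for the fallback, but that wiring is not shown here.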
Parameters:
applicationState - The application for which this task assignment is being assessed.
taskAssignment - The task assignment that will be validated.
Returns:
AssignmentError.NONE if the assignment created for this application is valid, or another AssignmentError otherwise.
public static Map<ProcessId,KafkaStreamsAssignment> identityAssignment(ApplicationState applicationState)
Return a "no-op" assignment that just copies the previous assignment of tasks to KafkaStreams clients.
Parameters:
applicationState - the metadata and other info describing the current application state
public static void defaultStandbyTaskAssignment(ApplicationState applicationState, Map<ProcessId,KafkaStreamsAssignment> kafkaStreamsAssignments)
Assign standby tasks to KafkaStreams clients according to the default logic. If rack-aware client tags are configured, the rack-aware standby task assignor will be used.
Parameters:
applicationState - the metadata and other info describing the current application state
kafkaStreamsAssignments - the KafkaStreams client assignments to add standby tasks to
public static void optimizeRackAwareActiveTasks(TaskAssignmentUtils.RackAwareOptimizationParams optimizationParams, Map<ProcessId,KafkaStreamsAssignment> kafkaStreamsAssignments)
Optimize active task assignment for rack awareness. The optimization is controlled by the trafficCost and nonOverlapCost configs, which balance cross-rack traffic minimization against task movement.
Setting trafficCost
to a larger number reduces the overall cross rack traffic of the resulting
assignment, but can increase the number of tasks shuffled around between clients.
Setting nonOverlapCost
to a larger number increases the affinity of tasks to their intended client
and reduces the amount by which the rack-aware optimization can shuffle tasks around, at the cost of higher
cross-rack traffic.
In the extreme case, if nonOverlapCost is set to 0 and trafficCost to a positive value, the resulting assignment will have the absolute minimum cross-rack traffic. If trafficCost is set to 0 and nonOverlapCost to a positive value, the resulting assignment will be identical to the input assignment.
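The trade-off between the two weights can be illustrated with a toy cost model. This is only a sketch under stated assumptions: the `RackCostSketch` class, its `cost` and `cheapest` methods, and the weighted-sum formula are hypothetical and chosen for illustration; they are not the optimizer that this method actually runs. Tasks and clients are plain strings, each task is assumed to read from a single rack, and the "optimizer" simply picks the cheapest of a fixed list of candidate assignments.

```java
import java.util.List;
import java.util.Map;

public class RackCostSketch {

    /**
     * Weighted cost of a candidate (task -> client) relative to the original assignment:
     * trafficCost per cross-rack task plus nonOverlapCost per task that changed client.
     */
    public static int cost(Map<String, String> original, Map<String, String> candidate,
                           Map<String, String> taskRack, Map<String, String> clientRack,
                           int trafficCost, int nonOverlapCost) {
        int crossRack = 0;
        int moved = 0;
        for (Map.Entry<String, String> entry : candidate.entrySet()) {
            String task = entry.getKey();
            String client = entry.getValue();
            if (!taskRack.get(task).equals(clientRack.get(client))) {
                crossRack++; // task data lives in a different rack than its client
            }
            if (!client.equals(original.get(task))) {
                moved++; // task was shuffled away from its original client
            }
        }
        return trafficCost * crossRack + nonOverlapCost * moved;
    }

    /** Return the first candidate with the lowest weighted cost. */
    public static Map<String, String> cheapest(List<Map<String, String>> candidates,
                                               Map<String, String> original,
                                               Map<String, String> taskRack,
                                               Map<String, String> clientRack,
                                               int trafficCost, int nonOverlapCost) {
        Map<String, String> best = null;
        int bestCost = Integer.MAX_VALUE;
        for (Map<String, String> candidate : candidates) {
            int c = cost(original, candidate, taskRack, clientRack, trafficCost, nonOverlapCost);
            if (c < bestCost) {
                best = candidate;
                bestCost = c;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        Map<String, String> taskRack = Map.of("0_0", "rack-a", "0_1", "rack-b");
        Map<String, String> clientRack = Map.of("client-1", "rack-b", "client-2", "rack-a");
        // Original: both tasks run in the "wrong" rack. Swapped: both are rack-local.
        Map<String, String> original = Map.of("0_0", "client-1", "0_1", "client-2");
        Map<String, String> swapped = Map.of("0_0", "client-2", "0_1", "client-1");
        List<Map<String, String>> candidates = List.of(original, swapped);

        // nonOverlapCost = 0: only traffic matters, so the rack-local swap is chosen.
        System.out.println(cheapest(candidates, original, taskRack, clientRack, 1, 0).equals(swapped));
        // trafficCost = 0: only movement matters, so the input assignment is kept.
        System.out.println(cheapest(candidates, original, taskRack, clientRack, 0, 1).equals(original));
    }
}
```

Intermediate weights land between these two extremes: a candidate that moves a task is only chosen when the traffic it saves outweighs the movement penalty.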
Note: this method modifies the input KafkaStreamsAssignment objects in place and, as its void return type indicates, returns nothing.
It does not make a copy of the map or of the KafkaStreamsAssignment objects.
This method optimizes cross-rack traffic for active tasks only. For standby task optimization, use optimizeRackAwareStandbyTasks(TaskAssignmentUtils.RackAwareOptimizationParams, Map<ProcessId,KafkaStreamsAssignment>).
It is recommended to run this optimization before assigning any standby tasks, especially if your KafkaStreams clients are configured with assignment tags via the rack.aware.assignment.tags config. This method may move active tasks without considering the client tags, which can violate the constraints of the original client tag assignment.
Parameters:
optimizationParams - optional configuration parameters to apply
kafkaStreamsAssignments - the current assignment of tasks to KafkaStreams clients
public static void optimizeRackAwareStandbyTasks(TaskAssignmentUtils.RackAwareOptimizationParams optimizationParams, Map<ProcessId,KafkaStreamsAssignment> kafkaStreamsAssignments)
Optimize standby task assignment for rack awareness. The optimization is controlled by the trafficCost and nonOverlapCost configs, which balance cross-rack traffic minimization against task movement.
Setting trafficCost
to a larger number reduces the overall cross rack traffic of the resulting
assignment, but can increase the number of tasks shuffled around between clients.
Setting nonOverlapCost
to a larger number increases the affinity of tasks to their intended client
and reduces the amount by which the rack-aware optimization can shuffle tasks around, at the cost of higher
cross-rack traffic.
In the extreme case, if nonOverlapCost is set to 0 and trafficCost to a positive value, the resulting assignment will have the absolute minimum cross-rack traffic. If trafficCost is set to 0 and nonOverlapCost to a positive value, the resulting assignment will be identical to the input assignment.
Note: this method modifies the input KafkaStreamsAssignment objects in place and, as its void return type indicates, returns nothing.
It does not make a copy of the map or of the KafkaStreamsAssignment objects.
This method optimizes cross-rack traffic for standby tasks only. For active task optimization, use optimizeRackAwareActiveTasks(TaskAssignmentUtils.RackAwareOptimizationParams, Map<ProcessId,KafkaStreamsAssignment>).
Parameters:
optimizationParams - optional configuration parameters to apply
kafkaStreamsAssignments - the current assignment of tasks to KafkaStreams clients