public class StreamsConfig
extends org.apache.kafka.common.config.AbstractConfig
KafkaStreams
instance.
Can also be used to configure the Kafka Streams internal KafkaConsumer
, KafkaProducer
and Admin
.
To avoid consumer/producer/admin property conflicts, you should prefix those properties using
consumerPrefix(String)
, producerPrefix(String)
and adminClientPrefix(String)
, respectively.
Example:
// potentially wrong: sets "metadata.max.age.ms" to 1 minute for producer AND consumer
Properties streamsProperties = new Properties();
streamsProperties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 60000);
// or
streamsProperties.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 60000);
// suggested:
Properties streamsProperties = new Properties();
// sets "metadata.max.age.ms" to 1 minute for consumer only
streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), 60000);
// sets "metadata.max.age.ms" to 1 minute for producer only
streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.METADATA_MAX_AGE_CONFIG), 60000);
StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
This instance can also be used to pass in custom configurations to different modules (e.g. passing a special config in your customized serde class).
The consumer/producer/admin prefix can also be used to distinguish these custom config values passed to different clients with the same config name.
* Example:
Properties streamsProperties = new Properties();
// sets "my.custom.config" to "foo" for consumer only
streamsProperties.put(StreamsConfig.consumerPrefix("my.custom.config"), "foo");
// sets "my.custom.config" to "bar" for producer only
streamsProperties.put(StreamsConfig.producerPrefix("my.custom.config"), "bar");
// sets "my.custom.config2" to "boom" for all clients universally
streamsProperties.put("my.custom.config2", "boom");
// as a result, inside producer's serde class configure(..) function,
// users can now read both key-value pairs "my.custom.config" -> "foo"
// and "my.custom.config2" -> "boom" from the config map
StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
When increasing ProducerConfig.MAX_BLOCK_MS_CONFIG
to be more resilient to non-available brokers you should also
increase ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG
using the following guidance:
max.poll.interval.ms > max.block.msKafka Streams requires at least the following properties to be set: By default, Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
"group.id"
(<application.id>) - Streams client will always use the application ID a consumer group ID"enable.auto.commit"
(false) - Streams client will always disable/turn off auto committing"partition.assignment.strategy"
(StreamsPartitionAssignor
) - Streams client will always use its own partition assignor"processing.guarantee"
is set to "exactly_once_v2"
,
"exactly_once"
(deprecated), or "exactly_once_beta"
(deprecated), Kafka Streams does not
allow users to overwrite the following properties (Streams setting shown in parentheses):
"isolation.level"
(read_committed) - Consumers will always read committed data only"enable.idempotence"
(true) - Producer will always have idempotency enabledKafkaStreams(org.apache.kafka.streams.Topology, Properties)
,
ConsumerConfig
,
ProducerConfig
Modifier and Type | Class and Description |
---|---|
static class |
StreamsConfig.InternalConfig |
Modifier and Type | Field and Description |
---|---|
static String |
ACCEPTABLE_RECOVERY_LAG_CONFIG
acceptable.recovery.lag |
static String |
ADMIN_CLIENT_PREFIX
Prefix used to isolate
admin configs from other client configs. |
static String |
APPLICATION_ID_CONFIG
application.id |
static String |
APPLICATION_SERVER_CONFIG
application.server |
static String |
AT_LEAST_ONCE
Config value for parameter
"processing.guarantee" for at-least-once processing guarantees. |
static String |
AUTO_INCLUDE_JMX_REPORTER_CONFIG
Deprecated.
and will be removed in 4.0.0
|
static String |
BOOTSTRAP_SERVERS_CONFIG
bootstrap.servers |
static String |
BUFFERED_RECORDS_PER_PARTITION_CONFIG
buffered.records.per.partition |
static String |
BUFFERED_RECORDS_PER_PARTITION_DOC |
static String |
BUILT_IN_METRICS_VERSION_CONFIG
built.in.metrics.version |
static String |
CACHE_MAX_BYTES_BUFFERING_CONFIG
Deprecated.
since 3.4.0 Use
"statestore.cache.max.bytes" instead. |
static String |
CACHE_MAX_BYTES_BUFFERING_DOC |
static String |
CLIENT_ID_CONFIG
client.id |
static String |
CLIENT_TAG_PREFIX
Prefix used to add arbitrary tags to a Kafka Stream's instance as key-value pairs.
|
static String |
COMMIT_INTERVAL_MS_CONFIG
commit.interval.ms |
static String |
CONNECTIONS_MAX_IDLE_MS_CONFIG
connections.max.idle.ms |
static String |
CONSUMER_PREFIX
Prefix used to isolate
consumer configs from other client configs. |
static String |
DEFAULT_CLIENT_SUPPLIER_CONFIG
default.client.supplier |
static String |
DEFAULT_CLIENT_SUPPLIER_DOC |
static String |
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG
default.deserialization.exception.handler |
static String |
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC |
static String |
DEFAULT_DSL_STORE
Deprecated.
|
static String |
DEFAULT_DSL_STORE_CONFIG
Deprecated.
|
static String |
DEFAULT_DSL_STORE_DOC
Deprecated.
|
static String |
DEFAULT_KEY_SERDE_CLASS_CONFIG
default key.serde |
static String |
DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG
default.production.exception.handler |
static String |
DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG
default.timestamp.extractor |
static String |
DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC |
static String |
DEFAULT_VALUE_SERDE_CLASS_CONFIG
default value.serde |
static String |
DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
Deprecated.
since 3.0.0 Use
"windowed.inner.class.serde" instead. |
static String |
DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
Deprecated.
since 3.0.0 Use
"windowed.inner.class.serde" instead. |
static String |
DSL_STORE_SUPPLIERS_CLASS_CONFIG
dsl.store.suppliers.class |
static int |
DUMMY_THREAD_INDEX |
static String |
ENABLE_METRICS_PUSH_CONFIG
enable.metrics.push |
static String |
ENABLE_METRICS_PUSH_DOC |
static String |
EXACTLY_ONCE
Deprecated.
since 3.0.0, will be removed in 4.0. Use
"exactly_once_v2" instead. |
static String |
EXACTLY_ONCE_BETA
Deprecated.
since 3.0.0, will be removed in 4.0. Use
"exactly_once_v2" instead. |
static String |
EXACTLY_ONCE_V2
Config value for parameter
"processing.guarantee" for exactly-once processing guarantees. |
static String |
GLOBAL_CONSUMER_PREFIX
Prefix used to override
consumer configs for the global consumer client from
the general consumer client configs. |
static String |
IN_MEMORY
Deprecated.
|
static String |
MAIN_CONSUMER_PREFIX
Prefix used to override
consumer configs for the main consumer client from
the general consumer client configs. |
static int |
MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH |
static int |
MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE |
static int |
MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH |
static String |
MAX_TASK_IDLE_MS_CONFIG
max.task.idle.ms |
static long |
MAX_TASK_IDLE_MS_DISABLED |
static String |
MAX_TASK_IDLE_MS_DOC |
static String |
MAX_WARMUP_REPLICAS_CONFIG
max.warmup.replicas |
static String |
MERGE_REPARTITION_TOPICS
Config value for parameter
"topology.optimization"
for enabling the specific optimization that merges duplicated repartition topics. |
static String |
METADATA_MAX_AGE_CONFIG
metadata.max.age.ms |
static String |
METRIC_REPORTER_CLASSES_CONFIG
metric.reporters |
static String |
METRICS_LATEST
Config value for parameter
"built.in.metrics.version" for the latest built-in metrics version. |
static String |
METRICS_NUM_SAMPLES_CONFIG
metrics.num.samples |
static String |
METRICS_RECORDING_LEVEL_CONFIG
metrics.record.level |
static String |
METRICS_SAMPLE_WINDOW_MS_CONFIG
metrics.sample.window.ms |
static String |
NO_OPTIMIZATION
Config value for parameter
"topology.optimization" for disabling topology optimization |
static String |
NUM_STANDBY_REPLICAS_CONFIG
num.standby.replicas |
static String |
NUM_STREAM_THREADS_CONFIG
num.stream.threads |
static String |
OPTIMIZE
Config value for parameter
"topology.optimization" for enabling topology optimization |
static String |
POLL_MS_CONFIG
poll.ms |
static String |
PROBING_REBALANCE_INTERVAL_MS_CONFIG
probing.rebalance.interval.ms |
static String |
PROCESSING_GUARANTEE_CONFIG
processing.guarantee |
static String |
PRODUCER_PREFIX
Prefix used to isolate
producer configs from other client configs. |
static String |
RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG |
static String |
RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_DOC |
static String |
RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY |
static String |
RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG
rack.aware.assignment.strategy
|
static String |
RACK_AWARE_ASSIGNMENT_STRATEGY_DOC |
static String |
RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC |
static String |
RACK_AWARE_ASSIGNMENT_STRATEGY_NONE |
static String |
RACK_AWARE_ASSIGNMENT_TAGS_CONFIG
rack.aware.assignment.tags |
static String |
RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG |
static String |
RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC |
static String |
RECEIVE_BUFFER_CONFIG
receive.buffer.bytes |
static String |
RECONNECT_BACKOFF_MAX_MS_CONFIG
reconnect.backoff.max |
static String |
RECONNECT_BACKOFF_MS_CONFIG
reconnect.backoff.ms |
static String |
REPARTITION_PURGE_INTERVAL_MS_CONFIG
repartition.purge.interval.ms |
static String |
REPLICATION_FACTOR_CONFIG
replication.factor |
static String |
REQUEST_TIMEOUT_MS_CONFIG
request.timeout.ms |
static String |
RESTORE_CONSUMER_PREFIX
Prefix used to override
consumer configs for the restore consumer client from
the general consumer client configs. |
static String |
RETRIES_CONFIG
Deprecated.
since 2.7
|
static String |
RETRY_BACKOFF_MS_CONFIG
retry.backoff.ms |
static String |
REUSE_KTABLE_SOURCE_TOPICS
Config value for parameter
"topology.optimization"
for enabling the specific optimization that reuses source topic as changelog topic
for KTables. |
static String |
ROCKS_DB
Deprecated.
|
static String |
ROCKSDB_CONFIG_SETTER_CLASS_CONFIG
rocksdb.config.setter |
static String |
SECURITY_PROTOCOL_CONFIG
security.protocol |
static String |
SEND_BUFFER_CONFIG
send.buffer.bytes |
static String |
SINGLE_STORE_SELF_JOIN
Config value for parameter
"topology.optimization"
for enabling the optimization that optimizes inner stream-stream joins into self-joins when
both arguments are the same stream. |
static String |
STATE_CLEANUP_DELAY_MS_CONFIG
state.cleanup.delay |
static String |
STATE_DIR_CONFIG
state.dir |
static String |
STATESTORE_CACHE_MAX_BYTES_CONFIG
statestore.cache.max.bytes |
static String |
STATESTORE_CACHE_MAX_BYTES_DOC |
static String |
TASK_ASSIGNOR_CLASS_CONFIG |
static String |
TASK_TIMEOUT_MS_CONFIG
task.timeout.ms |
static String |
TASK_TIMEOUT_MS_DOC |
static String |
TOPIC_PREFIX
Prefix used to provide default topic configs to be applied when creating internal topics.
|
static String |
TOPOLOGY_OPTIMIZATION
Deprecated.
since 2.7; use
TOPOLOGY_OPTIMIZATION_CONFIG instead |
static String |
TOPOLOGY_OPTIMIZATION_CONFIG
topology.optimization |
static String |
UPGRADE_FROM_0100
Config value for parameter
"upgrade.from" for upgrading an application from version 0.10.0.x . |
static String |
UPGRADE_FROM_0101
Config value for parameter
"upgrade.from" for upgrading an application from version 0.10.1.x . |
static String |
UPGRADE_FROM_0102
Config value for parameter
"upgrade.from" for upgrading an application from version 0.10.2.x . |
static String |
UPGRADE_FROM_0110
Config value for parameter
"upgrade.from" for upgrading an application from version 0.11.0.x . |
static String |
UPGRADE_FROM_10
Config value for parameter
"upgrade.from" for upgrading an application from version 1.0.x . |
static String |
UPGRADE_FROM_11
Config value for parameter
"upgrade.from" for upgrading an application from version 1.1.x . |
static String |
UPGRADE_FROM_20
Config value for parameter
"upgrade.from" for upgrading an application from version 2.0.x . |
static String |
UPGRADE_FROM_21
Config value for parameter
"upgrade.from" for upgrading an application from version 2.1.x . |
static String |
UPGRADE_FROM_22
Config value for parameter
"upgrade.from" for upgrading an application from version 2.2.x . |
static String |
UPGRADE_FROM_23
Config value for parameter
"upgrade.from" for upgrading an application from version 2.3.x . |
static String |
UPGRADE_FROM_24
Config value for parameter
"upgrade.from" for upgrading an application from version 2.4.x . |
static String |
UPGRADE_FROM_25
Config value for parameter
"upgrade.from" for upgrading an application from version 2.5.x . |
static String |
UPGRADE_FROM_26
Config value for parameter
"upgrade.from" for upgrading an application from version 2.6.x . |
static String |
UPGRADE_FROM_27
Config value for parameter
"upgrade.from" for upgrading an application from version 2.7.x . |
static String |
UPGRADE_FROM_28
Config value for parameter
"upgrade.from" for upgrading an application from version 2.8.x . |
static String |
UPGRADE_FROM_30
Config value for parameter
"upgrade.from" for upgrading an application from version 3.0.x . |
static String |
UPGRADE_FROM_31
Config value for parameter
"upgrade.from" for upgrading an application from version 3.1.x . |
static String |
UPGRADE_FROM_32
Config value for parameter
"upgrade.from" for upgrading an application from version 3.2.x . |
static String |
UPGRADE_FROM_33
Config value for parameter
"upgrade.from" for upgrading an application from version 3.3.x . |
static String |
UPGRADE_FROM_34
Config value for parameter
"upgrade.from" for upgrading an application from version 3.4.x . |
static String |
UPGRADE_FROM_35
Config value for parameter
"upgrade.from" for upgrading an application from version 3.5.x . |
static String |
UPGRADE_FROM_36
Config value for parameter
"upgrade.from" for upgrading an application from version 3.6.x . |
static String |
UPGRADE_FROM_37
Config value for parameter
"upgrade.from" for upgrading an application from version 3.7.x . |
static String |
UPGRADE_FROM_CONFIG
upgrade.from |
static String |
WINDOW_SIZE_MS_CONFIG
window.size.ms |
static String |
WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG
windowstore.changelog.additional.retention.ms |
static String |
WINDOWED_INNER_CLASS_SERDE |
Constructor and Description |
---|
StreamsConfig(Map<?,?> props)
Create a new
StreamsConfig using the given properties. |
Modifier and Type | Method and Description |
---|---|
static String |
adminClientPrefix(String adminClientProp)
Prefix a property with
ADMIN_CLIENT_PREFIX . |
static String |
clientTagPrefix(String clientTagKey)
Prefix a client tag key with
CLIENT_TAG_PREFIX . |
static org.apache.kafka.common.config.ConfigDef |
configDef()
Return a copy of the config definition.
|
static String |
consumerPrefix(String consumerProp)
Prefix a property with
CONSUMER_PREFIX . |
DeserializationExceptionHandler |
defaultDeserializationExceptionHandler() |
org.apache.kafka.common.serialization.Serde<?> |
defaultKeySerde()
Return an
configured instance of key Serde
class . |
ProductionExceptionHandler |
defaultProductionExceptionHandler() |
TimestampExtractor |
defaultTimestampExtractor() |
org.apache.kafka.common.serialization.Serde<?> |
defaultValueSerde()
Return an
configured instance of value
Serde class . |
Map<String,Object> |
getAdminConfigs(String clientId)
Get the configs for the
admin client . |
Map<String,String> |
getClientTags()
Get the configured client tags set with
CLIENT_TAG_PREFIX prefix. |
Map<String,Object> |
getGlobalConsumerConfigs(String clientId)
Get the configs for the
global consumer . |
KafkaClientSupplier |
getKafkaClientSupplier()
Return configured KafkaClientSupplier
|
Map<String,Object> |
getMainConsumerConfigs(String groupId,
String clientId,
int threadIdx)
Get the configs to the
main consumer . |
Map<String,Object> |
getProducerConfigs(String clientId)
Get the configs for the
producer . |
Map<String,Object> |
getRestoreConsumerConfigs(String clientId)
Get the configs for the
restore-consumer . |
static String |
globalConsumerPrefix(String consumerProp)
Prefix a property with
GLOBAL_CONSUMER_PREFIX . |
static void |
main(String[] args) |
static String |
mainConsumerPrefix(String consumerProp)
Prefix a property with
MAIN_CONSUMER_PREFIX . |
static String |
producerPrefix(String producerProp)
Prefix a property with
PRODUCER_PREFIX . |
static String |
restoreConsumerPrefix(String consumerProp)
Prefix a property with
RESTORE_CONSUMER_PREFIX . |
static String |
topicPrefix(String topicProp)
Prefix a property with
TOPIC_PREFIX
used to provide default topic configs to be applied when creating internal topics. |
static Set<String> |
verifyTopologyOptimizationConfigs(String config) |
documentationOf, equals, getBoolean, getClass, getConfiguredInstance, getConfiguredInstance, getConfiguredInstances, getConfiguredInstances, getConfiguredInstances, getDouble, getInt, getList, getLong, getPassword, getShort, getString, hashCode, ignore, logUnused, nonInternalValues, originals, originals, originalsStrings, originalsWithPrefix, originalsWithPrefix, typeOf, unused, values, valuesWithPrefixAllOrNothing, valuesWithPrefixOverride
public static final int DUMMY_THREAD_INDEX
public static final long MAX_TASK_IDLE_MS_DISABLED
public static final int MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE
public static final int MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH
public static final int MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH
public static final String TOPIC_PREFIX
TopicConfig
.
It is recommended to use topicPrefix(String)
.public static final String CONSUMER_PREFIX
consumer
configs from other client configs.
It is recommended to use consumerPrefix(String)
to add this prefix to consumer
properties
.public static final String MAIN_CONSUMER_PREFIX
consumer
configs for the main consumer client from
the general consumer client configs. The override precedence is the following (from highest to lowest precedence):
1. main.consumer.[config-name]
2. consumer.[config-name]
3. [config-name]public static final String RESTORE_CONSUMER_PREFIX
consumer
configs for the restore consumer client from
the general consumer client configs. The override precedence is the following (from highest to lowest precedence):
1. restore.consumer.[config-name]
2. consumer.[config-name]
3. [config-name]public static final String GLOBAL_CONSUMER_PREFIX
consumer
configs for the global consumer client from
the general consumer client configs. The override precedence is the following (from highest to lowest precedence):
1. global.consumer.[config-name]
2. consumer.[config-name]
3. [config-name]public static final String PRODUCER_PREFIX
producer
configs from other client configs.
It is recommended to use producerPrefix(String)
to add this prefix to producer
properties
.public static final String ADMIN_CLIENT_PREFIX
admin
configs from other client configs.
It is recommended to use adminClientPrefix(String)
to add this prefix to admin
client properties
.public static final String CLIENT_TAG_PREFIX
public static final String TOPOLOGY_OPTIMIZATION_CONFIG
topology.optimization
public static final String NO_OPTIMIZATION
"topology.optimization"
for disabling topology optimizationpublic static final String OPTIMIZE
"topology.optimization"
for enabling topology optimizationpublic static final String REUSE_KTABLE_SOURCE_TOPICS
"topology.optimization"
for enabling the specific optimization that reuses source topic as changelog topic
for KTables.public static final String MERGE_REPARTITION_TOPICS
"topology.optimization"
for enabling the specific optimization that merges duplicated repartition topics.public static final String SINGLE_STORE_SELF_JOIN
"topology.optimization"
for enabling the optimization that optimizes inner stream-stream joins into self-joins when
both arguments are the same stream.public static final String UPGRADE_FROM_0100
"upgrade.from"
for upgrading an application from version 0.10.0.x
.public static final String UPGRADE_FROM_0101
"upgrade.from"
for upgrading an application from version 0.10.1.x
.public static final String UPGRADE_FROM_0102
"upgrade.from"
for upgrading an application from version 0.10.2.x
.public static final String UPGRADE_FROM_0110
"upgrade.from"
for upgrading an application from version 0.11.0.x
.public static final String UPGRADE_FROM_10
"upgrade.from"
for upgrading an application from version 1.0.x
.public static final String UPGRADE_FROM_11
"upgrade.from"
for upgrading an application from version 1.1.x
.public static final String UPGRADE_FROM_20
"upgrade.from"
for upgrading an application from version 2.0.x
.public static final String UPGRADE_FROM_21
"upgrade.from"
for upgrading an application from version 2.1.x
.public static final String UPGRADE_FROM_22
"upgrade.from"
for upgrading an application from version 2.2.x
.public static final String UPGRADE_FROM_23
"upgrade.from"
for upgrading an application from version 2.3.x
.public static final String UPGRADE_FROM_24
"upgrade.from"
for upgrading an application from version 2.4.x
.public static final String UPGRADE_FROM_25
"upgrade.from"
for upgrading an application from version 2.5.x
.public static final String UPGRADE_FROM_26
"upgrade.from"
for upgrading an application from version 2.6.x
.public static final String UPGRADE_FROM_27
"upgrade.from"
for upgrading an application from version 2.7.x
.public static final String UPGRADE_FROM_28
"upgrade.from"
for upgrading an application from version 2.8.x
.public static final String UPGRADE_FROM_30
"upgrade.from"
for upgrading an application from version 3.0.x
.public static final String UPGRADE_FROM_31
"upgrade.from"
for upgrading an application from version 3.1.x
.public static final String UPGRADE_FROM_32
"upgrade.from"
for upgrading an application from version 3.2.x
.public static final String UPGRADE_FROM_33
"upgrade.from"
for upgrading an application from version 3.3.x
.public static final String UPGRADE_FROM_34
"upgrade.from"
for upgrading an application from version 3.4.x
.public static final String UPGRADE_FROM_35
"upgrade.from"
for upgrading an application from version 3.5.x
.public static final String UPGRADE_FROM_36
"upgrade.from"
for upgrading an application from version 3.6.x
.public static final String UPGRADE_FROM_37
"upgrade.from"
for upgrading an application from version 3.7.x
.public static final String AT_LEAST_ONCE
"processing.guarantee"
for at-least-once processing guarantees.@Deprecated public static final String EXACTLY_ONCE
"exactly_once_v2"
instead."processing.guarantee"
for exactly-once processing guarantees.
Enabling exactly-once processing semantics requires broker version 0.11.0 or higher.
If you enable this feature Kafka Streams will use more resources (like broker connections)
compared to "at_least_once"
and "exactly_once_v2"
.
@Deprecated public static final String EXACTLY_ONCE_BETA
"exactly_once_v2"
instead."processing.guarantee"
for exactly-once processing guarantees.
Enabling exactly-once (beta) requires broker version 2.5 or higher.
If you enable this feature Kafka Streams will use fewer resources (like broker connections)
compared to the EXACTLY_ONCE
(deprecated) case.
public static final String EXACTLY_ONCE_V2
"processing.guarantee"
for exactly-once processing guarantees.
Enabling exactly-once-v2 requires broker version 2.5 or higher.
public static final String METRICS_LATEST
"built.in.metrics.version"
for the latest built-in metrics version.public static final String ACCEPTABLE_RECOVERY_LAG_CONFIG
acceptable.recovery.lag
public static final String APPLICATION_ID_CONFIG
application.id
public static final String APPLICATION_SERVER_CONFIG
application.server
public static final String BOOTSTRAP_SERVERS_CONFIG
bootstrap.servers
public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG
buffered.records.per.partition
public static final String BUFFERED_RECORDS_PER_PARTITION_DOC
public static final String BUILT_IN_METRICS_VERSION_CONFIG
built.in.metrics.version
@Deprecated public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG
"statestore.cache.max.bytes"
instead.cache.max.bytes.buffering
public static final String CACHE_MAX_BYTES_BUFFERING_DOC
public static final String STATESTORE_CACHE_MAX_BYTES_CONFIG
statestore.cache.max.bytes
public static final String STATESTORE_CACHE_MAX_BYTES_DOC
public static final String CLIENT_ID_CONFIG
client.id
public static final String ENABLE_METRICS_PUSH_CONFIG
enable.metrics.push
public static final String ENABLE_METRICS_PUSH_DOC
public static final String COMMIT_INTERVAL_MS_CONFIG
commit.interval.ms
public static final String REPARTITION_PURGE_INTERVAL_MS_CONFIG
repartition.purge.interval.ms
public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG
connections.max.idle.ms
public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG
default.deserialization.exception.handler
public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC
public static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG
default.production.exception.handler
@Deprecated public static final String DEFAULT_DSL_STORE_CONFIG
default.dsl.store
@Deprecated public static final String DEFAULT_DSL_STORE_DOC
@Deprecated public static final String ROCKS_DB
@Deprecated public static final String IN_MEMORY
@Deprecated public static final String DEFAULT_DSL_STORE
public static final String DSL_STORE_SUPPLIERS_CLASS_CONFIG
dsl.store.suppliers.class
@Deprecated public static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
"windowed.inner.class.serde"
instead.default.windowed.key.serde.inner
@Deprecated public static final String DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
"windowed.inner.class.serde"
instead.default.windowed.value.serde.inner
public static final String WINDOWED_INNER_CLASS_SERDE
public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG
default key.serde
public static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG
default value.serde
public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG
default.timestamp.extractor
public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC
public static final String MAX_TASK_IDLE_MS_CONFIG
max.task.idle.ms
public static final String MAX_TASK_IDLE_MS_DOC
public static final String MAX_WARMUP_REPLICAS_CONFIG
max.warmup.replicas
public static final String METADATA_MAX_AGE_CONFIG
metadata.max.age.ms
public static final String METRICS_NUM_SAMPLES_CONFIG
metrics.num.samples
public static final String METRICS_RECORDING_LEVEL_CONFIG
metrics.record.level
public static final String METRIC_REPORTER_CLASSES_CONFIG
metric.reporters
public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG
metrics.sample.window.ms
@Deprecated public static final String AUTO_INCLUDE_JMX_REPORTER_CONFIG
auto.include.jmx.reporter
public static final String NUM_STANDBY_REPLICAS_CONFIG
num.standby.replicas
public static final String NUM_STREAM_THREADS_CONFIG
num.stream.threads
public static final String POLL_MS_CONFIG
poll.ms
public static final String PROBING_REBALANCE_INTERVAL_MS_CONFIG
probing.rebalance.interval.ms
public static final String PROCESSING_GUARANTEE_CONFIG
processing.guarantee
public static final String RECEIVE_BUFFER_CONFIG
receive.buffer.bytes
public static final String RACK_AWARE_ASSIGNMENT_TAGS_CONFIG
rack.aware.assignment.tags
public static final String RECONNECT_BACKOFF_MS_CONFIG
reconnect.backoff.ms
public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG
reconnect.backoff.max
public static final String REPLICATION_FACTOR_CONFIG
replication.factor
public static final String REQUEST_TIMEOUT_MS_CONFIG
request.timeout.ms
@Deprecated public static final String RETRIES_CONFIG
retries
This config is ignored by Kafka Streams. Note, that the internal clients (producer, admin) are still impacted by this config.
public static final String RETRY_BACKOFF_MS_CONFIG
retry.backoff.ms
public static final String ROCKSDB_CONFIG_SETTER_CLASS_CONFIG
rocksdb.config.setter
public static final String SECURITY_PROTOCOL_CONFIG
security.protocol
public static final String SEND_BUFFER_CONFIG
send.buffer.bytes
public static final String STATE_CLEANUP_DELAY_MS_CONFIG
state.cleanup.delay
public static final String STATE_DIR_CONFIG
state.dir
public static final String TASK_TIMEOUT_MS_CONFIG
task.timeout.ms
public static final String TASK_TIMEOUT_MS_DOC
public static final String WINDOW_SIZE_MS_CONFIG
window.size.ms
public static final String UPGRADE_FROM_CONFIG
upgrade.from
public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG
windowstore.changelog.additional.retention.ms
public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG
default.client.supplier
public static final String DEFAULT_CLIENT_SUPPLIER_DOC
public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE
public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC
public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY
public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG
public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC
public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG
public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC
public static final String RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG
public static final String RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_DOC
public static final String TASK_ASSIGNOR_CLASS_CONFIG
@Deprecated public static final String TOPOLOGY_OPTIMIZATION
TOPOLOGY_OPTIMIZATION_CONFIG
insteadtopology.optimization
public StreamsConfig(Map<?,?> props)
StreamsConfig
using the given properties.props
- properties that specify Kafka Streams and internal consumer/producer configurationpublic static String consumerPrefix(String consumerProp)
CONSUMER_PREFIX
. This is used to isolate consumer configs
from other client configs.consumerProp
- the consumer property to be maskedCONSUMER_PREFIX
+ consumerProp
public static String mainConsumerPrefix(String consumerProp)
MAIN_CONSUMER_PREFIX
. This is used to isolate main consumer configs
from other client configs.consumerProp
- the consumer property to be maskedMAIN_CONSUMER_PREFIX
+ consumerProp
public static String restoreConsumerPrefix(String consumerProp)
RESTORE_CONSUMER_PREFIX
. This is used to isolate restore consumer configs
from other client configs.consumerProp
- the consumer property to be maskedRESTORE_CONSUMER_PREFIX
+ consumerProp
public static String clientTagPrefix(String clientTagKey)
CLIENT_TAG_PREFIX
.clientTagKey
- client tag keyCLIENT_TAG_PREFIX
+ clientTagKey
public static String globalConsumerPrefix(String consumerProp)
GLOBAL_CONSUMER_PREFIX
. This is used to isolate global consumer configs
from other client configs.consumerProp
- the consumer property to be maskedGLOBAL_CONSUMER_PREFIX
+ consumerProp
public static String producerPrefix(String producerProp)
PRODUCER_PREFIX
. This is used to isolate producer configs
from other client configs.producerProp
- the producer property to be maskedproducerProp
public static String adminClientPrefix(String adminClientProp)
ADMIN_CLIENT_PREFIX
. This is used to isolate admin configs
from other client configs.adminClientProp
- the admin client property to be maskedadminClientProp
public static String topicPrefix(String topicProp)
TOPIC_PREFIX
used to provide default topic configs to be applied when creating internal topics.topicProp
- the topic property to be maskedtopicProp
public static org.apache.kafka.common.config.ConfigDef configDef()
public Map<String,Object> getMainConsumerConfigs(String groupId, String clientId, int threadIdx)
main consumer
.
Properties using the prefix MAIN_CONSUMER_PREFIX
will be used in favor over
the properties prefixed with CONSUMER_PREFIX
and the non-prefixed versions
(read the override precedence ordering in MAIN_CONSUMER_PREFIX
)
except in the case of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
where we always use the non-prefixed
version as we only support reading/writing from/to the same Kafka Cluster.
If not specified by MAIN_CONSUMER_PREFIX
, main consumer will share the general consumer configs
prefixed by CONSUMER_PREFIX
.groupId
- consumer groupIdclientId
- clientIdthreadIdx
- stream thread indexpublic Map<String,Object> getRestoreConsumerConfigs(String clientId)
restore-consumer
.
Properties using the prefix RESTORE_CONSUMER_PREFIX
will be used in favor over
the properties prefixed with CONSUMER_PREFIX
and the non-prefixed versions
(read the override precedence ordering in RESTORE_CONSUMER_PREFIX
)
except in the case of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
where we always use the non-prefixed
version as we only support reading/writing from/to the same Kafka Cluster.
If not specified by RESTORE_CONSUMER_PREFIX
, restore consumer will share the general consumer configs
prefixed by CONSUMER_PREFIX
.clientId
- clientIdpublic Map<String,Object> getGlobalConsumerConfigs(String clientId)
global consumer
.
Properties using the prefix GLOBAL_CONSUMER_PREFIX
will be used in favor over
the properties prefixed with CONSUMER_PREFIX
and the non-prefixed versions
(read the override precedence ordering in GLOBAL_CONSUMER_PREFIX
)
except in the case of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
where we always use the non-prefixed
version as we only support reading/writing from/to the same Kafka Cluster.
If not specified by GLOBAL_CONSUMER_PREFIX
, global consumer will share the general consumer configs
prefixed by CONSUMER_PREFIX
.clientId
- clientIdpublic Map<String,Object> getProducerConfigs(String clientId)
producer
.
Properties using the prefix PRODUCER_PREFIX
will be used in favor over their non-prefixed versions
except in the case of ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
where we always use the non-prefixed
version as we only support reading/writing from/to the same Kafka Cluster.clientId
- clientIdpublic Map<String,Object> getAdminConfigs(String clientId)
admin client
.clientId
- clientIdpublic Map<String,String> getClientTags()
CLIENT_TAG_PREFIX
prefix.public static Set<String> verifyTopologyOptimizationConfigs(String config)
public KafkaClientSupplier getKafkaClientSupplier()
public org.apache.kafka.common.serialization.Serde<?> defaultKeySerde()
configured
instance of key Serde
class
.public org.apache.kafka.common.serialization.Serde<?> defaultValueSerde()
configured
instance of value
Serde class
.public TimestampExtractor defaultTimestampExtractor()
public DeserializationExceptionHandler defaultDeserializationExceptionHandler()
public ProductionExceptionHandler defaultProductionExceptionHandler()
public static void main(String[] args)