public class StreamsConfig
extends org.apache.kafka.common.config.AbstractConfig
KafkaStreams
instance.
Can also be used to configure the Kafka Streams internal KafkaConsumer
, KafkaProducer
and Admin
.
To avoid consumer/producer/admin property conflicts, you should prefix those properties using
consumerPrefix(String)
, producerPrefix(String)
and adminClientPrefix(String)
, respectively.
Example:
// potentially wrong: sets "metadata.max.age.ms" to 1 minute for producer AND consumer
Properties streamsProperties = new Properties();
streamsProperties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 60000);
// or
streamsProperties.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 60000);
// suggested:
Properties streamsProperties = new Properties();
// sets "metadata.max.age.ms" to 1 minute for consumer only
streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), 60000);
// sets "metadata.max.age.ms" to 1 minute for producer only
streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.METADATA_MAX_AGE_CONFIG), 60000);
StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
This instance can also be used to pass in custom configurations to different modules (e.g. passing a special config in your customized serde class).
The consumer/producer/admin prefix can also be used to distinguish these custom config values passed to different clients with the same config name.
* Example:
Properties streamsProperties = new Properties();
// sets "my.custom.config" to "foo" for consumer only
streamsProperties.put(StreamsConfig.consumerPrefix("my.custom.config"), "foo");
// sets "my.custom.config" to "bar" for producer only
streamsProperties.put(StreamsConfig.producerPrefix("my.custom.config"), "bar");
// sets "my.custom.config2" to "boom" for all clients universally
streamsProperties.put("my.custom.config2", "boom");
// as a result, inside producer's serde class configure(..) function,
// users can now read both key-value pairs "my.custom.config" -> "foo"
// and "my.custom.config2" -> "boom" from the config map
StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
When increasing ProducerConfig.MAX_BLOCK_MS_CONFIG
to be more resilient to non-available brokers you should also
increase ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG
using the following guidance:
max.poll.interval.ms > max.block.msKafka Streams requires at least the following properties to be set: By default, Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
"group.id"
(<application.id>) - Streams client will always use the application ID a consumer group ID"enable.auto.commit"
(false) - Streams client will always disable/turn off auto committing"partition.assignment.strategy"
(StreamsPartitionAssignor
) - Streams client will always use its own partition assignor"processing.guarantee"
is set to "exactly_once_v2"
,
"exactly_once"
(deprecated), or "exactly_once_beta"
(deprecated), Kafka Streams does not
allow users to overwrite the following properties (Streams setting shown in parentheses):
"isolation.level"
(read_committed) - Consumers will always read committed data only"enable.idempotence"
(true) - Producer will always have idempotency enabledKafkaStreams(org.apache.kafka.streams.Topology, Properties)
,
ConsumerConfig
,
ProducerConfig
Modifier and Type | Class and Description |
---|---|
static class |
StreamsConfig.InternalConfig |
Modifier and Type | Field and Description |
---|---|
static String |
ACCEPTABLE_RECOVERY_LAG_CONFIG
acceptable.recovery.lag |
static String |
ADMIN_CLIENT_PREFIX
Prefix used to isolate
admin configs from other client configs. |
static String |
APPLICATION_ID_CONFIG
application.id |
static String |
APPLICATION_SERVER_CONFIG
application.server |
static String |
AT_LEAST_ONCE
Config value for parameter
"processing.guarantee" for at-least-once processing guarantees. |
static String |
BOOTSTRAP_SERVERS_CONFIG
bootstrap.servers |
static String |
BUFFERED_RECORDS_PER_PARTITION_CONFIG
buffered.records.per.partition |
static String |
BUFFERED_RECORDS_PER_PARTITION_DOC |
static String |
BUILT_IN_METRICS_VERSION_CONFIG
built.in.metrics.version |
static String |
CACHE_MAX_BYTES_BUFFERING_CONFIG
cache.max.bytes.buffering |
static String |
CACHE_MAX_BYTES_BUFFERING_DOC |
static String |
CLIENT_ID_CONFIG
client.id |
static String |
CLIENT_TAG_PREFIX
Prefix used to add arbitrary tags to a Kafka Stream's instance as key-value pairs.
|
static String |
COMMIT_INTERVAL_MS_CONFIG
commit.interval.ms |
static String |
CONNECTIONS_MAX_IDLE_MS_CONFIG
connections.max.idle.ms |
static String |
CONSUMER_PREFIX
Prefix used to isolate
consumer configs from other client configs. |
static String |
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG
default.deserialization.exception.handler |
static String |
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC |
static String |
DEFAULT_DSL_STORE_CONFIG
default.dsl.store |
static String |
DEFAULT_DSL_STORE_DOC |
static String |
DEFAULT_KEY_SERDE_CLASS_CONFIG
default key.serde |
static String |
DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG
default.production.exception.handler |
static String |
DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG
default.timestamp.extractor |
static String |
DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC |
static String |
DEFAULT_VALUE_SERDE_CLASS_CONFIG
default value.serde |
static String |
DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
Deprecated.
|
static String |
DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
Deprecated.
|
static int |
DUMMY_THREAD_INDEX |
static String |
EXACTLY_ONCE
Deprecated.
Since 3.0.0, will be removed in 4.0. Use
"exactly_once_v2" instead. |
static String |
EXACTLY_ONCE_BETA
Deprecated.
Since 3.0.0, will be removed in 4.0. Use
"exactly_once_v2" instead. |
static String |
EXACTLY_ONCE_V2
Config value for parameter
"processing.guarantee" for exactly-once processing guarantees. |
static String |
GLOBAL_CONSUMER_PREFIX
Prefix used to override
consumer configs for the global consumer client from
the general consumer client configs. |
static String |
IN_MEMORY |
static String |
MAIN_CONSUMER_PREFIX
Prefix used to override
consumer configs for the main consumer client from
the general consumer client configs. |
static int |
MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH |
static int |
MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE |
static int |
MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH |
static String |
MAX_TASK_IDLE_MS_CONFIG
max.task.idle.ms |
static long |
MAX_TASK_IDLE_MS_DISABLED |
static String |
MAX_TASK_IDLE_MS_DOC |
static String |
MAX_WARMUP_REPLICAS_CONFIG
max.warmup.replicas |
static String |
METADATA_MAX_AGE_CONFIG
metadata.max.age.ms |
static String |
METRIC_REPORTER_CLASSES_CONFIG
metric.reporters |
static String |
METRICS_LATEST
Config value for parameter
"built.in.metrics.version" for the latest built-in metrics version. |
static String |
METRICS_NUM_SAMPLES_CONFIG
metrics.num.samples |
static String |
METRICS_RECORDING_LEVEL_CONFIG
metrics.record.level |
static String |
METRICS_SAMPLE_WINDOW_MS_CONFIG
metrics.sample.window.ms |
static String |
NO_OPTIMIZATION
Config value for parameter
"topology.optimization" for disabling topology optimization |
static String |
NUM_STANDBY_REPLICAS_CONFIG
num.standby.replicas |
static String |
NUM_STREAM_THREADS_CONFIG
num.stream.threads |
static String |
OPTIMIZE
Config value for parameter
"topology.optimization" for enabling topology optimization |
static String |
POLL_MS_CONFIG
poll.ms |
static String |
PROBING_REBALANCE_INTERVAL_MS_CONFIG
probing.rebalance.interval.ms |
static String |
PROCESSING_GUARANTEE_CONFIG
processing.guarantee |
static String |
PRODUCER_PREFIX
Prefix used to isolate
producer configs from other client configs. |
static String |
RACK_AWARE_ASSIGNMENT_TAGS_CONFIG
rack.aware.assignment.tags |
static String |
RECEIVE_BUFFER_CONFIG
receive.buffer.bytes |
static String |
RECONNECT_BACKOFF_MAX_MS_CONFIG
reconnect.backoff.max |
static String |
RECONNECT_BACKOFF_MS_CONFIG
reconnect.backoff.ms |
static String |
REPARTITION_PURGE_INTERVAL_MS_CONFIG
repartition.purge.interval.ms |
static String |
REPLICATION_FACTOR_CONFIG
replication.factor |
static String |
REQUEST_TIMEOUT_MS_CONFIG
request.timeout.ms |
static String |
RESTORE_CONSUMER_PREFIX
Prefix used to override
consumer configs for the restore consumer client from
the general consumer client configs. |
static String |
RETRIES_CONFIG
Deprecated.
since 2.7
|
static String |
RETRY_BACKOFF_MS_CONFIG
retry.backoff.ms |
static String |
ROCKS_DB |
static String |
ROCKSDB_CONFIG_SETTER_CLASS_CONFIG
rocksdb.config.setter |
static String |
SECURITY_PROTOCOL_CONFIG
security.protocol |
static String |
SEND_BUFFER_CONFIG
send.buffer.bytes |
static String |
STATE_CLEANUP_DELAY_MS_CONFIG
state.cleanup.delay |
static String |
STATE_DIR_CONFIG
state.dir |
static String |
TASK_TIMEOUT_MS_CONFIG
task.timeout.ms |
static String |
TASK_TIMEOUT_MS_DOC |
static String |
TOPIC_PREFIX
Prefix used to provide default topic configs to be applied when creating internal topics.
|
static String |
TOPOLOGY_OPTIMIZATION
Deprecated.
since 2.7; use
TOPOLOGY_OPTIMIZATION_CONFIG instead |
static String |
TOPOLOGY_OPTIMIZATION_CONFIG
topology.optimization |
static String |
UPGRADE_FROM_0100
Config value for parameter
"upgrade.from" for upgrading an application from version 0.10.0.x . |
static String |
UPGRADE_FROM_0101
Config value for parameter
"upgrade.from" for upgrading an application from version 0.10.1.x . |
static String |
UPGRADE_FROM_0102
Config value for parameter
"upgrade.from" for upgrading an application from version 0.10.2.x . |
static String |
UPGRADE_FROM_0110
Config value for parameter
"upgrade.from" for upgrading an application from version 0.11.0.x . |
static String |
UPGRADE_FROM_10
Config value for parameter
"upgrade.from" for upgrading an application from version 1.0.x . |
static String |
UPGRADE_FROM_11
Config value for parameter
"upgrade.from" for upgrading an application from version 1.1.x . |
static String |
UPGRADE_FROM_20
Config value for parameter
"upgrade.from" for upgrading an application from version 2.0.x . |
static String |
UPGRADE_FROM_21
Config value for parameter
"upgrade.from" for upgrading an application from version 2.1.x . |
static String |
UPGRADE_FROM_22
Config value for parameter
"upgrade.from" for upgrading an application from version 2.2.x . |
static String |
UPGRADE_FROM_23
Config value for parameter
"upgrade.from" for upgrading an application from version 2.3.x . |
static String |
UPGRADE_FROM_24
Config value for parameter
"upgrade.from" for upgrading an application from version 2.4.x . |
static String |
UPGRADE_FROM_25
Config value for parameter
"upgrade.from" for upgrading an application from version 2.5.x . |
static String |
UPGRADE_FROM_26
Config value for parameter
"upgrade.from" for upgrading an application from version 2.6.x . |
static String |
UPGRADE_FROM_27
Config value for parameter
"upgrade.from" for upgrading an application from version 2.7.x . |
static String |
UPGRADE_FROM_28
Config value for parameter
"upgrade.from" for upgrading an application from version 2.8.x . |
static String |
UPGRADE_FROM_30
Config value for parameter
"upgrade.from" for upgrading an application from version 3.0.x . |
static String |
UPGRADE_FROM_31
Config value for parameter
"upgrade.from" for upgrading an application from version 3.1.x . |
static String |
UPGRADE_FROM_32
Config value for parameter
"upgrade.from" for upgrading an application from version 3.2.x . |
static String |
UPGRADE_FROM_CONFIG
upgrade.from |
static String |
WINDOW_SIZE_MS_CONFIG
window.size.ms |
static String |
WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG
windowstore.changelog.additional.retention.ms |
static String |
WINDOWED_INNER_CLASS_SERDE |
Modifier | Constructor and Description |
---|---|
|
StreamsConfig(Map<?,?> props)
Create a new
StreamsConfig using the given properties. |
protected |
StreamsConfig(Map<?,?> props,
boolean doLog) |
Modifier and Type | Method and Description |
---|---|
static String |
adminClientPrefix(String adminClientProp)
Prefix a property with
ADMIN_CLIENT_PREFIX . |
static String |
clientTagPrefix(String clientTagKey)
Prefix a client tag key with
CLIENT_TAG_PREFIX . |
static org.apache.kafka.common.config.ConfigDef |
configDef()
Return a copy of the config definition.
|
static String |
consumerPrefix(String consumerProp)
Prefix a property with
CONSUMER_PREFIX . |
DeserializationExceptionHandler |
defaultDeserializationExceptionHandler() |
org.apache.kafka.common.serialization.Serde |
defaultKeySerde()
Return an
configured instance of key Serde
class . |
ProductionExceptionHandler |
defaultProductionExceptionHandler() |
TimestampExtractor |
defaultTimestampExtractor() |
org.apache.kafka.common.serialization.Serde |
defaultValueSerde()
Return an
configured instance of value
Serde class . |
Map<String,Object> |
getAdminConfigs(String clientId)
Get the configs for the
admin client . |
Map<String,String> |
getClientTags()
Get the configured client tags set with
CLIENT_TAG_PREFIX prefix. |
Map<String,Object> |
getGlobalConsumerConfigs(String clientId)
Get the configs for the
global consumer . |
Map<String,Object> |
getMainConsumerConfigs(String groupId,
String clientId,
int threadIdx)
Get the configs to the
main consumer . |
Map<String,Object> |
getProducerConfigs(String clientId)
Get the configs for the
producer . |
Map<String,Object> |
getRestoreConsumerConfigs(String clientId)
Get the configs for the
restore-consumer . |
static String |
globalConsumerPrefix(String consumerProp)
Prefix a property with
GLOBAL_CONSUMER_PREFIX . |
static void |
main(String[] args) |
static String |
mainConsumerPrefix(String consumerProp)
Prefix a property with
MAIN_CONSUMER_PREFIX . |
protected Map<String,Object> |
postProcessParsedConfig(Map<String,Object> parsedValues) |
static String |
producerPrefix(String producerProp)
Prefix a property with
PRODUCER_PREFIX . |
static String |
restoreConsumerPrefix(String consumerProp)
Prefix a property with
RESTORE_CONSUMER_PREFIX . |
static String |
topicPrefix(String topicProp)
Prefix a property with
TOPIC_PREFIX
used to provide default topic configs to be applied when creating internal topics. |
documentationOf, equals, get, getBoolean, getClass, getConfiguredInstance, getConfiguredInstance, getConfiguredInstances, getConfiguredInstances, getConfiguredInstances, getDouble, getInt, getList, getLong, getPassword, getShort, getString, hashCode, ignore, logUnused, nonInternalValues, originals, originals, originalsStrings, originalsWithPrefix, originalsWithPrefix, typeOf, unused, values, valuesWithPrefixAllOrNothing, valuesWithPrefixOverride
public static final int DUMMY_THREAD_INDEX
public static final long MAX_TASK_IDLE_MS_DISABLED
public static final int MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE
public static final int MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH
public static final int MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH
public static final String TOPIC_PREFIX
TopicConfig
.
It is recommended to use topicPrefix(String)
.public static final String CONSUMER_PREFIX
consumer
configs from other client configs.
It is recommended to use consumerPrefix(String)
to add this prefix to consumer
properties
.public static final String MAIN_CONSUMER_PREFIX
consumer
configs for the main consumer client from
the general consumer client configs. The override precedence is the following (from highest to lowest precedence):
1. main.consumer.[config-name]
2. consumer.[config-name]
3. [config-name]public static final String RESTORE_CONSUMER_PREFIX
consumer
configs for the restore consumer client from
the general consumer client configs. The override precedence is the following (from highest to lowest precedence):
1. restore.consumer.[config-name]
2. consumer.[config-name]
3. [config-name]public static final String GLOBAL_CONSUMER_PREFIX
consumer
configs for the global consumer client from
the general consumer client configs. The override precedence is the following (from highest to lowest precedence):
1. global.consumer.[config-name]
2. consumer.[config-name]
3. [config-name]public static final String PRODUCER_PREFIX
producer
configs from other client configs.
It is recommended to use producerPrefix(String)
to add this prefix to producer
properties
.public static final String ADMIN_CLIENT_PREFIX
admin
configs from other client configs.
It is recommended to use adminClientPrefix(String)
to add this prefix to admin
client properties
.public static final String CLIENT_TAG_PREFIX
public static final String NO_OPTIMIZATION
"topology.optimization"
for disabling topology optimizationpublic static final String OPTIMIZE
"topology.optimization"
for enabling topology optimizationpublic static final String UPGRADE_FROM_0100
"upgrade.from"
for upgrading an application from version 0.10.0.x
.public static final String UPGRADE_FROM_0101
"upgrade.from"
for upgrading an application from version 0.10.1.x
.public static final String UPGRADE_FROM_0102
"upgrade.from"
for upgrading an application from version 0.10.2.x
.public static final String UPGRADE_FROM_0110
"upgrade.from"
for upgrading an application from version 0.11.0.x
.public static final String UPGRADE_FROM_10
"upgrade.from"
for upgrading an application from version 1.0.x
.public static final String UPGRADE_FROM_11
"upgrade.from"
for upgrading an application from version 1.1.x
.public static final String UPGRADE_FROM_20
"upgrade.from"
for upgrading an application from version 2.0.x
.public static final String UPGRADE_FROM_21
"upgrade.from"
for upgrading an application from version 2.1.x
.public static final String UPGRADE_FROM_22
"upgrade.from"
for upgrading an application from version 2.2.x
.public static final String UPGRADE_FROM_23
"upgrade.from"
for upgrading an application from version 2.3.x
.public static final String UPGRADE_FROM_24
"upgrade.from"
for upgrading an application from version 2.4.x
.public static final String UPGRADE_FROM_25
"upgrade.from"
for upgrading an application from version 2.5.x
.public static final String UPGRADE_FROM_26
"upgrade.from"
for upgrading an application from version 2.6.x
.public static final String UPGRADE_FROM_27
"upgrade.from"
for upgrading an application from version 2.7.x
.public static final String UPGRADE_FROM_28
"upgrade.from"
for upgrading an application from version 2.8.x
.public static final String UPGRADE_FROM_30
"upgrade.from"
for upgrading an application from version 3.0.x
.public static final String UPGRADE_FROM_31
"upgrade.from"
for upgrading an application from version 3.1.x
.public static final String UPGRADE_FROM_32
"upgrade.from"
for upgrading an application from version 3.2.x
.public static final String AT_LEAST_ONCE
"processing.guarantee"
for at-least-once processing guarantees.@Deprecated public static final String EXACTLY_ONCE
"exactly_once_v2"
instead."processing.guarantee"
for exactly-once processing guarantees.
Enabling exactly-once processing semantics requires broker version 0.11.0 or higher.
If you enable this feature Kafka Streams will use more resources (like broker connections)
compared to "at_least_once"
and "exactly_once_v2"
.
@Deprecated public static final String EXACTLY_ONCE_BETA
"exactly_once_v2"
instead."processing.guarantee"
for exactly-once processing guarantees.
Enabling exactly-once (beta) requires broker version 2.5 or higher.
If you enable this feature Kafka Streams will use fewer resources (like broker connections)
compared to the EXACTLY_ONCE
(deprecated) case.
public static final String EXACTLY_ONCE_V2
"processing.guarantee"
for exactly-once processing guarantees.
Enabling exactly-once-v2 requires broker version 2.5 or higher.
public static final String METRICS_LATEST
"built.in.metrics.version"
for the latest built-in metrics version.public static final String ACCEPTABLE_RECOVERY_LAG_CONFIG
acceptable.recovery.lag
public static final String APPLICATION_ID_CONFIG
application.id
public static final String APPLICATION_SERVER_CONFIG
application.server
public static final String BOOTSTRAP_SERVERS_CONFIG
bootstrap.servers
public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG
buffered.records.per.partition
public static final String BUFFERED_RECORDS_PER_PARTITION_DOC
public static final String BUILT_IN_METRICS_VERSION_CONFIG
built.in.metrics.version
public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG
cache.max.bytes.buffering
public static final String CACHE_MAX_BYTES_BUFFERING_DOC
public static final String CLIENT_ID_CONFIG
client.id
public static final String COMMIT_INTERVAL_MS_CONFIG
commit.interval.ms
public static final String REPARTITION_PURGE_INTERVAL_MS_CONFIG
repartition.purge.interval.ms
public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG
connections.max.idle.ms
public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG
default.deserialization.exception.handler
public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC
public static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG
default.production.exception.handler
public static final String DEFAULT_DSL_STORE_CONFIG
default.dsl.store
public static final String DEFAULT_DSL_STORE_DOC
public static final String ROCKS_DB
public static final String IN_MEMORY
@Deprecated public static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
default.windowed.key.serde.inner
@Deprecated public static final String DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
default.windowed.value.serde.inner
public static final String WINDOWED_INNER_CLASS_SERDE
public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG
default key.serde
public static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG
default value.serde
public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG
default.timestamp.extractor
public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC
public static final String MAX_TASK_IDLE_MS_CONFIG
max.task.idle.ms
public static final String MAX_TASK_IDLE_MS_DOC
public static final String MAX_WARMUP_REPLICAS_CONFIG
max.warmup.replicas
public static final String METADATA_MAX_AGE_CONFIG
metadata.max.age.ms
public static final String METRICS_NUM_SAMPLES_CONFIG
metrics.num.samples
public static final String METRICS_RECORDING_LEVEL_CONFIG
metrics.record.level
public static final String METRIC_REPORTER_CLASSES_CONFIG
metric.reporters
public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG
metrics.sample.window.ms
public static final String NUM_STANDBY_REPLICAS_CONFIG
num.standby.replicas
public static final String NUM_STREAM_THREADS_CONFIG
num.stream.threads
public static final String POLL_MS_CONFIG
poll.ms
public static final String PROBING_REBALANCE_INTERVAL_MS_CONFIG
probing.rebalance.interval.ms
public static final String PROCESSING_GUARANTEE_CONFIG
processing.guarantee
public static final String RECEIVE_BUFFER_CONFIG
receive.buffer.bytes
public static final String RACK_AWARE_ASSIGNMENT_TAGS_CONFIG
rack.aware.assignment.tags
public static final String RECONNECT_BACKOFF_MS_CONFIG
reconnect.backoff.ms
public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG
reconnect.backoff.max
public static final String REPLICATION_FACTOR_CONFIG
replication.factor
public static final String REQUEST_TIMEOUT_MS_CONFIG
request.timeout.ms
@Deprecated public static final String RETRIES_CONFIG
retries
This config is ignored by Kafka Streams. Note, that the internal clients (producer, admin) are still impacted by this config.
public static final String RETRY_BACKOFF_MS_CONFIG
retry.backoff.ms
public static final String ROCKSDB_CONFIG_SETTER_CLASS_CONFIG
rocksdb.config.setter
public static final String SECURITY_PROTOCOL_CONFIG
security.protocol
public static final String SEND_BUFFER_CONFIG
send.buffer.bytes
public static final String STATE_CLEANUP_DELAY_MS_CONFIG
state.cleanup.delay
public static final String STATE_DIR_CONFIG
state.dir
public static final String TASK_TIMEOUT_MS_CONFIG
task.timeout.ms
public static final String TASK_TIMEOUT_MS_DOC
public static final String TOPOLOGY_OPTIMIZATION_CONFIG
topology.optimization
public static final String WINDOW_SIZE_MS_CONFIG
window.size.ms
public static final String UPGRADE_FROM_CONFIG
upgrade.from
public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG
windowstore.changelog.additional.retention.ms
@Deprecated public static final String TOPOLOGY_OPTIMIZATION
TOPOLOGY_OPTIMIZATION_CONFIG
insteadtopology.optimization
public StreamsConfig(Map<?,?> props)
StreamsConfig
using the given properties.props
- properties that specify Kafka Streams and internal consumer/producer configurationprotected StreamsConfig(Map<?,?> props, boolean doLog)
public static String consumerPrefix(String consumerProp)
CONSUMER_PREFIX
. This is used to isolate consumer configs
from other client configs.consumerProp
- the consumer property to be maskedCONSUMER_PREFIX
+ consumerProp
public static String mainConsumerPrefix(String consumerProp)
MAIN_CONSUMER_PREFIX
. This is used to isolate main consumer configs
from other client configs.consumerProp
- the consumer property to be maskedMAIN_CONSUMER_PREFIX
+ consumerProp
public static String restoreConsumerPrefix(String consumerProp)
RESTORE_CONSUMER_PREFIX
. This is used to isolate restore consumer configs
from other client configs.consumerProp
- the consumer property to be maskedRESTORE_CONSUMER_PREFIX
+ consumerProp
public static String clientTagPrefix(String clientTagKey)
CLIENT_TAG_PREFIX
.clientTagKey
- client tag keyCLIENT_TAG_PREFIX
+ clientTagKey
public static String globalConsumerPrefix(String consumerProp)
GLOBAL_CONSUMER_PREFIX
. This is used to isolate global consumer configs
from other client configs.consumerProp
- the consumer property to be maskedGLOBAL_CONSUMER_PREFIX
+ consumerProp
public static String producerPrefix(String producerProp)
PRODUCER_PREFIX
. This is used to isolate producer configs
from other client configs.producerProp
- the producer property to be maskedproducerProp
public static String adminClientPrefix(String adminClientProp)
ADMIN_CLIENT_PREFIX
. This is used to isolate admin configs
from other client configs.adminClientProp
- the admin client property to be maskedadminClientProp
public static String topicPrefix(String topicProp)
TOPIC_PREFIX
used to provide default topic configs to be applied when creating internal topics.topicProp
- the topic property to be maskedtopicProp
public static org.apache.kafka.common.config.ConfigDef configDef()
protected Map<String,Object> postProcessParsedConfig(Map<String,Object> parsedValues)
postProcessParsedConfig
in class org.apache.kafka.common.config.AbstractConfig
public Map<String,Object> getMainConsumerConfigs(String groupId, String clientId, int threadIdx)
main consumer
.
Properties using the prefix MAIN_CONSUMER_PREFIX
will be used in favor over
the properties prefixed with CONSUMER_PREFIX
and the non-prefixed versions
(read the override precedence ordering in MAIN_CONSUMER_PREFIX
except in the case of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
where we always use the non-prefixed
version as we only support reading/writing from/to the same Kafka Cluster.
If not specified by MAIN_CONSUMER_PREFIX
, main consumer will share the general consumer configs
prefixed by CONSUMER_PREFIX
.groupId
- consumer groupIdclientId
- clientIdthreadIdx
- stream thread indexpublic Map<String,Object> getRestoreConsumerConfigs(String clientId)
restore-consumer
.
Properties using the prefix RESTORE_CONSUMER_PREFIX
will be used in favor over
the properties prefixed with CONSUMER_PREFIX
and the non-prefixed versions
(read the override precedence ordering in RESTORE_CONSUMER_PREFIX
except in the case of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
where we always use the non-prefixed
version as we only support reading/writing from/to the same Kafka Cluster.
If not specified by RESTORE_CONSUMER_PREFIX
, restore consumer will share the general consumer configs
prefixed by CONSUMER_PREFIX
.clientId
- clientIdpublic Map<String,Object> getGlobalConsumerConfigs(String clientId)
global consumer
.
Properties using the prefix GLOBAL_CONSUMER_PREFIX
will be used in favor over
the properties prefixed with CONSUMER_PREFIX
and the non-prefixed versions
(read the override precedence ordering in GLOBAL_CONSUMER_PREFIX
except in the case of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
where we always use the non-prefixed
version as we only support reading/writing from/to the same Kafka Cluster.
If not specified by GLOBAL_CONSUMER_PREFIX
, global consumer will share the general consumer configs
prefixed by CONSUMER_PREFIX
.clientId
- clientIdpublic Map<String,Object> getProducerConfigs(String clientId)
producer
.
Properties using the prefix PRODUCER_PREFIX
will be used in favor over their non-prefixed versions
except in the case of ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
where we always use the non-prefixed
version as we only support reading/writing from/to the same Kafka Cluster.clientId
- clientIdpublic Map<String,Object> getAdminConfigs(String clientId)
admin client
.clientId
- clientIdpublic Map<String,String> getClientTags()
CLIENT_TAG_PREFIX
prefix.public org.apache.kafka.common.serialization.Serde defaultKeySerde()
configured
instance of key Serde
class
.public org.apache.kafka.common.serialization.Serde defaultValueSerde()
configured
instance of value
Serde class
.public TimestampExtractor defaultTimestampExtractor()
public DeserializationExceptionHandler defaultDeserializationExceptionHandler()
public ProductionExceptionHandler defaultProductionExceptionHandler()
public static void main(String[] args)