Streams Configurations¶
This topic provides configuration parameters available for Confluent Platform. The Kafka Streams parameters are organized by order of importance, ranked from high to low.
- 
application.idAn identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix. Type: string Default: Valid Values: Importance: high 
- 
bootstrap.serversA list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).Type: list Default: Valid Values: Importance: high 
- 
replication.factorThe replication factor for change log topics and repartition topics created by the stream processing application. If your broker cluster is on version 2.4 or newer, you can set -1 to use the broker default replication factor. Type: int Default: 1 Valid Values: Importance: high 
- 
state.dirDirectory location for state store. This path must be unique for each streams instance sharing the same underlying filesystem. Type: string Default: /tmp/kafka-streams Valid Values: Importance: high 
- 
acceptable.recovery.lagThe maximum acceptable lag (number of offsets to catch up) for a client to be considered caught-up for an active task.Should correspond to a recovery time of well under a minute for a given workload. Must be at least 0. Type: long Default: 10000 Valid Values: [0,...] Importance: medium 
- 
cache.max.bytes.bufferingMaximum number of memory bytes to be used for buffering across all threads Type: long Default: 10485760 Valid Values: [0,...] Importance: medium 
- 
client.idAn ID prefix string used for the client IDs of internal consumer, producer and restore-consumer, with pattern ' -StreamThread- - '. Type: string Default: "" Valid Values: Importance: medium 
- 
default.deserialization.exception.handlerException handling class that implements the org.apache.kafka.streams.errors.DeserializationExceptionHandlerinterface.Type: class Default: org.apache.kafka.streams.errors.LogAndFailExceptionHandler Valid Values: Importance: medium 
- 
default.key.serdeDefault serializer / deserializer class for key that implements the org.apache.kafka.common.serialization.Serdeinterface. Note when windowed serde class is used, one needs to set the inner serde class that implements theorg.apache.kafka.common.serialization.Serdeinterface via 'default.windowed.key.serde.inner' or 'default.windowed.value.serde.inner' as wellType: class Default: org.apache.kafka.common.serialization.Serdes$ByteArraySerde Valid Values: Importance: medium 
- 
default.production.exception.handlerException handling class that implements the org.apache.kafka.streams.errors.ProductionExceptionHandlerinterface.Type: class Default: org.apache.kafka.streams.errors.DefaultProductionExceptionHandler Valid Values: Importance: medium 
- 
default.timestamp.extractorDefault timestamp extractor class that implements the org.apache.kafka.streams.processor.TimestampExtractorinterface.Type: class Default: org.apache.kafka.streams.processor.FailOnInvalidTimestamp Valid Values: Importance: medium 
- 
default.value.serdeDefault serializer / deserializer class for value that implements the org.apache.kafka.common.serialization.Serdeinterface. Note when windowed serde class is used, one needs to set the inner serde class that implements theorg.apache.kafka.common.serialization.Serdeinterface via 'default.windowed.key.serde.inner' or 'default.windowed.value.serde.inner' as wellType: class Default: org.apache.kafka.common.serialization.Serdes$ByteArraySerde Valid Values: Importance: medium 
- 
default.windowed.key.serde.innerDefault serializer / deserializer for the inner class of a windowed key. Must implement the org.apache.kafka.common.serialization.Serdeinterface.Type: class Default: null Valid Values: Importance: medium 
- 
default.windowed.value.serde.innerDefault serializer / deserializer for the inner class of a windowed value. Must implement the org.apache.kafka.common.serialization.Serdeinterface.Type: class Default: null Valid Values: Importance: medium 
- 
max.task.idle.msMaximum amount of time in milliseconds a stream task will stay idle when not all of its partition buffers contain records, to avoid potential out-of-order record processing across multiple input streams. Type: long Default: 0 Valid Values: Importance: medium 
- 
max.warmup.replicasThe maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once for the purpose of keeping the task available on one instance while it is warming up on another instance it has been reassigned to. Used to throttle how much extra broker traffic and cluster state can be used for high availability. Must be at least 1. Type: int Default: 2 Valid Values: [1,...] Importance: medium 
- 
num.standby.replicasThe number of standby replicas for each task. Type: int Default: 0 Valid Values: Importance: medium 
- 
num.stream.threadsThe number of threads to execute stream processing. Type: int Default: 1 Valid Values: Importance: medium 
- 
processing.guaranteeThe processing guarantee that should be used. Possible values are at_least_once(default),exactly_once(requires brokers version 0.11.0 or higher), andexactly_once_beta(requires brokers version 2.5 or higher). Note that exactly-once processing requires a cluster of at least three brokers by default what is the recommended setting for production; for development you can change this, by adjusting broker settingtransaction.state.log.replication.factorandtransaction.state.log.min.isr.Type: string Default: at_least_once Valid Values: [at_least_once, exactly_once, exactly_once_beta] Importance: medium 
- 
security.protocolProtocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. Type: string Default: PLAINTEXT Valid Values: Importance: medium 
- 
task.timeout.msThe maximum amount of time in milliseconds a task might stall due to internal errors and retries until an error is raised. For a timeout of 0ms, a task would raise an error for the first internal error. For any timeout larger than 0ms, a task will retry at least once before an error is raised. Type: long Default: 300000 (5 minutes) Valid Values: [0,...] Importance: medium 
- 
topology.optimizationA configuration telling Kafka Streams if it should optimize the topology, disabled by default Type: string Default: none Valid Values: [none, all] Importance: medium 
- 
application.serverA host:port pair pointing to a user-defined endpoint that can be used for state store discovery and interactive queries on this KafkaStreams instance. Type: string Default: "" Valid Values: Importance: low 
- 
buffered.records.per.partitionMaximum number of records to buffer per partition. Type: int Default: 1000 Valid Values: Importance: low 
- 
built.in.metrics.versionVersion of the built-in metrics to use. Type: string Default: latest Valid Values: [0.10.0-2.4, latest] Importance: low 
- 
commit.interval.msThe frequency in milliseconds with which to save the position of the processor. (Note, if processing.guaranteeis set toexactly_once, the default value is100, otherwise the default value is30000.Type: long Default: 30000 (30 seconds) Valid Values: [0,...] Importance: low 
- 
connections.max.idle.msClose idle connections after the number of milliseconds specified by this config. Type: long Default: 540000 (9 minutes) Valid Values: Importance: low 
- 
metadata.max.age.msThe period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions. Type: long Default: 300000 (5 minutes) Valid Values: [0,...] Importance: low 
- 
metric.reportersA list of classes to use as metrics reporters. Implementing the org.apache.kafka.common.metrics.MetricsReporterinterface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.Type: list Default: "" Valid Values: Importance: low 
- 
metrics.num.samplesThe number of samples maintained to compute metrics. Type: int Default: 2 Valid Values: [1,...] Importance: low 
- 
metrics.recording.levelThe highest recording level for metrics. Type: string Default: INFO Valid Values: [INFO, DEBUG, TRACE] Importance: low 
- 
metrics.sample.window.msThe window of time a metrics sample is computed over. Type: long Default: 30000 (30 seconds) Valid Values: [0,...] Importance: low 
- 
partition.grouperPartition grouper class that implements the org.apache.kafka.streams.processor.PartitionGrouperinterface. WARNING: This config is deprecated and will be removed in 3.0.0 release.Type: class Default: org.apache.kafka.streams.processor.DefaultPartitionGrouper Valid Values: Importance: low 
- 
poll.msThe amount of time in milliseconds to block waiting for input. Type: long Default: 100 Valid Values: Importance: low 
- 
probing.rebalance.interval.msThe maximum time in milliseconds to wait before triggering a rebalance to probe for warmup replicas that have finished warming up and are ready to become active. Probing rebalances will continue to be triggered until the assignment is balanced. Must be at least 1 minute. Type: long Default: 600000 (10 minutes) Valid Values: [60000,...] Importance: low 
- 
receive.buffer.bytesThe size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used. Type: int Default: 32768 (32 kibibytes) Valid Values: [-1,...] Importance: low 
- 
reconnect.backoff.max.msThe maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms. Type: long Default: 1000 (1 second) Valid Values: [0,...] Importance: low 
- 
reconnect.backoff.msThe base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker. Type: long Default: 50 Valid Values: [0,...] Importance: low 
- 
request.timeout.msThe configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted. Type: int Default: 40000 (40 seconds) Valid Values: [0,...] Importance: low 
- 
retriesSetting a value greater than zero will cause the client to resend any request that fails with a potentially transient error. It is recommended to set the value to either zero or `MAX_VALUE` and use corresponding timeout parameters to control how long a client should retry a request. Type: int Default: 0 Valid Values: [0,...,2147483647] Importance: low 
- 
retry.backoff.msThe amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. Type: long Default: 100 Valid Values: [0,...] Importance: low 
- 
rocksdb.config.setterA Rocks DB config setter class or class name that implements the org.apache.kafka.streams.state.RocksDBConfigSetterinterfaceType: class Default: null Valid Values: Importance: low 
- 
send.buffer.bytesThe size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used. Type: int Default: 131072 (128 kibibytes) Valid Values: [-1,...] Importance: low 
- 
state.cleanup.delay.msThe amount of time in milliseconds to wait before deleting state when a partition has migrated. Only state directories that have not been modified for at least state.cleanup.delay.mswill be removedType: long Default: 600000 (10 minutes) Valid Values: Importance: low 
- 
upgrade.fromAllows upgrading in a backward compatible way. This is needed when upgrading from [0.10.0, 1.1] to 2.0+, or when upgrading from [2.0, 2.3] to 2.4+. When upgrading from 2.4 to a newer version it is not required to specify this config. Default is `null`. Accepted values are "0.10.0", "0.10.1", "0.10.2", "0.11.0", "1.0", "1.1", "2.0", "2.1", "2.2", "2.3" (for upgrading from the corresponding old version). Type: string Default: null Valid Values: [null, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0, 1.1, 2.0, 2.1, 2.2, 2.3] Importance: low 
- 
window.size.msSets window size for the deserializer in order to calculate window end times. Type: long Default: null Valid Values: Importance: low 
- 
windowstore.changelog.additional.retention.msAdded to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. Default is 1 day Type: long Default: 86400000 (1 day) Valid Values: Importance: low 
Note
This website includes content developed at the Apache Software Foundation under the terms of the Apache License v2.