MQTT Proxy Configuration Properties for Confluent Platform

Network

listeners

The hostname and the port of the listener. For example: tcp://myhost:1883 or https://0.0.0.0:8081.

  • Type: string
  • Default: 0.0.0.0:1883
  • Valid Values: A host/port pair identifying the listener in the form host:port.
  • Importance: high
network.threads.num

Number of threads deployed by Confluent MQTT Proxy to handle network IO.

  • Type: int
  • Default: 1
  • Valid Values: [1,…]
  • Importance: high
network.epoll.enabled

Use epoll for incoming connections on Linux instead of standard NIO. Enabling this property when epoll is not available will result in configuration error.

  • Type: boolean
  • Default: false
  • Valid Values: [false, true]
  • Importance: high

Protocol

mqtt.message.max.bytes

Maximum allowed MQTT message size in bytes (excluding the fixed header).

  • Type: int
  • Default: 8092
  • Valid Values: [0,…]
  • Importance: medium
listeners.security.protocol

Protocol used between MQTT clients and Confluent MQTT Proxy.

  • Type: string
  • Default: PLAINTEXT
  • Valid Values: [PLAINTEXT, SSL, TLS, SASL_PLAINTEXT, SASL_SSL, SASL_TLS]
  • Importance: high

For information about communication settings for security, authentication, and encryption, see Secure Communication for MQTT Proxy on Confluent Platform.

listeners.ssl.protocol

The TLS protocol used to generate the SSLContext. The default is TLSv1.3 when running with Java 11 or newer, TLSv1.2 otherwise. This value should be fine for most use cases. Allowed values in recent JVMs are TLSv1.2 and TLSv1.3. TLS, TLSv1.1, SSL, SSLv2 and SSLv3 might be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities. With the default value for this configuration and ssl.enabled.protocols, clients downgrade to TLSv1.2 if the server does not support TLSv1.3. If this configuration is set to TLSv1.2, clients do not use TLSv1.3, even if it is one of the values in ssl.enabled.protocols and the server only supports TLSv1.3.

  • Type: string
  • Default: TLSv1.3
  • Importance: medium
listeners.ssl.provider

The name of the security provider used for TLS connections. Default value is the default security provider of the JVM.

  • Type: string
  • Default: null
  • Importance: medium
listeners.ssl.cipher.suites

A list of cipher suites. This is a named combination of authentication, encryption, MAC, and key exchange algorithms used to negotiate the security settings for a network connection using TLS. By default, all the available cipher suites are supported.

  • Type: list
  • Default: null
  • Importance: low
listeners.ssl.enabled.protocols

The comma-separated list of protocols enabled for TLS connections. The default value is TLSv1.2,TLSv1.3 when running with Java 11 or later, TLSv1.2 otherwise. With the default value for Java 11 (TLSv1.2,TLSv1.3), Kafka clients and brokers prefer TLSv1.3 if both support it, and falls back to TLSv1.2 otherwise (assuming both support at least TLSv1.2).

  • Type: list
  • Default: TLSv1.2,TLSv1.3
  • Importance: medium
listeners.ssl.keystore.type

The file format of the key store file. This is optional for client.

  • Type: string
  • Default: JKS
  • Importance: medium
listeners.ssl.keystore.location

The location of the key store file. This is optional for client and can be used for two-way client authentication.

  • Type: string
  • Default: null
  • Importance: high
listeners.ssl.keystore.password

The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured.

  • Type: password
  • Default: null
  • Importance: high
listeners.ssl.key.password

The password of the private key in the key store file. This is optional for client.

  • Type: password
  • Default: null
  • Importance: high
listeners.ssl.truststore.type

The file format of the trust store file.

  • Type: string
  • Default: JKS
  • Importance: medium
listeners.ssl.truststore.location

The location of the trust store file.

  • Type: string
  • Default: null
  • Importance: high
listeners.ssl.truststore.password

The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled.

  • Type: password
  • Default: null
  • Importance: high
listeners.ssl.keymanager.algorithm

The algorithm used by key manager factory for TLS connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine.

  • Type: string
  • Default: SunX509
  • Importance: low
listeners.ssl.trustmanager.algorithm

The algorithm used by trust manager factory for TLS connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine.

  • Type: string
  • Default: PKIX
  • Importance: low
listeners.ssl.endpoint.identification.algorithm

The endpoint identification algorithm to validate server hostname using server certificate.

  • Type: string
  • Default: https
  • Importance: low
listeners.ssl.secure.random.implementation

The SecureRandom PRNG implementation to use for TLS cryptography operations.

  • Type: string
  • Default: null
  • Importance: low
listeners.ssl.client.auth

Configures server to request client authentication.

  • Type: string
  • Default: none
  • Valid Values: [none, requested, required]
  • Importance: medium

Stream

bootstrap.servers

A list of host/port pairs to use for establishing the initial connection to the Apache Kafka® cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping&mdash;this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form <code>host1:port1,host2:port2,…</code>. Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).

  • Type: string
  • Importance: high
topic.regex.list

A comma-separated list of pairs of type <kafka topic>:<regex> that is used to map MQTT topics to Kafka topics.

  • Type: list
  • Valid Values: A list of pairs in the form <kafka topic1>:<regex1>, <kafka topic2>:<regex2>, ...
  • Importance: high
stream.threads.num

Number of threads publishing records to Kafka

  • Type: int
  • Default: 1
  • Valid Values: [1,…]
  • Importance: high
producer.buffer.memory

The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will block for max.block.ms after which it will throw an exception.This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests.

  • Type: long
  • Default: 33554432
  • Valid Values: [0,…]
  • Importance: high
producer.compression.type

The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values are none, gzip, snappy, or lz4. Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).

  • Type: string
  • Default: none
  • Importance: high
producer.batch.size

The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. No attempt will be made to batch records larger than this size. Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent. A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation of additional records.

  • Type: int
  • Default: 16384
  • Valid Values: [0,…]
  • Importance: medium
producer.linger.ms

The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay&mdash;that is, rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle’s algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get batch.size worth of records for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many bytes accumulated for this partition we will ‘linger’ for the specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting linger.ms=5, for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load.

  • Type: long
  • Default: 0
  • Valid Values: [0,…]
  • Importance: medium
producer.client.id

An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.

  • Type: string
  • Default: “”
  • Importance: medium
producer.send.buffer.bytes

The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used.

  • Type: int
  • Default: 131072
  • Valid Values: [-1,…]
  • Importance: medium
producer.receive.buffer.bytes

The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used.

  • Type: int
  • Default: 32768
  • Valid Values: [-1,…]
  • Importance: medium
producer.max.request.size

The maximum size of a request in bytes. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests. This is also effectively a cap on the maximum record batch size. Note that the server has its own cap on record batch size which may be different from this.

  • Type: int
  • Default: 1048576
  • Valid Values: [0,…]
  • Importance: medium
producer.reconnect.backoff.ms

The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker.

  • Type: long
  • Default: 50
  • Valid Values: [0,…]
  • Importance: low
producer.reconnect.backoff.max.ms

The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms.

  • Type: long
  • Default: 1000
  • Valid Values: [0,…]
  • Importance: low
producer.max.block.ms

The configuration controls how long KafkaProducer.send() and KafkaProducer.partitionsFor() will block.These methods can be blocked either because the buffer is full or metadata unavailable.Blocking in the user-supplied serializers or partitioner will not be counted against this timeout.

  • Type: long
  • Default: 60000
  • Valid Values: [0,…]
  • Importance: medium
producer.request.timeout.ms

The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted. This should be larger than replica.lag.time.max.ms (a broker configuration) to reduce the possibility of message duplication due to unnecessary producer retries.

  • Type: int
  • Default: 30000
  • Valid Values: [0,…]
  • Importance: medium
producer.metadata.max.age.ms

The period of time in milliseconds after which we force a refresh of metadata even if we haven’t seen any partition leadership changes to proactively discover any new brokers or partitions.

  • Type: long
  • Default: 300000
  • Valid Values: [0,…]
  • Importance: low
producer.metrics.sample.window.ms

The window of time a metrics sample is computed over.

  • Type: long
  • Default: 30000
  • Valid Values: [0,…]
  • Importance: low
producer.metrics.num.samples

The number of samples maintained to compute metrics.

  • Type: int
  • Default: 2
  • Valid Values: [1,…]
  • Importance: low
producer.metrics.recording.level

The highest recording level for metrics.

  • Type: string
  • Default: INFO
  • Valid Values: [INFO, DEBUG]
  • Importance: low
producer.metric.reporters

A list of classes to use as metrics reporters. Implementing the <code>org.apache.kafka.common.metrics.MetricsReporter</code> interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.

producer.max.in.flight.requests.per.connection

The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled).

  • Type: int
  • Default: 5
  • Valid Values: [1,…]
  • Importance: low
producer.connections.max.idle.ms

Close idle connections after the number of milliseconds specified by this config.

  • Type: long
  • Default: 540000
  • Importance: medium
producer.partitioner.class

Partitioner class that implements the org.apache.kafka.clients.producer.Partitioner interface.

  • Type: class
  • Default: org.apache.kafka.clients.producer.internals.DefaultPartitioner
  • Importance: medium
producer.interceptor.classes

A list of classes to use as interceptors. Implementing the org.apache.kafka.clients.producer.ProducerInterceptor interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there are no interceptors.

producer.security.protocol

Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.

  • Type: string
  • Default: PLAINTEXT
  • Importance: medium
producer.ssl.protocol

The TLS protocol used to generate the SSLContext. The default is TLSv1.3 when running with Java 11 or newer, TLSv1.2 otherwise. This value should be fine for most use cases. Allowed values in recent JVMs are TLSv1.2 and TLSv1.3. TLS, TLSv1.1, SSL, SSLv2 and SSLv3 might be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities. With the default value for this configuration and ssl.enabled.protocols, clients downgrade to TLSv1.2 if the server does not support TLSv1.3. If this configuration is set to TLSv1.2, clients do not use TLSv1.3, even if it is one of the values in ssl.enabled.protocols and the server only supports TLSv1.3.

  • Type: string
  • Default: TLSv1.3
  • Importance: medium
producer.ssl.provider

The name of the security provider used for TLS connections. Default value is the default security provider of the JVM.

  • Type: string
  • Default: null
  • Importance: medium
producer.ssl.cipher.suites

A list of cipher suites. This is a named combination of authentication, encryption, MAC, and key exchange algorithms used to negotiate the security settings for a network connection using TLS. By default, all the available cipher suites are supported.

  • Type: list
  • Default: null
  • Importance: low
producer.ssl.enabled.protocols

The comma-separated list of protocols enabled for TLS connections. The default value is TLSv1.2,TLSv1.3 when running with Java 11 or later, TLSv1.2 otherwise. With the default value for Java 11 (TLSv1.2,TLSv1.3), Kafka clients and brokers prefer TLSv1.3 if both support it, and falls back to TLSv1.2 otherwise (assuming both support at least TLSv1.2).

  • Type: list
  • Default: TLSv1.2,TLSv1.3
  • Importance: medium
producer.ssl.keystore.type

The file format of the key store file. This is optional for client.

  • Type: string
  • Default: JKS
  • Importance: medium
producer.ssl.keystore.location

The location of the key store file. This is optional for client and can be used for two-way client authentication.

  • Type: string
  • Default: null
  • Importance: high
producer.ssl.keystore.password

The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured.

  • Type: password
  • Default: null
  • Importance: high
producer.ssl.key.password

The password of the private key in the key store file. This is optional for client.

  • Type: password
  • Default: null
  • Importance: high
producer.ssl.truststore.type

The file format of the trust store file.

  • Type: string
  • Default: JKS
  • Importance: medium
producer.ssl.truststore.location

The location of the trust store file.

  • Type: string
  • Default: null
  • Importance: high
producer.ssl.truststore.password

The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled.

  • Type: password
  • Default: null
  • Importance: high
producer.ssl.keymanager.algorithm

The algorithm used by key manager factory for TLS connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine.

  • Type: string
  • Default: SunX509
  • Importance: low
producer.ssl.trustmanager.algorithm

The algorithm used by trust manager factory for TLS connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine.

  • Type: string
  • Default: PKIX
  • Importance: low
producer.ssl.endpoint.identification.algorithm

The endpoint identification algorithm to validate server hostname using server certificate.

  • Type: string
  • Default: https
  • Importance: low
producer.ssl.secure.random.implementation

The SecureRandom PRNG implementation to use for TLS cryptography operations.

  • Type: string
  • Default: null
  • Importance: low
producer.sasl.kerberos.service.name

The Kerberos principal name that Kafka runs as. This can be defined either in Kafka’s JAAS config or in Kafka’s config.

  • Type: string
  • Default: null
  • Importance: medium
producer.sasl.kerberos.kinit.cmd

Kerberos kinit command path.

  • Type: string
  • Default: /usr/bin/kinit
  • Importance: low
producer.sasl.kerberos.ticket.renew.window.factor

Login thread will sleep until the specified window factor of time from last refresh to ticket’s expiry has been reached, at which time it will try to renew the ticket.

  • Type: double
  • Default: 0.8
  • Importance: low
producer.sasl.kerberos.ticket.renew.jitter

Percentage of random jitter added to the renewal time.

  • Type: double
  • Default: 0.05
  • Importance: low
producer.sasl.kerberos.min.time.before.relogin

Login thread sleep time between refresh attempts.

  • Type: long
  • Default: 60000
  • Importance: low
producer.sasl.login.refresh.window.factor

Login refresh thread will sleep until the specified window factor relative to the credential’s lifetime has been reached, at which time it will try to refresh the credential. Legal values are between 0.5 (50%) and 1.0 (100%) inclusive; a default value of 0.8 (80%) is used if no value is specified. Currently applies only to OAUTHBEARER.

  • Type: double
  • Default: 0.8
  • Valid Values: [0.5,…,1.0]
  • Importance: low
producer.sasl.login.refresh.window.jitter

The maximum amount of random jitter relative to the credential’s lifetime that is added to the login refresh thread’s sleep time. Legal values are between 0 and 0.25 (25%) inclusive; a default value of 0.05 (5%) is used if no value is specified. Currently applies only to OAUTHBEARER.

  • Type: double
  • Default: 0.05
  • Valid Values: [0.0,…,0.25]
  • Importance: low
producer.sasl.login.refresh.min.period.seconds

The desired minimum time for the login refresh thread to wait before refreshing a credential, in seconds. Legal values are between 0 and 900 (15 minutes); a default value of 60 (1 minute) is used if no value is specified. This value and sasl.login.refresh.buffer.seconds are both ignored if their sum exceeds the remaining lifetime of a credential. Currently applies only to OAUTHBEARER.

  • Type: short
  • Default: 60
  • Valid Values: [0,…,900]
  • Importance: low
producer.sasl.login.refresh.buffer.seconds

The amount of buffer time before credential expiration to maintain when refreshing a credential, in seconds. If a refresh would otherwise occur closer to expiration than the number of buffer seconds then the refresh will be moved up to maintain as much of the buffer time as possible. Legal values are between 0 and 3600 (1 hour); a default value of 300 (5 minutes) is used if no value is specified. This value and sasl.login.refresh.min.period.seconds are both ignored if their sum exceeds the remaining lifetime of a credential. Currently applies only to OAUTHBEARER.

  • Type: short
  • Default: 300
  • Valid Values: [0,…,3600]
  • Importance: low
producer.sasl.mechanism

SASL mechanism used for client connections. This may be any mechanism for which a security provider is available. GSSAPI is the default mechanism.

  • Type: string
  • Default: GSSAPI
  • Importance: medium
producer.sasl.jaas.config

JAAS login context parameters for SASL connections in the format used by JAAS configuration files. JAAS configuration file format is described <a href=”https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/LoginConfigFile.html”>here</a>. The format for the value is: ‘<code>loginModuleClass controlFlag (optionName=optionValue)*;</code>’. For brokers, the config must be prefixed with listener prefix and SASL mechanism name in lower-case. For example, listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=com.example.ScramLoginModule required;

  • Type: password
  • Default: null
  • Importance: medium
producer.sasl.client.callback.handler.class

The fully qualified name of a SASL client callback handler class that implements the AuthenticateCallbackHandler interface.

  • Type: class
  • Default: null
  • Importance: medium
producer.sasl.login.callback.handler.class

The fully qualified name of a SASL login callback handler class that implements the AuthenticateCallbackHandler interface. For brokers, login callback handler config must be prefixed with listener prefix and SASL mechanism name in lower-case. For example, listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler

  • Type: class
  • Default: null
  • Importance: medium
producer.sasl.login.class

The fully qualified name of a class that implements the Login interface. For brokers, login config must be prefixed with listener prefix and SASL mechanism name in lower-case. For example, listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin

  • Type: class
  • Default: null
  • Importance: medium

License

Tip

For complete license information for Confluent Platform, see Manage Confluent Platform Licenses.

confluent.license

Confluent will issue a license key to each subscriber. The license key will be a short snippet of text that you can copy and paste. Without the license key, you can use Kafka-MQTT for a 30-day trial period. If you are a subscriber, contact Confluent Support for more information.

  • Type: string
  • Default: “”
  • Importance: high
confluent.topic.bootstrap.servers

A list of host/port pairs to use for establishing the initial connection to the Kafka cluster used for licensing. All servers in the cluster will be discovered from the initial connection. This list should be in the form <code>host1:port1,host2:port2,…</code>. Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).

  • Type: list
  • Default: The value of bootstrap.servers property
  • Importance: high
confluent.topic

Name of the Kafka topic used for Confluent Platform configuration, including licensing information.

  • Type: string
  • Default: _confluent-command
  • Importance: low
confluent.topic.replication.factor

The replication factor for the Kafka topic used for Confluent Platform configuration, including licensing information. This is used only if the topic does not already exist, and the default of 3 is appropriate for production use. If you are using a development environment with less than 3 brokers, you must set this to the number of brokers (often 1).

  • Type: int
  • Default: 3
  • Importance: low