Connecting Kafka Streams to Confluent Cloud¶
You can connect Kafka Streams to your Confluent Platform Apache Kafka® cluster in Confluent Cloud.
Prerequisites
To connect Streams to Confluent Cloud, update your existing Streams configs with the properties described here.
Create a
java.util.Properties
instance.Configure your streams application. Kafka and Kafka Streams configuration options must be configured in the
java.util.Properties
instance before using Streams. In this example you must configure the Confluent Cloud broker endpoints (StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
) and SASL config (SASL_JAAS_CONFIG
)import java.util.Properties; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.streams.StreamsConfig; Properties props = new Properties(); // Comma-separated list of the Confluent Cloud broker endpoints. For example: // r0.great-app.confluent.aws.prod.cloud:9092,r1.great-app.confluent.aws.prod.cloud:9093,r2.great-app.confluent.aws.prod.cloud:9094 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<broker-endpoint1, broker-endpoint2, broker-endpoint3>"); props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); props.put(StreamsConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required \ username=\"<api-key>\" password=\"<api-secret>\";"); // Recommended performance/resilience settings props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), 2147483647); props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 9223372036854775807); // Any further settings props.put(... , ...);
(Optional) Add configs for Confluent Cloud Schema Registry to your streams application per the example in java_streams.delta on GitHub at ccloud/examples/template_delta_configs.
// Confluent Schema Registry for Java props.put("basic.auth.credentials.source", "USER_INFO"); props.put("schema.registry.basic.auth.user.info", "<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>"); props.put("schema.registry.url", "https://<SCHEMA_REGISTRY_ENDPOINT>");
- For more information, see Configuring a Streams Application.
- To view a working example of hybrid Apache Kafka® clusters from self-hosted to Confluent Cloud, see cp-demo.
- For example configs for all Confluent Platform components and clients connecting to Confluent Cloud, see template examples for components.
- To look at all the code used in the Confluent Cloud demo, see the Confluent Cloud demo examples.