重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

Kafka Streams を Confluent Cloud に接続

Confluent Cloud の Confluent Platform Apache Kafka® クラスターに Kafka Streams を接続できます。

前提条件

Streams を Confluent Cloud に接続するには、ここに示すプロパティを使用して、既存の Streams 構成 をアップデートします。

  1. java.util.Properties インスタンスを作成します。

  2. Streams アプリケーションを構成します。Streams を使用する前に、Kafka と Kafka Streams の構成オプションを java.util.Properties インスタンスで構成する必要があります。この例では、Confluent Cloud ブローカーエンドポイント(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)と SASL(SASL_JAAS_CONFIG)を構成する必要があります。

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.streams.StreamsConfig;
    
    Properties props = new Properties();
    // Comma-separated list of the Confluent Cloud broker endpoints. For example:
    // r0.great-app.confluent.aws.prod.cloud:9092,r1.great-app.confluent.aws.prod.cloud:9093,r2.great-app.confluent.aws.prod.cloud:9094
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<broker-endpoint1, broker-endpoint2, broker-endpoint3>");
    props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
    props.put(StreamsConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required \
    username=\"<api-key>\" password=\"<api-secret>\";");
    
    // Recommended performance/resilience settings
    props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), 2147483647);
    props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 9223372036854775807);
    
    // Any further settings
    props.put(... , ...);
    
  3. (オプション)GitHub の ccloud/examples/template_delta_configs にある java_streams.delta の例に従って Confluent Cloud スキーマレジストリ の構成を Streams アプリケーションに追加します。

    // Confluent Schema Registry for Java
    props.put("basic.auth.credentials.source", "USER_INFO");
    props.put("schema.registry.basic.auth.user.info", "<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>");
    props.put("schema.registry.url", "https://<SCHEMA_REGISTRY_ENDPOINT>");