Clojure: Apache Kafka® のサンプルコード

このチュートリアルでは、Apache Kafka® クラスターにメッセージを生成し、このクラスターからメッセージを消費する Clojure クライアントアプリケーションを実行します。

チュートリアルの実行後に、提供された ソースコード をリファレンスとして使用して、独自の Kafka クライアントアプリケーションを開発します。



  • Java 8 以降(Clojure 1.10 では、Java 8 または Java 11 の使用を推奨)
  • デモのコンパイルと実行を行うための Leiningen ツール

Kafka クラスター

このチュートリアルは、Confluent Cloud を使用して取り組むのが最も簡単です。そうすればローカル Kafka クラスターを実行する必要はありません。Confluent Cloud にサインアップするときは、プロモーションコード C50INTEG を適用すると、$50 相当を無料で使用できます(詳細)。Console から LEARN をクリックしてクラスターをプロビジョニングし、Clients をクリックして、クライアントアプリケーションに対して設定するクラスター固有の構成と認証情報を取得します。サポートされている CLIREST API、またはコミュニティでサポートされている Confluent Cloud 向け ccloud-stack ユーティリティ を使用してもかまいません。

Confluent Cloud の使用を希望しない場合は、ローカルホスト や他のリモートサーバーで実行されている Kafka クラスターでこのチュートリアルに取り組むこともできます。


  1. confluentinc/examples GitHub リポジトリのクローンを作成し、7.1.1-post ブランチをチェックアウトします。

    git clone
    cd examples
    git checkout 7.1.1-post
  2. Clojure のサンプルのディレクトリに変更します。

    cd clients/cloud/clojure/
  3. Kafka クラスターに接続するための構成パラメーターを含むローカルファイル($HOME/.confluent/java.config など)を作成します。以下のテンプレートのいずれかをベースとして、クラスターへの接続情報でこのファイルをカスタマイズします。{{ BROKER_ENDPOINT }}{{CLUSTER_API_KEY }}、および {{ CLUSTER_API_SECRET }} に値を代入します(この値を手作業で見つける方法については、「Configure Confluent Cloud Clients」を参照してください。 または、Confluent Cloud 向け ccloud-stack ユーティリティ を使用して値を自動作成します)。

    • Confluent Cloud のテンプレート構成ファイル

      # Required connection configs for Kafka producer, consumer, and admin
      bootstrap.servers={{ BROKER_ENDPOINT }}
      security.protocol=SASL_SSL required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
      # Required for correctness in Apache Kafka clients prior to 2.6
      # Best practice for higher availability in Apache Kafka clients prior to 3.0
      # Best practice for Kafka producer to prevent data loss 
    • ローカルホストのテンプレート構成ファイル

      # Kafka


このサンプルでは、プロデューサーアプリケーションが Kafka クラスターのトピックに Kafka データを書き込みます。Kafka クラスターにトピックがまだ存在しない場合、プロデューサーアプリケーションは Kafka Admin Client API を使用してトピックを作成します。Kafka に書き込まれる各レコードには、ユーザー名(alice など)を表すキーと、json フォーマットのカウント値({"count": 0} など)があります。コンシューマーアプリケーションは、同じ Kafka トピックを読み取り、各レコードの処理中にカウントのローリング合計を保持します。


  1. プロデューサーを実行して、次の引数を渡します。

    • Kafka クラスターに接続するための構成パラメーターを含むローカルファイル
    • トピック名
    lein producer $HOME/.confluent/java.config test1


    Producing record: alice     {"count":0}
    Producing record: alice     {"count":1}
    Producing record: alice     {"count":2}
    Producing record: alice     {"count":3}
    Producing record: alice     {"count":4}
    Produced record to topic test1 partition [0] @ offest 0
    Produced record to topic test1 partition [0] @ offest 1
    Produced record to topic test1 partition [0] @ offest 2
    Produced record to topic test1 partition [0] @ offest 3
    Produced record to topic test1 partition [0] @ offest 4
    Producing record: alice     {"count":5}
    Producing record: alice     {"count":6}
    Producing record: alice     {"count":7}
    Producing record: alice     {"count":8}
    Producing record: alice     {"count":9}
    Produced record to topic test1 partition [0] @ offest 5
    Produced record to topic test1 partition [0] @ offest 6
    Produced record to topic test1 partition [0] @ offest 7
    Produced record to topic test1 partition [0] @ offest 8
    Produced record to topic test1 partition [0] @ offest 9
    10 messages were produced to topic test1!
  2. プロデューサーコード を表示します。


  1. コンシューマーを実行して、次の引数を渡します。

    • Kafka クラスターに接続するための構成パラメーターを含むローカルファイル
    • 前の手順で使用したトピック名
    lein consumer $HOME/.confluent/java.config test1
  2. コンシューマーがすべてのメッセージを受信したことを確認します。次のように表示されます。

    Waiting for message in KafkaConsumer.poll
    Consumed record with key alice and value {"count":0}, and updated total count to 0
    Consumed record with key alice and value {"count":1}, and updated total count to 1
    Consumed record with key alice and value {"count":2}, and updated total count to 3
    Consumed record with key alice and value {"count":3}, and updated total count to 6
    Consumed record with key alice and value {"count":4}, and updated total count to 10
    Consumed record with key alice and value {"count":5}, and updated total count to 15
    Consumed record with key alice and value {"count":6}, and updated total count to 21
    Consumed record with key alice and value {"count":7}, and updated total count to 28
    Consumed record with key alice and value {"count":8}, and updated total count to 36
    Consumed record with key alice and value {"count":9}, and updated total count to 45
    Waiting for message in KafkaConsumer.poll
  3. コンシューマーコード を表示します。