コネクターの管理

Confluent for Kubernetes (CFK)では、Kubernetes のコネクターカスタムリソース(CR)としてコネクターを宣言的に作成および管理できます。

コネクターは Kafka Connect によって管理されます。コネクター CR を作成する前に、「コネクタープラグインのインストール」の説明に従って、コネクタープラグインをインストールします。

コネクターの作成

コネクター CR はそれぞれ、Kafka Connect ワーカー内の 1 つのコネクターにマップされます。

  1. 次のように、コネクター CR としてコネクターを作成します。

    kind: Connector
    spec:
      name:                --- [1]
      taskMax:             --- [2]
      class:               --- [3]
      configs:             --- [4]
      restartPolicy:
        type:              --- [5]
        maxRetry:          --- [6]
      connectClusterRef:   --- [7]
      connectRest:         --- [8]
    
    • [1] Optional. Typically, this is the same as metadata.name. If you need to have a name which Kubernetes cannot support, specify that name here.

    • [2] Required. The max number of tasks for the connector.

      これを設定する場合、リソース消費量について検討してください。この値を大きな数に設定する場合は、最初に開発環境におけるテストが必要になることがあります。

    • [3] Required. The class name of the connector.

    • [4] Connector-specific configuration settings as key-value maps. Consult the connector documentation for the required settings.

      構成で機密データや証明書が必要になる場合は、「コネクター構成」で詳細を確認してください。

    • [5] Required. The policy to restart failed tasks of the connector. Set to OnFailure or Never. The default value is OnFailure, which means it will automatically restart the task when a task failed if maxRetry is not reached.

    • [6] The max retry times to restart when restartPolicy type is OnFailure. The default value is 10.

    • [7] See Connect のクラスター名を使用した Connect の検出.

    • [8] See Connect エンドポイントを使用した Connect の検出.

  2. 構成を適用してコネクターを作成します。

    kubectl apply -f <connector CR>
    

コネクター構成

設定のカテゴリーを示すためにコネクター構成(spec.configs)を使用できます。

認証情報のマウントされたシークレット

マウントされたシークレット を使用して、コネクター構成内にあるパスワードなどの機密データを保護できます。

コネクターのマウントされたシークレットを使用するには、以下のようにします。

  1. マウントされたシークレットを使用した Connect 認証情報の指定」の説明に従って、マウントされたシークレットを作成し、Connect CR でシークレット名を指定します。

  2. コネクター CR で、シークレットの場所を変数として指定します。それらの変数は、コネクターの起動時に CFK によって動的に解決されます。

    たとえば、my-credential という名前のシークレットを使用すると、次のようになります。

    spec:
      config:
        connection.url: "${file:/mnt/secrets/my-credential/custom.properties:connector-url}"
        connection.user: "${file:/mnt/secrets/my-credential/custom.properties:connector-username}"
        connection.password: "${file:/mnt/secrets/my-credential/custom.properties:connector-password}"
    

コネクターの TLS 証明書

必要なコネクター TLS 証明書が Connect クラスターに存在しない場合は、以下の手順に従って、Connect クラスター内のコネクターで証明書を使用できるようにします。

  1. コネクターの証明書の指定」の説明に従って、シークレットを作成し、Connect CR でシークレット名を指定します。

  2. 以下のコマンドを使用して、証明書の詳細を取得します。たとえば、キーストアのパス、トラストストアのパス、JKS パスワードなどです。

    kubectl get connect -oyaml
    

    これらのパスは、status.connectorTLSFilePaths にあります。

  3. コネクター CR で、上記の手順で取得した情報を指定します。

    以下に例を示します。

    spec:
      configs:
        ssl.truststore.location: "/mnt/sslcerts/truststore.jks"
        ssl.truststore.password: "${file:/mnt/sslcerts/jksPassword.txt:jksPassword}"
        ssl.keystore.location: "/mnt/sslcerts/keystore.jks"
        ssl.keystore.password: "${file:/mnt/sslcerts/jksPassword.txt:jksPassword}"
    

Connect クラスターの検出

以下のいずれかの方法を使用して、コネクターはそれが属している Connect クラスターを検出できます。

  • connectClusterRefConnect クラスターの名前を指定する

    このオプションは、Connect クラスターがカスタムリソースとして CFK によってデプロイされている場合に使用します。

  • connectRestConnect クラスターの接続情報を指定する

    このオプションは、Connect クラスターが CFK の外部でデプロイされている場合に使用します。

  • コネクター CR に connectClusterRefconnectRest も指定されていない場合、CFK は同じ名前空間にある Connect クラスターの検出を試みます。同じ名前空間に複数の Connect クラスターがある場合、CFK はエラーをログに記録します。

Connect のクラスター名を使用した Connect の検出

コネクターが Connect クラスター名によってその Connect クラスターを検出できるようにするには、コネクター CR で connectClusterRef を使用します。

spec:
  connectClusterRef:
    name:                 --- [1]
    namespace:            --- [2]
  • [1](必須)|kconnect| クラスターの名前。
  • [2](省略可) Connect クラスターのデプロイ先となる名前空間。省略すると、このコネクターと同じ名前空間が想定されます。

Connect エンドポイントを使用した Connect の検出

コネクターが属している Connect クラスターの情報をコネクターの CR で指定できます。

Kafka Connect エンドポイント

spec:
  connectRest:
    endpoint:             --- [1]
  • [1] Connect URL およびポート。

以下に例を示します。

spec:
  connectRest:
    endpoint: https://connect.operator.svc.cluster.local:8083

Connect に対する基本認証

spec:
  connectRest:
    authentication:
      type: basic                 --- [1]
      basic:
        secretRef:                --- [2]
        directoryPathInContainer: --- [3]
  • [1](必須)

  • [2] または [3] が必須です。

  • [2] 認証情報を格納するシークレットの名前。必要な形式については、「基本認証」を参照してください。

  • [3] Vault によって必要な認証情報が挿入されるコンテナーのディレクトリパス。

    必要な形式については、「基本認証」を参照してください。

    Vault を使用する際の認証情報と必要なアノテーションを提供する方法については、「Confluent Platform アプリケーションの CR へのシークレットの提供」を参照してください。

Connect に対する mTLS 認証

spec:
  connectRest:
    authentication:
      type: mtls                 --- [1]
    tls:
      secretRef:                 --- [2]
      directoryPathInContainer:  --- [3]

Connect に対するベアラー認証(RBAC の場合)

RBAC が有効になっている場合、ベアラー認証を次のように構成できます。

spec:
  connectRest:
    authentication:
      type: bearer                --- [1]
      bearer:
        secretRef:                --- [2]
        directoryPathInContainer: --- [3]
  • [1](必須)

  • [2](必須) ベアラー認証情報を格納するシークレットの名前。必要な形式については、「ベアラー認証」を参照してください。

  • [3] Vault によってベアラー認証情報が挿入されるコンテナーのディレクトリパス

    必要な形式については、「ベアラー認証」を参照してください。

    Vault を使用する際の認証情報と必要なアノテーションを提供する方法については、「Confluent Platform アプリケーションの CR へのシークレットの提供」を参照してください。

コネクターのアップデート

コネクターをアップデートするには、次のようにします。

  1. コネクター CR の構成を編集します。
  2. 変更を適用するには、kubectl apply コマンドを使用します。

コネクターの削除

コネクター CR を使用してコネクターを削除するには、次のようにします。

kubectl delete -f <connector CR>

コネクター名を使用してコネクターを削除するには、次のようにします。

kubectl delete connector <connector name>

コネクターのステートの表示

コネクターの現在の状態を表示するには、以下のコマンドを実行します。

kubectl describe connector <connector-CR-name>

この出力には、以下のフィールドが含まれています。

conditions
コネクターの最新の観測状況。
connectRestEndpoint
Connect クラスターの REST エンドポイント。
connectorState
コネクターインスタンスの実際の状態。
failedTasks
FAILED ステートのコネクタータスク。
failedTasksCount
失敗したタスクの数。
kafkaClusterId
コネクターが属している Kafka クラスターの ID。
restartPolicy
コネクターの失敗したタスクを再開するためのポリシー。
state
コネクターの CR ステート。
tasksReady
taskMax に基づいて実行されているタスクの数。
trace
コネクターインスタンスのエラートレースメッセージ。
workerId
コネクターインスタンスのワーカー ID。

次のコマンドを実行して、上記の状態フィールドの最新のリストを取得できます。

kubectl explain connector.status