コネクターの管理¶
Confluent for Kubernetes (CFK)では、Kubernetes のコネクターカスタムリソース(CR)としてコネクターを宣言的に作成および管理できます。
コネクターは Kafka Connect によって管理されます。コネクター CR を作成する前に、「コネクタープラグインのインストール」の説明に従って、コネクタープラグインをインストールします。
コネクターの作成¶
コネクター CR はそれぞれ、Kafka Connect ワーカー内の 1 つのコネクターにマップされます。
次のように、コネクター CR としてコネクターを作成します。
kind: Connector spec: name: --- [1] taskMax: --- [2] class: --- [3] configs: --- [4] restartPolicy: type: --- [5] maxRetry: --- [6] connectClusterRef: --- [7] connectRest: --- [8]
[1] Optional. Typically, this is the same as
metadata.name
. If you need to have a name which Kubernetes cannot support, specify that name here.[2] Required. The max number of tasks for the connector.
これを設定する場合、リソース消費量について検討してください。この値を大きな数に設定する場合は、最初に開発環境におけるテストが必要になることがあります。
[3] Required. The class name of the connector.
[4] Connector-specific configuration settings as key-value maps. Consult the connector documentation for the required settings.
構成で機密データや証明書が必要になる場合は、「コネクター構成」で詳細を確認してください。
[5] Required. The policy to restart failed tasks of the connector. Set to
OnFailure
orNever
. The default value isOnFailure
, which means it will automatically restart the task when a task failed ifmaxRetry
is not reached.[6] The max retry times to restart when
restartPolicy
type isOnFailure
. The default value is10
.[7] See Connect のクラスター名を使用した Connect の検出.
[8] See Connect エンドポイントを使用した Connect の検出.
構成を適用してコネクターを作成します。
kubectl apply -f <connector CR>
コネクター構成¶
設定のカテゴリーを示すためにコネクター構成(spec.configs
)を使用できます。
- コネクター固有の設定
- 機密情報(ユーザー認証情報など)
- コネクターの TLS 証明書
認証情報のマウントされたシークレット¶
マウントされたシークレット を使用して、コネクター構成内にあるパスワードなどの機密データを保護できます。
コネクターのマウントされたシークレットを使用するには、以下のようにします。
「マウントされたシークレットを使用した Connect 認証情報の指定」の説明に従って、マウントされたシークレットを作成し、Connect CR でシークレット名を指定します。
コネクター CR で、シークレットの場所を変数として指定します。それらの変数は、コネクターの起動時に CFK によって動的に解決されます。
たとえば、
my-credential
という名前のシークレットを使用すると、次のようになります。spec: config: connection.url: "${file:/mnt/secrets/my-credential/custom.properties:connector-url}" connection.user: "${file:/mnt/secrets/my-credential/custom.properties:connector-username}" connection.password: "${file:/mnt/secrets/my-credential/custom.properties:connector-password}"
コネクターの TLS 証明書¶
必要なコネクター TLS 証明書が Connect クラスターに存在しない場合は、以下の手順に従って、Connect クラスター内のコネクターで証明書を使用できるようにします。
「コネクターの証明書の指定」の説明に従って、シークレットを作成し、Connect CR でシークレット名を指定します。
以下のコマンドを使用して、証明書の詳細を取得します。たとえば、キーストアのパス、トラストストアのパス、JKS パスワードなどです。
kubectl get connect -oyaml
これらのパスは、
status.connectorTLSFilePaths
にあります。コネクター CR で、上記の手順で取得した情報を指定します。
以下に例を示します。
spec: configs: ssl.truststore.location: "/mnt/sslcerts/truststore.jks" ssl.truststore.password: "${file:/mnt/sslcerts/jksPassword.txt:jksPassword}" ssl.keystore.location: "/mnt/sslcerts/keystore.jks" ssl.keystore.password: "${file:/mnt/sslcerts/jksPassword.txt:jksPassword}"
Connect クラスターの検出¶
以下のいずれかの方法を使用して、コネクターはそれが属している Connect クラスターを検出できます。
connectClusterRef
に Connect クラスターの名前を指定する。このオプションは、Connect クラスターがカスタムリソースとして CFK によってデプロイされている場合に使用します。
connectRest
に Connect クラスターの接続情報を指定する。このオプションは、Connect クラスターが CFK の外部でデプロイされている場合に使用します。
コネクター CR に
connectClusterRef
もconnectRest
も指定されていない場合、CFK は同じ名前空間にある Connect クラスターの検出を試みます。同じ名前空間に複数の Connect クラスターがある場合、CFK はエラーをログに記録します。
Connect のクラスター名を使用した Connect の検出¶
コネクターが Connect クラスター名によってその Connect クラスターを検出できるようにするには、コネクター CR で connectClusterRef
を使用します。
spec:
connectClusterRef:
name: --- [1]
namespace: --- [2]
- [1](必須)|kconnect| クラスターの名前。
- [2](省略可) Connect クラスターのデプロイ先となる名前空間。省略すると、このコネクターと同じ名前空間が想定されます。
Connect エンドポイントを使用した Connect の検出¶
コネクターが属している Connect クラスターの情報をコネクターの CR で指定できます。
Kafka Connect エンドポイント
spec:
connectRest:
endpoint: --- [1]
- [1] Connect URL およびポート。
以下に例を示します。
spec:
connectRest:
endpoint: https://connect.operator.svc.cluster.local:8083
Connect に対する基本認証
spec:
connectRest:
authentication:
type: basic --- [1]
basic:
secretRef: --- [2]
- [1](必須)
- [2](必須) 認証情報を格納するシークレットの名前。必要な形式については、「基本認証」を参照してください。
Connect に対する mTLS 認証
spec:
connectRest:
authentication:
type: mtls --- [1]
tls:
secretRef: --- [2]
- [1](必須)
- [2](必須) 証明書を格納するシークレットの名前。証明書の詳細については、「Confluent for Kubernetes でのネットワーク暗号化の構成」を参照してください。
Connect に対するベアラー認証(RBAC の場合)
RBAC が有効になっている場合、ベアラー認証を次のように構成できます。
spec:
connectRest:
authentication:
type: bearer --- [1]
bearer:
secretRef: --- [2]
- [1](必須)
- [2](必須) ベアラー認証情報を格納するシークレットの名前。必要な形式については、「ベアラー認証」を参照してください。
コネクターの削除¶
コネクター CR を使用してコネクターを削除するには、次のようにします。
kubectl delete -f <connector CR>
コネクター名を使用してコネクターを削除するには、次のようにします。
kubectl delete connector <connector name>
コネクターのステートの表示¶
コネクターの現在の状態を表示するには、以下のコマンドを実行します。
kubectl describe connector <connector-CR-name>
この出力には、以下のフィールドが含まれています。
conditions
- コネクターの最新の観測状況。
connectRestEndpoint
- Connect クラスターの REST エンドポイント。
connectorState
- コネクターインスタンスの実際の状態。
failedTasks
FAILED
ステートのコネクタータスク。failedTasksCount
- 失敗したタスクの数。
kafkaClusterId
- コネクターが属している Kafka クラスターの ID。
restartPolicy
- コネクターの失敗したタスクを再開するためのポリシー。
state
- コネクターの CR ステート。
tasksReady
taskMax
に基づいて実行されているタスクの数。trace
- コネクターインスタンスのエラートレースメッセージ。
workerId
- コネクターインスタンスのワーカー ID。
次のコマンドを実行して、上記の状態フィールドの最新のリストを取得できます。
kubectl explain connector.status