Kafka Connect のログ記録

Kafka Connect およびその他の Confluent Platform コンポーネントでは、Java ベースのログ記録ユーティリティ Apache Log4j を使用してランタイムデータを収集し、コンポーネントイベントを記録します。以下の表は、各ログレベルについてまとめたものです。Kafka Connect Log4j のプロパティファイルは、Confluent Platform のインストールディレクトリパス etc/kafka/connect-log4j.properties にあります。

レベル 説明
OFF ログ記録をオフにします。
FATAL 未完了状態での終了につながる重大なエラー。
ERROR その他のランタイムエラーまたは予期しない状態。
WARN 好ましくない、または予期しないランタイムの状況。ただし、必ずしも誤りとは言えないもの。
INFO 起動時およびシャットダウン時の注目すべきランタイムイベント。
DEBUG イベントに関する詳細な診断情報。
TRACE あらゆることに関する詳細な診断情報。

Connect では、デフォルトで INFOWARNERRORFATAL の情報が標準出力(stdout)に出力されます。Connect が起動されると、使用されている設定、処理に伴う WARN および ERROR (または FATAL )のメッセージが出力されます。たとえば、構成に不足がある場合やコネクターに問題がある場合などです。すべてのイベント、起動からシャットダウンまでに Connect およびコネクターで実行された 1 つひとつのステップを表示するには、ログレベルとして DEBUG または TRACE を設定します。

Connect ログの表示

Kafka Connect では、デフォルトでは stdout にログを出力します。次のセクションでは、Connect ログの表示に使用できるコマンドについて説明します。

Docker

Docker で実行されている Confluent Platform に関する Connect ログの最新部分を表示する(tail)には、次のコマンドを入力します。

docker logs -f kafka-connect

Docker でのログレベルの設定方法の詳細については、「Docker のログレベルの変更」を参照してください。

Confluent CLI

ローカルで実行されている Confluent Platform に関するログのスナップショットを表示するには、次のコマンドを入力します。

confluent local services connect log

ログに対してパイプと grep を組み合わせると、特定のログ情報を表示できます。たとえば、S3 コネクターのメッセージを表示するには、次のコマンドを入力します。

confluent local services connect log | grep s3

標準のログコマンドを実行すると、表示される出力に connect.stdout パスが含まれます。このパスをログコマンドで使用すると、ログに追加された新しいメッセージを表示(tail)することができます。以下に例を示します。

confluent local services connect log -f

出力例:

[2020-07-07 15:04:29,333] DEBUG Progress event: HTTP_REQUEST_COMPLETED_EVENT, bytes: 0 (io.confluent.connect.s3.storage.S3OutputStream:286)
[2020-07-07 15:04:29,333] DEBUG Progress event: HTTP_RESPONSE_STARTED_EVENT, bytes: 0 (io.confluent.connect.s3.storage.S3OutputStream:286)
[2020-07-07 15:04:29,333] DEBUG Progress event: HTTP_RESPONSE_COMPLETED_EVENT, bytes: 0 (io.confluent.connect.s3.storage.S3OutputStream:286)
[2020-07-07 15:04:29,333] DEBUG Progress event: CLIENT_REQUEST_SUCCESS_EVENT, bytes: 0 (io.confluent.connect.s3.storage.S3OutputStream:286)
[2020-07-07 15:04:29,334] DEBUG Progress event: REQUEST_BYTE_TRANSFER_EVENT, bytes: 406 (io.confluent.connect.s3.storage.S3OutputStream:286)
[2020-07-07 15:04:29,334] DEBUG Progress event: TRANSFER_PART_COMPLETED_EVENT, bytes: 0 (io.confluent.connect.s3.storage.S3OutputStream:286)

コマンドリファレンスについては、「confluent local services connect log」を参照してください。

スクリプトによる Confluent Platform の起動

Confluent Platform の実行に Confluent Platform systemd サービスユニットファイル を使用している場合、Connect のログファイルは /var/log/confluent/connect に書き込まれます。systemd ジャーナルサービスがログを書き込みます。Kafka Connect のログメッセージを表示するには、次の journalctl コマンドを入力します。

sudo journalctl -u confluent-connect

ログの最新部分を表示するには、次のコマンドを入力します。

sudo journalctl -f -u confluent-connect

Log4j プロパティファイル

Confluent Platform には、デフォルトの Connect Log4j テンプレートファイルが用意されています。このプロパティファイルは、Confluent Platform ディレクトリ etc/kafka/connect-log4j.properties にあります。このファイルの内容の例を以下に示します。

log4j.rootLogger=INFO, stdout

# Send the logs to the console.
#
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

# Send the logs to a file, rolling the file at midnight local time. For example, the `File` option specifies the
# location of the log files (e.g. ${kafka.logs.dir}/connect.log), and at midnight local time the file is closed
# and copied in the same directory but with a filename that ends in the `DatePattern` option.
#
log4j.appender.connectAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.connectAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.connectAppender.File=${kafka.logs.dir}/connect.log
log4j.appender.connectAppender.layout=org.apache.log4j.PatternLayout

# The `%X{connector.context}` parameter in the layout includes connector-specific and task-specific information
# in the log message, where appropriate. This makes it easier to identify those log messages that apply to a
# specific connector. Simply add this parameter to the log layout configuration below to include the contextual
# information.
#
connect.log.pattern=[%d] %p %m (%c:%L)%n
#connect.log.pattern=[%d] %p %X{connector.context}%m (%c:%L)%n

log4j.appender.stdout.layout.ConversionPattern=${connect.log.pattern}
log4j.appender.connectAppender.layout.ConversionPattern=${connect.log.pattern}

log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.reflections=ERROR

この構成ファイルを更新して、項目を追加することや、ログレベルを変更することができます。

ログレベルの変更

以下のセクションでは、Connect および実行中のコネクターのデバッグを行うためのログレベルの追加や変更について説明します。

Connect Log4j プロパティファイルの使用

etc/kafka/connect-log4j.properties にある基本的な Connect log4j テンプレートでは、問題のデバッグを行うには不十分である可能性があります。以下に、コンシューマー、プロデューサー、コネクターの DEBUG レベルを設定するために使用する Log4j テンプレートの例を示します。単純にすべてに対して DEBUG を有効にすると、ログが冗長になり、内容を確認しづらくなるため、この方法をお勧めします。また、コネクターのログ記録で TRACE を有効にすると、処理されているレコードを確認できます。

以下の Log4j プロパティファイルの例では、Connect、ワーカータスク、Datagen Source Connector、Amazon S3 Sink Connector について DEBUG レベルを構成しています。この例では、connect.log という名前の別のファイルにもログを送信するための行はコメントアウトされていることに注意してください。

# root log level (if an override to a class or package is not specified,
# it will now log at this level).
log4j.rootLogger=INFO, stdout

# Append logs to console. If the customer is using different appenders,
# update the following lines accordingly. The "%X{connector.context}"
# fragment instructs Connect to include connector- and task-specific information
# on every log message and is now recommended.log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

# Uncomment the following lines to also send the logs to a file,
# rolling the file at midnight local time. For example, the `File` option
# specifies the location of the log files (e.g. ${kafka.logs.dir}/connect.log),
# and at midnight local time the file is closed and copied in the same
# directory but with a filename that ends in the `DatePattern` option.
#
#log4j.appender.connectAppender=org.apache.log4j.DailyRollingFileAppender
#log4j.appender.connectAppender.DatePattern='.'yyyy-MM-dd-HH
#log4j.appender.connectAppender.File=${kafka.logs.dir}/connect.log
#log4j.appender.connectAppender.layout=org.apache.log4j.PatternLayout

# The `%X{connector.context}` parameter in the layout includes connector-specific and task-specific information
# in the log message, where appropriate. This makes it easier to identify those log messages that apply to a
# specific connector. Simply add this parameter to the log layout configuration below to include the contextual
# information.
#
connect.log.pattern=[%d] %p %m (%c:%L)%n
#connect.log.pattern=[%d] %p %X{connector.context}%m (%c:%L)%n
log4j.appender.stdout.layout.ConversionPattern=${connect.log.pattern}
log4j.appender.connectAppender.layout.ConversionPattern=${connect.log.pattern}

# suppress noisy logs from dependencies
log4j.logger.org.reflections=ERROR
log4j.logger.org.eclipse.jetty=ERROR
log4j.logger.kafka=ERROR
log4j.logger.org.apache.kafka=ERROR
log4j.logger.org.apache.zookeeper=ERROR

# Uncomment the following line to debug consumers (very verbose, use carefully):
#log4j.logger.org.apache.kafka.clients.consumer=DEBUG

# Uncomment the following line to debug producers (very verbose, use carefully):
#log4j.logger.org.apache.kafka.clients.producer=DEBUG

# Uncomment the following line when enabling debug on source connectors:
log4j.logger.org.apache.kafka.connect.runtime.WorkerSourceTask=DEBUG

# Uncomment the following line when enabling debug on sink connectors:
log4j.logger.org.apache.kafka.connect.runtime.WorkerSinkTask=DEBUG

# Uncomment the following line when the problem may be with Connect, SMTs, converters:
log4j.logger.org.apache.kafka.connect=DEBUG

# When one or more connectors are not behaving correctly, enable debug logging only
# for those connectors. Optionally enable TRACE logging to see records that are processed.

# Uncomment the following line to enable debug for the Datagen connector:
log4j.logger.io.confluent.kafka.connect.datagen=DEBUG

# Uncomment the following to enable debug for the JDBC connector:
#log4j.logger.io.confluent.connect.jdbc=DEBUG

# Uncomment the following to enable debug for the Elasticsearch connector:
#log4j.logger.io.confluent.connect.elasticsearch=DEBUG

# Uncomment the following to enable debug for the for the HDFS connector:
#log4j.logger.io.confluent.connect.storage=DEBUG
#log4j.logger.io.confluent.connect.hdfs=DEBUG

# Uncomment the following to enable debug for the for the S3 connector:
log4j.logger.io.confluent.connect.storage=DEBUG
log4j.logger.io.confluent.connect.s3=DEBUG

# Uncomment the following to enable debug for the for the GCS connector:
#log4j.logger.io.confluent.connect.storage=DEBUG
#log4j.logger.io.confluent.connect.gcs=DEBUG

# Uncomment the following to enable debug for the JMS connectors (and derivatives IBM MQ, ActiveMq):
#log4j.logger.io.confluent.connect.jms=DEBUG
#log4j.logger.io.confluent.connect.ibm.mq=DEBUG
#log4j.logger.io.confluent.connect.activemq=DEBUG

# Add similar lines to enable debug for the specific connector(s):
#log4j.logger.<root package of the connector>=DEBUG

Connect API の使用

Confluent Platform と Kafka Connect およびコネクターを実行できたら、Connect API エンドポイントを使用して、ログレベルの確認とログレベルの変更ができます。

注釈

API を使用して行った変更は永続的ではありません。つまり、API を使用して行った変更によって、connect-log4j.properties ファイルのプロパティが変更されることはありません。ワーカーが再起動されると、ログ記録のプロパティは元に戻り、このファイルで指定されたログ記録のプロパティが使用されます。また、API による変更は、Connect の分散クラスター全体ではなく、アクセスされたワーカーのログ記録にのみ適用されます。

この例では、以下の構成プロパティが指定された Connect Log4j プロパティファイルを使用しています。

log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

connect.log.pattern=[%d] %p %m (%c:%L)%n

log4j.appender.stdout.layout.ConversionPattern=${connect.log.pattern}
log4j.appender.connectAppender.layout.ConversionPattern=${connect.log.pattern}

# suppress noisy logs from dependencies
log4j.logger.org.reflections=ERROR
log4j.logger.org.eclipse.jetty=ERROR
log4j.logger.kafka=ERROR
log4j.logger.org.apache.kafka=ERROR
log4j.logger.org.apache.zookeeper=ERROR

# Uncomment the following line when enabling debug on source connectors:
log4j.logger.org.apache.kafka.connect.runtime.WorkerSourceTask=DEBUG

# Uncomment the following line when enabling debug on sink connectors:
log4j.logger.org.apache.kafka.connect.runtime.WorkerSinkTask=DEBUG

# Uncomment the following line when the problem may be with Connect, SMTs, converters:
log4j.logger.org.apache.kafka.connect=DEBUG

# Uncomment the following line to enable debug for the Datagen connector:
log4j.logger.io.confluent.kafka.connect.datagen=DEBUG

# Uncomment the following lines to enable debug for the Amazon S3 connector:
log4j.logger.io.confluent.connect.storage=DEBUG
log4j.logger.io.confluent.connect.s3=DEBUG

ログレベルの確認

現在のログレベルを確認するには、次のコマンドを入力します。JSON フォーマットで出力するには、jq を使用します。

curl -Ss http://localhost:8083/admin/loggers | jq

出力例:

{
  "io.confluent.connect.s3": {
    "level": "DEBUG"
  },
  "io.confluent.connect.storage": {
    "level": "DEBUG"
  },
  "io.confluent.kafka.connect.datagen": {
    "level": "DEBUG"
  },
  "kafka": {
    "level": "ERROR"
  },
  "org.apache.kafka": {
    "level": "ERROR"
  },
  "org.apache.kafka.connect": {
    "level": "DEBUG"
  },
  "org.apache.kafka.connect.runtime.WorkerSinkTask": {
    "level": "DEBUG"
  },
  "org.apache.kafka.connect.runtime.WorkerSourceTask": {
    "level": "DEBUG"
  },
  "org.apache.zookeeper": {
    "level": "ERROR"
  },
  "org.eclipse.jetty": {
    "level": "ERROR"
  },
  "org.reflections": {
    "level": "ERROR"
  },
  "root": {
    "level": "INFO"
  }
}

特定のロガーのログレベルの取得

WorkerSourceTask ロガーのログレベルを確認するには、次のコマンドを入力します。

curl -Ss http://localhost:8083/admin/loggers/org.apache.kafka.connect.runtime.WorkerSourceTask | jq

出力例:

{
  "level": "DEBUG"
}

特定のロガーのログレベルの変更

WorkerSourceTask ロガーのログレベルを DEBUG から TRACE に変更するには、次のコマンドを入力します。

curl -s -X PUT -H "Content-Type:application/json" \
http://localhost:8083/admin/loggers/org.apache.kafka.connect.runtime.WorkerSourceTask \
-d '{"level": "TRACE"}' | jq '.'

出力例:

[
  "org.apache.kafka.connect.runtime.WorkerSourceTask"
]

注釈

これにより、この REST リクエストを受信した特定のワーカーのログレベルが変更されます。このリクエストでは、Connect クラスターの他のワーカーには変更は適用されません。

ログレベルの復元

WorkerSourceTask ロガーのログレベルを DEBUG に戻すには、次のコマンドを入力します。

curl -s -X PUT -H "Content-Type:application/json" \
http://localhost:8083/admin/loggers/org.apache.kafka.connect.runtime.WorkerSourceTask \
-d '{"level": "DEBUG"}' | jq '.'

出力例:

[
  "org.apache.kafka.connect.runtime.WorkerSourceTask"
]

注釈

admin/loggers REST API で設定されたログレベルは、ワーカーが再起動されると、無効になります。ワーカーの再起動により、REST コマンドによって行われた変更はすべて元に戻ります。

特定のコネクターのログレベルの変更

Amazon S3 Sink Connector のログレベルを TRACE に変更するには、次のコマンドを入力します。

curl -s -X PUT -H "Content-Type:application/json" \
http://localhost:8083/admin/loggers/io.confluent.connect.s3 \
-d '{"level": "TRACE"}' | jq '.'

出力例:

[
  "io.confluent.connect.s3",
  "io.confluent.connect.s3.S3SinkConnector",
  "io.confluent.connect.s3.S3SinkConnectorConfig",
  "io.confluent.connect.s3.S3SinkTask",
  "io.confluent.connect.s3.TopicPartitionWriter",
  "io.confluent.connect.s3.format.avro.AvroRecordWriterProvider",
  "io.confluent.connect.s3.storage.S3OutputStream",
  "io.confluent.connect.s3.storage.S3Storage",
  "io.confluent.connect.s3.util.Version"
]

デフォルトのリスナーポートの変更

KIP-495 では /admin/loggers REST API エンドポイントが導入されています。このエンドポイントを使用して、Connect ワーカーの任意の名前付きロガーのログレベルの取得および変更ができます。ワーカーの構成の admin.listeners プロパティにより、このエンドポイントをどこで使用できるかが決まります。/admin/loggers エンドポイントでは、デフォルトの REST API ポート 8083 が使用されます。

Connect admin.listeners プロパティを変更して、admin/loggers エンドポイントのポートを別のポートまたはセキュアポートに変更することや、エンドポイントを無効にすることができます。この変更は、Connect ワーカーの構成ファイルで行います。

  • admin/loggers エンドポイントがリッスンするポートを別のポート(例ではポート 9093 を使用)に変更するには、ワーカーの構成ファイルに以下の行を追加します。

    admin.listeners=http://localhost:9093
    
  • 別の HTTPS リスナーの SSL プロパティを設定するには、ワーカーの構成ファイルに以下の行を追加します。

    admin.listeners=https://localhost:9093
    admin.listeners.https.ssl.truststore.location=/path/to/truststore.jks
    admin.listeners.https.ssl.truststore.password=<trustore-password>
    admin.listeners.https.ssl.keystore.location=/path/to/keystore.jks
    admin.listeners.https.ssl.keystore.password=<keystore-password>
    
  • admin/loggers エンドポイントを完全に無効にするには、ワーカーの構成ファイルに空の文字列を入力します。以下に例を示します。

    admin.listeners=
    

環境変数の使用(Docker)

Docker のログレベルを構成するには、環境変数を使用します。コネクターの DEBUG レベルのログ記録を有効にするには、Confluent Platform Connect コンテナーの起動時に以下の環境変数を使用します。

CONNECT_LOG4J_LOGGERS="log4j.logger.io.confluent.<connector-name>=DEBUG"

すべてのコネクターを含め、Connect ワーカーで DEBUG レベルのログメッセージを有効にするには、Confluent Platform Connect コンテナーの起動時に、以下の環境変数を使用します。

CONNECT_LOG4J_LOGGERS="org.apache.kafka.connect=DEBUG"

環境変数では、キー/値ペアのコンマ区切りのリストを指定できます。たとえば、以下の変数を使用すると、コネクターおよび Connect フレームワークで DEBUG が有効になります。

CONNECT_LOG4J_LOGGERS="log4j.logger.io.confluent.<connector-name>=DEBUG,org.apache.kafka.connect=DEBUG"

Docker のログ記録の詳細については、「Docker のログ記録の構成」を参照してください。

スタックトレース

Connect ステータス API エンドポイントを使用して、タスクのエラートレースを確認できます。障害が発生したコネクターのエラーを取得するには、以下のコマンドを入力します。

curl localhost:8083/connectors/<connector-name>/status | jq

以下に例を示します。

curl localhost:8083/connectors/http/status | jq

出力例:

{
  "name": "http",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.0.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "127.0.0.1:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Error while processing HTTP request with Url : http://localhost:8080/api/messages, Payload : one, Error Message : Exception while processing HTTP request for a batch of 1 records., Exception : org.apache.http.conn.HttpHostConnectException: Connect to localhost:8080 [localhost/127.0.0.1, localhost/0:0:0:0:0:0:0:1] failed: Connection refused (Connection refused)\n\tat io.confluent.connect.http.writer.HttpWriterImpl.handleException(HttpWriterImpl.java:349)\n\tat io.confluent.connect.http.writer.HttpWriterImpl.sendBatch(HttpWriterImpl.java:224)\n\tat io.confluent.connect.http.writer.HttpWriterImpl.write(HttpWriterImpl.java:149)\n\tat io.confluent.connect.http.HttpSinkTask.put(HttpSinkTask.java:70)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)\n\t... 10 more\nCaused by: Error while processing HTTP request with Url : http://localhost:8080/api/messages, Payload : one, Error Message : Exception while processing HTTP request for a batch of 1 records., Exception : org.apache.http.conn.HttpHostConnectException: Connect to localhost:8080 [localhost/127.0.0.1, localhost/0:0:0:0:0:0:0:1] failed: Connection refused (Connection refused)\n\tat io.confluent.connect.http.writer.HttpWriterImpl.executeBatchRequest(HttpWriterImpl.java:287)\n\tat io.confluent.connect.http.writer.HttpWriterImpl.executeRequestWithBackOff(HttpWriterImpl.java:234)\n\tat io.confluent.connect.http.writer.HttpWriterImpl.sendBatch(HttpWriterImpl.java:222)\n\t... 13 more\n"
    }
  ],
  "type": "sink"
}

次のコマンドを実行すると、出力を読みやすくすることができます。

echo -e $(curl localhost:8083/connectors/<connector-name>/status | jq .tasks[].trace)

以下に例を示します。

echo -e $(curl localhost:8083/connectors/http/status | jq .tasks[].trace)

出力例:

"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
     at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)
     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
     at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Error while processing HTTP request with Url : http://localhost:8080/api/messages, Payload : one, Error Message : Exception while processing HTTP request for a batch of 1 records., Exception : org.apache.http.conn.HttpHostConnectException: Connect to localhost:8080 [localhost/127.0.0.1, localhost/0:0:0:0:0:0:0:1] failed: Connection refused (Connection refused)
     at io.confluent.connect.http.writer.HttpWriterImpl.handleException(HttpWriterImpl.java:349)
     at io.confluent.connect.http.writer.HttpWriterImpl.sendBatch(HttpWriterImpl.java:224)
     at io.confluent.connect.http.writer.HttpWriterImpl.write(HttpWriterImpl.java:149)
     at io.confluent.connect.http.HttpSinkTask.put(HttpSinkTask.java:70)
     at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
     ... 10 more
Caused by: Error while processing HTTP request with Url : http://localhost:8080/api/messages, Payload : one, Error Message : Exception while processing HTTP request for a batch of 1 records., Exception : org.apache.http.conn.HttpHostConnectException: Connect to localhost:8080 [localhost/127.0.0.1, localhost/0:0:0:0:0:0:0:1] failed: Connection refused (Connection refused)
     at io.confluent.connect.http.writer.HttpWriterImpl.executeBatchRequest(HttpWriterImpl.java:287)
     at io.confluent.connect.http.writer.HttpWriterImpl.executeRequestWithBackOff(HttpWriterImpl.java:234)
     at io.confluent.connect.http.writer.HttpWriterImpl.sendBatch(HttpWriterImpl.java:222)
     ... 13 more

おすすめの関連情報

以下の記事には、Kafka Connect のログ記録の詳細情報が記載されています。