Important
You are viewing documentation for an older version of Confluent Platform. For the latest, click here.
Kafka Connect S3 Source Connector¶
The Kafka Connect S3 Source Connector provides the capability to read data exported to S3 by the Kafka Connect S3 Sink connector and publish it back to an Kafka topic. Depending on the format and partitioner used to write the data to S3, this connector can write to the destination topic using the same partitions as the original messages exported to S3 and maintain the same message order. The connector selects folders based on the partitioner configuration and reads each folders S3 objects in alphabetical order. Each record is read based on the format selected. Configuration is setup to mirror the Kafka Connect S3 Sink connector and should be possible to make only minor changes to the original sink configuration.
Important
The recommended practice is to create topics manually in the destination Kafka cluster with the correct number of partitions before running the source connector. If the topics do not exist, Connect relies on auto-topic creation and the number of partitions are based upon the Kafka broker defaults. If there are more partitions in the destination cluster, the extra partitions are not used. If there are fewer partitions in the destination cluster, the connector task throws an exception and stops the moment it tries to write to a Kafka partition that does not exist.
Be aware of the following connector actions:
- The connector ignores any S3 object with a name that does not start with the configured topics directory. This name is “/topics/” by default.
- The connector ignores any S3 object that is below the topics directory but has an extension that does not match the configured format. For example, a JSON file is ignored when
format.class
is set for Avro files. - The connector stops and fails if the S3 object’s name does not match the expected format or is in an unexpected location.
Avoid the following configuration issues:
- A file with the correct extension and a valid name format e.g.
<topic>+<partition>+<offset>.<extension>
, placed in a folder of a different topic will be read normally and written to whatever topic as defined by its filename. - If a field partitioner is incorrectly configured to match the expected folder, it can break the ordering guarantees of S3 sink that used a deterministic sink partitioner.
Features¶
The S3 Source Connector offers a variety of features:
- Pluggable Data Format with or without Schema: Out of the box, the connector supports reading data from S3 in Avro and JSON format. Besides records with schema, the connector supports importing plain JSON records without schema in text files, one record per line. In general, the connector may accept any format that provides an implementation of the Format interface.
- Non-AWS Object Storage Support: Amazon S3 is an industry-standard object storage service. You can use the Kafka Connect S3 connector to connect object storage storage on non-AWS cloud platforms by using a different store URL to point at this alternative cloud platform.
- At Least Once Delivery: In the event of a task failure the connector guarantees no messages are lost, although the last few messages may be processed again.
- Matching Source Partitioning: Messages will be put back on to the same Kafka partition for that topic when it was written.
- Source Partition Ordering: The connector will read records back in time order in each topic-source partition if the
DefaultPartitioner
or aTimeBasedPartitioner
is used. If a FieldPartitioner is used it isn’t possible to guarantee the order of these messages. - Pluggable Partitioner: The connector comes out of the box with partitioners that support default partitioning based on Kafka partitions, field partitioning, and time-based partitioning in days or hours. You may implement your own partitioners by extending the Partitioner class. Additionally, you can customize time based partitioning by extending the TimeBasedPartitioner class.
Important
All partitioners will notice new topic folders with the inbuilt task
reconfiguration thread. The DefaultPartitioner
will detect new partition
folders. The FieldPartitioner will notice new folders for the fields
specified. However, the TimeBasedPartititoner will not currently detect new
files for a new time period.
Important
Be careful when both the Connect S3 Sink connector and the S3 Source Connector use the same Kafka cluster, since this results in the source connector writing to the same topic being consumed by the sink connector. This causes a continuous feedback loop that creates an ever-increasing number of duplicate Kafka records and S3 objects. It is possible to avoid this feedback loop by writing to a different topic than the one being consumed by the sink connector. Use the RegexRouter with the source connector to change the names of the topics where the records are written. Or, use the Extract Topic SMT with the source connector to change the topic name based upon a field in each message.
Install S3 Source Connector¶
You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.
Install the connector using Confluent Hub¶
- Prerequisite
- Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run this command to install the latest (latest
) connector version.
The connector must be installed on every machine where Connect will be run.
confluent-hub install confluentinc/kafka-connect-s3-source:latest
You can install a specific version by replacing latest
with a version number. For example:
confluent-hub install confluentinc/kafka-connect-s3-source:1.0.0-preview
Install Connector Manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic.
Quick Start¶
The following uses the S3SinkConnector
to write a file from the Kafka topic named s3_topic
to S3.
Then, the S3SourceConnector
loads that Avro file from S3 to the Kafka topic named copy_of_s3_topic
.
Follow the instructions from the S3 Sink Connector quick start to set up the data to use below.
Install the connector through the Confluent Hub Client.
# run from your Confluent Platform installation directory confluent-hub install confluentinc/kafka-connect-s3-source:latest
Tip
By default, the plugin is installed into
share/confluent-hub-components
and the directory is added to the plugin path. If this is the first connector you have installed, you may need to restart the connect server for the plugin path change to take effect.Create a
quickstart-s3source.properties
file with the following contents or use thequickstart-s3source.properties
.:name=s3-source tasks.max=1 connector.class=io.confluent.connect.s3.source.S3SourceConnector s3.bucket.name=confluent-kafka-connect-s3-testing format.class=io.confluent.connect.s3.format.avro.AvroFormat confluent.license= confluent.topic.bootstrap.servers=localhost:9092 confluent.topic.replication.factor=1
Tip
The following define the Confluent license stored in Kafka, so we need the Kafka bootstrap addresses.
replication.factor
may not be larger than the number of Kafka brokers in the destination cluster, so here we set this to ‘1’ for demonstration purposes. Always use at least ‘3’ in production configurations.Edit the
quickstart-s3source.properties
to add the following properties:transforms=AddPrefix transforms.AddPrefix.type=org.apache.kafka.connect.transforms.RegexRouter transforms.AddPrefix.regex=.* transforms.AddPrefix.replacement=copy_of_$0
Important
Adding this renames the output of topic of the messages to
copy_of_s3_topic
. This prevents a continuous feedback loop of messages.Load the S3 Source Connector.
confluent load s3-source -d quickstart-s3source.properties
Important
Don’t use the Confluent CLI in production environments.
Confirm that the connector is in a
RUNNING
state.confluent status s3-source
Confirm that the messages are being sent to Kafka.
kafka-avro-console-consumer \ --bootstrap-server localhost:9092 \ --property schema.registry.url=http://localhost:8081 \ --topic copy_of_s3_topic \ --from-beginning | jq '.'
The response should be 18 records as follows.
{"f1": "value1"} {"f1": "value2"} {"f1": "value3"} {"f1": "value4"} {"f1": "value5"} {"f1": "value6"} {"f1": "value7"} {"f1": "value8"} {"f1": "value9"} {"f1": "value1"} {"f1": "value2"} {"f1": "value3"} {"f1": "value4"} {"f1": "value5"} {"f1": "value6"} {"f1": "value7"} {"f1": "value8"} {"f1": "value9"}