.. _azure_blob_storage_source_connector:

|az| Blob Storage Source Connector for |cp|
===========================================

The |kconnect-long| |az| Blob Storage Source Connector reads data exported to |az| Blob Storage by the |ak| :ref:`Connect Azure Blob Storage Sink connector ` and publishes it back to a |ak| topic. Depending on the format and partitioner used to write the data to |az| Blob Storage, this connector can write to the destination topic using the same partitions as the original messages exported to |az| Blob Storage and can maintain the same message order. The connector selects folders based on the partitioner configuration and reads each folder's |az| Blob objects in alphabetical order. Each record is read based on the format selected. The configuration is designed to mirror the |ak| :ref:`Connect Azure Blob Storage Sink connector `, so it should be possible to create source connector configurations with only minor changes to the original sink configuration.

.. important:: The recommended practice is to create topics manually in the destination |ak| cluster with the correct number of partitions before running the source connector. If the topics don't exist, |kconnect| relies on auto-topic creation, and the number of partitions is based upon the |ak| broker defaults. If there are more partitions in the destination cluster, the extra partitions aren't used. If there are fewer partitions in the destination cluster, the connector task throws an exception and stops when it tries to write to a |ak| partition that doesn't exist.

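For example, to pre-create the destination topic used in the quickstart below, you could run something like the following. This is a minimal sketch that assumes a broker at ``localhost:9092``, a version of the ``kafka-topics`` tool that supports the ``--bootstrap-server`` flag, and partition and replication counts that match your environment.

.. code-block:: bash

   # Pre-create the destination topic with the same number of partitions
   # as the source topic that was exported to Azure Blob Storage.
   kafka-topics --bootstrap-server localhost:9092 \
     --create \
     --topic copy_of_blob_topic \
     --partitions 3 \
     --replication-factor 3
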
Be aware of the following connector actions:

* The connector ignores any |az| Blob object with a name that doesn't start with the configured ``topics`` directory. This name is "/topics/" by default.
* The connector ignores any |az| Blob object that is below the ``topics`` directory but has an extension that doesn't match the configured format. For example, a JSON file is ignored when ``format.class`` is set for Avro files.
* The connector stops and fails if the |az| Blob object's name doesn't match the expected format or is in an unexpected location.

Avoid the following configuration issues:

* A file with the correct extension and a valid name format like ``++.``, placed in a folder of a different topic, is read normally and written to the topic defined by its filename.
* If a field partitioner is incorrectly configured to match the expected folder, it can break the ordering guarantees of an |az| Blob Storage sink that used a deterministic sink partitioner.

.. _azure_blob_storage_source_connector_features:

Features
--------

The |az| Blob Storage Source Connector offers a variety of features:

* **Pluggable Data Format with or without Schema**: Out of the box, the connector supports reading data from |az| Blob Storage in Avro and JSON format. Besides records with schema, the connector supports importing plain JSON records without schema in text files, one record per line. In general, the connector may accept any format that provides an implementation of the ``Format`` interface.
* **At Least Once Delivery**: In the event of a task failure, the connector guarantees no messages are lost, although the last few messages may be processed again.
* **Matching Source Partitioning**: Messages are put back on the same |ak| partition for that topic as when they were originally written.
* **Source Partition Ordering**: If the ``DefaultPartitioner`` or a ``TimeBasedPartitioner`` is used, the connector reads records back in time order in each topic-source partition. If a ``FieldPartitioner`` is used, it isn't possible to guarantee the order of these messages.
* **Pluggable Partitioner**: The connector comes out of the box with partitioners that support default partitioning based on |ak| partitions, field partitioning, and time-based partitioning in days or hours. You may implement your own partitioners by extending the ``Partitioner`` class. Additionally, you can customize time-based partitioning by extending the ``TimeBasedPartitioner`` class.

.. important:: All partitioners notice new topic folders through the built-in task reconfiguration thread. The ``DefaultPartitioner`` detects new partition folders. The ``FieldPartitioner`` notices new folders for the fields specified. However, the ``TimeBasedPartitioner`` doesn't detect new files for a new time period.

.. important:: Be careful when both the :ref:`Connect Azure Blob Storage Sink connector ` and the |az| Blob Storage Source Connector use the same |ak| cluster, since this results in the source connector writing to the same topic being consumed by the sink connector. This causes a continuous feedback loop that creates an ever-increasing number of duplicate |ak| records and |az| Blob Storage objects. You can avoid this feedback loop by writing to a different topic than the one being consumed by the sink connector. Use the :ref:`RegexRouter ` SMT with the source connector to change the names of the topics where the records are written. Or, use the :ref:`Extract Topic SMT` with the source connector to change the topic name based upon a field in each message.

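The quickstart below demonstrates the ``RegexRouter`` approach. For the field-based approach, the following is a minimal sketch of an ``ExtractTopic`` configuration fragment. It assumes Confluent's ExtractTopic SMT is available on the worker, uses a hypothetical value field named ``output_topic``, and the class and property names should be verified against the SMT reference for your version.

.. code-block:: properties

   # Route each record to the topic named by a field in its value.
   # "output_topic" is a placeholder; use a field that exists in your records.
   transforms=ExtractTopicFromField
   transforms.ExtractTopicFromField.type=io.confluent.connect.transforms.ExtractTopic$Value
   transforms.ExtractTopicFromField.field=output_topic
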
Prerequisites
-------------

The following are required to run the |kconnect-long| |az| Blob Storage Source Connector:

* |ak| Broker: |cp| 3.3.0 or above, or |ak| 0.11.0 or above
* |kconnect|: |cp| 4.0.0 or above, or |ak| 1.0.0 or above
* Java 1.8

.. _azure_blob_storage_source_connector_install:

Install the |az| Blob Storage Source Connector
----------------------------------------------

.. include:: ../../includes/connector-install.rst

.. include:: ../../includes/connector-install-hub.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-azure-blob-storage-source:latest

.. include:: ../../includes/connector-install-version.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-azure-blob-storage-source:1.0.0-preview

------------------------------
Install the connector manually
------------------------------

`Download and extract the ZIP file `__ for your connector and then follow the manual connector installation :ref:`instructions `.

.. _azure_blob_storage_source_connector_license:

License
-------

.. include:: ../../includes/enterprise-license.rst

See :ref:`azure_blob_storage_source_connector_license_config` for license properties and :ref:`azure_blob_storage_source_license_topic_configuration` for information about the license topic.

Configuration Properties
------------------------

For a complete list of configuration properties for this connector, see :ref:`azure-blob-storage-source-configuration-options`.

.. _azure_blob_storage_source_connector_quickstart:

Quick Start
-----------

The following quickstart uses the ``AzureBlobStorageSinkConnector`` to write an Avro file from the |ak| topic named ``blob_topic`` to |az| Blob Storage. Then, the ``AzureBlobStorageSourceConnector`` loads that Avro file from |az| Blob Storage into the |ak| topic named ``copy_of_blob_topic``. Note that the ``AzureBlobStorageSinkConnector`` should be completely stopped before starting the ``AzureBlobStorageSourceConnector`` to avoid creating a source/sink cycle.

#. Follow the instructions from the :ref:`Connect Azure Blob Storage Sink connector ` to set up the data to use below. You can optionally verify the exported objects with the Azure CLI sketch shown after these steps.

#. Install the connector through the `Confluent Hub Client `_.

   .. codewithvars:: bash

      # run from your Confluent Platform installation directory
      confluent-hub install confluentinc/kafka-connect-azure-blob-storage-source:latest

   .. tip:: By default, the plugin is installed into ``share/confluent-hub-components`` and the directory is added to the plugin path. If this is the first connector you have installed, you may need to restart the |kconnect| worker for the plugin path change to take effect.

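Before configuring the source connector, you can optionally confirm that the sink connector's exported objects are present in the container. The following is a minimal sketch using the Azure CLI; it assumes the CLI is installed and authenticated, that the sink connector used the default ``topics`` directory, and that the account, key, and container values are placeholders for the ones used by your sink connector.

.. code-block:: bash

   # List the objects the sink connector wrote under the default "topics" directory.
   az storage blob list \
     --account-name <your-account> \
     --account-key <your-key> \
     --container-name <your-container> \
     --prefix topics/blob_topic/ \
     --output table
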
----------------------
Property-based example
----------------------

#. Create a ``quickstart-azureblobstoragesource.properties`` file with the following contents. This file should be placed under the |cp| installation directory. This configuration is typically used with :ref:`standalone workers `.

   .. codewithvars:: properties

      name=azure-blob-storage-source
      tasks.max=1
      connector.class=io.confluent.connect.azure.blob.storage.AzureBlobStorageSourceConnector
      # enter your Azure blob account, key and container name here
      azblob.account.name=
      azblob.account.key=
      azblob.container.name=
      format.class=io.confluent.connect.azure.blob.storage.format.avro.AvroFormat
      confluent.topic.bootstrap.servers=localhost:9092
      confluent.topic.replication.factor=1

   .. tip:: The ``confluent.topic.*`` properties define where the Confluent license is stored in |ak|, which is why the |ak| bootstrap addresses are needed. ``confluent.topic.replication.factor`` may not be larger than the number of |ak| brokers in the destination cluster, so it is set to ``1`` here for demonstration purposes. Always use at least ``3`` in production configurations.

#. Edit the ``quickstart-azureblobstoragesource.properties`` file to add the following properties:

   .. codewithvars:: properties

      transforms=AddPrefix
      transforms.AddPrefix.type=org.apache.kafka.connect.transforms.RegexRouter
      transforms.AddPrefix.regex=.*
      transforms.AddPrefix.replacement=copy_of_$0

   .. important:: Adding this renames the output topic of the messages to ``copy_of_blob_topic``. This prevents a continuous feedback loop of messages.

#. Load the |az| Blob Storage Source Connector.

   .. include:: ../../../includes/confluent-local-connector-limit.rst

   .. codewithvars:: bash

      |confluent_load| azure-blob-storage-source|dash| -d quickstart-azureblobstoragesource.properties

   .. important:: Don't use the :ref:`cli` in production environments.

#. Confirm that the connector is in a ``RUNNING`` state.

   .. codewithvars:: bash

      |confluent_status| azure-blob-storage-source

#. Confirm that the messages are being sent to |ak|.

   .. codewithvars:: bash

      kafka-avro-console-consumer \
          --bootstrap-server localhost:9092 \
          --property schema.registry.url=http://localhost:8081 \
          --topic copy_of_blob_topic \
          --from-beginning | jq '.'

#. The response should be 9 records as follows.

   .. sourcecode:: bash

      {"f1": "value1"}
      {"f1": "value2"}
      {"f1": "value3"}
      {"f1": "value4"}
      {"f1": "value5"}
      {"f1": "value6"}
      {"f1": "value7"}
      {"f1": "value8"}
      {"f1": "value9"}

.. _azure_blob_storage_source_connector-rest-example:

------------------
REST-based example
------------------

#. This configuration is typically used with :ref:`distributed workers `. Write the following JSON to ``config.json``, configure all of the required values, and use the command in the next step to post the configuration to one of the distributed |kconnect| workers. See the |kconnect-long| :ref:`REST API ` for more information.

   .. code-block:: json

      {
        "name" : "AzureBlobStorageSourceConnector",
        "config" : {
          "connector.class" : "io.confluent.connect.azure.blob.storage.AzureBlobStorageSourceConnector",
          "tasks.max" : "1",
          "azblob.account.name" : "your-account",
          "azblob.account.key" : "your-key",
          "azblob.container.name" : "confluent-kafka-connect-azBlobStorage-testing",
          "format.class" : "io.confluent.connect.azure.blob.storage.format.avro.AvroFormat",
          "confluent.topic.bootstrap.servers" : "localhost:9092",
          "confluent.topic.replication.factor" : "1",
          "transforms" : "AddPrefix",
          "transforms.AddPrefix.type" : "org.apache.kafka.connect.transforms.RegexRouter",
          "transforms.AddPrefix.regex" : ".*",
          "transforms.AddPrefix.replacement" : "copy_of_$0"
        }
      }

   .. note:: Change the ``confluent.topic.bootstrap.servers`` property to include your broker address(es), and change the ``confluent.topic.replication.factor`` to 3 for staging or production use.

#. Use ``curl`` to post the configuration to one of the |kconnect-long| workers. Change ``http://localhost:8083/`` to the endpoint of one of your |kconnect-long| workers.

   .. code-block:: bash

      curl -s -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors

#. Use the following command to update the configuration of the existing connector.

   .. code-block:: bash

      curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/AzureBlobStorageSourceConnector/config

#. To consume records written by the connector to the configured |ak| topic, run the following command:

   .. codewithvars:: bash

      kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic copy_of_blob_topic --from-beginning

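When you are finished with the example, you can remove the connector through the standard |kconnect| REST API. This is a minimal sketch that assumes the same worker endpoint and connector name used above.

.. code-block:: bash

   # Remove the connector and stop its tasks.
   curl -s -X DELETE http://localhost:8083/connectors/AzureBlobStorageSourceConnector
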
.. _azure_blob_storage_source_connector_partitioners:

|az| Blob Storage Source Connector Partitions
---------------------------------------------

The |az| Blob Storage Source connector's *partitioner* determines how records read from |az| Blob Storage objects are pushed into a |ak| topic. The partitioner is specified in the connector configuration with the ``partitioner.class`` configuration property.

The |az| Blob Storage Source connector comes with the following partitioners:

* **Default Partitioner**: The ``io.confluent.connect.storage.partitioner.DefaultPartitioner`` reads records from each |az| Blob Storage object whose name includes the |ak| topic and pushes them to the same topic partition as in |ak|. The ```` is always ``/partition=``, resulting in |az| Blob Storage object names such as ``//partition=/++.``.

* **Field Partitioner**: The ``io.confluent.connect.storage.partitioner.FieldPartitioner`` determines the partition from the field within each record identified by the connector's ``partition.field.name`` configuration property, which has no default. This partitioner requires ``STRUCT`` record type values. The ```` is always ``/=``, resulting in |az| Blob Storage object names of the form ``//=/++.``.

* **Time Based Partitioner**: The ``io.confluent.connect.storage.partitioner.TimeBasedPartitioner`` determines the partition from the year, month, day, hour, minutes, and/or seconds. This partitioner requires the following connector configuration properties (see the example configuration after this list):

  * The ``path.format`` configuration property specifies the pattern used for the ```` path of the |az| Blob Storage object. For example, when ``path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH``, |az| Blob Storage object names have the form ``//year=YYYY/month=MM/day=dd/hour=HH/++.``.
  * The ``partition.duration.ms`` configuration property defines the maximum granularity of the |az| Blob Storage objects within a single encoded partition directory. For example, setting ``partition.duration.ms=600000`` (10 minutes) results in each |az| Blob Storage object in that directory having no more than 10 minutes of records.
  * The ``locale`` configuration property specifies the JDK's locale used for formatting dates and times. For example, use ``en-US`` for US English, ``en-GB`` for UK English, and ``fr-FR`` for French (in France). These may vary by Java version; see the `available locales `__.
  * The ``timezone`` configuration property specifies the timezone in which the dates and times are treated. Use standard short names for timezones such as ``UTC`` or (without daylight savings) ``PST``, ``EST``, and ``ECT``, or longer standard names such as ``America/Los_Angeles``, ``America/New_York``, and ``Europe/Paris``. These may vary by Java version; see the `available timezones within each locale `__, such as `those within the "en_US" locale `__.
  * The ``timestamp.extractor`` configuration property determines how to obtain a timestamp from each record. Values can include ``Wallclock`` (the default) to use the system time when the record is processed, ``Record`` to use the timestamp of the |ak| record denoting when it was produced or stored by the broker, or ``RecordField`` to extract the timestamp from one of the fields in the record's value as specified by the ``timestamp.field`` configuration property.

* **Daily Partitioner**: The ``io.confluent.connect.storage.partitioner.DailyPartitioner`` is equivalent to the ``TimeBasedPartitioner`` with ``path.format='year'=YYYY/'month'=MM/'day'=dd`` and ``partition.duration.ms=86400000`` (one day, for one |az| Blob Storage object in each daily directory). This partitioner always reads from |az| Blob Storage object names of the form ``//year=YYYY/month=MM/day=dd/++.``. This partitioner requires the following connector configuration properties:

  * The ``locale`` configuration property specifies the JDK's locale used for formatting dates and times. For example, use ``en-US`` for US English, ``en-GB`` for UK English, and ``fr-FR`` for French (in France). These may vary by Java version; see the `available locales `__.
  * The ``timezone`` configuration property specifies the timezone in which the dates and times are treated. Use standard short names for timezones such as ``UTC`` or (without daylight savings) ``PST``, ``EST``, and ``ECT``, or longer standard names such as ``America/Los_Angeles``, ``America/New_York``, and ``Europe/Paris``. These may vary by Java version; see the `available timezones within each locale `__, such as `those within the "en_US" locale `__.
  * The ``timestamp.extractor`` configuration property determines how to obtain a timestamp from each record. Values can include ``Wallclock`` (the default) to use the system time when the record is processed, ``Record`` to use the timestamp of the |ak| record denoting when it was produced or stored by the broker, or ``RecordField`` to extract the timestamp from one of the fields in the record's value as specified by the ``timestamp.field`` configuration property.

* **Hourly Partitioner**: The ``io.confluent.connect.storage.partitioner.HourlyPartitioner`` is equivalent to the ``TimeBasedPartitioner`` with ``path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH`` and ``partition.duration.ms=3600000`` (one hour, for one |az| Blob Storage object in each hourly directory). This partitioner always reads from |az| Blob Storage object names of the form ``//year=YYYY/month=MM/day=dd/hour=HH/++.``. This partitioner requires the following connector configuration properties:

  * The ``locale`` configuration property specifies the JDK's locale used for formatting dates and times. For example, use ``en-US`` for US English, ``en-GB`` for UK English, and ``fr-FR`` for French (in France). These may vary by Java version; see the `available locales `__.
  * The ``timezone`` configuration property specifies the timezone in which the dates and times are treated. Use standard short names for timezones such as ``UTC`` or (without daylight savings) ``PST``, ``EST``, and ``ECT``, or longer standard names such as ``America/Los_Angeles``, ``America/New_York``, and ``Europe/Paris``. These may vary by Java version; see the `available timezones within each locale `__, such as `those within the "en_US" locale `__.
  * The ``timestamp.extractor`` configuration property determines how to obtain a timestamp from each record. Values can include ``Wallclock`` (the default) to use the system time when the record is processed, ``Record`` to use the timestamp of the |ak| record denoting when it was produced or stored by the broker, or ``RecordField`` to extract the timestamp from one of the fields in the record's value as specified by the ``timestamp.field`` configuration property.

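As an illustration of the time-based properties described above, the following is a minimal sketch of a connector configuration fragment for the ``TimeBasedPartitioner``. The property names come from this section; the hourly path format, ``UTC`` timezone, and ``Record`` timestamp extractor are example choices, and the values must match the layout the sink connector used when it wrote the objects.

.. code-block:: properties

   # Read objects laid out in hourly directories written by a time-based sink partitioner.
   partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
   path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
   partition.duration.ms=3600000
   locale=en-US
   timezone=UTC
   timestamp.extractor=Record
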
You can also choose to use a custom partitioner by implementing the ``io.confluent.connect.storage.partitioner.Partitioner`` interface, packaging your implementation into a JAR file, and then completing the following steps:

#. Place the JAR file into the ``share/java/kafka-connect-azure-blob-storage`` directory of your |cp| installation **on each worker node**.
#. Restart all of the |kconnect| worker nodes.
#. Configure |az| Blob Storage Source connectors to use your fully-qualified partitioner class name.

.. _azure_blob_storage_source_connector_data_formats:

|az| Blob Storage Source Connector Data Formats
-----------------------------------------------

The |az| Blob Storage Source connector supports the following data formats:

* **Avro Format**: To read Avro data, set ``format.class`` to ``io.confluent.connect.azure.blob.storage.format.avro.AvroFormat``.
* **Parquet Format**: To read Parquet data, set ``format.class`` to ``io.confluent.connect.azure.blob.storage.format.parquet.ParquetFormat``.
* **JSON Format**: To read JSON data, set ``format.class`` to ``io.confluent.connect.azure.blob.storage.format.json.JsonFormat``.
* **Raw Bytes Format**: To read raw bytes, set ``format.class`` to ``io.confluent.connect.azure.blob.storage.format.bytearray.ByteArrayFormat``.

Troubleshooting Connector and Task Failures
-------------------------------------------

-----------
Stack Trace
-----------

You can use the |kconnect| :ref:`REST API ` to check the status of the connectors and tasks. If a task or connector has failed, the ``trace`` field will include a reason and a stack trace.

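For example, the following sketch queries the status endpoint for the connector created in the REST-based example above, assuming a worker at ``localhost:8083`` and ``jq`` installed for readable output.

.. code-block:: bash

   # Inspect the connector and task state; a failed task includes a "trace" field.
   curl -s http://localhost:8083/connectors/AzureBlobStorageSourceConnector/status | jq '.'
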
----------------------------------------
Fewer Partitions in Destination Cluster
----------------------------------------

If there are fewer partitions in the destination cluster than in the source topic, the connector task throws an exception and immediately stops when it tries to write to a |ak| partition that does not exist. You will see the following error messages in the |kconnect| worker log. The recommended practice is to create topics manually in the destination |ak| cluster with the correct number of partitions before running the source connector.

::

   INFO WorkerSourceTask{id=azblob-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:409)
   INFO WorkerSourceTask{id=azblob-source-0} flushing 1 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:426)
   ERROR WorkerSourceTask{id=azblob-source-0} Failed to flush, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:431)
   ERROR WorkerSourceTask{id=azblob-source-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:114)

.. include:: ../../kafka-connect-s3-source/includes/error-handling.rst

.. note:: The connector always ignores a file which is not in ``++.`` format.

Additional Documentation
------------------------

.. toctree::
   :maxdepth: 1

   configuration_options
   changelog