.. _schemaregistry_kafka_connect:

Using |kconnect-long| with |sr|
===============================

|kconnect-long| and |sr| integrate to capture schema information from
connectors. :ref:`Kafka Connect converters ` provide a mechanism for
converting data from the internal data types used by |kconnect-long| to data
types represented as Avro. The AvroConverter automatically registers schemas
generated by source connectors. Sink connectors receive schema information in
addition to the data for each message. This allows sink connectors to know
the structure of the data and to provide additional capabilities, such as
maintaining a database table structure or creating a search index. The
AvroConverter converts existing Avro data to the internal data types used by
|kconnect-long|.

Example Configuration
---------------------

.. include:: ../includes/cp-demo-tip.rst

To use |kconnect-long| with |sr|, you must specify the ``key.converter`` or
``value.converter`` properties in the :ref:`connector ` or in the
:ref:`Connect worker configuration `. The ``key.converter`` and
``value.converter`` properties can be configured independently of each other.
The converters need an additional configuration for the |sr| URL, which you
specify by prefixing ``schema.registry.url`` with the converter name
(``key.converter.`` or ``value.converter.``). For example:

::

    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://localhost:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081

|cp| first looks for converter configurations in the connector. If none are
found there, the settings in the |kconnect| worker configuration are used.
Therefore, you have options as to where to specify these properties, which
affects how they are inherited by the worker and connectors:

- Specify all properties (converters and |sr| URL prefixed properties)
  individually in each connector configuration.
- Specify all properties only in the worker configuration, in which case the
  same settings are used by all connectors.
- Specify all properties in the worker configuration, and specify overrides
  in the connectors.

.. tip::

   - If converter values and associated |sr| URLs are defined in both the
     worker and the connector, settings in the connector overwrite those in
     the worker.
   - If you specify a converter in a connector or worker (as an override or
     as the only setting), you must always include both the converter and the
     |sr| URL; otherwise, the connector or worker will fail.

----------------
Example Scenario
----------------

Given the following worker configuration:

::

    group.id=xyz
    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://url:port
    value.converter=org.apache.kafka.connect.storage.StringConverter

In that worker, start three connectors with the following configurations.

- connector-1:

  ::

      name=connector-1

- connector-2:

  ::

      name=connector-2
      key.converter=io.confluent.connect.avro.AvroConverter
      key.converter.schema.registry.url=http://url_2:port

- connector-3:

  ::

      name=connector-3
      key.converter=io.confluent.connect.avro.AvroConverter

The results of the deployment are:

- **connector-1** uses the worker configuration, so it uses the Avro
  converter (``io.confluent.connect.avro.AvroConverter``) with |sr| at
  ``url:port``.
- **connector-2** uses the Avro converter
  (``io.confluent.connect.avro.AvroConverter``) with |sr| at ``url_2:port``.
- **connector-3** fails because it picks up the converter from its own
  configuration but does not find a config for the |sr| URL, which is
  mandatory for the Avro converter.

``value.converter`` is ``org.apache.kafka.connect.storage.StringConverter``
for all three connectors, inherited from the worker.
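For completeness, the following is a minimal sketch of what connector-2 from
the scenario could look like as a full standalone connector properties file.
The connector class, ``file``, ``topic``, and ``tasks.max`` values are
hypothetical placeholders and are not part of the scenario above:

::

    # connector-2.properties -- hypothetical standalone connector file
    name=connector-2
    connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
    tasks.max=1
    file=/tmp/input.txt
    topic=avro-example
    # Per-connector override: the converter plus its mandatory Schema Registry URL
    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://url_2:port

You could run a file like this with the standalone worker, for example with
``connect-standalone worker.properties connector-2.properties``.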
Configuration Options
---------------------

``schema.registry.url``
  Comma-separated list of URLs for |sr| instances that can be used to
  register or look up schemas.

  * Type: list
  * Default: ""
  * Importance: high

``auto.register.schemas``
  Specify whether the serializer should attempt to register the schema with
  |sr|.

  * Type: boolean
  * Default: true
  * Importance: medium

``max.schemas.per.subject``
  Maximum number of schemas to create or cache locally.

  * Type: int
  * Default: 1000
  * Importance: low

``key.subject.name.strategy``
  Determines how to construct the subject name under which the key schema is
  registered with |sr|. For additional information, see |sr|
  :ref:`sr-schemas-subject-name-strategy`.

  Any implementation of
  ``io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy`` can
  be specified. By default, ``<topic>-key`` is used as the subject.
  Specifying an implementation of
  ``io.confluent.kafka.serializers.subject.SubjectNameStrategy`` is
  deprecated as of ``4.1.3`` and, if used, may cause some performance
  degradation.

  * Type: class
  * Default: class io.confluent.kafka.serializers.subject.TopicNameStrategy
  * Importance: medium

``value.subject.name.strategy``
  Determines how to construct the subject name under which the value schema
  is registered with |sr|. For additional information, see |sr|
  :ref:`sr-schemas-subject-name-strategy`. For a configuration example, see
  the sketch after these options.

  Any implementation of
  ``io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy`` can
  be specified. By default, ``<topic>-value`` is used as the subject.
  Specifying an implementation of
  ``io.confluent.kafka.serializers.subject.SubjectNameStrategy`` is
  deprecated as of ``4.1.3`` and, if used, may cause some performance
  degradation.

  * Type: class
  * Default: class io.confluent.kafka.serializers.subject.TopicNameStrategy
  * Importance: medium

``basic.auth.credentials.source``
  Specify how to pick the credentials for the Basic Auth header. The
  supported values are ``URL``, ``USER_INFO``, and ``SASL_INHERIT``.

  * Type: string
  * Default: "URL"
  * Importance: medium

``basic.auth.user.info``
  Specify the user info for Basic Auth in the form ``{username}:{password}``.
  ``schema.registry.basic.auth.user.info`` is a deprecated alias for this
  configuration.

  * Type: password
  * Default: ""
  * Importance: medium
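As a concrete illustration of the subject name strategy and Basic Auth
options above, the following is a minimal sketch of a value-converter
configuration. The host and credentials are hypothetical placeholders, and
each option is prefixed with ``value.converter.`` as described earlier:

::

    # Hypothetical worker or connector settings; host and credentials are placeholders.
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    # Register value schemas under the record name instead of the default <topic>-value.
    value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
    # Send Basic Auth credentials taken from the user info setting below.
    value.converter.basic.auth.credentials.source=USER_INFO
    value.converter.basic.auth.user.info=connect-user:connect-secret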
The following |sr| dedicated properties, configurable on the client, are
available on |cp| version 5.4.0 (and later). To learn more, see the
information on configuring clients in :ref:`sr-https-additional`.

``schema.registry.ssl.truststore.location``
  The location of the trust store file. For example,
  ``schema.registry.kafkastore.ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks``

  * Type: string
  * Default: ""
  * Importance: medium

``schema.registry.ssl.truststore.password``
  The password for the trust store file. If a password is not set, access to
  the truststore is still available, but integrity checking is disabled.

  * Type: password
  * Default: ""
  * Importance: medium

``schema.registry.ssl.keystore.location``
  The location of the key store file. This is optional for the client and can
  be used for two-way authentication for the client. For example,
  ``schema.registry.kafkastore.ssl.keystore.location=/etc/kafka/secrets/kafka.schemaregistry.keystore.jks``.

  * Type: string
  * Default: ""
  * Importance: medium

``schema.registry.ssl.keystore.password``
  The store password for the key store file. This is optional for the client
  and only needed if ``ssl.keystore.location`` is configured.

  * Type: password
  * Default: ""
  * Importance: medium

``schema.registry.ssl.key.password``
  The password of the private key in the key store file. This is optional for
  the client.

  * Type: password
  * Default: ""
  * Importance: medium
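For example, a worker that connects to an HTTPS |sr| endpoint might combine
these properties with the converter settings as follows. This is a minimal
sketch for |cp| 5.4.0 or later; the hostname, file paths, and passwords are
placeholders:

::

    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=https://schemaregistry.example.com:8081
    # Trust store used to verify the Schema Registry server certificate.
    value.converter.schema.registry.ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks
    value.converter.schema.registry.ssl.truststore.password=<truststore-password>
    # Key store entries are only needed for two-way (mutual) TLS authentication.
    value.converter.schema.registry.ssl.keystore.location=/etc/kafka/secrets/kafka.client.keystore.jks
    value.converter.schema.registry.ssl.keystore.password=<keystore-password>
    value.converter.schema.registry.ssl.key.password=<key-password>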