.. _schemaregistry_kafka_connect:

Using |kconnect-long| with |sr|
===============================

|kconnect-long| and |sr| integrate to capture schema information from
connectors. :ref:`Kafka Connect converters ` provide a mechanism for converting
data from the internal data types used by |kconnect-long| to data types
represented as Avro, Protobuf, or JSON Schema. The ``AvroConverter``,
``ProtobufConverter``, and ``JsonSchemaConverter`` automatically register
schemas generated by source connectors. Sink connectors receive schema
information in addition to the data for each message. This allows sink
connectors to know the structure of the data and provide additional
capabilities, like maintaining a database table structure or creating a search
index. Each of the converters changes schema data into the internal data types
used by |kconnect-long|. For additional information about converters and how
they work, see :ref:`connect_configuring_converters`.

Example Converter Properties
----------------------------

To use |kconnect-long| with |sr|, you must specify the ``key.converter`` or
``value.converter`` properties in the :ref:`connector ` or in the |kconnect|
:ref:`worker configuration `. The converters need an additional configuration
for the |sr| URL, which is specified by providing the URL converter prefix as
shown in the following property examples.

.. _schemaregistry_kafka_connect-avro-converters:

----
Avro
----

Example Avro converter properties are shown below:

.. sourcecode:: properties

   key.converter=io.confluent.connect.avro.AvroConverter
   key.converter.schema.registry.url=http://localhost:8081
   value.converter=io.confluent.connect.avro.AvroConverter
   value.converter.schema.registry.url=http://localhost:8081

The following additional configuration properties can be used with the Avro
converter (``io.confluent.connect.avro.AvroConverter``). These Avro-specific
properties are added to the worker or connector configuration where the Avro
converter properties are located. Note that when added to the worker or
connector configuration, these properties require the ``key.converter.`` and
``value.converter.`` prefix. For example:

.. sourcecode:: properties

   key.converter=io.confluent.connect.avro.AvroConverter
   key.converter.schema.registry.url=http://localhost:8081
   key.converter.enhanced.avro.schema.support=true
   value.converter=io.confluent.connect.avro.AvroConverter
   value.converter.schema.registry.url=http://localhost:8081
   value.converter.enhanced.avro.schema.support=true

Note that when using Avro in a secure environment, you need to add
``value.converter.schema.registry.ssl.`` properties. An example of these
additional properties is shown below:

.. sourcecode:: properties

   key.converter=io.confluent.connect.avro.AvroConverter
   key.converter.schema.registry.url=http://localhost:8081
   key.converter.enhanced.avro.schema.support=true
   value.converter=io.confluent.connect.avro.AvroConverter
   value.converter.schema.registry.url=http://localhost:8081
   value.converter.schema.registry.ssl.truststore.location=<location>
   value.converter.schema.registry.ssl.truststore.password=<truststore-password>
   value.converter.schema.registry.ssl.keystore.location=<location>
   value.converter.schema.registry.ssl.keystore.password=<keystore-password>
   value.converter.schema.registry.ssl.key.password=<key-password>
   value.converter.enhanced.avro.schema.support=true

The following lists definitions for the Avro-specific properties shown in the
examples above. For a complete list of |kconnect| |sr| configuration options,
see :ref:`sr-connect-config-options`.

``schema.cache.config``
  The size of the schema cache used in the Avro converter.

  * Type: int
  * Default: 1000
  * Importance: low

``enhanced.avro.schema.support``
  Enable enhanced Avro schema support in the Avro converter. When set to
  ``true``, this property preserves Avro schema package information and Enums
  when going from Avro schema to |kconnect| schema. This information is added
  back in when going from |kconnect| schema to Avro schema.

  * Type: boolean
  * Default: false
  * Importance: low

``connect.meta.data``
  Allow the |kconnect| converter to add its metadata to the output schema.

  * Type: boolean
  * Default: true
  * Importance: low

The ``connect.meta.data`` property preserves the following |kconnect| schema
metadata when going from |kconnect| schema to Avro schema. The same metadata
is added back in when going from Avro schema to |kconnect| schema.

* doc
* version
* parameters
* default value
* name
* type
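For example, the following is a minimal worker snippet (a sketch; the |sr|
host is a placeholder) that turns off the |kconnect| metadata described above
while keeping enhanced Avro schema support on. The Avro-specific properties
take the same converter prefix as ``schema.registry.url``:

.. sourcecode:: properties

   # Sketch: Avro values with Connect metadata disabled and enhanced Avro
   # schema support enabled. Both options take the value.converter. prefix.
   value.converter=io.confluent.connect.avro.AvroConverter
   value.converter.schema.registry.url=http://localhost:8081
   value.converter.connect.meta.data=false
   value.converter.enhanced.avro.schema.support=true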
--------
Protobuf
--------

Protobuf example converter properties are shown below:

.. sourcecode:: properties

   key.converter=io.confluent.connect.protobuf.ProtobufConverter
   key.converter.schema.registry.url=http://localhost:8081
   value.converter=io.confluent.connect.protobuf.ProtobufConverter
   value.converter.schema.registry.url=http://localhost:8081

-----------
JSON Schema
-----------

JSON Schema example converter properties are shown below:

.. sourcecode:: properties

   key.converter=io.confluent.connect.json.JsonSchemaConverter
   key.converter.schema.registry.url=http://localhost:8081
   value.converter=io.confluent.connect.json.JsonSchemaConverter
   value.converter.schema.registry.url=http://localhost:8081

Using Independent Key and Value Converters
------------------------------------------

The key and value converters can be used independently from each other. For
example, you may want to use a ``StringConverter`` for keys and a converter
used with |sr| for values. An example of independent key and value properties
is shown below:

.. sourcecode:: properties

   key.converter=org.apache.kafka.connect.storage.StringConverter
   value.converter=io.confluent.connect.avro.AvroConverter
   value.converter.schema.registry.url=http://localhost:8081

Converter Property Location and Inheritance
--------------------------------------------

|cp| first looks for converter configuration properties in the
:ref:`connector ` configuration. If none are found there, properties in the
|kconnect| :ref:`worker configuration ` are used. You have the following three
options for how to set these properties. Each one affects how the properties
are inherited among the worker and connectors.

- Specify all converter properties (with |sr| URL prefixes) in each connector
  configuration.
- Specify all converter properties only in the worker configuration. In this
  case, all connectors inherit the worker converter properties.
- Specify all converter properties in the worker configuration and add
  converter overrides in the connector configuration.

.. important::

   - If converter values and the associated |sr| URL are defined in both the
     worker and the connector, settings in the connector **overwrite** those
     in the worker.
   - If you specify a converter in a connector or worker (as an override or as
     the only setting), you must **always include** both the converter and the
     |sr| URL; otherwise, the connector or worker will fail. A complete
     override of this kind is sketched below.
   - If you specify a converter in a connector that is not defined in the
     worker, you must **supply all converter properties** (key converter,
     value converter, and |sr| ``host:port``) in the connector configuration.
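To make the second rule above concrete, the following is a minimal sketch of a
connector-level override (the connector name is hypothetical). Because the
converter and its |sr| URL are supplied together, the override is complete:

.. sourcecode:: properties

   # Hypothetical connector override: the converter and its Schema Registry
   # URL travel together, so this connector does not depend on the worker's
   # converter settings for values.
   name=my-avro-sink
   value.converter=io.confluent.connect.avro.AvroConverter
   value.converter.schema.registry.url=http://localhost:8081

The example scenario that follows shows how such overrides combine with the
worker defaults.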
----------------
Example Scenario
----------------

The following are the worker configuration properties used in this example
scenario:

::

   group.id=connect-cluster
   key.converter=io.confluent.connect.avro.AvroConverter
   key.converter.schema.registry.url=http://host-1:port
   value.converter=org.apache.kafka.connect.storage.StringConverter

Using the worker properties above, start three connectors with the following
configuration properties:

- connector-1 configuration:

  ::

     name=connector-1

- connector-2 configuration:

  ::

     name=connector-2
     key.converter=io.confluent.connect.avro.AvroConverter
     key.converter.schema.registry.url=http://host-2:port

- connector-3 configuration:

  ::

     name=connector-3
     key.converter=io.confluent.connect.avro.AvroConverter

The results of the deployment are:

- **connector-1** uses the worker configuration properties, with the Avro
  converter (``io.confluent.connect.avro.AvroConverter``) and the |sr|
  ``host-1:port``.
- **connector-2** uses the Avro converter
  (``io.confluent.connect.avro.AvroConverter``) and the |sr| ``host-2:port``.
- **connector-3** fails because it attempts to use the connector
  configuration, but does not find the |sr| URL configuration property. The
  |sr| URL configuration property is required for Avro, Protobuf, and JSON
  Schema. A corrected configuration is sketched below.
- **All connectors** use the ``value.converter`` worker property
  ``org.apache.kafka.connect.storage.StringConverter``.
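To show the connector-3 failure mode concretely, a corrected configuration
would add the missing URL alongside the converter override (a sketch; the
registry host is hypothetical and could equally be the worker's
``host-1:port``):

.. sourcecode:: properties

   # Sketch of a corrected connector-3: the key converter override now
   # carries its own Schema Registry URL, so the connector no longer fails.
   name=connector-3
   key.converter=io.confluent.connect.avro.AvroConverter
   key.converter.schema.registry.url=http://host-1:port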
.. tip:: For a deep dive into converters, see
   `Converters and Serialization Explained `__.

.. _sr-connect-config-options:

Configuration Options
---------------------

``schema.registry.url``
  Comma-separated list of URLs for |sr| instances that can be used to register
  or look up schemas.

  * Type: list
  * Default: ""
  * Importance: high

``auto.register.schemas``
  Specify if the serializer should attempt to register the schema with |sr|.

  * Type: boolean
  * Default: true
  * Importance: medium

``use.latest.version``
  Only applies when ``auto.register.schemas`` is set to ``false``. If
  ``auto.register.schemas`` is set to ``false`` and ``use.latest.version`` is
  set to ``true``, then instead of deriving a schema for the object passed to
  the client for serialization, |sr| uses the latest version of the schema in
  the subject for serialization.

  * Type: boolean
  * Default: false
  * Importance: medium

  .. seealso:: How to use schema references to combine :ref:`multiple event
     types in the same topic ` with :ref:`Avro `, :ref:`JSON Schema `, or
     :ref:`Protobuf `.

``latest.compatibility.strict``
  Only applies when ``use.latest.version`` is set to ``true``.

  If ``latest.compatibility.strict`` is ``true`` (the default), then when
  using ``use.latest.version=true`` during serialization, a check is performed
  to verify that the latest subject version is backward compatible with the
  schema of the object being serialized. If the check fails, an error results.
  If the check succeeds, serialization is performed.

  If ``latest.compatibility.strict`` is ``false``, then the latest subject
  version is used for serialization, without any compatibility check.
  Serialization may fail in this case. Relaxing the compatibility requirement
  (by setting ``latest.compatibility.strict`` to ``false``) may be useful, for
  example, when implementing :ref:`Kafka Connect converters ` and
  :ref:`schema references `.

  * Type: boolean
  * Default: true
  * Importance: medium

  .. seealso:: :ref:`schema_evolution_and_compatibility`

``max.schemas.per.subject``
  Maximum number of schemas to create or cache locally.

  * Type: int
  * Default: 1000
  * Importance: low

``key.subject.name.strategy``
  Determines how to construct the subject name under which the key schema is
  registered with |sr|. For additional information, see |sr|
  :ref:`sr-schemas-subject-name-strategy`.

  Any implementation of
  ``io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy`` can
  be specified. By default, ``<topic>-key`` is used as the subject. Specifying
  an implementation of
  ``io.confluent.kafka.serializers.subject.SubjectNameStrategy`` is deprecated
  as of ``4.1.3`` and, if used, may result in some performance degradation.

  * Type: class
  * Default: class io.confluent.kafka.serializers.subject.TopicNameStrategy
  * Importance: medium

``value.subject.name.strategy``
  Determines how to construct the subject name under which the value schema is
  registered with |sr|. For additional information, see |sr|
  :ref:`sr-schemas-subject-name-strategy`.

  Any implementation of
  ``io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy`` can
  be specified. By default, ``<topic>-value`` is used as the subject.
  Specifying an implementation of
  ``io.confluent.kafka.serializers.subject.SubjectNameStrategy`` is deprecated
  as of ``4.1.3`` and, if used, may result in some performance degradation.

  * Type: class
  * Default: class io.confluent.kafka.serializers.subject.TopicNameStrategy
  * Importance: medium

``basic.auth.credentials.source``
  Specify how to pick the credentials for the Basic Auth header. The supported
  values are ``URL``, ``USER_INFO``, and ``SASL_INHERIT``.

  * Type: string
  * Default: "URL"
  * Importance: medium

``basic.auth.user.info``
  Specify the user info for Basic Auth in the form of
  ``{username}:{password}``. ``schema.registry.basic.auth.user.info`` is a
  deprecated alias for this configuration.

  * Type: password
  * Default: ""
  * Importance: medium

The following |sr| dedicated properties, configurable on the client, are
available in |cp| version 5.4.0 (and later). To learn more, see the
information on configuring clients in :ref:`sr-https-additional`.

``schema.registry.ssl.truststore.location``
  The location of the trust store file. For example,
  ``schema.registry.kafkastore.ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks``.

  * Type: string
  * Default: ""
  * Importance: medium

``schema.registry.ssl.truststore.password``
  The password for the trust store file. If a password is not set, access to
  the truststore is still available, but integrity checking is disabled.

  * Type: password
  * Default: ""
  * Importance: medium

``schema.registry.ssl.keystore.location``
  The location of the key store file. This is optional for the client and can
  be used for two-way authentication for the client. For example,
  ``schema.registry.kafkastore.ssl.keystore.location=/etc/kafka/secrets/kafka.schemaregistry.keystore.jks``.

  * Type: string
  * Default: ""
  * Importance: medium

``schema.registry.ssl.keystore.password``
  The store password for the key store file. This is optional for the client
  and only needed if ``ssl.keystore.location`` is configured.

  * Type: password
  * Default: ""
  * Importance: medium

``schema.registry.ssl.key.password``
  The password of the private key in the key store file. This is optional for
  the client.

  * Type: password
  * Default: ""
  * Importance: medium
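As with the secure Avro example earlier, these client properties take effect
in |kconnect| when given a converter prefix. The following is a minimal sketch
(hypothetical credentials and paths) that combines Basic Auth with a
truststore for the value converter's |sr| client:

.. sourcecode:: properties

   # Minimal sketch (hypothetical credentials and paths): Basic Auth and a
   # truststore for the value converter's Schema Registry client.
   value.converter=io.confluent.connect.avro.AvroConverter
   value.converter.schema.registry.url=https://localhost:8081
   value.converter.basic.auth.credentials.source=USER_INFO
   value.converter.basic.auth.user.info=connect-user:connect-secret
   value.converter.schema.registry.ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks
   value.converter.schema.registry.ssl.truststore.password=<truststore-password>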