.. _schemaregistry_kafka_connect:

Integrate Schemas from |kconnect-long| in |cp|
==============================================

|kconnect-long| and |sr| integrate to capture schema information from connectors. :ref:`Kafka Connect converters ` provide a mechanism for converting data from the internal data types used by |kconnect-long| to data types represented as Avro, Protobuf, or JSON Schema. The ``AvroConverter``, ``ProtobufConverter``, and ``JsonSchemaConverter`` automatically register schemas generated by source connectors. Sink connectors receive schema information in addition to the data for each message. This allows sink connectors to know the structure of the data and provide additional capabilities, such as maintaining a database table structure or creating a search index. Each of these converters changes schema data into the internal data types used by |kconnect-long|. For additional information about converters and how they work, see :connect-common:`Configuring Key and Value Converters|userguide.html#configuring-key-and-value-converters`.

Example Converter Properties
----------------------------

To use |kconnect-long| with |sr|, you must specify the ``key.converter`` or ``value.converter`` properties in the :connect-common:`connector|configuring.html` configuration or in the |kconnect| :connect-common:`worker configuration|userguide.html#worker-configuration-properties-file`. The converters also need an additional configuration property for the |sr| URL, which you specify by adding the converter prefix, as shown in the following property examples.

.. _schemaregistry_kafka_connect-avro-converters:

----
Avro
----

Example Avro converter properties are shown below:

.. sourcecode:: properties

   key.converter=io.confluent.connect.avro.AvroConverter
   key.converter.schema.registry.url=http://localhost:8081
   value.converter=io.confluent.connect.avro.AvroConverter
   value.converter.schema.registry.url=http://localhost:8081

The following additional configuration properties can be used with the Avro converter (``io.confluent.connect.avro.AvroConverter``). These Avro-specific properties are added to the worker or connector configuration alongside the Avro converter properties. Note that when added to the worker or connector configuration, these properties require the ``key.converter.`` and ``value.converter.`` prefix. For example:

.. sourcecode:: properties

   key.converter=io.confluent.connect.avro.AvroConverter
   key.converter.schema.registry.url=http://localhost:8081
   key.converter.enhanced.avro.schema.support=true
   value.converter=io.confluent.connect.avro.AvroConverter
   value.converter.schema.registry.url=http://localhost:8081
   value.converter.enhanced.avro.schema.support=true

When using Avro with basic authentication, you add the following properties:

.. sourcecode:: properties

   key.converter.basic.auth.credentials.source=USER_INFO
   key.converter.basic.auth.user.info={username}:{password}
   value.converter.basic.auth.credentials.source=USER_INFO
   value.converter.basic.auth.user.info={username}:{password}
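Because ``basic.auth.credentials.source`` defaults to ``URL`` (see :ref:`sr-connect-config-options`), the credentials can instead be embedded in the |sr| URL itself. The following is a minimal sketch of that alternative; the username, password, and host are placeholders:

.. sourcecode:: properties

   value.converter=io.confluent.connect.avro.AvroConverter
   value.converter.schema.registry.url=http://{username}:{password}@localhost:8081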
When using Avro in a secure environment, you add ``value.converter.schema.registry.ssl.`` properties. An example of these additional properties is shown below:

.. sourcecode:: properties

   key.converter.schema.registry.ssl.truststore.location=
   key.converter.schema.registry.ssl.truststore.password=
   key.converter.schema.registry.ssl.keystore.location=
   key.converter.schema.registry.ssl.keystore.password=
   key.converter.schema.registry.ssl.key.password=
   value.converter.schema.registry.ssl.truststore.location=
   value.converter.schema.registry.ssl.truststore.password=
   value.converter.schema.registry.ssl.keystore.location=
   value.converter.schema.registry.ssl.keystore.password=
   value.converter.schema.registry.ssl.key.password=

The following are definitions for the Avro-specific configuration properties. For additional |kconnect| |sr| configuration options, see :ref:`sr-connect-config-options`.

``scrub.invalid.names``
  Whether to scrub invalid names by replacing invalid characters with valid characters.

  * Type: boolean
  * Default: false
  * Importance: medium

``schemas.cache.config``
  The size of the schema cache used in the Avro converter.

  * Type: int
  * Default: 1000
  * Importance: low

``enhanced.avro.schema.support``
  Enable enhanced Avro schema support in the Avro converter. When set to ``true``, this property preserves Avro schema package information and Enums when going from Avro schema to |kconnect| schema. This information is added back in when going from |kconnect| schema to Avro schema.

  * Type: boolean
  * Default: false
  * Importance: low

``connect.meta.data``
  Allow the |kconnect| converter to add its metadata to the output schema.

  * Type: boolean
  * Default: true
  * Importance: low

The ``connect.meta.data`` property preserves the following |kconnect| schema metadata when going from |kconnect| schema to Avro schema. The same metadata is added back in when going from Avro schema to |kconnect| schema.

* doc
* version
* parameters
* default value
* name
* type

.. _schemaregistry_kafka_connect-protobuf-converters:

--------
Protobuf
--------

Example Protobuf converter properties are shown below:

.. sourcecode:: properties

   key.converter=io.confluent.connect.protobuf.ProtobufConverter
   key.converter.schema.registry.url=http://localhost:8081
   value.converter=io.confluent.connect.protobuf.ProtobufConverter
   value.converter.schema.registry.url=http://localhost:8081

The following are definitions for the Protobuf-specific configuration properties. For additional |kconnect| |sr| configuration options, see :ref:`sr-connect-config-options`.

``enhanced.protobuf.schema.support``
  Enable enhanced Protobuf schema support in the Protobuf converter. When set to ``true``, this property preserves Protobuf schema package information when going from Protobuf schema to |kconnect| schema. This information is added back when going from |kconnect| schema to Protobuf schema.

  * Type: boolean
  * Default: false
  * Importance: medium

``generate.index.for.unions``
  Whether to generate an index suffix for unions. By default, ``oneOf`` message fields have their names suffixed with an index (for example, ``_0``), which results in a column name such as ``value_0.thing``. To configure ``oneOf`` message field names without this suffix, set ``generate.index.for.unions`` to ``false``. To learn more about ``oneOf`` in Protobuf, see :ref:`multiple-event-types-same-topic-protobuf`.

  * Type: boolean
  * Default: true
  * Importance: medium

``scrub.invalid.names``
  Whether to scrub invalid names by replacing invalid characters with valid characters.

  * Type: boolean
  * Default: false
  * Importance: medium

``int.for.enums``
  Whether to represent enums as integers. To represent enums as integers, set ``int.for.enums`` to ``true``.

  * Type: boolean
  * Default: false
  * Importance: medium

``optional.for.nullables``
  Whether nullable fields should be specified with an optional label.

  * Type: boolean
  * Default: false
  * Importance: medium

``generate.struct.for.nulls``
  Whether to generate a struct variable for null values.

  * Type: boolean
  * Default: false
  * Importance: medium

``wrapper.for.nullables``
  Whether nullable fields should use primitive wrapper messages.

  * Type: boolean
  * Default: false
  * Importance: medium

``wrapper.for.raw.primitives``
  Whether a wrapper message should be interpreted as a raw primitive at the root level.

  * Type: boolean
  * Default: true
  * Importance: medium

``schemas.cache.config``
  The size of the schema cache used in the Protobuf converter.

  * Type: int
  * Default: 1000
  * Importance: low
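As with the Avro converter, these properties take the ``key.converter.`` or ``value.converter.`` prefix when added to a worker or connector configuration. The following is a minimal sketch; the property values chosen are illustrative only:

.. sourcecode:: properties

   value.converter=io.confluent.connect.protobuf.ProtobufConverter
   value.converter.schema.registry.url=http://localhost:8081
   value.converter.generate.index.for.unions=false
   value.converter.int.for.enums=true
   value.converter.optional.for.nullables=true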
.. _schemaregistry_kafka_connect-json-schema-converters:

-----------
JSON Schema
-----------

Example JSON Schema converter properties are shown below:

.. sourcecode:: properties

   key.converter=io.confluent.connect.json.JsonSchemaConverter
   key.converter.schema.registry.url=http://localhost:8081
   value.converter=io.confluent.connect.json.JsonSchemaConverter
   value.converter.schema.registry.url=http://localhost:8081

The following are definitions for the JSON Schema-specific configuration properties. For additional |kconnect| |sr| configuration options, see :ref:`sr-connect-config-options`.

``object.additional.properties``
  Whether to allow additional properties for object schemas.

  * Type: boolean
  * Default: true
  * Importance: medium

``use.optional.for.nonrequired``
  Whether to set non-required properties to be optional.

  * Type: boolean
  * Default: false
  * Importance: medium

``decimal.format``
  Controls the format in which this converter serializes decimals. This value is case insensitive and can be either ``BASE64`` (the default) or ``NUMERIC``.

.. note::

   JSON Schema supports an empty schema that has no type defined. For example:

   .. code-block:: json

      "f2": {}

   This is not allowed with the ``JsonSchemaConverter`` in |kconnect|, because |kconnect| requires a strongly typed structure for its internal record schema. The same restriction applies when an empty schema appears inside a ``oneOf``, as in the following example:

   .. code-block:: json

      "holiday": {
        "oneOf": [
          {
            "title": "Not included",
            "type": "null"
          },
          {}
        ]
      }
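These properties likewise take the ``key.converter.`` or ``value.converter.`` prefix in a worker or connector configuration. The following is a minimal sketch; the property values chosen are illustrative only:

.. sourcecode:: properties

   value.converter=io.confluent.connect.json.JsonSchemaConverter
   value.converter.schema.registry.url=http://localhost:8081
   value.converter.decimal.format=NUMERIC
   value.converter.use.optional.for.nonrequired=true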
Using Independent Key and Value Converters
------------------------------------------

The key and value converters can be used independently of each other. For example, you may want to use a ``StringConverter`` for keys and a converter that integrates with |sr| for values. An example of independent key and value properties is shown below:

.. sourcecode:: properties

   key.converter=org.apache.kafka.connect.storage.StringConverter
   value.converter=io.confluent.connect.avro.AvroConverter
   value.converter.schema.registry.url=http://localhost:8081

Converter Property Location and Inheritance
--------------------------------------------

|cp| first looks for converter configuration properties in the :connect-common:`connector|configuring.html` configuration. If none are found there, the properties in the |kconnect| :connect-common:`worker configuration|userguide.html#worker-configuration-properties-file` are used. You have the following three options for how to set these properties. Each one affects how the properties are inherited among the worker and connectors.

- Specify all converter properties (with |sr| URL prefixes) in each connector configuration.
- Specify all converter properties only in the worker configuration. In this case, all connectors inherit the worker converter properties.
- Specify all converter properties in the worker configuration, and add all converter properties to the configuration of any connector that needs to override the worker properties.

.. important::

   - If converter values and the associated |sr| URL are defined in both the worker and the connector, settings in the connector **overwrite** those in the worker.
   - If you specify a converter in a connector or worker (as an override or as the only setting), you must **always include** both the converter and the |sr| URL; otherwise, the connector or worker will fail.
   - If you specify a converter in a connector that is not defined in the worker, you must **supply all converter properties** (key converter, value converter, and |sr| host and port) in the connector configuration.

----------------
Example Scenario
----------------

The following are the worker configuration properties used in this example scenario:

.. code-block:: text

   group.id=connect-cluster
   key.converter=io.confluent.connect.avro.AvroConverter
   key.converter.schema.registry.url=http://host-1:port
   value.converter=org.apache.kafka.connect.storage.StringConverter

Using the previous worker properties, start three connectors with the following configuration properties:

- connector-1 configuration:

  .. code-block:: text

     name=connector-1

- connector-2 configuration:

  .. code-block:: text

     name=connector-2
     key.converter=io.confluent.connect.avro.AvroConverter
     key.converter.schema.registry.url=http://host-2:port

- connector-3 configuration:

  .. code-block:: text

     name=connector-3
     key.converter=io.confluent.connect.avro.AvroConverter

The results of the deployment are:

- **connector-1** uses the worker configuration properties, with the Avro converter (``io.confluent.connect.avro.AvroConverter``) and the |sr| at ``host-1:port``.
- **connector-2** uses the Avro converter (``io.confluent.connect.avro.AvroConverter``) and the |sr| at ``host-2:port``.
- **connector-3** fails because it overrides the key converter in its connector configuration but does not supply the |sr| URL configuration property, which is required for Avro, Protobuf, and JSON Schema.
- **All connectors** use the ``value.converter`` worker property ``org.apache.kafka.connect.storage.StringConverter``.

.. tip:: For a deep dive into converters, see `Converters and Serialization Explained `__.
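Connector configurations like the ones in this scenario are commonly submitted as JSON to the |kconnect| REST API. The following sketch shows what connector-2 might look like in that form; the ``connector.class`` and its ``file`` and ``topic`` settings are illustrative placeholders and not part of the scenario above:

.. code-block:: json

   {
     "name": "connector-2",
     "config": {
       "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
       "file": "/tmp/input.txt",
       "topic": "example-topic",
       "key.converter": "io.confluent.connect.avro.AvroConverter",
       "key.converter.schema.registry.url": "http://host-2:port"
     }
   }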
NULL values replaced with default values
----------------------------------------

The configuration property ``ignore.default.for.nullables`` allows you to use a NULL value for a nullable (optional) column that has a default value configured for it. When set to ``true``, this property ensures that the corresponding record in |ak| is NULL, instead of showing the default column value. The property defaults to ``false``. A configuration snippet using this property is shown below:

.. sourcecode:: properties

   value.converter=io.confluent.connect.avro.AvroConverter
   value.converter.schema.registry.url=http://localhost:8081
   value.converter.enhanced.avro.schema.support=true
   value.converter.ignore.default.for.nullables=true

.. _sr-connect-config-options:

Configuration Options
---------------------

``schema.registry.url``
  Comma-separated list of URLs for |sr| instances that can be used to register or look up schemas.

  * Type: list
  * Default: ""
  * Importance: high

``auto.register.schemas``
  Specify whether the serializer should attempt to register the schema with |sr|.

  * Type: boolean
  * Default: true
  * Importance: medium

``use.latest.version``
  Only applies when ``auto.register.schemas`` is set to ``false``. If ``auto.register.schemas`` is set to ``false`` and ``use.latest.version`` is set to ``true``, then instead of deriving a schema for the object passed to the client for serialization, the serializer uses the latest version of the schema in the subject. The property ``use.latest.version`` can be set on producers or consumers to serialize or deserialize messages per the latest version.

  * Type: boolean
  * Default: false
  * Importance: medium

  .. note::

     To learn more, see how to use schema references to combine :ref:`multiple event types in the same topic ` with :ref:`Avro `, :ref:`JSON Schema `, or :ref:`Protobuf `.

``latest.compatibility.strict``
  Only applies when ``use.latest.version`` is set to ``true``.

  If ``latest.compatibility.strict`` is ``true`` (the default), then when using ``use.latest.version=true`` during serialization, a check is performed to verify that the latest subject version is backward compatible with the schema of the object being serialized. If the check fails, an error results. If the check succeeds, serialization is performed.

  If ``latest.compatibility.strict`` is ``false``, then the latest subject version is used for serialization, without any compatibility check. Serialization may fail in this case. Relaxing the compatibility requirement (by setting ``latest.compatibility.strict`` to ``false``) may be useful, for example, when implementing :ref:`Kafka Connect converters ` and :ref:`schema references `.

  * Type: boolean
  * Default: true
  * Importance: medium

  .. note::

     To learn more about this setting, see :ref:`schema_evolution_and_compatibility`.
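  For example, a value converter can be pinned to the latest registered schema version instead of auto-registering new ones. The following is a minimal sketch that assumes the converter-prefix rules described earlier and a placeholder |sr| URL:

  .. sourcecode:: properties

     value.converter=io.confluent.connect.avro.AvroConverter
     value.converter.schema.registry.url=http://localhost:8081
     value.converter.auto.register.schemas=false
     value.converter.use.latest.version=true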
``max.schemas.per.subject``
  Maximum number of schemas to create or cache locally.

  * Type: int
  * Default: 1000
  * Importance: low

``key.subject.name.strategy``
  Determines how to construct the subject name under which the key schema is registered with |sr|. For additional information, see |sr| :ref:`sr-schemas-subject-name-strategy`.

  Any implementation of ``io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy`` can be specified. By default, ``<topic>-key`` is used as the subject. Specifying an implementation of ``io.confluent.kafka.serializers.subject.SubjectNameStrategy`` is deprecated as of ``4.1.3``, and using one may cause some performance degradation.

  * Type: class
  * Default: class io.confluent.kafka.serializers.subject.TopicNameStrategy
  * Importance: medium

``value.subject.name.strategy``
  Determines how to construct the subject name under which the value schema is registered with |sr|. For additional information, see |sr| :ref:`sr-schemas-subject-name-strategy`.

  Any implementation of ``io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy`` can be specified. By default, ``<topic>-value`` is used as the subject. Specifying an implementation of ``io.confluent.kafka.serializers.subject.SubjectNameStrategy`` is deprecated as of ``4.1.3``, and using one may cause some performance degradation.

  * Type: class
  * Default: class io.confluent.kafka.serializers.subject.TopicNameStrategy
  * Importance: medium

``basic.auth.credentials.source``
  Specify how to pick the credentials for the Basic authentication header. The supported values are ``URL``, ``USER_INFO``, and ``SASL_INHERIT``.

  * Type: string
  * Default: "URL"
  * Importance: medium

``basic.auth.user.info``
  Specify the user info for Basic authentication in the form ``{username}:{password}``. ``schema.registry.basic.auth.user.info`` is a deprecated alias for this configuration.

  * Type: password
  * Default: ""
  * Importance: medium

The following |sr| dedicated properties, configurable on the client, are available on |cp| version 5.4.0 (and later). To learn more, see the information on configuring clients in :ref:`sr-https-additional`.

``schema.registry.ssl.truststore.location``
  The location of the trust store file. For example, ``schema.registry.ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks``.

  * Type: string
  * Default: ""
  * Importance: medium

``schema.registry.ssl.truststore.password``
  The password for the trust store file. If a password is not set, access to the truststore is still available, but integrity checking is disabled.

  * Type: password
  * Default: ""
  * Importance: medium

``schema.registry.ssl.keystore.location``
  The location of the key store file. This is optional for the client and can be used for two-way authentication for the client. For example, ``schema.registry.ssl.keystore.location=/etc/kafka/secrets/kafka.schemaregistry.keystore.jks``.

  * Type: string
  * Default: ""
  * Importance: medium

``schema.registry.ssl.keystore.password``
  The store password for the key store file. This is optional for the client and only needed if ``schema.registry.ssl.keystore.location`` is configured.

  * Type: password
  * Default: ""
  * Importance: medium

``schema.registry.ssl.key.password``
  The password of the private key in the key store file. This is optional for the client.

  * Type: password
  * Default: ""
  * Importance: medium
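When the |sr| endpoint uses HTTPS, these client properties take the ``key.converter.`` or ``value.converter.`` prefix in the same way as the earlier Avro SSL example. The following is a minimal sketch; the URL, truststore path, and password are placeholders:

.. sourcecode:: properties

   value.converter=io.confluent.connect.avro.AvroConverter
   value.converter.schema.registry.url=https://schemaregistry:8081
   value.converter.schema.registry.ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks
   value.converter.schema.registry.ssl.truststore.password={truststore-password}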