Integrate Schemas from Kafka Connect in Confluent Platform

Kafka Connect and Schema Registry integrate to capture schema information from connectors. Kafka Connect converters provide a mechanism for converting data from the internal data types used by Kafka Connect to data types represented as Avro, Protobuf, or JSON Schema. The AvroConverter, ProtobufConverter, and JsonSchemaConverter automatically register schemas generated by source connectors. Sink connectors receive schema information in addition to the data for each message. This lets sink connectors know the structure of the data and provide additional capabilities, such as maintaining a database table structure or creating a search index. Each converter changes schema data into the internal data types used by Kafka Connect.

For additional information about converters and how they work, see Configuring Key and Value Converters.

Example Converter Properties

To use Kafka Connect with Schema Registry, you must specify the key.converter or value.converter properties in the connector or in the Connect worker configuration. The converters also need the Schema Registry URL, which you specify by adding the converter prefix (key.converter. or value.converter.) to schema.registry.url, as shown in the following property examples.

Avro

Example Avro converter properties are shown below:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

The following additional configuration properties can be used with the Avro converter (io.confluent.connect.avro.AvroConverter). These Avro-specific properties are added to the worker or connector configuration where the Avro converter properties are located. Note that when added to the worker or connector configuration, these properties require the key.converter. and value.converter. prefix. For example:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
key.converter.enhanced.avro.schema.support=true
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.enhanced.avro.schema.support=true

When using Avro with basic authentication, you add the following properties:

key.converter.basic.auth.credentials.source=USER_INFO
key.converter.basic.auth.user.info={username}:{password}
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.basic.auth.user.info={username}:{password}

When using Avro in a secure environment, you add key.converter.schema.registry.ssl. and value.converter.schema.registry.ssl. properties. An example of these additional properties is shown below:

key.converter.schema.registry.ssl.truststore.location=<truststore-location>
key.converter.schema.registry.ssl.truststore.password=<truststore-password>
key.converter.schema.registry.ssl.keystore.location=<keystore-location>
key.converter.schema.registry.ssl.keystore.password=<keystore-password>
key.converter.schema.registry.ssl.key.password=<key-password>

value.converter.schema.registry.ssl.truststore.location=<truststore-location>
value.converter.schema.registry.ssl.truststore.password=<truststore-password>
value.converter.schema.registry.ssl.keystore.location=<keystore-location>
value.converter.schema.registry.ssl.keystore.password=<keystore-password>
value.converter.schema.registry.ssl.key.password=<key-password>

The following list defines the Avro-specific configuration properties. For additional Connect Schema Registry configuration options, see Configuration Options.

scrub.invalid.names

Whether to scrub invalid names by replacing invalid characters with valid characters.

  • Type: boolean
  • Default: false
  • Importance: medium
schemas.cache.config

The size of the schema cache used in the Avro converter.

  • Type: int
  • Default: 1000
  • Importance: low
enhanced.avro.schema.support

Enable enhanced Avro schema support in the Avro Converter. When set to true, this property preserves Avro schema package information and Enums when going from Avro schema to Connect schema. This information is added back in when going from Connect schema to Avro schema.

  • Type: boolean
  • Default: false
  • Importance: low
connect.meta.data

Allow the Connect converter to add its metadata to the output schema.

  • Type: boolean
  • Default: true
  • Importance: low

When set to true, the connect.meta.data property preserves the following Connect schema metadata when converting from Connect schema to Avro schema. This metadata is restored when converting from Avro schema back to Connect schema.

  • doc
  • version
  • parameters
  • default value
  • name
  • type
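
A minimal sketch of how these Avro-specific properties might look in a worker or connector configuration, using the value.converter. prefix described earlier. The chosen values are illustrative, not recommendations:

```properties
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
# Replace invalid characters in names with valid ones
value.converter.scrub.invalid.names=true
# Do not add Connect metadata to the output Avro schema
value.converter.connect.meta.data=false
```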

Protobuf

Protobuf example converter properties are shown below:

key.converter=io.confluent.connect.protobuf.ProtobufConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter.schema.registry.url=http://localhost:8081

The following list defines the Protobuf-specific configuration properties. For additional Connect Schema Registry configuration options, see Configuration Options.

enhanced.protobuf.schema.support

Enable enhanced Protobuf schema support in the Protobuf Converter. When set to true, this property preserves Protobuf schema package information when going from Protobuf schema to Connect schema. This information is added back when going from Connect schema to Protobuf schema.

  • Type: boolean
  • Default: false
  • Importance: medium
generate.index.for.unions

Whether to generate an index suffix for unions. By default, oneOf messages have their field names suffixed with an index (for example _0), which results in a column name of value_0.thing. To configure oneOf message field names without this suffix, set generate.index.for.unions to false. To learn more about oneOfs in Protobuf, see Multiple event types in the same topic.

  • Type: boolean
  • Default: true
  • Importance: medium
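
As a sketch, assuming the Protobuf-specific properties take the same key.converter. and value.converter. prefix as the Avro-specific ones, the index suffix could be turned off for record values like this:

```properties
value.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter.schema.registry.url=http://localhost:8081
# Configure oneOf field names without the index suffix (for example, _0)
value.converter.generate.index.for.unions=false
```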
scrub.invalid.names

Whether to scrub invalid names by replacing invalid characters with valid characters.

  • Type: boolean
  • Default: false
  • Importance: medium
int.for.enums

Whether to represent enums as integers. To do so, set int.for.enums to true.

  • Type: boolean
  • Default: false
  • Importance: medium
optional.for.nullables

Whether nullable fields should be specified with an optional label.

  • Type: boolean
  • Default: false
  • Importance: medium
generate.struct.for.nulls

Whether to generate a struct variable for null values.

  • Type: boolean
  • Default: false
  • Importance: medium
wrapper.for.nullables

Whether nullable fields should use primitive wrapper messages.

  • Type: boolean
  • Default: false
  • Importance: medium
wrapper.for.raw.primitives

Whether a wrapper message should be interpreted as a raw primitive at the root level.

  • Type: boolean
  • Default: true
  • Importance: medium
schemas.cache.config

The size of the schema cache used in the Protobuf converter.

  • Type: int
  • Default: 1000
  • Importance: low

JSON Schema

JSON Schema example converter properties are shown below:

key.converter=io.confluent.connect.json.JsonSchemaConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=http://localhost:8081

The following list defines the JSON Schema-specific configuration properties. For additional Connect Schema Registry configuration options, see Configuration Options.

object.additional.properties

Whether to allow additional properties for object schemas.

  • Type: boolean
  • Default: true
  • Importance: medium
use.optional.for.nonrequired

Whether to set non-required properties to be optional.

  • Type: boolean
  • Default: false
  • Importance: medium
decimal.format

Controls the format in which this converter serializes decimals. The value is case-insensitive and can be either BASE64 (default) or NUMERIC.
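
A minimal sketch of setting one of these properties, assuming the JSON Schema-specific properties take the same value.converter. prefix as the other converter settings on this page:

```properties
value.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=http://localhost:8081
# Use the NUMERIC decimal format instead of the default BASE64
value.converter.decimal.format=NUMERIC
```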

Note

JSON Schema supports an empty schema that has no type defined. For example:

"f2": {}

This is not allowed with JsonSchemaConverter in Connect. Connect requires a strongly-typed structure for its internal record schema. For example:

"holiday": {
  "oneOf": [
    {
      "title": "Not included",
      "type": "null"
    },
    {}
  ]
}

Using Independent Key and Value Converters

The key and value converters can be used independently from each other. For example, you may want to use a StringConverter for keys and a converter used with Schema Registry for values. An example of independent key and value properties is shown below:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

Converter Property Location and Inheritance

Confluent Platform first looks for converter configuration properties in the connector configuration. If none are found there, properties in the Connect worker configuration are used. You have the following three options for how to set these properties. Each one affects how the properties are inherited among the worker and connectors.

  • Specify all converter properties (with Schema Registry URL prefixes) in each connector configuration.
  • Specify all converter properties only in the worker configuration. In this case, all connectors inherit the worker converter properties.
  • Specify all converter properties in the worker configuration, and repeat the full set of converter properties in any connector configuration that needs to override the worker values.

Important

  • If a converter and its associated Schema Registry URL are defined in both the worker and the connector, the settings in the connector override those in the worker.
  • If you specify a converter in a connector or worker (as an override or as the only setting), you must always include both the converter and the Schema Registry URL; otherwise, the connector or worker fails.
  • If you specify a converter in a connector that is not defined in the worker, you must supply all converter properties (key converter, value converter, and Schema Registry host:port) in the connector configuration.

Example Scenario

The following are the worker configuration properties used in this example scenario:

group.id=connect-cluster

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://host-1:port
value.converter=org.apache.kafka.connect.storage.StringConverter

Using the previous worker properties, start three connectors with the following configuration properties:

  • connector-1 configuration:

    name=connector-1
    <no converter configuration properties used>
    
  • connector-2 configuration:

    name=connector-2
    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://host-2:port
    
  • connector-3 configuration:

    name=connector-3
    key.converter=io.confluent.connect.avro.AvroConverter
    

The results of the deployment are:

  • connector-1 uses the worker configuration properties, with the Avro converter (io.confluent.connect.avro.AvroConverter) and the Schema Registry host-1:port.
  • connector-2 uses the Avro converter (io.confluent.connect.avro.AvroConverter) and the Schema Registry host-2:port.
  • connector-3 fails because it attempts to use the connector configuration, but does not find the Schema Registry URL configuration property. The Schema Registry URL configuration property is required for Avro, Protobuf, and JSON Schema.
  • All connectors use the value.converter worker property org.apache.kafka.connect.storage.StringConverter.
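
Following the rules above, connector-3 would be expected to start if its configuration also included the Schema Registry URL. A sketch of such a configuration, reusing the host-1:port placeholder from the worker properties:

```properties
name=connector-3
key.converter=io.confluent.connect.avro.AvroConverter
# The URL must accompany the converter; it is not picked up from the worker
key.converter.schema.registry.url=http://host-1:port
```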

Tip

For a deep dive into converters, see: Converters and Serialization Explained.

NULL Values Replaced with Default Values

The configuration property ignore.default.for.nullables allows you to use a NULL value for a nullable (optional) column that has a default value configured for it. When set to true, this property ensures that the corresponding record in Kafka is NULL, instead of showing the default column value. The property defaults to false. A configuration snippet using this configuration property is shown below:

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.enhanced.avro.schema.support=true
value.converter.ignore.default.for.nullables=true

Configuration Options

schema.registry.url

Comma-separated list of URLs for Schema Registry instances that can be used to register or look up schemas.

  • Type: list
  • Default: “”
  • Importance: high
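
Because the property is a comma-separated list, more than one Schema Registry instance can be given. In the sketch below, sr-1 and sr-2 are hypothetical host names used only for illustration:

```properties
value.converter=io.confluent.connect.avro.AvroConverter
# sr-1 and sr-2 are placeholder hosts, not values from this document
value.converter.schema.registry.url=http://sr-1:8081,http://sr-2:8081
```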
auto.register.schemas

Specifies whether the serializer should attempt to register the schema with Schema Registry.

  • Type: boolean
  • Default: true
  • Importance: medium
use.latest.version

Only applies when auto.register.schemas is set to false. If auto.register.schemas is set to false and use.latest.version is set to true, then instead of deriving a schema for the object passed to the client for serialization, Schema Registry will use the latest version of the schema in the subject for serialization. The property use.latest.version can be set on producers or consumers to serialize or deserialize messages per the latest version.

  • Type: boolean
  • Default: false
  • Importance: medium
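
A sketch of how the two properties might be combined on a converter, assuming they take the value.converter. prefix like the other converter settings on this page:

```properties
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
# Do not register schemas automatically
value.converter.auto.register.schemas=false
# Serialize with the latest schema version registered in the subject
value.converter.use.latest.version=true
```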

Note

To learn more, see how to use schema references to combine multiple event types in the same topic with Avro, JSON Schema, or Protobuf.

latest.compatibility.strict

Only applies when use.latest.version is set to true.

If latest.compatibility.strict is true (the default), then when using use.latest.version=true during serialization, a check is performed to verify that the latest subject version is backward compatible with the schema of the object being serialized. If the check fails, then an error results. If the check succeeds, then serialization is performed.

If latest.compatibility.strict is false, then the latest subject version is used for serialization, without any compatibility check. Serialization may fail in this case. Relaxing the compatibility requirement (by setting latest.compatibility.strict to false) may be useful, for example, when implementing Kafka Connect converters and schema references.

  • Type: boolean
  • Default: true
  • Importance: medium
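
As an illustration of relaxing the check for a converter, the following sketch assumes the property takes the value.converter. prefix and is set together with the properties it depends on:

```properties
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.auto.register.schemas=false
value.converter.use.latest.version=true
# Skip the backward-compatibility check; serialization may fail in this case
value.converter.latest.compatibility.strict=false
```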
max.schemas.per.subject

Maximum number of schemas to create or cache locally.

  • Type: int
  • Default: 1000
  • Importance: low
key.subject.name.strategy

Determines how to construct the subject name under which the key schema is registered with Schema Registry. For additional information, see Schema Registry Subject name strategy.

Any implementation of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy can be specified. By default, <topic>-key is used as the subject. Specifying an implementation of io.confluent.kafka.serializers.subject.SubjectNameStrategy is deprecated as of 4.1.3 and may degrade performance if used.

  • Type: class
  • Default: class io.confluent.kafka.serializers.subject.TopicNameStrategy
  • Importance: medium
value.subject.name.strategy

Determines how to construct the subject name under which the value schema is registered with Schema Registry. For additional information, see Schema Registry Subject name strategy.

Any implementation of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy can be specified. By default, <topic>-value is used as the subject. Specifying an implementation of io.confluent.kafka.serializers.subject.SubjectNameStrategy is deprecated as of 4.1.3 and may degrade performance if used.

  • Type: class
  • Default: class io.confluent.kafka.serializers.subject.TopicNameStrategy
  • Importance: medium
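
As an illustration, the sketch below sets the value subject name strategy on a converter. It assumes the property takes the value.converter. prefix like other converter settings, which yields the doubled "value." in the name. It uses RecordNameStrategy, a strategy class from the same package as TopicNameStrategy that is not otherwise described on this page:

```properties
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
# Converter prefix (value.converter.) followed by the property name
value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
```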
basic.auth.credentials.source

Specify how to pick the credentials for the Basic authentication header. The supported values are URL, USER_INFO and SASL_INHERIT.

  • Type: string
  • Default: “URL”
  • Importance: medium
basic.auth.user.info

Specify the user info for the Basic authentication in the form of {username}:{password}. schema.registry.basic.auth.user.info is a deprecated alias for this configuration.

  • Type: password
  • Default: “”
  • Importance: medium

The following properties dedicated to Schema Registry, configurable on the client, are available in Confluent Platform 5.4.0 and later. To learn more, see the information on configuring clients in Additional configurations for HTTPS.

schema.registry.ssl.truststore.location

The location of the trust store file. For example, schema.registry.ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks.

  • Type: string
  • Default: “”
  • Importance: medium
schema.registry.ssl.truststore.password

The password for the trust store file. If a password is not set, access to the truststore is still available but integrity checking is disabled.

  • Type: password
  • Default: “”
  • Importance: medium
schema.registry.ssl.keystore.location

The location of the key store file. This is optional for the client and can be used for two-way authentication for the client. For example, schema.registry.ssl.keystore.location=/etc/kafka/secrets/kafka.schemaregistry.keystore.jks.

  • Type: string
  • Default: “”
  • Importance: medium
schema.registry.ssl.keystore.password

The store password for the key store file. This is optional for the client and only needed if schema.registry.ssl.keystore.location is configured.

  • Type: password
  • Default: “”
  • Importance: medium
schema.registry.ssl.key.password

The password of the private key in the key store file. This is optional for the client.

  • Type: password
  • Default: “”
  • Importance: medium