Using Kafka Connect with Schema Registry¶
Kafka Connect and Schema Registry integrate to capture schema information from
connectors. Kafka Connect converters provide a mechanism for converting data
from the internal data types used by Kafka Connect to data types represented as
Avro, Protobuf, or JSON Schema. The AvroConverter, ProtobufConverter, and
JsonSchemaConverter automatically register schemas generated by source
connectors. Sink connectors receive schema information in addition to the data
for each message, which allows them to know the structure of the data and
provide additional capabilities, such as maintaining a database table structure
or creating a search index. Each of these converters changes schema data into
the internal data types used by Kafka Connect.
For additional information about converters and how they work, see Configuring Key and Value Converters.
Example Converter Properties¶
To use Kafka Connect with Schema Registry, you must specify the key.converter
or value.converter properties in the connector or in the Connect worker
configuration. The converters need an additional configuration property for the
Schema Registry URL, which is specified by prepending the converter prefix to
the URL property, as shown in the following examples.
Avro¶
Example Avro converter properties are shown below:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
The following additional configuration properties can be used with the Avro
converter (io.confluent.connect.avro.AvroConverter). These Avro-specific
properties are added to the worker or connector configuration where the Avro
converter properties are located. Note that when added to the worker or
connector configuration, these properties require the key.converter. and
value.converter. prefixes. For example:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
key.converter.enhanced.avro.schema.support=true
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.enhanced.avro.schema.support=true
When using Avro with basic authentication, you add the following properties:
key.converter.basic.auth.credentials.source=USER_INFO
key.converter.basic.auth.user.info={username}:{password}
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.basic.auth.user.info={username}:{password}
When using Avro in a secure environment, you add
key.converter.schema.registry.ssl. and value.converter.schema.registry.ssl.
properties. An example of these additional properties is shown below:
key.converter.schema.registry.ssl.truststore.location=<location>
key.converter.schema.registry.ssl.truststore.password=<truststore-password>
key.converter.schema.registry.ssl.keystore.location=<keystore-location>
key.converter.schema.registry.ssl.keystore.password=<keystore-password>
key.converter.schema.registry.ssl.key.password=<key-password>
value.converter.schema.registry.ssl.truststore.location=<location>
value.converter.schema.registry.ssl.truststore.password=<truststore-password>
value.converter.schema.registry.ssl.keystore.location=<keystore-location>
value.converter.schema.registry.ssl.keystore.password=<keystore-password>
value.converter.schema.registry.ssl.key.password=<key-password>
The following lists definitions for the Avro-specific configuration properties. For additional Connect Schema Registry configuration options, see Configuration Options.
scrub.invalid.names
Whether to scrub invalid names by replacing invalid characters with valid characters.
- Type: boolean
- Default: false
- Importance: medium
schemas.cache.config
The size of the schema cache used in the Avro converter.
- Type: int
- Default: 1000
- Importance: low
enhanced.avro.schema.support
Enable enhanced Avro schema support in the Avro converter. When set to true, this property preserves Avro schema package information and Enums when going from Avro schema to Connect schema. This information is added back in when going from Connect schema to Avro schema.
- Type: boolean
- Default: false
- Importance: low
connect.meta.data
Allow the Connect converter to add its metadata to the output schema.
- Type: boolean
- Default: true
- Importance: low
The connect.meta.data property preserves the following Connect schema metadata when going from Connect schema to Avro schema. The same metadata is added back in when going from Avro schema to Connect schema.
- doc
- version
- parameters
- default value
- name
- type
Protobuf¶
Protobuf example converter properties are shown below:
key.converter=io.confluent.connect.protobuf.ProtobufConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter.schema.registry.url=http://localhost:8081
The following lists definitions for the Protobuf-specific configuration properties. For additional Connect Schema Registry configuration options, see Configuration Options.
enhanced.protobuf.schema.support
Enable enhanced Protobuf schema support in the Protobuf converter. When set to true, this property preserves Protobuf schema package information when going from Protobuf schema to Connect schema. This information is added back when going from Connect schema to Protobuf schema.
- Type: boolean
- Default: false
- Importance: medium
scrub.invalid.names
Whether to scrub invalid names by replacing invalid characters with valid characters.
- Type: boolean
- Default: false
- Importance: medium
int.for.enums
Whether to represent enums as integers.
- Type: boolean
- Default: false
- Importance: medium
optional.for.nullables
Whether nullable fields should be specified with an optional label.
- Type: boolean
- Default: false
- Importance: medium
wrapper.for.nullables
Whether nullable fields should use primitive wrapper messages.
- Type: boolean
- Default: false
- Importance: medium
wrapper.for.raw.primitives
Whether a wrapper message should be interpreted as a raw primitive at the root level.
- Type: boolean
- Default: true
- Importance: medium
schemas.cache.config
The size of the schema cache used in the Protobuf converter.
- Type: int
- Default: 1000
- Importance: low
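As a rough sketch of the two nullable-handling modes (hypothetical message and field names, assuming a Connect schema with a single nullable string field), the settings map to Protobuf roughly as follows:

```proto
// optional.for.nullables=true: nullable fields get the optional label
message Record {
  optional string name = 1;
}

// wrapper.for.nullables=true: nullable fields use well-known wrapper types
message Record {
  google.protobuf.StringValue name = 1;
}
```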
JSON Schema¶
JSON Schema example converter properties are shown below:
key.converter=io.confluent.connect.json.JsonSchemaConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=http://localhost:8081
The following lists definitions for the JSON Schema-specific configuration properties. For additional Connect Schema Registry configuration options, see Configuration Options.
object.additional.properties
Whether to allow additional properties for object schemas.
- Type: boolean
- Default: true
- Importance: medium
use.optional.for.nonrequired
Whether to set non-required properties to be optional.
- Type: boolean
- Default: false
- Importance: medium
decimal.format
Controls which format this converter uses to serialize decimals. The value is case insensitive and can be either BASE64 (the default) or NUMERIC.
- Type: string
- Default: BASE64
- Importance: low
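To illustrate the difference, the following is a hedged Python sketch (not the converter's actual implementation): NUMERIC writes the plain number, while BASE64 writes the base64-encoded, big-endian unscaled value bytes of the decimal.

```python
import base64
import decimal

def encode_decimal(value: decimal.Decimal, fmt: str) -> str:
    # Illustrative sketch only, not the converter's actual code path.
    if fmt == "NUMERIC":
        # Serialized as a plain JSON number.
        return str(value)
    # BASE64: encode the unscaled value, e.g. 12.34 with scale 2 becomes 1234.
    unscaled = int(value.scaleb(-value.as_tuple().exponent))
    raw = unscaled.to_bytes((unscaled.bit_length() + 8) // 8, "big", signed=True)
    return base64.b64encode(raw).decode("ascii")

print(encode_decimal(decimal.Decimal("12.34"), "NUMERIC"))  # 12.34
print(encode_decimal(decimal.Decimal("12.34"), "BASE64"))   # BNI=
```

The BASE64 form is compact but opaque; NUMERIC is human-readable but consumers must know the field is a decimal to avoid floating-point interpretation.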
Using Independent Key and Value Converters¶
The key and value converters can be used independently from each other. For
example, you may want to use a StringConverter for keys and a Schema
Registry-aware converter for values. An example of independent key and value
properties is shown below:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
Converter Property Location and Inheritance¶
Confluent Platform first looks for converter configuration properties in the connector configuration. If none are found there, properties in the Connect worker configuration are used. You have the following three options for how to set these properties. Each one affects how the properties are inherited among the worker and connectors.
- Specify all converter properties (with Schema Registry URL prefixes) in each connector configuration.
- Specify all converter properties only in the worker configuration. In this case, all connectors inherit the worker converter properties.
- Specify all converter properties in the worker configuration and add converter overrides in the connector configuration.
Important
- If converter values and the associated Schema Registry URL are defined in both the worker and the connector, settings in the connector overwrite those in the worker.
- If you specify a converter in a connector or worker (as an override or as the only setting), you must always include both the converter and the Schema Registry URL; otherwise, the connector or worker will fail.
- If you specify a converter in a connector that is not defined in the worker, you must supply all converter properties (key converter, value converter, and Schema Registry host:port) in the connector configuration.
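The lookup order described above can be sketched as follows (a simplified, hypothetical model, not Connect's actual code): if the connector configuration defines the converter, the connector's properties are used wholesale; otherwise, the worker's properties apply.

```python
def resolve_converter(worker_cfg: dict, connector_cfg: dict, prefix: str) -> dict:
    # Simplified model: a connector that sets the converter must also carry
    # all of its prefixed properties; there is no per-property merging.
    source = connector_cfg if prefix in connector_cfg else worker_cfg
    return {k: v for k, v in source.items() if k.startswith(prefix)}

worker = {
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://host-1:8081",
}
# The connector overrides the key converter, so it must supply its own URL too.
connector = {
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://host-2:8081",
}
print(resolve_converter(worker, connector, "key.converter"))
```

This models why a connector-level converter without its Schema Registry URL fails: the worker's URL is not consulted once the connector defines the converter.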
Example Scenario¶
The following are the worker configuration properties used in this example scenario:
group.id=connect-cluster
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://host-1:port
value.converter=org.apache.kafka.connect.storage.StringConverter
Using the previous worker properties, start three connectors with the following configuration properties:
connector-1 configuration:
name=connector-1
<no converter configuration properties used>
connector-2 configuration:
name=connector-2
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://host-2:port
connector-3 configuration:
name=connector-3
key.converter=io.confluent.connect.avro.AvroConverter
The results of the deployment are:
- connector-1 uses the worker configuration properties, with the Avro converter (io.confluent.connect.avro.AvroConverter) and the Schema Registry at host-1:port.
- connector-2 uses the Avro converter (io.confluent.connect.avro.AvroConverter) and the Schema Registry at host-2:port.
- connector-3 fails because it attempts to use the connector configuration, but does not find the Schema Registry URL configuration property. The Schema Registry URL configuration property is required for Avro, Protobuf, and JSON Schema.
- All connectors use the value.converter worker property, org.apache.kafka.connect.storage.StringConverter.
Tip
For a deep dive into converters, see: Converters and Serialization Explained.
Configuration Options¶
schema.registry.url
Comma-separated list of URLs for Schema Registry instances that can be used to register or look up schemas.
- Type: list
- Default: “”
- Importance: high
auto.register.schemas
Specify if the Serializer should attempt to register the Schema with Schema Registry.
- Type: boolean
- Default: true
- Importance: medium
use.latest.version
Only applies when auto.register.schemas is set to false. If auto.register.schemas is set to false and use.latest.version is set to true, then instead of deriving a schema for the object passed to the client for serialization, Schema Registry will use the latest version of the schema in the subject for serialization.
- Type: boolean
- Default: false
- Importance: medium
Note
See also, how to use schema references to combine multiple event types in the same topic with Avro, JSON Schema, or Protobuf.
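For example, a value converter configured to serialize against the latest registered schema version instead of auto-registering might look like this (using the properties defined above, with the value.converter. prefix required in Connect configurations):

```properties
value.converter.auto.register.schemas=false
value.converter.use.latest.version=true
```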
latest.compatibility.strict
Only applies when use.latest.version is set to true.
If latest.compatibility.strict is true (the default), then when using use.latest.version=true during serialization, a check is performed to verify that the latest subject version is backward compatible with the schema of the object being serialized. If the check fails, an error results. If the check succeeds, serialization is performed.
If latest.compatibility.strict is false, then the latest subject version is used for serialization, without any compatibility check. Serialization may fail in this case. Relaxing the compatibility requirement (by setting latest.compatibility.strict to false) may be useful, for example, when implementing Kafka Connect converters and schema references.
- Type: boolean
- Default: true
- Importance: medium
Note
See also, Schema Evolution and Compatibility.
max.schemas.per.subject
Maximum number of schemas to create or cache locally.
- Type: int
- Default: 1000
- Importance: low
key.subject.name.strategy
Determines how to construct the subject name under which the key schema is registered with Schema Registry. For additional information, see Schema Registry Subject Name Strategy.
Any implementation of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy can be specified. By default, <topic>-key is used as the subject. Specifying an implementation of io.confluent.kafka.serializers.subject.SubjectNameStrategy is deprecated as of 4.1.3 and, if used, may cause some performance degradation.
- Type: class
- Default: class io.confluent.kafka.serializers.subject.TopicNameStrategy
- Importance: medium
value.subject.name.strategy
Determines how to construct the subject name under which the value schema is registered with Schema Registry. For additional information, see Schema Registry Subject Name Strategy.
Any implementation of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy can be specified. By default, <topic>-value is used as the subject. Specifying an implementation of io.confluent.kafka.serializers.subject.SubjectNameStrategy is deprecated as of 4.1.3 and, if used, may cause some performance degradation.
- Type: class
- Default: class io.confluent.kafka.serializers.subject.TopicNameStrategy
- Importance: medium
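The default TopicNameStrategy naming convention described above can be sketched as follows (a simplified model, not the actual Java implementation):

```python
def topic_name_strategy(topic: str, is_key: bool) -> str:
    # TopicNameStrategy (the default): the subject is derived from the
    # topic name alone, so every record in a topic shares one subject.
    return f"{topic}-{'key' if is_key else 'value'}"

print(topic_name_strategy("orders", True))   # orders-key
print(topic_name_strategy("orders", False))  # orders-value
```

Other strategies derive the subject from the record type instead of (or in addition to) the topic, which is what enables multiple event types in one topic.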
basic.auth.credentials.source
Specify how to pick the credentials for the Basic authentication header. The supported values are URL, USER_INFO and SASL_INHERIT.
- Type: string
- Default: “URL”
- Importance: medium
basic.auth.user.info
Specify the user info for the Basic authentication in the form of {username}:{password}. schema.registry.basic.auth.user.info is a deprecated alias for this configuration.
- Type: password
- Default: “”
- Importance: medium
The following Schema Registry dedicated properties, configurable on the client, are available on Confluent Platform version 5.4.0 (and later). To learn more, see the information on configuring clients in Additional configurations for HTTPS.
schema.registry.ssl.truststore.location
The location of the trust store file. For example, schema.registry.ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks.
- Type: string
- Default: “”
- Importance: medium
schema.registry.ssl.truststore.password
The password for the trust store file. If a password is not set, access to the truststore is still available but integrity checking is disabled.
- Type: password
- Default: “”
- Importance: medium
schema.registry.ssl.keystore.location
The location of the key store file. This is optional for the client and can be used for two-way authentication for the client. For example, schema.registry.ssl.keystore.location=/etc/kafka/secrets/kafka.schemaregistry.keystore.jks.
- Type: string
- Default: “”
- Importance: medium
schema.registry.ssl.keystore.password
The store password for the key store file. This is optional for the client and only needed if ssl.keystore.location is configured.
- Type: password
- Default: “”
- Importance: medium
schema.registry.ssl.key.password
The password of the private key in the key store file. This is optional for the client.
- Type: password
- Default: “”
- Importance: medium