Using Kafka Connect with Schema Registry¶
Kafka Connect and Schema Registry integrate to capture schema information from
connectors. Kafka Connect converters provide a mechanism for converting data
from the internal data types used by Kafka Connect to data types represented as
Avro, Protobuf, or JSON Schema. The AvroConverter, ProtobufConverter, and
JsonSchemaConverter automatically register schemas generated by source
connectors. Sink connectors receive schema information in addition to the data
for each message. This lets a sink connector know the structure of the data,
which enables additional capabilities such as maintaining a database table
structure or creating a search index. Each of the converters changes schema
data into the internal data types used by Kafka Connect.
For additional information about converters and how they work, see Configuring Key and Value Converters.
Example Configuration¶
To use Kafka Connect with Schema Registry, you must specify the key.converter
or value.converter properties in the connector or in the Connect worker
configuration. The converters need an additional configuration for the
Schema Registry URL, which is specified by adding the matching converter prefix
(key.converter. or value.converter.) to schema.registry.url.
Avro¶
Avro example properties are shown below:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
Protobuf¶
Protobuf example properties are shown below:
key.converter=io.confluent.connect.protobuf.ProtobufConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter.schema.registry.url=http://localhost:8081
JSON Schema¶
JSON Schema example properties are shown below:
key.converter=io.confluent.connect.json.JsonSchemaConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=http://localhost:8081
Using Independent Key and Value Converters¶
The key and value converters can be used independently from each other. For
example, you may want to use a StringConverter for keys and a converter used
with Schema Registry for values. An example of independent key and value
properties is shown below:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
Where to Specify Converter Configurations and How Properties are Inherited¶
Confluent Platform first looks for converter configuration properties in the connector configuration. If none are found there, properties in the Connect worker configuration are used. You have the following three options for how to set these properties. Each one affects how the properties are inherited among the worker and connectors.
- Specify all converter properties (with Schema Registry URL prefixes) in each connector configuration.
- Specify all converter properties only in the worker configuration. In this case, all connectors inherit the worker converter properties.
- Specify all converter properties in the worker configuration and add converter overrides in the connector configuration.
Important
- If converter values and the associated Schema Registry URL are defined in both the worker and the connector, settings in the connector override those in the worker.
- If you specify a converter in a connector or worker (as an override or as the only setting), you must always include both the converter and the Schema Registry URL. Otherwise, the connector or worker fails.
- If you specify a converter in a connector that is not defined in the worker, you must supply all converter properties (key converter, value converter, and Schema Registry host:port) in the connector configuration.
Example Scenario¶
The following are the worker configuration properties used in this example scenario:
group.id=connect-cluster
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://host-1:port
value.converter=org.apache.kafka.connect.storage.StringConverter
Using the worker properties above, start three connectors with the following configuration properties:
connector-1 configuration:
name=connector-1
<no converter configuration properties used>
connector-2 configuration:
name=connector-2
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://host-2:port
connector-3 configuration:
name=connector-3
key.converter=io.confluent.connect.avro.AvroConverter
The results of the deployment are:
- connector-1 uses the worker configuration properties, with the Avro converter (io.confluent.connect.avro.AvroConverter) and the Schema Registry host-1:port.
- connector-2 uses the Avro converter (io.confluent.connect.avro.AvroConverter) and the Schema Registry host-2:port.
- connector-3 fails because it attempts to use the connector configuration, but does not find the Schema Registry URL configuration property. The Schema Registry URL configuration property is required for Avro, Protobuf, and JSON Schema.
- All connectors use the value.converter worker property org.apache.kafka.connect.storage.StringConverter.
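As an illustration only, connector-3 could be made to start by adding the missing URL property next to its converter override. The sketch below assumes the connector should use the same Schema Registry as the worker (host-1:port, with port left as the placeholder used above); any other reachable Schema Registry URL would be supplied the same way.

```properties
name=connector-3
# The converter is overridden in the connector configuration...
key.converter=io.confluent.connect.avro.AvroConverter
# ...so the Schema Registry URL must be supplied here as well
key.converter.schema.registry.url=http://host-1:port
```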
Tip
For a deep dive into converters, see: Converters and Serialization Explained.
Configuration Options¶
schema.registry.url
Comma-separated list of URLs for Schema Registry instances that can be used to register or look up schemas.
- Type: list
- Default: “”
- Importance: high
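In Kafka Connect, this option is set with the key.converter. or value.converter. prefix, as in the examples earlier on this page. A minimal sketch of a value converter that points at two Schema Registry instances follows; the host names are hypothetical placeholders.

```properties
value.converter=io.confluent.connect.avro.AvroConverter
# Comma-separated list; schema-registry-1 and schema-registry-2 are assumed host names
value.converter.schema.registry.url=http://schema-registry-1:8081,http://schema-registry-2:8081
```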
auto.register.schemas
Specifies whether the serializer should attempt to register the schema with Schema Registry.
- Type: boolean
- Default: true
- Importance: medium
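A sketch of disabling automatic registration for a value converter, assuming the option is passed to the converter with the same value.converter. prefix used for schema.registry.url. With this setting, the schemas the connector needs are expected to be registered in Schema Registry already.

```properties
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
# Do not attempt to register new schemas from this converter
value.converter.auto.register.schemas=false
```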
max.schemas.per.subject
Maximum number of schemas to create or cache locally.
- Type: int
- Default: 1000
- Importance: low
key.subject.name.strategy
Determines how to construct the subject name under which the key schema is registered with Schema Registry. For additional information, see Schema Registry Subject Name Strategy.
Any implementation of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy can be specified. By default, <topic>-key is used as the subject. Specifying an implementation of io.confluent.kafka.serializers.subject.SubjectNameStrategy is deprecated as of 4.1.3 and, if used, may cause some performance degradation.
- Type: class
- Default: class io.confluent.kafka.serializers.subject.TopicNameStrategy
- Importance: medium
value.subject.name.strategy
Determines how to construct the subject name under which the value schema is registered with Schema Registry. For additional information, see Schema Registry Subject Name Strategy.
Any implementation of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy can be specified. By default, <topic>-value is used as the subject. Specifying an implementation of io.confluent.kafka.serializers.subject.SubjectNameStrategy is deprecated as of 4.1.3 and, if used, may cause some performance degradation.
- Type: class
- Default: class io.confluent.kafka.serializers.subject.TopicNameStrategy
- Importance: medium
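As a sketch, a value converter could register schemas under a record-based subject instead of the default <topic>-value. The example assumes the strategy is passed through the value.converter. prefix like the other converter options, and uses io.confluent.kafka.serializers.subject.RecordNameStrategy as the illustrative choice.

```properties
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
# Derive the subject from the record name rather than the topic name
value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
```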
basic.auth.credentials.source
Specifies how to pick the credentials for the Basic Auth header. The supported values are URL, USER_INFO, and SASL_INHERIT.
- Type: string
- Default: “URL”
- Importance: medium
basic.auth.user.info
Specify the user info for Basic Auth in the form of {username}:{password}. schema.registry.basic.auth.user.info is a deprecated alias for this configuration.
- Type: password
- Default: “”
- Importance: medium
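A minimal sketch of Basic Auth for a value converter using the USER_INFO source, assuming the value.converter. prefix applies as it does for schema.registry.url. The <username> and <password> values are placeholders, not real credentials.

```properties
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
# Take credentials from basic.auth.user.info instead of the URL
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.basic.auth.user.info=<username>:<password>
```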
The following Schema Registry dedicated properties, configurable on the client, are available on Confluent Platform version 5.4.0 (and later). To learn more, see the information on configuring clients in Additional configurations for HTTPS.
schema.registry.ssl.truststore.location
The location of the trust store file. For example, schema.registry.kafkastore.ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks.
- Type: string
- Default: “”
- Importance: medium
schema.registry.ssl.truststore.password
The password for the trust store file. If a password is not set, access to the truststore is still available but integrity checking is disabled.
- Type: password
- Default: “”
- Importance: medium
schema.registry.ssl.keystore.location
The location of the key store file. This is optional for the client and can be used for two-way authentication for the client. For example, schema.registry.kafkastore.ssl.keystore.location=/etc/kafka/secrets/kafka.schemaregistry.keystore.jks.
- Type: string
- Default: “”
- Importance: medium
schema.registry.ssl.keystore.password
The store password for the key store file. This is optional for the client and only needed if ssl.keystore.location is configured.
- Type: password
- Default: “”
- Importance: medium
schema.registry.ssl.key.password
The password of the private key in the key store file. This is optional for the client.
- Type: password
- Default: “”
- Importance: medium
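A sketch of a value converter that reaches Schema Registry over HTTPS with a trust store, assuming these properties take the value.converter. prefix like the other converter options. The https URL, the trust store path (reused from the example above), and the password placeholder are illustrative.

```properties
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=https://localhost:8081
# Trust store used to verify the Schema Registry server certificate
value.converter.schema.registry.ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks
value.converter.schema.registry.ssl.truststore.password=<truststore-password>
```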