Using Kafka Connect with Schema Registry¶
Kafka Connect and Schema Registry integrate to capture schema information from connectors. Kafka Connect converters provide a mechanism for converting data from the internal data types used by Kafka Connect to data types represented as Avro. The AvroConverter automatically registers schemas generated by source connectors. Sink connectors receive schema information in addition to the data for each message, so they know the structure of the data and can provide additional capabilities such as maintaining a database table structure or creating a search index. The AvroConverter converts existing Avro data to the internal data types used by Kafka Connect.
Example Configuration¶
Tip
For an example that shows this in action, see the Confluent Platform demo. Refer to the demo’s docker-compose.yml for a configuration reference.
To use Kafka Connect with Schema Registry, you must change the key.converter or value.converter properties in the Connect worker configuration. The key.converter and value.converter properties can be configured independently of each other.
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
If you set converters in the Connect worker, you must also define the Schema Registry URL in the connector configuration file. For example:
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081
Or, simply:
schema.registry.url=http://localhost:8081
With the Schema Registry URL set in the connector as shown above, the appropriate prefixes are added for the defined converters.
Tip
- As an alternative to defining converters in the Connect worker, you can define key.converter, value.converter, and associated Schema Registry URLs entirely in the connector. Confluent Platform first looks for converter configurations in the connector. If none are found there, the settings in the worker configs are used.
- If converter values and associated Schema Registry URLs are defined in both the worker and the connector, settings in the connector overwrite those in the worker.
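For example, a connector submitted to the Connect REST API can carry its own converter settings, which then take precedence over the worker's. The connector name, class, file, and topic below are placeholders for illustration only:

```json
{
  "name": "example-file-source",
  "config": {
    "connector.class": "FileStreamSource",
    "tasks.max": "1",
    "file": "/tmp/input.txt",
    "topic": "example-topic",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081"
  }
}
```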
Configuration Options¶
schema.registry.url
Comma-separated list of URLs for Schema Registry instances that can be used to register or look up schemas.
- Type: list
- Default: “”
- Importance: high
auto.register.schemas
Specify whether the serializer should attempt to register the schema with Schema Registry.
- Type: boolean
- Default: true
- Importance: medium
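Because this option belongs to the converter, it takes the converter prefix when set in a worker or connector configuration. For example, to stop the value converter from registering new schemas (useful when schemas are registered out of band), assuming the AvroConverter configuration shown earlier:

```
value.converter.auto.register.schemas=false
```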
max.schemas.per.subject
Maximum number of schemas to create or cache locally.
- Type: int
- Default: 1000
- Importance: low
key.subject.name.strategy
Determines how to construct the subject name under which the key schema is registered with Schema Registry.
Any implementation of io.confluent.kafka.serializers.subject.SubjectNameStrategy can be specified. By default, <topic>-key is used as the subject.
- Type: class
- Default: class io.confluent.kafka.serializers.subject.TopicNameStrategy
- Importance: medium
value.subject.name.strategy
Determines how to construct the subject name under which the value schema is registered with Schema Registry.
Any implementation of io.confluent.kafka.serializers.subject.SubjectNameStrategy can be specified. By default, <topic>-value is used as the subject.
- Type: class
- Default: class io.confluent.kafka.serializers.subject.TopicNameStrategy
- Importance: medium
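The default TopicNameStrategy derives the subject purely from the topic name. A minimal sketch of that mapping (plain Python illustrating the behavior described above, not the actual Java implementation):

```python
def topic_name_strategy(topic: str, is_key: bool) -> str:
    """Mimic the default TopicNameStrategy: the subject is the
    topic name plus a '-key' or '-value' suffix."""
    suffix = "key" if is_key else "value"
    return f"{topic}-{suffix}"

# Schemas for records on topic "orders" are registered under:
print(topic_name_strategy("orders", is_key=True))   # orders-key
print(topic_name_strategy("orders", is_key=False))  # orders-value
```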
basic.auth.credentials.source
Specify how to pick the credentials for the Basic Auth header. The supported values are URL, USER_INFO, and SASL_INHERIT.
- Type: string
- Default: “URL”
- Importance: medium
basic.auth.user.info
Specify the user info for Basic Auth in the form of {username}:{password}. schema.registry.basic.auth.user.info is a deprecated alias for this configuration.
- Type: password
- Default: “”
- Importance: medium
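Putting the last two options together, a value converter can be configured to authenticate to Schema Registry with user-supplied credentials. The username and password below are placeholders, and the value.converter prefix assumes the AvroConverter setup shown earlier:

```
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.basic.auth.user.info=myuser:mypassword
```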