Using Kafka Connect with Schema Registry

Kafka Connect and Schema Registry integrate to capture schema information from connectors. Kafka Connect converters provide a mechanism for converting data from the internal data types used by Kafka Connect to data types represented as Avro. The AvroConverter will automatically register schemas generated by source connectors. Sink Connectors will receive schema information in addition to the data for each message. This allows sink connectors to know the structure of the data to provide additional capabilities like maintaining a database table structure or creating a search index. The AvroConverter will convert existing Avro data to the internal data types used by Kafka Connect.

Example Configuration

See also

For an example that shows this in action, see the Confluent Platform demo. Refer to the demo’s docker-compose.yml for a configuration reference.

To use Kafka Connect with Schema Registry, you must specify the key.converter or value.converter properties in the connector or in the Connect worker configuration. The key.converter and value.converter properties can be configured independently of each other. The converters need an additional configuration for the Schema Registry URL, which is specified by providing the required .converter prefix. For example:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

Confluent Platform first looks for converter configurations in the connector. If none are found there, the settings in the Connect worker configs are used. Therefore, you have options as to where to specify these properties, which affects how they are inherited among the worker and connectors.

  • Specify all (converters and Schema Registry URL prefixed properties) individually in each connector configuration.
  • Specify all properties only in the worker configuration, in which case the same settings will be used by all connectors.
  • Specify all properties in the worker configuration, and specify all overrides in the connectors.

Tip

  • If converter values and associated Schema Registry URLs are defined in both the worker and the connector, settings in the connector overwrite those in the worker.
  • If you specify a converter in a connector or worker (as an override or as the only setting), you must always include both the converter and the Schema Registry URL, otherwise the connector or worker will fail.

Example Scenario

Given the following worker configuration:

group.id=xyz
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://url:port
value.converter=org.apache.kafka.connect.storage.StringConverter

In that worker, start three connectors, with the following configurations.

  • connector-1:

    name=connector-1
    <no converter configs>
    
  • connector-2:

    name=connector-2
    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://url_2:port
    
  • connector-3:

    name=connector-3
    key.converter=io.confluent.connect.avro.AvroConverter
    

The results of the deployment will be:

  • connector-1 will use the worker configurations and use the Avro converter (io.confluent.connect.avro.AvroConverter) with Schema Registry at url:port.
  • connector-2 will use the Avro converter (io.confluent.connect.avro.AvroConverter) with Schema Registry at url_2:port.
  • connector-3 will fail because it tries to pick up the configuration from the connector and does not find a config for Schema Registry URL, which is mandatory for the Avro converter.

value.converter will be org.apache.kafka.connect.storage.StringConverter for all three converters.

Configuration Options

schema.registry.url

Comma-separated list of URLs for Schema Registry instances that can be used to register or look up schemas.

  • Type: list
  • Default: “”
  • Importance: high
auto.register.schemas

Specify if the Serializer should attempt to register the Schema with Schema Registry

  • Type: boolean
  • Default: true
  • Importance: medium
max.schemas.per.subject

Maximum number of schemas to create or cache locally.

  • Type: int
  • Default: 1000
  • Importance: low
key.subject.name.strategy

Determines how to construct the subject name under which the key schema is registered with Schema Registry.

Any implementation of io.confluent.kafka.serializers.subject.SubjectNameStrategy can be specified. By default, <topic>-key is used as subject.

  • Type: class
  • Default: class io.confluent.kafka.serializers.subject.TopicNameStrategy
  • Importance: medium
value.subject.name.strategy

Determines how to construct the subject name under which the value schema is registered with Schema Registry.

Any implementation of io.confluent.kafka.serializers.subject.SubjectNameStrategy can be specified. By default, <topic>-value is used as subject.

  • Type: class
  • Default: class io.confluent.kafka.serializers.subject.TopicNameStrategy
  • Importance: medium
basic.auth.credentials.source

Specify how to pick the credentials for Basic Auth header. The supported values are URL, USER_INFO and SASL_INHERIT

  • Type: string
  • Default: “URL”
  • Importance: medium
basic.auth.user.info

Specify the user info for Basic Auth in the form of {username}:{password}. schema.registry.basic.auth.user.info is a deprecated alias for this configuration.

  • Type: password
  • Default: “”
  • Importance: medium

The following Schema Registry dedicated properties, configurable on the client, are available on Confluent Platform version 5.4.0 (and later). To learn more, see the information on configuring clients in Additional configurations for HTTPS.

schema.registry.ssl.truststore.location

The location of the trust store file. For example, schema.registry.kafkastore.ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks

  • Type: string
  • Default: “”
  • Importance: medium
schema.registry.ssl.truststore.password

The password for the trust store file. If a password is not set, access to the truststore is still available but integrity checking is disabled.

  • Type: password
  • Default: “”
  • Importance: medium
schema.registry.ssl.keystore.location

The location of the key store file. This is optional for the client and can be used for two-way authentication for the client. For example, schema.registry.kafkastore.ssl.keystore.location=/etc/kafka/secrets/kafka.schemaregistry.keystore.jks.

  • Type: string
  • Default: “”
  • Importance: medium
schema.registry.ssl.keystore.password

The store password for the key store file. This is optional for the client and only needed if ssl.keystore.location is configured.

  • Type: password
  • Default: “”
  • Importance: medium
schema.registry.ssl.key.password

The password of the private key in the key store file. This is optional for the client.

  • Type: password
  • Default: “”
  • Importance: medium