Using Kafka Connect with Schema Registry

Kafka Connect and Schema Registry integrate to capture schema information from connectors. Kafka Connect converters provide a mechanism for converting data from the internal data types used by Kafka Connect to data types represented as Avro, Protobuf, or JSON Schema. The AvroConverter, ProtobufConverter, and JsonSchemaConverter automatically register schemas generated by source connectors. Sink Connectors receive schema information in addition to the data for each message. This allows sink connectors to know the structure of the data to provide additional capabilities like maintaining a database table structure or creating a search index. Each of the converters change schema data into the internal data types used by Kafka Connect.

For additional information about converters and how they work, see Configuring Key and Value Converters.

Example Configuration

To use Kafka Connect with Schema Registry, you must specify the key.converter or value.converter properties in the connector or in the Connect worker configuration.

The converters need an additional configuration for the Schema Registry URL, which is specified by providing the required .converter prefix.

Avro

Avro example properties are shown below:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

Protobuf

Protobuf example properties are shown below:

key.converter=io.confluent.connect.protobuf.ProtobufConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter.schema.registry.url=http://localhost:8081

JSON Schema

JSON Schema example properties are shown below:

key.converter=io.confluent.connect.json.JsonSchemaConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=http://localhost:8081

Using Independent Key and Value Converters

The key and value converters can be used independently from each other. For example, you may want to use a StringConverter for keys and a converter used with Schema Registry for values. An example of independent key and value properties is shown below:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

Where to Specify Converter Configurations and How Properties are Inherited

Confluent Platform first looks for converter configuration properties in the connector configuration. If none are found there, properties in the Connect worker configuration are used. You have the following three options for how to set these properties. Each one affects how the properties are inherited among the worker and connectors.

  • Specify all converter properties (with Schema Registry URL prefixes) in each connector configuration.
  • Specify all converter properties only in the worker configuration. In this case, all connectors inherit the worker converter properties.
  • Specify all converter properties in the worker configuration and add converter overrides in the connector configuration.

Important

  • If converter values and associated Schema Registry URL are defined in both the worker and the connector, settings in the connector overwrite those in the worker.
  • If you specify a converter in a connector or worker (as an override or as the only setting), you must always include both the converter and the Schema Registry URL, otherwise the connector or worker will fail.
  • If you specify a converter in a connecter that is not defined in the worker, must supply all converter properties (key converter, value converter, and Schema Registry host:port) in the connector configuration.

Example Scenario

The following are the worker configuration properties used in this example scenario:

group.id=connect-cluster

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://host-1:port
value.converter=org.apache.kafka.connect.storage.StringConverter

Using the worker properties above, start three connectors with the following configuration properties:

  • connector-1 configuration:

    name=connector-1
    <no converter configuration properties used>
    
  • connector-2 configuration:

    name=connector-2
    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://host-2:port
    
  • connector-3 configuration:

    name=connector-3
    key.converter=io.confluent.connect.avro.AvroConverter
    

The results of the deployment are:

  • connector-1 uses the worker configuration properties, with the Avro converter (io.confluent.connect.avro.AvroConverter) and the Schema Registry host:port.
  • connector-2 uses the Avro converter (io.confluent.connect.avro.AvroConverter) and the Schema Registry host-2:port.
  • connector-3 fails because it attempts to use the connector configuration, but does not find the Schema Registry URL configuration property. The Schema Registry URL configuration property is required for Avro, Protobuf, and JSON Schema.
  • All connectors use the value.converter worker property org.apache.kafka.connect.storage.StringConverter.

Tip

For a deep dive into converters, see: Converters and Serialization Explained.

Configuration Options

schema.registry.url

Comma-separated list of URLs for Schema Registry instances that can be used to register or look up schemas.

  • Type: list
  • Default: “”
  • Importance: high
auto.register.schemas

Specify if the Serializer should attempt to register the Schema with Schema Registry

  • Type: boolean
  • Default: true
  • Importance: medium
max.schemas.per.subject

Maximum number of schemas to create or cache locally.

  • Type: int
  • Default: 1000
  • Importance: low
key.subject.name.strategy

Determines how to construct the subject name under which the key schema is registered with Schema Registry. For additional information, see Schema Registry Subject Name Strategy.

Any implementation of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy can be specified. By default, <topic>-key is used as subject. Specifying an implementation of io.confluent.kafka.serializers.subject.SubjectNameStrategy is deprecated as of 4.1.3 and if used may have some performance degradation.

  • Type: class
  • Default: class io.confluent.kafka.serializers.subject.TopicNameStrategy
  • Importance: medium
value.subject.name.strategy

Determines how to construct the subject name under which the value schema is registered with Schema Registry. For additional information, see Schema Registry Subject Name Strategy.

Any implementation of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy can be specified. By default, <topic>-value is used as subject. Specifying an implementation of io.confluent.kafka.serializers.subject.SubjectNameStrategy is deprecated as of 4.1.3 and if used may have some performance degradation.

  • Type: class
  • Default: class io.confluent.kafka.serializers.subject.TopicNameStrategy
  • Importance: medium
basic.auth.credentials.source

Specify how to pick the credentials for Basic Auth header. The supported values are URL, USER_INFO and SASL_INHERIT

  • Type: string
  • Default: “URL”
  • Importance: medium
basic.auth.user.info

Specify the user info for Basic Auth in the form of {username}:{password}. schema.registry.basic.auth.user.info is a deprecated alias for this configuration.

  • Type: password
  • Default: “”
  • Importance: medium

The following Schema Registry dedicated properties, configurable on the client, are available on Confluent Platform version 5.4.0 (and later). To learn more, see the information on configuring clients in Additional configurations for HTTPS.

schema.registry.ssl.truststore.location

The location of the trust store file. For example, schema.registry.kafkastore.ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks

  • Type: string
  • Default: “”
  • Importance: medium
schema.registry.ssl.truststore.password

The password for the trust store file. If a password is not set, access to the truststore is still available but integrity checking is disabled.

  • Type: password
  • Default: “”
  • Importance: medium
schema.registry.ssl.keystore.location

The location of the key store file. This is optional for the client and can be used for two-way authentication for the client. For example, schema.registry.kafkastore.ssl.keystore.location=/etc/kafka/secrets/kafka.schemaregistry.keystore.jks.

  • Type: string
  • Default: “”
  • Importance: medium
schema.registry.ssl.keystore.password

The store password for the key store file. This is optional for the client and only needed if ssl.keystore.location is configured.

  • Type: password
  • Default: “”
  • Importance: medium
schema.registry.ssl.key.password

The password of the private key in the key store file. This is optional for the client.

  • Type: password
  • Default: “”
  • Importance: medium