Important

You are viewing documentation for an older version of Confluent Platform. For the latest, click here.

Using Kafka Connect with Schema Registry

Kafka Connect and Schema Registry integrate to capture schema information from connectors. Kafka Connect converters provide a mechanism for converting data from the internal data types used by Kafka Connect to data types represented as Avro. The AvroConverter will automatically register schemas generated by source connectors. Sink Connectors will receive schema information in addition to the data for each message. This allows sink connectors to know the structure of the data to provide additional capabilities like maintaining a database table structure or creating a search index. The AvroConverter will convert existing Avro data to the internal data types used by Kafka Connect.

Example Configuration

To use Kafka Connect with Schema Registry, you must change the key.converter or value.converter properties in the Connect worker configuration. The key.converter and value.converter properties can be configured independently of each other.

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

If you set converters in the Connect worker, you must also define the Schema Registry URL in the connector configuration file. For example:

key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081

Or, simply:

schema.registry.url=http://localhost:8081

With the Schema Registry URL set in the connector as shown above, the appropriate prefixes are added for the defined converters.

Tip

  • As an alternative to defining converters in the Connect worker, you can define key.converter, value.converter, and associated Schema Registry URLs entirely in the connector. Confluent Platform first looks for converter configurations in the connector. If none are found there, the settings in the worker configs are used.
  • If converter values and associated Schema Registry URLs are defined in both the worker and the connector, settings in the connector overwrite those in the worker.

Configuration Options

schema.registry.url

Comma-separated list of URLs for Schema Registry instances that can be used to register or look up schemas.

  • Type: list
  • Default: “”
  • Importance: high
auto.register.schemas

Specify if the Serializer should attempt to register the Schema with Schema Registry

  • Type: boolean
  • Default: true
  • Importance: medium
max.schemas.per.subject

Maximum number of schemas to create or cache locally.

  • Type: int
  • Default: 1000
  • Importance: low
key.subject.name.strategy

Determines how to construct the subject name under which the key schema is registered with Schema Registry.

Any implementation of io.confluent.kafka.serializers.subject.SubjectNameStrategy can be specified. By default, <topic>-key is used as subject.

  • Type: class
  • Default: class io.confluent.kafka.serializers.subject.TopicNameStrategy
  • Importance: medium
value.subject.name.strategy

Determines how to construct the subject name under which the value schema is registered with Schema Registry.

Any implementation of io.confluent.kafka.serializers.subject.SubjectNameStrategy can be specified. By default, <topic>-value is used as subject.

  • Type: class
  • Default: class io.confluent.kafka.serializers.subject.TopicNameStrategy
  • Importance: medium
basic.auth.credentials.source

Specify how to pick the credentials for Basic Auth header. The supported values are URL, USER_INFO and SASL_INHERIT

  • Type: string
  • Default: “URL”
  • Importance: medium
basic.auth.user.info

Specify the user info for Basic Auth in the form of {username}:{password}. schema.registry.basic.auth.user.info is a deprecated alias for this configuration.

  • Type: password
  • Default: “”
  • Importance: medium