Important

You are viewing documentation for an older version of Confluent Platform. For the latest, click here.

Using Kafka Connect with Schema Registry

Kafka Connect and Schema Registry integrate to capture schema information from connectors. Kafka Connect converters provide a mechanism for converting data from the internal data types used by Kafka Connect to data types represented as Avro. The AvroConverter will automatically register schemas generated by source connectors. Sink Connectors will receive schema information in addition to the data for each message. This allows sink connectors to know the structure of the data to provide additional capabilities like maintaining a database table structure or creating a search index. The AvroConverter will convert existing Avro data to the internal data types used by Kafka Connect.

Example Configuration

To use Kafka Connect with Schema Registry, you must change the key.converter or value.converter properties in the Connect worker configuration. The key.converter and value.converter properties can be configured independently of each other.

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

Configuration Options

schema.registry.url

Comma-separated list of URLs for Schema Registry instances that can be used to register or look up schemas.

  • Type: list
  • Default: “”
  • Importance: high
auto.register.schemas

Specify if the Serializer should attempt to register the Schema with Schema Registry

  • Type: boolean
  • Default: true
  • Importance: medium
max.schemas.per.subject

Maximum number of schemas to create or cache locally.

  • Type: int
  • Default: 1000
  • Importance: low
key.subject.name.strategy

Determines how to construct the subject name under which the key schema is registered with Schema Registry.

Any implementation of io.confluent.kafka.serializers.subject.SubjectNameStrategy can be specified. By default, <topic>-key is used as subject.

  • Type: class
  • Default: class io.confluent.kafka.serializers.subject.TopicNameStrategy
  • Importance: medium
value.subject.name.strategy

Determines how to construct the subject name under which the value schema is registered with Schema Registry.

Any implementation of io.confluent.kafka.serializers.subject.SubjectNameStrategy can be specified. By default, <topic>-value is used as subject.

  • Type: class
  • Default: class io.confluent.kafka.serializers.subject.TopicNameStrategy
  • Importance: medium
basic.auth.credentials.source

Specify how to pick the credentials for Basic Auth header. The supported values are URL, USER_INFO and SASL_INHERIT

  • Type: string
  • Default: “URL”
  • Importance: medium
basic.auth.user.info

Specify the user info for Basic Auth in the form of {username}:{password}. schema.registry.basic.auth.user.info is a deprecated alias for this configuration.

  • Type: password
  • Default: “”
  • Importance: medium