Integrate Schemas from Kafka Connect in Confluent Platform¶
Kafka Connect and Schema Registry integrate to capture schema information from connectors. Kafka Connect converters provide a mechanism for converting data from the internal data types used by Kafka Connect to data types represented as Avro, Protobuf, or JSON Schema. The AvroConverter, ProtobufConverter, and JsonSchemaConverter automatically register schemas generated by source connectors. Sink connectors receive schema information in addition to the data for each message, which allows them to know the structure of the data and provide additional capabilities, such as maintaining a database table structure or creating a search index. Each of these converters also changes schema data into the internal data types used by Kafka Connect.
For additional information about converters and how they work, see Configuring Key and Value Converters.
Example Converter Properties¶
To use Kafka Connect with Schema Registry, you must specify the key.converter or value.converter properties in the connector or in the Connect worker configuration. The converters also need an additional configuration for the Schema Registry URL, which you specify by adding the converter prefix to the schema.registry.url property, as shown in the following property examples.
Avro¶
Example Avro converter properties are shown below:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
The following additional configuration properties can be used with the Avro converter (io.confluent.connect.avro.AvroConverter). These Avro-specific properties are added to the worker or connector configuration where the Avro converter properties are located. Note that when added to the worker or connector configuration, these properties require the key.converter. and value.converter. prefixes. For example:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
key.converter.enhanced.avro.schema.support=true
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.enhanced.avro.schema.support=true
When using Avro with basic authentication, you add the following properties:
key.converter.basic.auth.credentials.source=USER_INFO
key.converter.basic.auth.user.info={username}:{password}
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.basic.auth.user.info={username}:{password}
When using Avro in a secure environment, you add key.converter.schema.registry.ssl. and value.converter.schema.registry.ssl. properties. An example of these additional properties is shown below:
key.converter.schema.registry.ssl.truststore.location=<location>
key.converter.schema.registry.ssl.truststore.password=<truststore-password>
key.converter.schema.registry.ssl.keystore.location=<keystore-location>
key.converter.schema.registry.ssl.keystore.password=<keystore-password>
key.converter.schema.registry.ssl.key.password=<key-password>
value.converter.schema.registry.ssl.truststore.location=<location>
value.converter.schema.registry.ssl.truststore.password=<truststore-password>
value.converter.schema.registry.ssl.keystore.location=<keystore-location>
value.converter.schema.registry.ssl.keystore.password=<keystore-password>
value.converter.schema.registry.ssl.key.password=<key-password>
The following lists definitions for the Avro-specific configuration properties. For additional Connect Schema Registry configuration options, see Configuration Options.
scrub.invalid.names
Whether to scrub invalid names by replacing invalid characters with valid characters.
- Type: boolean
- Default: false
- Importance: medium
schemas.cache.config
The size of the schema cache used in the Avro converter.
- Type: int
- Default: 1000
- Importance: low
enhanced.avro.schema.support
Enable enhanced Avro schema support in the Avro converter. When set to true, this property preserves Avro schema package information and Enums when converting from Avro schema to Connect schema. This information is added back when converting from Connect schema to Avro schema.
- Type: boolean
- Default: false
- Importance: low
connect.meta.data
Allow the Connect converter to add its metadata to the output schema.
- Type: boolean
- Default: true
- Importance: low
The connect.meta.data property preserves the following Connect schema metadata when converting from Connect schema to Avro schema, and adds it back when converting from Avro schema to Connect schema:
- doc
- version
- parameters
- default value
- name
- type
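For example, to keep Avro output schemas free of Connect-specific metadata while still using enhanced Avro support, you might combine these properties in a worker or connector configuration. This is a minimal sketch, and the registry URL is a placeholder:
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.enhanced.avro.schema.support=true
value.converter.connect.meta.data=false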
Protobuf¶
Example Protobuf converter properties are shown below:
key.converter=io.confluent.connect.protobuf.ProtobufConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter.schema.registry.url=http://localhost:8081
The following lists definitions for the Protobuf-specific configuration properties. For additional Connect Schema Registry configuration options, see Configuration Options.
enhanced.protobuf.schema.support
Enable enhanced Protobuf schema support in the Protobuf converter. When set to true, this property preserves Protobuf schema package information when converting from Protobuf schema to Connect schema. This information is added back when converting from Connect schema to Protobuf schema.
- Type: boolean
- Default: false
- Importance: medium
generate.index.for.unions
Whether to generate an index suffix for unions. By default, oneOf message fields have their names suffixed with an index (for example, _0), which results in a column name such as value_0.thing. To configure oneOf message field names without this suffix, set generate.index.for.unions to false. To learn more about oneOf in Protobuf, see Multiple event types in the same topic.
- Type: boolean
- Default: true
- Importance: medium
scrub.invalid.names
Whether to scrub invalid names by replacing invalid characters with valid characters.
- Type: boolean
- Default: false
- Importance: medium
int.for.enums
Whether to represent enums as integers. The default is false. To represent enums as integers, set int.for.enums to true.
- Type: boolean
- Default: false
- Importance: medium
optional.for.nullables
Whether nullable fields should be specified with an optional label.
- Type: boolean
- Default: false
- Importance: medium
generate.struct.for.nulls
Whether to generate a struct variable for null values.
- Type: boolean
- Default: false
- Importance: medium
wrapper.for.nullables
Whether nullable fields should use primitive wrapper messages.
- Type: boolean
- Default: false
- Importance: medium
wrapper.for.raw.primitives
Whether a wrapper message should be interpreted as a raw primitive at the root level.
- Type: boolean
- Default: true
- Importance: medium
schemas.cache.config
The size of the schema cache used in the Protobuf converter.
- Type: int
- Default: 1000
- Importance: low
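As a sketch of how several of these options can combine, the following hypothetical value-converter configuration drops the oneOf index suffix and marks nullable fields with the optional label; the registry URL is a placeholder:
value.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.generate.index.for.unions=false
value.converter.optional.for.nullables=true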
JSON Schema¶
Example JSON Schema converter properties are shown below:
key.converter=io.confluent.connect.json.JsonSchemaConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=http://localhost:8081
The following lists definitions for the JSON Schema-specific configuration properties. For additional Connect Schema Registry configuration options, see Configuration Options.
object.additional.properties
Whether to allow additional properties for object schemas.
- Type: boolean
- Default: true
- Importance: medium
use.optional.for.nonrequired
Whether to set non-required properties to be optional.
- Type: boolean
- Default: false
- Importance: medium
decimal.format
Controls which format this converter serializes decimals in. This value is case insensitive and can be either BASE64 (the default) or NUMERIC.
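Putting these together, the following is a sketch of a value-converter configuration that serializes decimals as plain numbers and disallows additional object properties; the registry URL is a placeholder:
value.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.decimal.format=NUMERIC
value.converter.object.additional.properties=false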
Note
JSON Schema supports an empty schema that has no type defined. For example:
"f2": {}
This is not allowed with JsonSchemaConverter in Connect, because Connect requires a strongly-typed structure for its internal record schema. For example, the following uses an empty schema inside a oneOf:
"holiday": {
"oneOf": [
{
"title": "Not included",
"type": "null"
},
{}
]
}
Using Independent Key and Value Converters¶
The key and value converters can be used independently of each other. For example, you may want to use a StringConverter for keys and a Schema Registry-aware converter for values. An example of independent key and value properties is shown below:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
Converter Property Location and Inheritance¶
Confluent Platform first looks for converter configuration properties in the connector configuration. If none are found there, properties in the Connect worker configuration are used. You have the following three options for how to set these properties. Each one affects how the properties are inherited among the worker and connectors.
- Specify all converter properties (with Schema Registry URL prefixes) in each connector configuration.
- Specify all converter properties only in the worker configuration. In this case, all connectors inherit the worker converter properties.
- Specify all converter properties in the worker configuration, and add converter properties to the connector configuration only when you need to override the worker ones.
Important
- If converter values and associated Schema Registry URL are defined in both the worker and the connector, settings in the connector overwrite those in the worker.
- If you specify a converter in a connector or worker (as an override or as the only setting), you must always include both the converter and the Schema Registry URL, otherwise the connector or worker will fail.
- If you specify a converter in a connector that is not defined in the worker, you must supply all converter properties (key converter, value converter, and Schema Registry host:port) in the connector configuration.
Example Scenario¶
The following are the worker configuration properties used in this example scenario:
group.id=connect-cluster
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://host-1:port
value.converter=org.apache.kafka.connect.storage.StringConverter
Using the previous worker properties, start three connectors with the following configuration properties:
connector-1 configuration:
name=connector-1
<no converter configuration properties used>
connector-2 configuration:
name=connector-2
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://host-2:port
connector-3 configuration:
name=connector-3
key.converter=io.confluent.connect.avro.AvroConverter
The results of the deployment are:
- connector-1 uses the worker configuration properties, with the Avro converter (io.confluent.connect.avro.AvroConverter) and the Schema Registry at host-1:port.
- connector-2 uses the Avro converter (io.confluent.connect.avro.AvroConverter) and the Schema Registry at host-2:port.
- connector-3 fails because it attempts to use the connector configuration, but does not find the Schema Registry URL configuration property, which is required for Avro, Protobuf, and JSON Schema. A corrected configuration is sketched after this list.
- All connectors use the value.converter worker property org.apache.kafka.connect.storage.StringConverter.
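For illustration, a minimal fix for connector-3 is to add the missing Schema Registry URL alongside its converter override; the host shown here is a placeholder:
name=connector-3
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://host-2:port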
Tip
For a deep dive into converters, see: Converters and Serialization Explained.
NULL values replaced with default values¶
The configuration property ignore.default.for.nullables allows you to use a NULL value for a nullable (optional) column that has a default value configured for it. When set to true, this property ensures that the corresponding record in Kafka is NULL, instead of showing the default column value. The property defaults to false. A configuration snippet using this property is shown below:
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.enhanced.avro.schema.support=true
value.converter.ignore.default.for.nullables=true
Configuration Options¶
schema.registry.url
Comma-separated list of URLs for Schema Registry instances that can be used to register or look up schemas.
- Type: list
- Default: “”
- Importance: high
auto.register.schemas
Specify whether the serializer should attempt to register the schema with Schema Registry.
- Type: boolean
- Default: true
- Importance: medium
use.latest.version
Only applies when auto.register.schemas is set to false. If auto.register.schemas is set to false and use.latest.version is set to true, then instead of deriving a schema for the object passed to the client for serialization, Schema Registry uses the latest version of the schema in the subject for serialization. The property use.latest.version can be set on producers or consumers to serialize or deserialize messages per the latest version.
- Type: boolean
- Default: false
- Importance: medium
Note
To learn more, see how to use schema references to combine multiple event types in the same topic with Avro, JSON Schema, or Protobuf.
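In Connect, these client properties take the converter prefix like the other Schema Registry options. The following sketch serializes values against the latest registered schema version instead of auto-registering new ones; the registry URL is a placeholder:
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.auto.register.schemas=false
value.converter.use.latest.version=true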
latest.compatibility.strict
Only applies when use.latest.version is set to true.
If latest.compatibility.strict is true (the default), then when using use.latest.version=true during serialization, a check is performed to verify that the latest subject version is backward compatible with the schema of the object being serialized. If the check fails, an error results. If the check succeeds, serialization is performed.
If latest.compatibility.strict is false, then the latest subject version is used for serialization, without any compatibility check. Serialization may fail in this case. Relaxing the compatibility requirement (by setting latest.compatibility.strict to false) may be useful, for example, when implementing Kafka Connect converters and schema references.
- Type: boolean
- Default: true
- Importance: medium
Note
To learn more about this setting, see Schema Evolution and Compatibility for Schema Registry on Confluent Platform.
max.schemas.per.subject
Maximum number of schemas to create or cache locally.
- Type: int
- Default: 1000
- Importance: low
key.subject.name.strategy
Determines how to construct the subject name under which the key schema is registered with Schema Registry. For additional information, see Schema Registry Subject name strategy.
Any implementation of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy can be specified. By default, <topic>-key is used as the subject. Specifying an implementation of io.confluent.kafka.serializers.subject.SubjectNameStrategy is deprecated as of 4.1.3 and, if used, may cause some performance degradation.
- Type: class
- Default: class io.confluent.kafka.serializers.subject.TopicNameStrategy
- Importance: medium
value.subject.name.strategy
Determines how to construct the subject name under which the value schema is registered with Schema Registry. For additional information, see Schema Registry Subject name strategy.
Any implementation of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy can be specified. By default, <topic>-value is used as the subject. Specifying an implementation of io.confluent.kafka.serializers.subject.SubjectNameStrategy is deprecated as of 4.1.3 and, if used, may cause some performance degradation.
- Type: class
- Default: class io.confluent.kafka.serializers.subject.TopicNameStrategy
- Importance: medium
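For example, to register value schemas under the record name rather than the topic name, you could point the strategy at the RecordNameStrategy implementation that ships with Schema Registry. This sketch assumes an Avro value converter and a placeholder registry URL:
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy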
basic.auth.credentials.source
Specify how to pick the credentials for the Basic authentication header. The supported values are URL, USER_INFO, and SASL_INHERIT.
- Type: string
- Default: “URL”
- Importance: medium
basic.auth.user.info
Specify the user info for the Basic authentication in the form of {username}:{password}. schema.registry.basic.auth.user.info is a deprecated alias for this configuration.
- Type: password
- Default: “”
- Importance: medium
The following dedicated Schema Registry properties, configurable on the client, are available in Confluent Platform version 5.4.0 and later. To learn more, see the information on configuring clients in Additional configurations for HTTPS.
schema.registry.ssl.truststore.location
The location of the trust store file. For example, schema.registry.ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks.
- Type: string
- Default: “”
- Importance: medium
schema.registry.ssl.truststore.password
The password for the trust store file. If a password is not set, access to the truststore is still available but integrity checking is disabled.
- Type: password
- Default: “”
- Importance: medium
schema.registry.ssl.keystore.location
The location of the key store file. This is optional for the client and can be used for two-way authentication for the client. For example, schema.registry.ssl.keystore.location=/etc/kafka/secrets/kafka.schemaregistry.keystore.jks.
- Type: string
- Default: “”
- Importance: medium
schema.registry.ssl.keystore.password
The store password for the key store file. This is optional for the client and only needed if ssl.keystore.location is configured.
- Type: password
- Default: “”
- Importance: medium
schema.registry.ssl.key.password
The password of the private key in the key store file. This is optional for the client.
- Type: password
- Default: “”
- Importance: medium