Drop¶
The following provides usage information for the Confluent SMT io.confluent.connect.transforms.Drop.
Description¶
Set either the key or value of a message to null. The corresponding schema can also be set to null, set as optional, validated as already optional, or kept as-is. Use the concrete transformation type designed for the record key (io.confluent.connect.transforms.Drop$Key) or value (io.confluent.connect.transforms.Drop$Value).
Installation¶
This transformation is developed by Confluent and does not ship by default with Kafka or Confluent Platform. You can install this transformation via the Confluent Hub Client:
confluent-hub install confluentinc/connect-transforms:latest
Examples¶
The configuration snippets below show how to use and configure the Drop SMT.
transforms=dropKeyExample, dropValueAndForceOptionalSchemaExample
# Drop the key from the message, using the default behavior for schemas, which is to nullify them if
# they are not already null
transforms.dropKeyExample.type=io.confluent.connect.transforms.Drop$Key
# Drop the value from the message, and if the schema for the value isn't already optional,
# forcefully overwrite it to become optional.
transforms.dropValueAndForceOptionalSchemaExample.type=io.confluent.connect.transforms.Drop$Value
transforms.dropValueAndForceOptionalSchemaExample.schema.behavior=force_optional
- Record contents using Drop$Key with schema.behavior set to nullify:
  transforms.dropKeyExample.type=io.confluent.connect.transforms.Drop$Key
  transforms.dropKeyExample.schema.behavior=nullify
  - Before: key: 24, schema: {"type": "integer"}
  - After: key: null, schema: null
  The key is nullified and the schema is nullified.
- Record contents using Drop$Key with schema.behavior set to retain:
  transforms.dropKeyExample.type=io.confluent.connect.transforms.Drop$Key
  transforms.dropKeyExample.schema.behavior=retain
  - Before: key: 24, schema: {"type": "integer"}
  - After: key: null, schema: {"type": "integer"}
  The key is nullified and the schema is unchanged.
- Record contents using Drop$Key with schema.behavior set to validate, where the schema is not optional:
  transforms.dropKeyExample.type=io.confluent.connect.transforms.Drop$Key
  transforms.dropKeyExample.schema.behavior=validate
  - Before: key: 24, schema: {"type": "integer"}
  - After: Throws an exception because the schema is not optional.
- Record contents using Drop$Key with schema.behavior set to validate, where the schema is optional:
  transforms.dropKeyExample.type=io.confluent.connect.transforms.Drop$Key
  transforms.dropKeyExample.schema.behavior=validate
  - Before: key: 24, schema: {"type": "integer", "optional": true}
  - After: key: null, schema: {"type": "integer", "optional": true}
  The key is nullified and the schema is unchanged.
- Record contents using Drop$Key with schema.behavior set to force_optional, where the schema is not optional:
  transforms.dropKeyExample.type=io.confluent.connect.transforms.Drop$Key
  transforms.dropKeyExample.schema.behavior=force_optional
  - Before: key: 24, schema: {"type": "integer"}
  - After: key: null, schema: {"type": "integer", "optional": true}
  The key is nullified and the schema is made optional.
- Record contents using Drop$Key with schema.behavior set to force_optional, where the schema is already optional:
  transforms.dropKeyExample.type=io.confluent.connect.transforms.Drop$Key
  transforms.dropKeyExample.schema.behavior=force_optional
  - Before: key: 24, schema: {"type": "integer", "optional": true}
  - After: key: null, schema: {"type": "integer", "optional": true}
  The key is nullified and the schema is unchanged.
Properties¶
Name | Description | Type | Default | Valid Values | Importance
---|---|---|---|---|---
schema.behavior | How to handle non-null schemas. If set to nullify, the schema for the new record is null. If set to retain, the schema is used as-is, regardless of whether it is optional. If set to validate, the schema is checked first: if it is optional, it is used as-is; otherwise, an exception is thrown. If set to force_optional, the schema is overwritten to be optional if it is not already. | string | nullify | [nullify, retain, validate, force_optional] | medium
Predicates¶
Transformations can be configured with predicates so that the transformation is applied only to records which satisfy a condition. You can use predicates in a transformation chain and, when combined with the Apache Kafka® Filter, predicates can conditionally filter out specific records.
Predicates are specified in the connector configuration. The following properties are used:
- predicates: A set of aliases for the predicates applied to one or more transformations.
- predicates.$alias.type: Fully qualified class name for the predicate.
- predicates.$alias.$predicateSpecificConfig: Configuration properties for the predicate.
All transformations have the implicit config properties predicate and negate. A particular predicate is associated with a transformation by setting the transformation’s predicate configuration to the predicate’s alias. The predicate’s value can be reversed using the negate configuration property.
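Putting these properties together, attaching a predicate to a transformation generally takes the following shape. This is a minimal sketch: the aliases (dropDebug, hasDebugHeader) and the debug header name are illustrative, not part of the Drop SMT documentation.

```properties
# Apply the Filter transformation only to records that carry a "debug" header
transforms=dropDebug
transforms.dropDebug.type=org.apache.kafka.connect.transforms.Filter
transforms.dropDebug.predicate=hasDebugHeader
transforms.dropDebug.negate=false
predicates=hasDebugHeader
predicates.hasDebugHeader.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
predicates.hasDebugHeader.name=debug
```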
Kafka Connect includes the following predicates:
- org.apache.kafka.connect.transforms.predicates.TopicNameMatches: Matches records in a topic with a name matching a particular Java regular expression.
- org.apache.kafka.connect.transforms.predicates.HasHeaderKey: Matches records which have a header with the given key.
- org.apache.kafka.connect.transforms.predicates.RecordIsTombstone: Matches tombstone records (that is, records with a null value).
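RecordIsTombstone pairs naturally with the Filter transformation, for example to keep tombstone records out of a sink. A minimal sketch, with illustrative alias names:

```properties
# Drop tombstone (null-value) records from the stream
transforms=dropTombstones
transforms.dropTombstones.type=org.apache.kafka.connect.transforms.Filter
transforms.dropTombstones.predicate=isTombstone
predicates=isTombstone
predicates.isTombstone.type=org.apache.kafka.connect.transforms.predicates.RecordIsTombstone
```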
Predicate Examples¶
Example 1:
You have a source connector that produces records to many different topics and you want to do the following:
- Filter out the records in the foo topic entirely.
- Apply the ExtractField transformation with the field name other_field to records in all topics, except the topic bar.
To do this, you need to first filter out the records destined for the topic foo. The Filter transformation removes records from further processing. Next, you use the TopicNameMatches predicate to apply the transformation only to records in topics which match a certain regular expression. The only configuration property for TopicNameMatches is a Java regular expression used as a pattern for matching against the topic name. The following example shows this configuration:
transforms=Filter
transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
transforms.Filter.predicate=IsFoo
predicates=IsFoo
predicates.IsFoo.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsFoo.pattern=foo
Using this configuration, ExtractField is then applied only when the topic name of the record is not bar. The reason you can’t use TopicNameMatches directly is that it would apply the transformation to matching topic names, not to topic names which do not match. The transformation’s implicit negate configuration property inverts the set of records which a predicate matches. This configuration addition is shown below:
transforms=Filter,Extract
transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
transforms.Filter.predicate=IsFoo
transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.Extract.field=other_field
transforms.Extract.predicate=IsBar
transforms.Extract.negate=true
predicates=IsFoo,IsBar
predicates.IsFoo.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsFoo.pattern=foo
predicates.IsBar.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsBar.pattern=bar
Example 2:
The following configuration shows how to use a predicate in a transformation
chain with the ExtractField
transformation and the negate=true
configuration property:
transforms=t2
transforms.t2.predicate=has-my-prefix
transforms.t2.negate=true
transforms.t2.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.t2.field=c1
predicates=has-my-prefix
predicates.has-my-prefix.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.has-my-prefix.pattern=my-prefix-.*
The transform t2 is only applied when the predicate has-my-prefix is false (using the negate=true parameter). The predicate is configured by the keys with prefix predicates.has-my-prefix. The predicate class is org.apache.kafka.connect.transforms.predicates.TopicNameMatches and its pattern parameter has the value my-prefix-.*. With this configuration, the transformation is applied only to records where the topic name does not start with my-prefix-.
Tip
The benefit of defining the predicate separately from the transform is that it makes it easier to apply the same predicate to multiple transforms. For example, one set of transforms can use a predicate directly, while another set of transforms negates the same predicate.
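For example, a single predicate alias can drive two transforms, one of which negates it. A sketch with illustrative topic, field, and alias names:

```properties
# Extract a key field in the "orders" topic; drop records from every other topic
transforms=extractId,keepOrdersOnly
transforms.extractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractId.field=order_id
transforms.extractId.predicate=IsOrders
transforms.keepOrdersOnly.type=org.apache.kafka.connect.transforms.Filter
transforms.keepOrdersOnly.predicate=IsOrders
transforms.keepOrdersOnly.negate=true
predicates=IsOrders
predicates.IsOrders.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsOrders.pattern=orders
```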
Predicate Properties¶
Name | Description | Type | Default | Valid Values | Importance
---|---|---|---|---|---
TopicNameMatches | A predicate which is true for records with a topic name that matches the configured regular expression. | string | | non-empty string, valid regex | medium
HasHeaderKey | A predicate which is true for records with at least one header with the configured name. | string | | non-empty string | medium
RecordIsTombstone | A predicate which is true for records which are tombstones (that is, records with a null value). | | | | medium