Variable Substitution in ksqlDB for Confluent Platform

Context

You have a set of SQL statements, and you want to vary the exact content depending on where you use them. To do that, ksqlDB supports variables so that you can supply different values.

In action

DEFINE format = 'AVRO';
DEFINE replicas = '3';

CREATE STREAM stream1 (
  id INT
) WITH (
  kafka_topic = 'stream1',
  value_format = '${format}',
  replicas = ${replicas}
);

Use a variable

Begin by defining one or more variables with a compliant name:

DEFINE format = 'AVRO';
DEFINE replicas = '3';

Now reference each variable by enclosing it between ${} characters:

CREATE STREAM stream1 (
  id INT
) WITH (
  kafka_topic = 'stream1',
  value_format = '${format}',
  replicas = ${replicas}
);

Note

Variables are case-insensitive. A reference to ${replicas} is the same as ${REPLICAS}.

At any point, you can list all variables to see what is available:

SHOW VARIABLES;

Which should output:

 Variable Name | Value
----------------------------
 replicas      | 3
 format        | AVRO
----------------------------

If you want to undefine a variable, you can unbound the name like so:

UNDEFINE replicas;

Note

Single-quotes are removed during variable substitution. To escape single-quotes, enclose the value with triple-quotes.

Escape substitution variables

If you want to escape a variable reference so it is not substituted, use double $$ characters.

DEFINE format = 'AVRO';
SELECT '$${format}' FROM stream;

The above query will become SELECT '${format}' FROM stream.

Context for substitution variables

Variable substitution is allowed in specific SQL statements. You can replace text and non-text literals, and identifiers like column names and stream/table names. You can’t use variables as reserved keywords.

The following statements show examples of using variables for stream and column names, and in other places.

CREATE STREAM ${streamName} (
  ${colName1} INT,
  ${colName2} STRING
) WITH (
  kafka_topic = '${topicName}',
  format = '${format}',
  replicas = ${replicas},
  ...
);

INSERT INTO ${streamName} (
  ${colName1},
  ${colName2}
) VALUES (
  ${val1},
  '${val2}'
);

SELECT * FROM ${streamName} EMIT CHANGES;

Using a variable in a statement that doesn’t support variables causes a SQL parsing error.

Disable substitution variables

Enable or disable variable substitution by setting the ksql.variable.substitution.enable server configuration parameter.

ksql> SET 'ksql.variable.substitution.enable' = 'false';
ksql> CREATE STREAM ... WITH (kafka_topic='${topic_${env}}');
Error: Fail because ${topic_${env}} topic name is invalid.