SET Statement in Confluent Cloud for Apache Flink

Confluent Cloud for Apache Flink® enables setting Flink SQL shell properties to different values.

Syntax

SET 'key' = 'value';

Description

Modify or list the Flink SQL shell configuration.

If no key and value are specified, SET prints all of the properties that you have assigned for the session.

To reset a session property to its default value, use the RESET Statement in Confluent Cloud for Apache Flink.

Note

In a Cloud Console workspace, the SET statement can’t be run separately and must be submitted along with another Flink SQL statement, like SELECT, CREATE, or INSERT, for example:

SET 'sql.current-catalog' = 'default';
SET 'sql.current-database' = 'cluster_0';
SELECT * FROM pageviews;

Example

The following examples show how to run a SET statement in the Flink SQL shell.

SET 'table.local-time-zone' = 'America/Los_Angeles';

Your output should resemble:

Statement successfully submitted.
Statement phase is COMPLETED.
configuration updated successfully.

To list the current session settings, run the SET command with no parameters.

SET;

Your output should resemble:

 Statement successfully submitted.
 Statement phase is COMPLETED.
+-----------------------+--------------------------+
|          Key          |          Value           |
+-----------------------+--------------------------+
| catalog               | default (default)        |
| default_database      | <your_cluster> (default) |
| table.local-time-zone | America/Los_Angeles      |
+-----------------------+--------------------------+

The SET; operation is not supported in Cloud Console workspaces.

Available SET Options

These are the available configuration options available by using the SET statement in Confluent Cloud for Apache Flink.

Note

In a Cloud Console workspace, the only client option you can set is client.statement-name. The client.results-timeout, client.output-format, and client.service-account options are available only in the Flink SQL shell.

Key Default Type Description
client.output-format standard String Output format. Valid values are “standard” or “plain-text”.
client.results-timeout 600000 Long Total amount of time, in milliseconds, to wait before timing out the request waiting for results to be ready.
client.service-account (None) String Service account to use instead of running statements attached to your user account. For more information, see Submit long-running statements.
client.statement-name (None) String Give your Flink statement a meaningful name that can help you identify it more easily. Instead of 123e4567-e89b-12d3, this sets the statement name to the given value. To avoid naming conflicts, the name resets itself after successful submission.
sql.current-catalog (None) String Default catalog to use. Is also set by USE CATALOG [catalog_name].
sql.current-database (None) String Default database to use. Is also set by USE [database_id].
sql.local-time-zone Local device time zone String Specify the local time zone offset for SQL. When converting to data types that don’t include a time zone (for example, TIMESTAMP, TIME, or simply STRING), this time zone is used. The input for this option is either a full name, like “America/Los_Angeles”, or a custom timezone id, like “GMT-08:00”.
sql.state-ttl 0 ms Duration Specifies a minimum time interval for how long idle state, meaning state that is not updated, is retained. State is never cleared when idle for less than the minimum time, and is cleared at some time after the idle duration.
sql.tables.scan.bounded.mode (None) Enum Specifies the bounded mode for a Kafka consumer. For more information, see scan.bounded.mode.
sql.tables.scan.bounded.timestamp-millis (None) Long End at the specified epoch timestamp (milliseconds). Used by the timestamp bounded mode.
sql.tables.scan.idle-timeout 15 s Duration When a source does not receive any elements for the timeout time, it is marked as temporarily idle. This enables each downstream task to advance its watermark without the need to wait for watermarks from this source while it is idle. By default, Confluent Cloud for Apache Flink has progressive idleness detection that starts at 15 s and increases to a maximum of 5 m over time. You can turn off progressive idleness by setting sql.tables.scan.idle-timeout to 0 ms, or you can can set a fixed idleness timeout with your desired value.
sql.tables.scan.startup.mode (None) Enum Specifies the startup mode for Kafka consumer for this specific statement. For more information, see scan.startup.mode.
sql.tables.scan.startup.timestamp-millis (None) Long Start from the specified epoch timestamp (milliseconds) used in case of ‘timestamp’ startup mode.