Run a Statement in ksqlDB for Confluent Platform

The /ksql resource runs a sequence of SQL statements. All statements, except those starting with SELECT and PRINT, can be run on this endpoint. To run SELECT and PRINT statements use the /query endpoint.

Note

If you use the SET or UNSET statements to assign query properties by using the REST API, the assignment is scoped only to the current request. In contrast, SET and UNSET assignments in the ksqlDB CLI persist throughout the CLI session.

POST /ksql

Run a sequence of SQL statements.

JSON Parameters

  • ksql (string): A semicolon-delimited sequence of SQL statements to run.
  • streamsProperties (map): Property overrides to run the statements with. Refer to the Configuration Parameter Reference for details on properties that you can set.
  • streamsProperties[``property-name``] (string): The value of the property
  • named by property-name. Both the value and property-name should be strings.
  • sessionVariables (map): Optional. Starting from 0.18, the parameter sessionVariables accepts a map of string variable names and values of any type as initial variable substitution values. See ksqlDB Variable Substitution for more information on variable substitution.
  • commandSequenceNumber (long): Optional. If specified, the statements will not be run until all existing commands up to and including the specified sequence number have completed. If unspecified, the statements are run immediately. When a command is processed, the result object contains its sequence number.

The response JSON is an array of result objects. The result object contents depend on the statement that it is returning results for. The following sections detail the contents of the result objects by statement.

Common Fields

The following fields are common to all responses.

Response JSON Object:

  • statementText (string): The SQL statement whose result is being returned.
  • warnings (array): A list of warnings about conditions that may be unexpected by the user, but don’t result in failure to execute the statement.
  • warnings[i].message (string): A message detailing the condition being warned on.

CREATE, DROP, TERMINATE

Response JSON Object:

  • commandId (string): A string that identifies the requested operation. You can use this ID to poll the result of the operation using the status endpoint.
  • commandStatus.status (string): One of QUEUED, PARSING, EXECUTING, TERMINATED, SUCCESS, or ERROR.
  • commandStatus.message (string): Detailed message regarding the status of the execution statement.
  • commandSequenceNumber (long): The sequence number of the requested operation in the command queue, or -1 if the operation was unsuccessful.

LIST STREAMS, SHOW STREAMS

Response JSON Object:

  • streams (array): List of streams.
  • streams[i].name (string): The name of the stream.
  • streams[i].topic (string): The topic backing the stream.
  • streams[i].format (string): The serialization format of the data in the stream. One of JSON, AVRO, PROTOBUF, or DELIMITED.
  • streams[i].type (string): The source type. Always returns STREAM.

LIST TABLES, SHOW TABLES

Response JSON Object:

  • tables (array): List of tables.
  • tables[i].name (string): The name of the table.
  • tables[i].topic (string): The topic backing the table.
  • tables[i].format (string): The serialization format of the data in the table. One of JSON, AVRO, PROTOBUF, or DELIMITED.
  • tables[i].type (string): The source type. Always returns TABLE.
  • tables[i].isWindowed (boolean): true if the table provides windowed results; otherwise, false.

LIST QUERIES, SHOW QUERIES

Response JSON Object:

  • queries (array): List of queries.
  • queries[i].queryString (string): The text of the statement that started the query.
  • queries[i].sinks (string): The streams and tables being written to by the query.
  • queries[i].id (string): The query ID.

LIST PROPERTIES, SHOW PROPERTIES

Response JSON Object:

  • properties (map): The ksqlDB server query properties.
  • properties[``property-name``] (string): The value of the property named by property-name.

DESCRIBE

Response JSON Object:

  • sourceDescription.name (string): The name of the stream or table.
  • sourceDescription.readQueries (array): The queries reading from the stream or table.
  • sourceDescription.writeQueries (array): The queries writing into the stream or table
  • sourceDescription.fields (array): A list of field objects that describes each field in the stream/table.
  • sourceDescription.fields[i].name (string): The name of the field.
  • sourceDescription.fields[i].schema (object): A schema object that describes the schema of the field.
  • sourceDescription.fields[i].schema.type (string): The type the schema represents. One of INTEGER, BIGINT, BOOLEAN, BYTES, DOUBLE, STRING, TIMESTAMP, TIME, DATE, MAP, ARRAY, or STRUCT.
  • sourceDescription.fields[i].schema.memberSchema (object): A schema object. For MAP and ARRAY types, contains the schema of the map values and array elements, respectively. For other types this field is not used and its value is undefined.
  • sourceDescription.fields[i].schema.fields (array): For STRUCT types, contains a list of field objects that describes each field within the struct. For other types this field is not used and its value is undefined.
  • sourceDescription.type (string): STREAM or TABLE
  • sourceDescription.key (string): The name of the key column.
  • sourceDescription.timestamp (string): The name of the timestamp column.
  • sourceDescription.format (string): The serialization format of the data in the stream or table. One of JSON, AVRO, PROTOBUF, or DELIMITED.
  • sourceDescription.topic (string): The topic backing the stream or table.
  • sourceDescription.extended (boolean): A boolean that indicates whether this is an extended description.
  • sourceDescription.statistics (string): A string that contains statistics about production and consumption to and from the backing topic (extended only).
  • sourceDescription.errorStats (string): A string that contains statistics about errors producing and consuming to and from the backing topic (extended only).
  • sourceDescription.replication (int): The replication factor of the backing topic (extended only).
  • sourceDescription.partitions (int): The number of partitions in the backing topic (extended only).

EXPLAIN

Response JSON Object:

  • queryDescription.statementText (string): The ksqlDB statement for which the query being explained is running.
  • queryDescription.fields (array): A list of field objects that describes each field in the query output.
  • queryDescription.fields[i].name (string): The name of the field.
  • queryDescription.fields[i].schema (object): A schema object that describes the schema of the field.
  • queryDescription.fields[i].schema.type (string): The type the schema represents. One of INTEGER, BIGINT, BOOLEAN, BYTES, DOUBLE, STRING, TIMESTAMP, TIME, DATE, MAP, ARRAY, or STRUCT.
  • queryDescription.fields[i].schema.memberSchema (object): A schema object. For MAP and ARRAY types, contains the schema of the map values and array elements, respectively. For other types this field is not used and its value is undefined.
  • queryDescription.fields[i].schema.fields (array): For STRUCT types, contains a list of field objects that describes each field within the struct. For other types this field is not used and its value is undefined.
  • queryDescription.sources (array): The streams and tables being read by the query.
  • queryDescription.sources[i] (string): The name of a stream or table being read from by the query.
  • queryDescription.sinks (array): The streams and tables being written to by the query.
  • queryDescription.sinks[i] (string): The name of a stream or table being written to by the query.
  • queryDescription.executionPlan (string): The query execution plan.
  • queryDescription.topology (string): The Kafka Streams topology that the query is running.
  • overriddenProperties (map): The property overrides that the query is running with.

Errors

If ksqlDB fails to execute a statement, it returns a response with an error status code (4xx/5xx). Even if an error is returned, the server may have been able to successfully execute some statements in the request. In this case, the response includes the error_code and message fields, a statementText field with the text of the failed statement, and an entities field that contains an array of result objects:

Response JSON Object:

  • statementText (string): The text of the SQL statement where the error occurred.
  • entities (array): Result objects for statements that were successfully executed by the server.

The /ksql endpoint may return the following error codes in the error_code field:

  • 40001 (BAD_STATEMENT): The request contained an invalid SQL statement.
  • 40002 (QUERY_ENDPOINT): The request contained a statement that should be issued to the /query endpoint.

Examples

Example curl command

curl --http1.1 \
     -X "POST" "http://<ksqldb-host-name>:8088/ksql" \
     -H "Accept: application/vnd.ksql.v1+json" \
     -H "Content-Type: application/json" \
     -d $'{
  "ksql": "LIST STREAMS;",
  "streamsProperties": {}
}'

Example request

POST /ksql HTTP/1.1
Accept: application/vnd.ksql.v1+json
Content-Type: application/vnd.ksql.v1+json

{
  "ksql": "CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home'; CREATE STREAM pageviews_alice AS SELECT * FROM pageviews_original WHERE userid='alice';",
  "streamsProperties": {
    "ksql.streams.auto.offset.reset": "earliest"
  }
}

Example response

HTTP/1.1 200 OK
Content-Type: application/vnd.ksql.v1+json

[
  {
    "statementText":"CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home';",
    "commandId":"stream/PAGEVIEWS_HOME/create",
    "commandStatus": {
      "status":"SUCCESS",
      "message":"Stream created and running"
    },
    "commandSequenceNumber":10
  },
  {
    "statementText":"CREATE STREAM pageviews_alice AS SELECT * FROM pageviews_original WHERE userid='alice';",
    "commandId":"stream/PAGEVIEWS_ALICE/create",
    "commandStatus": {
      "status":"SUCCESS",
      "message":"Stream created and running"
    },
    "commandSequenceNumber":11
  }
]

Coordinate Multiple Requests

To submit multiple, interdependent requests, there are two options. The first is to submit them as a single request, similar to the example request above:

POST /ksql HTTP/1.1
Accept: application/vnd.ksql.v1+json
Content-Type: application/vnd.ksql.v1+json

{
  "ksql": "CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home'; CREATE TABLE pageviews_home_count AS SELECT userid, COUNT(*) FROM pageviews_home GROUP BY userid EMIT CHANGES;"
}

The second method is to submit the statements as separate requests and incorporate the interdependency by using commandSequenceNumber. Send the first request:

POST /ksql HTTP/1.1
Accept: application/vnd.ksql.v1+json
Content-Type: application/vnd.ksql.v1+json

{
  "ksql": "CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home' EMIT CHANGES;"
}

Make note of the commandSequenceNumber returned in the response:

HTTP/1.1 200 OK
Content-Type: application/vnd.ksql.v1+json

[
  {
    "statementText":"CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home' EMIT CHANGES;",
    "commandId":"stream/PAGEVIEWS_HOME/create",
    "commandStatus": {
      "status":"SUCCESS",
      "message":"Stream created and running"
    },
    "commandSequenceNumber":10
  }
]

Provide this commandSequenceNumber as part of the second request, indicating that this request should not execute until after command number 10 has finished executing:

POST /ksql HTTP/1.1
Accept: application/vnd.ksql.v1+json
Content-Type: application/vnd.ksql.v1+json

{
  "ksql": "CREATE TABLE pageviews_home_count AS SELECT userid, COUNT(*) FROM pageviews_home GROUP BY userid EMIT CHANGES;",
  "commandSequenceNumber":10
}