Important
You are viewing documentation for an older version of Confluent Platform. For the latest, click here.
KSQL REST API Reference¶
REST Endpoint¶
The default REST API endpoint is http://localhost:8088/
.
Change the server configuration that controls the REST API endpoint by setting
the listeners
parameter in the KSQL server config file. For more info, see
listeners.
Content Types¶
The KSQL REST API uses content types for requests and responses to indicate the
serialization format of the data. Currently, the only serialization format
supported is JSON, specified as application/json
. Your request should
specify this content type in the Accept
header:
Accept: application/json
Here’s an example request that returns the results from the LIST STREAMS
command:
curl -X "POST" "http://localhost:8088/ksql" \
-H "Content-Type: application/json; charset=utf-8" \
-d $'{
"ksql": "LIST STREAMS;",
"streamsProperties": {}
}'
Here’s an example request that retrieves streaming data from TEST_STREAM
:
curl -X "POST" "http://localhost:8088/query" \
-H "Content-Type: application/json; charset=utf-8" \
-d $'{
"ksql": "SELECT * FROM TEST_STREAM;",
"streamsProperties": {}
}'
Errors¶
All API endpoints use a standard error message format for any requests that return an HTTP status indicating an error (any 4xx or 5xx statuses):
HTTP/1.1 <Error Status>
Content-Type: application/json
{
"message": <Error Message>
}
Some endpoints may include additional fields that provide more context for handling the error.
Get the Status of a KSQL Server¶
The /info
resource gives you information about the status of a KSQL
server, which can be useful for health checks and troubleshooting. You can
use the curl
command to query the /info
endpoint:
curl -sX GET "http://localhost:8088/info" | jq '.'
Your output should resemble:
{
"KsqlServerInfo": {
"version": "4.1.3",
"kafkaClusterId": "j3tOi6E_RtO_TMH3gBmK7A",
"ksqlServiceId": "default_"
}
}
Run a KSQL Statement¶
The /ksql
resource runs a sequence of KSQL statements. All statements, except
those starting with SELECT, can be run on this endpoint. To run SELECT
statements use the /query
endpoint.
Note
If you use the SET or UNSET statements to assign query properties by using the REST API, the assignment is scoped only to the current request. In contrast, SET and UNSET assignments in the KSQL CLI persist throughout the CLI session.
-
POST
/ksql
¶ Run a sequence of KSQL statements.
JSON Parameters: - ksql (string) – A semicolon-delimited sequence of KSQL statements to run.
- streamsProperties (map) – Property overrides to run the statements with. Refer to the Config Reference for details on properties that can be set.
- streamsProperties[property-name] (string) – The value of the property named by
property-name
. Both the value andproperty-name
should be strings.
The response JSON is an array of result objects. The result object contents depend on the statement that it is returning results for. The following sections detail the contents of the result objects by statement.
CREATE, DROP, TERMINATE
Response JSON Object: - currentStatus.statementText (string) – The KSQL statement whose result is being returned.
- currentStatus.commandId (string) – A string that identifies the requested operation. You can use this ID to poll the result of the operation using the status endpoint.
- currentStatus.commandStatus.status (string) – One of QUEUED, PARSING, EXECUTING, TERMINATED, SUCCESS, or ERROR.
- currrentStatus.commandStatus.message (string) – Detailed message regarding the status of the execution statement.
LIST STREAMS, SHOW STREAMS
Response JSON Object: - streams.statementText (string) – The KSQL statement whose result is being returned.
- streams.streams (array) – List of streams.
- streams.streams[i].name (string) – The name of the stream.
- streams.streams[i].topic (string) – The topic backing the stream.
- streams.streams[i].format (string) – The serialization format of the data in the stream. One of JSON, AVRO, or DELIMITED.
LIST TABLES, SHOW TABLES
Response JSON Object: - tables.statementText (string) – The KSQL statement whose result is being returned.
- tables.tables (array) – List of tables.
- tables.tables[i].name (string) – The name of the table.
- tables.tables[i].topic (string) – The topic backing the table.
- tables.tables[i].format (string) – The serialization format of the data in the table. One of JSON, AVRO, or DELIMITED.
LIST QUERIES, SHOW QUERIES
Response JSON Object: - queries.statementText (string) – The KSQL statement whose result is being returned.
- queries.queries (array) – List of queries.
- queries.queries[i].queryString (string) – The text of the statement that started the query.
- queries.queries[i].kafkaTopic (string) – The name of the topic that the query is writing into.
- queries.queries[i].id.id (string) – The query ID.
LIST PROPERTIES, SHOW PROPERTIES
Response JSON Object: - properties.statementText (string) – The KSQL statement whose result is being returned.
- properties.properties (map) – The KSQL server query properties.
- properties.roperties[property-name] (string) – The value of the property named by
property-name
.
DESCRIBE
Response JSON Object: - description.statementText (string) – The KSQL statement whose result is being returned.
- description.name (string) – The name of the stream or table.
- description.readQueries (array) – The id and statement text of the queries reading from the stream or table
- description.writeQueries (array) – The id and statement text of the queries writing into the stream or table
- description.schema (array) – The schema of the stream or table as a list of column names and types.
- description.schema[i].name (string) – The name of the column.
- description.schema[i].type (string) – The data type of the column.
- description.type (string) – STREAM or TABLE
- description.key (string) – The name of the key column.
- description.timestamp (string) – The name of the timestamp column.
- description.serdes (string) – The serialization format of the data in the stream or table. One of JSON, AVRO, or DELIMITED.
- description.kafkaTopic (string) – The topic backing the stream or table.
- description.extended (boolean) – A boolean that indicates whether this is an extended description.
- description.statistics (string) – A string containing statistics about production/consumption to/from the backing topic (extended only).
- description.errorStats (string) – A string containing statistics about errors producing/consuming to/from the backing topic (extended only).
- description.replication (int) – The replication factor of the backing topic (extended only).
- description.partitions (int) – The number of partitions in the backing topic (extended only).
EXPLAIN
Response JSON Object: - description.statementText (string) – The KSQL statement for which the query being explained is running.
- description.name (string) – The KSQL statement for which the query being explained is running.
- description.type (string) – QUERY
- description.serdes (string) – The serialization format of the data in the query’s output topic. One of JSON, AVRO, or DELIMITED.
- description.kafkaTopic (string) – The topic the query is writing into.
- description.statistics (string) – A string containing statistics about production/consumption to/from the topic the query is writing to.
- description.errorStats (string) – A string containing statistics about errors producing/consuming to/from the topic the query is writing to.
- description.replication (int) – The replication factor of the topic the query is writing to.
- description.partitions (int) – The number of partitions in the topis the query is writing to.
Errors
If KSQL fails to execute a statement, it returns a response with a successful status code (200) and writes the error in a result object with the following contents:
Response JSON Object: - error.statementText (string) – The statement for which the error is being reported.
- error.errorMessage.message (string) – Details about the error that was encountered.
Example request
POST /ksql HTTP/1.1 Accept: application/json Content-Type: application/json { "ksql": "CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home'; CREATE STREAM pageviews_alice AS SELECT * FROM pageviews_original WHERE userid='alice'", "streamsProperties": { "ksql.streams.auto.offset.reset": "earliest" } }
Example response
HTTP/1.1 200 OK Content-Type: application/json [ { "currentStatus": { "statementText":"CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home';", "commandId":"stream/PAGEVIEWS_HOME/create", "commandStatus": { "status":"SUCCESS", "message":"Stream created and running" } } }, { "currentStatus": { "statementText":"CREATE STREAM pageviews_alice AS SELECT * FROM pageviews_original WHERE userid='alice';", "commandId":"stream/PAGEVIEWS_ALICE/create", "commandStatus": { "status":"SUCCESS", "message":"Stream created and running" } } } ]
Run A Query And Stream Back The Output¶
The /query
resource lets you stream the output records of a SELECT
statement via a chunked transfer encoding. The response is streamed back until the LIMIT
specified in the statement is reached, or the client closes the connection. If no LIMIT
is specified in the statement, then the response is streamed until the client closes the connection.
-
POST
/query
¶ Run a
SELECT
statement and stream back the results.JSON Parameters: - ksql (string) – The SELECT statement to run.
- streamsProperties (map) – Property overrides to run the statements with. Refer to the Config Reference for details on properties that can be set.
- streamsProperties[property-name] (string) – The value of the property named by
property-name
. Both the value andproperty-name
should be strings.
Each response chunk is a JSON object with the following format:
Response JSON Object: - row (object) – A single row being returned. This will be null if an error is being returned.
- row.columns (array) – The values contained in the row.
- row.columns[i] (?) – The value contained in a single column for the row. The value type depends on the type of the column.
- errorMessage (string) – If this field is non-null, an error has been encountered while running the statement. No additional rows are returned and the server will end the response. Note that when the limit is reached for a query that specified a limit in the LIMIT clause, the server returns a row with error message “LIMIT reached for the partition.”
Example request
POST /query HTTP/1.1 Accept: application/json Content-Type: application/json { "ksql": "SELECT * FROM pageviews;" "streamsProperties": { "ksql.streams.auto.offset.reset": "earliest" } }
Example response
HTTP/1.1 200 OK Content-Type: application/json Transfer-Encoding: chunked ... {"row":{"columns":[1524760769983,"1",1524760769747,"alice","home"]},"errorMessage":null} ...
Get the Status of a CREATE, DROP, or TERMINATE¶
CREATE, DROP, and TERMINATE statements return an object that indicates the current state of statement execution. A statement can be in one of the following states:
- QUEUED, PARSING, EXECUTING: The statement was accepted by the server and is being processed.
- SUCCESS: The statement was successfully processed.
- ERROR: There was an error processing the statement. The statement was not executed.
- TERMINATED: The query started by the statement was terminated. Only returned for
CREATE STREAM|TABLE AS SELECT
.
If a CREATE, DROP, or TERMINATE statement returns a command status with state QUEUED, PARSING, or EXECUTING from the /ksql
endpoint, you can use the /status
endpoint to poll the status of the command.
-
GET
/status/
(string: commandId)¶ Get the current command status for a CREATE, DROP, or TERMINATE statement.
Parameters: - commandId (string) – The command ID of the statement. This ID is returned by the /ksql endpoint.
Response JSON Object: - status (string) – One of QUEUED, PARSING, EXECUTING, TERMINATED, SUCCESS, or ERROR.
- message (string) – Detailed message regarding the status of the execution statement.
Example request
GET /status/stream/PAGEVIEWS/create HTTP/1.1 Accept: application/json Content-Type: application/json
Example response
HTTP/1.1 200 OK Content-Type application/json { "status": "SUCCESS", "message":"Stream created and running" }