HTTP Streaming API in ksqlDB for Confluent Platform
Note
- These endpoints are used by the ksqlDB Java client. If you are using Java, you might want to use the Java client rather than using this API directly.
- These endpoints are only available when using HTTP/2.
Executing pull or push queries
The request method is a POST.
Send requests to the /query-stream endpoint.
The body of the request is a JSON object UTF-8 encoded as text, containing the arguments for the operation. Newlines have been added here for the sake of clarity, but the actual JSON must not contain unescaped newlines.
{
"sql": "select * from foo", <----- the SQL of the query to execute
"properties": { <----- Optional properties for the query
"prop1": "val1",
"prop2": "val2"
}
}
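The single-line requirement above can be met by serializing the body programmatically rather than by hand. The following is a minimal sketch, not part of the ksqlDB tooling; the helper name build_query_body is hypothetical, and it only builds the bytes to send.

```python
import json

def build_query_body(sql, properties=None):
    """Serialize a /query-stream request body as single-line UTF-8 JSON.

    Hypothetical helper for illustration; only the "sql" and "properties"
    fields described above are used.
    """
    body = {"sql": sql}
    if properties:
        body["properties"] = properties
    # Without an indent argument, json.dumps emits no raw newlines, and
    # newlines inside string values are escaped as \n.
    return json.dumps(body).encode("utf-8")

payload = build_query_body("select * from foo", {"prop1": "val1", "prop2": "val2"})
print(payload.decode("utf-8"))
```

Because the serializer escapes newlines inside string values, a multi-line SQL string still produces a body without raw newline characters.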
The endpoint produces responses with two possible content types: application/json and application/vnd.ksqlapi.delimited.v1. To specify the content type, set the Accept header in the request. The default is application/vnd.ksqlapi.delimited.v1.
In the case of a successful query, if the content type is application/vnd.ksqlapi.delimited.v1, the results are returned as a header JSON object followed by zero or more JSON arrays that are delimited by newlines. Newline-delimited formats are easy for clients to parse and don’t require a streaming JSON parser on the client when intermediate results need to be output.
{
"queryId": "xyz123", <---- unique ID, provided for push queries only
"columnNames":["col", "col2", "col3"], <---- the names of the columns
"columnTypes":["BIGINT", "STRING", "BOOLEAN"] <---- The types of the columns
}
Followed by zero or more JSON arrays:
[123, "blah", true]
[432, "foo", true]
[765, "whatever", false]
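Because every record sits on its own line, a client can decode the delimited format one line at a time. The sketch below assumes the header object arrives on a single line and that the lines have already been read from the response; iter_delimited is a hypothetical name, not a ksqlDB API.

```python
import json

def iter_delimited(lines):
    """Yield the header object first, then one dict per result row.

    Assumes each element of `lines` is one complete JSON value, with the
    header object first (an assumption about how the response is framed).
    """
    header = None
    for line in lines:
        if not line.strip():
            continue  # tolerate blank lines between records
        record = json.loads(line)
        if header is None:
            header = record
            yield header
        else:
            # Pair each value with its column name from the header.
            yield dict(zip(header["columnNames"], record))

sample = [
    '{"queryId": "xyz123", "columnNames": ["col", "col2", "col3"], '
    '"columnTypes": ["BIGINT", "STRING", "BOOLEAN"]}',
    '[123, "blah", true]',
    '[432, "foo", true]',
    '[765, "whatever", false]',
]
header, *rows = iter_delimited(sample)
print(rows[0])
```

Writing the parser as a generator means each row is available as soon as its line has been received, which is the property the delimited format is meant to offer.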
If you prefer to receive the entire response as valid JSON, request the content type application/json. In this case you receive the results as a single JSON array, as shown in the following example. Newlines have been added for clarity; the actual response body doesn’t contain newlines.
[
{
"queryId": "xyz123", <---- unique ID, provided for push queries only
"columnNames":["col", "col2", "col3"], <---- the names of the columns
"columnTypes":["BIGINT", "STRING", "BOOLEAN"] <---- The types of the columns
},
[123, "blah", true],
[432, "foo", true],
[765, "whatever", false]
]
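With application/json, the whole body is one JSON array whose first element is the header. A minimal sketch of splitting it apart, assuming the complete body has already been received as text (parse_json_response is a hypothetical helper name):

```python
import json

def parse_json_response(body):
    """Split a complete application/json response into header and rows."""
    payload = json.loads(body)  # needs the full body, closing bracket included
    return payload[0], payload[1:]

body = (
    '[{"queryId":"xyz123","columnNames":["col","col2","col3"],'
    '"columnTypes":["BIGINT","STRING","BOOLEAN"]},'
    '[123,"blah",true],[432,"foo",true],[765,"whatever",false]]'
)
header, rows = parse_json_response(body)
# Map each column name to its declared type.
schema = dict(zip(header["columnNames"], header["columnTypes"]))
print(schema)
```

Unlike the delimited format, this approach cannot hand out rows until the closing bracket has arrived, unless the client uses a streaming JSON parser.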
Starting with ksqlDB 0.18, variable substitution can be applied by passing a map of variables and definitions to the sessionVariables argument of the request body and referencing variables by enclosing them in ${}.
For example, the following requests are equivalent:
{
"sql": "SELECT profileId AS ${name} FROM riderLocations EMIT CHANGES;",
"sessionVariables": {
"name": "user"
}
}
{
"sql": "SELECT profileId AS user FROM riderLocations EMIT CHANGES;"
}
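To see why the two requests are equivalent, the substitution can be imitated on the client side. This is an illustration only: the server performs the real substitution, and its exact rules (for example, how undefined variables or escaping are handled) are not covered here. The function name substitute_variables is hypothetical.

```python
import re

def substitute_variables(sql, session_variables):
    """Replace each ${name} reference with its value from session_variables.

    A rough client-side imitation for illustration; raises KeyError when a
    referenced variable is not defined.
    """
    def replace(match):
        return session_variables[match.group(1)]
    return re.sub(r"\$\{(\w+)\}", replace, sql)

request = {
    "sql": "SELECT profileId AS ${name} FROM riderLocations EMIT CHANGES;",
    "sessionVariables": {"name": "user"},
}
expanded = substitute_variables(request["sql"], request["sessionVariables"])
print(expanded)
```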
Terminating queries
You can terminate push queries explicitly in the client by making a request to this endpoint.
The request method is POST.
Send requests to the /close-query endpoint.
The body of the request is a JSON object UTF-8 encoded as text, containing the id of the query to close. Newlines have been added here for the sake of clarity but the actual JSON must not contain newlines.
{
"queryId": "xyz123" <----- the ID of the query to terminate
}
Running print statements
The /query-stream endpoint also supports print statements.
Similar to pull and push queries, the request method is a POST and the request body is a JSON object UTF-8 encoded as text, containing the arguments for the operation. Newlines have been added here for the sake of clarity, but the actual JSON must not contain unescaped newlines.
{
"sql": "print `my-topic-123` limit 3;" <----- the SQL of the query to execute
}
The endpoint supports only application/vnd.ksqlapi.delimited.v1 as a content type for the print statement. If you specify another content type in the Accept header, you’ll receive 406 Not Acceptable.
In the case of a successful query, you should see records from the requested topic delimited by newlines.
rowtime: 2022/12/02 17:22:02.556 Z, key: 18, value: {\"ordertime\":1497014222380,\"orderid\":18,\"itemid\":\"Item_184\",\"address\":{\"city\":\"Mountain View\",\"state\":\"CA\",\"zipcode\":94041}}, partition: 0
rowtime: 2022/12/02 17:22:04.556 Z, key: 19, value: {\"ordertime\":1497014224380,\"orderid\":19,\"itemid\":\"Item_185\",\"address\":{\"city\":\"Mountain View\",\"state\":\"CA\",\"zipcode\":94041}}, partition: 0
rowtime: 2022/12/02 17:22:08.556 Z, key: 20, value: {\"ordertime\":1497014228380,\"orderid\":20,\"itemid\":\"Item_185\",\"address\":{\"city\":\"Mountain View\",\"state\":\"CA\",\"zipcode\":94041}}, partition: 0
Inserting rows into an existing stream
This endpoint allows you to insert rows into an existing ksqlDB stream. The stream must have already been created in ksqlDB.
The request method is a POST.
Send requests to the /inserts-stream endpoint.
The body of the request is a JSON object UTF-8 encoded as text, containing the arguments for the operation. Newlines have been added for clarity, but the actual JSON must not contain newlines.
{
"target": "my-stream" <----- The name of the ksqlDB stream to insert into
}
The stream name is case insensitive.
The target object is followed by zero or more JSON objects representing the values to insert:
{
"col1" : "val1",
"col2": 2.3,
"col3": true
}
Each JSON object is separated by a newline.
To terminate the insert stream the client must end the request.
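The request body described above is a sequence of single-line JSON objects: the target first, then one object per row. A minimal sketch of assembling it, assuming all rows are known up front (a real client may instead write rows to the open request as they become available); build_inserts_body is a hypothetical helper name.

```python
import json

def build_inserts_body(target, rows):
    """Build a newline-separated /inserts-stream body: target, then rows."""
    lines = [json.dumps({"target": target})]
    # One JSON object per row; json.dumps keeps each object on one line.
    lines.extend(json.dumps(row) for row in rows)
    return "\n".join(lines).encode("utf-8")

body = build_inserts_body(
    "my-stream",
    [
        {"col1": "val1", "col2": 2.3, "col3": True},
        {"col1": "val2", "col2": 4.5, "col3": False},
    ],
)
print(body.decode("utf-8"))
```

The position of each row in the list (0 for the first row, 1 for the second) is the sequence number to expect in the matching ack.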
An ack is written to the response when each row has been committed successfully to the underlying topic. Rows are committed in the order they are provided. Each ack in the response is a JSON object, separated by newlines:
{"status":"ok","seq":0}
{"status":"ok","seq":2}
{"status":"ok","seq":1}
{"status":"ok","seq":3}
A successful ack contains a status field with value ok.
All ack responses also contain a seq field with a 64-bit signed integer value. This number corresponds to the sequence of the insert on the request. The first send has sequence 0, the second 1, the third 2, etc. It allows the client to correlate the ack to the corresponding send.
In case of error, an error response (see below) is sent. For an error response for a send, the seq field is included.
Note
- Acks can be returned in a different order from the order in which inserts were submitted.
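Since acks may arrive out of order, a client has to match them to rows by the seq field rather than by arrival position. The sketch below assumes the client kept the rows in the order it sent them and treats any status other than ok as a failure; that failure handling is an assumption, because the error response format is not shown here. The name correlate_acks is hypothetical.

```python
import json

def correlate_acks(sent_rows, ack_lines):
    """Map each seq number to the row it refers to and whether it succeeded."""
    outcome = {}
    for line in ack_lines:
        if not line.strip():
            continue  # tolerate blank lines between acks
        ack = json.loads(line)
        seq = ack["seq"]
        # seq is the zero-based position of the row in the request.
        outcome[seq] = {"row": sent_rows[seq], "ok": ack.get("status") == "ok"}
    return outcome

sent = [{"col1": "a"}, {"col1": "b"}, {"col1": "c"}, {"col1": "d"}]
acks = [
    '{"status":"ok","seq":0}',
    '{"status":"ok","seq":2}',
    '{"status":"ok","seq":1}',
    '{"status":"ok","seq":3}',
]
result = correlate_acks(sent, acks)
print(result[2])
```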
Example curl command
curl -X "POST" "http://<ksqldb-host-name>:8088/query-stream" \
-d $'{
"sql": "SELECT * FROM PAGEVIEWS EMIT CHANGES;",
"properties": {}
}'