Query With Structured Data
KSQL enables querying structured, or nested, data by using the STRUCT data type. You use familiar syntax to declare and access structured data, like mystruct STRUCT<fieldName1 type1, fieldName2 type2> and mystruct->fieldName1.
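For example, a stream over a hypothetical orders topic might declare and query a nested address like this. The names here are illustrative only; the tutorial below uses its own topic and schema.

-- Declare a stream whose ADDRESS column is a struct (hypothetical example)
CREATE STREAM orders (ORDERID INT, ADDRESS STRUCT<city VARCHAR, zip VARCHAR>)
  WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON');

-- Access a nested field with the -> operator
SELECT ORDERID, ADDRESS->city FROM orders;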
The following example shows how to create a KSQL stream from an Apache Kafka® topic that has structured data, and how to run queries that access the nested fields.
- Set up the KSQL environment.
- Use the kafkacat utility to create and populate a new topic, named raw-topic.
- Create a stream on the topic that models the topic's data.
- Inspect the stream to ensure that the data model matches the topic.
- Query the stream to access the structured data.
Set up the KSQL Environment
To set up KSQL, follow the first three steps in Writing Streaming Queries Against Apache Kafka® Using KSQL (Docker), or, if you already have git and Docker installed, run the following commands:
# Step 1
git clone https://github.com/confluentinc/ksql.git
cd ksql
# Step 2
git checkout 5.1.4-post
# Step 3
cd docs/tutorials/
docker-compose up -d
After all of the Docker images are pulled, confirm that the KSQL and Kafka containers are running:
docker-compose ps
Your output should resemble:
Name Command State Ports
----------------------------------------------------------------------------------------------------
tutorials_kafka_1 /etc/confluent/docker/run Up 0.0.0.0:39092->39092/tcp, 9092/tcp
tutorials_ksql-server_1 /etc/confluent/docker/run Up 8088/tcp
tutorials_schema-registry_1 /etc/confluent/docker/run Up 8081/tcp
tutorials_zookeeper_1 /etc/confluent/docker/run Up 2181/tcp, 2888/tcp, 3888/tcp
The KSQL environment is ready for you to develop real-time streaming applications.
Create and Populate a New Topic With Structured Data
Use the kafkacat utility to create and populate a new topic, named raw-topic, with some records that have nested data. The records are formatted as JSON objects. For more information, see kafkacat Utility.
docker run --interactive --rm --network tutorials_default \
confluentinc/cp-kafkacat \
kafkacat -b kafka:39092 \
-t raw-topic \
-K: \
-P <<EOF
1:{"type":"key1","data":{"timestamp":"2018-12-21 23:58:42.1","field-a":1,"field-b":"first-value-for-key1"}}
2:{"type":"key2","data":{"timestamp":"2018-12-21 23:58:42.2","field-a":1,"field-c":11,"field-d":"first-value-for-key2"}}
3:{"type":"key1","data":{"timestamp":"2018-12-21 23:58:42.3","field-a":2,"field-b":"updated-value-for-key1"}}
4:{"type":"key2","data":{"timestamp":"2018-12-21 23:58:42.4","field-a":3,"field-c":22,"field-d":"updated-value-for-key2"}}
EOF
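Optionally, you can verify the records directly with kafkacat before starting the KSQL CLI. This is a sketch that reuses the same image and network as the producer command above; -C consumes the topic and -e exits once the end of the topic is reached:

docker run --rm --network tutorials_default \
    confluentinc/cp-kafkacat \
    kafkacat -b kafka:39092 -t raw-topic -C -e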
The nested structure is named data and has five fields:

- timestamp, a string
- field-a, an integer
- field-b, a string
- field-c, an integer
- field-d, a string
In the following KSQL queries, the data structure is modeled by using the STRUCT type:

DATA STRUCT<timestamp VARCHAR, "field-a" INT, "field-b" VARCHAR, "field-c" INT, "field-d" VARCHAR>

Double-quotes are necessary for the field names that contain the - character.
Note

Properties is not a valid field name.
Create a Stream With Structured Data
Start the KSQL CLI:
docker run --network tutorials_default --rm --interactive --tty \
confluentinc/cp-ksql-cli:5.1.4 \
http://ksql-server:8088
In the KSQL CLI, ensure that raw-topic is available:
SHOW TOPICS;
Your output should resemble:
Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
------------------------------------------------------------------------------------------------
_confluent-metrics | false | 12 | 1 | 0 | 0
_schemas | false | 1 | 1 | 0 | 0
raw-topic | false | 1 | 1 | 0 | 0
------------------------------------------------------------------------------------------------
Inspect raw-topic to ensure that kafkacat populated it:
PRINT 'raw-topic' FROM BEGINNING;
Your output should resemble:
Format:JSON
{"ROWTIME":1544042630406,"ROWKEY":"1","type":"key1","data":{"timestamp":"2018-12-21 23:58:42.1","field-a":1,"field-b":"first-value-for-key1"}}
{"ROWTIME":1544042630406,"ROWKEY":"2","type":"key2","data":{"timestamp":"2018-12-21 23:58:42.2","field-a":1,"field-c":11,"field-d":"first-value-for-key2"}}
{"ROWTIME":1544042630406,"ROWKEY":"3","type":"key1","data":{"timestamp":"2018-12-21 23:58:42.3","field-a":2,"field-b":"updated-value-for-key1"}}
{"ROWTIME":1544042630406,"ROWKEY":"4","type":"key2","data":{"timestamp":"2018-12-21 23:58:42.4","field-a":3,"field-c":22,"field-d":"updated-value-for-key2"}}
^CTopic printing ceased
Press Ctrl+C to stop printing the topic.
Run the following CREATE STREAM statement to register the topic with KSQL:
CREATE STREAM T (TYPE VARCHAR, \
DATA STRUCT< \
timestamp VARCHAR, \
"field-a" INT, \
"field-b" VARCHAR, \
"field-c" INT, \
"field-d" VARCHAR>) \
WITH (KAFKA_TOPIC='raw-topic',\
VALUE_FORMAT='JSON');
Your output should resemble:
Message
----------------
Stream created
----------------
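The overview above calls for inspecting the stream to confirm that the data model matches the topic. You can do that with DESCRIBE, which lists the stream's fields and their types:

DESCRIBE T;

The DATA field should appear as a STRUCT containing the five nested fields declared above.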
Run KSQL Queries to Access the Structured Data
Run the following command to tell KSQL to read from the beginning of the topic:
SET 'auto.offset.reset' = 'earliest';
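If you want to return to the default offset behavior later in the session, the CLI also lets you unset the property:

UNSET 'auto.offset.reset';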
Run a SELECT query to inspect the T stream:
SELECT * FROM T;
Your output should resemble:
1544042630406 | 1 | key1 | {TIMESTAMP=2018-12-21 23:58:42.1, field-a=1, field-b=first-value-for-key1, field-c=null, field-d=null}
1544042630406 | 2 | key2 | {TIMESTAMP=2018-12-21 23:58:42.2, field-a=1, field-b=null, field-c=11, field-d=first-value-for-key2}
1544042630406 | 3 | key1 | {TIMESTAMP=2018-12-21 23:58:42.3, field-a=2, field-b=updated-value-for-key1, field-c=null, field-d=null}
1544042630406 | 4 | key2 | {TIMESTAMP=2018-12-21 23:58:42.4, field-a=3, field-b=null, field-c=22, field-d=updated-value-for-key2}
^CQuery terminated
Press Ctrl+C to cancel the SELECT query.
Note

KSQL assigns null to the fields that were omitted when kafkacat populated raw-topic, like field-c and field-d in the key1 records.
Query field-a and field-b by using the -> operator to access the nested elements:
SELECT DATA->"field-a", DATA->"field-b" FROM T WHERE TYPE='key1' LIMIT 2;
Your output should resemble:
1 | first-value-for-key1
2 | updated-value-for-key1
Limit Reached
Query terminated
Query the other nested elements:
SELECT DATA->"field-a", DATA->"field-c", DATA->"field-d" FROM T WHERE TYPE='key2' LIMIT 2;
Your output should resemble:
1 | 11 | first-value-for-key2
3 | 22 | updated-value-for-key2
Limit Reached
Query terminated
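Nested fields can also appear in predicates, not just in the projection. As a sketch against the same T stream, assuming struct dereferences are allowed in WHERE clauses as they are in SELECT expressions, the following returns only records whose nested field-a is greater than 1:

SELECT DATA->"field-a", DATA->"field-d" FROM T WHERE DATA->"field-a" > 1 LIMIT 2;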
Create persistent queries based on the previous SELECT statements. In this example, two different queries are used to separate the input data into two new streams.
CREATE STREAM TYPE_1 AS SELECT DATA->"field-a", DATA->"field-b" FROM T WHERE TYPE='key1';
CREATE STREAM TYPE_2 AS SELECT DATA->"field-a", DATA->"field-c", DATA->"field-d" FROM T WHERE TYPE='key2';
For both statements, your output should resemble:
Message
----------------------------
Stream created and running
----------------------------
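To confirm that both derived streams were registered, list the streams; the output should include TYPE_1 and TYPE_2:

SHOW STREAMS;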
Inspect the schema of the TYPE_1 stream:
DESCRIBE TYPE_1;
Your output should resemble:
Name : TYPE_1
Field | Type
-------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
DATA__field-a | INTEGER
DATA__field-b | VARCHAR(STRING)
-------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
Inspect the schema of the TYPE_2 stream:
DESCRIBE TYPE_2;
Your output should resemble:
Name : TYPE_2
Field | Type
-------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
DATA__field-a | INTEGER
DATA__field-c | INTEGER
DATA__field-d | VARCHAR(STRING)
-------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
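When you're finished, you can stop the persistent queries and drop the derived streams. This is a sketch; the query IDs are generated at runtime, so substitute the IDs that SHOW QUERIES reports:

SHOW QUERIES;

-- Replace <query-id> with the ID reported for each stream's query
TERMINATE <query-id>;

DROP STREAM TYPE_1;
DROP STREAM TYPE_2;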