Quick Start with ksqlDB for Confluent Platform¶
This quick start gets you up and running with ksqlDB for Confluent Platform. The following steps show how to create an Apache Kafka® installation and a ksqlDB Server instance that runs with it.
To get started with hosted ksqlDB in Confluent Cloud, see ksqlDB for Confluent Cloud Quick Start.
In this quick start guide, you perform the following steps to create a simple streaming application that tracks the latest location of simulated riders.
- Step 1: Create a docker-compose file
- Step 2: Start the Kafka broker and ksqlDB Server
- Step 3: Start the ksqlDB CLI
- Step 4: Create a stream
- Step 5: Create materialized views
- Step 6: Run a push query over the stream
- Step 7: Populate the stream with events
- Step 8: Run a pull query against the materialized view
- Step 9: Clean up
Prerequisites¶
- Docker and Docker Compose installed on a computer with a supported Operating System.
- Make sure Docker is running.
- For full prerequisites, expand Detailed prerequisites in the following section.
Detailed prerequisites
A connection to the internet.
Operating System currently supported by Confluent Platform.
Note
You can run the ksqlDB Quick Start on Windows if you are running Docker Desktop for Windows on WSL 2. For more information, see How to Run Confluent on Windows in Minutes.
Docker version 1.11 or later is installed and running.
On Mac: Allocate at least 6 GB of memory to Docker. Docker Desktop for Mac allocates 2 GB by default. Change the allocation to 6 GB in the Docker Desktop app by navigating to Preferences > Resources > Advanced.
Networking and Kafka on Docker: Configure your hosts and ports so that components both inside and outside the Docker network can communicate. For more information, see Kafka Listeners Explained.
Step 1: Create a docker-compose file¶
The minimum set of services for running ksqlDB comprises a Kafka broker and ksqlDB Server. The ksqlDB CLI is required for developing applications with SQL code. The following docker-compose file specifies the Docker images that you need for a minimal local environment:
- confluentinc/cp-kafka
- confluentinc/cp-ksqldb-server
- confluentinc/cp-ksqldb-cli
Run the following command to create a file named docker-compose.yml.
touch docker-compose.yml
Copy the following YAML into docker-compose.yml and save the file.
---
version: '2'
services:

  broker:
    image: confluentinc/cp-kafka:7.8.0
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.8.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.8.0
    container_name: ksqldb-cli
    depends_on:
      - broker
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
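The two advertised listeners in this file are what let containers and host tools reach the same broker at different addresses. As a minimal Python sketch (the parse_listeners helper is hypothetical and not part of any Kafka tooling), the KAFKA_ADVERTISED_LISTENERS value can be split into its named endpoints:

```python
def parse_listeners(value):
    """Parse 'NAME://host:port,NAME2://host2:port2' into {NAME: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        name, address = entry.strip().split("://", 1)
        host, port = address.rsplit(":", 1)
        listeners[name] = (host, int(port))
    return listeners


# Value copied from KAFKA_ADVERTISED_LISTENERS in docker-compose.yml.
advertised = parse_listeners(
    "PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092"
)

print(advertised["PLAINTEXT"])       # address for containers on the Compose network
print(advertised["PLAINTEXT_HOST"])  # address for clients running on the host
```

Containers on the Compose network, such as ksqldb-server, use broker:29092, which matches KSQL_BOOTSTRAP_SERVERS. Clients on the host machine use localhost:9092, which the ports mapping publishes.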
Step 2: Start the Kafka broker and ksqlDB Server¶
In the directory where you saved the docker-compose.yml file, run the following command to start the Kafka broker and ksqlDB Server.
docker-compose up
The docker-compose tool starts downloading the Docker images. Downloading may take a few minutes. Once the images are downloaded, the containers are created.
Your output should resemble:
[+] Running 4/4
✔ Network ksqldb-quick-start_default Created 0.0s
✔ Container broker Created 0.1s
✔ Container ksqldb-server Created 0.1s
✔ Container ksqldb-cli Created
The Kafka broker and the ksqlDB Server emit numerous status messages as they start. This behavior is normal.
ksqlDB runs as a server which clients connect to in order to issue queries.
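Because startup takes a while, it can help to wait until the server reports that it is ready before connecting a client. The following Python sketch polls the server's GET /info endpoint on the published port 8088. It assumes the response contains a KsqlServerInfo object with a serverStatus field, as in the ksqlDB REST API documentation; the helper names and the sample payload values are hypothetical.

```python
import json
import time
import urllib.request


def fetch_info(server_url="http://localhost:8088"):
    """Return the parsed JSON body of GET /info (needs a running server)."""
    url = server_url.rstrip("/") + "/info"
    with urllib.request.urlopen(url, timeout=5) as response:
        return json.load(response)


def is_running(info):
    """True when the /info payload reports a serverStatus of RUNNING."""
    return info.get("KsqlServerInfo", {}).get("serverStatus") == "RUNNING"


def wait_until_running(fetch=fetch_info, attempts=30, delay_seconds=2.0):
    """Poll until the server reports RUNNING; return False if attempts run out."""
    for _ in range(attempts):
        try:
            if is_running(fetch()):
                return True
        except OSError:
            pass  # server is not accepting connections yet
        time.sleep(delay_seconds)
    return False


# Illustrative payload shape (values are made up), checked without a server.
sample = {"KsqlServerInfo": {"version": "7.8.0", "serverStatus": "RUNNING"}}
print(is_running(sample))
```

With the containers up, calling wait_until_running() with its defaults polls http://localhost:8088 for up to about a minute.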
Step 3: Start the ksqlDB CLI¶
To submit SQL statements to the ksqlDB Server, start the ksqlDB CLI.
Open a new command shell.
Run the following command to start the ksqlDB CLI container.
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
Your output should resemble:
===========================================
=       _              _ ____  ____       =
=      | | _____  __ _| |  _ \| __ )      =
=      | |/ / __|/ _` | | | | |  _ \      =
=      |   <\__ \ (_| | | |_| | |_) |     =
=      |_|\_\___/\__, |_|____/|____/      =
=                   |_|                   =
=        The Database purpose-built       =
=        for stream processing apps       =
===========================================

Copyright 2017-2022 Confluent Inc.

CLI v7.8.0, Server v7.8.0 located at http://localhost:8088
Server Status: RUNNING

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>
The ksql> prompt is where you enter SQL statements that run on the ksqlDB Server.
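The CLI is one client of the server's HTTP interface. As a hedged sketch, assuming the POST /ksql endpoint and the application/vnd.ksql.v1+json media type behave as described in the ksqlDB REST API documentation, the following Python builds (but does not send) a request that submits a statement. The build_statement_request helper is a hypothetical name.

```python
import json
import urllib.request

KSQL_MEDIA_TYPE = "application/vnd.ksql.v1+json"  # assumed from the REST API docs


def build_statement_request(server_url, statement):
    """Build a POST /ksql request carrying one SQL statement."""
    body = json.dumps({"ksql": statement, "streamsProperties": {}}).encode("utf-8")
    return urllib.request.Request(
        url=server_url.rstrip("/") + "/ksql",
        data=body,
        headers={"Content-Type": KSQL_MEDIA_TYPE, "Accept": KSQL_MEDIA_TYPE},
        method="POST",
    )


request = build_statement_request("http://localhost:8088", "SHOW STREAMS;")
print(request.get_method(), request.full_url)

# To send it once the server is running:
# with urllib.request.urlopen(request) as response:
#     print(json.load(response))
```

The sketch targets localhost:8088 because the compose file publishes the server's port 8088 to the host; inside the Compose network the address is http://ksqldb-server:8088, as in the docker exec command.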
Step 4: Create a stream¶
You’re ready to create a stream. A stream associates a schema with an underlying Kafka topic. You use the CREATE STREAM statement to register a stream on a topic. If the topic doesn’t exist yet, ksqlDB creates it on the Kafka broker.
In the ksqlDB CLI, copy the following SQL and press Enter to run the statement.
CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
WITH (kafka_topic='locations', value_format='json', partitions=1);
Your output should resemble:
Message
----------------
Stream created
----------------
Here’s what each parameter in the CREATE STREAM statement does:
- kafka_topic: Name of the Kafka topic underlying the stream. In this case, it’s created automatically, because it doesn’t exist yet, but you can create streams over topics that exist already.
- value_format: Encoding of the messages stored in the Kafka topic. For JSON encoding, each row is stored as a JSON object whose keys and values are column names and values, for example:
  {"profileId": "c2309eec", "latitude": 37.7877, "longitude": -122.4205}
- partitions: Number of partitions to create for the locations topic. This parameter is not needed for topics that exist already.
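To make the JSON value format concrete, here is a small Python sketch of how one riderLocations row maps to a message value and back. The encode_row and decode_row helpers are hypothetical and only mimic the shape of the data; they are not ksqlDB's serializer.

```python
import json

# Column names from the CREATE STREAM statement.
COLUMNS = ("profileId", "latitude", "longitude")


def encode_row(profile_id, latitude, longitude):
    """Serialize one row as a JSON object keyed by column name."""
    row = dict(zip(COLUMNS, (profile_id, latitude, longitude)))
    return json.dumps(row).encode("utf-8")


def decode_row(value):
    """Read the columns back out of a JSON message value.

    In this sketch, a key that is missing from the message becomes None.
    """
    record = json.loads(value)
    return tuple(record.get(column) for column in COLUMNS)


value = encode_row("c2309eec", 37.7877, -122.4205)
print(value.decode("utf-8"))
print(decode_row(value))
```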
Step 5: Create materialized views¶
The goal of this quick start is to keep track of the latest location of the riders by using a materialized view.
- You create a currentLocation table by running a CREATE TABLE AS SELECT statement over the previously created stream. The table is updated incrementally as new rider location data arrives. The LATEST_BY_OFFSET aggregate function specifies that the application is interested only in the latest location of each rider.
- To add more interest, the application materializes a derived table, named ridersNearMountainView, that captures the distance of riders from a given location or city.
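How LATEST_BY_OFFSET keeps one row per rider can be illustrated with a plain-Python sketch. This is not how ksqlDB is implemented; it only mimics the result, ignoring details such as null handling and fault tolerance. The third event is a hypothetical later update that is not part of this quick start's data.

```python
def latest_by_key(events):
    """Fold a stream of (profileId, latitude, longitude) events into a table.

    Each new event for a profileId overwrites the earlier one, so the table
    always holds the most recently written location per rider.
    """
    table = {}
    for profile_id, latitude, longitude in events:
        table[profile_id] = {"la": latitude, "lo": longitude}
    return table


events = [
    ("c2309eec", 37.7877, -122.4205),
    ("18f4ea86", 37.3903, -122.0643),
    ("c2309eec", 37.7900, -122.4100),  # hypothetical later update
]

current_location = latest_by_key(events)
for profile_id, location in current_location.items():
    print(profile_id, location)
```

The table ends up with two rows, and the row for c2309eec reflects the later event rather than the first one.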
In the ksqlDB CLI, run the following statement to create the currentLocation table.
-- Create the currentLocation table
CREATE TABLE currentLocation AS
  SELECT profileId,
         LATEST_BY_OFFSET(latitude) AS la,
         LATEST_BY_OFFSET(longitude) AS lo
  FROM riderlocations
  GROUP BY profileId
  EMIT CHANGES;
Your output should resemble:
Message
----------------------------------------------
Created query with ID CTAS_CURRENTLOCATION_3
----------------------------------------------
The CREATE TABLE AS SELECT statement created a persistent query, named CTAS_CURRENTLOCATION_3, that runs continuously on the ksqlDB server. The query’s name is prefixed with “CTAS”, which stands for “CREATE TABLE AS SELECT”.
Run the following statement to create the ridersNearMountainView table.
-- Create the ridersNearMountainView table
CREATE TABLE ridersNearMountainView AS
  SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles,
         COLLECT_LIST(profileId) AS riders,
         COUNT(*) AS count
  FROM currentLocation
  GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1);
Your output should resemble:
Message
-----------------------------------------------------
Created query with ID CTAS_RIDERSNEARMOUNTAINVIEW_5
-----------------------------------------------------
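The ROUND(..., -1) expression in the GROUP BY clause sorts riders into distance bands that are multiples of 10. A rough Python sketch of that grouping follows. The rider names and distances are hypothetical, and Python's round is only a stand-in for ksqlDB's ROUND, which may treat exact ties differently.

```python
from collections import defaultdict


def bucket(distance):
    """Round to the nearest multiple of 10, like ROUND(distance, -1)."""
    return round(distance, -1)


def group_by_bucket(distances):
    """Collect rider IDs per distance band, like COLLECT_LIST with GROUP BY."""
    groups = defaultdict(list)
    for profile_id, distance in distances.items():
        groups[bucket(distance)].append(profile_id)
    return dict(groups)


# Hypothetical distances, chosen only to show the bucketing.
distances = {"rider-a": 3.1, "rider-b": 4.9, "rider-c": 5.2, "rider-d": 48.4}

for band, riders in group_by_bucket(distances).items():
    print(band, riders, len(riders))
```

A distance of 4.9 lands in the 0.0 band while 5.2 lands in the 10.0 band, so a band labeled 10.0 covers distances from roughly 5 up to 15.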
Step 6: Run a push query over the stream¶
In this step, you create a push query that filters rows in the riderLocations stream. The query outputs all rows from the stream that have coordinates within 5 miles of Mountain View, California.
A push query runs until it’s terminated explicitly. It pushes output rows to the client continuously as events are written to the riderLocations stream.
In the ksqlDB CLI, run the following statement.
-- Mountain View lat, long: 37.4133, -122.1162
SELECT * FROM riderLocations
WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
Your output should resemble:
+--------------------+-------------------+--------------------+
|PROFILEID |LATITUDE |LONGITUDE |
+--------------------+-------------------+--------------------+
Press CTRL-C to interrupt
For now, leave this query running in the CLI session. In the next step, you write some mock data into the riderLocations stream so that the query produces output.
Step 7: Populate the stream with events¶
Because the CLI session from the previous step is busy waiting for output from the push query, start another session that you can use to write some data into ksqlDB.
Open a new command shell.
Run the following command to start another ksqlDB CLI instance.
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
In the new CLI session, run the following INSERT statements.
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);
Open the previous command shell to view the output from the push query that you started in Step 6.
Your output should resemble:
+--------------------+-------------------+--------------------+
|PROFILEID           |LATITUDE           |LONGITUDE           |
+--------------------+-------------------+--------------------+
|4ab5cbad            |37.3952            |-122.0813           |
|8b6eae59            |37.3944            |-122.0813           |
|4a7c7b41            |37.4049            |-122.0822           |
Press CTRL-C to interrupt
Only the rows that have riders who are within 5 miles of Mountain View are displayed.
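The filter can be checked by hand with a great-circle calculation. The sketch below uses the haversine formula with an assumed mean Earth radius of 6371 km; it is an approximation for illustration, not ksqlDB's own code. One caveat, stated as an assumption to verify against the ksqlDB function reference: GEO_DISTANCE appears to return kilometers unless a unit argument such as 'MI' is passed, so this sketch computes kilometers. Under that assumption rider 18f4ea86 is about 5.2 km (roughly 3.3 miles) away, which would explain why that rider is missing from the output even though the guide describes the threshold in miles.

```python
import math

EARTH_RADIUS_KM = 6371.0  # assumed mean Earth radius


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometers between two points in decimal degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlambda = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


MOUNTAIN_VIEW = (37.4133, -122.1162)

# The six rows written by the INSERT statements in this step.
riders = [
    ("c2309eec", 37.7877, -122.4205),
    ("18f4ea86", 37.3903, -122.0643),
    ("4ab5cbad", 37.3952, -122.0813),
    ("8b6eae59", 37.3944, -122.0813),
    ("4a7c7b41", 37.4049, -122.0822),
    ("4ddad000", 37.7857, -122.4011),
]

# Same predicate as the push query's WHERE clause, applied row by row.
nearby = [
    profile_id
    for profile_id, lat, lon in riders
    if haversine_km(lat, lon, *MOUNTAIN_VIEW) <= 5
]
print(nearby)
```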
Step 8: Run a pull query against the materialized view¶
In this step, you run a pull query against the materialized view to retrieve all of the riders who are currently within 10 miles of Mountain View.
In contrast to the previous push query, which runs continuously, the pull query follows a traditional request-response model and retrieves the latest result from the materialized view.
In the second ksqlDB CLI, run the following statement.
SELECT * FROM ridersNearMountainView WHERE distanceInMiles <= 10;
Your output should resemble:
+--------------------+-------------------------------+------+
|DISTANCEINMILES |RIDERS |COUNT |
+--------------------+-------------------------------+------+
|0.0 |[4ab5cbad, 8b6eae59, 4a7c7b41] |3 |
|10.0 |[18f4ea86] |1 |
Query terminated
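The difference between the two query types in this guide can be sketched in plain Python: a push query behaves like a generator that yields rows as events arrive, while a pull query is a one-time lookup against the table's current state. Both functions are hypothetical illustrations, and the 50.0 row is an assumed extra row for the riders who are farther away; it does not appear in the output above.

```python
def push_query(events, predicate):
    """Yield each matching row as it arrives.

    A real push query keeps waiting for new events until it is terminated;
    this sketch ends when the finite input ends.
    """
    for event in events:
        if predicate(event):
            yield event


def pull_query(table, predicate):
    """Return the matching rows of the current table state once, then finish."""
    return [row for row in table if predicate(row)]


riders_near_mountain_view = [
    {"distanceInMiles": 0.0, "riders": ["4ab5cbad", "8b6eae59", "4a7c7b41"], "count": 3},
    {"distanceInMiles": 10.0, "riders": ["18f4ea86"], "count": 1},
    {"distanceInMiles": 50.0, "riders": ["c2309eec", "4ddad000"], "count": 2},  # assumed row
]

result = pull_query(riders_near_mountain_view, lambda row: row["distanceInMiles"] <= 10)
for row in result:
    print(row)

stream = push_query(iter(riders_near_mountain_view), lambda row: row["count"] > 1)
print(next(stream)["distanceInMiles"])  # rows arrive one at a time
```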
Step 9: Clean up¶
When you’re done with the quick start, close the ksqlDB CLI instances and stop the ksqlDB Server and Kafka broker.
In the second ksqlDB CLI instance, type “exit” and press Enter.
Your output should resemble:
Exiting ksqlDB.
In the ksqlDB CLI instance that’s running the push query, press Ctrl-C to end the push query.
Your output should resemble:
^CQuery terminated
Type “exit” and press Enter to close the first ksqlDB CLI instance.
Run the following command to stop the running containers.
docker-compose down
Your output should resemble:
[+] Running 4/4
✔ Container ksqldb-cli Removed
✔ Container ksqldb-server Removed
✔ Container broker Removed
✔ Network test-ksqldb-quick-start_default Removed