Quick Start with ksqlDB for Confluent Platform

This quick start gets you up and running with ksqlDB for Confluent Platform. The following steps show how to create an Apache Kafka® installation and a ksqlDB Server instance that runs with it.

To get started with hosted ksqlDB in Confluent Cloud, see ksqlDB for Confluent Cloud Quick Start.

In this quick start guide, you perform the following steps to create a simple streaming application that tracks the latest location of simulated riders.

Prerequisites

  • Docker and Docker Compose installed on a computer with a supported operating system.
  • Make sure Docker is running.
  • For full prerequisites, expand Detailed prerequisites in the following section.

Step 1: Create a docker-compose file

The minimum set of services for running ksqlDB comprises a Kafka broker and ksqlDB Server. The ksqlDB CLI is required for developing applications with SQL code. The following docker-compose file specifies the Docker images that you need for a minimal local environment:

  • confluentinc/cp-kafka
  • confluentinc/cp-ksqldb-server
  • confluentinc/cp-ksqldb-cli

  1. Run the following command to create a file named docker-compose.yml.

    touch docker-compose.yml
    
  2. Copy the following YAML into docker-compose.yml and save the file.

    ---
    version: '2'
    services:

      broker:
        image: confluentinc/cp-kafka:7.8.0
        hostname: broker
        container_name: broker
        ports:
          - "9092:9092"
          - "9101:9101"
        environment:
          KAFKA_NODE_ID: 1
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
          KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
          KAFKA_JMX_PORT: 9101
          KAFKA_JMX_HOSTNAME: localhost
          KAFKA_PROCESS_ROLES: 'broker,controller'
          KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
          KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
          KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
          KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
          KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
          # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
          # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
          CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

      ksqldb-server:
        image: confluentinc/cp-ksqldb-server:7.8.0
        hostname: ksqldb-server
        container_name: ksqldb-server
        depends_on:
          - broker
        ports:
          - "8088:8088"
        environment:
          KSQL_CONFIG_DIR: "/etc/ksql"
          KSQL_BOOTSTRAP_SERVERS: "broker:29092"
          KSQL_HOST_NAME: ksqldb-server
          KSQL_LISTENERS: "http://0.0.0.0:8088"
          KSQL_CACHE_MAX_BYTES_BUFFERING: 0
          KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
          KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
          KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
          KSQL_KSQL_CONNECT_URL: "http://connect:8083"
          KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
          KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
          KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'

      ksqldb-cli:
        image: confluentinc/cp-ksqldb-cli:7.8.0
        container_name: ksqldb-cli
        depends_on:
          - broker
          - ksqldb-server
        entrypoint: /bin/sh
        tty: true

Step 2: Start the Kafka broker and ksqlDB Server

In the directory where you saved the docker-compose.yml file, run the following command to start the Kafka broker and ksqlDB Server.

docker-compose up

The docker-compose tool starts downloading the Docker images. Downloading may take a few minutes. Once the images are downloaded, the containers are created.

Your output should resemble:

[+] Running 4/4
 ✔ Network ksqldb-quick-start_default  Created    0.0s
 ✔ Container broker                    Created    0.1s
 ✔ Container ksqldb-server             Created    0.1s
 ✔ Container ksqldb-cli                Created

The Kafka broker and the ksqlDB Server emit numerous status messages as they start. This behavior is normal.

ksqlDB runs as a server that clients connect to in order to issue queries.

Step 3: Start the ksqlDB CLI

To submit SQL statements to the ksqlDB Server, start the ksqlDB CLI.

  1. Open a new command shell.

  2. Run the following command to start the ksqlDB CLI container.

    docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
    

    Your output should resemble:

                      ===========================================
                      =       _              _ ____  ____       =
                      =      | | _____  __ _| |  _ \| __ )      =
                      =      | |/ / __|/ _` | | | | |  _ \      =
                      =      |   <\__ \ (_| | | |_| | |_) |     =
                      =      |_|\_\___/\__, |_|____/|____/      =
                      =                   |_|                   =
                      =        The Database purpose-built       =
                      =        for stream processing apps       =
                      ===========================================
    
    Copyright 2017-2022 Confluent Inc.
    
    CLI v7.8.0, Server v7.8.0 located at http://localhost:8088
    Server Status: RUNNING
    
    Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
    
    ksql>
    

    The ksql> prompt is where you enter SQL statements that run on the ksqlDB Server.

Step 4: Create a stream

You’re ready to create a stream. A stream associates a schema with an underlying Kafka topic. You use the CREATE STREAM statement to register a stream on a topic. If the topic doesn’t exist yet, ksqlDB creates it on the Kafka broker.

In the ksqlDB CLI, copy the following SQL and press Enter to run the statement.

CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
  WITH (kafka_topic='locations', value_format='json', partitions=1);

Your output should resemble:

 Message
----------------
 Stream created
----------------

Here’s what each parameter in the CREATE STREAM statement does:

  • kafka_topic: Name of the Kafka topic underlying the stream. In this case, it’s created automatically, because it doesn’t exist yet, but you can create streams over topics that exist already.

  • value_format: Encoding of the messages stored in the Kafka topic. For JSON encoding, each row is stored as a JSON object whose keys and values are column names and values, for example:

    {"profileId": "c2309eec", "latitude": 37.7877, "longitude": -122.4205}
    
  • partitions: Number of partitions to create for the locations topic. This parameter is not needed for topics that exist already.
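If you want to confirm that the stream was registered, ksqlDB provides introspection statements you can run in the same CLI session. A quick sketch (the exact columns in your output may vary by version):

```sql
-- List all registered streams
SHOW STREAMS;

-- Inspect the schema and underlying topic of the new stream
DESCRIBE riderLocations;
```

DESCRIBE shows the column names and types you declared in the CREATE STREAM statement, along with the backing topic and value format.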

Step 5: Create materialized views

The goal of this quick start is to keep track of the latest location of the riders by using a materialized view.

  • You create a currentLocation table by running a CREATE TABLE AS SELECT statement over the previously created stream. The table is updated incrementally as new rider location data arrives. The LATEST_BY_OFFSET aggregate function specifies that the application is interested only in the latest location of each rider.
  • To make this more interesting, the application materializes a derived table, named ridersNearMountainView, that captures the distance of riders from a given location or city.

  1. In the ksqlDB CLI, run the following statement to create the currentLocation table.

    -- Create the currentLocation table
    CREATE TABLE currentLocation AS
      SELECT profileId,
             LATEST_BY_OFFSET(latitude) AS la,
             LATEST_BY_OFFSET(longitude) AS lo
      FROM riderLocations
      GROUP BY profileId
      EMIT CHANGES;
    

    Your output should resemble:

     Message
    ----------------------------------------------
     Created query with ID CTAS_CURRENTLOCATION_3
    ----------------------------------------------
    

    The CREATE TABLE AS SELECT statement created a persistent query, named CTAS_CURRENTLOCATION_3, that runs continuously on the ksqlDB Server. The query’s name is prefixed with “CTAS”, which stands for “CREATE TABLE AS SELECT”.

  2. Run the following statement to create the ridersNearMountainView table.

    -- Create the ridersNearMountainView table
    CREATE TABLE ridersNearMountainView AS
      SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles,
             COLLECT_LIST(profileId) AS riders,
             COUNT(*) AS count
      FROM currentLocation
      GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1);
    

    Your output should resemble:

     Message
    -----------------------------------------------------
     Created query with ID CTAS_RIDERSNEARMOUNTAINVIEW_5
    -----------------------------------------------------
    
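Both CREATE TABLE AS SELECT statements registered persistent queries that keep running on the server. You can list them and inspect one; note that IDs like CTAS_CURRENTLOCATION_3 are generated by the server, so the suffix numbers in your session may differ:

```sql
-- List the persistent queries running on the ksqlDB Server
SHOW QUERIES;

-- Show details of one query; substitute the ID from your own SHOW QUERIES output
EXPLAIN CTAS_CURRENTLOCATION_3;
```

EXPLAIN reports the query's status, its source and sink topics, and the underlying execution plan.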

Step 6: Run a push query over the stream

In this step, you create a push query that filters rows in the riderLocations stream.

This query outputs all rows from the riderLocations stream that have coordinates within 5 miles of Mountain View, California.

This query never returns until you terminate it explicitly. It pushes output rows to the client perpetually as events are written to the riderLocations stream.

In the ksqlDB CLI, run the following statement to create a push query that filters rows in the riderLocations stream.

-- Mountain View lat, long: 37.4133, -122.1162
SELECT * FROM riderLocations
  WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;

Your output should resemble:

+--------------------+-------------------+--------------------+
|PROFILEID           |LATITUDE           |LONGITUDE           |
+--------------------+-------------------+--------------------+

Press CTRL-C to interrupt

For now, leave this query running in the CLI session. In the next step, you write some mock data into the riderLocations stream so that the query produces output.
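By default, a push query emits only events that arrive after it starts. If you stop the query and later want to replay events that are already in the topic, you can set the offset reset property in the CLI before re-running it. A sketch:

```sql
-- Replay existing events instead of only new ones
-- (set this before starting the push query)
SET 'auto.offset.reset' = 'earliest';

SELECT * FROM riderLocations
  WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
```

The property applies to queries started afterward in the same CLI session; the running query from Step 6 is unaffected.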

Step 7: Populate the stream with events

Because the CLI session from the previous step is busy waiting for output from the push query, start another session that you can use to write some data into ksqlDB.

  1. Open a new command shell.

  2. Run the following command to start another ksqlDB CLI instance.

    docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
    
  3. In the new CLI session, run the following INSERT statements.

    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);
    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);
    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);
    INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);
    
  4. Open the previous command shell to view the output from the push query that you started in Step 6.

    Your output should resemble:

    +--------------------+-------------------+--------------------+
    |PROFILEID           |LATITUDE           |LONGITUDE           |
    +--------------------+-------------------+--------------------+
    |4ab5cbad            |37.3952            |-122.0813           |
    |8b6eae59            |37.3944            |-122.0813           |
    |4a7c7b41            |37.4049            |-122.0822           |
    
    Press CTRL-C to interrupt
    

    Only the rows for riders who are within 5 miles of Mountain View are displayed.
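You can also inspect the raw records that the INSERT statements wrote to the underlying Kafka topic. The PRINT statement reads a topic directly, without going through a stream's schema (the formatting of the output varies by version):

```sql
-- Read the raw records from the backing topic, stopping after 6 rows
PRINT 'locations' FROM BEGINNING LIMIT 6;
```

This is a convenient way to verify what is actually stored in the topic when debugging serialization issues.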

Step 8: Run a pull query against the materialized view

In this step, you run a pull query against the materialized view to retrieve all of the riders who are currently within 10 miles of Mountain View.

In contrast to the previous push query, which runs continuously, the pull query follows a traditional request-response model and retrieves the latest result from the materialized view.

In the second ksqlDB CLI, run the following statement.

SELECT * FROM ridersNearMountainView WHERE distanceInMiles <= 10;

Your output should resemble:

+--------------------+-------------------------------+------+
|DISTANCEINMILES     |RIDERS                         |COUNT |
+--------------------+-------------------------------+------+
|0.0                 |[4ab5cbad, 8b6eae59, 4a7c7b41] |3     |
|10.0                |[18f4ea86]                     |1     |
Query terminated
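Pull queries can also look up a single row by key. The currentLocation table is keyed by profileId, its GROUP BY column, so you can fetch one rider's latest position directly. A sketch, using one of the rider IDs inserted in Step 7:

```sql
-- Look up the latest known position of a single rider
SELECT * FROM currentLocation WHERE profileId = '4ab5cbad';
```

Because this is a key lookup against the materialized state, it returns immediately with the latest value instead of streaming changes.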

Step 9: Clean up

When you’re done with the quick start, close the ksqlDB CLI instances and stop the ksqlDB Server and Kafka broker.
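Before tearing down the containers, you can optionally remove the ksqlDB objects you created. Terminate the persistent queries first (substitute the IDs from your own SHOW QUERIES output; the IDs below are the ones generated earlier in this guide), then drop the tables and the stream:

```sql
-- Terminate the persistent queries (use the IDs from your own session)
TERMINATE CTAS_RIDERSNEARMOUNTAINVIEW_5;
TERMINATE CTAS_CURRENTLOCATION_3;

-- Drop the derived tables, then the stream and its backing topic
DROP TABLE ridersNearMountainView;
DROP TABLE currentLocation;
DROP STREAM riderLocations DELETE TOPIC;
```

This step isn't required when you're discarding the whole Docker environment, because docker-compose down removes the containers and their state anyway.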

  1. In the second ksqlDB CLI instance, type “exit” and press Enter.

    Your output should resemble:

    Exiting ksqlDB.
    
  2. In the ksqlDB CLI instance that’s running the push query, press Ctrl-C to end the push query.

    Your output should resemble:

    ^CQuery terminated
    
  3. Type “exit” and press Enter to close the first ksqlDB CLI instance.

  4. Run the following command to stop the running containers.

    docker-compose down
    

    Your output should resemble:

    [+] Running 4/4
     ✔ Container ksqldb-cli                     Removed
     ✔ Container ksqldb-server                  Removed
     ✔ Container broker                         Removed
     ✔ Network ksqldb-quick-start_default  Removed