Quick Start for Confluent Platform

Use this quick start to get up and running locally with Confluent Platform and its main components using Docker containers.

In this quick start, you create Apache Kafka® topics, use Kafka Connect to generate mock data to those topics, and create ksqlDB streaming queries on those topics. You then go to Confluent Control Center to monitor and analyze the event streaming queries. When you finish, you’ll have a real-time app that consumes and processes data streams by using familiar SQL statements.

Prerequisites

To run this quick start, you need Docker and Docker Compose installed on a computer with a supported operating system. Make sure Docker is running.

Step 1: Download and start Confluent Platform

In this step, you start by downloading a Docker Compose file. The docker-compose.yml file sets ports and Docker environment variables, such as the replication factor and listener properties, for Confluent Platform and its components. To learn more about the settings in this file, see Docker Image Configuration Reference for Confluent Platform. The Confluent Platform image specified in the file uses the Kafka-based KRaft metadata service. As of Confluent Platform 7.5, ZooKeeper is deprecated for new deployments. For more information, see KRaft Overview for Confluent Platform.

  1. Download or copy the contents of the Confluent Platform KRaft all-in-one Docker Compose file, for example:

    wget https://raw.githubusercontent.com/confluentinc/cp-all-in-one/7.8.0-post/cp-all-in-one-kraft/docker-compose.yml
    

    Note

    To use Apache ZooKeeper as the metadata service for your Kafka cluster, download the Confluent Platform ZooKeeper all-in-one Docker Compose file.

  2. Start the Confluent Platform stack with the -d option to run in detached mode:

    docker compose up -d
    

    Note

    If you are using Docker Compose V1, use a hyphen in the docker compose commands. For example:

    docker-compose up -d
    

    To learn more, see Migrate to Compose V2.

    Each Confluent Platform component starts in a separate container. Your output should resemble the following. Your output may vary slightly from these examples depending on your operating system.

    Creating broker ... done
    Creating schema-registry ... done
    Creating rest-proxy      ... done
    Creating connect         ... done
    Creating ksqldb-server   ... done
    Creating control-center  ... done
    Creating ksql-datagen    ... done
    Creating ksqldb-cli      ... done
    
  3. Verify that the services are up and running:

    docker compose ps
    

    Your output should resemble:

    NAME              IMAGE                                             COMMAND                  SERVICE           CREATED       STATUS         PORTS
    broker            confluentinc/cp-kafka:7.8.0                       "/etc/confluent/dock…"   broker            4 weeks ago   Up 4 minutes   0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp
    connect           cnfldemos/cp-server-connect-datagen:0.6.4-7.6.0   "/etc/confluent/dock…"   connect           4 weeks ago   Up 4 minutes   0.0.0.0:8083->8083/tcp, 9092/tcp
    control-center    confluentinc/cp-enterprise-control-center:7.8.0   "/etc/confluent/dock…"   control-center    4 weeks ago   Up 4 minutes   0.0.0.0:9021->9021/tcp
    ksqldb-cli        confluentinc/cp-ksqldb-cli:7.8.0                  "/bin/sh"                ksqldb-cli        4 weeks ago   Up 4 minutes
    ksqldb-server     confluentinc/cp-ksqldb-server:7.8.0               "/etc/confluent/dock…"   ksqldb-server     4 weeks ago   Up 4 minutes   0.0.0.0:8088->8088/tcp
    rest-proxy        confluentinc/cp-kafka-rest:7.8.0                  "/etc/confluent/dock…"   rest-proxy        4 weeks ago   Up 4 minutes   0.0.0.0:8082->8082/tcp
    schema-registry   confluentinc/cp-schema-registry:7.8.0             "/etc/confluent/dock…"   schema-registry   4 weeks ago   Up 4 minutes   0.0.0.0:8081->8081/tcp
    

    After a few minutes, if the state of any component isn’t Up, run the docker compose up -d command again, or try docker compose restart <service-name>, for example:

    docker compose restart control-center
    

Step 2: Create Kafka topics for storing your data

In Confluent Platform, real-time streaming events are stored in a Kafka topic, which is an append-only log, and the fundamental unit of organization for Kafka. To learn more about Kafka basics, see Kafka Introduction.

In this step, you create two topics by using Control Center for Confluent Platform. Control Center provides the features for building and monitoring production data pipelines and event streaming applications.

The topics are named pageviews and users. In later steps, you create connectors that produce data to these topics.

Create the pageviews topic

Confluent Control Center enables creating topics in the UI with a few clicks.

  1. Navigate to Control Center at http://localhost:9021. It may take a minute or two for Control Center to start and load.

  2. Click the controlcenter.cluster tile.

    The Cluster tile in Confluent Control Center
  3. In the navigation menu, click Topics to open the topics list. Click + Add topic to start creating the pageviews topic.

    The Topics page in Confluent Control Center
  4. In the Topic name field, enter pageviews and click Create with defaults. Topic names are case-sensitive.

    Creating a Kafka topic in Confluent Control Center

Create the users topic

Repeat the previous steps to create the users topic.

  1. In the navigation menu, click Topics to open the topics list. Click + Add topic to start creating the users topic.

  2. In the Topic name field, enter users and click Create with defaults.

  3. You can optionally inspect a topic. On the users page, click Configuration to see details about the users topic.

    The Topic Configuration page in Confluent Control Center
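The same two topics can also be created from the command line instead of the UI. The following is a minimal sketch, not part of the original steps. It assumes the compose service is named broker (as in the docker compose ps output earlier) and that the kafka-topics tool inside that container can reach the broker at localhost:9092. The run and create_topic helpers and the DRY_RUN variable are hypothetical names introduced for illustration, and the partition count and replication factor are assumptions for a single-broker setup that may differ from the Control Center defaults.

```shell
# Hypothetical helper: print the command when DRY_RUN=1, otherwise run it.
run() {
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# Create a topic with one partition and a replication factor of 1.
# Assumes a compose service named "broker" listening on localhost:9092.
create_topic() {
  run docker compose exec broker kafka-topics \
    --create --if-not-exists \
    --topic "$1" \
    --bootstrap-server localhost:9092 \
    --partitions 1 --replication-factor 1
}

# Preview the commands without touching the cluster.
DRY_RUN=1
create_topic pageviews
create_topic users
```

With DRY_RUN=1 the script only prints the two commands. Setting DRY_RUN=0 runs them against the stack started in Step 1.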

Step 3: Generate mock data

In Confluent Platform, you get events from an external source by using Kafka Connect. Connectors enable you to stream large volumes of data to and from your Kafka cluster. Confluent publishes many connectors for integrating with external systems, like MongoDB and Elasticsearch. For more information, see the Kafka Connect Overview page.

In this step, you run the Datagen Source Connector to generate mock data. The mock data is stored in the pageviews and users topics that you created previously. To learn more about installing connectors, see Install Self-Managed Connectors.

  1. In the navigation menu, click Connect.

  2. Click the connect-default cluster in the Connect clusters list.

  3. Click Add connector to start creating a connector for pageviews data.

  4. Select the DatagenConnector tile.

    Tip

    To see source connectors only, click Filter by category and select Sources.

  5. In the Name field, enter datagen-pageviews as the name of the connector.

  6. Enter the following configuration values in these sections:

    Common section:

    • Key converter class: org.apache.kafka.connect.storage.StringConverter

    General section:

    • kafka.topic: Choose pageviews from the dropdown menu
    • max.interval: 100
    • quickstart: pageviews
  7. Click Next to review the connector configuration. When you’re satisfied with the settings, click Launch.

    Reviewing connector configuration in Confluent Control Center

Run a second instance of the Datagen Source Connector to produce mock data to the users topic.

  1. In the navigation menu, click Connect.

  2. In the Connect clusters list, click connect-default.

  3. Click Add connector.

  4. Select the DatagenConnector tile.

  5. In the Name field, enter datagen-users as the name of the connector.

  6. Enter the following configuration values:

    Common section:

    • Key converter class: org.apache.kafka.connect.storage.StringConverter

    General section:

    • kafka.topic: Choose users from the dropdown menu
    • max.interval: 1000
    • quickstart: users
  7. Click Next to review the connector configuration. When you’re satisfied with the settings, click Launch.

  8. In the navigation menu, click Topics and in the list, click users.

  9. Click Messages to confirm that the datagen-users connector is producing data to the users topic.

    Incoming messages displayed in the Topics page in Confluent Control Center
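The connector settings entered in the UI correspond to a JSON definition that could also be submitted to the Kafka Connect REST API. The following is a hedged sketch rather than the documented procedure. It assumes the Connect worker is reachable at http://localhost:8083 (the port published for the connect container) and that the Datagen connector class is io.confluent.kafka.connect.datagen.DatagenConnector. The datagen_config and create_connector function names, the CONNECT_URL variable, and the tasks.max value are illustrative assumptions.

```shell
# Print a Datagen connector definition as JSON.
# Arguments: connector name, topic (also used as the quickstart name),
# and max interval in milliseconds.
datagen_config() {
  cat <<EOF
{
  "name": "$1",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "kafka.topic": "$2",
    "quickstart": "$2",
    "max.interval": "$3",
    "tasks.max": "1"
  }
}
EOF
}

# Submit a connector definition to the Connect REST API.
# Assumes the Connect worker is reachable at http://localhost:8083.
create_connector() {
  datagen_config "$@" | curl -s -X POST \
    -H "Content-Type: application/json" \
    --data @- \
    "${CONNECT_URL:-http://localhost:8083}/connectors"
}

# Print the definition for review; nothing is sent to the cluster here.
datagen_config datagen-users users 1000
```

To launch both connectors from the shell, you would call create_connector datagen-pageviews pageviews 100 and create_connector datagen-users users 1000 while the stack is running.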

Inspect the schema of a topic

By default, the Datagen Source Connector produces data in Avro format, which defines the schemas of pageviews and users messages.

Schema Registry ensures that messages sent to your cluster have the correct schema. For more information, see Schema Registry Documentation.

  1. In the navigation menu, click Topics, and in the topic list, click pageviews.

  2. Click Schema to inspect the Avro schema that applies to pageviews message values.

    Your output should resemble:

    Avro schema for the pageviews topic in Confluent Control Center
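The schema shown in Control Center can also be read from the Schema Registry REST API. This is a small sketch under stated assumptions: Schema Registry is reachable at http://localhost:8081 (the port published for the schema-registry container), and the value schema is registered under the default subject name, <topic>-value. The latest_schema_url and fetch_latest_schema function names and the SCHEMA_REGISTRY_URL variable are hypothetical.

```shell
# Build the Schema Registry URL for the latest value schema of a topic.
# Assumes default subject naming (<topic>-value) and localhost:8081.
latest_schema_url() {
  echo "${SCHEMA_REGISTRY_URL:-http://localhost:8081}/subjects/$1-value/versions/latest"
}

# Fetch the schema as JSON (requires the running stack).
fetch_latest_schema() {
  curl -s "$(latest_schema_url "$1")"
}

# Show the URL that would be requested for the pageviews topic.
latest_schema_url pageviews
```

Calling fetch_latest_schema pageviews against the running stack should return a JSON document that includes the registered Avro schema.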

Step 4: Create a stream and table by using SQL statements

In this step, you create a stream for the pageviews topic and a table for the users topic by using familiar SQL syntax. When you register a stream or a table on a topic, you can use the stream/table in SQL statements.

Note

A stream is an immutable, append-only collection that represents a series of historical facts, or events. After a row is inserted into a stream, the row can never change. You can append new rows at the end of the stream, but you can’t update or delete existing rows.

A table is a mutable collection that models change over time. It uses row keys to display the most recent data for each key. All but the newest rows for each key are deleted periodically. Each row also has a timestamp, so you can define a windowed table, which groups records that have the same key into time spans for stateful operations like aggregations and joins. Windows are tracked by record key.

Together, streams and tables comprise a fully realized database. For more information, see Stream processing.
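The difference between the two views can be illustrated with plain shell tools. This is only an analogy, not how ksqlDB implements streams and tables: the sample events and the stream_view and table_view function names are made up for illustration. The stream view keeps every event, and the table view keeps the latest value per key.

```shell
# Each line is an event in key,value form. Together the lines form a stream.
events='User_1,Region_3
User_2,Region_5
User_1,Region_9'

# Stream view: every event, in arrival order.
stream_view() {
  printf '%s\n' "$events"
}

# Table view: only the most recent value for each key.
table_view() {
  printf '%s\n' "$events" |
    awk -F, '{ latest[$1] = $2 } END { for (k in latest) print k "," latest[k] }' |
    sort
}

stream_view
echo "---"
table_view
```

The stream view prints all three events, while the table view prints one row per user, with User_1 now in Region_9.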

The SQL engine is implemented in ksqlDB, the purpose-built database for stream processing applications. The following steps show how to create a ksqlDB application, which enables processing real-time data with familiar SQL syntax.

This example app shows these stream processing operations.

  • JOIN the pageviews stream with the users table to create an enriched stream of pageview events.

  • Filter the enriched stream by the region field.

  • Create a windowed view on the filtered stream that shows the most recently updated rows. The window has a SIZE of 30 seconds.

    Tip

    These processing steps are implemented with only three SQL statements.

The following steps show how to use the CREATE STREAM and CREATE TABLE statements to register a stream on the pageviews topic and a table on the users topic. Registering a stream or a table on a topic enables SQL queries on the topic’s data.

  1. In the navigation menu, click ksqlDB.

  2. Click the ksqldb1 cluster to open the ksqldb1 page. There are tabs for editing SQL statements and for monitoring the streams and tables that you create.

  3. Copy the following SQL into the editor window. This statement registers a stream, named pageviews_stream, on the pageviews topic. Stream and table names are not case-sensitive.

    CREATE STREAM pageviews_stream
      WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='AVRO');
    
    Results from a ksqlDB query in Confluent Control Center
  4. Click Run query to execute the statement. In the result window, your output should resemble:

    Output of the CREATE STREAM statement in Confluent Control Center
  5. Use a SELECT query to confirm that data is moving through your stream. Copy the following SQL into the editor and click Run query.

    SELECT * FROM pageviews_stream EMIT CHANGES;
    

    Your output should resemble:

    Results from a ksqlDB SELECT query in Confluent Control Center
  6. Click Stop to end the SELECT query.

    Important

    Stopping the SELECT query doesn’t stop data movement through the stream.

  7. Copy the following SQL into the editor window and click Run query. This statement registers a table, named users_table, on the users topic.

    CREATE TABLE users_table (id VARCHAR PRIMARY KEY)
      WITH (KAFKA_TOPIC='users', VALUE_FORMAT='AVRO');
    

    Your output should resemble:

    ksqlDB create table details

    A table requires you to specify a PRIMARY KEY when you register it. In ksqlDB, a table is similar to tables in other SQL systems: a table has zero or more rows, and each row is identified by its PRIMARY KEY.
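Statements like the ones above can also be sent to the ksqlDB server over its REST API instead of the Control Center editor. The sketch below is a hedged illustration: it assumes the server is reachable at http://localhost:8088 (the port published for the ksqldb-server container) and uses the /ksql endpoint. The ksql_payload and run_statement function names and the KSQLDB_URL variable are hypothetical, and the payload builder only handles single-line statements without double quotes.

```shell
# Wrap a single-line SQL statement in the JSON body for the /ksql endpoint.
# Limitation of this sketch: the statement must not contain double quotes
# or newlines, because no JSON escaping is applied.
ksql_payload() {
  printf '{"ksql": "%s", "streamsProperties": {}}\n' "$1"
}

# Send a statement to the ksqlDB server (assumed at http://localhost:8088).
run_statement() {
  ksql_payload "$1" | curl -s -X POST \
    -H "Content-Type: application/vnd.ksql.v1+json" \
    --data @- \
    "${KSQLDB_URL:-http://localhost:8088}/ksql"
}

# Print the request body for the CREATE STREAM statement from this step.
ksql_payload "CREATE STREAM pageviews_stream WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='AVRO');"
```

Calling run_statement with the same string would submit the statement to the running server.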

Inspect the schemas of your stream and table

Schema Registry is installed with Confluent Platform and is running in the stack, so you don’t need to specify message schemas in your CREATE STREAM and CREATE TABLE statements. For the Avro, JSON_SR, and Protobuf formats, Schema Registry infers schemas automatically.

  1. Click Streams to see the currently registered streams. In the list, click PAGEVIEWS_STREAM to see details about the stream.

    In the Schema section, you can see the field names and types for the message values produced by the datagen-pageviews connector.

    ksqlDB stream details view in Confluent Control Center
  2. Click Tables to see the currently registered tables. In the list, click USERS_TABLE to see details about the table.

    In the Schema section, you can see the field names and types for the message values produced by the datagen-users connector.

    ksqlDB table details view in Confluent Control Center

Create queries to process data

In this step, you write SQL queries that inspect and process pageview and user rows. You can create different kinds of queries.

  • Transient query: A non-persistent, client-side query that you terminate manually or with a LIMIT clause. A transient query doesn’t create a new topic.
  • Persistent query: A server-side query that outputs a new stream or table that’s backed by a new topic. It runs until you issue the TERMINATE statement. The syntax for a persistent query uses the CREATE STREAM AS SELECT or CREATE TABLE AS SELECT statements.
  • Push query: A query that produces results continuously to a subscription. The syntax for a push query uses the EMIT CHANGES keyword. Push queries can be transient or persistent.
  • Pull query: A query that gets a result as of “now”, like a query against a traditional relational database. A pull query runs once and returns the current state of a table. Tables are updated incrementally as new events arrive, so pull queries run with predictably low latency. Pull queries are always transient.

Query for pageviews

  1. Click Editor to return to the query editor.

  2. Copy the following SQL into the editor and click Run query. This statement creates a transient query that returns three rows from pageviews_stream.

    SELECT pageid FROM pageviews_stream EMIT CHANGES LIMIT 3;
    

    Your output should resemble the following. You can click the Card view or Table view icon to change the layout of the output. This example output shows the Card view.

    Results from a ksqlDB query in Confluent Control Center

Join your stream and table

In this step, you create a stream named user_pageviews by using a persistent query that joins pageviews_stream with users_table on the userid key. This join enriches pageview data with information about the user who viewed the page. The joined rows are written to a new sink topic, which has the same name as the new stream, by default.

Tip

You can specify the name of the sink topic by using the KAFKA_TOPIC keyword in a WITH clause.

The following steps show how to join a stream with a table and view the resulting stream’s output.

  1. Copy the following SQL into the editor and click Run query.

    CREATE STREAM user_pageviews
      AS SELECT users_table.id AS userid, pageid, regionid, gender
        FROM pageviews_stream
        LEFT JOIN users_table ON pageviews_stream.userid = users_table.id
    EMIT CHANGES;
    

    Your output should resemble the following. In the output, the query’s internal identifier is CSAS_USER_PAGEVIEWS_5. The “CSAS” prefix stands for CREATE STREAM AS SELECT. Identifiers for tables are prefixed with “CTAS”, which stands for CREATE TABLE AS SELECT.

    Output of creating the user_pageviews stream
  2. Click Streams to open the list of streams that you can access.

  3. Select USER_PAGEVIEWS, and click Query stream.

    The editor opens with a transient SELECT query, and streaming output from the user_pageviews stream displays in the result window. The joined stream has the userid, pageid, regionid, and gender fields selected in the join query. Your output should resemble the following:

    Results from a ksqlDB join query in Confluent Control Center

    Note

    The query uses the EMIT CHANGES syntax, which indicates that this is a push query. A push query enables you to query a stream or table with a subscription to the results. It continues until you stop it. For more information, see Push queries.

  4. Click Stop to end the transient push query.

Filter a stream

In this step, you create a stream, named pageviews_region_like_89, which is made of user_pageviews rows that have a regionid value that ends with 8 or 9. Results from this query are written to a new topic, named pageviews_filtered_r8_r9. The topic name is specified explicitly in the query by using the KAFKA_TOPIC keyword.

  1. Copy the following SQL into the editor and click Run query.

    CREATE STREAM pageviews_region_like_89
      WITH (KAFKA_TOPIC='pageviews_filtered_r8_r9', VALUE_FORMAT='AVRO')
        AS SELECT * FROM user_pageviews
        WHERE regionid LIKE '%_8' OR regionid LIKE '%_9'
    EMIT CHANGES;
    

    Your output should resemble:

    Output of creating the pageviews_region_like_89 stream
  2. Inspect the filtered output of the pageviews_region_like_89 stream. Copy the following SQL into the editor and click Run query.

    SELECT * FROM pageviews_region_like_89 EMIT CHANGES;
    

    Your output should resemble:

    Results from a ksqlDB filter query in Confluent Control Center
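The WHERE clause relies on SQL LIKE wildcards, where % matches any run of characters and _ matches exactly one character. As a rough sketch of that matching rule, the shell function below maps the two patterns onto shell globs (% to *, _ to ?). The matches_r8_r9 name and the sample region values are hypothetical, and the sketch illustrates the wildcard semantics only, not ksqlDB's implementation.

```shell
# Return success when a value matches LIKE '%_8' OR LIKE '%_9'.
# SQL LIKE wildcards map onto shell globs:
#   %  ->  *  (any run of characters, including none)
#   _  ->  ?  (exactly one character)
matches_r8_r9() {
  case "$1" in
    *?8 | *?9) return 0 ;;
    *) return 1 ;;
  esac
}

for region in Region_8 Region_9 Region_3 Region18 8; do
  if matches_r8_r9 "$region"; then
    echo "$region: kept"
  else
    echo "$region: filtered out"
  fi
done
```

Because _ is a wildcard rather than a literal underscore, a value such as Region18 also matches, while a bare 8 does not because at least one character must come before the final digit.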

Create a windowed view

In this step, you create a table named pageviews_per_region_89 that counts the number of pageviews from regions 8 and 9 in a tumbling window with a SIZE of 30 seconds. The query result is an aggregation that counts and groups rows, so the result is a table, instead of a stream.

  1. Copy the following SQL into the editor and click Run query.

    CREATE TABLE pageviews_per_region_89 WITH (KEY_FORMAT='JSON')
      AS SELECT userid, gender, regionid, COUNT(*) AS numviews
        FROM pageviews_region_like_89
        WINDOW TUMBLING (SIZE 30 SECOND)
        GROUP BY gender, regionid, userid
        HAVING COUNT(*) > 1
    EMIT CHANGES;
    

    Your output should resemble:

    Output of creating the pageviews_per_region_89 table
  2. Inspect the windowed output of the pageviews_per_region_89 table. Copy the following SQL into the editor and click Run query.

    SELECT * FROM pageviews_per_region_89 EMIT CHANGES;
    
  3. Click the Table view button.

    Your output should resemble:

    Results from a ksqlDB windowed query in Confluent Control Center

    The NUMVIEWS column shows the count of views in a 30-second window.
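To see how a tumbling window assigns events to 30-second buckets, the following sketch computes the window start for a few timestamps. It assumes windows are aligned to the epoch, so that the window start is the timestamp rounded down to a multiple of the window size. The window_start function name and the sample timestamps are illustrative only.

```shell
# Compute the start of the tumbling window that contains a timestamp.
# Arguments: event timestamp in ms, window size in ms.
# Assumes windows are aligned to the epoch (timestamp 0).
window_start() {
  echo $(( $1 - $1 % $2 ))
}

size_ms=30000   # corresponds to WINDOW TUMBLING (SIZE 30 SECOND)
for ts in 0 12000 29999 30000 65000; do
  echo "event at ${ts} ms -> window starting at $(window_start "$ts" "$size_ms") ms"
done
```

Events at 0, 12000, and 29999 ms fall into the same window, so they would be counted together, while the event at 30000 ms starts a new window.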

Snapshot a table by using a pull query

You can get the current state of a table by using a pull query, which returns rows for a specific key at the time you issue the query. A pull query runs once and terminates.

In this step, you query the pageviews_per_region_89 table for all rows that have User_1 in Region_9 with a gender value of FEMALE.

Copy the following SQL into the editor and click Run query.

SELECT * FROM pageviews_per_region_89
  WHERE userid = 'User_1' AND gender='FEMALE' AND regionid='Region_9';

Your output should resemble:

Results from a ksqlDB pull query in Confluent Control Center

Inspect your streams and tables

  1. In the upper-right corner of the editor, the All available streams and tables pane shows all of the streams and tables that you can access. Click PAGEVIEWS_PER_REGION_89 to see the fields in the pageviews_per_region_89 table.

    All available streams and tables pane in Confluent Control Center
  2. In the All available streams and tables section, click KSQL_PROCESSING_LOG to view the fields in the processing log, including nested data structures. The processing log shows errors that occur when your SQL statements are processed. You can query it like any other stream. For more information, see Processing log.

  3. Click Persistent queries to inspect the streams and tables that you’ve created.

    Persistent query details pane in Confluent Control Center

    Use this page to check whether your queries are running, to explain a query, and to terminate running queries.

Step 5: Visualize your app’s stream topology

In the streaming application you’ve built, events flow from the Datagen connectors into the pageviews and users topics. Rows are processed with a join and filtered, and in the final step, rows are aggregated in a table view of the streaming data.

You can see an end-to-end view of the whole system by using Flow view.

  1. Click Flow to open Flow view. Your app’s stream topology appears, showing streams, tables, and the statements you executed to create them.

  2. Click USER_PAGEVIEWS to inspect the joined stream.

    ksqlDB Flow view in Confluent Control Center
  3. Click the other nodes in the graph to inspect the data flowing through your app.

Step 6: Uninstall and clean up

If you’re done working with Confluent Platform, you can stop and remove the Docker containers and images.

  1. Run the following command to stop the Docker containers for Confluent:

    docker compose stop
    
  2. After stopping the Docker containers, run the following command to prune the Docker system. Running this command deletes containers, networks, volumes, and images, freeing up disk space:

    docker system prune -a --volumes --filter "label=io.confluent.docker"
    

    For more information, refer to the official Docker documentation.