Important

You are viewing documentation for an older version of Confluent Platform. For the latest, click here.

Quick Start using Community Components (Docker)

This quick start demonstrates how to get up and running using Confluent Platform using only Confluent Community components using Docker containers. It demonstrates the basic and most powerful capabilities, including creating topics, adding and modifying data, and stream processing by using KSQL. In this quick start you will create Apache Kafka® topics and streaming queries on these topics by using KSQL.

This quick start leverages the Confluent Platform CLI, the Kafka CLI, and the KSQL CLI. For a rich UI-based experience, try out the Confluent Platform quick start with commercial components.

Prerequisites:
  • Docker:
    • Docker version 1.11 or later is installed and running.
    • Docker Compose is installed. Docker Compose is installed by default with Docker for Mac.
    • Docker memory is allocated minimally at 8 GB. When using Docker Desktop for Mac, the default Docker memory allocation is 2 GB. You can change the default allocation to 8 GB in Docker > Preferences > Advanced.
  • Git.
  • Internet connectivity.
  • Ensure you are on an Operating System currently supported by Confluent Platform.
  • wget to get the connector configuration file.

Step 1: Download and Start Confluent Platform Using Docker

  1. Clone the Confluent Platform Docker Images GitHub Repository and check out the 5.1.4-post branch.

    git clone https://github.com/confluentinc/cp-docker-images
    cd cp-docker-images
    git checkout 5.1.4-post
    
  2. Navigate to cp-all-in-one examples directory.

    cd examples/cp-all-in-one/
    
  3. Start Confluent Platform specifying two options: (-d) to run in detached mode and (--build) to build the Kafka Connect image with the source connector kafka-connect-datagen from Confluent Hub.

    Important

    You must allocate a minimum of 8 GB of Docker memory resource. The default memory allocation on Docker Desktop for Mac is 2 GB and must be changed.

    docker-compose up -d --build
    

    This starts Confluent Platform with separate containers for all Confluent Platform components. Your output should resemble the following:

    Creating network "cp-all-in-one_default" with the default driver
    Creating zookeeper ... done
    Creating broker    ... done
    Creating schema-registry ... done
    Creating rest-proxy      ... done
    Creating connect         ... done
    Creating ksql-datagen    ... done
    Creating ksql-server     ... done
    Creating control-center  ... done
    Creating ksql-cli        ... done
    
  4. Optional: Run this command to verify that the services are up and running.

    docker-compose ps
    

    You should see the following:

         Name                    Command               State                Ports
    ------------------------------------------------------------------------------------------
    broker            /etc/confluent/docker/run        Up      0.0.0.0:29092->29092/tcp,
                                                               0.0.0.0:9092->9092/tcp
    connect           /etc/confluent/docker/run        Up      0.0.0.0:8083->8083/tcp,
                                                               9092/tcp
    control-center    /etc/confluent/docker/run        Up      0.0.0.0:9021->9021/tcp
    ksql-cli          ksql http://localhost:8088       Up
    ksql-datagen      bash -c echo Waiting for K ...   Up
    ksql-server       /etc/confluent/docker/run        Up      0.0.0.0:8088->8088/tcp
    rest-proxy        /etc/confluent/docker/run        Up      0.0.0.0:8082->8082/tcp
    schema-registry   /etc/confluent/docker/run        Up      0.0.0.0:8081->8081/tcp
    zookeeper         /etc/confluent/docker/run        Up      0.0.0.0:2181->2181/tcp,
                                                               2888/tcp, 3888/tcp
    

    If the state is not Up, rerun the docker-compose up -d command.

Step 2: Create Kafka Topics

In this step Kafka topics are created in Confluent Platform by using the Kafka CLI.

  1. Run this command to create a topic named users.

    docker-compose exec broker kafka-topics --create --zookeeper \
    zookeeper:2181 --replication-factor 1 --partitions 1 --topic users
    

    Your output should resemble:

    Created topic "users".
    
  2. Run this command to create a topic named pageviews.

    docker-compose exec broker kafka-topics --create --zookeeper \
    zookeeper:2181 --replication-factor 1 --partitions 1 --topic pageviews
    

    Your output should resemble:

    Created topic "pageviews".
    

Step 3: Install a Kafka Connector and Generate Sample Data

In this step, you use Kafka Connect to run a demo source connector called kafka-connect-datagen that creates sample data for the Kafka topics pageviews and users.

  1. Run one instance of the Kafka Connect Datagen connector to produce Kafka data to the pageviews topic in AVRO format.

    wget https://github.com/confluentinc/kafka-connect-datagen/raw/master/config/connector_pageviews_cos.config
    curl -X POST -H "Content-Type: application/json" --data @connector_pageviews_cos.config http://localhost:8083/connectors
    
  2. Run another instance of the Kafka Connect Datagen connector to produce Kafka data to the users topic in AVRO format.

    wget https://github.com/confluentinc/kafka-connect-datagen/raw/master/config/connector_users_cos.config
    curl -X POST -H "Content-Type: application/json" --data @connector_users_cos.config http://localhost:8083/connectors
    

Tip

The Kafka Connect Datagen connector was installed automatically when you started Docker Compose with the --build argument in Step 1: Download and Start Confluent Platform Using Docker. If you encounter issues with the Datagen Connector, refer to the Issue: Cannot locate the Datagen Connector in the Troubleshooting section.

Step 4: Create and Write to a Stream and Table using KSQL

In this step, KSQL queries are run on the pageviews and users topics that were created in the previous step. The following KSQL commands are run from the KSQL CLI. Enter these commands in your terminal and press Enter.

Important

  • Confluent Platform must be installed and running.
  • To try out the preview KSQL web interface, see the Confluent Platform quick start with commercial components.
  • All KSQL commands must end with a closing semicolon (;).

Create Streams and Tables

  1. Start the KSQL CLI in your terminal with this command.

    docker-compose exec ksql-cli ksql http://ksql-server:8088
    

    Important

    By default KSQL attempts to store its logs in a directory called logs that is relative to the location of the ksql executable. For example, if ksql is installed at /usr/local/bin/ksql, then it would attempt to store its logs in /usr/local/logs. If you are running ksql from the default Confluent Platform location, <path-to-confluent>/bin, you must override this default behavior by using the LOG_DIR variable.

  2. Create a stream pageviews from the Kafka topic pageviews, specifying the value_format of AVRO.

    CREATE STREAM pageviews (viewtime BIGINT, userid VARCHAR, pageid VARCHAR) \
    WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='AVRO');
    

    Tip: Enter the SHOW STREAMS; command to view your streams. For example:

     Stream Name      | Kafka Topic      | Format
    -------------------------------------------------
     PAGEVIEWS        | pageviews        | AVRO
    -------------------------------------------------
    
  3. Create a table users with several columns from the Kafka topic users, with the value_format of AVRO.

    CREATE TABLE users (registertime BIGINT, gender VARCHAR, regionid VARCHAR,  \
    userid VARCHAR) \
    WITH (KAFKA_TOPIC='users', VALUE_FORMAT='AVRO', KEY = 'userid');
    

    Tip: Enter the SHOW TABLES; query to view your tables.

     Table Name        | Kafka Topic       | Format    | Windowed
    --------------------------------------------------------------
     USERS             | users             | AVRO      | false
    --------------------------------------------------------------
    

Write Queries

These examples write queries using KSQL. The following KSQL commands are run from the KSQL CLI. Enter these commands in your terminal and press Enter.

  1. Add the custom query property earliest for the auto.offset.reset parameter. This instructs KSQL queries to read all available topic data from the beginning. This configuration is used for each subsequent query. For more information, see the KSQL Configuration Parameter Reference.

    SET 'auto.offset.reset'='earliest';
    

    Your output should resemble:

    Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
    
  2. Create a query that returns data from a stream with the results limited to three rows.

    SELECT pageid FROM pageviews LIMIT 3;
    

    Your output should resemble:

    Page_45
    Page_38
    Page_11
    LIMIT reached for the partition.
    Query terminated
    
  3. Create a persistent query that filters for female users. The results from this query are written to the Kafka PAGEVIEWS_FEMALE topic. This query enriches the pageviews STREAM by doing a LEFT JOIN with the users TABLE on the user ID, where a condition (gender = 'FEMALE') is met.

    CREATE STREAM pageviews_female AS SELECT users.userid AS userid, pageid, \
    regionid, gender FROM pageviews LEFT JOIN users ON pageviews.userid = users.userid \
    WHERE gender = 'FEMALE';
    

    Your output should resemble:

     Message
    ----------------------------
     Stream created and running
    ----------------------------
    
  4. Create a persistent query where a condition (regionid) is met, using LIKE. Results from this query are written to a Kafka topic named pageviews_enriched_r8_r9.

    CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9', \
    value_format='AVRO') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';
    

    Your output should resemble:

     Message
    ----------------------------
     Stream created and running
    ----------------------------
    
  5. Create a persistent query that counts the pageviews for each region and gender combination in a tumbling window of 30 seconds when the count is greater than 1. Because the procedure is grouping and counting, the result is now a table, rather than a stream. Results from this query are written to a Kafka topic called PAGEVIEWS_REGIONS.

    CREATE TABLE pageviews_regions AS SELECT gender, regionid , \
    COUNT(*) AS numusers FROM pageviews_female WINDOW TUMBLING (size 30 second) \
    GROUP BY gender, regionid HAVING COUNT(*) > 1;
    

    Your output should resemble:

     Message
    ---------------------------
     Table created and running
    ---------------------------
    

Step 5: Monitor Streaming Data

Now that your streams are running you can monitor them.

  • View the details for your stream or table with the DESCRIBE EXTENDED command. For example, run this command to view the pageviews_female_like_89 stream:

    DESCRIBE EXTENDED pageviews_female_like_89;
    

    Your output should look like this:

    Type                 : STREAM
    Key field            : PAGEVIEWS.USERID
    Timestamp field      : Not set - using <ROWTIME>
    Key format           : STRING
    Value format         : AVRO
    Kafka output topic   : pageviews_enriched_r8_r9 (partitions: 4, replication: 1)
    
     Field    | Type
    --------------------------------------
     ROWTIME  | BIGINT           (system)
     ROWKEY   | VARCHAR(STRING)  (system)
     USERID   | VARCHAR(STRING)  (key)
     PAGEID   | VARCHAR(STRING)
     REGIONID | VARCHAR(STRING)
     GENDER   | VARCHAR(STRING)
    --------------------------------------
    
    Queries that write into this STREAM
    -----------------------------------
    id:CSAS_PAGEVIEWS_FEMALE_LIKE_89 - CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9', value_format='AVRO') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';
    
    For query topology and execution plan please run: EXPLAIN <QueryId>
    
    Local runtime statistics
    ------------------------
    messages-per-sec:      2.01   total-messages:     10515     last-message: 3/14/18 2:25:40 PM PDT
     failed-messages:         0 failed-messages-per-sec:         0      last-failed:       n/a
    (Statistics of the local KSQL server interaction with the Kafka topic pageviews_enriched_r8_r9)
    
  • Discover the query execution plan with the EXPLAIN command. For example, run this command to view the query execution plan for CTAS_PAGEVIEWS_REGIONS:

    EXPLAIN CTAS_PAGEVIEWS_REGIONS;
    

    Your should look like this:

    Type                 : QUERY
    SQL                  : CREATE TABLE pageviews_regions AS SELECT gender, regionid , COUNT(*) AS numusers FROM pageviews_female WINDOW TUMBLING (size 30 second) GROUP BY gender, regionid HAVING COUNT(*) > 1;
    
    
    Local runtime statistics
    ------------------------
    messages-per-sec:      1.42   total-messages:     13871     last-message: 3/14/18 2:50:02 PM PDT
     failed-messages:         0 failed-messages-per-sec:         0      last-failed:       n/a
    (Statistics of the local KSQL server interaction with the Kafka topic PAGEVIEWS_REGIONS)
    
    Execution plan
    --------------
     > [ PROJECT ] Schema: [GENDER : STRING , REGIONID : STRING , NUMUSERS : INT64].
             > [ FILTER ] Schema: [PAGEVIEWS_FEMALE.GENDER : STRING , PAGEVIEWS_FEMALE.REGIONID : STRING , PAGEVIEWS_FEMALE.ROWTIME : INT64 , KSQL_AGG_VARIABLE_0 : INT64 , KSQL_AGG_VARIABLE_1 : INT64].
                     > [ AGGREGATE ] Schema: [PAGEVIEWS_FEMALE.GENDER : STRING , PAGEVIEWS_FEMALE.REGIONID : STRING , PAGEVIEWS_FEMALE.ROWTIME : INT64 , KSQL_AGG_VARIABLE_0 : INT64 , KSQL_AGG_VARIABLE_1 : INT64].
                             > [ PROJECT ] Schema: [PAGEVIEWS_FEMALE.GENDER : STRING , PAGEVIEWS_FEMALE.REGIONID : STRING , PAGEVIEWS_FEMALE.ROWTIME : INT64].
                                     > [ SOURCE ] Schema: [PAGEVIEWS_FEMALE.ROWTIME : INT64 , PAGEVIEWS_FEMALE.ROWKEY : STRING , PAGEVIEWS_FEMALE.USERID : STRING , PAGEVIEWS_FEMALE.PAGEID : STRING , PAGEVIEWS_FEMALE.REGIONID : STRING , PAGEVIEWS_FEMALE.GENDER : STRING].
    
    
    Processing topology
    -------------------
    Topologies:
       Sub-topology: 0
        Source: KSTREAM-SOURCE-0000000000 (topics: [PAGEVIEWS_FEMALE])
          --> KSTREAM-MAPVALUES-0000000001
        Processor: KSTREAM-MAPVALUES-0000000001 (stores: [])
          --> KSTREAM-TRANSFORMVALUES-0000000002
          <-- KSTREAM-SOURCE-0000000000
        ...
    
      Sub-topology: 1
        Source: KSTREAM-SOURCE-0000000008 (topics: [KSQL_Agg_Query_1521052072079-repartition])
          --> KSTREAM-AGGREGATE-0000000005
        Processor: KSTREAM-AGGREGATE-0000000005 (stores: [KSQL_Agg_Query_1521052072079])
          --> KTABLE-FILTER-0000000009
          <-- KSTREAM-SOURCE-0000000008
        ...
    

For more information about KSQL syntax, see KSQL Syntax Reference.

Step 6: Stop Docker

When you are done working with Docker, you can stop and remove Docker containers and images.

  1. View a list of all Docker container IDs.

    docker container ls -aq
    
  2. Run the following command to stop the Docker containers for Confluent:

    docker container stop $(docker container ls -a -q -f "label=io.confluent.docker")
    
  3. Run the following commands to stop the containers and prune the Docker system. Running these commands deletes containers, networks, volumes, and images; freeing up disk space:

    docker container stop $(docker container ls -a -q -f "label=io.confluent.docker") && docker system prune -a -f --volumes
    

    Tip

    Remove the filter label for Confluent Docker (-f "label=io.confluent.docker") to clear all Docker containers from your system.

You can rebuild and restart the containers at any time using the docker-compose up -d --build command.

For more information, refer to the official Docker documentation.

Troubleshooting

If you encountered any issues, review the following resolutions before trying the steps again.

Issue: Cannot locate the Datagen Connector

Resolution: Make sure to run the docker-compose command with the --build option:

docker-compose up -d --build

For details, see Step 1: Download and Start Confluent Platform Using Docker.

Resolution: Run the build command just for connect.

docker-compose build --no-cache connect

Your output should resemble:

Building connect
...
Completed
Removing intermediate container cdb0af3550c8
---> 36d00047d29b
Successfully built 36d00047d29b
Successfully tagged confluentinc/kafka-connect-datagen:latest

Resolution: Check the Connect log for Datagen.

docker-compose logs connect | grep -i Datagen

Your output should resemble:

connect  | [2019-04-17 20:03:26,137] INFO Loading plugin from: /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect  | [2019-04-17 20:03:26,206] INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect  | [2019-04-17 20:03:26,206] INFO Added plugin 'io.confluent.kafka.connect.datagen.DatagenConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect  | [2019-04-17 20:03:28,102] INFO Added aliases 'DatagenConnector' and 'Datagen' to plugin 'io.confluent.kafka.connect.datagen.DatagenConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)

Resolution: Check the Connect log for a warning and reminder to run the docker-compose up -d --build command properly.

docker-compose logs connect | grep -i Datagen

If the following warning is present, re-run the docker-compose up -d --build command:

connect  | WARNING: Did not find directory for kafka-connect-datagen (did you
remember to run: docker-compose up -d --build ?)

Resolution: Verify the .jar file for kafka-connect-datagen has been added and is present in the lib subfolder.

docker-compose exec connect ls /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen/lib/

Your output should resemble:

...
kafka-connect-datagen-0.1.0.jar
...

Resolution: Verify the plugin exists in the connector path.

docker-compose exec connect bash -c 'echo $CONNECT_PLUGIN_PATH'

Your output should resemble:

/usr/share/java,/usr/share/confluent-hub-components

Confirm its contents are present:

docker-compose exec connect ls /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen

Your output should resemble:

assets   doc  etc  lib  manifest.json

Issue: Stream-Stream joins error

An error states Stream-Stream joins must have a WITHIN clause specified. This error can occur if you created streams for both pageviews and users by mistake.

Resolution: Ensure that you created a stream for pageviews, and a table for users in Step 4: Create and Write to a Stream and Table using KSQL.

Issue: Unable to successfully complete KSQL query steps

Java errors or other severe errors were encountered.

Resolution: Ensure you are on an Operating System currently supported by Confluent Platform.

Resolution: Ensure that the Docker memory was increased to 8 MB. Go to Docker > Preferences > Advanced. If Docker memory is insufficient, other unpredictable issues could occur.

KSQL errors were encountered.

Resolution: Review the help in the KSQL terminal for successful command tips and links to more documentation.

ksql> help

Next Steps

Learn more about the components shown in this quick start: