Quick Start for Apache Kafka using Confluent Platform (Docker)

Use this quick start to get up and running with Confluent Platform and its main components using Docker containers. This quick start uses Confluent Control Center included in Confluent Platform for topic management and event stream processing using ksqlDB.

In this quick start, you create Apache Kafka® topics, use Kafka Connect to generate mock data to those topics, and create ksqlDB streaming queries on those topics. You then go to Control Center to monitor and analyze the event streaming queries.

See also

You can also run an automated version of this quick start designed for Confluent Platform local installs.

Prerequisites:
  • Docker
    • Docker version 1.11 or later is installed and running.
    • Docker Compose is installed. Docker Compose is installed by default with Docker for Mac.
    • Docker memory is allocated minimally at 8 GB. When using Docker Desktop for Mac, the default Docker memory allocation is 2 GB. You can change the default allocation to 8 GB in Docker. Navigate to Preferences > Resources > Advanced.
  • Git
  • Internet connectivity
  • Operating System currently supported by Confluent Platform
  • Networking and Kafka on Docker

Step 1: Download and Start Confluent Platform Using Docker

  1. Clone the confluentinc/cp-all-in-one GitHub repository.

  2. Check out the 6.0.0-post branch:

    cd cp-all-in-one
    
    git checkout 6.0.0-post
    
  3. Navigate to the cp-all-in-one/cp-all-in-one directory:

    cd cp-all-in-one
    
  4. Start Confluent Platform with the -d option to run in detached mode:

    docker-compose up -d
    

    The above command starts Confluent Platform with a separate containers for each Confluent Platform component. Your output should resemble the following:

    Creating network "cp-all-in-one_default" with the default driver
    Creating zookeeper ... done
    Creating broker    ... done
    Creating schema-registry ... done
    Creating rest-proxy      ... done
    Creating connect         ... done
    Creating ksql-datagen    ... done
    Creating ksqldb-server   ... done
    Creating control-center  ... done
    Creating ksqldb-cli      ... done
    
  5. To verify that the services are up and running, run the following command:

    docker-compose ps
    

    You should see the following:

         Name                    Command               State                Ports
    ------------------------------------------------------------------------------------------
    broker            /etc/confluent/docker/run        Up      0.0.0.0:29092->29092/tcp,
                                                               0.0.0.0:9092->9092/tcp
    connect           /etc/confluent/docker/run        Up      0.0.0.0:8083->8083/tcp,
                                                               9092/tcp
    control-center    /etc/confluent/docker/run        Up      0.0.0.0:9021->9021/tcp
    ksqldb-cli        ksql http://localhost:8088       Up
    ksqldb-datagen    bash -c echo Waiting for K ...   Up
    ksqldb-server     /etc/confluent/docker/run        Up      0.0.0.0:8088->8088/tcp
    rest-proxy        /etc/confluent/docker/run        Up      0.0.0.0:8082->8082/tcp
    schema-registry   /etc/confluent/docker/run        Up      0.0.0.0:8081->8081/tcp
    zookeeper         /etc/confluent/docker/run        Up      0.0.0.0:2181->2181/tcp,
                                                               2888/tcp, 3888/tcp
    

    If the state is not Up, rerun the docker-compose up -d command.

Step 2: Create Kafka Topics

In this step, you create Kafka topics using Confluent Control Center. Confluent Control Center provides the functionality for building and monitoring production data pipelines and event streaming applications.

  1. Navigate to the Control Center web interface at http://localhost:9021.

    If you installed Confluent Platform on a different host, replace localhost with the host name in the address.

    It may take a minute or two for Control Center to come online.

  2. Select the CO Cluster 1 cluster.

    ../_images/c3-landing-page.png
  3. Click Topics from the cluster submenu and click + Add a topic.

    ../_images/c3-create-topic.png
  4. In the Topic name field, specify pageviews and click Create with defaults.

    ../_images/c3-create-topic-name.png
  5. Click Topics from the cluster submenu and click + Add a topic.

  6. In the Topic name field, specify users and click Create with defaults.

Step 3: Install a Kafka Connector and Generate Sample Data

In this step, you use Kafka Connect to run a demo source connector called kafka-connect-datagen that creates sample data for the Kafka topics pageviews and users.

Tip

The Kafka Connect Datagen connector was installed automatically when you started Docker Compose in Step 1: Download and Start Confluent Platform Using Docker. If you encounter issues locating the Datagen Connector, refer to the Issue: Cannot locate the Datagen connector in the Troubleshooting section.

  1. Run the first instance of the Kafka Connect Datagen connector to produce Kafka data to the pageviews topic in AVRO format.

    1. Select the CO Cluster 1 cluster.

    2. Click Connect.

    3. Select the connect-default cluster.

    4. Click Add connector.

    5. Select the DatagenConnector tile.

      Tip

      To narrow displayed connectors, click Filter by type and click Sources.

    6. In the Name field, specify datagen-pageviews.

    7. After naming the connector, new fields appear. Scroll down and specify the following configuration values:

      • In Key converter class, specify org.apache.kafka.connect.storage.StringConverter.
      • In kafka.topic, specify pageviews.
      • In max.interval, specify 100.
      • In quickstart, specify pageviews.
    8. Click Continue.

    9. Review the connector configuration and click Launch.

      ../_images/connect-review-pageviews.png
  2. Run the second instance of the Kafka Connect Datagen connector to produce Kafka data to the users topic in AVRO format.

    1. Select the CO Cluster 1 cluster.

    2. Click Connect.

    3. Select the connect-default cluster.

    4. Click + Add connector.

    5. Select the DatagenConnector tile.

      Tip

      To narrow displayed connectors, click Filter by type and click Sources.

    6. In the Name field, specify datagen-users.

    7. After naming the connector, new fields appear. Scroll down and specify the following configuration values:

      • In Key converter class, specify org.apache.kafka.connect.storage.StringConverter.
      • In kafka.topic, specify users.
      • In max.interval, specify 1000.
      • In quickstart, specify users.
    8. Click Continue.

    9. Review the connector configuration and click Launch.

Step 4: Create and Write to a Stream and Table using ksqlDB

Tip

You can also run these commands using the ksqlDB CLI from your Docker container with this command: docker-compose exec ksqldb-cli ksql http://ksqldb-server:8088.

Create Streams and Tables

In this step, you use ksqlDB to create a stream for the pageviews topic and a table for the users topic.

  1. Select the CO Cluster 1 cluster.
  2. Click ksqlDB.
  3. Select the ksqlDB application.
  4. From the ksqlDB EDITOR page, in the Streams tab, click + Add Stream.
  5. Select the pageviews topic.
  6. Choose your stream options:
    • In Value format, select AVRO.
    • In Value column(s), set the fields as follows:
      • viewtime with type BIGINT
      • userid with type VARCHAR
      • pageid with type VARCHAR
  7. Click Save STREAM.
  8. In the Tables tab, click Add a table.
  9. Select the users topic.
  10. Choose your table options:
    • In Value format, select AVRO.
    • In PRIMARY KEY column, select userid.
    • In the Value column(s) section, set the fields as follows:
      • registertime with type BIGINT
      • userid with type VARCHAR
      • regionid with type VARCHAR
      • gender with type VARCHAR
  11. Click Save TABLE.

Write Queries

In this step, you create ksqlDB queries against the stream and the tabe you created above.

  1. Select the CO Cluster 1 cluster.

  2. Click ksqlDB.

  3. Select the ksqlDB application.

  4. In the Editor tab, click Add query properties to add a custom query property.

  5. Set the auto.offset.reset parameter to Earliest.

    This setting instructs ksqlDB queries to read all available topic data from the beginning. This configuration is used for each subsequent query. For more information, see the ksqlDB Configuration Parameter Reference.

  6. Create the following queries.

    1. Create a non-persistent query that returns data from a stream with the results limited to a maximum of three rows:

      Enter the following query in the editor:

      SELECT pageid FROM pageviews EMIT CHANGES LIMIT 3;
      
    2. Click Run query. Your output should resemble:

      ../_images/c3-ksql-query-results-pageid.png

      Click the Card view or Table view icon to change the output layout.

    3. Create a persistent query (as a stream) that filters pageviews stream for female users. The results from this query are written to the Kafka PAGEVIEWS_FEMALE topic:

      Enter the following query in the editor:

      CREATE STREAM pageviews_female
         AS SELECT users.userid AS userid, pageid, regionid, gender
         FROM pageviews LEFT JOIN users ON pageviews.userid = users.userid
         WHERE gender = 'FEMALE';
      
    4. Click Run query. Your output should resemble:

      ../_images/c3-ksql-persist-query-pv-female-results.png
    5. Create a persistent query where the regionid ends with 8 or 9. Results from this query are written to a Kafka topic named pageviews_enriched_r8_r9:

      Enter the following query in the editor:

      CREATE STREAM pageviews_female_like_89
         WITH (kafka_topic='pageviews_enriched_r8_r9', value_format='AVRO')
         AS SELECT * FROM pageviews_female
         WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';
      
    6. Click Run query. Your output should resemble:

      ../_images/c3-ksql-persist-query-pv-female89-results.png
    7. Create a persistent query that counts the pageviews for each region and gender combination in a tumbling window of 30 seconds when the count is greater than 1. Because the procedure is grouping and counting, the result is now a table, rather than a stream. Results from this query are written to a Kafka topic called PAGEVIEWS_REGIONS:

      Enter the following query in the editor:

      CREATE TABLE pageviews_regions
         AS SELECT gender, regionid , COUNT(*) AS numusers
         FROM pageviews LEFT JOIN users ON pageviews.userid = users.userid
         WINDOW TUMBLING (size 30 second)
         GROUP BY gender, regionid
         HAVING COUNT(*) > 1;
      
    8. Click Run query. Your output should resemble:

      ../_images/c3-ksql-persist-query-table-results.png
    9. Click the Running queries tab. You should see the following persisted queries:

      • PAGEVIEWS_FEMALE
      • PAGEVIEWS_FEMALE_LIKE_89
      • PAGEVIEWS_REGIONS
    10. Click the Editor tab. The All available streams and tables pane shows all of the streams and tables that you can access.

      ../_images/c3-ksql-stream-table-view-1.png
    11. In the All available streams and tables section, click KSQL_PROCESSING_LOG to view the stream’s schema, including nested data structures.

Run Queries

In this step, you run the ksqlDB queries you save as streams and tables above in the previous section.

  1. Select the CO Cluster 1 cluster.

  2. Click ksqlDB.

  3. Select the ksqlDB application.

  4. In the Streams tab, select the PAGEVIEWS_FEMALE stream.

  5. Click Query stream.

    Streaming output of the query displays.

  6. Click Stop to stop the output generation.

  7. In the Tables tab, select PAGEVIEWS_REGIONS table.

  8. Click Query table.

    Streaming output of the query displays.

  9. Click Stop to stop the output generation.

Step 5: Monitor Consumer Lag

  1. Select the CO Cluster 1 cluster.
  2. Click Consumers to view the consumers created by ksqlDB.
  3. Click the consumer group ID to view details for the _confluent-ksql-default_query_CSAS_PAGEVIEWS_FEMALE consumer group.
../_images/ksql-interface-monitor.png

From this page you can see the consumer lag and consumption values for your streaming query.

../_images/ksql-interface-monitor-cnsmgp.png

For more information, see the Control Center Consumers documentation.

Step 6: Stop Docker

When you are done working with Docker, you can stop and remove Docker containers and images.

  1. View a list of all Docker container IDs.

    docker container ls -a -q
    
  2. Run the following command to stop the Docker containers for Confluent:

    docker container stop $(docker container ls -a -q -f "label=io.confluent.docker")
    
  3. After stopping the Docker containers, run the following commands to prune the Docker system. Running these commands deletes containers, networks, volumes, and images, freeing up disk space:

    docker system prune -a -f --volumes
    

    Tip

    Remove the filter label for Confluent Docker (-f "label=io.confluent.docker") to clear all Confluent Platform Docker containers from your system.

For more information, refer to the official Docker documentation.

Troubleshooting

If you encountered any issues while going through the quickstart workflow, review the following resolutions before trying the steps again.

Issue: Cannot locate the Datagen connector

For details, see Step 1: Download and Start Confluent Platform Using Docker.

Resolution: Run the build command just for connect.

docker-compose build --no-cache connect

Your output should resemble:

Building connect
...
Completed
Removing intermediate container cdb0af3550c8
---> 36d00047d29b
Successfully built 36d00047d29b
Successfully tagged confluentinc/kafka-connect-datagen:latest

Resolution: Check the Connect log for Datagen.

docker-compose logs connect | grep -i Datagen

Your output should resemble:

connect  | [2019-04-17 20:03:26,137] INFO Loading plugin from: /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect  | [2019-04-17 20:03:26,206] INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect  | [2019-04-17 20:03:26,206] INFO Added plugin 'io.confluent.kafka.connect.datagen.DatagenConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect  | [2019-04-17 20:03:28,102] INFO Added aliases 'DatagenConnector' and 'Datagen' to plugin 'io.confluent.kafka.connect.datagen.DatagenConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)

Resolution: Check the Connect log for a warning and reminder to run the docker-compose up -d command properly.

docker-compose logs connect | grep -i Datagen

Resolution: Verify the .jar file for kafka-connect-datagen has been added and is present in the lib subfolder.

docker-compose exec connect ls /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen/lib/

Your output should resemble:

...
kafka-connect-datagen-0.1.0.jar
...

Resolution: Verify the plugin exists in the connector path.

docker-compose exec connect bash -c 'echo $CONNECT_PLUGIN_PATH'

Your output should resemble:

/usr/share/java,/usr/share/confluent-hub-components

Confirm its contents are present:

docker-compose exec connect ls /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen

Your output should resemble:

assets   doc  etc  lib  manifest.json

Issue: Stream-Stream joins error

An error states Stream-Stream joins must have a WITHIN clause specified. This error can occur if you created both pageviews and users as streams by mistake.

../_images/c3-ksql-stream-stream-join-error.png

Resolution: Ensure that you created a stream for pageviews, and a table for users in Step 4: Create and Write to a Stream and Table using ksqlDB.

Issue: Unable to successfully complete ksqlDB query steps

Java errors or other severe errors were encountered.

Resolution: Ensure you are on an Operating System currently supported by Confluent Platform.

Resolution: Ensure that the Docker memory was increased to 8 MB. Go to Docker > Preferences > Advanced. If Docker memory is insufficient, other unpredictable issues could occur.

Next Steps

Learn more about the components shown in this quick start: