Important
You are viewing documentation for an older version of Confluent Platform.
Quick Start for Apache Kafka using Confluent Platform Community Components (Docker)
This quick start shows you how to get up and running with Confluent Platform using only Confluent Community components in Docker containers. It demonstrates both basic and more powerful capabilities, including creating topics, adding and modifying data, and stream processing with ksqlDB. You will create Apache Kafka® topics and run streaming queries on them by using ksqlDB.
This quick start leverages the Confluent Platform CLI, the Kafka CLI, and the ksqlDB CLI. For a rich UI-based experience, try out the Confluent Platform quick start with commercial components.
- Prerequisites:
  - Docker:
    - Docker version 1.11 or later is installed and running.
    - Docker Compose is installed. Docker Compose is installed by default with Docker for Mac.
    - At least 8 GB of memory is allocated to Docker. When using Docker Desktop for Mac, the default Docker memory allocation is 2 GB. You can change the allocation to 8 GB in Docker > Preferences > Advanced.
  - Git.
  - Internet connectivity.
  - Ensure you are on an Operating System currently supported by Confluent Platform.
  - Networking and Kafka on Docker: Configure your hosts and ports so that components inside and outside the Docker network can communicate. For more details, see this article.
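As a quick check of the memory prerequisite, the following sketch converts the byte count that `docker info` reports into GiB. The helper name `bytes_to_gib` is hypothetical, and the value Docker reports is typically a little lower than the configured allocation, so treat the result as approximate.

```shell
# Hypothetical helper: convert a byte count to GiB with one decimal place.
bytes_to_gib() {
  awk -v b="$1" 'BEGIN { printf "%.1f\n", b / (1024 * 1024 * 1024) }'
}

# Usage (requires a running Docker daemon):
#   bytes_to_gib "$(docker info --format '{{.MemTotal}}')"
```

If the printed value is well below 8, increase the Docker memory allocation before continuing.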
Step 1: Download and Start Confluent Platform Using Docker
Clone the confluentinc/cp-all-in-one GitHub repository and check out the 5.5.15-post branch.
git clone https://github.com/confluentinc/cp-all-in-one
cd cp-all-in-one
git checkout 5.5.15-post
Navigate to the cp-all-in-one-community directory.
cd cp-all-in-one-community/
Start Confluent Platform, specifying the -d option to run in detached mode.
Important
You must allocate a minimum of 8 GB of Docker memory resource. The default memory allocation on Docker Desktop for Mac is 2 GB and must be changed.
docker-compose up -d
This starts Confluent Platform with separate containers for all Confluent Platform components. Your output should resemble the following:
Creating network "cp-all-in-one-community_default" with the default driver
Creating zookeeper ... done
Creating broker ... done
Creating schema-registry ... done
Creating rest-proxy ... done
Creating connect ... done
Creating ksql-datagen ... done
Creating ksqldb-server ... done
Creating ksqldb-cli ... done
Optional: Run this command to verify that the services are up and running.
docker-compose ps
You should see the following:
Name              Command                          State   Ports
------------------------------------------------------------------------------------------
broker            /etc/confluent/docker/run        Up      0.0.0.0:29092->29092/tcp, 0.0.0.0:9092->9092/tcp
connect           /etc/confluent/docker/run        Up      0.0.0.0:8083->8083/tcp, 9092/tcp
ksqldb-cli        ksql http://localhost:8088       Up
ksql-datagen      bash -c echo Waiting for K ...   Up
ksqldb-server     /etc/confluent/docker/run        Up      0.0.0.0:8088->8088/tcp
rest-proxy        /etc/confluent/docker/run        Up      0.0.0.0:8082->8082/tcp
schema-registry   /etc/confluent/docker/run        Up      0.0.0.0:8081->8081/tcp
zookeeper         /etc/confluent/docker/run        Up      0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp
If the state is not Up, rerun the docker-compose up -d command.
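Rather than scanning the State column by eye, you can filter the `docker-compose ps` output. The sketch below assumes the two-line header and column layout shown in this step (newer Compose releases print a different layout); the helper name `services_not_up` is hypothetical.

```shell
# Hypothetical helper: read `docker-compose ps` output on stdin and print the
# name of every service whose row does not contain the state "Up".
# Assumes a two-line header (column names, then a row of dashes).
services_not_up() {
  awk 'NR > 2 && NF > 0 && $0 !~ /(^|[ \t])Up([ \t]|$)/ { print $1 }'
}

# Usage:
#   docker-compose ps | services_not_up
```

An empty result suggests that every listed service reports Up.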
Step 2: Create Kafka Topics
In this step Kafka topics are created in Confluent Platform by using the Kafka CLI.
Run this command to create a topic named users.
docker-compose exec broker kafka-topics --create --bootstrap-server \
localhost:9092 --replication-factor 1 --partitions 1 --topic users
Your output should resemble:
Created topic "users".
Run this command to create a topic named pageviews.
docker-compose exec broker kafka-topics --create --bootstrap-server \
localhost:9092 --replication-factor 1 --partitions 1 --topic pageviews
Your output should resemble:
Created topic "pageviews".
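The two commands above differ only in the topic name, so they can be folded into a loop. This is a minimal sketch, not part of the original procedure: the `COMPOSE` variable and the `create_topics` function are hypothetical names, and the `--if-not-exists` flag is added so that rerunning the step is less likely to fail on topics that already exist.

```shell
# COMPOSE is a hypothetical override so the compose command can be swapped out.
COMPOSE="${COMPOSE:-docker-compose}"

# Create each named topic with one partition and a replication factor of 1,
# skipping topics that already exist.
create_topics() {
  for topic in "$@"; do
    $COMPOSE exec broker kafka-topics --create --if-not-exists \
      --bootstrap-server localhost:9092 \
      --replication-factor 1 --partitions 1 --topic "$topic" || return 1
  done
}

# Usage:
#   create_topics users pageviews
#   docker-compose exec broker kafka-topics --list --bootstrap-server localhost:9092
```

The final `--list` call in the usage comment prints the topics known to the broker, which is a simple way to confirm that both topics exist.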
Step 3: Install a Kafka Connector and Generate Sample Data
In this step, you use Kafka Connect to run a demo source connector called kafka-connect-datagen that creates sample data for the Kafka topics pageviews and users.
Run one instance of the Kafka Connect Datagen connector to produce Kafka data to the pageviews topic in AVRO format.
curl -L -O -H 'Accept: application/vnd.github.v3.raw' https://api.github.com/repos/confluentinc/kafka-connect-datagen/contents/config/connector_pageviews_cos.config
curl -X POST -H 'Content-Type: application/json' --data @connector_pageviews_cos.config http://localhost:8083/connectors
Run another instance of the Kafka Connect Datagen connector to produce Kafka data to the users topic in AVRO format.
curl -L -O -H 'Accept: application/vnd.github.v3.raw' https://api.github.com/repos/confluentinc/kafka-connect-datagen/contents/config/connector_users_cos.config
curl -X POST -H 'Content-Type: application/json' --data @connector_users_cos.config http://localhost:8083/connectors
Tip
The Kafka Connect Datagen connector was installed automatically when you started Docker Compose in Step 1: Download and Start Confluent Platform Using Docker. If you encounter issues with the Datagen Connector, refer to the Issue: Cannot locate the Datagen Connector in the Troubleshooting section.
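The Connect worker can take a while to start, and a POST sent too early may fail. The sketch below polls the Connect REST API until it answers with HTTP 200. The `CURL` variable and the `wait_for_http_200` function are hypothetical names; the URL in the usage comment is the same `http://localhost:8083/connectors` endpoint used in this step.

```shell
# CURL is a hypothetical override so the HTTP client can be swapped out.
CURL="${CURL:-curl}"

# Poll a URL until it answers with HTTP 200 or the attempts run out.
# Arguments: URL, number of attempts (default 30), delay in seconds (default 2).
wait_for_http_200() {
  url=$1
  attempts=${2:-30}
  delay=${3:-2}
  while [ "$attempts" -gt 0 ]; do
    code=$($CURL -s -o /dev/null -w '%{http_code}' "$url")
    if [ "$code" = "200" ]; then
      return 0
    fi
    attempts=$((attempts - 1))
    sleep "$delay"
  done
  return 1
}

# Usage:
#   wait_for_http_200 http://localhost:8083/connectors && \
#     curl -s http://localhost:8083/connectors
```

After both connectors are submitted, a GET on the same endpoint returns a JSON array of the connector names that Connect has registered.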
Step 4: Create and Write to a Stream and Table using ksqlDB
In this step, SQL queries are run on the pageviews and users topics that were created in Step 2. The following ksqlDB commands are run from the ksqlDB CLI. Enter these commands in your terminal and press Enter.
Important
- Confluent Platform must be installed and running.
- To try out the ksqlDB web interface, see the Confluent Platform quick start with commercial components.
- All ksqlDB commands must end with a closing semicolon (;).
Create Streams and Tables
Start the ksqlDB CLI in your terminal with this command.
docker-compose exec ksqldb-cli ksql http://ksqldb-server:8088
Important
By default ksqlDB attempts to store its logs in a directory called logs that is relative to the location of the ksql executable. For example, if ksql is installed at /usr/local/bin/ksql, then it would attempt to store its logs in /usr/local/logs. If you are running ksql from the default Confluent Platform location, $CONFLUENT_HOME/bin, you must override this default behavior by using the LOG_DIR variable.
Create a stream pageviews from the Kafka topic pageviews, specifying the value_format of AVRO.
CREATE STREAM pageviews (viewtime BIGINT, userid VARCHAR, pageid VARCHAR) \
WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='AVRO');
Tip: Enter the SHOW STREAMS; command to view your streams. For example:
 Stream Name | Kafka Topic | Format
-------------------------------------------------
 PAGEVIEWS   | pageviews   | AVRO
-------------------------------------------------
Create a table users with several columns from the Kafka topic users, with the value_format of AVRO.
CREATE TABLE users (registertime BIGINT, gender VARCHAR, regionid VARCHAR, \
userid VARCHAR) \
WITH (KAFKA_TOPIC='users', VALUE_FORMAT='AVRO', KEY = 'userid');
Tip: Enter the SHOW TABLES; command to view your tables. For example:
 Table Name | Kafka Topic | Format | Windowed
--------------------------------------------------------------
 USERS      | users       | AVRO   | false
--------------------------------------------------------------
Write Queries
These examples write queries using SQL. The following ksqlDB commands are run from the ksqlDB CLI. Enter these commands in your terminal and press Enter.
Set the auto.offset.reset query property to earliest. This instructs ksqlDB queries to read all available topic data from the beginning. This configuration is used for each subsequent query. For more information, see the ksqlDB Configuration Parameter Reference.
SET 'auto.offset.reset'='earliest';
Your output should resemble:
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
Create a non-persistent query that returns data from a stream with the results limited to a maximum of three rows.
SELECT pageid FROM pageviews EMIT CHANGES LIMIT 3;
Your output should resemble:
Page_45
Page_38
Page_11
LIMIT reached for the partition.
Query terminated
Create a persistent query that filters for female users. The results from this query are written to the Kafka PAGEVIEWS_FEMALE topic. This query enriches the pageviews STREAM by doing a LEFT JOIN with the users TABLE on the user ID, where a condition (gender = 'FEMALE') is met.
CREATE STREAM pageviews_female AS SELECT users.userid AS userid, pageid, \
regionid, gender FROM pageviews LEFT JOIN users ON pageviews.userid = users.userid \
WHERE gender = 'FEMALE';
Your output should resemble:
 Message
------------------------------------------------------------------------------------------------------
 Stream PAGEVIEWS_FEMALE created and running. Created by query with query ID: CSAS_PAGEVIEWS_FEMALE_3
------------------------------------------------------------------------------------------------------
Create a persistent query that keeps only the rows whose regionid matches a LIKE pattern, in this case region IDs ending in _8 or _9. Results from this query are written to a Kafka topic named pageviews_enriched_r8_r9.
CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9', \
value_format='AVRO') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';
Your output should resemble:
 Message
----------------------------------------------------------------------------------------------------------------------
 Stream PAGEVIEWS_FEMALE_LIKE_89 created and running. Created by query with query ID: CSAS_PAGEVIEWS_FEMALE_LIKE_89_4
----------------------------------------------------------------------------------------------------------------------
Create a persistent query that counts the pageviews for each region and gender combination in a tumbling window of 30 seconds when the count is greater than 1. Because the procedure is grouping and counting, the result is now a table, rather than a stream. Results from this query are written to a Kafka topic called PAGEVIEWS_REGIONS.
CREATE TABLE pageviews_regions AS SELECT gender, regionid, \
COUNT(*) AS numusers FROM pageviews_female WINDOW TUMBLING (size 30 second) \
GROUP BY gender, regionid HAVING COUNT(*) > 1;
Your output should resemble:
 Message
-------------------------------------------------------------------------------------------------------
 Table PAGEVIEWS_REGIONS created and running. Created by query with query ID: CTAS_PAGEVIEWS_REGIONS_5
-------------------------------------------------------------------------------------------------------
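To make the 30-second tumbling window concrete: tumbling windows are fixed-size, do not overlap, and are aligned to the epoch, so each record falls into exactly one window determined by its timestamp. The sketch below computes the window start for a timestamp in milliseconds. The helper name `window_start` and the sample timestamps are illustrative assumptions, not output from this quick start.

```shell
# Hypothetical helper: start of the tumbling window that contains a timestamp.
# Arguments: event timestamp in ms, window size in ms.
window_start() {
  echo $(( $1 - ($1 % $2) ))
}

# Two events 10 seconds apart can still land in different 30-second windows:
window_start 1587000025000 30000   # prints 1587000000000
window_start 1587000035000 30000   # prints 1587000030000
```

Records with the same gender and regionid are counted together only when they share a window start, which is why the same region can appear in several rows of the resulting table.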
Step 5: Monitor Streaming Data
Now that your streams are running you can monitor them.
View the details for your stream or table with the DESCRIBE EXTENDED command. For example, run this command to view the pageviews_female_like_89 stream:
DESCRIBE EXTENDED pageviews_female_like_89;
Your output should look like this:
Name                 : PAGEVIEWS_FEMALE_LIKE_89
Type                 : STREAM
Key field            :
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : AVRO
Kafka topic          : pageviews_enriched_r8_r9 (partitions: 1, replication: 1)

 Field    | Type
--------------------------------------
 ROWTIME  | BIGINT           (system)
 ROWKEY   | VARCHAR(STRING)  (system)
 USERID   | VARCHAR(STRING)
 PAGEID   | VARCHAR(STRING)
 REGIONID | VARCHAR(STRING)
 GENDER   | VARCHAR(STRING)
--------------------------------------
...
List the running queries with the SHOW QUERIES command.
SHOW QUERIES;
You should see a query whose ID starts with CTAS_PAGEVIEWS_REGIONS.
Discover the query execution plan with the EXPLAIN command. For example, run this command to view the query execution plan for CTAS_PAGEVIEWS_REGIONS_5. If the numeric suffix in your SHOW QUERIES output differs, use that query ID instead.
EXPLAIN CTAS_PAGEVIEWS_REGIONS_5;
Your output should resemble:
ID                   : CTAS_PAGEVIEWS_REGIONS_5
SQL                  : CREATE TABLE PAGEVIEWS_REGIONS WITH (KAFKA_TOPIC='PAGEVIEWS_REGIONS', PARTITIONS=1, REPLICAS=1) AS SELECT
  PAGEVIEWS_FEMALE.GENDER "GENDER",
  PAGEVIEWS_FEMALE.REGIONID "REGIONID",
  COUNT(*) "NUMUSERS"
FROM PAGEVIEWS_FEMALE PAGEVIEWS_FEMALE
WINDOW TUMBLING ( SIZE 30 SECONDS )
GROUP BY PAGEVIEWS_FEMALE.GENDER, PAGEVIEWS_FEMALE.REGIONID
HAVING (COUNT(*) > 1)
EMIT CHANGES;
Status               : RUNNING

 Field    | Type
--------------------------------------
 ROWTIME  | BIGINT           (system)
 ROWKEY   | VARCHAR(STRING)  (system)
 GENDER   | VARCHAR(STRING)
 REGIONID | VARCHAR(STRING)
 NUMUSERS | BIGINT
--------------------------------------
...
For more information about SQL syntax, see ksqlDB Syntax Reference.
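The same monitoring statements can also be sent from a script through the ksqlDB server's REST interface instead of the interactive CLI. The following is a minimal sketch: the helper names `ksql_payload` and `ksql_rest` and the `KSQL_URL` variable are hypothetical, and the default URL assumes the 8088 port mapping shown in Step 1.

```shell
# CURL is a hypothetical override so the HTTP client can be swapped out.
CURL="${CURL:-curl}"

# Hypothetical helper: wrap a single statement in the JSON body that the
# /ksql endpoint accepts. The statement must not contain double quotes.
ksql_payload() {
  printf '{"ksql": "%s", "streamsProperties": {}}' "$1"
}

# Send one statement to the ksqlDB server and print the JSON response.
ksql_rest() {
  $CURL -s -X POST "${KSQL_URL:-http://localhost:8088}/ksql" \
    -H 'Content-Type: application/vnd.ksql.v1+json; charset=utf-8' \
    -d "$(ksql_payload "$1")"
}

# Usage:
#   ksql_rest 'SHOW QUERIES;'
```

The response is JSON rather than the tabular CLI output, which makes it easier to inspect with other tools.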
Step 6: Stop Docker
When you are done working with Docker, you can stop and remove Docker containers and images.
View a list of all Docker container IDs.
docker container ls -aq
Run the following command to stop the Docker containers for Confluent:
docker container stop $(docker container ls -a -q -f "label=io.confluent.docker")
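Run the following commands to stop the containers and prune the Docker system. Running these commands deletes containers, networks, volumes, and images, freeing up disk space. Note that docker system prune applies to all unused Docker resources on your system, not only those created by Confluent Platform: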
docker container stop $(docker container ls -a -q -f "label=io.confluent.docker") && docker system prune -a -f --volumes
Tip
Remove the filter label for Confluent Docker (-f "label=io.confluent.docker") to clear all Docker containers from your system.
You can rebuild and restart the containers at any time using the docker-compose up -d command.
For more information, refer to the official Docker documentation.
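The stop command above passes the result of a command substitution straight to `docker container stop`, which reports an error when no matching containers exist. The sketch below adds a guard for that case. The `DOCKER` variable and the `stop_confluent_containers` function are hypothetical names.

```shell
# DOCKER is a hypothetical override so the docker command can be swapped out.
DOCKER="${DOCKER:-docker}"

# Stop Confluent containers only when at least one exists.
stop_confluent_containers() {
  ids=$($DOCKER container ls -a -q -f "label=io.confluent.docker")
  if [ -z "$ids" ]; then
    echo "No Confluent containers found."
    return 0
  fi
  # Word splitting on $ids is intentional: one argument per container ID.
  $DOCKER container stop $ids
}

# Usage:
#   stop_confluent_containers
```

Alternatively, running docker-compose down from the cp-all-in-one-community directory stops and removes the containers and the network that docker-compose up created.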
Troubleshooting
If you encountered any issues, review the following resolutions before trying the steps again.
Issue: Cannot locate the Datagen Connector
Resolution: Run the build command just for connect.
docker-compose build --no-cache connect
Your output should resemble:
Building connect
...
Completed
Removing intermediate container cdb0af3550c8
---> 36d00047d29b
Successfully built 36d00047d29b
Successfully tagged confluentinc/kafka-connect-datagen:latest
Resolution: Check the Connect log for Datagen.
docker-compose logs connect | grep -i Datagen
Your output should resemble:
connect | [2019-04-17 20:03:26,137] INFO Loading plugin from: /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect | [2019-04-17 20:03:26,206] INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect | [2019-04-17 20:03:26,206] INFO Added plugin 'io.confluent.kafka.connect.datagen.DatagenConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
connect | [2019-04-17 20:03:28,102] INFO Added aliases 'DatagenConnector' and 'Datagen' to plugin 'io.confluent.kafka.connect.datagen.DatagenConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
Resolution: Check the Connect log for a warning and reminder to run the docker-compose up -d command properly.
docker-compose logs connect | grep -i Datagen
Resolution: Verify the .jar file for kafka-connect-datagen has been added and is present in the lib subfolder.
docker-compose exec connect ls /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen/lib/
Your output should resemble:
...
kafka-connect-datagen-0.1.0.jar
...
Resolution: Verify the plugin exists in the connector path.
docker-compose exec connect bash -c 'echo $CONNECT_PLUGIN_PATH'
Your output should resemble:
/usr/share/java,/usr/share/confluent-hub-components
Confirm its contents are present:
docker-compose exec connect ls /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen
Your output should resemble:
assets doc etc lib manifest.json
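As a complement to inspecting the file system, you can ask the running Connect worker which plugins it has loaded through its REST API. This is a sketch under assumptions: the `CURL` and `CONNECT_URL` variables and the `datagen_plugin_installed` function are hypothetical names, and the default URL assumes the 8083 port mapping shown in Step 1. The class name is the one that appears in the Connect log output above.

```shell
# CURL is a hypothetical override so the HTTP client can be swapped out.
CURL="${CURL:-curl}"

# Succeeds when the Connect worker lists the Datagen connector class.
datagen_plugin_installed() {
  $CURL -s "${CONNECT_URL:-http://localhost:8083}/connector-plugins" |
    grep -qF 'io.confluent.kafka.connect.datagen.DatagenConnector'
}

# Usage:
#   datagen_plugin_installed && echo "Datagen plugin is loaded"
```

If the check fails while the files are present on disk, restarting the connect container may be needed so that the worker rescans its plugin path.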
Issue: Stream-Stream joins error
An error states Stream-Stream joins must have a WITHIN clause specified. This error can occur if you created streams for both pageviews and users by mistake.
Resolution: Ensure that you created a stream for pageviews, and a table for users in Step 4: Create and Write to a Stream and Table using ksqlDB.
Issue: Unable to successfully complete ksqlDB query steps
Java errors or other severe errors were encountered.
Resolution: Ensure you are on an Operating System currently supported by Confluent Platform.
Resolution: Ensure that the Docker memory was increased to 8 GB. Go to Docker > Preferences > Advanced. If Docker memory is insufficient, other unpredictable issues could occur.
ksqlDB errors were encountered.
Resolution: Review the help in the ksqlDB CLI for successful command tips and links to more documentation.
ksql> help
Next Steps
Learn more about the components shown in this quick start:
- ksqlDB documentation: Learn about processing your data with ksqlDB for use cases such as streaming ETL, real-time monitoring, and anomaly detection. You can also learn how to use ksqlDB with this collection of scripted demos.
- Kafka Tutorials: Try out basic Kafka, Kafka Streams, and ksqlDB tutorials with step-by-step instructions.
- Kafka Streams documentation: Learn how to build stream processing applications in Java or Scala.
- Kafka Connect documentation: Learn how to integrate Kafka with other systems and download ready-to-use connectors to easily move data in and out of Kafka in real time.
- Kafka Clients documentation: Learn how to read and write data to and from Kafka using programming languages such as Go, Python, .NET, and C/C++.
- Videos, Demos, and Reading Material: Try out the Confluent Platform tutorials and examples, watch demos and screencasts, and learn with white papers and blogs.