Important
You are viewing documentation for an older version of Confluent Platform. See the latest Confluent Platform documentation for current instructions.
Confluent Open Source Quick Start (Docker)
This quick start shows how to get up and running with Confluent Open Source and its main components using Docker containers. It covers the core capabilities: creating topics, adding and modifying data, and processing streams with KSQL. You will create Kafka topics and then run streaming KSQL queries on them.
This quick start leverages the Confluent Platform CLI, the Kafka CLI, and the KSQL CLI. For a rich UI-based experience, try out the Confluent Enterprise quick start.
- Prerequisites
  - Docker
    - Docker version 1.11 or later is installed and running.
    - Docker Compose is installed. It is installed by default with Docker for Mac and Windows.
    - At least 8 GB of memory is allocated to Docker.
  - Git
  - Internet connectivity
  - wget to get the connector configuration file
Step 1: Download and Start Confluent Platform Using Docker
Clone the Confluent Platform Docker Images GitHub repository and check out the 5.0.0-post branch.
    git clone https://github.com/confluentinc/cp-docker-images
    cd cp-docker-images
    git checkout 5.0.0-post
Navigate to the cp-all-in-one examples directory.
    cd examples/cp-all-in-one/
Start Confluent Platform, specifying two options: -d to run in detached mode and --build to build the Kafka Connect image with the source connector kafka-connect-datagen from Confluent Hub.
Important
You must allocate at least 8 GB of memory to Docker.
    docker-compose up -d --build
This starts Confluent Platform with separate containers for all Confluent Platform components. Your output should resemble the following:
    Creating network "cp-all-in-one_default" with the default driver
    Creating zookeeper ... done
    Creating broker ... done
    Creating schema-registry ... done
    Creating rest-proxy ... done
    Creating connect ... done
    Creating ksql-datagen ... done
    Creating ksql-server ... done
    Creating control-center ... done
    Creating ksql-cli ... done
Optional: Run this command to verify that the services are up and running.
docker-compose ps
You should see the following:
    Name              Command                          State   Ports
    ------------------------------------------------------------------------------------------
    broker            /etc/confluent/docker/run        Up      0.0.0.0:29092->29092/tcp, 0.0.0.0:9092->9092/tcp
    connect           /etc/confluent/docker/run        Up      0.0.0.0:8083->8083/tcp, 9092/tcp
    control-center    /etc/confluent/docker/run        Up      0.0.0.0:9021->9021/tcp
    ksql-cli          ksql http://localhost:8088       Up
    ksql-datagen      bash -c echo Waiting for K ...   Up
    ksql-server       /etc/confluent/docker/run        Up      0.0.0.0:8088->8088/tcp
    rest-proxy        /etc/confluent/docker/run        Up      0.0.0.0:8082->8082/tcp
    schema-registry   /etc/confluent/docker/run        Up      0.0.0.0:8081->8081/tcp
    zookeeper         /etc/confluent/docker/run        Up      0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp
If the state of any service is not Up, rerun the docker-compose up -d command.
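The check described above can also be scripted. The following is a minimal sketch, not part of Confluent Platform: the helper name services_not_up is hypothetical, and it assumes the docker-compose ps table separates columns with two or more spaces, as in the output shown earlier. The sample text is hand-written for illustration.

```python
import re


def services_not_up(ps_output):
    """Return the names of services whose State column does not start with 'Up'."""
    not_up = []
    for line in ps_output.splitlines():
        # Assumption: columns are separated by two or more spaces.
        fields = re.split(r"\s{2,}", line.strip())
        # Skip blank lines, the dashed separator, and the header row.
        if len(fields) < 3 or fields[0] == "Name":
            continue
        name, state = fields[0], fields[2]
        if not state.startswith("Up"):
            not_up.append(name)
    return not_up


# Hand-written sample in the same layout as the table above (not captured output).
SAMPLE = """\
Name              Command                      State    Ports
--------------------------------------------------------------
broker            /etc/confluent/docker/run    Up       0.0.0.0:9092->9092/tcp
ksql-cli          ksql http://localhost:8088   Up
connect           /etc/confluent/docker/run    Exit 1
"""

print(services_not_up(SAMPLE))  # ['connect']
```

To use it against a live environment, pass the text captured from docker-compose ps and rerun docker-compose up -d if the returned list is not empty.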
Step 2: Create Kafka Topics
In this step Kafka topics are created in Confluent Platform by using the Kafka CLI.
Run this command to create a topic named users.
    docker-compose exec broker kafka-topics --create --zookeeper \
    zookeeper:2181 --replication-factor 1 --partitions 1 --topic users
Your output should resemble:
Created topic "users".
Run this command to create a topic named pageviews.
    docker-compose exec broker kafka-topics --create --zookeeper \
    zookeeper:2181 --replication-factor 1 --partitions 1 --topic pageviews
Your output should resemble:
Created topic "pageviews".
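The two commands above differ only in the topic name, so they can be generated from a list. This is a small sketch under that assumption; create_topic_command is a hypothetical helper, and it only builds the same argument list used in this step without running anything.

```python
def create_topic_command(topic, partitions=1, replication_factor=1):
    """Build the kafka-topics invocation from this step as an argument list."""
    return [
        "docker-compose", "exec", "broker",
        "kafka-topics", "--create",
        "--zookeeper", "zookeeper:2181",
        "--replication-factor", str(replication_factor),
        "--partitions", str(partitions),
        "--topic", topic,
    ]


# One command per topic used in this quick start.
commands = [create_topic_command(name) for name in ("users", "pageviews")]

for command in commands:
    print(" ".join(command))
```

Each list can be passed to subprocess.run(command, check=True) from the cp-all-in-one directory to run it.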
Step 3: Install a Kafka Connector and Generate Sample Data
In this step, you use Kafka Connect to run a demo source connector called kafka-connect-datagen that creates sample data for the Kafka topics pageviews and users.
Run one instance of the kafka-connect-datagen connector to produce Kafka data to the pageviews topic in JSON format.
    wget https://github.com/confluentinc/kafka-connect-datagen/raw/master/config/connector_pageviews_cos.config
    curl -X POST -H "Content-Type: application/json" --data @connector_pageviews_cos.config http://localhost:8083/connectors
Run another instance of the kafka-connect-datagen connector to produce Kafka data to the users topic in JSON format.
    wget https://github.com/confluentinc/kafka-connect-datagen/raw/master/config/connector_users_cos.config
    <path-to-confluent>/bin/confluent config datagen-users -d ./connector_users_cos.config
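After submitting the connectors, you may want to confirm that they are running. The Kafka Connect REST API exposes a status resource at /connectors/<name>/status. The sketch below builds that URL and interprets a status document. The helper names are hypothetical, the connector name datagen-users is taken from the command above, and the sample response is hand-written for illustration rather than captured from a live worker.

```python
import json

# Same Connect REST endpoint used by the curl command in this step.
CONNECT_URL = "http://localhost:8083"


def status_url(connector_name):
    """URL of the Connect REST status resource for one connector."""
    return f"{CONNECT_URL}/connectors/{connector_name}/status"


def is_running(status_json):
    """True when the connector and all of its tasks report state RUNNING."""
    status = json.loads(status_json)
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)


# Illustrative response body (assumed values, not real output).
SAMPLE_STATUS = """
{"name": "datagen-users",
 "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
 "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "connect:8083"}]}
"""

print(status_url("datagen-users"))
print(is_running(SAMPLE_STATUS))  # True
```

Against a running environment, the body can be fetched with urllib.request.urlopen(status_url("datagen-users")).read() and passed to is_running.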
Step 4: Create and Write to a Stream and Table using KSQL
In this step, you run KSQL queries on the pageviews and users topics, which were created in Step 2 and populated with sample data in Step 3. The following KSQL commands are run from the KSQL CLI. Enter these commands in your terminal and press Enter.
Important
- Confluent Platform must be installed and running.
- To try out the preview KSQL web interface, see the Confluent Enterprise quick start.
- All KSQL commands must end with a closing semicolon (;).
Create Streams and Tables
Start the KSQL CLI in your terminal with this command.
docker-compose exec ksql-cli ksql http://ksql-server:8088
Important
By default KSQL attempts to store its logs in a directory called logs that is relative to the location of the ksql executable. For example, if ksql is installed at /usr/local/bin/ksql, then it would attempt to store its logs in /usr/local/logs. If you are running ksql from the default Confluent Platform location, <path-to-confluent>/bin, you must override this default behavior by using the LOG_DIR variable.
Create a stream (pageviews) from the Kafka topic pageviews, specifying the value_format of JSON.
    ksql> CREATE STREAM pageviews (viewtime BIGINT, userid VARCHAR, pageid VARCHAR) \
    WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='JSON');
Tip: Enter the SHOW STREAMS; command to view your streams. For example:
     Stream Name | Kafka Topic | Format
    -------------------------------------------------
     PAGEVIEWS   | pageviews   | JSON
    -------------------------------------------------
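To make the column declarations in the CREATE STREAM statement above more concrete, here is a rough model of how a JSON message value lines up with the declared columns. It is a sketch only and not KSQL's actual deserializer: the function to_row and the sample field values are hypothetical, BIGINT is modeled as a Python int and VARCHAR as str, and a declared column that is missing from the message is assumed to come back as null.

```python
import json

# Columns declared for the pageviews stream, with rough Python equivalents.
PAGEVIEWS_COLUMNS = {"viewtime": int, "userid": str, "pageid": str}


def to_row(message_value, columns=PAGEVIEWS_COLUMNS):
    """Project a JSON message value onto the declared columns."""
    record = json.loads(message_value)
    row = {}
    for name, py_type in columns.items():
        value = record.get(name)
        # Fields absent from the message are modeled as null (None).
        row[name] = py_type(value) if value is not None else None
    return row


# Illustrative message value; undeclared fields such as "extra" are ignored.
message = '{"viewtime": 1519, "userid": "User_3", "pageid": "Page_45", "extra": true}'
print(to_row(message))
```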
Create a table (users) with several columns from the Kafka topic users, with the value_format of JSON.
    CREATE TABLE users (registertime BIGINT, gender VARCHAR, regionid VARCHAR, \
    userid VARCHAR, interests array<VARCHAR>, contact_info map<VARCHAR, VARCHAR>) \
    WITH (KAFKA_TOPIC='users', VALUE_FORMAT='JSON', KEY = 'userid');
Tip: Enter the SHOW TABLES; command to view your tables. For example:
     Table Name | Kafka Topic | Format | Windowed
    --------------------------------------------------------------
     USERS      | users       | JSON   | false
    --------------------------------------------------------------
Write Queries
These examples write queries using KSQL. The following KSQL commands are run from the KSQL CLI. Enter these commands in your terminal and press Enter.
Set the auto.offset.reset query property to earliest. This instructs KSQL queries to read all available topic data from the beginning. This configuration is used for each subsequent query. For more information, see the KSQL Configuration Parameter Reference.
    ksql> SET 'auto.offset.reset'='earliest';
Your output should resemble:
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
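The effect of this setting can be pictured with a simplified model of how a Kafka consumer picks its starting position. The function below is a hypothetical illustration, not Kafka client code. It assumes the topic's first retained offset is 0, and it reflects the fact that auto.offset.reset only applies when the consumer group has no committed offset.

```python
def starting_offset(log_end_offset, auto_offset_reset, committed=None):
    """Offset a consumer starts reading from, in a simplified model."""
    if committed is not None:
        # A committed offset always wins; the reset policy is not consulted.
        return committed
    if auto_offset_reset == "earliest":
        return 0  # assumption: nothing has been removed by retention
    if auto_offset_reset == "latest":
        return log_end_offset  # only messages produced from now on are read
    raise ValueError(f"unsupported policy: {auto_offset_reset}")


# A topic that already holds 5 messages (offsets 0 through 4).
print(starting_offset(5, "earliest"))  # 0
print(starting_offset(5, "latest"))    # 5
```

With earliest, a new query sees the five existing messages. With latest, it sees nothing until new data arrives.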
Create a query that returns data from a stream with the results limited to three rows.
ksql> SELECT pageid FROM pageviews LIMIT 3;
Your output should resemble:
    Page_45
    Page_38
    Page_11
    LIMIT reached for the partition.
    Query terminated
Create a persistent query that filters for female users. The results from this query are written to the Kafka PAGEVIEWS_FEMALE topic. This query enriches the pageviews STREAM by doing a LEFT JOIN with the users TABLE on the user ID, where a condition (gender = 'FEMALE') is met.
    ksql> CREATE STREAM pageviews_female AS SELECT users.userid AS userid, pageid, \
    regionid, gender FROM pageviews LEFT JOIN users ON pageviews.userid = users.userid \
    WHERE gender = 'FEMALE';
Your output should resemble:
     Message
    ----------------------------
     Stream created and running
    ----------------------------
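The join-then-filter logic of the pageviews_female query can be sketched in plain Python. This is a simplified model with hypothetical sample data: it treats the users table as a dictionary keyed by userid and ignores event time, whereas a real stream-table join looks up the table's state as each pageview arrives.

```python
# Hypothetical table contents, keyed by userid.
users = {
    "User_1": {"regionid": "Region_8", "gender": "FEMALE"},
    "User_2": {"regionid": "Region_3", "gender": "MALE"},
}

# Hypothetical stream events; User_9 has no row in the users table.
pageviews = [
    {"userid": "User_1", "pageid": "Page_45"},
    {"userid": "User_2", "pageid": "Page_38"},
    {"userid": "User_9", "pageid": "Page_11"},
]


def pageviews_female(pageviews, users):
    """Left-join each pageview to its user, then keep only FEMALE rows."""
    results = []
    for view in pageviews:
        user = users.get(view["userid"])  # LEFT JOIN: None when there is no match
        gender = user["gender"] if user else None
        if gender != "FEMALE":            # WHERE gender = 'FEMALE'
            continue
        results.append({
            "userid": view["userid"],
            "pageid": view["pageid"],
            "regionid": user["regionid"],
            "gender": gender,
        })
    return results


print(pageviews_female(pageviews, users))
```

In this model, rows without a matching user carry a null gender after the LEFT JOIN, so the WHERE clause drops them along with the non-female rows.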
Create a persistent query that keeps only the rows whose regionid matches a LIKE pattern. Results from this query are written to a Kafka topic named pageviews_enriched_r8_r9.
    ksql> CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9', \
    value_format='JSON') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';
Your output should resemble:
     Message
    ----------------------------
     Stream created and running
    ----------------------------
Create a persistent query that counts the pageviews for each region and gender combination in a tumbling window of 30 seconds when the count is greater than 1. Because the query groups and counts, the result is a table rather than a stream. Results from this query are written to a Kafka topic called PAGEVIEWS_REGIONS.
    ksql> CREATE TABLE pageviews_regions AS SELECT gender, regionid , \
    COUNT(*) AS numusers FROM pageviews_female WINDOW TUMBLING (size 30 second) \
    GROUP BY gender, regionid HAVING COUNT(*) > 1;
Your output should resemble:
     Message
    ---------------------------
     Table created and running
    ---------------------------
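A tumbling window assigns each event to exactly one fixed-size, non-overlapping time bucket. The sketch below reproduces the grouping, counting, and HAVING filter of the pageviews_regions query as a batch computation over hypothetical events. It is an illustration only: the function name and sample timestamps are made up, and the real query runs continuously and updates its counts as events arrive rather than producing one final result.

```python
from collections import Counter

WINDOW_MS = 30_000  # WINDOW TUMBLING (size 30 second)


def count_by_window(events, window_ms=WINDOW_MS):
    """Count events per (window start, gender, regionid); keep counts above 1."""
    counts = Counter()
    for timestamp_ms, gender, regionid in events:
        # Each timestamp falls into exactly one window, aligned to the epoch.
        window_start = timestamp_ms - (timestamp_ms % window_ms)
        counts[(window_start, gender, regionid)] += 1
    # HAVING COUNT(*) > 1
    return {key: n for key, n in counts.items() if n > 1}


# Hypothetical events: (timestamp in ms, gender, regionid).
events = [
    (1_000, "FEMALE", "Region_8"),
    (29_000, "FEMALE", "Region_8"),
    (31_000, "FEMALE", "Region_8"),  # lands in the next 30-second window
    (5_000, "FEMALE", "Region_9"),
]

print(count_by_window(events))  # {(0, 'FEMALE', 'Region_8'): 2}
```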
Step 5: Monitor Streaming Data
Now that your streams are running you can monitor them.
View the details for your stream or table with the DESCRIBE EXTENDED command. For example, run this command to view the pageviews_female_like_89 stream:
    DESCRIBE EXTENDED pageviews_female_like_89;
Your output should look like this:
    Type                 : STREAM
    Key field            : PAGEVIEWS.USERID
    Timestamp field      : Not set - using <ROWTIME>
    Key format           : STRING
    Value format         : JSON
    Kafka output topic   : pageviews_enriched_r8_r9 (partitions: 4, replication: 1)

     Field    | Type
    --------------------------------------
     ROWTIME  | BIGINT           (system)
     ROWKEY   | VARCHAR(STRING)  (system)
     USERID   | VARCHAR(STRING)  (key)
     PAGEID   | VARCHAR(STRING)
     REGIONID | VARCHAR(STRING)
     GENDER   | VARCHAR(STRING)
    --------------------------------------

    Queries that write into this STREAM
    -----------------------------------
    id:CSAS_PAGEVIEWS_FEMALE_LIKE_89 - CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9', value_format='JSON') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';

    For query topology and execution plan please run: EXPLAIN <QueryId>

    Local runtime statistics
    ------------------------
    messages-per-sec: 2.01   total-messages: 10515   last-message: 3/14/18 2:25:40 PM PDT
    failed-messages: 0   failed-messages-per-sec: 0   last-failed: n/a
    (Statistics of the local KSQL server interaction with the Kafka topic pageviews_enriched_r8_r9)
Discover the query execution plan with the EXPLAIN command. For example, run this command to view the query execution plan for CTAS_PAGEVIEWS_REGIONS:
    EXPLAIN CTAS_PAGEVIEWS_REGIONS;
Your output should look like this:
    Type                 : QUERY
    SQL                  : CREATE TABLE pageviews_regions AS SELECT gender, regionid , COUNT(*) AS numusers FROM pageviews_female WINDOW TUMBLING (size 30 second) GROUP BY gender, regionid HAVING COUNT(*) > 1;

    Local runtime statistics
    ------------------------
    messages-per-sec: 1.42   total-messages: 13871   last-message: 3/14/18 2:50:02 PM PDT
    failed-messages: 0   failed-messages-per-sec: 0   last-failed: n/a
    (Statistics of the local KSQL server interaction with the Kafka topic PAGEVIEWS_REGIONS)

    Execution plan
    --------------
     > [ PROJECT ] Schema: [GENDER : STRING , REGIONID : STRING , NUMUSERS : INT64].
        > [ FILTER ] Schema: [PAGEVIEWS_FEMALE.GENDER : STRING , PAGEVIEWS_FEMALE.REGIONID : STRING , PAGEVIEWS_FEMALE.ROWTIME : INT64 , KSQL_AGG_VARIABLE_0 : INT64 , KSQL_AGG_VARIABLE_1 : INT64].
           > [ AGGREGATE ] Schema: [PAGEVIEWS_FEMALE.GENDER : STRING , PAGEVIEWS_FEMALE.REGIONID : STRING , PAGEVIEWS_FEMALE.ROWTIME : INT64 , KSQL_AGG_VARIABLE_0 : INT64 , KSQL_AGG_VARIABLE_1 : INT64].
              > [ PROJECT ] Schema: [PAGEVIEWS_FEMALE.GENDER : STRING , PAGEVIEWS_FEMALE.REGIONID : STRING , PAGEVIEWS_FEMALE.ROWTIME : INT64].
                 > [ SOURCE ] Schema: [PAGEVIEWS_FEMALE.ROWTIME : INT64 , PAGEVIEWS_FEMALE.ROWKEY : STRING , PAGEVIEWS_FEMALE.USERID : STRING , PAGEVIEWS_FEMALE.PAGEID : STRING , PAGEVIEWS_FEMALE.REGIONID : STRING , PAGEVIEWS_FEMALE.GENDER : STRING].

    Processing topology
    -------------------
    Topologies:
      Sub-topology: 0
        Source: KSTREAM-SOURCE-0000000000 (topics: [PAGEVIEWS_FEMALE])
          --> KSTREAM-MAPVALUES-0000000001
        Processor: KSTREAM-MAPVALUES-0000000001 (stores: [])
          --> KSTREAM-TRANSFORMVALUES-0000000002
          <-- KSTREAM-SOURCE-0000000000
    ...
      Sub-topology: 1
        Source: KSTREAM-SOURCE-0000000008 (topics: [KSQL_Agg_Query_1521052072079-repartition])
          --> KSTREAM-AGGREGATE-0000000005
        Processor: KSTREAM-AGGREGATE-0000000005 (stores: [KSQL_Agg_Query_1521052072079])
          --> KTABLE-FILTER-0000000009
          <-- KSTREAM-SOURCE-0000000008
    ...
For more information about KSQL syntax, see KSQL Syntax Reference.
Next Steps
Learn more about the components shown in this quick start:
- KSQL documentation: Learn about processing your data with KSQL for use cases such as streaming ETL, real-time monitoring, and anomaly detection. You can also learn how to use KSQL with this collection of scripted demos.
- Stream Processing Cookbook: Try out in-depth KSQL tutorials and recommended deployment scenarios.
- Kafka Streams documentation: Learn how to build stream processing applications in Java or Scala.
- Kafka Connect documentation: Learn how to integrate Kafka with other systems and download ready-to-use connectors to easily move data into and out of Kafka in real time.
- Kafka Clients documentation: Learn how to read and write data to and from Kafka using programming languages such as Go, Python, .NET, and C/C++.
- Tutorials and Demos: Try out the Confluent Platform tutorials and examples, watch demos and screencasts, and learn with white papers and blogs.