Kafka Basics on Confluent Platform

Apache Kafka® is an open-source, distributed, event streaming platform capable of handling large volumes of real-time data. You use Kafka to build real-time streaming applications. Confluent is a commercial, global corporation that specializes in providing businesses with real-time access to data. Confluent was founded by the creators of Kafka, and its product line includes proprietary products based on open-source Kafka. This topic describes Kafka use cases, the relationship between Confluent and Kafka, and key differences between the Confluent products.

How Kafka relates to Confluent

Confluent products are built on the open-source software framework of Kafka to provide customers with reliable ways to stream data in real time. Confluent provides the features and know-how that enhance your ability to reliably stream data. If you’re already using Kafka, that means Confluent products support any producer or consumer code you’ve already written with the Kafka Java libraries. Whether you’re already using Kafka or just getting started with streaming data, Confluent provides features not found in Kafka. This includes non-Java libraries for client development and server processes that help you stream data more efficiently in a production environment, like Confluent Schema Registry, ksqlDB, and Confluent Hub. Confluent offers Confluent Cloud, a data-streaming service, and Confluent Platform, software you download and manage yourself.

Kafka use cases

Consider an application that uses Kafka topics as a backend to store and retrieve posts, likes, and comments from a popular social media site. The application incorporates producers and consumers that subscribe to those Kafka topics. When a user of the application publishes a post, likes something, or comments, the Kafka producer code in the application sends that data to the associated topic. When the user navigates to a particular page in the application, a Kafka consumer reads from the associated backend topic and the application renders data on the user’s device. For more information, see Use cases in the Apache Kafka Docs hosted by Confluent.

Confluent Platform

Confluent Platform is software you download and manage yourself. Any Kafka use cases are also Confluent Platform use cases. Confluent Platform is a specialized distribution of Kafka that includes additional features and APIs. Many of the commercial Confluent Platform features are built into the brokers as a function of Confluent Server.

The fundamental capabilities, concepts, design ethos, and ways of working that you already know from using Kafka also apply to Confluent Platform. Confluent Platform ships with all of the basic Kafka command utilities and APIs used in development, along with several additional CLIs to support Confluent-specific features. To learn more about Confluent Platform, see What is Confluent Platform?.

Confluent Platform releases include the latest stable version of Apache Kafka, so when you install Confluent Platform you are also installing Kafka. To view a mapping of Confluent Platform release to Kafka versions, see Supported Versions and Interoperability for Confluent Platform.


Confluent Cloud

Confluent Cloud provides Kafka as a cloud service, so you no longer need to install, upgrade, or patch Kafka server components. You also get access to a cloud-native design, which offers Infinite Storage, elastic scaling, and an uptime guarantee. If you’re coming to Confluent Cloud from open source Kafka, you can use data-streaming features only available from Confluent, including non-Java client libraries and proxies for Kafka producers and consumers, tools for monitoring and observability, an intuitive browser-based user interface, and enterprise-grade security and data governance features.

Confluent Cloud includes different types of server processes for streaming data in a production environment. In addition to brokers and topics, Confluent Cloud provides implementations of Kafka Connect, Schema Registry, and ksqlDB.



How to Run Confluent Platform

You have several options for running Confluent Platform (and Kafka), depending on your use cases and goals.

Quick Start

For developers who want to get familiar with the platform, you can start with the Quick Start for Confluent Platform. This quick start shows you how to run Confluent Platform using Docker on a single broker, single cluster development environment with topic replication factors set to 1.

Tip

If you want both an introduction to using Confluent Platform and an understanding of how to configure your clusters, a suggested learning progression is:

  1. Follow the steps for a local install as shown in the Quick Start for Confluent Platform and run a default single-broker cluster. Experiment with the features as shown in the workflow for that tutorial.
  2. Return to this page and walk through the steps to configure and run a multi-broker cluster.

Multi-node production-ready deployments

Operators and developers who want to set up production-ready deployments can follow the workflows for Install Confluent Platform On-Premises or Ansible Playbooks.

Single machine, multi-broker and multi-cluster configurations

To bridge the gap between the developer environment quick starts and full-scale, multi-node deployments, you can start by running multi-broker clusters and multi-cluster setups on a single machine, like your laptop.

Trying out these different setups is a great way to learn your way around the configuration files for Kafka broker and Control Center, and experiment locally with more sophisticated deployments. These setups more closely resemble real-world configurations and support data sharing and other scenarios for Confluent Platform specific features like Replicator, Self-Balancing, Cluster Linking, and multi-cluster Schema Registry.

  • For a single cluster with multiple brokers, you must configure and start a single KRaft controller or ZooKeeper, and as many brokers as you want to run in the cluster. A detailed example of how to run this is provided in the Run a multi-broker cluster section that follows.
  • For a multi-cluster deployment, you should have a dedicated controller for each cluster, and a Kafka server properties file for each broker. To learn more about multi-cluster setups, see Run multiple clusters.

Does all this run on my laptop?

Yes, these examples show you how to run all clusters and brokers on a single laptop or machine.

That said, you can apply what you learn in this topic to create similar deployments on your favorite cloud provider, using multiple virtual hosts. Use these examples as stepping stones to more complex deployments and feature integrations.

KRaft and ZooKeeper

Important

As of Confluent Platform 7.5, ZooKeeper is deprecated for new deployments. Confluent recommends KRaft mode for new deployments. To learn more about running Kafka in KRaft mode, see KRaft Overview for Confluent Platform, the KRaft steps in the Platform Quick Start, and Settings for other components.

The following tutorial on how to run a multi-broker cluster provides examples for both KRaft mode and ZooKeeper mode.

For KRaft, the examples show an isolated mode configuration for a multi-broker cluster managed by a single controller. This maps to the deprecated ZooKeeper configuration, which uses one ZooKeeper and multiple brokers in a single cluster. To learn more about KRaft, see KRaft Overview for Confluent Platform and KRaft mode under Configure Confluent Platform for production.

In addition to some other differences noted in the steps below, note that:

  • For KRaft mode, you will use $CONFLUENT_HOME/etc/kafka/kraft/broker.properties and $CONFLUENT_HOME/etc/kafka/kraft/controller.properties.
  • For ZooKeeper mode, you will use $CONFLUENT_HOME/etc/kafka/server.properties and $CONFLUENT_HOME/etc/kafka/zookeeper.properties.

Run a multi-broker cluster

To run a single cluster with multiple brokers (3 brokers, for this example) you need:

  • 1 controller properties file (KRaft mode) or 1 ZooKeeper properties file (ZooKeeper mode)
  • 3 Kafka broker properties files with unique broker IDs, listener ports (to surface details for all brokers on Control Center), and log file directories.
  • Control Center properties file with the REST endpoints for controlcenter.cluster mapped to your brokers.
  • Metrics Reporter JAR file installed and enabled on the brokers. (If you start Confluent Platform as described below, from $CONFLUENT_HOME/bin/, the Metrics Reporter is automatically installed on the broker. Otherwise, you would need to add the path to the Metrics Reporter JAR file to your CLASSPATH.)
  • Properties files for any other Confluent Platform components you want to run, with default settings to start with.

All of this is described in detail below.

Configure replication factors

The broker.properties (KRaft) and server.properties (ZooKeeper) files that ship with Confluent Platform have replication factors set to 1 on several system topics to support development test environments and Quick Start for Confluent Platform scenarios. For real-world scenarios, however, a replication factor greater than 1 is preferable to support fail-over and auto-balancing capabilities on both system and user-created topics.

For the purposes of this example, set the replication factors to 2, which is one less than the number of brokers (3). When you create your topics, make sure that they also have the needed replication factor, depending on the number of brokers.

Run these commands to update replication configurations in KRaft mode.

sed -i '' -e "s/replication.factor=1/replication.factor=2/g" $CONFLUENT_HOME/etc/kafka/kraft/broker.properties
sed -i '' -e "s/#confluent.metrics.reporter.topic.replicas=1/confluent.metrics.reporter.topic.replicas=2/g" $CONFLUENT_HOME/etc/kafka/kraft/broker.properties

When you complete these steps, your file should show the following configs:

  • offsets.topic.replication.factor=2
  • transaction.state.log.replication.factor=2
  • confluent.license.topic.replication.factor=2
  • confluent.metadata.topic.replication.factor=2
  • confluent.balancer.topic.replication.factor=2
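The sed -i '' form used above follows BSD/macOS sed; GNU sed on Linux takes -i without the separate empty argument. As a minimal sketch, assuming a POSIX shell, the following helper uses the -i.bak form that both variants accept and then prints the active replication settings so you can compare them with the list above. The name set_replication_factor is hypothetical, not a Confluent tool. Unlike the commands above, it rewrites any numeric replication factor, not only values of 1.

```shell
# set_replication_factor FILE N
# Hypothetical helper: set every "replication.factor=<number>" in FILE to N.
# Uses "sed -i.bak", which both GNU and BSD sed accept; a FILE.bak copy is kept.
set_replication_factor() {
  file="$1"
  factor="$2"
  sed -i.bak -e "s/replication\.factor=[0-9][0-9]*/replication.factor=${factor}/g" "$file"
  # Print only the uncommented settings that are now in effect.
  grep -E '^[^#]*replication\.factor=' "$file"
}

# Example (path taken from this guide):
# set_replication_factor "$CONFLUENT_HOME/etc/kafka/kraft/broker.properties" 2
```

The printed lines should all end in the factor you passed in. If a setting is missing from the output, it is probably still commented out in the file.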

Configuration snapshot preview: Basic configuration for a three-broker cluster

The following table shows a summary of the configurations to specify for each of these files, as a reference to check against if needed. The steps in the next sections guide you through a quick way to set up these files, using the existing broker.properties file (KRaft) or server.properties file (ZooKeeper) as a basis for your specialized ones.

Ready to get started? Skip to Configure the servers.

File Configurations
controller.properties

The values for these basic properties must be unique for the controller:

node.id=5

controller.quorum.voters=5@localhost:9097

listeners=CONTROLLER://:9097

log.dirs=/tmp/kraft-controller-log

broker.properties

The values for these basic properties must be unique per broker (except, all use the same Controller quorum):

node.id=0

controller.quorum.voters=5@localhost:9097

listeners=PLAINTEXT://:9092

log.dirs=/tmp/kraft-broker-logs

Add the following listener configuration to specify the REST endpoint for this broker:

confluent.http.server.listeners=http://localhost:8090

broker-1.properties

The values for these basic properties must be unique per broker (except, all use the same Controller quorum):

node.id=1

controller.quorum.voters=5@localhost:9097

listeners=PLAINTEXT://:9093

log.dirs=/tmp/kraft-broker-logs-1

Provide the listener configuration to specify the REST endpoint unique to this broker:

confluent.http.server.listeners=http://localhost:8091

broker-2.properties

The values for these basic properties must be unique per broker (except, all use the same Controller quorum):

node.id=2

controller.quorum.voters=5@localhost:9097

listeners=PLAINTEXT://:9094

log.dirs=/tmp/kraft-broker-logs-2

Provide the listener configuration to specify the REST endpoint unique to this broker:

confluent.http.server.listeners=http://localhost:8092

Tip

In server.properties and other configuration files, properties that are commented out or not listed at all take their default values. For example, the commented out line for listeners on broker 0 has the effect of setting a single listener to PLAINTEXT://:9092.

Configure the servers

Start with the broker.properties file you updated in the previous sections with regard to replication factors and enabling Self-Balancing Clusters. You will make a few more changes to this file, then use it as the basis for the other servers.

  1. Update the node ID, controller quorum voters and port for the first broker, and then add the REST endpoint listener configuration for this broker at the end of the file:

    sed -i '' -e "s/node.id=2/node.id=0/g" $CONFLUENT_HOME/etc/kafka/kraft/broker.properties
    
    sed -i '' -e "s/1@localhost:9093/5@localhost:9097/g" $CONFLUENT_HOME/etc/kafka/kraft/broker.properties
    
    echo "confluent.http.server.listeners=http://localhost:8090" >> $CONFLUENT_HOME/etc/kafka/kraft/broker.properties
    
  2. Copy the properties file for the first broker to use as a basis for the other two:

    cp $CONFLUENT_HOME/etc/kafka/kraft/broker.properties $CONFLUENT_HOME/etc/kafka/kraft/broker-1.properties
    
    cp $CONFLUENT_HOME/etc/kafka/kraft/broker.properties $CONFLUENT_HOME/etc/kafka/kraft/broker-2.properties
    
  3. Update the node ID, listener, and data directories for broker-1, and then update the REST endpoint listener for this broker:

    sed -i '' -e "s/node.id=0/node.id=1/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-1.properties
    
    sed -i '' -e "s/9092/9093/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-1.properties
    
    sed -i '' -e "s/kraft-broker-logs/kraft-broker-logs-1/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-1.properties
    
    sed -i '' -e "s/8090/8091/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-1.properties
    
  4. Update the node ID, listener, and data directories for broker-2, and then update the REST endpoint listener for this broker:

    sed -i '' -e "s/node.id=0/node.id=2/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-2.properties
    
    sed -i '' -e "s/9092/9094/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-2.properties
    
    sed -i '' -e "s/kraft-broker-logs/kraft-broker-logs-2/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-2.properties
    
    sed -i '' -e "s/8090/8092/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-2.properties
    
  5. Finally, update the controller node ID, quorum voters, and port:

    sed -i '' -e "s/node.id=1/node.id=5/g" $CONFLUENT_HOME/etc/kafka/kraft/controller.properties
    
    sed -i '' -e "s/9093/9097/g" $CONFLUENT_HOME/etc/kafka/kraft/controller.properties
    
    sed -i '' -e "s/1@localhost/5@localhost/g" $CONFLUENT_HOME/etc/kafka/kraft/controller.properties
    

When you have completed this step, you will have three properties files that match the configurations shown in the Configuration snapshot preview: Basic configuration for a three-broker cluster:

  • broker.properties (KRaft) or server.properties (ZooKeeper) which corresponds to node/broker 0
  • broker-1.properties (KRaft) or server-1.properties (ZooKeeper) which corresponds to node/broker 1
  • broker-2.properties (KRaft) or server-2.properties (ZooKeeper) which corresponds to node/broker 2
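Steps 2 through 4 above apply the same four substitutions with different numbers. A minimal sketch of the same edits as one reusable function, assuming the node 0 file already holds the values from step 1 (node.id=0, port 9092, kraft-broker-logs, REST port 8090). The name make_broker_config is hypothetical.

```shell
# make_broker_config BASE N
# Hypothetical helper: derive broker-N.properties from the node 0 file BASE.
# Writes a new file next to BASE instead of editing in place, so it behaves
# the same with GNU and BSD sed. Prints the path of the file it created.
make_broker_config() {
  base="$1"
  n="$2"
  out="$(dirname "$base")/broker-${n}.properties"
  sed -e "s/node.id=0/node.id=${n}/g" \
      -e "s/9092/$((9092 + $n))/g" \
      -e "s/kraft-broker-logs/kraft-broker-logs-${n}/g" \
      -e "s/8090/$((8090 + $n))/g" \
      "$base" > "$out"
  echo "$out"
}

# Example (path taken from this guide):
# for n in 1 2; do
#   make_broker_config "$CONFLUENT_HOME/etc/kafka/kraft/broker.properties" "$n"
# done
```

Because the port and directory values are computed from N, the same function can produce a fourth or fifth broker file if you want a larger local cluster.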

Run this command to list the files in KRaft mode:

ls $CONFLUENT_HOME/etc/kafka/kraft/
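Listing the files confirms that they exist, but not that their values are unique per broker. The following is a minimal sketch, assuming a POSIX shell, that compares one property across several files and reports any value that appears more than once. The names prop_values and report_duplicates are hypothetical helpers.

```shell
# prop_values KEY FILE...
# Print the last uncommented value of KEY in each FILE.
# KEY is used as a regular expression, so "." matches any character.
prop_values() {
  key="$1"
  shift
  for f in "$@"; do
    grep -E "^${key}=" "$f" | tail -n 1 | cut -d= -f2-
  done
}

# report_duplicates KEY FILE...
# Succeed if every FILE has a different value for KEY; otherwise list the clash.
report_duplicates() {
  dups="$(prop_values "$@" | sort | uniq -d)"
  if [ -n "$dups" ]; then
    echo "duplicate $1: $dups"
    return 1
  fi
  echo "$1 is unique across $(( $# - 1 )) file(s)"
}

# Example (file names taken from this guide):
# cd "$CONFLUENT_HOME/etc/kafka/kraft"
# for key in node.id listeners log.dirs confluent.http.server.listeners; do
#   report_duplicates "$key" broker.properties broker-1.properties broker-2.properties
# done
```

A duplicate in listeners or log.dirs usually means one of the sed substitutions did not match, for example because the base file was edited by hand beforehand.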

Configure Control Center with REST endpoints and advertised listeners (Optional)

This is an optional step, only needed if you want to use Confluent Control Center. It gives you a similar starting point as you get in the Quick Start for Confluent Platform, and an alternate way to work with and verify the topics and data you will create on the command line with kafka-topics.

You must tell Control Center about the REST endpoints for all brokers in your cluster, and the advertised listeners for the other components you may want to run. Without these configurations, the brokers and components will not show up on Control Center.

Make the following changes to $CONFLUENT_HOME/etc/confluent-control-center/control-center-dev.properties and save the file.

  1. Open the file in an editor; for example, in vi:

    vi $CONFLUENT_HOME/etc/confluent-control-center/control-center-dev.properties
    
  2. Configure REST endpoints for the brokers.

    In $CONFLUENT_HOME/etc/confluent-control-center/control-center-dev.properties, replace the default value for the Kafka REST endpoint URL by a copy-paste of the following lines to match your multi-broker configuration:

    # Kafka REST endpoint URL
    confluent.controlcenter.streams.cprest.url=http://localhost:8090,http://localhost:8091,http://localhost:8092
    

    See also

    Required Configurations for Control Center in Self-Balancing Configuration Options and confluent.controlcenter.streams.cprest.url in the Control Center Configuration Reference.

  3. Replace the configurations for Kafka Connect, ksqlDB, and Schema Registry to provide Control Center with the default advertised URLs for the component clusters. You can delete the original configs and copy-paste the following into the file.

    # A comma separated list of Connect host names
    confluent.controlcenter.connect.cluster=http://localhost:8083
    
    # KSQL cluster URL
    confluent.controlcenter.ksql.ksqlDB.url=http://localhost:8088
    
    # Schema Registry cluster URL
    confluent.controlcenter.schema.registry.url=http://localhost:8081
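
Control Center needs one REST endpoint per broker. A minimal sketch that counts the entries in confluent.controlcenter.streams.cprest.url so you can compare the result with the number of brokers you configured. The name count_rest_endpoints is a hypothetical helper.

```shell
# count_rest_endpoints FILE
# Hypothetical helper: print how many comma-separated URLs are listed in the
# last uncommented confluent.controlcenter.streams.cprest.url line of FILE.
count_rest_endpoints() {
  grep -E '^confluent\.controlcenter\.streams\.cprest\.url=' "$1" \
    | tail -n 1 | cut -d= -f2- | tr ',' '\n' | grep -c .
}

# Example (path taken from this guide):
# count_rest_endpoints "$CONFLUENT_HOME/etc/confluent-control-center/control-center-dev.properties"
```

For the three-broker cluster in this guide the count should be 3. A result of 0 suggests the property is still commented out.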
    

Install the Datagen Connector (Optional)

Install the Kafka Connect Datagen source connector using the confluent connect plugin install command, or by using Confluent Hub. This connector generates mock data for demonstration purposes and is not suitable for production.

To install with the confluent connect plugin install command:

confluent connect plugin install confluentinc/kafka-connect-datagen:latest

Confluent Hub provides an online library of pre-packaged and ready-to-install extensions or add-ons for Confluent Platform and Kafka. To install with Confluent Hub:

confluent-hub install \
   --no-prompt confluentinc/kafka-connect-datagen:latest

This is an optional step, but useful, as it gives you a similar starting point as you get in the Quick Start for Confluent Platform.

Start the controller and brokers

In KRaft mode, you must run the following commands from $CONFLUENT_HOME to generate a random cluster ID, and format log directories for the controller and each broker in dedicated command windows. You will then start the controller and brokers from those same dedicated windows.

The kafka-storage command is run only once per broker/controller. You cannot use the kafka-storage command to update an existing cluster. If you make a mistake in configurations at that point, you must recreate the directories from scratch, and work through the steps again.

Controller

  1. In a new dedicated command window, change directories into $CONFLUENT_HOME to run the KRaft setup commands and start the controller.

    cd $CONFLUENT_HOME
    
  2. Generate a random-uuid for the cluster using the kafka-storage tool.

    KAFKA_CLUSTER_ID="$(bin/kafka-storage random-uuid)"
    
  3. Get the value for KAFKA_CLUSTER_ID and add it to your .bash_profile, .bashrc, .zshrc or similar so that it is available to you in new command windows for running the brokers. You will use this same cluster ID for all brokers.

    echo $KAFKA_CLUSTER_ID
    
  4. Format the log directories for the controller:

    ./bin/kafka-storage format -t $KAFKA_CLUSTER_ID -c $CONFLUENT_HOME/etc/kafka/kraft/controller.properties --ignore-formatted
    
  5. Start the controller:

    kafka-server-start $CONFLUENT_HOME/etc/kafka/kraft/controller.properties
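
Step 3 above asks you to add the cluster ID to a shell profile by hand. A minimal sketch of doing that from the command line, assuming a POSIX shell and that an export line is the form you want in the profile. The name persist_cluster_id is a hypothetical helper; it appends the line only if the identical line is not already present.

```shell
# persist_cluster_id PROFILE ID
# Hypothetical helper: append "export KAFKA_CLUSTER_ID=ID" to PROFILE unless
# that exact line is already there, so repeated runs do not pile up copies.
persist_cluster_id() {
  profile="$1"
  id="$2"
  line="export KAFKA_CLUSTER_ID=$id"
  grep -qxF "$line" "$profile" 2>/dev/null || echo "$line" >> "$profile"
}

# Example, run in the controller window after generating the ID:
# persist_cluster_id "$HOME/.bash_profile" "$KAFKA_CLUSTER_ID"
```

If you later generate a new cluster ID, remove the old export line from the profile first; otherwise both lines remain and the last one sourced takes effect.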
    

broker.properties (node 0)

  1. In a new command window dedicated to running node 0, change directories into $CONFLUENT_HOME to run the KRaft setup commands and start your first broker.

    cd $CONFLUENT_HOME
    
  2. Make sure that the KAFKA_CLUSTER_ID you generated for the controller is available in this shell as an environment variable.

    For example, if you added the value for KAFKA_CLUSTER_ID to your .bash_profile:

    source ~/.bash_profile
    
    echo $KAFKA_CLUSTER_ID
    
  3. Format the log directories for this broker:

    ./bin/kafka-storage format -t $KAFKA_CLUSTER_ID -c $CONFLUENT_HOME/etc/kafka/kraft/broker.properties --ignore-formatted
    
  4. Start the broker:

    kafka-server-start $CONFLUENT_HOME/etc/kafka/kraft/broker.properties
    

broker-1.properties (node 1)

  1. In a new command window dedicated to running node 1, change directories into $CONFLUENT_HOME to run the KRaft setup commands and start broker-1.

    cd $CONFLUENT_HOME
    
  2. Make sure that the KAFKA_CLUSTER_ID you generated for the controller is available in this shell as an environment variable.

    For example, if you added the value for KAFKA_CLUSTER_ID to your .bash_profile:

    source ~/.bash_profile
    
    echo $KAFKA_CLUSTER_ID
    
  3. Format the log directories for broker-1:

    ./bin/kafka-storage format -t $KAFKA_CLUSTER_ID -c $CONFLUENT_HOME/etc/kafka/kraft/broker-1.properties --ignore-formatted
    
  4. Start the broker:

    kafka-server-start $CONFLUENT_HOME/etc/kafka/kraft/broker-1.properties
    

broker-2.properties (node 2)

  1. In a new command window dedicated to running node 2, change directories into $CONFLUENT_HOME to run the KRaft setup commands and start broker-2.

    cd $CONFLUENT_HOME
    
  2. Make sure that the KAFKA_CLUSTER_ID you generated for the controller is available in this shell as an environment variable.

    For example, if you added the value for KAFKA_CLUSTER_ID to your .bash_profile:

    source ~/.bash_profile
    
    echo $KAFKA_CLUSTER_ID
    
  3. Format the log directories for broker-2:

    ./bin/kafka-storage format -t $KAFKA_CLUSTER_ID -c $CONFLUENT_HOME/etc/kafka/kraft/broker-2.properties --ignore-formatted
    
  4. Start the broker:

    kafka-server-start $CONFLUENT_HOME/etc/kafka/kraft/broker-2.properties
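
Each broker window above begins by confirming that KAFKA_CLUSTER_ID is set, because formatting with an empty or different ID leaves that node unable to join the cluster. A minimal sketch of a guard you could run before the format step, assuming a POSIX shell. The name require_cluster_id is a hypothetical helper.

```shell
# require_cluster_id
# Hypothetical guard: fail with a message on stderr when KAFKA_CLUSTER_ID is
# empty or unset in the current shell; otherwise print the ID being used.
require_cluster_id() {
  if [ -z "${KAFKA_CLUSTER_ID:-}" ]; then
    echo "KAFKA_CLUSTER_ID is not set; source your profile or export it first" >&2
    return 1
  fi
  echo "Using cluster ID: $KAFKA_CLUSTER_ID"
}

# Example, chaining the guard with the format command from this guide:
# require_cluster_id && ./bin/kafka-storage format -t "$KAFKA_CLUSTER_ID" \
#   -c "$CONFLUENT_HOME/etc/kafka/kraft/broker-1.properties" --ignore-formatted
```

Compare the printed ID with the one shown in the controller window before you format a broker.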
    

Start the other components

Start each of these components in separate windows.

Tip

For this example, it is not necessary to start all of these. At a minimum, you will need the controller and the brokers (already started), and Kafka REST. However, it is useful to have all components running if you are just getting started with the platform, and want to explore everything. This gives you a similar starting point as you get in Quick Start for Confluent Platform, and enables you to work through the examples in that Quick Start in addition to the Kafka command examples provided here.

  1. Start Kafka REST

    kafka-rest-start $CONFLUENT_HOME/etc/kafka-rest/kafka-rest.properties
    
  2. (Optional) Start Kafka Connect

    connect-distributed $CONFLUENT_HOME/etc/kafka/connect-distributed.properties
    
  3. (Optional) Start ksqlDB

    ksql-server-start $CONFLUENT_HOME/etc/ksqldb/ksql-server.properties
    
  4. (Optional) Start Schema Registry

    schema-registry-start $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties
    
  5. (Optional) Finally, start Control Center in a separate command window.

    control-center-start $CONFLUENT_HOME/etc/confluent-control-center/control-center-dev.properties
    

Create Kafka topics, producers, and consumers

If you are ready to start working at the command line, skip to Kafka Commands Primer and try creating Kafka topics, working with producers and consumers, and so forth.

Explore Control Center (Optional)

Bring up Confluent Control Center to verify the current status of your cluster, including lead broker (controller), topic data, and number of brokers. For a local deployment, Control Center is available at http://localhost:9021/ in your web browser.

The starting view of your environment in Control Center shows your cluster with 3 brokers.

  1. Click into the cluster card.


    The cluster overview is displayed.

  2. Click either the Brokers card or Brokers on the menu to view broker metrics.

  3. Finally, click Topics on the left menu.

    Note that only system (internal) topics are available at this point because you haven’t created any topics of your own yet. The default_ksql_processing_log will show up as a topic if you configured and started ksqlDB.

    There is a lot more to Control Center but it is not the focus of this guide. If you haven’t had a chance to work all the way through a quick start (which demos tasks on Control Center), technically you could jump over to Quick Start for Confluent Platform and work through those same tasks on this cluster (starting with creating Kafka topics on Control Center), and then come back to this guide to continue with the examples in Kafka Commands Primer.

    Everything should work the same for the Quick Start steps. The only difference is that here you have a multi-broker cluster with replication factors set appropriately for additional examples, and the deployment in the quick start is a single-broker cluster with replication factors set to 1 for a development-only environment.

Kafka Commands Primer

After you have Confluent Platform running, an intuitive next step is to try out some basic Kafka commands to create topics and work with producers and consumers. This should help show newcomers and experienced Kafka users alike that the familiar Kafka tools are readily available in Confluent Platform and work the same way. These provide a means of testing and working with basic functionality, as well as configuring and monitoring deployments. The commands surface a subset of the APIs available to you.

A few things to note:

  • Confluent Platform ships with Kafka commands and utilities in $CONFLUENT_HOME/bin. This bin/ directory includes both Confluent proprietary and open source Kafka utilities. A full list is provided in CLI Tools Shipped With Confluent Platform. Those in the list that begin with kafka- are the Kafka open source command utilities. A reference for Confluent proprietary commands is provided in CLI Tools for Confluent Platform.
  • With Confluent Platform installed and running on your system, you can run Kafka commands from anywhere; for example, from your $HOME (~/) directory. You do not have to run these from within $CONFLUENT_HOME.
  • Command line help is available by typing any of the commands with no arguments; for example, kafka-topics or kafka-producer-perf-test.

To help get you started, the sections below provide examples for some of the most fundamental and widely-used Kafka scripts.

Create, list and describe topics

You can use kafka-topics for operations on topics (create, list, describe, alter, delete, and so forth).

In a command window, run the following commands to experiment with topics.

  1. Create three topics, cool-topic, warm-topic, hot-topic.

    kafka-topics --create --topic cool-topic --bootstrap-server localhost:9092
    
    kafka-topics --create --topic warm-topic --bootstrap-server localhost:9092
    
    kafka-topics --create --topic hot-topic --partitions 2 --replication-factor 2 --bootstrap-server localhost:9092
    
  2. List all topics.

    kafka-topics --list --bootstrap-server localhost:9092
    

    Tip

    System topics are prefaced by an underscore in the output. The topics you created are listed at the end.

  3. Describe a topic.

    This shows partitions, replication factor, and in-sync replicas for the topic.

    kafka-topics --describe --topic cool-topic --bootstrap-server localhost:9092
    

    Your output should resemble the following:

    Topic: cool-topic PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
      Topic: cool-topic       Partition: 0    Leader: 0       Replicas: 0     Isr: 0  Offline:
    

    Tip

    If you run kafka-topics --describe with no specified topic, you get a detailed description of every topic on the cluster (system and user topics).

  4. Describe another topic, using one of the other brokers in the cluster as the bootstrap server.

    kafka-topics --describe --topic hot-topic --bootstrap-server localhost:9094
    

    Here is that example output:

    Topic: hot-topic  PartitionCount: 2       ReplicationFactor: 2    Configs: segment.bytes=1073741824
      Topic: hot-topic        Partition: 0    Leader: 1       Replicas: 1,0   Isr: 1,0        Offline:
      Topic: hot-topic        Partition: 1    Leader: 0       Replicas: 0,2   Isr: 0,2        Offline:
    

    You can connect to any of the brokers in the cluster to run these commands because they all have the same data!

  5. Alter a topic’s configuration.

    For this example, change the partition count on hot-topic from 2 to 9.

    kafka-topics --alter --topic hot-topic --partitions 9 --bootstrap-server localhost:9092
    

    Tip

    Dynamic topic modification is inherently limited by the current configurations. For example, you cannot decrease the number of partitions or modify the replication factor for a topic, as that would require partition reassignment.

  6. Rerun --describe on the same topic.

    kafka-topics --describe --topic hot-topic --bootstrap-server localhost:9092
    

    Your output should resemble the following. Verify that the partition count is updated to 9:

    Topic: hot-topic  PartitionCount: 9       ReplicationFactor: 2    Configs: segment.bytes=1073741824
      Topic: hot-topic        Partition: 0    Leader: 2       Replicas: 2,1   Isr: 2,1        Offline:
      Topic: hot-topic        Partition: 1    Leader: 1       Replicas: 1,0   Isr: 1,0        Offline:
      Topic: hot-topic        Partition: 2    Leader: 1       Replicas: 1,2   Isr: 1,2        Offline:
      Topic: hot-topic        Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1        Offline:
      Topic: hot-topic        Partition: 4    Leader: 0       Replicas: 0,2   Isr: 0,2        Offline:
      Topic: hot-topic        Partition: 5    Leader: 1       Replicas: 1,0   Isr: 1,0        Offline:
      Topic: hot-topic        Partition: 6    Leader: 2       Replicas: 2,0   Isr: 2,0        Offline:
      Topic: hot-topic        Partition: 7    Leader: 0       Replicas: 0,1   Isr: 0,1        Offline:
      Topic: hot-topic        Partition: 8    Leader: 1       Replicas: 1,2   Isr: 1,2        Offline:
    
  7. Delete a topic.

    kafka-topics --delete --topic warm-topic --bootstrap-server localhost:9092
    
  8. List all topics.

    kafka-topics --list --bootstrap-server localhost:9092
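
The --describe output in the steps above lists one leader per partition, which makes it easy to check how evenly leadership is spread across brokers. A minimal sketch, assuming the output layout shown above and a POSIX shell with awk. The name leaders_per_broker is a hypothetical helper that reads describe output on standard input.

```shell
# leaders_per_broker
# Hypothetical helper: read "kafka-topics --describe" output on stdin and
# print how many partitions each broker currently leads.
leaders_per_broker() {
  awk '/Leader:/ {
         # The broker ID is the field right after the "Leader:" label.
         for (i = 1; i < NF; i++) if ($i == "Leader:") count[$(i + 1)]++
       }
       END {
         for (b in count) print "broker " b ": " count[b] " partition(s)"
       }' | sort
}

# Example (requires the running cluster from this guide):
# kafka-topics --describe --topic hot-topic --bootstrap-server localhost:9092 \
#   | leaders_per_broker
```

Applied to the nine-partition sample output from step 6, this would report 2 partitions for broker 0, 4 for broker 1, and 3 for broker 2. Your own assignment will likely differ.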
    

Run producers and consumers to send and read messages

The command utilities kafka-console-producer and kafka-console-consumer allow you to manually produce messages to and consume from a topic.

  1. Open two new command windows, one for a producer, and the other for a consumer.

  2. Run a producer to produce to cool-topic.

    kafka-console-producer --topic cool-topic --bootstrap-server localhost:9092
    
  3. Send some messages.

    Type your messages at the prompt (>), and hit Return after each one.

    Your command window will resemble the following:

    $ kafka-console-producer --broker-list localhost:9092 --topic cool-topic
    >hi cool topic
    >did you get this message?
    >first
    >second
    >third
    >yes! I love you cool topic
    >
    

    Tip

    Older Kafka releases also accept the --broker-list flag in place of --bootstrap-server for the producer, as shown in the sample window above. The flag is deprecated, so prefer --bootstrap-server in new scripts.

  4. In the other command window, run a consumer to read messages from cool-topic. Specify that you want to start consuming from the beginning, as shown.

    kafka-console-consumer --topic cool-topic --from-beginning --bootstrap-server localhost:9092
    

    Your output will resemble the following:

    $ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic cool-topic
    hi cool topic
    did you get this message?
    first
    second
    third
    yes! I love you cool topic
    
  5. When you want to stop the producer and consumer, press Ctrl-C in their respective command windows.

    Tip

    You may want to leave at least the producer running for now, in case you want to send more messages when you revisit topics in Control Center.
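
The console producer reads messages from standard input, one per line, so you can also pipe generated lines into it instead of typing them. The following is a minimal sketch under that assumption. gen_messages is a hypothetical helper name and the message text is arbitrary.

```shell
# gen_messages (hypothetical helper): prints N numbered messages, one per line.
gen_messages() {
  n="$1"
  i=1
  while [ "$i" -le "$n" ]; do
    printf 'message %d\n' "$i"
    i=$(( i + 1 ))
  done
}
```

For example, to send five messages to cool-topic and then read five messages back before exiting:

    gen_messages 5 | kafka-console-producer --topic cool-topic --bootstrap-server localhost:9092

    kafka-console-consumer --topic cool-topic --from-beginning --max-messages 5 --bootstrap-server localhost:9092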

Produce auto-generated message data to topics

You can use kafka-producer-perf-test in its own command window to generate test data and send it to topics.

  • For example, open a new command window and type the following command to send data to hot-topic, with the specified throughput and record size.

    kafka-producer-perf-test \
       --producer-props bootstrap.servers=localhost:9092 \
       --topic hot-topic \
       --record-size 1000 \
       --throughput 1000 \
       --num-records 3600000
    

    The command provides status output on messages sent, as shown:

    4999 records sent, 999.8 records/sec (0.95 MB/sec), 1.1 ms avg latency, 240.0 ms max latency.
    5003 records sent, 1000.2 records/sec (0.95 MB/sec), 0.5 ms avg latency, 4.0 ms max latency.
    5003 records sent, 1000.2 records/sec (0.95 MB/sec), 0.6 ms avg latency, 5.0 ms max latency.
    5001 records sent, 1000.2 records/sec (0.95 MB/sec), 0.3 ms avg latency, 3.0 ms max latency.
    5001 records sent, 1000.0 records/sec (0.95 MB/sec), 0.3 ms avg latency, 4.0 ms max latency.
    5000 records sent, 1000.0 records/sec (0.95 MB/sec), 0.8 ms avg latency, 24.0 ms max latency.
    5001 records sent, 1000.2 records/sec (0.95 MB/sec), 0.6 ms avg latency, 3.0 ms max latency.
    ...
    
  • Open a new command window to consume the messages from hot-topic as they are sent (not from the beginning).

    kafka-console-consumer --topic hot-topic --bootstrap-server localhost:9092
    

    Press Ctrl-C to stop the consumer.
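
As a rough check on the kafka-producer-perf-test settings above, you can work out the data rate and run time they imply. This is a back-of-the-envelope sketch, assuming the tool reports MB as 1024 x 1024 bytes, which is consistent with the 0.95 MB/sec in the status output. The variable names are illustrative only.

```shell
# Values copied from the kafka-producer-perf-test command above.
record_size=1000      # bytes per record (--record-size)
throughput=1000       # target records per second (--throughput)
num_records=3600000   # total records to send (--num-records)

# Data rate: bytes per second divided by 1024*1024 (assumed MB definition).
mb_per_sec=$(awk -v s="$record_size" -v t="$throughput" \
  'BEGIN { printf "%.2f", s * t / (1024 * 1024) }')

# Run time at the target rate, in whole minutes.
duration_min=$(( num_records / throughput / 60 ))

echo "about ${mb_per_sec} MB/sec for roughly ${duration_min} minutes"
```

With these settings the producer runs for about an hour if you do not stop it, so lower --num-records if you want a shorter test.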

Tip

You may want to leave the producer running for a moment, as you are about to revisit topics in Control Center.

To learn more, check out "Benchmark Commands", "Let’s Load test, Kafka!", and "How to do Performance testing of Kafka Cluster".

Revisit Control Center (Optional)

Now that you have created some topics and produced message data to them (both manually and with auto-generated data), take another look at Control Center, this time to inspect the existing topics.

  1. Open a web browser and go to http://localhost:9021/, the default URL for Control Center on a local system.

  2. Select the cluster, and click Topics from the menu.

  3. Choose cool-topic, then select the Messages tab.

    Select Jump to offset and type 1, 2, or 3 to display previous messages.

    These messages may not appear in the order they were sent, because the consumer here is not reading from the beginning of the topic (as --from-beginning does).

    Try manually typing some more messages to cool-topic with your command line producer, and watch them show up here.

    ../_images/basics-c3-topics-messages-cool.png
  4. Navigate to Topics > hot-topic > Messages tab.

    Auto-generated messages from your kafka-producer-perf-test are shown here as they arrive.

    ../_images/basics-c3-topics-messages-hot.png

Shutdown and cleanup tasks

Run the following shutdown and cleanup tasks.

  1. Stop kafka-producer-perf-test with Ctrl-C in its command window.

  2. Stop all of the other components with Ctrl-C in their respective command windows, in the reverse of the order in which you started them. For example, stop Control Center first, then other components, followed by Kafka brokers, and finally ZooKeeper.

  3. Remove log files from /tmp. For example, if you were running in KRaft mode:

    ls /tmp
    
    rm -rf /tmp/kraft-broker-logs*
    
    rm -rf /tmp/kraft-controller-logs
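
If you repeat this cleanup often, you might wrap it in a small function that reports what it removes. The following is a sketch only: clean_kraft_logs is a hypothetical helper name, and it assumes broker log directories whose names start with kraft-broker-logs plus a single kraft-controller-logs directory, as in the commands above.

```shell
# clean_kraft_logs (hypothetical helper): removes KRaft log directories under a
# base directory (default /tmp) and prints each path it deletes.
clean_kraft_logs() {
  base="${1:-/tmp}"
  for d in "$base"/kraft-broker-logs* "$base"/kraft-controller-logs; do
    # Skip patterns that matched nothing and paths that do not exist.
    [ -e "$d" ] || continue
    echo "removing $d"
    rm -rf -- "$d"
  done
}
```

Call it with no arguments to clean /tmp, or pass another base directory if your log directories live elsewhere:

    clean_kraft_logs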
    

Run multiple clusters

Another option to experiment with is a multi-cluster deployment. This is relevant for trying out features like Replicator, Cluster Linking, and multi-cluster Schema Registry, where you want to share or replicate topic data across two clusters, often modeled as the origin and the destination cluster.

These configurations can be used to share data across data centers and regions. An example configuration for Cluster Linking is shown in the diagram below. (A full guide to this setup is available in Tutorial: Share Data Across Topics Using Cluster Linking for Confluent Platform.)

../_images/kafka-basics-multi-cluster.png

Multi-cluster configurations are described in context under the relevant use cases. Because these configurations vary depending on what you want to accomplish, the best way to try out a multi-cluster deployment is to choose a use case and follow the feature-specific tutorial. The specifics also depend on whether you are using KRaft in combined or isolated mode, or ZooKeeper.

Code Examples and Demo Apps

Following are links to examples of Confluent Platform distributed applications that use Kafka topics, along with producers and consumers that subscribe to those topics, in an event subscription model. The idea is to complete the picture of how Kafka and Confluent Platform can be used to accomplish a task or provide a service.