.. meta::
:description: Apache Kafka is a distributed streaming platform. The Kafka cluster stores streams of records in categories called Kafka topics.
.. _intro-to-ak:
Introduction to |ak|
====================
|ak-tm| is a distributed streaming platform that:
- Publishes and subscribes to streams of records, similar to a message queue or enterprise messaging system.
- Stores streams of records in a fault-tolerant durable way.
- Processes streams of records as they occur.
|ak| is used for these broad classes of applications:
- Building real-time streaming data pipelines that reliably get data between systems or applications.
- Building real-time streaming applications that transform or react to the streams of data.
|ak| is run as a cluster on one or more servers that can span multiple datacenters. The |ak| cluster stores streams of
*records* in categories called *topics*. Each record consists of a key, a value, and a timestamp.
.. figure:: ../images/kafka-apis.png
:width: 400px
|ak| has these core APIs:
:ref:`Producer API `
Applications can publish a stream of records to one or more |ak| topics.
:ref:`Consumer API `
Applications can subscribe to topics and process the stream of records produced to them.
:ref:`Streams API `
Applications can act as a *stream processor*, consuming an input stream from one or more topics and producing an
output stream to one or more output topics, effectively transforming the input streams to output streams.
:ref:`Connector API `
Build and run reusable producers or consumers that connect |ak| topics to existing applications or data systems. For
example, a connector to a relational database might capture every change to a table.
In |ak|, communication between the clients and the servers is done with a simple, high-performance,
language-agnostic `TCP protocol `__. This protocol is versioned and maintains backwards compatibility with
older versions. A Java client is provided for |ak|, but clients are available in :ref:`many languages `.
Topics and Logs
---------------
The core abstraction |ak| provides for a stream of records is the topic.
A topic is a category or feed name to which records are published. Topics in |ak| are always multi-subscriber. This
means that a topic can have zero, one, or many consumers that subscribe to the data written to it.
For each topic, the |ak| cluster maintains a partitioned log that looks like this:
.. figure:: ../images/log_anatomy.png
Each partition is an ordered, immutable sequence of records that is continually appended to a structured commit log.
Each record in a partition is assigned a sequential ID number called the *offset*, which uniquely identifies it
within the partition.
The |ak| cluster durably persists all published records, whether or not they have been consumed, using a configurable
retention period. For example, if the retention policy is set to two days, then for the two days after a record
is published, it is available for consumption; after the two days have passed it is discarded to free up space.
|ak|'s performance is effectively constant with respect to data size, which means storing data for a long time is not a
problem.
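
A minimal sketch of how such a retention period can be set, using the Java ``AdminClient``: the broker address,
topic name, partition count, and replication factor below are illustrative placeholders, not recommendations.

.. code-block:: java

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;

    public class CreateTopicWithRetention {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                // retention.ms is the per-topic retention period in milliseconds;
                // 172800000 ms = 2 days. Topic name and sizing are placeholders.
                NewTopic topic = new NewTopic("purchases", 3, (short) 1)
                        .configs(Collections.singletonMap("retention.ms", "172800000"));
                admin.createTopics(Collections.singleton(topic)).all().get();
            }
        }
    }
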
.. figure:: ../images/log_consumer.png
:width: 400px
The only metadata retained on a per-consumer basis is the offset or position of that consumer in the log. This offset
is controlled by the consumer. Normally a consumer advances its offset linearly as it reads records. However, because
the position is controlled by the consumer, it can consume records in any order. For example, a consumer can
reset to an older offset to reprocess data from the past, or skip ahead to the most recent record and start
consuming from "now".
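
A minimal sketch of this offset control, using the Java consumer: the consumer assigns itself a single partition,
seeks back to offset 0, and reprocesses whatever the retention period has kept. The topic name, group ID, and
offset are illustrative placeholders.

.. code-block:: java

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ReplayFromOffset {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "replay-example");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            TopicPartition partition = new TopicPartition("purchases", 0);
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // Take explicit control of the position: assign the partition and
                // rewind to offset 0 to reprocess records from the past.
                consumer.assign(Collections.singleton(partition));
                consumer.seek(partition, 0L);
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d key=%s value=%s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
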
This combination of features means that |ak| consumers can come and go without much impact on the cluster or on
other consumers. For example, you can use :ref:`the command line tools ` to "tail" the contents of any
topic without changing what is consumed by existing consumers.
The partitions in the log allow it to scale beyond a size that will fit on a single server. Each individual
partition must fit on the servers that host it, but a topic can have many partitions so it can handle an arbitrary
amount of data. Partitions can also act as the unit of parallelism.
Distribution
------------
The partitions of the log are distributed over the servers in the |ak| cluster with each server handling data and
requests for a share of the partitions. Each partition is replicated across a configurable number of servers for fault
tolerance.
Each partition has one server that acts as the "leader" and zero or more servers which act as "followers". The leader
handles all read and write requests for the partition while the followers passively replicate the leader. If the leader
fails, one of the followers will automatically become the new leader. Each server acts as a leader for some of its
partitions and a follower for others so that load is well balanced within the cluster.
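
A minimal sketch of inspecting this layout with the Java ``AdminClient``: it prints, for each partition of a topic,
the broker that is currently the leader and the brokers that hold replicas. The broker address and topic name are
illustrative placeholders.

.. code-block:: java

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.TopicDescription;

    public class ShowPartitionLeaders {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                TopicDescription description = admin
                        .describeTopics(Collections.singleton("purchases"))
                        .all().get()
                        .get("purchases");
                // Each partition reports its current leader and its full replica list.
                description.partitions().forEach(p ->
                        System.out.printf("partition=%d leader=%s replicas=%s%n",
                                p.partition(), p.leader(), p.replicas()));
            }
        }
    }
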
Geo-Replication
---------------
|crep-full| and |ak| |mmaker| provide geo-replication support for your clusters. You can replicate messages across
multiple datacenters or cloud regions. You can use this in active/passive scenarios for backup and recovery; or in
active/active scenarios to place data closer to your users, or support data locality requirements. For more information,
see :ref:`multi_dc`.
Producers
---------
Producers publish data to the topics of their choice. The producer is responsible for choosing which record to
assign to which partition within the topic. This can be done in a round-robin fashion for load balancing or it can be
done according to some semantic partition function (e.g., based on some key in the record).
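
A minimal sketch of key-based partitioning with the Java producer: with the default partitioner, records that share
a key are assigned to the same partition, so they are stored and read in the order they were sent. The topic name,
key, and values are illustrative placeholders.

.. code-block:: java

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class KeyedProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Both records share the key "customer-42", so the default
                // partitioner routes them to the same partition.
                producer.send(new ProducerRecord<>("purchases", "customer-42", "order created"));
                producer.send(new ProducerRecord<>("purchases", "customer-42", "order shipped"));
            }
        }
    }
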
For more information, see :ref:`kafka_producer`.
Consumers
---------
Consumers label themselves with a *consumer group* name, and each record published to a topic is delivered to one
consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on
separate machines.
If all the consumer instances have the same consumer group, then the records will effectively be load balanced over
the consumer instances.
If all the consumer instances have different consumer groups, then each record will be broadcast to all the
consumer processes.
.. figure:: ../images/consumer-groups.png
A two-server |ak| cluster hosting four partitions (P0-P3) with two consumer groups. Consumer group A has two consumer
instances and group B has four.
Generally topics have a small number of consumer groups, one for each "logical subscriber". Each group is
composed of many consumer instances for scalability and fault tolerance. In this publish-subscribe scenario, the
subscriber is a cluster of consumers instead of a single process.
The way consumption is implemented in |ak| is by dividing up the partitions in the log over the consumer instances
so that each instance is the exclusive consumer of a "fair share" of partitions at any point in time. This process
of maintaining membership in the group is handled dynamically by the |ak| protocol. If new instances join the
group they will take over some partitions from other members of the group. If an instance dies, its partitions will
be distributed to the remaining instances.
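
A minimal sketch of group membership with the Java consumer: every copy of this program started with the same
``group.id`` shares the topic's partitions, and each partition is read by exactly one of them. The topic name and
group ID are illustrative placeholders.

.. code-block:: java

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class GroupConsumerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Instances that share this group.id divide the partitions among themselves.
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processors");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singleton("purchases"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("partition=%d offset=%d value=%s%n",
                                record.partition(), record.offset(), record.value());
                    }
                }
            }
        }
    }
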
|ak| only provides a total order over records *within* a partition, not between different partitions in a topic.
Per-partition ordering combined with the ability to partition data by key is sufficient for most applications.
However, if you require a total order over records this can be achieved with a topic that has only one partition,
though this will mean only one consumer process per consumer group.
For more information, see :ref:`kafka_consumer`.
Multi-tenancy
-------------
You can deploy |ak| as a multi-tenant solution. Multi-tenancy is enabled by configuring which clients can produce
data to, or consume data from, which topics. There is also operations support for quotas: administrators can define
and enforce quotas on requests to control the broker resources that are used by clients. For more information, see
:ref:`security`.
Guarantees
----------
At a high-level |ak| gives the following guarantees:
- Messages sent by a producer to a particular topic partition are appended in the order they are sent. For example,
if a record M1 is sent by the same producer as a record M2, and M1 is sent first, then M1 will have a lower offset
than M2 and appear earlier in the log.
- A consumer instance sees records in the order they are stored in the log.
- For a topic with replication factor N, up to N-1 server failures are tolerated without losing any records
committed to the log.
Kafka as a Messaging System
---------------------------
Messaging is traditionally based on these models:
`Queuing `__
In a queue, a pool of consumers may read from a server and each record goes to one of them.
`Publish-subscribe `_
The record is broadcast to all consumers.
The strength of queuing is that it allows you to divide up the processing of data over multiple consumer instances.
This lets you scale your processing. Unfortunately, queues aren't multi-subscriber. After one process reads the data,
it's gone. Publish-subscribe allows you to broadcast data to multiple processes, but has no way of scaling processing
because every message goes to every subscriber.
The consumer group concept in |ak| generalizes these two concepts. As with the queue, the consumer group allows
you to divide up processing over a collection of processes (the members of the consumer group). As with
publish-subscribe, |ak| allows you to broadcast messages to multiple consumer groups.
The advantage of |ak|'s model is that every topic can scale processing and every topic is multi-subscriber. |ak| also
has stronger ordering guarantees than a traditional messaging system.
A traditional queue retains ordered records on the server, and if multiple consumers consume from the queue then
the server distributes records in the order they are stored. However, the records are delivered asynchronously to
consumers, so they may arrive out of order on different consumers. This means the ordering of the records is lost
in the presence of parallel consumption. Messaging systems often get around this with the notion of "exclusive consumer"
that allows only one process to consume from a queue. However, this means that there is no parallelism in processing.
With partitions, |ak| has the notion of parallelism within the topics. |ak| provides ordering guarantees and load
balancing over a pool of consumer processes. This is achieved by assigning the partitions in the topic to the
consumers in the consumer group. This means that each partition is consumed by exactly one consumer in the group.
This ensures that the consumer is the only reader of that partition and consumes the data in order. Because there
are many partitions, the load is still balanced over many consumer instances.
.. important:: A consumer group cannot have more active consumer instances than partitions; any additional instances remain idle.
.. _kafka_storage:
|ak| as a Storage System
-------------------------
Any message queue that allows publishing messages decoupled from consuming them is effectively acting as a storage
system for the in-flight messages. |ak| is a very good storage system.
Data written to |ak| is written to disk and replicated for fault-tolerance. |ak| allows producers to wait on
acknowledgement. A write isn't considered complete until it is fully replicated and guaranteed to persist even
if the server written to fails.
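
A minimal sketch of waiting on acknowledgement with the Java producer: with ``acks=all``, the send is not
acknowledged until the leader has confirmed replication to the in-sync replicas, and blocking on the returned
future waits for that acknowledgement. The broker address and topic name are illustrative placeholders.

.. code-block:: java

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class AcknowledgedWrite {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // acks=all: the leader responds only after the in-sync replicas have the record.
            props.put(ProducerConfig.ACKS_CONFIG, "all");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Blocking on the future treats the write as complete only once it is acknowledged.
                RecordMetadata metadata =
                        producer.send(new ProducerRecord<>("purchases", "key", "value")).get();
                System.out.printf("written to partition %d at offset %d%n",
                        metadata.partition(), metadata.offset());
            }
        }
    }
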
The disk structures |ak| uses are able to scale well. |ak| performs the same whether you have 50 KB or 50 TB of
persistent data on the server.
|ak| allows the clients to control their read position and can be thought of as a special purpose distributed
filesystem, dedicated to high-performance, low-latency commit log storage, replication, and propagation.
For details about |ak|'s commit log storage and replication design, see
:ref:`Design Details `.
|ak| for Stream Processing
--------------------------
In |ak| a stream processor is anything that takes continual streams of data from input topics, performs some
processing on this input, and produces continual streams of data to output topics. For example, a retail application
might take in input streams of sales and shipments, and output a stream of reorders and price adjustments computed
off this data.
You can do simple processing directly using the producer and consumer APIs. However, for more complex transformations,
|ak| provides a fully integrated :ref:`Streams API `. You can build applications with the Streams API
that perform non-trivial processing, such as computing aggregations off of streams or joining streams together.
The Streams API helps solve hard problems such as handling out-of-order data, reprocessing input as code changes,
and performing stateful computations.
The Streams API builds on the core |ak| primitives; specifically, it uses:
- The producer and consumer APIs for input
- |ak| for stateful storage
- The same group mechanism for fault tolerance among the stream processor instances
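
A minimal sketch of a stream processor built with the Streams API: it reads a stream of sales from one topic, keeps
a running count per key as local state, and produces the counts to an output topic. The application ID and topic
names are illustrative placeholders.

.. code-block:: java

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Produced;

    public class SalesCountStream {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sales-count-example");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> sales = builder.stream("sales");
            // Stateful aggregation: count records per key (for example, per product).
            KTable<String, Long> counts = sales.groupByKey().count();
            counts.toStream().to("sales-counts", Produced.with(Serdes.String(), Serdes.Long()));

            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }
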
Putting the Pieces Together
---------------------------
This combination of messaging, storage, and stream processing is essential to |ak|'s role as a streaming platform.
A distributed file system, like HDFS, allows storing static files for batch processing. These types of systems allow
storing and processing of *historical* data from the past.
A traditional enterprise messaging system allows processing future messages that will arrive after you subscribe.
Applications built in this way process future data as it arrives.
|ak| combines the capabilities of a distributed file system and a traditional enterprise messaging system to deliver
a platform for streaming applications and data pipelines.
By combining storage and low-latency subscriptions, streaming applications can handle both past and future data the
same way. A single application can process historical, stored data and continue processing as future data arrives.
This is a generalized notion of stream processing that subsumes batch processing and message-driven applications.
For streaming data pipelines, the combination of storage and subscription to real-time events makes it possible to use |ak| for
very low-latency pipelines. |ak|'s ability to store data reliably means it can be used for critical data, where data
delivery must be guaranteed; or for integration with offline systems that load data only periodically or may go down
for extended periods of time for maintenance. The stream processing facilities make it possible to transform data
as it arrives.