Architecture of ksqlDB for Confluent Platform

You can use ksqlDB to build event streaming applications from Apache Kafka® topics by using only SQL statements and queries. ksqlDB is built on Kafka Streams, so a ksqlDB application communicates with a Kafka cluster like any other Kafka Streams application.

ksqlDB Components

ksqlDB has these main components:

  • ksqlDB engine – processes SQL statements and queries
  • REST interface – enables client access to the engine
  • ksqlDB CLI – console that provides a command-line interface (CLI) to the engine
  • ksqlDB UI – enables developing ksqlDB applications in Confluent Control Center and Confluent Cloud

ksqlDB Server comprises the ksqlDB engine and the REST API. ksqlDB Server instances communicate with a Kafka cluster, and you can add more of them as necessary without restarting your applications.

Diagram showing architecture of ksqlDB

  • ksqlDB Engine: The ksqlDB engine executes SQL statements and queries. You define your application logic by writing SQL statements, and the engine builds and runs the application on available ksqlDB servers. Each ksqlDB Server instance runs a ksqlDB engine. Under the hood, the engine parses your SQL statements and builds corresponding Kafka Streams topologies. The ksqlDB engine is implemented in the KsqlEngine.java class.
  • ksqlDB CLI: The ksqlDB CLI provides a console with a command-line interface for the ksqlDB engine. Use the ksqlDB CLI to interact with ksqlDB Server instances and develop your streaming applications. The ksqlDB CLI is designed to be familiar to users of MySQL, Postgres, and similar applications. The ksqlDB CLI is implemented in the io.confluent.ksql.cli package.
  • REST Interface: The REST server interface enables communicating with the ksqlDB engine from the CLI, Confluent Control Center, or from any other REST client. For more information, see ksqlDB REST API Reference. The ksqlDB REST server is implemented in the KsqlRestApplication.java class.

When you deploy your ksqlDB application, it runs on ksqlDB Server instances that are independent of one another, are fault-tolerant, and can be scaled with load. For more information, see ksqlDB Deployment Modes.

Diagram showing architecture of ksqlDB

ksqlDB and Kafka Streams

ksqlDB is built on Kafka Streams, a robust stream processing framework that is part of Apache Kafka®. You can use ksqlDB and Kafka Streams together in your event streaming applications.

The Confluent Platform stack, with ksqlDB built on Kafka Streams

ksqlDB gives you a query layer for building event streaming applications on Kafka topics. ksqlDB abstracts away much of the complex programming that’s required for real-time operations on streams of data, so that one line of SQL can do the work of a dozen lines of Java or Scala.

For example, to implement simple fraud-detection logic on a Kafka topic named payments, you could write a single SQL statement:

CREATE STREAM fraudulent_payments AS
 SELECT fraudProbability(data) FROM payments
 WHERE fraudProbability(data) > 0.8
 EMIT CHANGES;

The equivalent Scala code on Kafka Streams might resemble:

// Example fraud-detection logic using the Kafka Streams API.
// Assumes a Payment case class and an implicit Serde[Payment] are defined elsewhere.
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KStream
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

object FraudFilteringApplication extends App {

  val builder: StreamsBuilder = new StreamsBuilder()
  val fraudulentPayments: KStream[String, Payment] = builder
    .stream[String, Payment]("payments")                    // read the payments topic
    .filter((_, payment) => payment.fraudProbability > 0.8) // keep likely-fraudulent records
  fraudulentPayments.to("fraudulent-payments-topic")         // write them to an output topic

  val config = new java.util.Properties
  config.put(StreamsConfig.APPLICATION_ID_CONFIG, "fraud-filtering-app")
  config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")

  val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
  streams.start()
}

ksqlDB is easier to use, and Kafka Streams is more flexible. Which technology you choose for your real-time streaming applications depends on a number of considerations. Keep in mind that you can use both ksqlDB and Kafka Streams together in your implementations.

Also, you can implement custom logic and aggregations in your ksqlDB applications by implementing user-defined functions (UDFs) in Java.

Differences Between ksqlDB and Kafka Streams

The following table summarizes some of the differences between ksqlDB and Kafka Streams.

Differences | ksqlDB | Kafka Streams
You write | SQL statements | JVM applications
Graphical UI | Yes, in Confluent Control Center and Confluent Cloud | No
Console | Yes | No
Data formats | Avro, Protobuf, JSON, JSON_SR, CSV | Any data format, including Avro, JSON, CSV, Protobuf, XML
REST API included | Yes | No, but you can implement your own
Runtime included | Yes, the ksqlDB server | Applications run as standard JVM processes

ksqlDB Language Elements

Like traditional relational databases, ksqlDB supports two categories of statements: Data Definition Language (DDL) and Data Manipulation Language (DML).

These categories are similar in syntax, data types, and expressions, but they have different functions in ksqlDB Server.

Data Definition Language (DDL) Statements

Imperative verbs that define metadata on the ksqlDB Server by adding, changing, or deleting streams and tables. Data Definition Language statements modify metadata only and don’t operate on data. You can use these statements with declarative DML statements.

The DDL statements include:

  • CREATE STREAM
  • CREATE TABLE
  • DROP STREAM
  • DROP TABLE
  • CREATE STREAM AS SELECT (CSAS)
  • CREATE TABLE AS SELECT (CTAS)
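
For example, a DDL statement that registers a stream over an existing topic might resemble the following sketch. The pageviews topic and its columns are illustrative, and the topic is assumed to exist already:

CREATE STREAM pageviews (user_id VARCHAR, url VARCHAR)
  WITH (kafka_topic='pageviews', value_format='JSON');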

Data Manipulation Language (DML) Statements

Declarative verbs that read and modify data in ksqlDB streams and tables. Data Manipulation Language statements modify data only and don’t change metadata. The ksqlDB engine compiles DML statements into Kafka Streams applications, which run on a Kafka cluster like any other Kafka Streams application.

The DML statements include:

  • SELECT
  • INSERT INTO
  • INSERT INTO VALUES
  • CREATE STREAM AS SELECT (CSAS)
  • CREATE TABLE AS SELECT (CTAS)

The CSAS and CTAS statements occupy both categories, because they perform a metadata change, like adding a stream, and also manipulate data by creating a derivative of existing records.
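
For example, the following CSAS statement is a minimal sketch of this dual behavior: it registers a new stream named high_value_orders in the metastore and starts a persistent query that populates it. The orders stream and its columns are illustrative:

CREATE STREAM high_value_orders AS
  SELECT order_id, amount
  FROM orders
  WHERE amount > 1000
  EMIT CHANGES;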

ksqlDB Deployment Modes

You can deploy your ksqlDB streaming applications by using either interactive or headless mode. We recommend using interactive mode when possible.

In both deployment modes, ksqlDB enables distributing the processing load for your ksqlDB applications across all ksqlDB Server instances, and you can add more ksqlDB Server instances without restarting your applications.

Note

All servers that run in a ksqlDB cluster must use the same deployment mode.

Interactive Deployment

When you deploy a ksqlDB server in interactive mode, the REST interface is available for the ksqlDB CLI and Confluent Control Center to connect to. This allows you to add and remove persistent queries without restarting the servers.

Diagram showing interactive ksqlDB deployment

In interactive mode, you can:

  • Write statements and queries on the fly (see the example after this list)
  • Start any number of server nodes dynamically: <path-to-confluent>/bin/ksql-server-start
  • Start one or more CLIs or REST Clients and point them to a server: <path-to-confluent>/bin/ksql https://<ksql-server-ip-address>:8090
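
For example, an interactive session might list the running persistent queries and terminate one of them. The query ID below is illustrative; ksqlDB generates the actual IDs:

-- List the persistent queries running on the cluster.
SHOW QUERIES;

-- Stop one of them by its generated ID (illustrative).
TERMINATE CSAS_FRAUDULENT_PAYMENTS_0;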

Command Topic

In interactive mode, ksqlDB shares statements with servers in the cluster over the command topic. The command topic stores every SQL statement, along with some metadata that ensures the statements are built compatibly across ksqlDB restarts and upgrades. ksqlDB names the command topic _confluent-ksql-<service id>command_topic, where <service id> is the value in the ksql.service.id property.

By convention, the ksql.service.id property should end with a separator character, such as a dash or an underscore, which makes the topic name easier to read. For example, if ksql.service.id is default_, the command topic is named _confluent-ksql-default_command_topic.

Note

In Confluent Cloud, the Command Topic is managed by Confluent and not visible in the Kafka cluster. Previous versions of ksqlDB in Confluent Cloud created a Command Topic in user-owned Kafka clusters, but this isn’t the case for newly created ksqlDB clusters.

Headless Deployment

When you deploy a ksqlDB Server in headless mode, the REST interface isn’t available, so you assign workloads to ksqlDB clusters by using a SQL file. This can be useful if you want to completely lock down the set of persistent queries that ksqlDB will run. The SQL file contains the SQL statements and queries that define your application.

Diagram showing headless ksqlDB deployment

In headless mode you can:

  • Start any number of server nodes
  • Pass a SQL file with SQL statements to execute (a sample file follows this list): <path-to-confluent>/bin/ksql-node query-file=path/to/myquery.sql
  • Ensure resource isolation
  • Leave resource management to dedicated systems, like Kubernetes
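
A SQL file for headless mode is simply a sequence of statements that the servers execute on startup. The following is a minimal, illustrative sketch; the payments topic and its columns are assumptions, not part of this document:

CREATE STREAM payments (card_number VARCHAR, amount DOUBLE)
  WITH (kafka_topic='payments', value_format='JSON');

CREATE STREAM large_payments AS
  SELECT card_number, amount
  FROM payments
  WHERE amount > 1000
  EMIT CHANGES;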

Note

Headless mode deployments don’t have a command topic.

Config Topic

In headless mode, you supply SQL statements to each server in its SQL file. But ksqlDB still needs to store some internal metadata to ensure that it builds queries compatibly across restarts and upgrades. ksqlDB stores this metadata in an internal topic called the config topic. ksqlDB names the config topic _confluent-ksql-<service id>_configs, where <service id> is the value in the ksql.service.id property.

Supported Operations in Headless and Interactive Modes

The following table shows which SQL operations are supported in headless and interactive deployments.

SQL Operation | Interactive ksqlDB | Headless ksqlDB
Describe a stream or table, including runtime stats (DESCRIBE, DESCRIBE EXTENDED) | Supported | Not Supported
Explain a query, including runtime stats (EXPLAIN) | Supported | Not Supported
CREATE a stream or table | Supported | Supported
DROP a stream or table | Supported | Not Supported
List existing streams and tables (SHOW STREAMS, SHOW TABLES) | Supported | Not Supported
List running queries (SHOW QUERIES) | Supported | Not Supported
Describe existing streams and tables (DESCRIBE STREAMS, DESCRIBE TABLES) | Supported | Not Supported
Operations on connectors (CREATE CONNECTOR, DROP CONNECTOR, SHOW CONNECTORS) | Supported | Not Supported
Run a script (RUN SCRIPT) | Supported | Not Supported
Set query properties (SET) | Supported | Supported
Show contents of a Kafka topic (PRINT) | Supported | Not Supported
Show contents of a stream or table (SELECT) | Supported | Not Supported
Show properties of a query (SHOW PROPERTIES) | Supported | Not Supported
Show results of a query (SELECT) | Supported | Not Supported
PAUSE / RESUME a persistent query | Supported | Not Supported
TERMINATE a query | Supported | Not Supported
Start and stop a ksqlDB Server instance | Not with ksqlDB API | Not with ksqlDB API
Cleanup and delete internal data (internal topics) of a ksqlDB cluster or application | Supported (ksqlDB REST API) | Not with ksqlDB API

Note

You can perform operations listed as “Not with ksqlDB API” manually. Also, you can use deployment tools, like Kubernetes or Ansible, and Kafka tools, like kafka-delete-records.

Dedicating Resources

Join ksqlDB engines to the same service pool by using the ksql.service.id property. For example, all ksqlDB Server instances that share the same ksql.service.id value belong to the same pool. The following diagram shows a Kafka cluster with separate workloads for a finance pool and a sales pool. For more information, see ksql.service.id.

Diagram showing how to join ksqlDB engines to the same service pool

To scale out, add more ksqlDB Server instances. There’s no master node, and no coordination among instances is required. For more information, see Capacity Planning.

ksqlDB Query Lifecycle

To create a streaming application with ksqlDB, you write SQL statements and queries. Each statement and query has a lifecycle with the following steps:

  1. You register a ksqlDB stream or table from an existing Kafka topic with a DDL statement, like CREATE STREAM <my-stream> WITH (kafka_topic='<topic-name>', ...).
  2. You express your app by using a SQL statement, like CREATE TABLE AS SELECT FROM <my-stream>.
  3. ksqlDB parses your statement into an abstract syntax tree (AST).
  4. ksqlDB uses the AST and creates the logical plan for your statement.
  5. ksqlDB uses the logical plan and creates the physical plan for your statement.
  6. ksqlDB generates and runs the Kafka Streams application.
  7. You manage the application as a STREAM or TABLE with its corresponding persistent query.

Diagram showing the ksqlDB query lifecycle for a SQL statement

Register the stream

Register a stream or table by using the CREATE STREAM and CREATE TABLE DDL statements. For example, the following SQL statement creates a stream named authorization_attempts that’s backed by a topic named authorizations.

CREATE STREAM authorization_attempts
  (card_number VARCHAR, attemptTime BIGINT, ...)
  WITH (kafka_topic='authorizations', value_format='JSON');

ksqlDB writes DDL and DML statements to the command topic. Each ksqlDB Server reads the statement from the command topic, parsing and analyzing it.

Diagram showing deployment of a SQL file to a command topic

The CREATE STREAM statement is a DDL statement, so the action is to update the ksqlDB metadata.

Each ksqlDB Server has an internal, in-memory metadata store, or metastore, that it builds as it receives DDL statements. The metastore is implemented as an in-memory map, and for each new DDL statement, the ksqlDB engine adds an entry to it.

For example, the metastore entry for the previous CREATE STREAM statement might resemble:

Source Name | Structured Data Source
AUTHORIZATION_ATTEMPTS | [DataSourceType: STREAM], [Schema: (card_number VARCHAR, attemptTime BIGINT, attemptRegion VARCHAR, …)], [Key: null], [KsqlTopic: AUTHORIZATIONS], …

The ksqlDB metastore is implemented in the io.confluent.ksql.metastore package.

Express Your Application as a SQL Statement

Now that you have a stream, express your application’s business logic by using a SQL statement. The following DML statement creates a possible_fraud table from the authorization_attempts stream:

CREATE TABLE possible_fraud AS
  SELECT card_number, count(*)
  FROM authorization_attempts
  WINDOW TUMBLING (SIZE 5 SECONDS)
  WHERE attemptRegion = 'west'
  GROUP BY card_number
  HAVING count(*) > 3
  EMIT CHANGES;

The ksqlDB engine translates the DML statement into a Kafka Streams application. The application reads the source topic continuously, and whenever the count(*) > 3 condition is met, it writes records to the possible_fraud table.
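
To observe the derived table as it updates, you could issue a push query against it, for example:

SELECT * FROM possible_fraud EMIT CHANGES;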

ksqlDB Parses Your Statement

To express your DML statement as a Kafka Streams application, the ksqlDB engine starts by parsing the statement. The parser creates an abstract syntax tree (AST). The ksqlDB engine uses the AST to plan the query.

The SQL statement parser is based on ANTLR and is implemented in the io.confluent.ksql.parser package.

ksqlDB Creates the Logical Plan

The ksqlDB engine creates the logical plan for the query by using the AST. For the previous possible_fraud statement, the logical plan has the following steps:

  1. Define the source – FROM node
  2. Apply the filter – WHERE clause
  3. Apply aggregation – GROUP BY
  4. Project – WINDOW
  5. Apply post-aggregation filter – HAVING, applied to the result of GROUP BY
  6. Project – for the result

Diagram showing how the ksqlDB engine creates a logical plan for a SQL statement

ksqlDB Creates the Physical Plan

From the logical plan, the ksqlDB engine creates the physical plan, which is a Kafka Streams DSL application with a schema.

The generated code is based on the ksqlDB classes SchemaKStream and SchemaKTable. The ksqlDB engine traverses the nodes of the logical plan and emits corresponding Kafka Streams API calls:

  1. Define the source – a SchemaKStream or SchemaKTable with info from the ksqlDB metastore
  2. Filter – produces another SchemaKStream
  3. Project – select() method
  4. Apply aggregation – Multiple steps: the rekey(), groupby(), and aggregate() methods. ksqlDB may repartition the data if it isn’t already keyed by the GROUP BY expression.
  5. Filter – filter() method
  6. Project – select() method for the result

Diagram showing how the ksqlDB engine creates a physical plan for a SQL statement

If the DML statement is CREATE STREAM AS SELECT or CREATE TABLE AS SELECT, the result from the generated Kafka Streams application is a persistent query that writes continuously to its output topic until the query is terminated.

Developer Workflows

There are different workflows for ksqlDB and Kafka Streams when you develop streaming applications.

  • ksqlDB: You write SQL queries interactively and view the results in real time, either in the ksqlDB CLI or in Confluent Control Center.
  • Kafka Streams: You write code in Java or Scala, recompile, and run and test the application in an IDE, like IntelliJ. You deploy the application to production as a jar file that runs in a Kafka cluster.

ksqlDB and Kafka Streams: Where to Start?

Use the following table to help you decide between ksqlDB and Kafka Streams as a starting point for your real-time streaming application development.

Start with ksqlDB when… | Start with Kafka Streams when…
New to streaming and Kafka | Prefer writing and deploying JVM applications like Java and Scala; for example, due to people skills or tech environment
To quicken and broaden the adoption and value of Kafka in your organization | Use case is not naturally expressible through SQL, for example, finite state machines
Prefer an interactive experience with UI and CLI | Building microservices
Prefer SQL to writing code in Java or Scala | Must integrate with external services, or use 3rd-party libraries (but ksqlDB user-defined functions (UDFs) may help)
Use cases include enriching data; joining data sources; filtering, transforming, and masking data; identifying anomalous events | To customize or fine-tune a use case, for example, with the Kafka Streams Processor API: custom join variants, or probabilistic counting at very large scale with Count-Min Sketch
Use case is naturally expressible by using SQL, with optional help from UDFs | Need queryable state, which ksqlDB doesn’t support
Want the power of Kafka Streams but you aren’t on the JVM: use the ksqlDB REST API from Python, Go, C#, JavaScript, or shell |

Usually, ksqlDB isn’t a good fit for BI reports, ad-hoc querying, or queries with random access patterns, because it’s a continuous query system on data streams.
