You can use KSQL to build event streaming applications from Apache Kafka® topics by using only SQL statements and queries. KSQL is built on Kafka Streams, so a KSQL application communicates with a Kafka cluster like any other Kafka Streams application.
KSQL has these main components:
- KSQL engine – processes KSQL statements and queries
- REST interface – enables client access to the engine
- KSQL CLI – console that provides a command-line interface (CLI) to the engine
- KSQL UI – enables developing KSQL applications in Confluent Control Center
KSQL Server comprises the KSQL engine and the REST API. KSQL Server instances communicate with the Kafka cluster, and you can add more of them as necessary without restarting your applications.
- KSQL Engine
The KSQL engine executes KSQL statements and queries. You define your application logic by writing KSQL statements, and the engine builds and runs the application on available KSQL servers. Each KSQL server instance runs a KSQL engine. Under the hood, the engine parses your KSQL statements and builds corresponding Kafka Streams topologies.
The KSQL engine is implemented in the KsqlEngine.java class.
- KSQL CLI
The KSQL CLI provides a console with a command-line interface for the KSQL engine. Use the KSQL CLI to interact with KSQL Server instances and develop your streaming applications. The KSQL CLI is designed to be familiar to users of MySQL, Postgres, and similar applications.
The KSQL CLI is implemented in the io.confluent.ksql.cli package.
- REST Interface
The REST server interface enables communication with the KSQL engine from the CLI, from Confluent Control Center, or from any other REST client. For more information, see KSQL REST API Reference.
The KSQL REST server is implemented in the KsqlRestApplication.java class.
When you deploy your KSQL application, it runs on KSQL Server instances that are independent of one another, are fault-tolerant, and can scale elastically with load. For more information, see KSQL Deployment Modes.
KSQL and Kafka Streams
KSQL is built on Kafka Streams, a robust stream processing framework that is part of Apache Kafka. You can use KSQL and Kafka Streams together in your event streaming applications. For more information on their relationship, see KSQL and Kafka Streams. For more information on Kafka Streams, see Streams Architecture.
You can also add custom logic and aggregations to your KSQL applications by implementing user-defined functions in Java. For more information, see KSQL Custom Function Reference (UDF, UDAF and UDTF).
KSQL Language Elements
Like traditional relational databases, KSQL supports two categories of statements: Data Definition Language (DDL) and Data Manipulation Language (DML).
These categories are similar in syntax, data types, and expressions, but they have different functions in KSQL Server.
- Data Definition Language (DDL) Statements
Imperative verbs that define metadata on the KSQL server by adding, changing, or deleting streams and tables. Data Definition Language statements modify metadata only and don’t operate on data. You can use these statements with declarative DML statements.
The DDL statements include:
- CREATE STREAM
- CREATE TABLE
- DROP STREAM
- DROP TABLE
- CREATE STREAM AS SELECT (CSAS)
- CREATE TABLE AS SELECT (CTAS)
- Data Manipulation Language (DML) Statements
Declarative verbs that read and modify data in KSQL streams and tables. Data Manipulation Language statements modify data only and don’t change metadata. The KSQL engine compiles DML statements into Kafka Streams applications, which run on KSQL servers and communicate with the Kafka cluster like any other Kafka Streams application.
The DML statements include:
- INSERT INTO
- CREATE STREAM AS SELECT (CSAS)
- CREATE TABLE AS SELECT (CTAS)
The CSAS and CTAS statements occupy both categories, because they both change metadata, like adding a stream, and manipulate data, by creating a derivative of existing records.
For more information, see KSQL Syntax Reference.
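The split between the two categories can be sketched with three statements. The stream, column, and topic names below are hypothetical and serve only as an illustration. The sketch assumes that a Kafka topic named orders with JSON values already exists.

```sql
-- DDL only: registers a stream over an existing topic.
-- This updates metadata and processes no data.
-- (orders, order_id, region, and amount are hypothetical names.)
CREATE STREAM orders (order_id VARCHAR, region VARCHAR, amount DOUBLE)
  WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON');

-- DDL and DML: registers a new stream (metadata) and starts a
-- persistent query that populates it (data).
CREATE STREAM orders_west AS
  SELECT order_id, amount
  FROM orders
  WHERE region = 'west'
  EMIT CHANGES;

-- DML only: starts another persistent query that writes into the
-- stream that already exists. No new stream is registered.
INSERT INTO orders_west
  SELECT order_id, amount
  FROM orders
  WHERE region = 'northwest'
  EMIT CHANGES;
```

The INSERT INTO statement works here because its projection has the same columns as the target stream.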
KSQL Deployment Modes
You can use these modes to deploy your KSQL streaming applications:
- Interactive – data exploration and pipeline development
- Headless – long-running production environments
In both deployment modes, KSQL enables distributing the processing load for your KSQL applications across all KSQL Server instances, and you can add more KSQL Server instances without restarting your applications.
All servers that run in a cluster must use the same deployment mode.
Use the interactive mode to develop your KSQL applications. When you deploy a KSQL server in interactive mode, the REST interface is available for the KSQL CLI and Confluent Control Center to connect to.
In interactive mode, you can:
- Write statements and queries on the fly
- Start any number of server nodes
- Start one or more CLIs or REST clients and point them to a server
In interactive mode, KSQL shares statements with servers in the cluster over the command topic. The command topic stores every KSQL statement, along with some metadata that ensures the statements are built compatibly across KSQL restarts and upgrades. KSQL names the command topic after the service ID, which is the value of the ksql.service.id property. By convention, the ksql.service.id property should end with a separator character, for example a dash or an underscore, because this makes the topic name easier to read.
Use headless mode to deploy your KSQL application to a production environment. When you deploy a KSQL server in headless mode, the REST interface isn’t available, so you assign workloads to KSQL servers by using a SQL file. The SQL file contains the KSQL statements and queries that define your application. Headless mode is ideal for streaming ETL application deployments.
In headless mode you can:
- Start any number of server nodes
- Pass a SQL file with KSQL statements to execute
- Version-control your queries and transformations as code
- Ensure resource isolation
- Leave resource management to dedicated systems, like Kubernetes
Headless mode deployments don’t have a command topic.
In headless mode, you supply KSQL statements to each server in its SQL file. However, KSQL still needs to store some internal metadata to ensure that it builds queries compatibly across restarts and upgrades. KSQL stores this metadata in an internal topic called the config topic, which it names after the service ID, the value of the ksql.service.id property.
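As a rough sketch, the SQL file for a headless deployment might look like the following. The topic, stream, and column names are hypothetical, and the file uses only operations that headless mode supports (SET and CREATE).

```sql
-- Hypothetical SQL file for a headless KSQL server.
-- Topic, stream, and column names are illustrative only.

-- Read source topics from the earliest available offset.
SET 'auto.offset.reset' = 'earliest';

-- Register a stream over an existing Kafka topic.
CREATE STREAM pageviews (user_id VARCHAR, page_id VARCHAR)
  WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='JSON');

-- Start a persistent query that maintains a count per user.
CREATE TABLE pageviews_per_user AS
  SELECT user_id, COUNT(*) AS view_count
  FROM pageviews
  GROUP BY user_id
  EMIT CHANGES;
```

Because the whole application lives in one file, it can be reviewed and version-controlled like any other source code.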
Supported Operations in Headless and Interactive Modes
The following table shows which KSQL operations are supported in headless and interactive deployments.
| KSQL Operation | Interactive KSQL | Headless KSQL |
|---|---|---|
| Describe a stream or table, including runtime stats (DESCRIBE, DESCRIBE EXTENDED) | Supported | Not Supported |
| Explain a query, including runtime stats (EXPLAIN) | Supported | Not Supported |
| CREATE a stream or table | Supported | Supported |
| DROP a stream or table | Supported | Not Supported |
| List existing streams and tables (SHOW STREAMS, SHOW TABLES) | Supported | Not Supported |
| List running queries (SHOW QUERIES) | Supported | Not Supported |
| Run a script (RUN SCRIPT) | Supported | Not Supported |
| Set query properties (SET) | Supported | Supported |
| Show contents of a Kafka topic (PRINT) | Supported | Not Supported |
| Show contents of a stream or table (SELECT) | Supported | Not Supported |
| Show properties of a query (SHOW PROPERTIES) | Supported | Not Supported |
| Show results of a query (SELECT) | Supported | Not Supported |
| TERMINATE a query | Supported | Not Supported |
| Start and stop a KSQL Server instance | Not with KSQL API | Not with KSQL API |
| Cleanup and delete internal data (internal topics) of a KSQL cluster or application | Supported (KSQL REST API) | Not with KSQL API |
You can perform operations listed as “Not with KSQL API” manually. You can also use deployment tools, like Kubernetes or Ansible, and the Kafka command-line tools.
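To make the interactive-only rows of the table concrete, here is a short exploration session as it might be typed into the KSQL CLI. The stream name orders, its columns, and its backing topic are hypothetical. None of these statements is available on a headless server.

```sql
-- List the streams and tables that are registered on the server.
SHOW STREAMS;
SHOW TABLES;

-- Show the schema of a stream together with runtime statistics.
-- (orders is a hypothetical stream name.)
DESCRIBE EXTENDED orders;

-- Print the raw records of the backing Kafka topic.
PRINT 'orders' FROM BEGINNING;

-- Run a transient query and stop after five rows.
SELECT order_id, amount FROM orders EMIT CHANGES LIMIT 5;
```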
Join KSQL engines to the same service pool by using the ksql.service.id property. The following diagram shows a Kafka cluster with separate workloads for a finance pool and a sales pool.
To scale out, add more KSQL server instances. No master node is required, and the instances don’t need to coordinate with one another. For more information, see KSQL Capacity Planning.
KSQL Query Lifecycle
To create a streaming application with KSQL, you write KSQL statements and queries. Each statement and query has a lifecycle with the following steps:
- You register a KSQL stream or table from an existing Kafka topic with a DDL statement, like CREATE STREAM <my-stream> WITH <topic-name>.
- You express your app by using a KSQL statement, like CREATE TABLE AS SELECT FROM <my-stream>.
- KSQL parses your statement into an abstract syntax tree (AST).
- KSQL uses the AST and creates the logical plan for your statement.
- KSQL uses the logical plan and creates the physical plan for your statement.
- KSQL generates and runs the Kafka Streams application.
- You manage the application as a STREAM or TABLE with its corresponding persistent query.
Register the Stream
Register a stream or table by using the DDL statements CREATE STREAM and CREATE TABLE. For example, the following KSQL statement creates a stream named authorization_attempts that’s backed by a topic named authorizations:
CREATE STREAM authorization_attempts (card_number VARCHAR, attemptTime BIGINT, ...) WITH (kafka_topic='authorizations', value_format='JSON');
KSQL writes DDL and DML statements to the command topic. Each KSQL Server reads the statement from the command topic, then parses and analyzes it.
The CREATE STREAM statement is a DDL statement, so the action is to update the KSQL metadata.
Each KSQL server has an internal metadata store, or metastore, that it builds as it receives DDL statements. The metastore is an in-memory map. For each new DDL statement, the KSQL engine adds an entry to the metastore.
For example, the metastore entry for the previous CREATE STREAM statement might resemble:
| Source Name | Structured Data Source |
|---|---|
| AUTHORIZATION_ATTEMPTS | [DataSourceType: STREAM], [Schema:(card_number VARCHAR, attemptTime BIGINT, attemptRegion VARCHAR, …)], [Key: null], [KsqlTopic: AUTHORIZATIONS], … |
The KSQL metastore is implemented in the io.confluent.ksql.metastore package.
Express Your Application as a KSQL Statement
Now that you have a stream, express your application’s business logic by using a KSQL statement. The following DML statement creates a possible_fraud table from the authorization_attempts stream:
CREATE TABLE possible_fraud AS SELECT card_number, count(*) FROM authorization_attempts WINDOW TUMBLING (SIZE 5 SECONDS) WHERE region = 'west' GROUP BY card_number HAVING count(*) > 3 EMIT CHANGES;
The KSQL engine translates the DML statement into a Kafka Streams application. The application reads the source topic continuously, and whenever the count(*) > 3 condition is met, it writes records to the possible_fraud table.
KSQL Parses Your Statement
To express your DML statement as a Kafka Streams application, the KSQL engine starts by parsing the statement. The parser creates an abstract syntax tree (AST). The KSQL engine uses the AST to plan the query.
KSQL Creates the Logical Plan
The KSQL engine creates the logical plan for the query by using the AST. For the possible_fraud statement, the logical plan has the following steps:
- Define the source – FROM node
- Apply the filter – WHERE clause
- Apply aggregation – GROUP BY
- Project – WINDOW
- Apply post-aggregation filter – HAVING, applied to the result of GROUP BY
- Project – for the result
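As a reading aid, the possible_fraud statement can be annotated clause by clause with the plan steps listed above. The step numbers in the comments simply follow the order of that list. They are an interpretation of the list, not output produced by KSQL.

```sql
CREATE TABLE possible_fraud AS
  -- Step 6: project the columns of the result.
  SELECT card_number, count(*)
  -- Step 1: define the source.
  FROM authorization_attempts
  -- Step 4: apply the five-second tumbling window.
  WINDOW TUMBLING (SIZE 5 SECONDS)
  -- Step 2: filter the source records.
  WHERE region = 'west'
  -- Step 3: aggregate per card number.
  GROUP BY card_number
  -- Step 5: filter the aggregated result.
  HAVING count(*) > 3
  EMIT CHANGES;
```

The plan order differs from the order in which the clauses are written: the source and the WHERE filter come first, and the SELECT projection is applied last.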
KSQL Creates the Physical Plan
From the logical plan, the KSQL engine creates the physical plan, which is a Kafka Streams DSL application with a schema.
The generated code is based on the KSQL classes SchemaKStream and SchemaKTable:
- A KSQL stream is rendered as a SchemaKStream instance, which is a KStream with a Schema.
- A KSQL table is rendered as a SchemaKTable instance, which is a KTable with a Schema.
- Schema awareness is provided by the SchemaRegistryClient class.
The KSQL engine traverses the nodes of the logical plan and emits corresponding Kafka Streams API calls:
- Define the source – a SchemaKStream or SchemaKTable with info from the KSQL metastore
- Filter – produces another SchemaKStream
- Project – select() method
- Apply aggregation – multiple steps, including the aggregate() method. KSQL may re-partition data if it’s not keyed with a GROUP BY phrase.
- Filter – filter() method
- Project – select() method for the result
If the DML statement is CREATE STREAM AS SELECT or CREATE TABLE AS SELECT, the result from the generated Kafka Streams application is a persistent query that writes continuously to its output topic until the query is terminated.
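On an interactive server, the persistent query behind possible_fraud can then be inspected and stopped with statements like the following. The query ID shown is hypothetical, so copy the real ID from the output of SHOW QUERIES.

```sql
-- List running persistent queries and their IDs.
SHOW QUERIES;

-- Show the execution plan and runtime statistics of one query.
-- (CTAS_POSSIBLE_FRAUD_0 is a hypothetical query ID.)
EXPLAIN CTAS_POSSIBLE_FRAUD_0;

-- Stop the persistent query. The table and its topic remain.
TERMINATE CTAS_POSSIBLE_FRAUD_0;

-- Remove the table from the metastore once no query writes to it.
DROP TABLE possible_fraud;
```

Terminating the query first matters because KSQL refuses to drop a stream or table while a persistent query still writes to it.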