Handle Multiple Event Types with Confluent Cloud for Apache Flink¶
Confluent Cloud for Apache Flink®️ simplifies processing events of multiple types in a single table.
This guide shows how to use schema references to process purchase and pageview events, and how events of different types can coexist in a single table while keeping their distinct structures.
- Step 1: Create a cluster
- Step 2: Create a topic
- Step 3: Create schemas
- Step 4: Produce messages to topic
- Step 5: Query inferred table
Prerequisites¶
- Access to Confluent Cloud.
- Confluent CLI installed.
Step 1: Create a cluster¶
If you have a Kafka cluster already, you can skip to Step 2.
Log in to Confluent Cloud.
confluent login --save
Use an existing environment or create a new one.
confluent environment list
Your output should resemble:
  Current |     ID     |  Name   | Stream Governance Package
----------+------------+---------+----------------------------
  *       | env-<id_1> | testing | ESSENTIALS
          | env-<id_2> | default | ADVANCED
Run the following command to create a new environment.
confluent environment create env-<name>
Your output should resemble:
+---------------------------+------------+
| Current                   | false      |
| ID                        | env-<id_3> |
| Name                      | env-<name> |
| Stream Governance Package |            |
+---------------------------+------------+
Run the following command to set the current environment.
confluent environment use env-<env_id>
Your output should resemble:
Using environment "env-<env_id>".
Use an existing cluster or create a new one.
confluent kafka cluster list
Your output should resemble:
  Current |        ID        |      Name      | Type |  Cloud  |  Region  | Availability | Status
----------+------------------+----------------+------+---------+----------+--------------+--------
  *       | lkc-<cluster_id> | <cluster_name> | ...  | <cloud> | <region> | ...          | UP
Run the following command to create a new Kafka cluster.
confluent kafka cluster create <cluster_name> [flags]
Your output should resemble:
+----------------------+------------------+
| Current              | false            |
| ID                   | lkc-<cluster_id> |
| Name                 | <cluster_name>   |
| Type                 | ...              |
| Cluster Size         | ...              |
| Ingress Limit (MB/s) | ...              |
| Egress Limit (MB/s)  | ...              |
| Storage              | ...              |
| Cloud                | <cloud>          |
| Region               | <region>         |
| Availability         | ...              |
| Status               | PROVISIONING     |
| Endpoint             |                  |
| REST Endpoint        |                  |
+----------------------+------------------+
Run the following command to set the current cluster.
confluent kafka cluster use <cluster_id>
Your output should resemble:
Set Kafka cluster "lkc-<cluster_id>" as the active cluster for environment "env-<env_id>".
Step 2: Create a topic¶
Create a topic named customer-events for the data.
confluent kafka topic create customer-events
Your output should resemble:
Created topic "customer-events".
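Optionally, verify the topic. The following check uses the CLI's topic describe subcommand to print the topic's configuration.
confluent kafka topic describe customer-events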
Step 3: Create schemas¶
Create a schema for purchase events.
cat > purchase.avsc <<- 'EOF'
{
  "type": "record",
  "namespace": "io.confluent.developer.avro",
  "name": "Purchase",
  "fields": [
    {"name": "item", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "customer_id", "type": "string"}
  ]
}
EOF
cat > purchase.proto <<- 'EOF'
syntax = "proto3";
package io.confluent.developer.proto;

message Purchase {
  string item = 1;
  double amount = 2;
  string customer_id = 3;
}
EOF
cat > purchase.json <<- 'EOF'
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Purchase",
  "type": "object",
  "properties": {
    "item": { "type": "string" },
    "amount": { "type": "number" },
    "customer_id": { "type": "string" }
  },
  "required": ["item", "amount", "customer_id"]
}
EOF
Create a schema for pageview events.
cat > pageview.avsc <<- 'EOF'
{
  "type": "record",
  "namespace": "io.confluent.developer.avro",
  "name": "Pageview",
  "fields": [
    {"name": "url", "type": "string"},
    {"name": "is_special", "type": "boolean"},
    {"name": "customer_id", "type": "string"}
  ]
}
EOF
cat > pageview.proto <<- 'EOF'
syntax = "proto3";
package io.confluent.developer.proto;

message Pageview {
  string url = 1;
  bool is_special = 2;
  string customer_id = 3;
}
EOF
cat > pageview.json <<- 'EOF'
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Pageview",
  "type": "object",
  "properties": {
    "url": { "type": "string" },
    "is_special": { "type": "boolean" },
    "customer_id": { "type": "string" }
  },
  "required": ["url", "is_special", "customer_id"]
}
EOF
Create a value schema for the customer-events topic that references the schemas for purchase and pageview events. To learn more about schema references, see Schema References.
cat > customer-events-value.avsc <<- 'EOF'
[
  "io.confluent.developer.avro.Purchase",
  "io.confluent.developer.avro.Pageview"
]
EOF
cat > customer-events-value.proto <<- 'EOF'
syntax = "proto3";
package io.confluent.developer.proto;

import "purchase.proto";
import "pageview.proto";

message CustomerEvent {
  oneof action {
    Purchase purchase = 1;
    Pageview pageview = 2;
  }
}
EOF
cat > customer-events-value.json <<- 'EOF'
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "CustomerEvent",
  "type": "object",
  "oneOf": [
    { "$ref": "io.confluent.developer.json.Purchase" },
    { "$ref": "io.confluent.developer.json.Pageview" }
  ]
}
EOF
Create a file named purchase-pageview-references that contains the subjects and versions for the schema references.
cat > purchase-pageview-references <<- 'EOF'
[
  {
    "name": "io.confluent.developer.avro.Purchase",
    "subject": "purchase",
    "version": 1
  },
  {
    "name": "io.confluent.developer.avro.Pageview",
    "subject": "pageview",
    "version": 1
  }
]
EOF
cat > purchase-pageview-references <<- 'EOF'
[
  {
    "name": "purchase.proto",
    "subject": "purchase",
    "version": 1
  },
  {
    "name": "pageview.proto",
    "subject": "pageview",
    "version": 1
  }
]
EOF
cat > purchase-pageview-references <<- 'EOF'
[
  {
    "name": "io.confluent.developer.json.Purchase",
    "subject": "purchase",
    "version": 1
  },
  {
    "name": "io.confluent.developer.json.Pageview",
    "subject": "pageview",
    "version": 1
  }
]
EOF
Run the following commands to register the schemas.
confluent schema-registry schema create \
  --type avro \
  --subject purchase \
  --schema purchase.avsc
confluent schema-registry schema create \
  --type avro \
  --subject pageview \
  --schema pageview.avsc
confluent schema-registry schema create \
  --type avro \
  --subject customer-events-value \
  --schema customer-events-value.avsc \
  --references purchase-pageview-references
For each command, your output should resemble:
Successfully registered schema with ID … .
confluent schema-registry schema create \
  --type protobuf \
  --subject purchase \
  --schema purchase.proto
confluent schema-registry schema create \
  --type protobuf \
  --subject pageview \
  --schema pageview.proto
confluent schema-registry schema create \
  --type protobuf \
  --subject customer-events-value \
  --schema customer-events-value.proto \
  --references purchase-pageview-references
For each command, your output should resemble:
Successfully registered schema with ID … .
confluent schema-registry schema create \
  --type json \
  --subject purchase \
  --schema purchase.json
confluent schema-registry schema create \
  --type json \
  --subject pageview \
  --schema pageview.json
confluent schema-registry schema create \
  --type json \
  --subject customer-events-value \
  --schema customer-events-value.json \
  --references purchase-pageview-references
For each command, your output should resemble:
Successfully registered schema with ID … .
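Optionally, confirm the registration. The following check uses the CLI's schema describe subcommand to print the registered schema and its references.
confluent schema-registry schema describe --subject customer-events-value --version 1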
Step 4: Produce messages to topic¶
Create an API key for the cluster and store it. The API key is used to authenticate the producer with the cluster.
confluent api-key create --resource lkc-<cluster_id>
Your output should resemble:
+------------+------------------------------------------------------------------+
| API Key    | 12A3BCDEFGHI4JKL                                                 |
| API Secret | aB+c12dEfghiJkLMNopqr3StUVWxyzabCdEFGHiJ4kL5mnop6QrS78TUVwxyzaB9 |
+------------+------------------------------------------------------------------+
Run the following command to store the API key and secret.
confluent api-key store --resource lkc-<cluster_id> <key> <secret>
Your output should resemble:
Stored secret for API key "12A3BCDEFGHI4JKL".
Run the following command to use the API key.
confluent api-key use <key>
Your output should resemble:
Using API Key "12A3BCDEFGHI4JKL".
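Optionally, list the API keys for the cluster to confirm that the new key is active.
confluent api-key list --resource lkc-<cluster_id>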
Create an API key for Schema Registry. The API key is used to authenticate the producer with Schema Registry.
confluent api-key create --resource lsrc-<schema_registry_id>
Your output should resemble:
+------------+------------------------------------------------------------------+
| API Key    | AB1CDEF2GHI3J4KL                                                 |
| API Secret | j3Am6e+loCkCJUQ43iq9Es1z5KO7kKZQGmBvjg7jombv1PR0kxCvjsh6IDrz9LHY |
+------------+------------------------------------------------------------------+
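The produce commands below also need your Schema Registry endpoint and the lsrc-<schema_registry_id>. Depending on your CLI version, you may be able to look both up with a command like the following; verify the exact subcommand against your CLI's help output.
confluent schema-registry cluster describe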
Open a new console and run the following command to start a Kafka producer for purchase messages with the ID of the schema stored in Schema Registry. Replace <purchase_schema_id>, <sr_endpoint>, <sr_api_key>, and <sr_api_secret> with your actual values.
confluent kafka topic produce customer-events \
  --value-format avro \
  --schema-id <purchase_schema_id> \
  --schema-registry-endpoint <sr_endpoint> \
  --schema-registry-api-key <sr_api_key> \
  --schema-registry-api-secret <sr_api_secret>
confluent kafka topic produce customer-events \
  --value-format protobuf \
  --schema-id <purchase_schema_id> \
  --schema-registry-endpoint <sr_endpoint> \
  --schema-registry-api-key <sr_api_key> \
  --schema-registry-api-secret <sr_api_secret>
confluent kafka topic produce customer-events \
  --value-format json \
  --schema-id <purchase_schema_id> \
  --schema-registry-endpoint <sr_endpoint> \
  --schema-registry-api-key <sr_api_key> \
  --schema-registry-api-secret <sr_api_secret>
Copy and paste the following purchase messages into the console.
{"io.confluent.developer.avro.Purchase":{"item": "apple", "amount": 9.99, "customer_id": "u-21"}}
{"io.confluent.developer.avro.Purchase":{"item": "jam", "amount": 4.29, "customer_id": "u-67"}}
{"io.confluent.developer.avro.Purchase":{"item": "mango", "amount": 13.99, "customer_id": "u-67"}}
{"io.confluent.developer.avro.Purchase":{"item": "socks", "amount": 7.99, "customer_id": "u-123"}}
{"purchase":{"item": "apple", "amount": 9.99, "customer_id": "u-21"}} {"purchase":{"item": "jam", "amount": 4.29, "customer_id": "u-67"}} {"purchase":{"item": "mango", "amount": 13.99, "customer_id": "u-67"}} {"purchase":{"item": "socks", "amount": 7.99, "customer_id": "u-123"}}
{"io.confluent.developer.json.Purchase":{"item": "apple", "amount": 9.99, "customer_id": "u-21"}} {"io.confluent.developer.json.Purchase":{"item": "jam", "amount": 4.29, "customer_id": "u-67"}} {"io.confluent.developer.json.Purchase":{"item": "mango", "amount": 13.99, "customer_id": "u-67"}} {"io.confluent.developer.json.Purchase":{"item": "socks", "amount": 7.99, "customer_id": "u-123"}}
Open a new console and run the following command to start a Kafka producer for pageview messages with the ID of the schema stored in Schema Registry. Replace <pageview_schema_id>, <sr_endpoint>, <sr_api_key>, and <sr_api_secret> with your actual values.
confluent kafka topic produce customer-events \
  --value-format avro \
  --schema-id <pageview_schema_id> \
  --schema-registry-endpoint <sr_endpoint> \
  --schema-registry-api-key <sr_api_key> \
  --schema-registry-api-secret <sr_api_secret>
confluent kafka topic produce customer-events \
  --value-format protobuf \
  --schema-id <pageview_schema_id> \
  --schema-registry-endpoint <sr_endpoint> \
  --schema-registry-api-key <sr_api_key> \
  --schema-registry-api-secret <sr_api_secret>
confluent kafka topic produce customer-events \
  --value-format json \
  --schema-id <pageview_schema_id> \
  --schema-registry-endpoint <sr_endpoint> \
  --schema-registry-api-key <sr_api_key> \
  --schema-registry-api-secret <sr_api_secret>
Copy and paste the following pageview messages into the console.
{"io.confluent.developer.avro.Pageview":{"url": "https://www.confluent.io", "is_special": true, "customer_id": "u-67"}}
{"io.confluent.developer.avro.Pageview":{"url": "http://www.cflt.io", "is_special": false, "customer_id": "u-12"}}
{"pageview":{ "url": "https://www.confluent.io", "is_special": true, "customer_id": "u-67"}} {"pageview":{ "url": "http://www.cflt.io", "is_special": false, "customer_id": "u-12"}}
{"io.confluent.developer.json.Pageview":{ "url": "https://www.confluent.io", "is_special": true, "customer_id": "u-67"}} {"io.confluent.developer.json.Pageview":{ "url": "http://www.cflt.io", "is_special": false, "customer_id": "u-12"}}
Step 5: Query inferred table¶
Return to your first console. Create a new compute pool or use an existing one.
confluent flink compute-pool list
Your output should resemble:
  Current |       ID       |    Name     | Environment  | Current CFU | Max CFU |  Cloud  |  Region  |   Status
----------+----------------+-------------+--------------+-------------+---------+---------+----------+-------------
          | lfcp-<pool_id> | <pool_name> | env-<env_id> | 0           | ..      | <cloud> | <region> | PROVISIONED
Run the following command to create a new Flink compute pool.
confluent flink compute-pool create <pool_name> --cloud <cloud> --region <region> --max-cfu 5
Your output should resemble:
+-------------+----------------+
| Current     | false          |
| ID          | lfcp-<pool_id> |
| Name        | <pool_name>    |
| Environment | env-<env_id>   |
| Current CFU | 0              |
| Max CFU     | 5              |
| Cloud       | <cloud>        |
| Region      | <region>       |
| Status      | PROVISIONING   |
+-------------+----------------+
Run the following command to set the current compute pool.
confluent flink compute-pool use lfcp-<pool_id>
Your output should resemble:
Using Flink compute pool "lfcp-<pool_id>".
Start the Flink shell.
confluent flink shell start
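In the shell, you can list the tables that Flink inferred from your Kafka topics; the customer-events table should appear in the output.
SHOW TABLES;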
Analyze the customer-events table to see column names, data types, and other metadata.
SHOW CREATE TABLE `customer-events`;
Your output should resemble:
CREATE TABLE `[catalog_name]`.`[database_name]`.`customer-events` (
  `key` VARBINARY(2147483647),
  `Purchase` ROW<`item` VARCHAR(2147483647) NOT NULL, `amount` DOUBLE NOT NULL, `customer_id` VARCHAR(2147483647) NOT NULL>,
  `Pageview` ROW<`url` VARCHAR(2147483647) NOT NULL, `is_special` BOOLEAN NOT NULL, `customer_id` VARCHAR(2147483647) NOT NULL>
)
DISTRIBUTED BY HASH(`key`) INTO 2 BUCKETS
WITH (
  'changelog.mode' = 'append',
  'connector' = 'confluent',
  'kafka.cleanup-policy' = 'delete',
  'kafka.max-message-size' = '2097164 bytes',
  'kafka.retention.size' = '0 bytes',
  'kafka.retention.time' = '7 d',
  'key.format' = 'raw',
  'scan.bounded.mode' = 'unbounded',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = '[VALUE_FORMAT]'
)
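For a more compact view of the inferred columns, you can also run a DESCRIBE statement.
DESCRIBE `customer-events`;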
Query the customer-events table to display both purchase and pageview events.
SELECT * FROM `customer-events`;
Your output should resemble:
key  Purchase              Pageview
x''  (apple, 9.99, u-21)   NULL
x''  (jam, 4.29, u-67)     NULL
x''  (mango, 13.99, u-67)  NULL
x''  (socks, 7.99, u-123)  NULL
x''  NULL                  (https://www.confluent.io, TRUE, u-67)
x''  NULL                  (http://www.cflt.io, FALSE, u-12)
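Because each event type maps to its own nullable ROW column, you can isolate one type by filtering on the populated column. The following query is a sketch that uses the column names inferred above to return only purchase events, flattening the nested fields.
SELECT
  `Purchase`.`item` AS item,
  `Purchase`.`amount` AS amount,
  `Purchase`.`customer_id` AS customer_id
FROM `customer-events`
WHERE `Purchase` IS NOT NULL;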
To unlock Confluent Cloud data containing events of multiple types with Flink, see Flink SQL Examples with Schema References.