Handle Multiple Event Types with Confluent Cloud for Apache Flink

Confluent Cloud for Apache Flink® simplifies processing events of multiple types in a single table. This guide shows how to use schema references to process purchase and pageview events that share one topic. You will see how different event types can coexist in a single table while keeping their distinct structures.
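
As a preview of what you will build (and as the Step 5 output confirms), the inferred Flink table exposes one nullable ROW column per referenced record type, so you can isolate a single event type by filtering on its column. The query below is a minimal sketch that assumes the Purchase record name used later in this guide.

    -- Sketch: rows that carry a pageview leave the Purchase column NULL, and vice versa,
    -- so filtering on the ROW column selects only purchase events.
    SELECT `Purchase`.`item`, `Purchase`.`amount`, `Purchase`.`customer_id`
    FROM `customer-events`
    WHERE `Purchase` IS NOT NULL;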

Prerequisites

  • Access to Confluent Cloud.
  • Confluent CLI installed.

Step 1: Create a cluster

If you have a Kafka cluster already, you can skip to Step 2.

  1. Log in to Confluent Cloud.

    confluent login --save
    
  2. Use an existing environment or create a new one.

    confluent environment list
    

    Your output should resemble:

      Current |       ID       |  Name   | Stream Governance Package
    ----------+----------------+---------+----------------------------
      *       | env-<id_1>     | testing | ESSENTIALS
              | env-<id_2>     | default | ADVANCED
    

    Run the following command to create a new environment.

    confluent environment create env-<name>
    

    Your output should resemble:

    +---------------------------+----------------+
    | Current                   | false          |
    | ID                        | env-<id_3>     |
    | Name                      | env-<name>     |
    | Stream Governance Package |                |
    +---------------------------+----------------+
    

    Run the following command to set the current environment.

    confluent environment use env-<env_id>
    

    Your output should resemble:

    Using environment "env-<env_id>".
    
  3. Use an existing cluster or create a new one.

    confluent kafka cluster list
    

    Your output should resemble:

    Current |       ID         |   Name         | Type | Cloud   | Region   | Availability | Status
    --------+------------------+----------------+------+---------+----------+--------------+--------
       *    | lkc-<cluster_id> | <cluster_name> | ...  | <cloud> | <region> | ....         | UP
    

    Run the following command to create a new Kafka cluster.

    confluent kafka cluster create <cluster_name> [flags]
    

    Your output should resemble:

    +----------------------+------------------+
    | Current              | false            |
    | ID                   | lkc-<cluster_id> |
    | Name                 | <cluster_name>   |
    | Type                 | ...              |
    | Cluster Size         | ...              |
    | Ingress Limit (MB/s) | ...              |
    | Egress Limit (MB/s)  | ...              |
    | Storage              | ...              |
    | Cloud                | <cloud>          |
    | Region               | <region>         |
    | Availability         | ...              |
    | Status               | PROVISIONING     |
    | Endpoint             |                  |
    | REST Endpoint        |                  |
    +----------------------+------------------+
    

    Run the following command to set the current cluster.

    confluent kafka cluster use <cluster_id>
    

    Your output should resemble:

    Set Kafka cluster "lkc-<cluster_id>" as the active cluster for environment "env-<env_id>".
    

Step 2: Create a topic

  1. Create a topic named customer-events for the data.

    confluent kafka topic create customer-events
    

    Your output should resemble:

    Created topic "customer-events".
    

Step 3: Create schemas

  1. Create a schema for purchase events.

    cat > purchase.avsc <<- 'EOF'
    {
       "type":"record",
       "namespace": "io.confluent.developer.avro",
       "name":"Purchase",
       "fields": [
          {"name": "item", "type":"string"},
          {"name": "amount", "type": "double"},
          {"name": "customer_id", "type": "string"}
       ]
    }
    EOF
    
  2. Create a schema for pageview events.

    cat > pageview.avsc <<- 'EOF'
    {
       "type":"record",
       "namespace": "io.confluent.developer.avro",
       "name":"Pageview",
       "fields": [
          {"name": "url", "type":"string"},
          {"name": "is_special", "type": "boolean"},
          {"name": "customer_id", "type":  "string"}
       ]
    }
    EOF
    
  3. Create a value schema for the customer-events topic that references the schemas for purchase and pageview events. To learn more about schema references, see Schema References. A short sketch after the registration output below shows how these referenced types surface as columns in the inferred Flink table.

    cat > customer-events-value.avsc <<- 'EOF'
    [
       "io.confluent.developer.avro.Purchase",
       "io.confluent.developer.avro.Pageview"
    ]
    EOF
    
  4. Create a file named purchase-pageview-references that contains the subjects and versions for the schema references.

    cat > purchase-pageview-references <<- 'EOF'
    [
       {
          "name": "io.confluent.developer.avro.Purchase",
          "subject": "purchase",
          "version": 1
       },
       {
          "name": "io.confluent.developer.avro.Pageview",
          "subject": "pageview",
          "version": 1
       }
    ]
    EOF
    
  5. Run the following commands to register the schemas.

    confluent schema-registry schema create \
       --type avro \
       --subject purchase \
       --schema purchase.avsc
    
    confluent schema-registry schema create \
       --type avro \
       --subject pageview \
       --schema pageview.avsc
    
    confluent schema-registry schema create \
       --type avro \
       --subject customer-events-value \
       --schema customer-events-value.avsc \
       --references purchase-pageview-references
    

    For each command, your output should resemble:

    Successfully registered schema with ID … .
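
    Once these three subjects are registered, Confluent Cloud for Apache Flink infers a table for the customer-events topic automatically; you do not create the table yourself. As a quick sanity check once you have started the Flink shell in Step 5, the following statement is a minimal sketch for listing the inferred columns, which should include one nullable ROW column per referenced record type (Purchase and Pageview).

    -- Sketch: list the columns Flink inferred from the registered value schema.
    DESCRIBE `customer-events`;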
    

Step 4: Produce messages to topic

  1. Create an API key for the cluster and store it. The API key is used to authenticate the producer with the cluster.

    confluent api-key create --resource lkc-<cluster_id>
    

    Your output should resemble:

    +-------------+------------------------------------------------------------------+
    | API Key     | 12A3BCDEFGHI4JKL                                                 |
    | API Secret  | aB+c12dEfghiJkLMNopqr3StUVWxyzabCdEFGHiJ4kL5mnop6QrS78TUVwxyzaB9 |
    +-------------+------------------------------------------------------------------+
    

    Run the following command to store the API key and secret.

    confluent api-key store --resource lkc-<cluster_id> <key> <secret>
    

    Your output should resemble:

    Stored secret for API key "12A3BCDEFGHI4JKL".
    

    Run the following command to use the API key.

    confluent api-key use <key>
    

    Your output should resemble:

    Using API Key "12A3BCDEFGHI4JKL".
    
  2. Create an API key for Schema Registry. The API key is used to authenticate the producer with Schema Registry.

    confluent api-key create --resource lsrc-<schema_registry_id>
    

    Your output should resemble:

    +-------------+------------------------------------------------------------------+
    | API Key     | AB1CDEF2GHI3J4KL                                                 |
    | API Secret  | j3Am6e+loCkCJUQ43iq9Es1z5KO7kKZQGmBvjg7jombv1PR0kxCvjsh6IDrz9LHY |
    +-------------+------------------------------------------------------------------+
    
  3. Open a new console and run the following command to start a Kafka producer for purchase messages, using the ID of the purchase schema stored in Schema Registry. Replace <purchase_schema_id>, <sr_endpoint>, <sr_api_key>, and <sr_api_secret> with your actual values.

    confluent kafka topic produce customer-events \
       --value-format avro \
       --schema-id <purchase_schema_id> \
       --schema-registry-endpoint <sr_endpoint> \
       --schema-registry-api-key <sr_api_key> \
       --schema-registry-api-secret <sr_api_secret>
    
  4. Copy and paste the following purchase messages into the producer console.

    {"io.confluent.developer.avro.Purchase":{"item": "apple",  "amount": 9.99, "customer_id": "u-21"}}
    
    {"io.confluent.developer.avro.Purchase":{"item": "jam",  "amount": 4.29, "customer_id": "u-67"}}
    
    {"io.confluent.developer.avro.Purchase":{"item": "mango",  "amount": 13.99, "customer_id": "u-67"}}
    
    {"io.confluent.developer.avro.Purchase":{"item": "socks",  "amount": 7.99, "customer_id": "u-123"}}
    
  5. Open a new console and run the following command to start a Kafka producer for pageview messages, using the ID of the pageview schema stored in Schema Registry. Replace <pageview_schema_id>, <sr_endpoint>, <sr_api_key>, and <sr_api_secret> with your actual values.

    confluent kafka topic produce customer-events \
       --value-format avro \
       --schema-id <pageview_schema_id> \
       --schema-registry-endpoint <sr_endpoint> \
       --schema-registry-api-key <sr_api_key> \
       --schema-registry-api-secret <sr_api_secret>
    
  6. Copy and paste the following pageview messages into the producer console.

    {"io.confluent.developer.avro.Pageview":{ "url": "https://www.confluent.io", "is_special": true, "customer_id": "u-67"}}
    
    {"io.confluent.developer.avro.Pageview":{ "url": "http://www.cflt.io", "is_special": false, "customer_id": "u-12"}}
    

Step 5: Query inferred table

  1. Return to your first console. Create a new compute pool or use an existing one.

    confluent flink compute-pool list
    

    Your output should resemble:

    Current | ID             | Name        | Environment  | Current CFU | Max CFU | Cloud   | Region   | Status
    --------+----------------+-------------+--------------+-------------+---------+---------+----------+------------
            | lfcp-<pool_id> | <pool_name> | env-<env_id> | 0           | ..      | <cloud> | <region> | PROVISIONED
    

    Run the following command to create a new Flink compute pool.

    confluent flink compute-pool create <pool_name> --cloud <cloud> --region <region> --max-cfu 5
    

    Your output should resemble:

    +-------------+-----------------+
    | Current     | false           |
    | ID          | lfcp-<pool_id>  |
    | Name        | <pool_name>     |
    | Environment | env-<env_id>    |
    | Current CFU | 0               |
    | Max CFU     | 5               |
    | Cloud       | <cloud>         |
    | Region      | <region>        |
    | Status      | PROVISIONING    |
    +-------------+-----------------+
    

    Run the following command to set the current compute pool.

    confluent flink compute-pool use lfcp-<pool_id>
    

    Your output should resemble:

    Using Flink compute pool "lfcp-<pool_id>".
    
  2. Start the Flink shell.

    confluent flink shell
    
  3. Analyze the customer-events table to see column names, data types, and other metadata.

    SHOW CREATE TABLE `customer-events`;
    

    Your output should resemble:

    CREATE TABLE `[catalog_name]`.`[database_name]`.`customer-events` (
      `key` VARBINARY(2147483647),
      `Purchase` ROW<`item` VARCHAR(2147483647) NOT NULL, `amount` DOUBLE NOT NULL, `customer_id` VARCHAR(2147483647) NOT NULL>,
      `Pageview` ROW<`url` VARCHAR(2147483647) NOT NULL, `is_special` BOOLEAN NOT NULL, `customer_id` VARCHAR(2147483647) NOT NULL>
    )
    DISTRIBUTED BY HASH(`key`) INTO 2 BUCKETS
    WITH (
      'changelog.mode' = 'append',
      'connector' = 'confluent',
      'kafka.cleanup-policy' = 'delete',
      'kafka.max-message-size' = '2097164 bytes',
      'kafka.retention.size' = '0 bytes',
      'kafka.retention.time' = '7 d',
      'key.format' = 'raw',
      'scan.bounded.mode' = 'unbounded',
      'scan.startup.mode' = 'earliest-offset',
      'value.format' = '[VALUE_FORMAT]'
    )
    
  4. Query the customer-events table to display both purchase and pageview events. The sketch after the output shows how to query each event type on its own and how to combine them.

    SELECT * FROM `customer-events`;
    

    Your output should resemble:

    key   Purchase                Pageview
    x''   (apple, 9.99, u-21)     NULL
    x''   (jam, 4.29, u-67)       NULL
    x''   (mango, 13.99, u-67)    NULL
    x''   (socks, 7.99, u-123)    NULL
    x''   NULL                    (https://www.confluent.io, TRUE, u-67)
    x''   NULL                    (http://www.cflt.io, FALSE, u-12)
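
The following statements are a sketch of common next steps against this inferred table; they assume only the column and field names shown in the SHOW CREATE TABLE output above. The first isolates pageview events; the second combines both event types per customer and, in the Flink shell, appears as an updating (continuously refined) result.

    -- Sketch: read only the pageview events by filtering on the Pageview ROW column.
    SELECT `Pageview`.`url`, `Pageview`.`is_special`, `Pageview`.`customer_id`
    FROM `customer-events`
    WHERE `Pageview` IS NOT NULL;

    -- Sketch: count purchases and pageviews per customer across both event types.
    -- COALESCE picks customer_id from whichever ROW column is populated.
    SELECT
      COALESCE(`Purchase`.`customer_id`, `Pageview`.`customer_id`) AS customer_id,
      COUNT(`Purchase`.`item`) AS purchases,
      COUNT(`Pageview`.`url`)  AS pageviews
    FROM `customer-events`
    GROUP BY COALESCE(`Purchase`.`customer_id`, `Pageview`.`customer_id`);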

To learn more about querying Confluent Cloud data that contains events of multiple types with Flink, see Flink SQL Examples with Schema References.