Important

Data Lineage is currently available on Confluent Cloud in an Early Access Program to a very limited set of early adopters. An early access feature is a component of Confluent Cloud introduced to gain feedback. This feature should be used only for evaluation and non-production testing purposes or to provide feedback to Confluent, particularly as it becomes more widely available in follow-on preview editions. If you would like to participate in the Early Access Program, email data-lineage-preview@confluent.io.

Early Access Program features are intended for evaluation use in development and testing environments only, and not for production use. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Early Access Program features. Early Access Program features are considered to be a Proof of Concept as defined in the Confluent Cloud Terms of Service. Confluent may discontinue providing preview releases of the Early Access Program features at any time in Confluent’s sole discretion.

Data Lineage (Early Access)

Confluent is incrementally adding Data Governance capabilities to Confluent Cloud, starting with an Early Access Program for Data Lineage.

Data lineage in Confluent covers the lifecycle of data, from its origins through the transformations it undergoes as it is processed by different consumers. Data lineage is particularly valuable in complex, high-volume data environments like Confluent and Kafka. It is a key element of Data Governance, providing an understanding of how each component uses and transforms the data as it travels from source to destination.

The main use cases for data lineage are:

  • Understand how data in Confluent is transformed from source to destination
  • Provide audit trails of data (for example, provide data flows for data privacy impact assessments in GDPR)
  • Perform impact analysis (for example, if I remove this ksqlDB query, what impact will that have downstream?)
  • Audit data movements (for example, are all these consumers still relevant?)
  • Allow developers to discover data
  • Satisfy regulatory and compliance requirements, such as the General Data Protection Regulation (GDPR) in the EU, which governs data privacy, and the Basel Committee on Banking Supervision Rule 239 (BCBS 239), which is designed to improve risk data aggregation and reporting.

First Look

In this Early Access version, Data Lineage in Confluent Cloud is represented visually to show the movement of data from source to destination, and how it is transformed as it moves.

To view the Data Lineage UIs:

  1. Make sure you are signed up for the preview.
  2. Log on to Confluent Cloud.
  3. Select an environment.
  4. Select a cluster.
  5. Select a topic.
  6. Click the Data Lineage card on the topic page. The data lineage for that topic is displayed.
../_images/dg-dl-overview.png

Tip

The data lineage shown in this example is the result of setting up a data pipeline based on several ksqlDB query streams. If you haven’t set up a data pipeline yet, your lineage view may only show a single, lonely event node.

To get an interesting lineage like the one shown above, take a spin through the tutorial in the next section!

Tutorial

To see Data Lineage in action, you need to configure topics, producers, and consumers to create a data pipeline. Once events are flowing through your pipeline, you can use Data Lineage to inspect where data comes from, what transformations are applied to it, and where it goes.

Select an environment, cluster, and Schema Registry

  1. Add an environment or select an existing one.

  2. Add a cluster or select an existing one on which to run the demo.

    If you create a new cluster:

    • You must select a cluster type. For Data Lineage in Early Access, you can choose any cluster type.
    • Choose a cloud provider and region.
    • Click Continue to review the configuration and costs, usage limits, and uptime service level agreement (SLA).

    Then click Launch Cluster.

  3. Enable Schema Registry (if it is not already enabled) by navigating to the Schemas page for your cluster and following the prompts to choose a region and cloud provider.

    ../_images/dg-dl-sr-create.png
  4. The Schema Registry settings and information are available on the Schema Registry tab for the environment.

    ../_images/dg-dl-cluster-settings.png
  5. Generate and save the Schema Registry API key and secret. (Save the key and secret; you will need them later when you run a CLI consumer against the pipeline. If you prefer to work from the CLI, see the sketch following the tip below.)

Tip

If you need help with these initial steps, see Quick Start for Apache Kafka using Confluent Cloud.
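
If you prefer to work from the Confluent Cloud CLI, you can create the Schema Registry API key there instead of in the UI. The following is a minimal sketch, assuming the ccloud CLI is installed; <ENVIRONMENT_ID> is your environment ID, and <SR_CLUSTER_ID> is a placeholder for the Schema Registry cluster ID reported by the describe command.

    # Log in to Confluent Cloud (prompts for username and password).
    ccloud login --url https://confluent.cloud

    # Select the environment you chose or created above.
    ccloud environment use <ENVIRONMENT_ID>

    # Look up the Schema Registry cluster ID for the current environment.
    ccloud schema-registry cluster describe

    # Create an API key and secret scoped to the Schema Registry cluster.
    ccloud api-key create --resource <SR_CLUSTER_ID>

The secret is not shown again after creation, so save both the key and the secret.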

Create the “stocks” topic and generate data

  1. (Optional) Create a topic named stocks.

    Tip

    • This step is optional because adding the Datagen connector (as described in the next steps) automatically creates the stocks topic if it does not exist.
    • To learn more about manually creating topics and working with them, see Managing Topics in Confluent Cloud.
  2. Choose Connectors from the menu and select the Datagen source connector.

  3. Configure the Datagen source connector to generate sample data for the stocks topic, using these settings:

    • Name: StockSourceConnector
    • Which topic do you want to send data to?: stocks
    • Output message format: AVRO
    • Quickstart: STOCK_TRADES
    • Max interval between messages: 1000
    • Number of tasks for this connector: 1

    You’ll also need to generate and save an API key and secret for this cluster, if you have not done so already.

    ../_images/dg-dl-datagen-setup.png
  4. Click Next, review the settings for the connector, and click Launch to start sending data to the target topic.

    The connector first shows a status of Provisioning, then Running when it is fully provisioned. (To verify the topic from the CLI, see the sketch below.)
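
If you want to confirm the topic from the command line, a sketch along these lines should work. It assumes you have already logged in with the Confluent Cloud CLI and selected your environment and cluster; those commands are shown in the consumer section later in this tutorial.

    # Confirm that the Datagen connector created the stocks topic.
    ccloud kafka topic list

    # Inspect the partition count and configuration of the topic.
    ccloud kafka topic describe stocks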

Create a ksqlDB app

  1. Navigate to ksqlDB.
  2. Click Create application myself.
  3. Select Global access and click Continue.
  4. Provide an application name, such as ksqlDB_stocks_app, and accept the defaults for the number of streaming units.
  5. Click Launch application.

Tip

  • Provisioning will take some time. In some cases, it can take up to an hour.
  • By creating the ksqlDB app with global access, you avoid having to create specific ACLs for the app itself. With global access, the ksqlDB cluster is running with the same level of access to Kafka as the user who provisions ksqlDB. If you are interested in learning how to manage ACLs on a ksqlDB cluster with granular access, see Appendix A: Creating a ksqlDB app with granular access and assigning ACLs.

Verify your ksqlDB app is running

Return to the list of ksqlDB apps on the Confluent Cloud UI.

Your ksqlDB app should have completed provisioning and show a status of Up. (You can also check this from the Confluent Cloud CLI; see the sketch below.)

../_images/dg-dl-ksql-app-up.png
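
You can also check the status from the Confluent Cloud CLI. A minimal sketch, assuming you are logged in and the environment that contains the app is selected:

    # List the ksqlDB apps in the current environment and check that your
    # app's status shows it is up.
    ccloud ksql app list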

Create persistent streams in ksqlDB to filter on stock prices

Navigate to ksqlDB, click into your app, ksqlDB_stocks_app, and open the Editor (ksqlDB_stocks_app > Editor) to create the following persistent streams.

Specify each query statement in the Editor and click Run query to start the query. You can click the Streams tab to view a list of running queries.

  1. Create a stream for the stocks topic, then create a persistent stream that filters on stocks with price <= 100. This feeds the results to the stocks_under_100 topic.

    You’ll need to specify and run three separate queries for this step. You start by creating the stocks stream, then add the filters to find and list stocks under $100. After each of these, click Run query, then clear the editor to specify the next statement.

    CREATE STREAM stocks WITH (KAFKA_TOPIC = 'stocks', VALUE_FORMAT = 'AVRO');
    
    CREATE STREAM stocks_under_100 WITH (KAFKA_TOPIC='stocks_under_100', PARTITIONS=10, REPLICAS=3) AS SELECT * FROM stocks WHERE (price <= 100);
    
    SELECT * FROM stocks_under_100 EMIT CHANGES;
    

    When you have these running, click the Streams tab. You should have two new streams, STOCKS and STOCKS_UNDER_100. (The last statement is a transient query on the stream, STOCKS_UNDER_100, to get some data onto the UI.)

  2. Create a persistent stream that filters on stocks to BUY, and feed the results to the stocks_buy topic.

    You’ll need to specify and run two separate queries for this step. After each of these, click Run query, then clear the editor to specify the next statement.

    CREATE STREAM stocks_buy WITH (KAFKA_TOPIC='stocks_buy', PARTITIONS=10, REPLICAS=3) AS SELECT * FROM stocks WHERE side='BUY';
    
    SELECT * FROM stocks_buy EMIT CHANGES;
    
  3. Create a persistent stream that filters on stocks to SELL.

    You’ll need to specify and run two separate queries for this step. After each of these, click Run query, then clear the editor to specify the next statement.

    CREATE STREAM stocks_sell WITH (KAFKA_TOPIC='stocks_sell', PARTITIONS=10, REPLICAS=3) AS SELECT * FROM stocks WHERE side='SELL';
    
    SELECT * FROM stocks_sell EMIT CHANGES;
    

When you have completed these steps, click the ksqlDB > Streams tab. You should have four persistent ksqlDB query streams:

  • STOCKS
  • STOCKS_BUY
  • STOCKS_SELL
  • STOCKS_UNDER_100
../_images/dg-dl-ksqldb-streams-all.png

These streams have associated topics and schemas listed on the Topics and Schemas pages, respectively. (To confirm the backing topics from the CLI, see the sketch below.)

../_images/dg-dl-topics.png
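
The CREATE STREAM ... AS SELECT statements also create backing Kafka topics for their output, so you can confirm the new topics from the Confluent Cloud CLI. A minimal sketch, assuming you are logged in and have selected the right environment and cluster (as shown in the next section):

    # The persistent queries create backing topics for their output streams
    # (stocks_under_100, stocks_buy, and stocks_sell).
    ccloud kafka topic list

    # Inspect one of the derived topics; it should show the 10 partitions
    # specified in the CREATE STREAM statement.
    ccloud kafka topic describe stocks_buy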

Consume events from the “stocks” topic

Now, set up a consumer using the Confluent Cloud CLI to consume events from one of your stock topics (in this example, stocks_buy).

  1. Log on to the Confluent Cloud CLI. (Provide username and password at prompts.)

    ccloud login --url https://confluent.cloud
    
  2. List the environments to verify you are on the right environment.

    ccloud environment list
    
  3. If needed, re-select the environment you’ve been using for this demo.

    ccloud environment use <ENVIRONMENT_ID>
    
  4. List the clusters to verify you are on the right cluster.

    ccloud kafka cluster list
    
  5. If needed, re-select the cluster you’ve been using for this demo.

    ccloud kafka cluster use <KAFKA_CLUSTER_ID>
    
  6. Create Kafka API credentials for the consumer.

    Create an API key.

    ccloud api-key create --resource <KAFKA_CLUSTER_ID>
    

    Use the API key.

    ccloud api-key use <API_KEY> --resource <KAFKA_CLUSTER_ID>
    

    Alternatively, you can store the key.

    ccloud api-key store --resource <KAFKA_CLUSTER_ID>
    
  7. Run a CLI consumer.

    ccloud kafka topic consume stocks_buy --value-format avro --group buy_group
    
  8. When prompted, provide the Schema Registry API key and secret you generated earlier.

    You should see data from the topic streaming to the consumer at the command line, for example:

    Vickys-MacBook-Pro:~ vicky$ ccloud kafka topic consume stocks_buy --value-format avro --group buy_group
    Enter your Schema Registry API key: O2XTNUTKJOHZJBJW
    Enter your Schema Registry API secret: ****************************************************************
    Starting Kafka Consumer. ^C or ^D to exit
    {"SIDE":{"string":"BUY"},"QUANTITY":{"int":959},"SYMBOL":{"string":"ZVZZT"},"PRICE":{"int":704},"ACCOUNT":{"string":"XYZ789"},"USERID":{"string":"User_8"}}
    {"ACCOUNT":{"string":"ABC123"},"USERID":{"string":"User_1"},"SIDE":{"string":"BUY"},"QUANTITY":{"int":1838},"SYMBOL":{"string":"ZWZZT"},"PRICE":{"int":405}}
    {"QUANTITY":{"int":2163},"SYMBOL":{"string":"ZTEST"},"PRICE":{"int":78},"ACCOUNT":{"string":"ABC123"},"USERID":{"string":"User_8"},"SIDE":{"string":"BUY"}}
    {"PRICE":{"int":165},"ACCOUNT":{"string":"LMN456"},"USERID":{"string":"User_2"},"SIDE":{"string":"BUY"},"QUANTITY":{"int":4675},"SYMBOL":{"string":"ZJZZT"}}
    {"QUANTITY":{"int":1702},"SYMBOL":{"string":"ZJZZT"},"PRICE":{"int":82},"ACCOUNT":{"string":"XYZ789"},"USERID":{"string":"User_7"},"SIDE":{"string":"BUY"}}
    {"ACCOUNT":{"string":"LMN456"},"USERID":{"string":"User_9"},"SIDE":{"string":"BUY"},"QUANTITY":{"int":2982},"SYMBOL":{"string":"ZVV"},"PRICE":{"int":643}}
    {"SIDE":{"string":"BUY"},"QUANTITY":{"int":3687},"SYMBOL":{"string":"ZJZZT"},"PRICE":{"int":514},"ACCOUNT":{"string":"ABC123"},"USERID":{"string":"User_5"}}
    {"USERID":{"string":"User_5"},"SIDE":{"string":"BUY"},"QUANTITY":{"int":289},"SYMBOL":{"string":"ZJZZT"},"PRICE":{"int":465},"ACCOUNT":{"string":"XYZ789"}}
    ...
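
Optionally, you can add another CLI consumer to the pipeline by consuming one of the other derived topics with its own consumer group. A minimal sketch, reusing the same cluster and Schema Registry credentials; the group name sell_group is just an example:

    # Consume the stocks_sell topic in a separate consumer group; it should
    # appear as another CLI consumer node in the lineage view.
    ccloud kafka topic consume stocks_sell --value-format avro --group sell_group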
    

Explore the data lineage for the pipeline

With the producers and consumers up and running, you can use Data Lineage to visualize and explore the flow of data from the source connector to the STOCKS topic, where queries filter the data on specified criteria and feed the results to your three derived topics:

  • STOCKS_BUY
  • STOCKS_SELL
  • STOCKS_UNDER_100

  1. Search for the stocks topic in the search box.

    ../_images/dg-dl-search-topic.png
  2. Click anywhere on the Data Lineage card on the stocks topic page.

    ../_images/dg-dl-on-topic.png

    The data lineage for the stocks topic is shown.

    ../_images/dg-dl-overview.png
  3. Hover over a node for a high-level description of the data source and throughput.

    ../_images/dg-dl-on-topic-hover.png
  4. Click a node to inspect it.

    ../_images/dg-dl-on-topic-drilldown.png

Try this

  • Click the stocks topic node, and scroll through the message throughput timelines on the Overview tab, then click Edit topic to go directly to the topic.
  • Click the stocks_buy topic node, then click the Schema tab to view its associated schema.
  • Click a query node, such as the stocks_buy query, and click the Schema tab. This shows a menu-style view of the same schema, because the schema associated with the stocks_buy topic comes from the stocks_buy query.
  • To verify this, click View query to link to the ksqlDB_stocks_app, then click the Flow tab under that app, and click stocks_buy on that diagram. (Note that you can also visualize the data flow particular to that query directly within the ksqlDB app, but not the combined flows of all queries to all topics, as shown in Data Lineage.)

Understand Data Nodes

The following types of nodes can appear in a data lineage diagram:
node-topic

A topic node shows:

  • Topic name and link to the topic
  • Associated schemas (key and value)
  • Number of partitions
  • Total throughput as a time series line graph
  • Bytes produced and consumed per app, and per partition
node-customApp

A custom application node provides:

  • Name of the application, and its status
  • Total throughput as a time series line graph
  • Bytes produced and consumed per topic
  • Drilldowns to show consumers and producers
node-query

A ksqlDB Query node shows:

  • Query name and link to the query
  • Query type, status, and mode
  • Total throughput as a time series line graph
  • Bytes produced and consumed per topic
  • Drilldowns to show the ksqlDB app data
node-kstream

A ksqlDB persistent query stream app includes:

  • Query name and link to the query
  • Query type, status, and mode
  • Total throughput as a time series line graph
  • Bytes produced and consumed per topic
  • Drilldowns to show individual consumers and messages
  • Drilldowns to show the actual query and the schema created for it
node-connector

A Kafka Connector node shows:

  • Connector name and link to the connector
  • Type of connector (plugin type)
  • Message throughput and lag
  • Total production as a time series line graph
  • Bytes produced per topic as a time series line graph
  • Associated tasks and their statuses
node-cli

A CLI node shows monitoring data on producers and consumers running on the Confluent Cloud CLI, producing to or reading from a topic on your Confluent Cloud cluster:

  • For consumers, name or consumer group name, bytes in, number of messages read
  • For producers, client ID, bytes out, number of messages sent
  • Total and per topic number of bytes produced or consumed, as time series graphs
  • Drilldowns to show individual consumers, producers, and messages

Appendix A: Creating a ksqlDB app with granular access and assigning ACLs

As an alternative to creating the ksqlDB app with global access, you can create the app with granular access, assign a service account to it, and then create ACLs limited specifically to your ksqlDB app. This is useful when you want to limit the ksqlDB cluster’s access to specific topics or actions.

  1. Navigate to ksqlDB.

  2. Click Create application myself.

  3. Select Granular access and click Continue.

  4. Under Create a service account:

    • Select Create a new one (unless you already have an account you want to use).
    • Provide a new service account name and description, such as stocks_trader for the name and ksqlDB_stocks_app for the description.
    • Check the box to add required ACLs when the ksqlDB app is created.
  5. Provide access to the stocks topic (this should already be selected), and click Continue.

  6. Create the ACLs for your ksqlDB app as described in the following steps. (Skip them if you have already created ACLs for this app; a verification sketch follows the last step.)

  7. Log on to the Confluent Cloud CLI. (Provide username and password at prompts.)

    ccloud login --url https://confluent.cloud
    
  8. List the environments to get the environment ID.

    ccloud environment list
    
  9. Select the environment you’ve been using for this demo.

    ccloud environment use <ENVIRONMENT_ID>
    
  10. List the clusters to get the right cluster ID.

    ccloud kafka cluster list
    
  11. Select the cluster you’ve been using for this demo.

    ccloud kafka cluster use <KAFKA_CLUSTER_ID>
    
  12. List the ksqlDB apps to get the ID for your app.

    ccloud ksql app list
    
  13. Run this command to get the service account ID.

    ccloud ksql app configure-acls <KSQL_APP_ID> '*' --cluster <KAFKA_CLUSTER_ID> --dry-run
    
  14. Copy the service account ID (after User:<SERVICE_ACCOUNT_ID> in the output).

  15. Allow READ access to all topics on the ksql app for your service account ID.

    ccloud kafka acl create --allow --service-account <SERVICE_ACCOUNT_ID> --operation READ --topic '*'
    
  16. Allow WRITE access to all topics on the ksql app for your service account ID.

    ccloud kafka acl create --allow --service-account <SERVICE_ACCOUNT_ID> --operation WRITE --topic '*'
    
  17. Allow CREATE access for all topics on the ksql app for your service account ID.

    ccloud kafka acl create --allow --service-account <SERVICE_ACCOUNT_ID> --operation CREATE --topic '*'
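
To double-check the result, you can list the ACLs bound to the service account. A minimal sketch, assuming the same <SERVICE_ACCOUNT_ID> as in the previous steps:

    # Verify that the READ, WRITE, and CREATE ACLs exist for the service account.
    ccloud kafka acl list --service-account <SERVICE_ACCOUNT_ID>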