Important

Data Lineage is currently available as a preview feature. A preview feature is a component of Confluent Cloud that is being introduced to gain early feedback from a limited group of early adopters. This feature can be used for evaluation and non-production testing purposes or to provide feedback to Confluent.

Data Lineage Preview

Confluent is incrementally adding Data Governance capabilities to Confluent Cloud, starting with a limited preview of Data Lineage.

Data lineage in Kafka covers the lifecycle of data: where it originates and what happens to it as it is processed by different consumers. Data lineage is particularly valuable in complex, high-volume data environments like Kafka. It is a key element of Data Governance, providing an understanding of how each component uses and transforms the data as it travels from source to destination.

First Look

In this limited preview, Data Lineage in Confluent Cloud is represented visually to show the movement of data from source to destination and how it is transformed along the way.

To view the Data Lineage UIs:

  1. Make sure you are signed up for the preview.
  2. Log on to Confluent Cloud.
  3. Select an environment.
  4. Select a cluster.
  5. Select a topic.
  6. Click the Data Lineage icon (lower right) from inside the topic page.

    The data lineage for that topic is shown.
../_images/dg-dl-overview.png

Tip

The data lineage shown in this example is the result of setting up a data pipeline based on several ksqlDB query streams. If you haven’t set up a data pipeline yet, your lineage view may show only a single, lonely event node.

To get an interesting lineage like the one shown above, take a spin through the tutorial in the next section!

Tutorial

To really see Data Lineage in action, you need to configure topics, associated schemas, and data sources to create a data pipeline. Once you have events flowing to one or more topics, you can use Data Lineage to inspect the sources and traversal of events through the pipeline.

  1. Add an environment or select an existing one.

  2. Add a cluster or select an existing one on which to run the demo.

  3. Enable Schema Registry by navigating to the Schemas page for your cluster and following the prompts to choose a region and cloud provider.

    ../_images/dg-dl-sr-create.png
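    Alternatively, Schema Registry can typically be enabled from the ccloud CLI. This is a sketch, assuming you are already logged in; substitute your own cloud provider and geography for the placeholders:

    ccloud schema-registry cluster enable --cloud <PROVIDER> --geo <GEO>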
  4. Generate and save the Schema Registry API key and secret for this Schema Registry. (Save the key and secret; you will need them later when you run a CLI consumer.)

  5. Create a topic named stocks.
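    You can create the topic from the Topics page in the UI, or as a sketch from the ccloud CLI (assuming you are logged in and have selected the right environment and cluster):

    ccloud kafka topic create stocks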

  6. Add the Connect Datagen source connector to generate sample data to the stocks topic, using these settings:

    • Name: StockSourceConnector
    • Which topic do you want to send data to?: stocks
    • Output message format: AVRO
    • Quickstart: STOCK_TRADES
    • Max interval between messages: 1000
    • Number of tasks for this connector: 1

    You’ll also need to generate and save an API key and secret for this cluster, if you have not done so already.

    ../_images/dg-dl-datagen-setup.png
  7. Click Confirm, review the settings for the connector, and click Launch to start sending data to the target topic.

    The connector first shows as Provisioning, then Running when it is fully initialized.
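    If you prefer the CLI, the same connector can be created with ccloud and a JSON config file. This is a sketch: the file name is arbitrary, and the property names shown are assumptions mapped from the UI settings above, so verify them against the Datagen connector documentation before launching.

    cat > datagen-stocks.json <<'EOF'
    {
      "name": "StockSourceConnector",
      "connector.class": "DatagenSource",
      "kafka.api.key": "<KAFKA_API_KEY>",
      "kafka.api.secret": "<KAFKA_API_SECRET>",
      "kafka.topic": "stocks",
      "output.data.format": "AVRO",
      "quickstart": "STOCK_TRADES",
      "max.interval": "1000",
      "tasks.max": "1"
    }
    EOF
    ccloud connector create --config datagen-stocks.json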

Create a ksqlDB app

  1. Navigate to ksqlDB.

  2. Click Add application.

  3. Provide an application name, and click Launch.

    Provisioning will take some time. In the meantime, you can move on to the next step of creating ACLs.

Create ACLs for your ksqlDB app

Skip this section if you have already created ACLs for this app.

  1. Log on to the Confluent Cloud CLI. (Provide username and password at prompts.)

    ccloud login --url https://confluent.cloud
    
  2. List the environments to get the environment ID.

    ccloud environment list
    
  3. Select the environment you’ve been using for this demo.

    ccloud environment use <ENVIRONMENT_ID>
    
  4. List the clusters to get the right cluster ID.

    ccloud kafka cluster list
    
  5. Select the cluster you’ve been using for this demo.

    ccloud kafka cluster use <KAFKA_CLUSTER_ID>
    
  6. List the ksqlDB apps to get the ID for your app.

    ccloud ksql app list
    
  7. Run this command to get the service account ID.

    ccloud ksql app configure-acls <KSQL_APP_ID> '*' --cluster <KAFKA_CLUSTER_ID> --dry-run
    
  8. Copy the service account ID (the value after User: in the output).

  9. Allow READ access to all topics for the service account.

    ccloud kafka acl create --allow --service-account <SERVICE_ACCOUNT_ID> --operation READ --topic '*'
    
  10. Allow WRITE access to all topics for the service account.

    ccloud kafka acl create --allow --service-account <SERVICE_ACCOUNT_ID> --operation WRITE --topic '*'
    
  11. Allow CREATE access to all topics for the service account.

    ccloud kafka acl create --allow --service-account <SERVICE_ACCOUNT_ID> --operation CREATE --topic '*'
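    To confirm the ACLs were applied, you can list them for the cluster (assuming it is still the selected cluster):

    ccloud kafka acl list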
    

Verify your ksqlDB app is running

Return to the list of ksqlDB apps on the Confluent Cloud UI.

Your ksqlDB app should have completed Provisioning, and show a status of Up.

../_images/dg-dl-ksql-app-up.png

Create persistent streams in ksqlDB to filter on stock prices

Navigate to the ksqlDB Editor to create the following persistent streams.

Specify each query statement in the Editor and click Run query to start the query. You can click the Streams tab to view a list of running queries.

  1. Create a persistent stream that filters on stocks with price <= 100.

    You’ll need to specify and run three separate statements for this step. Start by creating the stocks stream, then add a filter to find and list stocks under $100. After each statement, click Run query, then clear the editor to enter the next one.

    CREATE STREAM stocks WITH (KAFKA_TOPIC = 'stocks', VALUE_FORMAT = 'AVRO');
    
    CREATE STREAM stocks_under_100 WITH (KAFKA_TOPIC='stocks_under_100', PARTITIONS=10, REPLICAS=3) AS SELECT * FROM stocks WHERE (price <= 100);
    
    SELECT * FROM stocks_under_100 EMIT CHANGES;
    

    When you have these running, click the Streams tab. You should have two new streams, stocks and stocks_under_100. (The last statement is a transient query that displays the filtered results from stocks_under_100; it does not create a new stream.)

  2. Create a persistent stream that filters on stocks to BUY.

    You’ll need to specify and run two separate queries for this step. After each of these, click Run query, then clear the editor to specify the next statement.

    CREATE STREAM stocks_buy WITH (KAFKA_TOPIC='stocks_buy', PARTITIONS=10, REPLICAS=3) AS SELECT * FROM stocks WHERE side='BUY';
    
    SELECT * FROM stocks_buy EMIT CHANGES;
    
  3. Create a persistent stream that filters on stocks to SELL.

    You’ll need to specify and run two separate queries for this step. After each of these, click Run query, then clear the editor to specify the next statement.

    CREATE STREAM stocks_sell WITH (KAFKA_TOPIC='stocks_sell', PARTITIONS=10, REPLICAS=3) AS SELECT * FROM stocks WHERE side='SELL';
    
    SELECT * FROM stocks_sell EMIT CHANGES;
    

When you have completed these steps, click the ksqlDB > Streams tab. You should have four persistent ksqlDB query streams:

  • stocks
  • stocks_buy
  • stocks_sell
  • stocks_under_100
../_images/dg-dl-ksqldb-streams-all.png

Each of these streams has an associated topic and schema, listed on the Topics and Schemas pages respectively.

../_images/dg-dl-topics.png
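You can also confirm the new topics from the ccloud CLI (a quick check, assuming the demo cluster is still selected):

    ccloud kafka topic list

The list should include stocks, stocks_buy, stocks_sell, and stocks_under_100.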

Consume events from the “stocks_buy” topic

Now, set up a consumer using the Confluent Cloud CLI to consume events from the stocks_buy topic.

  1. Log on to the Confluent Cloud CLI, if needed. (Provide username and password at prompts.)

    ccloud login --url https://confluent.cloud
    
  2. List the environments to verify you are in the right environment.

    ccloud environment list
    
  3. If needed, re-select the environment you’ve been using for this demo.

    ccloud environment use <ENVIRONMENT_ID>
    
  4. List the clusters to verify you are on the right cluster.

    ccloud kafka cluster list
    
  5. If needed, re-select the cluster you’ve been using for this demo.

    ccloud kafka cluster use <KAFKA_CLUSTER_ID>
    
  6. Create Kafka API credentials for the consumer.

    Create an API key.

    ccloud api-key create --resource <KAFKA_CLUSTER_ID>
    

    Use the API key.

    ccloud api-key use <API_KEY> --resource <KAFKA_CLUSTER_ID>
    

    Alternatively, you can store the key.

    ccloud api-key store --resource <KAFKA_CLUSTER_ID>
    
  7. Run a CLI consumer.

    ccloud kafka topic consume stocks_buy --value-format avro --group buy_group
    
  8. When prompted, provide the Schema Registry API key and secret you generated in the first steps.

    You should see the consumed events displayed at the command line, for example:

    Vickys-MacBook-Pro:~ vicky$ ccloud kafka topic consume stocks_buy --value-format avro --group buy_group
    Enter your Schema Registry API key: O2XTNUTKJOHZJBJW
    Enter your Schema Registry API secret: ****************************************************************
    Starting Kafka Consumer. ^C or ^D to exit
    {"SIDE":{"string":"BUY"},"QUANTITY":{"int":959},"SYMBOL":{"string":"ZVZZT"},"PRICE":{"int":704},"ACCOUNT":{"string":"XYZ789"},"USERID":{"string":"User_8"}}
    {"ACCOUNT":{"string":"ABC123"},"USERID":{"string":"User_1"},"SIDE":{"string":"BUY"},"QUANTITY":{"int":1838},"SYMBOL":{"string":"ZWZZT"},"PRICE":{"int":405}}
    {"QUANTITY":{"int":2163},"SYMBOL":{"string":"ZTEST"},"PRICE":{"int":78},"ACCOUNT":{"string":"ABC123"},"USERID":{"string":"User_8"},"SIDE":{"string":"BUY"}}
    {"PRICE":{"int":165},"ACCOUNT":{"string":"LMN456"},"USERID":{"string":"User_2"},"SIDE":{"string":"BUY"},"QUANTITY":{"int":4675},"SYMBOL":{"string":"ZJZZT"}}
    {"QUANTITY":{"int":1702},"SYMBOL":{"string":"ZJZZT"},"PRICE":{"int":82},"ACCOUNT":{"string":"XYZ789"},"USERID":{"string":"User_7"},"SIDE":{"string":"BUY"}}
    {"ACCOUNT":{"string":"LMN456"},"USERID":{"string":"User_9"},"SIDE":{"string":"BUY"},"QUANTITY":{"int":2982},"SYMBOL":{"string":"ZVV"},"PRICE":{"int":643}}
    {"SIDE":{"string":"BUY"},"QUANTITY":{"int":3687},"SYMBOL":{"string":"ZJZZT"},"PRICE":{"int":514},"ACCOUNT":{"string":"ABC123"},"USERID":{"string":"User_5"}}
    {"USERID":{"string":"User_5"},"SIDE":{"string":"BUY"},"QUANTITY":{"int":289},"SYMBOL":{"string":"ZJZZT"},"PRICE":{"int":465},"ACCOUNT":{"string":"XYZ789"}}
    ...
    

Explore the data lineage for the pipeline

  1. Search for the stocks topic in the search box.

    ../_images/dg-dl-search-topic.png
  2. Click the Data Lineage icon (lower right) from inside the stocks topic page.

    ../_images/dg-dl-on-topic.png

    The data lineage for the stocks topic is shown.

    ../_images/dg-dl-overview.png
  3. Hover on a node for a high level description of the data source.

    ../_images/dg-dl-on-topic-hover.png
  4. Click on a node to drill down.

    ../_images/dg-dl-on-topic-drilldown.png

You can find the lineage icon on the topics, connectors, ksqlDB, and consumers pages.

For now, however, the lineage graph displays automatically only when you click the lineage icon from inside the page for a specific topic. From the other pages, you are asked to choose a topic from the left side panel first.

More about Data Lineage

The main needs for data lineage in Kafka are:

  • Understand how data in Kafka is transformed from source to destination
  • Provide audit trails of data in Kafka (for example, provide data flows for data privacy impact assessments in GDPR)
  • Perform impact analysis in Kafka (for example, what downstream impact would removing a ksqlDB query have?)
  • Audit data movements in Kafka (for example, are all of these consumers still relevant?)
  • Allow developers to discover data in Kafka
  • Satisfy regulatory requirements for compliance with data privacy regulations, such as the General Data Protection Regulation (GDPR) in the EU and the Basel Committee on Banking Supervision Rule 239 (BCBS 239), designed to improve risk data aggregation and reporting.

More about Data Governance

Data Governance (DG) is the overall management of the quality, availability, usability, integrity, and security of data used throughout your organization. It provides a framework to manage information as an asset and guide data management activities across the organization. An effective data governance solution integrated with Kafka can provide discoverability, tracking, and security of your data throughout its lifecycle.

At its core, a data governance framework encompasses three key areas:

  • People - Put together a team that manages all aspects of your organization’s data, with clearly defined roles and responsibilities.
  • Processes - Define a process for how data will be controlled, audited, and monitored. Some key governance processes include data quality, security and risk management, reference and master data management, and data standards and definitions.
  • Technology - Technology alone is not a silver bullet for Data Governance, but it does help enable people, streamline processes, and ensure organizations can make data-driven business decisions. Data management technologies include verification, standardization, monitoring, collaboration, reporting, and identity resolution tools, to name a few.

In other words, a Data Governance framework assigns ownership and responsibility for data, defines the processes for managing data, and leverages technologies that help enable people and processes. A successful framework provides:

  • Clear ownership of data
  • Data accessible across the organization with proper controls
  • Data properly protected
  • Good quality data fit for consumption
  • Data knowledge disseminated across the organization

Confluent is incrementally adding Data Governance capabilities to Confluent Cloud as the technology part of the solution in this framework.