Troubleshooting a pipeline in Stream Designer

When you create pipelines in Stream Designer, you may encounter various errors. The following suggestions can help to resolve error conditions when they occur.

Known limitations

  • When you import source code, you must tag existing topics manually. The Stream Designer backend can’t determine whether any of the topics in the source code already exist. However, an existing topic shows a warning that a topic with this name already exists.
  • When you import source code, origin streams or tables must have schemas defined.
  • Activating connectors may take a few minutes, which can cause downstream components to fail to activate. Once the connector is in the Running state, click Reactivate to activate downstream components.
  • You can use Stream Designer only on Kafka clusters that are publicly available. You can’t use Stream Designer on Kafka clusters that use private networking (private link or VPC Peering).

Pipelines and RBAC

Before you can activate a pipeline, you must grant it privileges to create and manage Confluent Cloud resources. A pipeline is an RBAC resource, and without privileges it can’t communicate with the backend systems: you can design it on the canvas, but you can’t activate it. To grant privileges, click Grant pipeline privileges at the top-right corner of the Stream Designer page.

Note

Only users who have the OrganizationAdmin role can grant privileges to a pipeline.

For more information, see Role-Based Access Control for pipelines.

Queries

When you configure queries, the following requirements must be met before you can activate the pipeline.

  • All query components must be configured. If any component is unconfigured, the Activate button is disabled.
  • All query components must have an output table or stream, which is backed by a Kafka topic.
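
In ksqlDB terms, a filter that writes to an output stream corresponds roughly to a CREATE STREAM ... AS SELECT statement. The following is a minimal sketch, not the exact code that Stream Designer generates. The stream names, topic names, columns, and filter condition are hypothetical.

```sql
-- Hypothetical origin stream with a declared schema.
CREATE OR REPLACE STREAM "all_trades" (
  symbol varchar,
  quantity integer
) WITH (kafka_topic='trades_source', partitions=1, value_format='JSON_SR');

-- Hypothetical filter query. Its output is the "large_trades" stream,
-- which is backed by the Kafka topic named in the WITH clause.
CREATE OR REPLACE STREAM "large_trades"
  WITH (kafka_topic='large_trades_topic', partitions=1, value_format='JSON_SR')
  AS SELECT * FROM "all_trades"
  WHERE quantity > 100
  EMIT CHANGES;
```

A query with no such output stream or table has nowhere to write its results, which is the condition the second requirement guards against.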

If you see a validation error, click the component to see what you must fix. In the following example pipeline, an unconfigured Query component blocks activation because the filter must produce to a stream, table, or another transformation.

Stream Designer showing an error message for configuring a filter in Confluent Cloud Console

Editing source code

Stream Designer enables you to write source code directly in an editor, but there are known limitations.

Unsupported statements

Stream Designer translates the code into a pipeline model that’s displayed on the canvas. The translation may not be fully accurate, because some ksqlDB features can’t be completely represented in the canvas model. As a result, Stream Designer may drop parts of the source code when it imports the code.

For example, Stream Designer doesn’t support the INSERT INTO statement, so any attempt to import scripts that have this statement results in the statement being dropped.
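
As an illustration, consider a script like the following sketch, in which all stream names, topic names, and columns are hypothetical. Based on the behavior described above, the two CREATE STREAM statements would be expected to import, while the INSERT INTO statement would be dropped.

```sql
-- Hypothetical origin streams, both with declared schemas.
CREATE OR REPLACE STREAM "eu_orders" (
  orderid varchar,
  amount integer
) WITH (kafka_topic='eu_orders_topic', partitions=1, value_format='JSON_SR');

CREATE OR REPLACE STREAM "all_orders" (
  orderid varchar,
  amount integer
) WITH (kafka_topic='all_orders_topic', partitions=1, value_format='JSON_SR');

-- Valid ksqlDB, but not supported by Stream Designer:
-- this statement is dropped on import.
INSERT INTO "all_orders"
  SELECT orderid, amount FROM "eu_orders"
  EMIT CHANGES;
```

After importing a script like this, check the canvas against the source to confirm which statements are missing.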

Schema declarations

Imported source code must have schema declarations on all origin streams and tables. For example, the following statement fails because the all_trades stream has no schema declaration.

CREATE OR REPLACE STREAM "all_trades" WITH (kafka_topic='trades_source', partitions=1, value_format='JSON_SR');

If you attempt to import the previous statement, Stream Designer displays the error, “The statement does not declare any columns.”

To fix this error, provide schemas for all origin streams and tables. In this example, update the all_trades stream with column definitions.

CREATE OR REPLACE STREAM "all_trades" (
  side varchar,
  quantity integer,
  symbol varchar,
  price integer,
  account varchar,
  userid varchar
) WITH (kafka_topic='trades_source', partitions=1, value_format='JSON_SR');

Connector security

For security, when you export SQL that has CREATE CONNECTOR statements, the Kafka API key and secret are redacted. The following example shows the exported code for a Datagen source connector.

CREATE SOURCE CONNECTOR "Datagen_users" WITH (
  "connector.class"='DatagenSource',
  "kafka.api.key"='*****************',
  "kafka.api.secret"='*****************',
  "kafka.auth.mode"='KAFKA_API_KEY',
  "kafka.topic"='users_topic',
  "output.data.format"='JSON_SR',
  "quickstart"='USERS',
  "tasks.max"='1'
);

Before you import SQL that has CREATE CONNECTOR statements, you must manually edit the code to include the corresponding Kafka API key and secret. Without valid credentials, connectors will fail to activate.
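
For example, the exported Datagen statement might be edited as in the following sketch before import. The values <your-kafka-api-key> and <your-kafka-api-secret> are placeholders, not real settings. Replace them with the credentials for your own Kafka cluster.

```sql
CREATE SOURCE CONNECTOR "Datagen_users" WITH (
  "connector.class"='DatagenSource',
  -- Placeholder values: substitute your own Kafka API key and secret.
  "kafka.api.key"='<your-kafka-api-key>',
  "kafka.api.secret"='<your-kafka-api-secret>',
  "kafka.auth.mode"='KAFKA_API_KEY',
  "kafka.topic"='users_topic',
  "output.data.format"='JSON_SR',
  "quickstart"='USERS',
  "tasks.max"='1'
);
```

Because the edited file contains live credentials, handle it like any other secret and avoid committing it to source control.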

Imported topics

When you create a topic on the canvas, you have the option of choosing a topic that exists already. Existing topics aren’t managed by Stream Designer, and any activation request on the pipeline skips operations on the existing topic.

However, when you import source code, existing topics aren’t marked as existing and instead show the warning message, “Topic already exists and will be reused when activated.”

Stream Designer showing an error message for importing a topic in Confluent Cloud Console

If you want to reuse the existing topic, click Choose an existing topic instead and pick the desired topic.

Troubleshooting pipeline activation

Before you activate a pipeline, ensure that there are no validation errors. Stream Designer won’t allow activation until all errors are fixed.

Activating connectors

When activating components in a pipeline, Stream Designer follows this order:

  1. Activate all topics.
  2. Activate all source connectors.
  3. Activate all queries.
  4. Activate all sink connectors.

Schema errors

A source connector may take several minutes to provision, which can cause the overall activation request to time out. When this happens, activation fails with errors on the stream and table nodes downstream of the connector, with the message “Did not find any value schema on the topic”.

Once the connectors activate successfully, click Activate again to try provisioning the downstream components. Eventually, the entire pipeline will activate.

Note

Reactivating has no effect on components that are already activated.

Connector errors

For errors on a connector node, click the connector component to open the configuration page and view the error. For example, the following error occurs when a connector is imported from code without a Kafka API key and secret.

Stream Designer showing an error message for configuring a connector in Confluent Cloud Console

The error message provides the Connect cluster ID (lcc-1a2b3c) that you can use to debug the issue behind the activation failure.