CREATE CONNECTION Statement in Confluent Cloud for Apache Flink

Confluent Cloud for Apache Flink® supports creating secure connections to external services and data sources. You can use these connections in your Flink statements.

Connections are resources that you define to configure parameters needed for connecting to third-party services. Connections include endpoint and authentication information. They provide a way to handle sensitive information such as credentials while ensuring security.

Confluent AI features and Flink UDFs use connections to make secure calls to external services. For more information, see Reuse Confluent Cloud Connections With External Services.

A connection has its own lifecycle and can be created, managed, updated, or deleted by users with appropriate permissions. For more information, see Manage Connections.

Confluent Cloud for Apache Flink makes a best-effort attempt to redact sensitive values from the CREATE CONNECTION and ALTER CONNECTION statements by masking the values for the known sensitive keys. In Confluent Cloud Console, the sensitive values are redacted in the Flink SQL workspace if you navigate away from the workspace and return, or if you reload the page in the browser. Alternatively, you can use the Confluent CLI commands to create and manage connections.

In addition, if the syntax of the CREATE CONNECTION statement is incorrect, Confluent Cloud for Apache Flink may not detect the secrets. For example, if you type CREATE CONNECTION my_conn WITH ('ap-key' = 'x'), Flink won’t redact the x, because api-key is misspelled as ap-key and is not recognized as a sensitive key.

Note

Connection resources are an Open Preview feature in Confluent Cloud.

A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing preview releases of the Preview features at any time in Confluent’s sole discretion.

Syntax

CREATE CONNECTION [IF NOT EXISTS] [catalog_name.][db_name.]connection_name
[COMMENT connection_comment]
WITH (
    'type' = '<connection-type>',
    'endpoint' = '<endpoint-url>',
    ['sse-endpoint' = '<sse-endpoint-url>'],
    ['api-key' = '<api-key>'] |
    ['username' = '<username>', 'password' = '<password>'] |
    ['aws-access-key' = '<aws-access-key-id>', 'aws-secret-key' = '<aws-secret-access-key>', 'aws-session-token' = '<aws-session-token>']
);
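
The optional clauses in the syntax above can be combined in a single statement. The following is a minimal sketch, not a definitive example: the catalog name `my_env`, the database name `my_cluster`, and the connection name are hypothetical placeholders, and the connection type and endpoint are borrowed from the OpenAI entry in the connection type reference.

```sql
-- Hypothetical names: `my_env` (catalog), `my_cluster` (database),
-- and `my_openai_connection` are placeholders for illustration only.
CREATE CONNECTION IF NOT EXISTS `my_env`.`my_cluster`.`my_openai_connection`
  COMMENT 'OpenAI connection for model inference'
  WITH (
    'type' = 'openai',
    'endpoint' = 'https://api.openai.com/v1',
    'api-key' = '<your-api-key>'
  );
```

With IF NOT EXISTS, the statement is expected to leave an existing connection of the same name unchanged instead of failing.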

Description

Create a new secure connection to an external service or data source.

Change the authorization settings of an existing connection by using the ALTER CONNECTION statement.

To remove a connection from the current database, use the DROP CONNECTION statement.
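
The lifecycle described above can be sketched as follows. This is an illustration under assumptions: it presumes that ALTER CONNECTION accepts a SET clause with the same key-value form used in CREATE CONNECTION, and `my_openai_connection` is a hypothetical connection name. Check the ALTER CONNECTION and DROP CONNECTION references for the exact syntax.

```sql
-- Assumption: ALTER CONNECTION takes a SET (...) clause; verify against
-- the ALTER CONNECTION reference before relying on this form.
-- Rotate the API key on an existing connection.
ALTER CONNECTION `my_openai_connection`
  SET ('api-key' = '<new-api-key>');

-- Remove the connection from the current database when it is no longer needed.
DROP CONNECTION `my_openai_connection`;
```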

Confluent Cloud for Apache Flink supports these authentication methods:

  • Basic: username and password. The credentials are sent in the HTTP Authorization header using the Basic scheme.

  • Bearer: token. The token is sent in the HTTP Authorization header using the Bearer scheme.

  • OAuth: token-endpoint, client-id, client-secret, and scope. The provided options are used to retrieve an OAuth token from the token endpoint, and the token is added to the HTTP request as a Bearer token.

Connection types

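The following connection types are supported: bedrock, sagemaker, azureml, azureopenai, couchbase, confluent_jdbc, elastic, googleai, mcp_server, mongodb, openai, pinecone, rest, and vertexai.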

Connection type reference

All connections require two standard parameters:

  • type: The connection type (case-insensitive)

  • endpoint: The service endpoint URL

Beyond these standard parameters, each connection type has specific authentication and configuration requirements.

Amazon Bedrock

  • Type: bedrock

  • Required Parameters:

    • aws-access-key: AWS Access Key ID

    • aws-secret-key: AWS Secret Access Key

  • Optional Parameters:

    • aws-session-token: AWS Session Token (for temporary credentials)

  • Example:

    CREATE CONNECTION `my_bedrock_connection`
      WITH (
        'type' = 'bedrock',
        'endpoint' = 'https://bedrock-runtime.us-east-1.amazonaws.com/model/my-model/invoke',
        'aws-access-key' = '<aws-access-key-id>',
        'aws-secret-key' = '<aws-secret-access-key>',
        'aws-session-token' = '<aws-session-token>'
      );
    

Amazon SageMaker

  • Type: sagemaker

  • Required Parameters:

    • aws-access-key: AWS Access Key ID

    • aws-secret-key: AWS Secret Access Key

  • Optional Parameters:

    • aws-session-token: AWS Session Token (for temporary credentials)

  • Example:

    CREATE CONNECTION `my_sagemaker_connection`
      WITH (
        'type' = 'sagemaker',
        'endpoint' = 'https://runtime.sagemaker.us-east-1.amazonaws.com',
        'aws-access-key' = '<aws-access-key-id>',
        'aws-secret-key' = '<aws-secret-access-key>'
      );
    

Azure ML

  • Type: azureml

  • Required Parameters:

    • api-key: Your Azure ML API key

  • Examples:

    -- Azure ML connection with API key
    CREATE CONNECTION `my_azureml_connection`
      WITH (
        'type' = 'azureml',
        'endpoint' = 'https://myworkspace.myregion.inference.ml.azure.com/test',
        'api-key' = '<your-api-key>'
      );
    
    -- Azure ML connection with comment
    CREATE CONNECTION `my_azureml_connection`
      COMMENT 'Connection Comment'
      WITH (
        'type' = 'azureml',
        'endpoint' = 'https://myworkspace.myregion.inference.ml.azure.com/test',
        'api-key' = '<your-api-key>'
      );
    

Azure OpenAI

  • Type: azureopenai

  • Required Parameters:

    • api-key: Your Azure OpenAI API key

  • Example:

    CREATE CONNECTION `my_azureopenai_connection`
      WITH (
        'type' = 'azureopenai',
        'endpoint' = 'https://your-resource.openai.azure.com/',
        'api-key' = '<your-azure-openai-api-key>'
      );
    

Couchbase

  • Type: couchbase

  • Required Parameters:

    • username: Couchbase username

    • password: Couchbase password

  • Example:

    CREATE CONNECTION `my_couchbase_connection`
      WITH (
        'type' = 'couchbase',
        'endpoint' = 'couchbases://my-cluster.cloud.couchbase.com',
        'username' = '<user-name>',
        'password' = '<password>'
      );
    

Confluent JDBC

  • Type: confluent_jdbc

  • Required Parameters:

    • username: Database username

    • password: Database password

  • Example: See Confluent JDBC section.
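
Based on the required parameters listed above, a Confluent JDBC connection might look like the following sketch. The connection name is hypothetical, and the endpoint is left as a placeholder because the expected JDBC URL format is not specified here.

```sql
-- Hypothetical connection name; replace the placeholders with your
-- database's JDBC URL and credentials.
CREATE CONNECTION `my_jdbc_connection`
  WITH (
    'type' = 'confluent_jdbc',
    'endpoint' = '<jdbc-url>',
    'username' = '<database-username>',
    'password' = '<database-password>'
  );
```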

Elasticsearch

  • Type: elastic

  • Required Parameters (choose one):

    • API key authentication: api-key (for Elasticsearch Service)

    • Basic authentication: username and password (for self-hosted)

  • Examples:

    -- Elasticsearch connection with API key (for Elasticsearch Service)
    CREATE CONNECTION `my_elastic_connection`
      WITH (
        'type' = 'elastic',
        'endpoint' = '<elasticsearch-endpoint>',
        'api-key' = '<elastic-api-key>'
      );
    
    -- Elasticsearch connection with basic authentication (for self-hosted)
    CREATE CONNECTION `my_elastic_connection`
      WITH (
        'type' = 'elastic',
        'endpoint' = '<elasticsearch-endpoint>',
        'username' = '<elastic-username>',
        'password' = '<elastic-password>'
      );
    

Google AI

  • Type: googleai

  • Required Parameters:

    • api-key: Your Google AI API key

  • Example:

    CREATE CONNECTION `my_googleai_connection`
      WITH (
        'type' = 'googleai',
        'endpoint' = 'https://generativelanguage.googleapis.com',
        'api-key' = '<your-google-ai-api-key>'
      );
    

MCP Server

  • Type: mcp_server

  • Supported Authentication Methods (choose one):

    • API Key: api-key

    • Bearer Token: token

    • Basic: username and password

    • OAuth2: token-endpoint, client-id, client-secret, and scope

  • Optional Parameters:

    • transport-type: Set to STREAMABLE_HTTP for streamable HTTP transport

  • Examples:

    -- MCP server connection with OAuth
    CREATE CONNECTION `my_mcp_connection`
      WITH (
        'type' = 'mcp_server',
        'endpoint' = 'https://mymcp.connection.com',
        'token-endpoint' = '<token-endpoint>',
        'client-id' = '<client-id>',
        'client-secret' = '<client-secret>',
        'scope' = '<scope>'
      );
    
    -- MCP server connection with bearer token and streamable HTTP transport
    CREATE CONNECTION `linear_mcp_connection`
      WITH (
        'type' = 'mcp_server',
        'endpoint' = 'https://mcp.linear.app/mcp',
        'token' = '<LINEAR_API_KEY>',
        'transport-type' = 'STREAMABLE_HTTP'
      );
    

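MongoDB

  • Type: mongodb

  • Required Parameters:

    • username: MongoDB username

    • password: MongoDB password

  • Example: See the MongoDB external table section.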

OpenAI

  • Type: openai

  • Required Parameters:

    • api-key: Your OpenAI API key

  • Example:

    CREATE CONNECTION `my_openai_connection`
      WITH (
        'type' = 'openai',
        'endpoint' = 'https://api.openai.com/v1',
        'api-key' = '<your-api-key>'
      );
    

Pinecone

  • Type: pinecone

  • Required Parameters:

    • api-key: Pinecone API key

  • Example:

    CREATE CONNECTION `my_pinecone_connection`
      WITH (
        'type' = 'pinecone',
        'endpoint' = 'https://your-index-project.svc.environment.pinecone.io',
        'api-key' = '<your-pinecone-api-key>'
      );
    

REST

  • Type: rest

  • Supported Authentication Methods (choose one):

    • API Key: api-key

    • Bearer Token: token

    • Basic: username and password

    • OAuth2: token-endpoint, client-id, client-secret, and scope

  • Examples:

    -- REST connection with API key
    CREATE CONNECTION `my_rest_api_key_connection`
      WITH (
        'type' = 'rest',
        'endpoint' = 'https://api.example.com/v1',
        'api-key' = '<your-api-key>'
      );
    
    -- REST connection with bearer token
    CREATE CONNECTION `my_rest_bearer_connection`
      WITH (
        'type' = 'rest',
        'endpoint' = 'https://api.example.com/v1',
        'token' = '<your-bearer-token>'
      );
    
    -- REST connection with basic authentication
    CREATE CONNECTION `my_rest_basic_connection`
      WITH (
        'type' = 'rest',
        'endpoint' = 'https://api.example.com/v1',
        'username' = '<your-username>',
        'password' = '<your-password>'
      );
    
    -- REST connection with OAuth2
    CREATE CONNECTION `my_rest_oauth_connection`
      WITH (
        'type' = 'rest',
        'endpoint' = 'https://api.example.com/v1',
        'token-endpoint' = 'https://auth.example.com/oauth/token',
        'client-id' = '<your-client-id>',
        'client-secret' = '<your-client-secret>',
        'scope' = 'read write'
      );
    

Vertex AI

  • Type: vertexai

  • Required Parameters:

    • service-key: Google Cloud service account key (JSON format)

  • Example:

    CREATE CONNECTION `my_vertexai_connection`
      WITH (
        'type' = 'vertexai',
        'endpoint' = 'https://us-central1-aiplatform.googleapis.com',
        'service-key' = '{"type": "service_account", "project_id": "your-project", ...}'
      );
    

Authorization

Depending on the connection type, the following authorization methods are supported:

  • API key: azureml, azureopenai, elastic, googleai, mcp_server, openai, pinecone, rest

  • Basic: confluent_jdbc, couchbase, elastic, mcp_server, mongodb, rest

  • Bearer: mcp_server, rest

  • OAuth: mcp_server, rest

Note

For REST and MCP_SERVER connection types, you must provide exactly one authentication method. Mixing authentication methods (e.g., both api-key and username/password) results in an error.
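
As an illustration of this rule, the sketch below contrasts a statement that supplies exactly one authentication method with one that mixes two. The connection names are hypothetical, and the second statement is commented out because, per the note above, it is expected to be rejected. The exact error message is not shown here.

```sql
-- Valid: exactly one authentication method (API key).
CREATE CONNECTION `my_rest_single_auth_connection`
  WITH (
    'type' = 'rest',
    'endpoint' = 'https://api.example.com/v1',
    'api-key' = '<your-api-key>'
  );

-- Invalid: mixes api-key with username/password, so it should fail.
-- CREATE CONNECTION `my_rest_mixed_auth_connection`
--   WITH (
--     'type' = 'rest',
--     'endpoint' = 'https://api.example.com/v1',
--     'api-key' = '<your-api-key>',
--     'username' = '<your-username>',
--     'password' = '<your-password>'
--   );
```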

Secrets are extracted to the secret store and aren’t displayed in subsequent DESCRIBE CONNECTION statements, the Flink SQL shell, or the Confluent Cloud Console.
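
To confirm that a connection exists without exposing its secrets, you can inspect it with DESCRIBE CONNECTION. The following is a minimal sketch that assumes a connection named `my_openai_connection` was created earlier. The exact output columns are not reproduced here.

```sql
-- Hypothetical connection name. Secret values such as the API key
-- are not displayed in the result.
DESCRIBE CONNECTION `my_openai_connection`;
```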

The maximum secret length is 4000 bytes, which is checked after the string is converted to bytes.

MongoDB external table

-- Create a MongoDB connection with basic authorization.
CREATE CONNECTION `my_mongodb_connection`
  WITH (
    'type' = 'MONGODB',
    'endpoint' = 'mongodb+srv://myCluster.mongodb.net/myDatabase',
    'username' = '<atlas-user-name>',
    'password' = '<atlas-password>'
  );

-- Use the MongoDB connection to create a MongoDB external table.
CREATE TABLE mongodb_movies_full_text_search (
    title STRING,
    plot STRING
) WITH (
    'connector' = 'mongodb',
    'mongodb.connection' = 'my_mongodb_connection',
    'mongodb.database' = 'sample_mflix',
    'mongodb.collection' = 'movies',
    'mongodb.index' = 'default'
);