CREATE CONNECTION Statement in Confluent Cloud for Apache Flink
Confluent Cloud for Apache Flink® supports creating secure connections to external services and data sources. You can use these connections in your Flink statements.
Connections are resources that you define to configure parameters needed for connecting to third-party services. Connections include endpoint and authentication information. They provide a way to handle sensitive information such as credentials while ensuring security.
Confluent AI and Flink UDFs rely on connections to make secure calls to external services. For more information, see Reuse Confluent Cloud Connections With External Services.
A connection has its own lifecycle and can be created, managed, updated, or deleted by users with appropriate permissions. For more information, see Manage Connections.
Confluent Cloud for Apache Flink makes a best-effort attempt to redact sensitive values from the CREATE CONNECTION and ALTER CONNECTION statements by masking the values for the known sensitive keys. In Confluent Cloud Console, the sensitive values are redacted in the Flink SQL workspace if you navigate away from the workspace and return, or if you reload the page in the browser. Alternatively, you can use the Confluent CLI commands to create and manage connections.
In addition, if syntax in the CREATE CONNECTION statement is incorrect, Confluent Cloud for Apache Flink may not detect the secrets. For example, if you type CREATE CONNECTION my_conn WITH ('ap-key' = 'x'), Flink won’t redact the x, because api-key is misspelled.
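To make the misspelling case concrete, the sketch below contrasts the two spellings. The choice of the openai connection type and its endpoint is illustrative only, and the first statement is left as a comment because it is shown solely to highlight the unrecognized key.

```sql
-- Misspelled key: 'ap-key' is not a known sensitive key, so the value 'x'
-- might remain visible in the statement text.
-- CREATE CONNECTION my_conn WITH ('ap-key' = 'x');

-- Correct key: 'api-key' is a known sensitive key, so its value is a
-- candidate for masking.
CREATE CONNECTION `my_conn`
WITH (
  'type' = 'openai',
  'endpoint' = 'https://api.openai.com/v1',
  'api-key' = '<your-api-key>'
);
```

Because redaction is best-effort, it is safer to double-check option names before submitting a statement that contains a real secret.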
Note
Connection resources are an Open Preview feature in Confluent Cloud.
A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing preview releases of the Preview features at any time in Confluent’s sole discretion.
Syntax
CREATE CONNECTION [IF NOT EXISTS] [catalog_name.][db_name.]connection_name
[COMMENT connection_comment]
WITH (
'type' = '<connection-type>',
'endpoint' = '<endpoint-url>',
['sse-endpoint' = '<sse-endpoint-url>'],
['api-key' = '<api-key>'] |
['username' = '<user-name>', 'password' = '<password>'] |
['aws-access-key' = '<aws-access-key-id>', 'aws-secret-key' = '<aws-secret-access-key>' [, 'aws-session-token' = '<aws-session-token>']]
);
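As an illustration of the optional clauses in the syntax above, the following sketch combines IF NOT EXISTS, a fully qualified name, and COMMENT. The catalog name `my_environment`, the database name `my_cluster`, the connection name, and the comment text are hypothetical placeholders.

```sql
-- Create the connection only if one with this name does not already exist.
CREATE CONNECTION IF NOT EXISTS `my_environment`.`my_cluster`.`my_rest_connection`
COMMENT 'REST connection for an example service'
WITH (
  'type' = 'rest',
  'endpoint' = 'https://api.example.com/v1',
  'api-key' = '<your-api-key>'
);
```

Qualifying the name is useful when the statement runs outside the catalog and database where the connection should be created.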
Description
Create a new secure connection to an external service or data source.
Change the authorization settings of an existing connection by using the ALTER CONNECTION statement.
To remove a connection from the current database, use the DROP CONNECTION statement.
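A connection's lifecycle might look like the following sketch. The SET clause form of ALTER CONNECTION and the bare form of DROP CONNECTION are assumptions here, not taken from this page, so check the ALTER CONNECTION and DROP CONNECTION references for the exact syntax. The connection name is hypothetical.

```sql
-- 1. Create the connection.
CREATE CONNECTION `my_lifecycle_connection`
WITH (
  'type' = 'openai',
  'endpoint' = 'https://api.openai.com/v1',
  'api-key' = '<your-api-key>'
);

-- 2. Rotate the credential (assumed SET syntax).
ALTER CONNECTION `my_lifecycle_connection` SET ('api-key' = '<your-new-api-key>');

-- 3. Remove the connection when it is no longer needed.
DROP CONNECTION `my_lifecycle_connection`;
```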
Confluent Cloud for Apache Flink supports these authentication methods:
Basic: username and password. The credentials are added to the HTTP request as a BASIC header.
Bearer: token. The credentials are added to the HTTP request as a BEARER header.
OAuth: token-endpoint, client-id, client-secret, and scope. The provided options are used to retrieve the OAuth token from the token endpoint and add the token to the HTTP request as a BEARER token.
Connection types
The following connection types are supported:
Connection type reference
All connections require two standard parameters:
type: The connection type (case-insensitive)
endpoint: The service endpoint URL
Beyond these standard parameters, each connection type has specific authentication and configuration requirements.
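Since the type value is case-insensitive, the two spellings in the sketch below should be treated as the same connection type. The connection names are hypothetical, and the statements are otherwise identical.

```sql
-- Lowercase type value.
CREATE CONNECTION `my_openai_lower_connection`
WITH (
  'type' = 'openai',
  'endpoint' = 'https://api.openai.com/v1',
  'api-key' = '<your-api-key>'
);

-- Uppercase type value, expected to be equivalent.
CREATE CONNECTION `my_openai_upper_connection`
WITH (
  'type' = 'OPENAI',
  'endpoint' = 'https://api.openai.com/v1',
  'api-key' = '<your-api-key>'
);
```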
Amazon Bedrock
Type: bedrock
Required Parameters:
aws-access-key: AWS Access Key ID
aws-secret-key: AWS Secret Access Key
Optional Parameters:
aws-session-token: AWS Session Token (for temporary credentials)
Example:
CREATE CONNECTION `my_bedrock_connection`
WITH (
'type' = 'bedrock',
'endpoint' = 'https://bedrock-runtime.us-east-1.amazonaws.com/model/my-model/invoke',
'aws-access-key' = '<aws-access-key-id>',
'aws-secret-key' = '<aws-secret-access-key>',
'aws-session-token' = '<aws-session-token>'
);
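Because aws-session-token is listed as optional, a connection that uses long-lived credentials can presumably omit it. A minimal sketch, reusing the endpoint from the example above with a hypothetical connection name:

```sql
-- Bedrock connection without a session token (long-lived credentials).
CREATE CONNECTION `my_bedrock_static_connection`
WITH (
  'type' = 'bedrock',
  'endpoint' = 'https://bedrock-runtime.us-east-1.amazonaws.com/model/my-model/invoke',
  'aws-access-key' = '<aws-access-key-id>',
  'aws-secret-key' = '<aws-secret-access-key>'
);
```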
Amazon SageMaker
Type: sagemaker
Required Parameters:
aws-access-key: AWS Access Key ID
aws-secret-key: AWS Secret Access Key
Optional Parameters:
aws-session-token: AWS Session Token (for temporary credentials)
Example:
CREATE CONNECTION `my_sagemaker_connection`
WITH (
'type' = 'sagemaker',
'endpoint' = 'https://runtime.sagemaker.us-east-1.amazonaws.com',
'aws-access-key' = '<aws-access-key-id>',
'aws-secret-key' = '<aws-secret-access-key>'
);
Azure ML
Type: azureml
Required Parameters:
api-key: Your Azure ML API key
Examples:
-- Azure ML connection with API key
CREATE CONNECTION `my_azureml_connection`
WITH (
'type' = 'azureml',
'endpoint' = 'https://myworkspace.myregion.inference.ml.azure.com/test',
'api-key' = '<your-api-key>'
);
-- Azure ML connection with comment
CREATE CONNECTION `my_azureml_connection`
COMMENT 'Connection Comment'
WITH (
'type' = 'azureml',
'endpoint' = 'https://myworkspace.myregion.inference.ml.azure.com/test',
'api-key' = '<your-api-key>'
);
Azure OpenAI
Type: azureopenai
Required Parameters:
api-key: Your Azure OpenAI API key
Example:
CREATE CONNECTION `my_azureopenai_connection`
WITH (
'type' = 'azureopenai',
'endpoint' = 'https://your-resource.openai.azure.com/',
'api-key' = '<your-azure-openai-api-key>'
);
Couchbase
Type: couchbase
Required Parameters:
username: Couchbase username
password: Couchbase password
Example:
CREATE CONNECTION `my_couchbase_connection`
WITH (
'type' = 'couchbase',
'endpoint' = 'couchbases://my-cluster.cloud.couchbase.com',
'username' = '<user-name>',
'password' = '<password>'
);
Confluent JDBC
Type: confluent_jdbc
Required Parameters:
username: Database username
password: Database password
Example: See Confluent JDBC section.
Elasticsearch
Type: elastic
Required Parameters (choose one):
API key authentication: api-key (for Elasticsearch Service)
Basic authentication: username and password (for self-hosted)
Examples:
-- Elasticsearch connection with API key (for Elasticsearch Service)
CREATE CONNECTION `my_elastic_connection`
WITH (
'type' = 'elastic',
'endpoint' = '<elasticsearch-endpoint>',
'api-key' = '<elastic-api-key>'
);
-- Elasticsearch connection with basic authentication (for self-hosted)
CREATE CONNECTION `my_elastic_connection`
WITH (
'type' = 'elastic',
'endpoint' = '<elasticsearch-endpoint>',
'username' = '<elastic-username>',
'password' = '<elastic-password>'
);
Google AI
Type: googleai
Required Parameters:
api-key: Your Google AI API key
Example:
CREATE CONNECTION `my_googleai_connection`
WITH (
'type' = 'googleai',
'endpoint' = 'https://generativelanguage.googleapis.com',
'api-key' = '<your-google-ai-api-key>'
);
MCP Server
Type: mcp_server
Supported Authentication Methods (choose one):
API Key: api-key
Bearer Token: token
Basic: username and password
OAuth2: token-endpoint, client-id, client-secret, and scope
Optional Parameters:
transport-type: Set to STREAMABLE_HTTP for streamable HTTP transport
Examples:
-- MCP server connection with OAuth
CREATE CONNECTION `my_mcp_connection`
WITH (
'type' = 'mcp_server',
'endpoint' = 'https://mymcp.connection.com',
'token-endpoint' = '<token-endpoint>',
'client-id' = '<client-id>',
'client-secret' = '<client-secret>',
'scope' = '<scope>'
);
-- MCP server connection with streamable HTTP
CREATE CONNECTION `linear_mcp_connection`
WITH (
'type' = 'mcp_server',
'endpoint' = 'https://mcp.linear.app/mcp',
'token' = '<LINEAR_API_KEY>',
'transport-type' = 'STREAMABLE_HTTP'
);
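The MCP examples cover OAuth and bearer-token authentication. The other two listed methods would presumably follow the same pattern. The sketch below assumes the api-key, username, and password options are written as they are for the rest connection type; the connection names and endpoint are hypothetical.

```sql
-- MCP server connection with API key
CREATE CONNECTION `my_mcp_api_key_connection`
WITH (
  'type' = 'mcp_server',
  'endpoint' = 'https://mymcp.connection.com',
  'api-key' = '<your-api-key>'
);

-- MCP server connection with basic authentication
CREATE CONNECTION `my_mcp_basic_connection`
WITH (
  'type' = 'mcp_server',
  'endpoint' = 'https://mymcp.connection.com',
  'username' = '<your-username>',
  'password' = '<your-password>'
);
```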
MongoDB
Type: mongodb
Required Parameters:
username: MongoDB username
password: MongoDB password
Example: See MongoDB external table section.
OpenAI
Type: openai
Required Parameters:
api-key: Your OpenAI API key
Example:
CREATE CONNECTION `my_openai_connection`
WITH (
'type' = 'openai',
'endpoint' = 'https://api.openai.com/v1',
'api-key' = '<your-api-key>'
);
Pinecone
Type: pinecone
Required Parameters:
api-key: Pinecone API key
Example:
CREATE CONNECTION `my_pinecone_connection`
WITH (
'type' = 'pinecone',
'endpoint' = 'https://your-index-project.svc.environment.pinecone.io',
'api-key' = '<your-pinecone-api-key>'
);
REST
Type: rest
Supported Authentication Methods (choose one):
API Key: api-key
Bearer Token: token
Basic: username and password
OAuth2: token-endpoint, client-id, client-secret, and scope
Examples:
-- REST connection with API key
CREATE CONNECTION `my_rest_api_key_connection`
WITH (
'type' = 'rest',
'endpoint' = 'https://api.example.com/v1',
'api-key' = '<your-api-key>'
);
-- REST connection with bearer token
CREATE CONNECTION `my_rest_bearer_connection`
WITH (
'type' = 'rest',
'endpoint' = 'https://api.example.com/v1',
'token' = '<your-bearer-token>'
);
-- REST connection with basic authentication
CREATE CONNECTION `my_rest_basic_connection`
WITH (
'type' = 'rest',
'endpoint' = 'https://api.example.com/v1',
'username' = '<your-username>',
'password' = '<your-password>'
);
-- REST connection with OAuth2
CREATE CONNECTION `my_rest_oauth_connection`
WITH (
'type' = 'rest',
'endpoint' = 'https://api.example.com/v1',
'token-endpoint' = 'https://auth.example.com/oauth/token',
'client-id' = '<your-client-id>',
'client-secret' = '<your-client-secret>',
'scope' = 'read write'
);
Vertex AI
Type: vertexai
Required Parameters:
service-key: Google Cloud service account key (JSON format)
Example:
CREATE CONNECTION `my_vertexai_connection`
WITH (
'type' = 'vertexai',
'endpoint' = 'https://us-central1-aiplatform.googleapis.com',
'service-key' = '{"type": "service_account", "project_id": "your-project", ...}'
);
MongoDB external table
-- Create a MongoDB connection with basic authentication.
CREATE CONNECTION `my_mongodb_connection`
WITH (
'type' = 'MONGODB',
'endpoint' = 'mongodb+srv://myCluster.mongodb.net/myDatabase',
'username' = '<atlas-user-name>',
'password' = '<atlas-password>'
);
-- Use the MongoDB connection to create a MongoDB external table.
CREATE TABLE mongodb_movies_full_text_search (
title STRING,
plot STRING
) WITH (
'connector' = 'mongodb',
'mongodb.connection' = 'my_mongodb_connection',
'mongodb.database' = 'sample_mflix',
'mongodb.collection' = 'movies',
'mongodb.index' = 'default'
);
Confluent JDBC
-- Create a Confluent JDBC connection with basic authentication.
CREATE CONNECTION `jdbc_postgres_connection`
WITH (
'type' = 'confluent_jdbc',
'endpoint' = 'jdbc:postgresql://my.example.com:5432/mydatabase',
'username' = '<user-name>',
'password' = '<password>'
);
-- Use the Confluent JDBC connection to create a table.
CREATE TABLE jdbc_postgres (
show_id STRING,
type STRING,
title STRING,
cast_members STRING,
country STRING,
date_added DATE,
release_year INT,
rating STRING,
duration STRING,
listed_in STRING,
description STRING
) WITH (
'connector' = 'confluent-jdbc',
'confluent-jdbc.connection' = 'jdbc_postgres_connection',
'confluent-jdbc.table-name' = 'netflix_shows'
);