Splunk Sink Connector for Confluent Cloud

The Splunk Sink connector is used to move messages from Apache Kafka® to Splunk using the Splunk HTTP Event Collector (HEC).

Features

The Splunk Sink connector supports the following features:

  • At least once delivery: This connector guarantees that records from the Kafka topic are delivered at least once.
  • Supports multiple tasks: The connector supports running one or more tasks. More tasks may improve performance (that is, consumer lag is reduced with multiple tasks running).

For configuration property values and descriptions, see Configuration Properties.

For additional information, refer to Cloud connector limitations.

Quick Start

Use this quick start to get up and running with the Confluent Cloud Splunk Sink connector. The quick start provides the basics of selecting the connector and configuring it to stream events to Splunk.

Prerequisites
  • Authorized access to a Confluent Cloud cluster on Amazon Web Services (AWS), Microsoft Azure (Azure), or Google Cloud Platform (GCP).
  • The Confluent CLI installed and configured for the cluster. See Install the Confluent CLI.
  • Authorized access to Splunk.
  • Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf). See Environment Limitations for additional information.
  • At least one source Kafka topic must exist in your Confluent Cloud cluster before creating the sink connector.

Using the Confluent Cloud Console

Step 1: Launch your Confluent Cloud cluster.

See the Quick Start for Apache Kafka using Confluent Cloud for installation instructions.

Step 2: Add a connector.

In the left navigation menu, click Data integration, and then click Connectors. If you already have connectors in your cluster, click + Add connector.

Step 3: Select your connector.

Click the Splunk Sink connector icon.

Splunk Sink Connector Icon

Step 4: Set up the connection.

Note

  • Make sure you have all your prerequisites completed.
  • An asterisk ( * ) designates a required entry.
  • Descriptions for optional UI properties are not provided in the following steps. See Configuration Properties for configuration property values and descriptions.
  1. Select one or more topics.
  2. Enter a connector Name.
  3. Select an Input Kafka record value format (data coming from the Kafka topic): AVRO, PROTOBUF, JSON_SR (JSON Schema), JSON (schemaless), or STRING. A valid schema must be available in Schema Registry to use a schema-based message format (for example, Avro, JSON_SR (JSON Schema), or Protobuf). See Environment Limitations for additional information.
  4. Select the way you want to provide Kafka Cluster credentials. You can either select a service account resource ID or you can enter an API key and secret (or generate these in the Cloud Console).
  5. Enter your Splunk URIs. Add a comma-separated list of FQDNs or IP addresses for all Splunk indexers, or add a load balancer. For Splunk indexers, load balancing uses round-robin scheduling. Example: https://hec1.splunk.com:8088,https://hec2.splunk.com:8088,https://hec3.splunk.com:8088.
  6. Enter your Splunk Token. Add the Splunk HTTP Event Collector token.
  7. Enter the number of tasks to use with the connector. More tasks may improve performance.
  8. Transforms and Predicates: See the Single Message Transforms (SMT) documentation for details.

See Configuration Properties for configuration property values and descriptions.

Step 5: Launch the connector.

Verify the connection details and click Launch.

Step 6: Check the connector status.

The status for the connector should go from Provisioning to Running.

Step 7: Check for records.

Verify that records are being produced at Splunk.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.

Tip

When you launch a connector, a Dead Letter Queue topic is automatically created. See Dead Letter Queue for details.

Using the Confluent CLI

To set up and run the connector using the Confluent CLI, complete the following steps.

Note

  • Make sure you have all your prerequisites completed.
  • The example commands use Confluent CLI version 2. For more information, see Confluent CLI v2.

Step 1: List the available connectors.

Enter the following command to list available connectors:

confluent connect plugin list

Step 2: Show the required connector configuration properties.

Enter the following command to show the required connector properties:

confluent connect plugin describe <connector-catalog-name>

For example:

confluent connect plugin describe SplunkSink

Example output:

Following are the required configs:
connector.class: SplunkSink
topics
input.data.format
name
kafka.api.key
kafka.api.secret
splunk.hec.uri
splunk.hec.token
tasks.max

Step 3: Create the connector configuration file.

Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties.

{
  "connector.class": "SplunkSink",
  "topics": "orders",
  "name": "SplunkSinkConnector_0",
  "input.data.format": "AVRO",
  "kafka.auth.mode": "KAFKA_API_KEY",
  "kafka.api.key": "<my-kafka-api-key>",
  "kafka.api.secret": "<my-kafka-api-secret>",
  "splunk.hec.uri": "https://hec1.splunk.com:8088,https://hec2.splunk.com:8088,https://hec3.splunk.com:8088",
  "splunk.hec.token": "<token>",
  "tasks.max": "1",

}

Note the following property definitions:

  • "connector.class": Identifies the connector plugin name.
  • "input.data.format": Sets the input Kafka record value format (data coming from the Kafka topic). Valid entries are AVRO, JSON_SR, PROTOBUF, JSON, or STRING. You must have Confluent Cloud Schema Registry configured if using a schema-based message format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).
  • "name": Sets a name for your new connector.
  • "kafka.auth.mode": Identifies the connector authentication mode you want to use. There are two options: SERVICE_ACCOUNT or KAFKA_API_KEY (the default). To use an API key and secret, specify the configuration properties kafka.api.key and kafka.api.secret, as shown in the example configuration (above). To use a service account, specify the Resource ID in the property kafka.service.account.id=<service-account-resource-ID>. To list the available service account resource IDs, use the following command:

    confluent iam service-account list
    

    For example:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "splunk.hec.uri": Add a comma-separated list of FQDNs or IP addresses for all Splunk indexers, or add a load balancer. For Splunk indexers, load balancing uses round-robin scheduling. Example: https://hec1.splunk.com:8088,https://hec2.splunk.com:8088,https://hec3.splunk.com:8088.

  • "splunk.hec.token": Add the Splunk HTTP Event Collector token.

  • "tasks.max": Enter the maximum number of tasks for the connector to use. More tasks may improve performance.

  • "topics": Enter the topic name or a comma-separated list of topic names.

Single Message Transforms: See the Single Message Transforms (SMT) documentation for details about adding SMTs using the CLI.
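
As a rough illustration of what adding an SMT to the JSON configuration file looks like, the following lines add a RegexRouter transform alongside the required properties. The transform alias route, the regex, and the replacement value are hypothetical examples, not values required by this connector:

"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "orders-(.*)",
"transforms.route.replacement": "orders-all"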

For configuration property values and descriptions, see Configuration Properties.

Step 4: Load the configuration file and create the connector.

Enter the following command to load the configuration and start the connector:

confluent connect create --config <file-name>.json

For example:

confluent connect create --config splunk-sink-config.json

Example output:

Created connector SplunkSinkConnector_0 lcc-do6vzd

Step 5: Check the connector status.

Enter the following command to check the connector status:

confluent connect list

Example output:

ID           |             Name                | Status  | Type | Trace
+------------+---------------------------------+---------+------+-------+
lcc-do6vzd   | SplunkSinkConnector_0           | RUNNING | sink |       |

Step 6: Check for records.

Verify that records are populating Splunk.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.

Tip

When you launch a connector, a Dead Letter Queue topic is automatically created. See Dead Letter Queue for details.

Configuration Properties

The following connector configuration properties can be used with the Splunk Sink connector for Confluent Cloud. The properties are listed by importance.

splunk.hec.uri

Splunk URIs. Either a comma-separated list of FQDNs or IP addresses for all Splunk indexers, or a load balancer. For Splunk indexers, the connector load balances using round-robin scheduling. Example: https://hec1.splunk.com:8088,https://hec2.splunk.com:8088,https://hec3.splunk.com:8088.

  • Type: string
  • Importance: high
splunk.hec.token

Splunk HTTP Event Collector token.

  • Type: password
  • Importance: high
splunk.hec.ssl.trust.store.path

Path on the local disk to the certificate trust store.

  • Type: password
  • Default: “”
  • Importance: high
splunk.hec.ssl.trust.store.password

Password for the trust store.

  • Type: password
  • Default: [hidden]
  • Importance: high
splunk.hec.total.channels

Total number of HEC channels to use when posting events to Splunk.

  • Type: int
  • Default: 2
  • Importance: high
splunk.hec.ssl.validate.certs

Enables or disables HTTPS certificate validation. Defaults to true.

  • Type: boolean
  • Default: true
  • Importance: medium
splunk.indexes

Comma-separated Splunk index names for Kafka topic data. Supports multiple index entries. Example: "prod-index1,prod-index2,prod-index3". For additional information, refer to Cloud connector limitations.

  • Type: string
  • Default: “”
  • Importance: medium
splunk.sourcetypes

Splunk event source type metadata for Kafka topic data.

  • Type: string
  • Default: “”
  • Importance: medium
splunk.sources

Splunk event source metadata for Kafka topic data.

  • Type: string
  • Default: “”
  • Importance: medium
splunk.hec.raw

Set this to true to ingest data using the /raw HEC endpoint. By default, this setting is false and the connector uses the /event HEC endpoint.

  • Type: boolean
  • Default: false
  • Importance: medium
splunk.hec.raw.line.breaker

Applicable only when splunk.hec.raw is set to true (using the /raw HEC endpoint). This property is used to specify a custom line breaker to help Splunk separate events correctly. For example, you can specify ##### as a special line breaker and Splunk will split events on those characters.

  • Type: string
  • Default: “”
  • Importance: medium
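
For illustration, a connector configuration that uses the /raw endpoint with the ##### line breaker described above would include entries like the following in the JSON configuration file:

"splunk.hec.raw": "true",
"splunk.hec.raw.line.breaker": "#####"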
splunk.hec.http.keepalive

This setting enables or disables HTTP connection keep-alive. By default, this is set to true.

  • Type: boolean
  • Default: true
  • Importance: medium
splunk.hec.use.record.timestamp

When set to true, the connector retrieves the timestamp from the Kafka record and passes it to Splunk as a HEC metadata override. This indexes events in Splunk with the record timestamp. By default, this is set to true.

  • Type: boolean
  • Default: true
  • Importance: medium
splunk.hec.max.http.connection.per.channel

The maximum number of HTTP connections that are pooled for one HEC Channel when posting events to Splunk.

  • Type: int
  • Default: 2
  • Importance: medium
splunk.hec.max.batch.size

The maximum batch size when posting events to Splunk. The size is the actual number of Kafka records, not the byte size. Defaults to 500 events.

  • Type: int
  • Default: 500
  • Importance: medium
splunk.hec.max.outstanding.events

The maximum number of unacknowledged events kept in memory by the connector. When the threshold is exceeded, the connector triggers a backpressure event to slow event collection. Defaults to 1000000 events.

  • Type: int
  • Default: 1000000
  • Importance: medium
splunk.hec.max.retries

The maximum number of retries for a failed batch before terminating the task. When set to -1 (the default) the connector retries indefinitely.

  • Type: int
  • Default: -1
  • Importance: medium
splunk.hec.backoff.threshold.seconds

The amount of time, in seconds, the connector waits before attempting to resend failed events to Splunk.

  • Type: int
  • Default: 60
  • Importance: medium
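
The batching, backpressure, and retry properties above are typically tuned together. A sketch of how they might appear in the JSON configuration file follows; the values are illustrative, not recommendations:

"splunk.hec.max.batch.size": "1000",
"splunk.hec.max.outstanding.events": "100000",
"splunk.hec.max.retries": "10",
"splunk.hec.backoff.threshold.seconds": "30"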
splunk.hec.ack.enabled

When set to true, the connector polls event acknowledgements (ACKs) for POST events before check-pointing the Kafka offsets. This property implements guaranteed delivery (preventing data loss).

  • Type: boolean
  • Default: false
  • Importance: medium
splunk.hec.ack.poll.interval

Applicable only when splunk.hec.ack.enabled is set to true. It controls the event ACKs polling interval. Defaults to 10 seconds.

  • Type: int
  • Default: 10
  • Importance: medium
splunk.hec.ack.poll.threads

Applicable only when splunk.hec.ack.enabled is set to true. It controls how many threads should be spawned to poll event ACKs. By default, this is set to 2.

  • Type: int
  • Default: 2
  • Importance: medium
splunk.hec.event.timeout

Applicable only when splunk.hec.ack.enabled is set to true. This property determines how long the connector waits for an event ACK before timing out and attempting to resend the event. By default, this is set to 300 seconds.

  • Type: int
  • Default: 300
  • Importance: medium
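
To illustrate the guaranteed-delivery behavior described above, the acknowledgement properties can be set together in the JSON configuration file. The values below simply restate the documented defaults with ACK polling enabled:

"splunk.hec.ack.enabled": "true",
"splunk.hec.ack.poll.interval": "10",
"splunk.hec.ack.poll.threads": "2",
"splunk.hec.event.timeout": "300"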
splunk.header.support

When set to true, the connector parses Kafka record headers for use as metadata in Splunk events.

  • Type: boolean
  • Default: false
  • Importance: medium
splunk.header.custom

The connector looks for Kafka record headers with these names and adds them to each event, if present. Use comma-separated entries for multiple custom headers. For example, "custom_header_1,custom_header_2,custom_header_3".

  • Type: string
  • Default: “”
  • Importance: medium
splunk.header.index

Kafka record header to use for the Splunk index.

  • Type: string
  • Default: splunk.header.index
  • Importance: medium
splunk.header.source

Kafka record header to use for the Splunk source.

  • Type: string
  • Default: splunk.header.source
  • Importance: medium
splunk.header.sourcetype

Kafka record header to use for the Splunk sourcetype.

  • Type: string
  • Default: splunk.header.sourcetype
  • Importance: medium
splunk.header.host

Kafka record header to use for the Splunk host.

  • Type: string
  • Default: splunk.header.host
  • Importance: medium
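
When splunk.header.support is enabled, the splunk.header.* properties name the Kafka record headers the connector reads for Splunk metadata. A minimal sketch, assuming hypothetical header names destination_index and origin_host on the Kafka records:

"splunk.header.support": "true",
"splunk.header.index": "destination_index",
"splunk.header.host": "origin_host"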
splunk.hec.json.event.enrichment

Applicable only when splunk.hec.raw is set to false (using the /event HEC endpoint). This setting is used to enrich raw data with extra metadata fields. It contains a comma-separated list of key-value pairs. The configured enrichment metadata is indexed along with raw event data by Splunk. Data enrichment for the /event HEC endpoint is only available in Splunk Enterprise version 6.5 (and later). By default, this setting is empty.

  • Type: string
  • Default: “”
  • Importance: low
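
As an illustration, enrichment metadata is supplied as comma-separated pairs. The key=value syntax and the field names below are assumptions for the sketch, not values defined elsewhere in this document:

"splunk.hec.json.event.enrichment": "env=prod,region=us-west-2"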
splunk.hec.track.data

Applicable only when splunk.hec.raw is set to false (using the /event HEC endpoint). When set to true, data loss and data injection latency metadata is indexed along with raw data.

  • Type: boolean
  • Default: false
  • Importance: low
splunk.hec.socket.timeout

The maximum duration in seconds the connector waits (for authentication, etc.) before an internal TCP socket timeout occurs. By default, this is set to 60 seconds.

  • Type: int
  • Default: 60
  • Importance: low
splunk.hec.threads

The number of threads spawned for data injection using HEC in a single connector task.

  • Type: int
  • Default: 1
  • Importance: low
splunk.hec.json.event.formatted

Set this property to true when events are already preformatted in the correct HEC JSON format and include both metadata and event data.

  • Type: boolean
  • Default: false
  • Importance: low
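
As an illustration of a record that is already in HEC JSON format, the Kafka record value carries HEC metadata fields alongside the event payload, roughly like the following (all field values are placeholders):

{
  "time": 1672531200,
  "host": "app-server-01",
  "source": "orders-service",
  "sourcetype": "json",
  "event": {"order_id": 12345, "status": "shipped"}
}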
splunk.hec.lb.poll.interval

This setting controls the load balancer polling interval. Defaults to 120 seconds.

  • Type: int
  • Default: 120
  • Importance: low
splunk.flush.window

The interval in seconds between iterations when the connector flushes events from Kafka to Splunk. Defaults to 120 seconds.

  • Type: int
  • Default: 120
  • Importance: low

Next Steps

See also

For an example that shows fully managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use the Confluent CLI to manage your resources in Confluent Cloud.
