VMware Tanzu GemFire Sink Connector for Confluent Platform¶
The Kafka Connect VMware Tanzu™ GemFire Sink connector exports data from Apache Kafka® to VMware Tanzu GemFire. The connector periodically polls data from Kafka topics and writes it to a Tanzu GemFire cache region.
Note
This connector is compatible with VMware Tanzu GemFire 9.x and above.
Features¶
The Tanzu GemFire Sink connector includes the following features:
At least once delivery¶
This connector guarantees that records from the Kafka topic are delivered at least once.
Dead Letter Queue¶
This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
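As a quick sketch, the DLQ is enabled through the standard Kafka Connect sink error-handling properties shown below; the topic name dlq-gemfire-sink is only an illustrative choice:
# Route records that fail during conversion or writing to a DLQ topic.
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq-gemfire-sink
errors.deadletterqueue.topic.replication.factor=1
# Optionally record the failure reason in the message headers.
errors.deadletterqueue.context.headers.enable=true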
Multiple tasks¶
The Tanzu GemFire Sink connector supports running one or more tasks. You can
specify the number of tasks in the tasks.max
configuration parameter. This
can lead to performance gains when multiple files need to be parsed.
Prerequisites¶
The following are required to run the Tanzu GemFire Sink connector:
- Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
- Connect: Confluent Platform 4.0.0 or above, or Kafka 1.0.0 or above
Limitations¶
- The connector supports only one task, since it can create only one region object (the client used to write data). More information can be found here.
- The connector expects non-null keys; records must have explicit keys and values for the data to be exported to Tanzu GemFire.
Install the VMware Tanzu GemFire Connector¶
You can install this connector by using the Confluent Hub client installation instructions or by manually downloading the ZIP file.
Prerequisites¶
Important
You must install the connector on every machine where Connect will run.
An installation of the Confluent Hub Client.
Note
This is installed by default with Confluent Enterprise.
An installation of the latest (latest) connector version. To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:
confluent-hub install confluentinc/kafka-connect-pivotal-gemfire:latest
You can install a specific version by replacing latest with a version number as shown in the following example:
confluent-hub install confluentinc/kafka-connect-pivotal-gemfire:2.0.0
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys for subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic.
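For example, subscribers typically add the license key to the connector configuration; the value below is a placeholder:
# Placeholder; use the license key provided with your subscription.
# Leaving this property out runs the connector in the 30-day trial mode.
confluent.license=<license key>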
Configuration Properties¶
For a complete list of configuration properties for this connector, see VMware Tanzu GemFire Sink Connector Configuration Properties.
Note
For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster.
Quick Start¶
In this quick start, the VMware Tanzu GemFire Sink connector is used to export data produced by the Avro console producer to the Tanzu GemFire cache region.
Note
Before you begin: Start the VMware Tanzu GemFire locator and server. Create a cache region to store the data.
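If you need a starting point, the following gfsh commands are a minimal sketch for a local setup; the member names, ports, and REPLICATE region type are illustrative assumptions, not requirements of the connector:
gfsh> start locator --name=locator1 --port=10334
gfsh> start server --name=server1 --locators=localhost[10334]
gfsh> create region --name=check --type=REPLICATE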
Start the services using the Confluent CLI.
confluent local services start
Every service starts in order, printing a message with its status.
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Starting KSQL Server
KSQL Server is [UP]
Starting Control Center
Control Center is [UP]
To import a few records with a simple schema in Kafka, start the Avro console producer as follows:
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic input_topic \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
Then, in the console producer, enter the following:
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
The three records entered are published to the Kafka topic input_topic in Avro format.
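Optionally, you can confirm the records landed in the topic with the Avro console consumer before starting the connector; this assumes Schema Registry is running at its default address, localhost:8081:
./bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic input_topic \
--from-beginning --property schema.registry.url=http://localhost:8081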
Property-based example¶
Create a configuration file, gemfire.properties. This configuration is typically used with standalone workers.
name=gemfire-sink
connector.class=io.confluent.connect.pivotal.gemfire.PivotalGemfireSinkConnector
tasks.max=1
topics=input_topic
gemfire.locator.host=localhost
gemfire.locator.port=10334
gemfire.username=<gemfire username>
gemfire.password=<gemfire password>
gemfire.region=check
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
Before starting the connector, make sure that the configurations in gemfire.properties are properly set.
Note
Provide either gemfire.locator.host or gemfire.server.host to establish a connection with VMware Tanzu GemFire and run the connector.
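For example, a sketch of connecting through a cache server instead of a locator might look like the following; gemfire.server.port and its value are assumptions based on the default GemFire server port, so confirm the exact property names in the configuration properties reference:
# Assumed alternative to the locator settings; verify in the configuration reference.
gemfire.server.host=localhost
gemfire.server.port=40404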
Then start the VMware Tanzu GemFire connector by loading its configuration with the following command:
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local services connect connector load gemfire-sink --config gemfire.properties
{
"name": "gemfire-sink",
"config": {
"name":"gemfire-sink",
"connector.class":"io.confluent.connect.pivotal.gemfire.PivotalGemfireSinkConnector",
"tasks.max":"1",
"topics":"input_topic",
"gemfire.locator.host":"localhost",
"gemfire.locator.port":"10334",
"gemfire.username":"<gemfire username>",
"gemfire.password":"<gemfire password>",
"gemfire.region":"check",
"confluent.topic.bootstrap.servers":"localhost:9092",
"confluent.topic.replication.factor":"1"
},
"tasks": []
}
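To confirm that the connector and its task are in the RUNNING state, you can query the Kafka Connect REST API (the worker listens on port 8083 by default):
curl -s http://localhost:8083/connectors/gemfire-sink/status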
REST-based example¶
Use this configuration with distributed workers. Write the following JSON to config.json, configure all of the required values, and use the command below to post the configuration to one of the distributed Connect workers. For more information about the Kafka Connect REST API, check here.
{
"name": "gemfire-sink",
"config": {
"name":"gemfire-sink",
"connector.class":"io.confluent.connect.pivotal.gemfire.PivotalGemfireSinkConnector",
"tasks.max":"1",
"topics":"input_topic",
"gemfire.locator.host":"localhost",
"gemfire.locator.port":"10334",
"gemfire.username":"<gemfire username>",
"gemfire.password":"<gemfire password>",
"gemfire.region":"check",
"confluent.topic.bootstrap.servers":"localhost:9092",
"confluent.topic.replication.factor":"1"
}
}
Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.
curl -s -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors
Use the following command to update the configuration of an existing connector. Note that the PUT endpoint expects only the contents of the config object, not the surrounding name/config wrapper shown above.
curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/gemfire-sink/config
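When you are finished, you can remove the connector by issuing a DELETE request against the same REST API:
curl -s -X DELETE http://localhost:8083/connectors/gemfire-sink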
Check that the connector started successfully. Review the Connect worker’s log by entering the following:
confluent local services connect log
Towards the end of the log you should see that the connector starts, logs a few messages, and then adds data from Kafka to the Tanzu GemFire check cache region.
Once the connector has ingested records, check that the data is available in the Tanzu GemFire check cache region. To see the values in the GemFire check region, run the following query:
query --query="select * from /check"
Result : true
Limit : 100
Rows : 3
Result
-----------
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
To see the keys in the GemFire check region, run the following query:
query --query="select * from /check.keySet"
Result : true
Limit : 100
Rows : 3
Result
-----------
kafka1$0$1
kafka1$0$2
kafka1$0$3
Finally, stop the Connect worker and all other Confluent services by running:
confluent local services stop
Your output should resemble:
Stopping Control Center
Control Center is [DOWN]
Stopping KSQL Server
KSQL Server is [DOWN]
Stopping Connect
Connect is [DOWN]
Stopping Kafka REST
Kafka REST is [DOWN]
Stopping Schema Registry
Schema Registry is [DOWN]
Stopping Kafka
Kafka is [DOWN]
Stopping Zookeeper
Zookeeper is [DOWN]
You can stop all services and remove any data generated during this quick start by entering the following command:
confluent local destroy
Your output should resemble:
Stopping Control Center
Control Center is [DOWN]
Stopping KSQL Server
KSQL Server is [DOWN]
Stopping Connect
Connect is [DOWN]
Stopping Kafka REST
Kafka REST is [DOWN]
Stopping Schema Registry
Schema Registry is [DOWN]
Stopping Kafka
Kafka is [DOWN]
Stopping Zookeeper
Zookeeper is [DOWN]
Deleting: /var/folders/ty/rqbqmjv54rg_v10ykmrgd1_80000gp/T/confluent.PkQpsKfE