Google Firebase Sink Connector for Confluent Platform¶
The Kafka Connect Google Firebase Sink connector enables you to read data from multiple Kafka topics and write it to a Google Firebase Realtime Database. For more information, see Firebase Sink connector configuration properties.
The Firebase Sink connector for Confluent Platform includes the following features:
At least once delivery¶
This connector guarantees that records from the Kafka topic are delivered at least once.
Dead Letter Queue¶
This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
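For example, the following error-handling properties (standard Kafka Connect sink settings; the DLQ topic name shown here is an assumption) route records that fail conversion or transformation to a dead letter queue topic instead of failing the task:
# Route failed records to a DLQ topic instead of stopping the task.
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq-firebase-sink
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.context.headers.enable=true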
Multiple tasks¶
The Google Firebase Sink connector supports running one or more tasks. You can
specify the number of tasks in the tasks.max
configuration parameter. Running multiple tasks can improve performance when the
connector consumes from multiple topic partitions.
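A minimal sketch of the relevant settings (the task count is only an example; the effective parallelism is limited by the number of partitions in the subscribed topics):
# Two tasks share the partitions of the subscribed topics.
tasks.max=2
topics=artists,songs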
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys for subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and Confluent License Properties for information about the license topic.
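As a sketch, the license-related properties referenced above appear in a connector configuration as follows (leave confluent.license empty or omit it to run in trial mode; a replication factor of 3 is recommended for production, while 1 is sufficient for a single-broker development cluster):
confluent.license=<your-license-key>
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=3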
Configuration Properties¶
For a complete list of configuration properties for the sink connector, see Configuration Reference for Firebase Sink Connector for Confluent Platform.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Install the Google Firebase Sink Connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
- You must install the connector on every machine where Connect will run.
- If you want to install the connector using Confluent Hub, you must install the Confluent Hub Client. This is installed by default with Confluent Enterprise.
- Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
- Connect: Confluent Platform 4.0.0 or above, or Kafka 1.0.0 or above
- Java 1.8
- A GCP service account with the Firebase Realtime Database Viewer role (for the source connector) or the Firebase Realtime Database Admin role (for the sink connector) is required. You can create this service account in the Google Cloud Console. For details, see creating and managing service accounts. A command-line sketch is shown after this list.
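If you prefer the gcloud CLI to the Cloud Console, a sketch like the following creates a service account, grants it the Firebase Realtime Database Admin role, and downloads a JSON key file to use as gcp.firebase.credentials.path. The account name and project ID are placeholders, and the role ID (roles/firebasedatabase.admin) is an assumption you should confirm for your project:
# Create the service account (name and project ID are placeholders).
gcloud iam service-accounts create firebase-sink-connector --project=<gcp-project-id>

# Grant the Firebase Realtime Database Admin role required by the sink connector.
gcloud projects add-iam-policy-binding <gcp-project-id> \
  --member="serviceAccount:firebase-sink-connector@<gcp-project-id>.iam.gserviceaccount.com" \
  --role="roles/firebasedatabase.admin"

# Download a JSON key to reference from gcp.firebase.credentials.path.
gcloud iam service-accounts keys create firebase-credentials.json \
  --iam-account=firebase-sink-connector@<gcp-project-id>.iam.gserviceaccount.com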
Install the connector using the Confluent CLI¶
To install the latest connector version using the Confluent CLI, navigate to your Confluent Platform installation directory and run the following command:
confluent connect plugin install confluentinc/kafka-connect-firebase:latest
You can install a specific version by replacing latest
with a version number
as shown in the following example:
confluent connect plugin install confluentinc/kafka-connect-firebase:1.2.0
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
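As a sketch of the manual installation (the paths and file name below are assumptions for a typical setup), extract the ZIP into a directory that is on each worker's plugin.path and restart the Connect workers:
# Extract the downloaded connector ZIP into a plugin directory.
unzip confluentinc-kafka-connect-firebase-1.2.0.zip -d /usr/local/share/kafka/plugins/

# Ensure the Connect worker configuration points at that directory, for example:
#   plugin.path=/usr/local/share/kafka/plugins
# Then restart the Connect workers so the new plugin is discovered.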
Quick Start¶
In this Quick Start, you configure the Firebase Sink connector to read records from Kafka topics and write them to a Firebase Realtime Database.
Start Confluent¶
Start the Confluent services using the following Confluent CLI command:
confluent local services start
Important
Do not use the Confluent CLI in production environments.
Property-based example¶
Create a configuration file firebase-sink.properties
with the following content. This file should be placed inside the Confluent Platform installation directory. This configuration is typically used with standalone
workers.
name=FirebaseSinkConnector
topics=artists,songs
connector.class=io.confluent.connect.firebase.FirebaseSinkConnector
tasks.max=1
gcp.firebase.credentials.path=file-path
gcp.firebase.database.reference=database-url
insert.mode=set/update/push
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
confluent.license=
Run the connector with this configuration.
confluent local services connect connector load FirebaseSinkConnector --config firebase-sink.properties
The output should resemble:
{
  "name": "FirebaseSinkConnector",
  "config": {
    "topics": "artists,songs",
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.firebase.FirebaseSinkConnector",
    "gcp.firebase.database.reference": "https://<gcp-project-id>.firebaseio.com",
    "gcp.firebase.credentials.path": "file-path-to-your-gcp-service-account-json-file",
    "insert.mode": "update",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "name": "FirebaseSinkConnector"
  },
  "tasks": [
    {
      "connector": "FirebaseSinkConnector",
      "task": 0
    }
  ],
  "type": "sink"
}
Confirm that the connector is in a RUNNING
state.
confluent local services connect connector status FirebaseSinkConnector
The output should resemble:
{
  "name": "FirebaseSinkConnector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.1.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "127.0.1.1:8083"
    }
  ],
  "type": "sink"
}
REST-based example¶
Use this setting with distributed workers. Write the following JSON to config.json, configure all of the required values, and use the following command to post the configuration to one of the distributed connect workers. For more information about the Kafka Connect REST API, see this documentation.
{
  "name": "FirebaseSinkConnector",
  "config": {
    "topics": "artists,songs",
    "connector.class": "io.confluent.connect.firebase.FirebaseSinkConnector",
    "tasks.max": "1",
    "gcp.firebase.credentials.path": "credential path",
    "gcp.firebase.database.reference": "database url",
    "insert.mode": "set/update/push",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "confluent.license": "Omit to enable trial mode"
  }
}
Note
Change the confluent.topic.bootstrap.servers
property to include your
broker address(es) and change the confluent.topic.replication.factor
to
3
for staging or production use.
Use curl to post a configuration to one of the Kafka Connect workers. Change
http://localhost:8083/
to the endpoint of one of your Kafka Connect
worker(s).
curl -sS -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors
Use the following command to update the configuration of an existing connector.
curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/FirebaseSinkConnector/config
Confirm that the connector is in a RUNNING
state by running the following
command:
curl http://localhost:8083/connectors/FirebaseSinkConnector/status | jq
The output should resemble:
{
  "name": "FirebaseSinkConnector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.1.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "127.0.1.1:8083"
    }
  ],
  "type": "sink"
}
In the response from the /connectors/FirebaseSinkConnector/status endpoint, the connector and its tasks should report a RUNNING state.
To produce Avro data to the Kafka topic artists, use the following command.
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic artists \
--property parse.key=true \
--property key.schema='{"type":"string"}' \
--property "key.separator=:" \
--property value.schema='{"type":"record","name":"artists","fields":[{"name":"name","type":"string"},{"name":"genre","type":"string"}]}'
While the console producer is waiting for input, paste each of the following three records into the console.
"artistId1":{"name":"Michael Jackson","genre":"Pop"}
"artistId2":{"name":"Bob Dylan","genre":"American folk"}
"artistId3":{"name":"Freddie Mercury","genre":"Rock"}
To produce Avro data to the Kafka topic songs, use the following command.
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic songs \
--property parse.key=true \
--property key.schema='{"type":"string"}' \
--property "key.separator=:" \
--property value.schema='{"type":"record","name":"songs","fields":[{"name":"title","type":"string"},{"name":"artist","type":"string"}]}'
While the console producer is waiting for input, paste each of the following three records into the console.
"songId1":{"title":"billie jean","artist":"Michael Jackson"}
"songId2":{"title":"hurricane","artist":"Bob Dylan"}
"songId3":{"title":"bohemian rhapsody","artist":"Freddie Mercury"}
Finally, check the Firebase console to ensure that the collections named
artists
and songs
were created and the records are in the format defined
in the Firebase database structure.
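If your database's security rules allow unauthenticated reads, you can also spot-check the data with the Firebase Realtime Database REST API instead of the console. This is only a sketch; the URL is a placeholder, and it assumes gcp.firebase.database.reference points at the musicBlog path used in the next section:
# Read back the artists collection by appending .json to the database path.
curl 'https://<gcp-project-id>.firebaseio.com/musicBlog/artists.json'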
Firebase Database Structure¶
The connector pushes the records in the Kafka topic to the Firebase database in the following format:
{
  "musicBlog": {
    "artists": {
      "artistId1": {
        "name": "Michael Jackson",
        "genre": "Pop"
      },
      "artistId2": {
        "name": "Bob Dylan",
        "genre": "American folk"
      },
      "artistId3": {
        "name": "Freddie Mercury",
        "genre": "Rock"
      }
    },
    "songs": {
      "songId1": {
        "title": "billie jean",
        "artist": "Michael Jackson"
      },
      "songId2": {
        "title": "hurricane",
        "artist": "Bob Dylan"
      },
      "songId3": {
        "title": "bohemian rhapsody",
        "artist": "Freddie Mercury"
      }
    }
  }
}
If gcp.firebase.database.reference
is configured as
{firebase-application-url}/musicBlog
, the connector creates collections with
the same names as the Kafka topics from which records are consumed. The
records within each Kafka topic are pushed to the corresponding Firebase collection.
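As a minimal sketch of this mapping (the project ID is a placeholder), the configuration below writes records from the artists topic under /musicBlog/artists/<record-key> and records from the songs topic under /musicBlog/songs/<record-key>:
# Each subscribed topic maps to a child collection under /musicBlog.
gcp.firebase.database.reference=https://<gcp-project-id>.firebaseio.com/musicBlog
topics=artists,songs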