Google Firebase Sink Connector for Confluent Platform¶
The Kafka Connect Google Firebase Sink Connector enables you to read data from multiple Kafka topics and write it to a Google Firebase Realtime Database.
Configuration Properties¶
For a complete list of configuration properties for the sink connector, see Google Firebase Sink Connector Configuration Properties.
Note
For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster in Connect Kafka Connect to Confluent Cloud.
Quick Start¶
In this Quick Start, you configure the Firebase Sink connector to read records from Kafka topics and write them to a Firebase Realtime Database.
Start Confluent¶
Start the Confluent services using the following Confluent CLI command:
confluent local start
Important
Do not use the Confluent CLI in production environments.
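Before loading the connector, you can confirm that the local services (including the Connect worker) started successfully. This is an optional check using the same older confluent local CLI shown above:
confluent local status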
Property-based example¶
Create a configuration file firebase-sink.properties with the following content and place it inside the Confluent Platform installation directory. This configuration is typically used with standalone workers.
name=FirebaseSinkConnector
topics=artists,songs
connector.class=io.confluent.connect.firebase.FirebaseSinkConnector
tasks.max=1
gcp.firebase.credentials.path=file-path
gcp.firebase.database.reference=database-url
# One of: set, update, or push
insert.mode=update
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
confluent.license=
Run the connector with this configuration.
confluent local load FirebaseSinkConnector -- -d firebase-sink.properties
The output should resemble:
{
  "name": "FirebaseSinkConnector",
  "config": {
    "topics": "artists,songs",
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.firebase.FirebaseSinkConnector",
    "gcp.firebase.database.reference": "https://<gcp-project-id>.firebaseio.com",
    "gcp.firebase.credentials.path": "file-path-to-your-gcp-service-account-json-file",
    "insert.mode": "update",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "name": "FirebaseSinkConnector"
  },
  "tasks": [
    {
      "connector": "FirebaseSinkConnector",
      "task": 0
    }
  ],
  "type": "sink"
}
Confirm that the connector is in a RUNNING state.
confluent local status FirebaseSinkConnector
The output should resemble:
{
  "name": "FirebaseSinkConnector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.1.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "127.0.1.1:8083"
    }
  ],
  "type": "sink"
}
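If the connector or a task is not in a RUNNING state, the Connect worker log usually explains why (for example, an invalid credentials path or database reference). The following is a sketch using the same older confluent local CLI; the exact subcommand may vary by CLI version:
confluent local log connect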
REST-based example¶
Use this setting with distributed workers.
Write the following JSON to config.json, configure all of the required values, and use the following command to post the configuration to one of the distributed connect workers. See the Kafka Connect REST API documentation for more information.
{
  "name": "FirebaseSinkConnector",
  "config": {
    "topics": "artists,songs",
    "connector.class": "io.confluent.connect.firebase.FirebaseSinkConnector",
    "tasks.max": "1",
    "gcp.firebase.credentials.path": "credential path",
    "gcp.firebase.database.reference": "database url",
    "insert.mode": "set/update/push",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "confluent.license": "Omit to enable trial mode"
  }
}
Note
Change the confluent.topic.bootstrap.servers property to include your broker address(es), and change the confluent.topic.replication.factor to 3 for staging or production use.
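For example, the following sketch uses jq to produce a staging or production variant of the configuration. The broker hostnames are placeholders; replace them with your own:
jq '.config."confluent.topic.bootstrap.servers" = "broker1:9092,broker2:9092,broker3:9092" | .config."confluent.topic.replication.factor" = "3"' config.json > config-prod.json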
Use curl to post a configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect worker(s).
curl -sS -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors
Use the following command to update the configuration of the existing connector.
curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/FirebaseSinkConnector/config
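Note that the PUT /connectors/{name}/config endpoint expects only the contents of the config object, not the full wrapper used for POST. One way to send the right payload, sketched here with jq, is to extract the config map from config.json first:
jq -c .config config.json | curl -sS -X PUT -H 'Content-Type: application/json' --data @- http://localhost:8083/connectors/FirebaseSinkConnector/config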
Confirm that the connector is in a RUNNING state by running the following command:
curl http://localhost:8083/connectors/FirebaseSinkConnector/status | jq
The output should resemble:
{
  "name": "FirebaseSinkConnector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.1.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "127.0.1.1:8083"
    }
  ],
  "type": "sink"
}
Query the /connectors/FirebaseSinkConnector/status endpoint; the connector and its tasks should report a status of RUNNING.
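The steps below rely on the artists and songs topics existing or being auto-created. If automatic topic creation is disabled on your brokers, you can create the topics first. This sketch assumes a broker version that accepts --bootstrap-server (older brokers use --zookeeper instead):
./bin/kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic artists
./bin/kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic songs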
To produce Avro data to the Kafka topic artists, use the following command.
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic artists \
--property parse.key=true \
--property key.schema='{"type":"string"}' \
--property "key.separator=:" \
--property value.schema='{"type":"record","name":"artists","fields":[{"name":"name","type":"string"},{"name":"genre","type":"string"}]}'
While the console producer is waiting for input, paste each of the following three records into the console.
"artistId1":{"name":"Michael Jackson","genre":"Pop"}
"artistId2":{"name":"Bob Dylan","genre":"American folk"}
"artistId3":{"name":"Freddie Mercury","genre":"Rock"}
To produce Avro data to the Kafka topic songs, use the following command.
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic songs \
--property parse.key=true \
--property key.schema='{"type":"string"}' \
--property "key.separator=:" \
--property value.schema='{"type":"record","name":"songs","fields":[{"name":"title","type":"string"},{"name":"artist","type":"string"}]}'
While the console producer is waiting for input, paste the following three records into the console.
"songId1":{"title":"billie jean","artist":"Michael Jackson"}
"songId2":{"title":"hurricane","artist":"Bob Dylan"}
"songId3":{"title":"bohemian rhapsody","artist":"Freddie Mercury"}
Finally, check the Firebase console to ensure that the collections named artists and songs were created and that the records are in the format defined in the Firebase database structure below.
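You can also spot-check the written data from the command line. The Firebase Realtime Database exposes a REST interface in which appending .json to a path returns that node. The sketch below assumes your database rules permit the read (otherwise append an auth token as described in the Firebase documentation), with <database-url> standing in for the value configured as gcp.firebase.database.reference:
curl -sS "<database-url>/artists/artistId1.json"
The command should return the Michael Jackson record produced above.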
Firebase Database Structure¶
The connector pushes the records from the Kafka topics to the Firebase database in the following format:
{
  "musicBlog": {
    "artists": {
      "artistId1": {
        "name": "Michael Jackson",
        "genre": "Pop"
      },
      "artistId2": {
        "name": "Bob Dylan",
        "genre": "American folk"
      },
      "artistId3": {
        "name": "Freddie Mercury",
        "genre": "Rock"
      }
    },
    "songs": {
      "songId1": {
        "title": "billie jean",
        "artist": "Michael Jackson"
      },
      "songId2": {
        "title": "hurricane",
        "artist": "Bob Dylan"
      },
      "songId3": {
        "title": "bohemian rhapsody",
        "artist": "Freddie Mercury"
      }
    }
  }
}
If gcp.firebase.database.reference is configured as {firebase-application-url}/musicBlog, the connector creates collections with the same names as the Kafka topics from which records are consumed. The records within each Kafka topic are pushed to the corresponding Firebase collection.
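As a quick way to see this mapping, the following sketch lists the top-level child nodes under the configured reference through the same REST interface, again assuming readable database rules. With the quick start data it should report the artists and songs collections:
curl -sS "{firebase-application-url}/musicBlog.json?shallow=true"
The shallow=true parameter returns only the child keys, so the response resembles {"artists":true,"songs":true}.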