Google Firebase Sink Connector for Confluent Platform

The Kafka Connect Google Firebase Sink Connector reads data from multiple Kafka topics and writes it to Google Firebase Realtime Database. For more information, refer to the Firebase sink connector configuration properties.

Quick Start

In this Quick Start, you configure the Firebase Sink connector to read records from Kafka topics and write them to a Firebase Realtime Database.

Start Confluent

Start the Confluent services using the following Confluent CLI command:

confluent local start

Important

Do not use the Confluent CLI in production environments.

Property-based example

Create a configuration file named firebase-sink.properties with the following content, and place it inside the Confluent Platform installation directory. This configuration is typically used with standalone workers.

name=FirebaseSinkConnector

topics=artists,songs
connector.class=io.confluent.connect.firebase.FirebaseSinkConnector
tasks.max=1

gcp.firebase.credentials.path=file-path
gcp.firebase.database.reference=database-url
insert.mode=set/update/push

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
confluent.license=

Run the connector with this configuration.

confluent local load FirebaseSinkConnector -- -d firebase-sink.properties

The output should resemble:

{
    "name":"FirebaseSinkConnector",
    "config":{
        "topics":"artists,songs",
        "tasks.max":"1",
        "connector.class":"io.confluent.connect.firebase.FirebaseSinkConnector",
        "gcp.firebase.database.reference":"https://<gcp-project-id>.firebaseio.com",
        "gcp.firebase.credentials.path":"file-path-to-your-gcp-service-account-json-file",
        "insert.mode":"update",
        "key.converter":"io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url":"http://localhost:8081",
        "value.converter":"io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url":"http://localhost:8081",
        "confluent.topic.bootstrap.servers":"localhost:9092",
        "confluent.topic.replication.factor":"1",
        "name":"FirebaseSinkConnector"
    },
    "tasks":[
        {
            "connector":"FirebaseSinkConnector",
            "task":0
        }
    ],
    "type":"sink"
}

Confirm that the connector is in a RUNNING state.

confluent local status FirebaseSinkConnector

The output should resemble:

{
   "name":"FirebaseSinkConnector",
   "connector":{
      "state":"RUNNING",
      "worker_id":"127.0.1.1:8083"
   },
   "tasks":[
      {
         "id":0,
         "state":"RUNNING",
         "worker_id":"127.0.1.1:8083"
      }
   ],
   "type":"sink"
}

REST-based example

Use this approach with distributed workers. Write the following JSON to config.json, set all of the required values, and then post the configuration to one of the distributed Connect workers as described in the steps that follow. For more information, see the Kafka Connect REST API documentation.

{
  "name" : "FirebaseSinkConnector",
  "config" : {
    "topics":"artists,songs",
    "connector.class" : "io.confluent.connect.firebase.FirebaseSinkConnector",
    "tasks.max" : "1",

    "gcp.firebase.credentials.path" : "credential path",
    "gcp.firebase.database.reference": "database url",
    "insert.mode" : "set/update/push",

    "key.converter" : "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url":"http://localhost:8081",
    "value.converter" : "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"http://localhost:8081",


    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "confluent.license": " Omit to enable trial mode "
  }
}

Note

Change the confluent.topic.bootstrap.servers property to include your broker address(es) and change the confluent.topic.replication.factor to 3 for staging or production use.
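The property-based and REST-based examples carry the same settings in two shapes: a flat key=value file and a JSON document with the connector name next to a config map. The following Python sketch shows one way to derive the second from the first. The function name properties_to_payload is hypothetical, and the parser is deliberately simplified: it handles only plain key=value lines and comments, not every feature of the Java properties format.

```python
import json


def properties_to_payload(text):
    """Turn simple key=value properties text into a Connect REST payload."""
    config = {}
    for raw in text.splitlines():
        line = raw.strip()
        # Skip blank lines and comment lines.
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    # The REST-based example keeps the name outside the config map.
    name = config.pop("name")
    return {"name": name, "config": config}


# A shortened copy of the properties shown in the property-based example.
sample = """
name=FirebaseSinkConnector
topics=artists,songs
connector.class=io.confluent.connect.firebase.FirebaseSinkConnector
tasks.max=1
"""

payload = properties_to_payload(sample)
print(json.dumps(payload, indent=2))
```

All values stay strings, which matches the quoted values such as "1" in the JSON example.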

Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.

curl -sS -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors

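Use the following command to update the configuration of an existing connector. The PUT /connectors/{name}/config endpoint of the Kafka Connect REST API expects the request body to be the flat map of configuration properties, that is, the contents of the "config" object without the outer "name" and "config" wrapper, so adjust config.json accordingly before you run it.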

curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/FirebaseSinkConnector/config
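The two curl calls above differ in method, URL, and body shape. As a rough illustration, the following Python sketch builds the equivalent requests with the standard library without sending them. The helper names are hypothetical, and CONNECT_URL assumes the local worker endpoint used in this Quick Start.

```python
import json
import urllib.request

# Assumption: the local worker endpoint used throughout this Quick Start.
CONNECT_URL = "http://localhost:8083"


def _json_request(url, body, method):
    """Build (but do not send) a JSON request."""
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method=method,
    )


def build_create_request(payload):
    """POST /connectors takes the full name-plus-config document."""
    return _json_request(f"{CONNECT_URL}/connectors", payload, "POST")


def build_update_request(payload):
    """PUT /connectors/<name>/config takes only the flat config map."""
    url = f"{CONNECT_URL}/connectors/{payload['name']}/config"
    return _json_request(url, payload["config"], "PUT")


payload = {
    "name": "FirebaseSinkConnector",
    "config": {"topics": "artists,songs", "tasks.max": "1"},
}
for request in (build_create_request(payload), build_update_request(payload)):
    print(request.get_method(), request.full_url)
```

To actually send one of these requests, pass it to urllib.request.urlopen while a Connect worker is running at that address.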

Confirm that the connector is in a RUNNING state by running the following command:

curl http://localhost:8083/connectors/FirebaseSinkConnector/status | jq

The output should resemble:

{
   "name":"FirebaseSinkConnector",
   "connector":{
      "state":"RUNNING",
      "worker_id":"127.0.1.1:8083"
   },
   "tasks":[
      {
         "id":0,
         "state":"RUNNING",
         "worker_id":"127.0.1.1:8083"
      }
   ],
   "type":"sink"
}

In the response from the /connectors/FirebaseSinkConnector/status endpoint, both the connector and each of its tasks should report the state RUNNING.
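The same check can be scripted. The following Python sketch inspects a status document of the shape shown above and reports whether the connector and every task are RUNNING. The function name is_running is hypothetical, and treating an empty task list as not running is a design choice of this sketch, not documented connector behavior.

```python
import json


def is_running(status):
    """Return True when the connector and all of its tasks report RUNNING."""
    connector_ok = status["connector"]["state"] == "RUNNING"
    tasks = status.get("tasks", [])
    # Sketch-level choice: a connector with no tasks is not counted as running.
    tasks_ok = bool(tasks) and all(task["state"] == "RUNNING" for task in tasks)
    return connector_ok and tasks_ok


# The sample status response from this Quick Start.
status = json.loads("""
{
   "name":"FirebaseSinkConnector",
   "connector":{"state":"RUNNING","worker_id":"127.0.1.1:8083"},
   "tasks":[{"id":0,"state":"RUNNING","worker_id":"127.0.1.1:8083"}],
   "type":"sink"
}
""")

print(is_running(status))
```

In practice the status document would come from the curl command above or from an HTTP client rather than from an inline string.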

To produce Avro data to the artists Kafka topic, use the following command.

./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic artists \
--property parse.key=true \
--property key.schema='{"type":"string"}' \
--property "key.separator=:" \
--property value.schema='{"type":"record","name":"artists","fields":[{"name":"name","type":"string"},{"name":"genre","type":"string"}]}'
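The parse.key and key.separator properties in the command above tell the producer to treat each input line as a key and a value divided by the separator. The following Python sketch illustrates that split on an input line of that shape. The helper name split_record is hypothetical, and splitting at the first occurrence of the separator is an assumption about the producer's behavior rather than something this page states.

```python
import json


def split_record(line, separator=":"):
    """Split one console input line into a decoded (key, value) pair."""
    # Split only once: the JSON value itself contains further colons.
    key_text, value_text = line.split(separator, 1)
    return json.loads(key_text), json.loads(value_text)


line = '"artistId1":{"name":"Michael Jackson","genre":"Pop"}'
key, value = split_record(line)
print(key)
print(value["name"], value["genre"])
```

Under this assumption, a key that itself contains the separator would be split in the wrong place, which is one reason to pick a separator that cannot appear in your keys.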

While the console is waiting for input, paste each of the following three records into the console.

"artistId1":{"name":"Michael Jackson","genre":"Pop"}
"artistId2":{"name":"Bob Dylan","genre":"American folk"}
"artistId3":{"name":"Freddie Mercury","genre":"Rock"}

To produce Avro data to the songs Kafka topic, use the following command.

./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic songs \
--property parse.key=true \
--property key.schema='{"type":"string"}' \
--property "key.separator=:" \
--property value.schema='{"type":"record","name":"songs","fields":[{"name":"title","type":"string"},{"name":"artist","type":"string"}]}'

While the console is waiting for input, paste each of the following three records into the console.

"songId1":{"title":"billie jean","artist":"Michael Jackson"}
"songId2":{"title":"hurricane","artist":"Bob Dylan"}
"songId3":{"title":"bohemian rhapsody","artist":"Freddie Mercury"}

Finally, check the Firebase console to ensure that the collections named artists and songs were created and that the records are in the format described in the Firebase Database Structure section.

Firebase Database Structure

The connector pushes the records in the Kafka topics to the Firebase database in the following format:

{
   "musicBlog":{
      "artists":{
         "artistId1":{
            "name":"Michael Jackson",
            "genre":"Pop"
         },
         "artistId2":{
            "name":"Bob Dylan",
            "genre":"American folk"
         },
         "artistId3":{
            "name":"Freddie Mercury",
            "genre":"Rock"
         }
      },
      "songs":{
         "songId1":{
            "title":"billie jean",
            "artist":"Michael Jackson"
         },
         "songId2":{
            "title":"hurricane",
            "artist":"Bob Dylan"
         },
         "songId3":{
            "title":"bohemian rhapsody",
            "artist":"Freddie Mercury"
         }
      }
   }
}

If gcp.firebase.database.reference is set to {firebase-application-url}/musicBlog, the connector creates collections with the same names as the Kafka topics from which records are consumed. The records in each Kafka topic are pushed to the corresponding Firebase collection.
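The mapping from topic, record key, and record value to the tree above can be pictured with a small in-memory model. The following Python sketch is only an illustration of the resulting layout. It does not use the Firebase SDK, the function name apply_record is hypothetical, and it is not the connector's implementation.

```python
import json


def apply_record(tree, topic, key, value):
    """Place a record under <topic>/<key>, creating the topic node if needed."""
    tree.setdefault(topic, {})[key] = value
    return tree


# Records keyed the same way as in the Quick Start.
records = [
    ("artists", "artistId1", {"name": "Michael Jackson", "genre": "Pop"}),
    ("songs", "songId1", {"title": "billie jean", "artist": "Michael Jackson"}),
]

# The node named by the database reference, musicBlog in this example.
music_blog = {}
for topic, key, value in records:
    apply_record(music_blog, topic, key, value)

print(json.dumps({"musicBlog": music_blog}, indent=3))
```

Each topic becomes a child of the referenced node, and each record key becomes a child of its topic, which matches the nesting shown in the structure above.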

Additional Documentation