Google Firebase Sink Connector for Confluent Platform

The Kafka Connect Google Firebase Sink connector reads data from multiple Kafka topics and writes it to a Google Firebase Realtime Database. For more information, see Firebase Sink connector configuration properties.

The Firebase Sink connector for Confluent Platform includes the following features:

At least once delivery

This connector guarantees that records from the Kafka topic are delivered at least once.
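To illustrate what at-least-once delivery can mean for the target database, the following minimal Python sketch replays the same record twice against an in-memory dictionary. The set_record and push_record helpers are hypothetical stand-ins, not connector or Firebase APIs. They assume that a set-style write is addressed by the record key, while a push-style write generates a new child key on every call.

```python
import itertools

# Hypothetical generator of child keys for push-style writes.
_generated_ids = itertools.count(1)

def set_record(db: dict, key: str, value: dict) -> None:
    """Key-addressed write: replaying the same record overwrites the same child."""
    db[key] = value

def push_record(db: dict, value: dict) -> None:
    """Generated-key write: every call adds a new child, so a replay duplicates."""
    db[f"generated-{next(_generated_ids)}"] = value

record_key = "artistId1"
record_value = {"name": "Michael Jackson", "genre": "Pop"}

set_db, push_db = {}, {}
for _ in range(2):  # the same record delivered twice
    set_record(set_db, record_key, record_value)
    push_record(push_db, record_value)

print(len(set_db), len(push_db))
```

Under these assumptions the key-addressed store ends with one child and the generated-key store with two, which is why redelivery is harmless only when writes are idempotent.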

Dead Letter Queue

This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.

Multiple tasks

The Google Firebase Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can improve throughput when the connector consumes from multiple topic partitions.
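As a rough illustration of how tasks.max interacts with the number of topic partitions, the sketch below spreads partitions over tasks in round-robin order. The partition counts are hypothetical, and round-robin is only a stand-in for whichever assignment strategy the Connect worker actually uses.

```python
def assign_round_robin(partitions, num_tasks):
    """Spread partitions over tasks in order; a stand-in for the real assignor."""
    assignment = [[] for _ in range(num_tasks)]
    for index, partition in enumerate(partitions):
        assignment[index % num_tasks].append(partition)
    return assignment

# Hypothetical layout: two partitions for artists, one for songs.
partitions = [("artists", 0), ("artists", 1), ("songs", 0)]

two_tasks = assign_round_robin(partitions, 2)
four_tasks = assign_round_robin(partitions, 4)
print(two_tasks)
print(four_tasks)
```

With four tasks and three partitions, one task receives nothing, so raising tasks.max beyond the total partition count is unlikely to help.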

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and Confluent License Properties for information about the license topic.

Configuration Properties

For a complete list of configuration properties for the sink connector, see Configuration Reference for Firebase Sink Connector for Confluent Platform.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Install the Google Firebase Sink Connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.
  • If you want to install the connector using Confluent Hub, you must install the Confluent Hub Client. This is installed by default with Confluent Enterprise.
  • Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
  • Connect: Confluent Platform 4.0.0 or above, or Kafka 1.0.0 or above
  • Java 1.8
  • A GCP service account is required, with the Firebase Realtime Database Viewer role for the source connector and the Firebase Realtime Database Admin role for the sink connector. You can create this service account in the Google Cloud Console. For details, refer to creating and managing service accounts.
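Before pointing the connector at a service account key file, it can help to sanity-check the file. The sketch below is a hypothetical helper, not part of the connector. It assumes the standard JSON key format downloaded from the Google Cloud Console and checks only a handful of its fields.

```python
import json

# Fields checked here are a subset of those in a standard service account key file.
REQUIRED_FIELDS = ("type", "project_id", "private_key", "client_email")

def credential_problems(key_json: str) -> list:
    """Return a list of human-readable problems found in a key file's JSON text."""
    data = json.loads(key_json)
    problems = [f"missing field: {name}" for name in REQUIRED_FIELDS if not data.get(name)]
    if data.get("type") and data["type"] != "service_account":
        problems.append("type is not 'service_account'")
    return problems

# Placeholder values only; a real key file comes from the Google Cloud Console.
sample = json.dumps({
    "type": "service_account",
    "project_id": "example-project",
    "private_key": "PLACEHOLDER",
    "client_email": "connector@example-project.iam.gserviceaccount.com",
})

print(credential_problems(sample))
print(credential_problems('{"type": "authorized_user"}'))
```

To check a real file, read its text from the path you intend to use for gcp.firebase.credentials.path and pass it to credential_problems.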

Install the connector using the Confluent CLI

To install the latest connector version using the Confluent CLI, navigate to your Confluent Platform installation directory and run the following command:

confluent connect plugin install confluentinc/kafka-connect-firebase:latest

You can install a specific version by replacing latest with a version number as shown in the following example:

confluent connect plugin install confluentinc/kafka-connect-firebase:1.2.0

To install the connector manually, download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

Quick Start

In this Quick Start, you configure the Firebase Sink connector to read records from Kafka topics and write them to a Firebase Realtime Database.

Start Confluent

Start the Confluent services using the following Confluent CLI command:

confluent local services start

Important

Do not use the Confluent CLI in production environments.

Property-based example

Create a configuration file named firebase-sink.properties with the following content, and place it inside the Confluent Platform installation directory. This configuration is typically used with standalone workers.

name=FirebaseSinkConnector

topics=artists,songs
connector.class=io.confluent.connect.firebase.FirebaseSinkConnector
tasks.max=1

gcp.firebase.credentials.path=file-path
gcp.firebase.database.reference=database-url
insert.mode=set/update/push

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
confluent.license=

Run the connector with this configuration.

confluent local services connect connector load FirebaseSinkConnector --config firebase-sink.properties

The output should resemble:

 {
    "name":"FirebaseSinkConnector",
    "config":{
        "topics":"artists,songs",
        "tasks.max":"1",
        "connector.class":"io.confluent.connect.firebase.FirebaseSinkConnector",
        "gcp.firebase.database.reference":"https://<gcp-project-id>.firebaseio.com",
        "gcp.firebase.credentials.path":"file-path-to-your-gcp-service-account-json-file",
        "insert.mode":"update",
        "key.converter" : "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url":"http://localhost:8081",
        "value.converter" : "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url":"http://localhost:8081",
        "confluent.topic.bootstrap.servers":"localhost:9092",
        "confluent.topic.replication.factor":"1",
        "name":"FirebaseSinkConnector"
     },
    "tasks":[
      {
        "connector":"FirebaseSinkConnector",
        "task":0
      }
     ],
     "type":"sink"
}

Confirm that the connector is in a RUNNING state.

confluent local services connect connector status FirebaseSinkConnector

The output should resemble:

{
   "name":"FirebaseSinkConnector",
   "connector":{
      "state":"RUNNING",
      "worker_id":"127.0.1.1:8083"
   },
   "tasks":[
      {
         "id":0,
         "state":"RUNNING",
         "worker_id":"127.0.1.1:8083"
      }
   ],
   "type":"sink"
}

REST-based example

This configuration is typically used with distributed workers. Write the following JSON to config.json, configure all of the required values, and use the curl command that follows to post the configuration to one of the distributed Connect workers. For more information, see the Kafka Connect REST API documentation.

{
  "name" : "FirebaseSinkConnector",
  "config" : {
    "topics":"artists,songs",
    "connector.class" : "io.confluent.connect.firebase.FirebaseSinkConnector",
    "tasks.max" : "1",

    "gcp.firebase.credentials.path" : "credential path",
    "gcp.firebase.database.reference": "database url",
    "insert.mode" : "set/update/push",

    "key.converter" : "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url":"http://localhost:8081",
    "value.converter" : "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"http://localhost:8081",

    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "confluent.license": " Omit to enable trial mode "
  }
}

Note

Change the confluent.topic.bootstrap.servers property to include your broker address(es) and change the confluent.topic.replication.factor to 3 for staging or production use.
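If you already maintain a properties file for standalone workers, the JSON payload for distributed workers can be derived from it. The following is a minimal sketch under stated assumptions: parse_properties and to_rest_payload are hypothetical helper names, and the parser handles only simple key=value lines (no line continuations or escapes).

```python
import json

def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blank lines and # or ! comments."""
    config = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")  # split at the first '=' only
        config[key.strip()] = value.strip()
    return config

def to_rest_payload(config: dict) -> dict:
    """Wrap a flat config map in the name/config envelope used for POST /connectors."""
    body = {key: value for key, value in config.items() if key != "name"}
    return {"name": config["name"], "config": body}

properties_text = """
name=FirebaseSinkConnector
topics=artists,songs
connector.class=io.confluent.connect.firebase.FirebaseSinkConnector
tasks.max=1
"""

payload = to_rest_payload(parse_properties(properties_text))
print(json.dumps(payload, indent=2))
```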

Use curl to post a configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect worker(s).

curl -sS -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors

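Use the following command to update the configuration of an existing connector. This endpoint expects only the flat configuration map (the contents of the config object), so remove the outer name and config wrapper from config.json before sending it.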

curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/FirebaseSinkConnector/config
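As a minimal sketch, the body for the PUT request can be derived from the POST payload by keeping only its config object, since PUT /connectors/{name}/config takes a flat map of configuration keys to string values. The helper name put_body is hypothetical, and the payload is shortened for illustration.

```python
import json

def put_body(post_payload: dict) -> str:
    """Return the JSON body for PUT /connectors/{name}/config: the config map only."""
    return json.dumps(post_payload["config"], indent=2)

# Shortened version of the POST payload from the REST-based example.
post_payload = {
    "name": "FirebaseSinkConnector",
    "config": {
        "topics": "artists,songs",
        "connector.class": "io.confluent.connect.firebase.FirebaseSinkConnector",
        "tasks.max": "1",
    },
}

print(put_body(post_payload))
```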

Confirm that the connector is in a RUNNING state by running the following command:

curl http://localhost:8083/connectors/FirebaseSinkConnector/status | jq

The output should resemble:

{
   "name":"FirebaseSinkConnector",
   "connector":{
      "state":"RUNNING",
      "worker_id":"127.0.1.1:8083"
   },
   "tasks":[
      {
         "id":0,
         "state":"RUNNING",
         "worker_id":"127.0.1.1:8083"
      }
   ],
   "type":"sink"
}

In the response from the /connectors/FirebaseSinkConnector/status endpoint, the connector and all of its tasks should report the state RUNNING.
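The same check can be scripted. The sketch below assumes the status response has the shape shown above; all_running is a hypothetical helper name, and the sample is embedded so the example runs without a Connect worker.

```python
import json

def all_running(status: dict) -> bool:
    """True when the connector and at least one task exist and all report RUNNING."""
    tasks = status.get("tasks", [])
    states = [status["connector"]["state"]] + [task["state"] for task in tasks]
    return bool(tasks) and all(state == "RUNNING" for state in states)

sample = json.loads("""
{
   "name": "FirebaseSinkConnector",
   "connector": {"state": "RUNNING", "worker_id": "127.0.1.1:8083"},
   "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "127.0.1.1:8083"}],
   "type": "sink"
}
""")

print(all_running(sample))
```

In practice, the dictionary would come from parsing the body returned by the status endpoint rather than from an embedded string.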

To produce Avro data to the artists Kafka topic, use the following command:

./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic artists \
--property parse.key=true \
--property key.schema='{"type":"string"}' \
--property "key.separator=:" \
--property value.schema='{"type":"record","name":"artists","fields":[{"name":"name","type":"string"},{"name":"genre","type":"string"}]}'

While the console is waiting for input, paste each of the following three records into the console.

"artistId1":{"name":"Michael Jackson","genre":"Pop"}
"artistId2":{"name":"Bob Dylan","genre":"American folk"}
"artistId3":{"name":"Freddie Mercury","genre":"Rock"}

To produce Avro data to the songs Kafka topic, use the following command:

./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic songs \
--property parse.key=true \
--property key.schema='{"type":"string"}' \
--property "key.separator=:" \
--property value.schema='{"type":"record","name":"songs","fields":[{"name":"title","type":"string"},{"name":"artist","type":"string"}]}'

While the console is waiting for input, paste the following three records into the producer console.

"songId1":{"title":"billie jean","artist":"Michael Jackson"}
"songId2":{"title":"hurricane","artist":"Bob Dylan"}
"songId3":{"title":"bohemian rhapsody","artist":"Freddie Mercury"}

Finally, check the Firebase console to ensure that the collections named artists and songs were created and the records are in the format defined in the Firebase database structure.
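Each record line pasted into the producer in this Quick Start combines a key and a value, separated by the configured key.separator (a colon). Because the JSON value also contains colons, it may help to see how such a line is taken apart. The sketch below assumes the producer splits at the first occurrence of the separator; split_record is a hypothetical helper, not part of the producer tooling.

```python
import json

def split_record(line: str, separator: str = ":"):
    """Split a 'key<separator>value' line at the first separator and decode both parts."""
    key_text, found, value_text = line.partition(separator)
    if not found:
        raise ValueError("no key separator found in line")
    return json.loads(key_text), json.loads(value_text)

key, value = split_record('"artistId1":{"name":"Michael Jackson","genre":"Pop"}')
print(key)
print(value)
```

Splitting at the first separator keeps the colons inside the value intact, which is why the key must not itself contain the separator.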

Firebase Database Structure

The connector pushes the records in the Kafka topic to the Firebase database in the following format:

{
   "musicBlog":{
      "artists":{
         "artistId1":{
            "name":"Michael Jackson",
            "genre":"Pop"
         },
         "artistId2":{
            "name":"Bob Dylan",
            "genre":"American folk"
         },
         "artistId3":{
            "name":"Freddie Mercury",
            "genre":"Rock"
         }
      },
      "songs":{
         "songId1":{
            "title":"billie jean",
            "artist":"Michael Jackson"
         },
         "songId2":{
            "title":"hurricane",
            "artist":"Bob Dylan"
         },
         "songId3":{
            "title":"bohemian rhapsody",
            "artist":"Freddie Mercury"
         }
      }
   }
}

If gcp.firebase.database.reference is set to {firebase-application-url}/musicBlog, the connector creates collections with the same names as the Kafka topics from which records are consumed. The records in each Kafka topic are pushed to the corresponding Firebase collection.
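The mapping from topic and record key to a location in the database can be sketched as follows. This is an illustrative model only: the database is a nested dictionary, paths are relative to the database root (the {firebase-application-url} prefix is omitted), and record_path and write are hypothetical helpers rather than connector code.

```python
import json

def record_path(base: str, topic: str, key: str) -> str:
    """Build the path <base>/<topic>/<key> relative to the database root."""
    return f"{base.strip('/')}/{topic}/{key}"

def write(db: dict, path: str, value: dict) -> None:
    """Set value at a slash-separated path, creating intermediate nodes as needed."""
    *parents, leaf = [part for part in path.split("/") if part]
    node = db
    for part in parents:
        node = node.setdefault(part, {})
    node[leaf] = value

db = {}
write(db, record_path("musicBlog", "artists", "artistId1"),
      {"name": "Michael Jackson", "genre": "Pop"})
write(db, record_path("musicBlog", "songs", "songId1"),
      {"title": "billie jean", "artist": "Michael Jackson"})

print(json.dumps(db, indent=3))
```

The resulting dictionary has the same nesting as the structure shown above: one node per topic under musicBlog, and one child per record key.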