Google Firebase Source Connector for Confluent Platform

The Kafka Connect Google Firebase Source connector enables users to read data from a Google Firebase Realtime Database and persist the data in Apache Kafka® topics. For more information, please refer to Firebase source connector configuration properties.

Features

At least once delivery

This connector guarantees that records are delivered at least once to the Kafka topic. If the connector restarts, there may be some duplicate records in the Kafka topic.
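Because delivery is at least once, downstream consumers generally need to tolerate replays. One common approach, sketched below in Python, is to treat each record as an upsert keyed by its record ID so that a duplicate overwrites an identical entry. The function name and the sample records are hypothetical and are not part of the connector.

```python
from typing import Dict, Iterable, Tuple


def apply_upserts(records: Iterable[Tuple[str, dict]]) -> Dict[str, dict]:
    """Fold (key, value) records into a keyed view; replays overwrite in place."""
    state: Dict[str, dict] = {}
    for key, value in records:
        # Writing by key is idempotent: a duplicate replaces an identical entry.
        state[key] = value
    return state


# Hypothetical records, including one duplicate caused by a connector restart.
records = [
    ("artistId1", {"name": "Michael Jackson", "genre": "Pop"}),
    ("artistId2", {"name": "Bob Dylan", "genre": "American folk"}),
    ("artistId1", {"name": "Michael Jackson", "genre": "Pop"}),
]
view = apply_upserts(records)
print(len(view))  # 2
```

This keeps only the latest value per key, so it suits consumers that need current state rather than a full change history.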

Prerequisites

The following are required to run the Kafka Connect Firebase Source Connector:

  • Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
  • Connect: Confluent Platform 4.0.0 or above, or Kafka 1.0.0 or above
  • Java 1.8
  • A GCP service account with the Firebase Realtime Database Viewer role for the source connector (the sink connector requires the Firebase Realtime Database Admin role). You can create this service account in the Google Cloud Console. For details, refer to creating and managing service accounts.
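Before pointing the connector at a service account key file, it can help to sanity-check the file. The following Python sketch assumes the key is a JSON document containing the fields type, project_id, private_key, and client_email; that field list is an assumption about the usual key file layout, and the helper names are hypothetical. It does not verify that the account actually holds the required role.

```python
import json
from pathlib import Path

# Assumed subset of fields found in a GCP service account key file.
REQUIRED_FIELDS = ("type", "project_id", "private_key", "client_email")


def missing_key_fields(key: dict) -> list:
    """Return the names of expected fields that are absent or empty."""
    return [name for name in REQUIRED_FIELDS if not key.get(name)]


def check_key_file(path: str) -> list:
    """Load a key file from disk and report which expected fields are missing."""
    return missing_key_fields(json.loads(Path(path).read_text()))


# Hypothetical, incomplete key used only for illustration.
print(missing_key_fields({"type": "service_account", "project_id": "demo"}))
```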

Install the Google Firebase Source Connector

You can install this connector by using the Confluent Hub client installation instructions or by manually downloading the ZIP file.

Important

You must install the connector on every machine where Connect will run.

  • An installation of the Confluent Hub Client.

    Note

    This is installed by default with Confluent Enterprise.

  • An installation of the latest connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent-hub install confluentinc/kafka-connect-firebase:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent-hub install confluentinc/kafka-connect-firebase:1.1.0-preview
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues Confluent enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and Confluent License Properties for information about the license topic.

Limitations

The Firebase Source Connector has the following limitation:

  • Unable to trace changes in the data reference while offline: Records that are added or updated (possibly multiple times) in the data reference while the connector is not running are not captured when the connector restarts. When gcp.firebase.snapshot is set to true, the connector takes a complete snapshot each time it restarts, which may cause it to write duplicate records to the Kafka topic.

Configuration Properties

For a complete list of configuration properties for the source connector, see Google Firebase Source Connector Configuration Properties.

Note

For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster.

Quick Start

In this Quick Start, you configure the Firebase Source connector to capture records from a Firebase database and write them to a Kafka topic.

Start Confluent

Start the Confluent services using the following Confluent CLI command:

confluent local services start

Important

Do not use the Confluent CLI in production environments.

Property-based example

Create a configuration file named firebase-source.properties with the following content and place it inside the Confluent Platform installation directory. This configuration is typically used with standalone workers.

name=FirebaseSourceConnector

connector.class=io.confluent.connect.firebase.FirebaseSourceConnector
tasks.max=1

gcp.firebase.credentials.path=file-path-to-your-gcp-service-account-json-file
gcp.firebase.database.reference=https://<gcp-project-id>.firebaseio.com
gcp.firebase.snapshot=true

confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
confluent.license=

Run the connector with this configuration.

confluent local services connect connector load FirebaseSourceConnector --config firebase-source.properties

The output should resemble:

{
    "name":"FirebaseSourceConnector",
    "config":{
        "tasks.max":"1",
        "connector.class":"io.confluent.connect.firebase.FirebaseSourceConnector",
        "gcp.firebase.credentials.path":"file-path-to-your-gcp-service-account-json-file",
        "gcp.firebase.database.reference":"https://<gcp-project-id>.firebaseio.com",
        "gcp.firebase.snapshot":"true",
        "confluent.topic.bootstrap.servers":"localhost:9092",
        "confluent.topic.replication.factor":"1",
        "name":"FirebaseSourceConnector"
    },
    "tasks":[
        {
            "connector":"FirebaseSourceConnector",
            "task":0
        }
    ],
    "type":"source"
}

Confirm that the connector is in a RUNNING state by running the following command:

confluent local services connect connector status FirebaseSourceConnector

The output should resemble:

{
   "name":"FirebaseSourceConnector",
   "connector":{
      "state":"RUNNING",
      "worker_id":"127.0.1.1:8083"
   },
   "tasks":[
      {
         "id":0,
         "state":"RUNNING",
         "worker_id":"127.0.1.1:8083"
      }
   ],
   "type":"source"
}

REST-based example

Use this configuration with distributed workers. Write the following JSON to config.json, configure all of the required values, and then post the configuration to one of the distributed Connect workers using the curl command shown later in this section. For more information about the Kafka Connect REST API, see this documentation.

{
  "name" : "FirebaseSourceConnector",
  "config" : {
    "connector.class" : "io.confluent.connect.firebase.FirebaseSourceConnector",
    "tasks.max" : "1",

    "gcp.firebase.credentials.path" : "file-path-to-your-gcp-service-account-json-file",
    "gcp.firebase.database.reference": "https://<gcp-project-id>.firebaseio.com",
    "gcp.firebase.snapshot" : "true",

    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "confluent.license": " Omit to enable trial mode "
  }
}

Note

Change the confluent.topic.bootstrap.servers property to include your broker address(es), and change the confluent.topic.replication.factor to 3 for staging or production use.
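If you already maintain a properties file like the one in the property-based example, the REST payload can be derived from it rather than written by hand. The following Python sketch handles only simple key=value lines (no line continuations or escape sequences); the function name is hypothetical.

```python
import json


def properties_to_payload(text: str) -> dict:
    """Convert simple key=value properties text into a Connect REST payload."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        # Skip blank lines and comments.
        if not line or line.startswith("#"):
            continue
        # Split on the first '=' only, so values may contain '=' themselves.
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    # The connector name sits at the top level of the POST payload.
    name = config.pop("name")
    return {"name": name, "config": config}


sample = """
name=FirebaseSourceConnector
connector.class=io.confluent.connect.firebase.FirebaseSourceConnector
tasks.max=1
"""
print(json.dumps(properties_to_payload(sample), indent=2))
```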

Use curl to post the configuration to one of the Connect workers. Change http://localhost:8083/ to the endpoint of one of your Connect workers.

curl -sS -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors

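Use the following command to update the configuration of an existing connector. This endpoint expects only the contents of the config object, without the outer name and config wrapper used for the POST request, so adjust config.json accordingly before running it.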

curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/FirebaseSourceConnector/config
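The POST payload wraps the settings in name and config, whereas the PUT endpoint takes the config map on its own. As a rough illustration, the following Python sketch builds the PUT request from a POST-style payload using only the standard library. The function name and the default base URL are assumptions for a local worker.

```python
import json
import urllib.request


def build_update_request(payload: dict, base_url: str = "http://localhost:8083") -> urllib.request.Request:
    """Build a PUT request that carries only the connector's config map."""
    name = payload["name"]
    # Unwrap the POST-style payload: the body is the inner config object.
    body = json.dumps(payload["config"]).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=body,
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


request = build_update_request(
    {"name": "FirebaseSourceConnector", "config": {"tasks.max": "1"}}
)
print(request.get_method(), request.full_url)
```

Passing the request to urllib.request.urlopen would send it to the worker; the sketch stops short of that so it can run without a Connect cluster.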

Confirm that the connector is in a RUNNING state by running the following command:

curl http://localhost:8083/connectors/FirebaseSourceConnector/status

The output should resemble:

{
   "name":"FirebaseSourceConnector",
   "connector":{
      "state":"RUNNING",
      "worker_id":"127.0.1.1:8083"
   },
   "tasks":[
      {
         "id":0,
         "state":"RUNNING",
         "worker_id":"127.0.1.1:8083"
      }
   ],
   "type":"source"
}
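When scripting this check, the status response can be evaluated programmatically. The following Python sketch treats a connector as healthy only if the connector itself and at least one task report RUNNING; requiring at least one task is a design choice of this sketch, not a rule from the connector documentation.

```python
def is_running(status: dict) -> bool:
    """Return True if the connector and every listed task are RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    # An empty task list is treated as not running.
    tasks_ok = bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)
    return connector_ok and tasks_ok


status = {
    "name": "FirebaseSourceConnector",
    "connector": {"state": "RUNNING", "worker_id": "127.0.1.1:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "127.0.1.1:8083"}],
    "type": "source",
}
print(is_running(status))  # True
```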

To publish records into Firebase, follow the Firebase documentation. The data written to Firebase should adhere to the format described in Firebase Database Structure. Alternatively, save the JSON example from that section into a data.json file and import it into a Firebase database reference using the import feature in the Firebase console.

To consume records written by the connector to the Kafka topics, run the following commands:

kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic artists --from-beginning
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic songs --from-beginning

Firebase Database Structure

The connector expects the Firebase database to have data in the following format:

{
   "musicBlog":{
      "artists":{
         "artistId1":{
            "name":"Michael Jackson",
            "genre":"Pop"
         },
         "artistId2":{
            "name":"Bob Dylan",
            "genre":"American folk"
         },
         "artistId3":{
            "name":"Freddie Mercury",
            "genre":"Rock"
         }
      },
      "songs":{
         "songId1":{
            "title":"billie jean",
            "artist":"Michael Jackson"
         },
         "songId2":{
            "title":"hurricane",
            "artist":"Bob Dylan"
         },
         "songId3":{
            "title":"bohemian rhapsody",
            "artist":"Freddie Mercury"
         }
      }
   }
}

If gcp.firebase.database.reference is set to {firebase-application-url}/musicBlog, the connector treats artists and songs as Kafka topics. The records under each of these nodes are processed and written to the Kafka topic of the same name, artists or songs.
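To make the mapping concrete, the following Python sketch walks the data under a database reference and yields one (topic, key, value) tuple per record. It illustrates the layout described above and is not the connector's implementation; the function name is hypothetical.

```python
from typing import Iterator, Tuple


def snapshot_to_records(snapshot: dict) -> Iterator[Tuple[str, str, dict]]:
    """Yield (topic, record_id, value) for each record under the reference."""
    for topic, children in snapshot.items():
        # Each first-level node (for example, artists) names a topic.
        for record_id, value in children.items():
            # Each second-level node is one record, keyed by its record ID.
            yield topic, record_id, value


# The data found under {firebase-application-url}/musicBlog, abbreviated.
music_blog = {
    "artists": {"artistId1": {"name": "Michael Jackson", "genre": "Pop"}},
    "songs": {"songId1": {"title": "billie jean", "artist": "Michael Jackson"}},
}
for topic, key, value in snapshot_to_records(music_blog):
    print(topic, key, value)
```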

Note

The key/recordId for records within the Firebase database collection should follow the naming conventions for Avro fields.
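Avro names must start with a letter or underscore and contain only letters, digits, and underscores. A quick pre-check of record IDs, sketched in Python below, can catch keys that would not satisfy that rule. The function name and the sample keys are hypothetical.

```python
import re

# Avro name rule: first character [A-Za-z_], remaining characters [A-Za-z0-9_].
AVRO_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def is_valid_avro_name(name: str) -> bool:
    """Return True if the whole string is a valid Avro name."""
    return AVRO_NAME.fullmatch(name) is not None


for key in ("artistId1", "1artist", "artist-1"):
    print(key, is_valid_avro_name(key))
```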

Record Schema

The source connector creates Kafka records in the following format:

  • Kafka Key: The Kafka record key is the Firebase record ID and uses a string schema.
  • Kafka Value: The value schema is inferred from the connector's examination of the records in the Firebase database.
  • Kafka Header: The configured database reference is included in the Kafka record headers under the key Database_Reference.