Google Firebase Source Connector for Confluent Platform¶
The Kafka Connect Google Firebase Source connector enables users to read data from a Google Firebase Realtime Database and persist the data in Apache Kafka® topics. For more information, refer to Firebase Source connector configuration properties.
Features¶
At least once delivery¶
This connector guarantees that records are delivered at least once to the Kafka topic. If the connector restarts, there may be some duplicate records in the Kafka topic.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription which includes Confluent enterprise license keys to subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and Confluent License Properties for information about the license topic.
Limitations¶
The Firebase Source connector has the following limitation:
- Unable to trace changes in data-reference when offline: When the
connector is not running, the records which are added or updated (multiple
times) from the data reference will not get captured after connector restarts.
A complete snapshot is taken each time the connector restarts, when the
configuration
gcp.firebase.snapshot
is set totrue
. This may lead to the connector writing duplicate records to the Kafka topic.
Configuration Properties¶
For a complete list of configuration properties for the source connector, see Configuration Reference for Firebase Source Connector for Confluent Platform.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Install the Google Firebase Source Connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
- You must install the connector on every machine where Connect will run.
- If you want to install the connector using Confluent Hub, you must install the Confluent Hub Client. This is installed by default with Confluent Enterprise.
- Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
- Connect: Confluent Platform 4.0.0 or above, or Kafka 1.0.0 or above
- Java 1.8
- A GCP service account with role
Firebase Realtime Database Viewer
for source connector andFirebase Realtime Database Admin
for sink connector is required. You can create this service account in the Google Cloud Console. For creating a service account, refer creating and managing service accounts.
Install the connector using the Confluent CLI¶
To install the latest
connector version using Confluent Hub Client, navigate to your Confluent Platform installation directory
and run the following command:
confluent connect plugin install confluentinc/kafka-connect-firebase:latest
You can install a specific version by replacing latest
with a version number
as shown in the following example:
confluent connect plugin install confluentinc/kafka-connect-firebase:1.2.0
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Quick Start¶
In this Quick Start, you configure the Firebase Source connector to capture records from a Firebase database and write them to a Kafka topic.
Start Confluent¶
Start the Confluent services using the following Confluent CLI command:
confluent local services start
Important
Do not use the Confluent CLI in production environments.
Property-based example¶
Create a configuration file firebase-source.properties
with the following
content. This file should be placed inside the Confluent Platform installation directory. This
configuration is used typically along with standalone
workers.
name=FirebaseSourceConnector
connector.class=io.confluent.connect.firebase.FirebaseSourceConnector
tasks.max=1
gcp.firebase.credentials.path=file-path-to-your-gcp-service-account-json-file
gcp.firebase.database.reference=https://<gcp-project-id>.firebaseio.com
gcp.firebase.snapshot=true
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
confluent.license=
Run the connector with this configuration.
confluent local services connect connector load FirebaseSourceConnector --config firebase-source.properties
The output should resemble:
{
"name":"FirebaseSourceConnector",
"config":{
"tasks.max":"1",
"connector.class":"io.confluent.connect.firebase.FirebaseSourceConnector",
"gcp.firebase.credentials.path":"file-path-to-your-gcp-service-account-json-file",
"gcp.firebase.database.reference":"https://<gcp-project-id>.firebaseio.com",
"gcp.firebase.snapshot":"true",
"confluent.topic.bootstrap.servers":"localhost:9092",
"confluent.topic.replication.factor":"1",
"name":"FirebaseSourceConnector"
},
"tasks":[
{
"connector":"FirebaseSourceConnector",
"task":0
}
],
"type":"source"
}
Confirm that the connector is in a RUNNING
state by running the following
command:
confluent local services connect connector status FirebaseSourceConnector
The output should resemble:
{
"name":"FirebaseSourceConnector",
"connector":{
"state":"RUNNING",
"worker_id":"127.0.1.1:8083"
},
"tasks":[
{
"id":0,
"state":"RUNNING",
"worker_id":"127.0.1.1:8083"
}
],
"type":"source"
}
REST-based example¶
Use this setting with distributed
workers. Write the following JSON to
config.json
, configure all of the required values, and use the following
command to post the configuration to one of the distributed connect workers. For
more information about the Kafka Connect REST API, see this
documentation.
{
"name" : "FirebaseSourceConnector",
"config" : {
"connector.class" : "io.confluent.connect.firebase.FirebaseSourceConnector",
"tasks.max" : "1",
"gcp.firebase.credentials.path" : "file-path-to-your-gcp-service-account-json-file",
"gcp.firebase.database.reference": "https://<gcp-project-id>.firebaseio.com",
"gcp.firebase.snapshot" : "true",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"confluent.license": " Omit to enable trial mode "
}
}
Note
Change the confluent.topic.bootstrap.servers
property to include your
broker address(es), and change the confluent.topic.replication.factor
to
3
for staging or production use.
Use curl to post a configuration to one of the Connect workers. Change
http://localhost:8083/
to the endpoint of one of your Connect worker(s).
curl -sS -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors
Use the following command to update the configuration of existing connector.
curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/FirebaseSourceConnector/config
Confirm that the connector is in a RUNNING
state by running the following
command:
curl http://localhost:8083/connectors/FirebaseSourceConnector/status
The output should resemble:
{
"name":"FirebaseSourceConnector",
"connector":{
"state":"RUNNING",
"worker_id":"127.0.1.1:8083"
},
"tasks":[
{
"id":0,
"state":"RUNNING",
"worker_id":"127.0.1.1:8083"
}
],
"type":"source"
}
To publish records into Firebase, follow the Firebase documentation. The data
produced to firebase should adhere to the following data format. You can also use the JSON example mentioned in
the data format section, save it into a
data.json
file and finally import it into a Firebase database reference using the
import feature in the Firebase console.
To consume records written by the connector to the Kafka topic, run the following command:
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic artists --from-beginning
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic songs --from-beginning
Firebase Database Structure¶
The connector expects the Firebase database to have data in the following format:
{
"musicBlog":{
"artists":{
"artistId1":{
"name":"Michael Jackson",
"genre":"Pop"
},
"artistId2":{
"name":"Bob Dylan",
"genre":"American folk"
},
"artistId3":{
"name":"Freddie Mercury",
"genre":"Rock"
}
},
"songs":{
"songId1":{
"title":"billie jean",
"artist":"Michael Jackson"
},
"songId2":{
"title":"hurricane",
"artist":"Bob Dylan"
},
"songId3":{
"title":"bohemian rhapsody",
"artist":"Freddie Mercury"
}
}
}
}
If the gcp.firebase.database.reference
is configured to
{firebase-application-url}/musicBlog
, the connector will realize artists
and songs
as kafka topic and the records within those topics will be
processed and pushed to respective kafka topic named artists
and songs
.
Note
The key/recordId
for records within the Firebase database collection
should follow the naming conventions for Avro fields.
Record Schema¶
The source connector creates Kafka records in the following format:
- Kafka Key: The Kafka record key consists of firebase record ID and is of type string schema.
- Kafka Value: The value schema is inferred based on the connector’s examination of records in firebase database.
- Kafka Header: The configured database reference will be a part of Kafka
record header with key as
Database_Reference
.