Salesforce Bulk API Source Connector for Confluent Platform¶
Salesforce Bulk API Source Connector により、Salesforce.com と Apache Kafka® が統合されます。
Salesforce Bulk API Source Connector では、Salesforce.com から Salesforce 一括クエリ API を介して、レコードのプルおよび変更のキャプチャを行うことができます。
Salesforce オブジェクト(SObject) は、標準の Salesforce オブジェクトです。SalesforceBulkApiSourceConnector
は、オブジェクトをプル、または変更イベントをキャプチャーして、Kafka のトピックに書き込むために使用できます。このコネクターは、スタンドアロンと分散のどちらの Connect ワーカーでも使用できます。
機能¶
Salesforce Bulk API Source Connector には、以下の機能があります。
少なくとも 1 回のデリバリー¶
このコネクターによって、レコードが Kafka のトピックに少なくとも 1 回は配信されることが保証されます。コネクターを再起動した場合、Kafka のトピックに重複レコードが存在している可能性があります。
1 つのタスクをサポート¶
Salesforce Bulk API Source Connector は、1 つのタスクのみの実行をサポートしています。
構成プロパティ¶
このコネクターの構成プロパティの網羅的なリストについては、「Salesforce Bulk API Source Connector 構成プロパティ」を参照してください。
考慮事項¶
Salesforce Bulk API Source Connector を使用する場合は、以下に従います。
再起動¶
コネクターでは、操作する際、Connect オフセットトピックに "クエリの最終実行時刻" が定期的に記録されます。コネクターは、再起動されると、LastModifiedDate
がクエリの最終実行時刻よりも後の Salesforce オブジェクトをフェッチします。
API の制限¶
Salesforce Bulk API コネクターは、複合以外のフィールドに制限されます。たとえば、一括クエリでは、住所または地理位置情報のフィールドはサポートされていません。コネクターでは、住所および地理位置情報のフィールドは破棄されます。
SObject の制限¶
Salesforce Bulk API Source Connector を使用しているときに、以下の Salesforce オブジェクト(SObject)エラーメッセージが表示される場合があります。
Entity 'Order' is not supported to use PKChunking.
これらの SObject に batch.enable=false
構成のプロパティを設定します。このプロパティは Salesforce Bulk API Source Connector バージョン 1.7.0 以上で利用できます。
Salesforce オブジェクトのサポート¶
このバージョンの Kafka Connect Salesforce Bulk API Source Connector では、Salesforce の以下のオブジェクトがサポートされています。
- Account
- Campaign
- CampaignMember
- Case
- Contact
- Contract
- Event
- Group
- Lead
- Opportunity
- OpportunityContactRole
- OpportunityLineItem
- Period
- PricebookEntry
- Product2
- Task
- TaskFeed
- TaskRelation
- User
- UserRole
Kafka Connect Salesforce Bulk API Source Connector では、複合以外のフィールドを持つカスタムオブジェクトもサポートされています。
以下のオブジェクトは、Salesforce Bulk API ではサポートされていません。
- Feed(AccountFeed、AssetFeed など)
- Share(AccountBrandShare、ChannelProgramLevelShare など)
- History(AccountHistory、ActivityHistory など)
- EventRelation(AcceptedEventRelation、DeclinedEventRelation など)
- AggregateResult
- AttachedContentDocument
- CaseStatus
- CaseTeamMember
- CaseTeamRole
- CaseTeamTemplate
- CaseTeamTemplateMember
- CaseTeamTemplateRecord
- CombinedAttachment
- ContentFolderItem
- ContractStatus
- EventWhoRelation
- FolderedContentDocument
- KnowledgeArticleViewStat
- KnowledgeArticleVoteStat
- LookedUpFromActivity
- Name
- NoteAndAttachment
- OpenActivity
- OwnedContentDocument
- PartnerRole
- RecentlyViewed
- ServiceAppointmentStatus
- SolutionStatus
- TaskPriority
- TaskStatus
- TaskWhoRelation
- UserRecordAccess
- WorkOrderLineItemStatus
- WorkOrderStatus
クイックスタート¶
このクイックスタートでは、Salesforce Bulk API Source Connector を使用して、Salesforce から Kafka にデータをインポートします。
- Salesforce Developer Edition のアカウントがない場合は、この リンク を使用して作成します。
- アプリケーションランチャーをクリックし、必要な Salesforce オブジェクトを選択して、レコードをオブジェクトに追加します。
Confluent Hub クライアント を使用してコネクターをインストールします。
# run from your CP installation directory
confluent-hub install confluentinc/kafka-connect-salesforce-bulk-api:latest
ちなみに
デフォルトで、プラグインが share/confluent-hub-components
にインストールされ、このディレクトリがプラグインパスに追加されます。これが初めてインストールしたコネクターだった場合は、プラグインパスの変更を反映するために、Connect サーバーの再起動が必要となる場合があります。
Confluent CLI を使用してサービスを開始します。
confluent local start
各サービスが順番に開始され、ステータスを含むメッセージが出力されます。
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Starting KSQL Server
KSQL Server is [UP]
Starting Control Center
Control Center is [UP]
注釈
SalesforceBulkApiSourceConnector
では、1 つのタスクのみがサポートされます。
プロパティベースの例¶
構成ファイル salesforce-bulk-api.properties
を作成します。この構成は通常、 スタンドアロンワーカー で使用されます。
name=SalesforceBulkApiSourceConnector
tasks.max=1
connector.class=io.confluent.connect.salesforce.SalesforceBulkApiSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
salesforce.username=< Required Configuration >
salesforce.password=< Required Configuration >
salesforce.password.token=< Required Configuration >
salesforce.object=< Required Configuration >
salesforce.since=< Required Configuration >
kafka.topic=< Required Configuration >
salesforce.instance=< Required Configuration >
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
confluent.license=Omit to enable trial mode
コネクターを起動する前に、salesforce-bulk-api.properties
の構成が正しく設定されていることを確認します。
以下のコマンドを使用して構成を読み込み、Salesforce Bulk API Source Connector を起動します。
注意
コネクター名とフラグの間には、ダブルダッシュ( --
)を入れる必要があります。詳細については、こちらの投稿 を参照してください。
confluent local load salesforce-bulk-api-source -- -d salesforce-bulk-api.properties
{
"name" : "SalesforceBulkApiSourceConnector",
"config" : {
"connector.class", "io.confluent.connect.salesforce.SalesforceBulkApiSourceConnector",
"tasks.max" : "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"kafka.topic" : "< Required Configuration >",
"salesforce.password" : "< Required Configuration >",
"salesforce.password.token" : "< Required Configuration >",
"salesforce.object" : "< Required Configuration >",
"salesforce.username" : "< Required Configuration >",
"salesforce.since" : "< Required Configuration >",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"confluent.license": ""
},
"tasks": []
}
コネクターが正常に起動されたことを確認します。以下を入力して、Connect ワーカーのログを確認します。
confluent local log connect
コネクターが RUNNING
ステートであることを確認します。
confluent local status SalesforceBulkApiSourceConnector
メッセージが Kafka に送信されていることを確認します。
kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic <topic-name> \
--from-beginning | jq '.'
REST ベースの例¶
この構成は通常、 分散ワーカー で使用されます。以下の JSON を connector.json
に書き込み、すべての必要な値を構成し、以下のコマンドを使用して構成を分散 Connect ワーカーのいずれかにポストします。詳細については、「Kafka Connect REST インターフェイス 」を参照してください。
Connect 分散 REST の例 :
{
"name" : "SalesforceBulkApiSourceConnector",
"config" : {
"connector.class": "io.confluent.connect.salesforce.SalesforceBulkApiSourceConnector",
"tasks.max" : "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"kafka.topic" : "< Required Configuration >",
"salesforce.password" : "< Required Configuration >",
"salesforce.password.token" : "< Required Configuration >",
"salesforce.object" : "< Required Configuration >",
"salesforce.username" : "< Required Configuration >",
"salesforce.since" : "< Required Configuration >",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"confluent.license": " Omit to enable trial mode "
}
}
注釈
confluent.topic.bootstrap.servers
プロパティを変更してブローカーのアドレスを含めます。また、ステージングまたは本稼働環境では、confluent.topic.replication.factor
を 3 に変更します。
いずれかの Kafka Connect ワーカーに構成をポストするには、curl を使用します。http://localhost:8083/
を、Kafka Connect ワーカーのいずれかのエンドポイントに変更します。
新しいコネクターの作成 :
curl -sS -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
既存のコネクターのアップデート :
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/SalesforceBulkApiSourceConnector/config
サンプルデータフォーマット¶
以下の例は、Salesforce 一括クエリの結果の JSON ドキュメント構造(Contact オブジェクトの場合)を示しています。結果は Salesforce コネクターで受信され、Kafka レコードに変換された後、トピックに保管されます。
Salesforce 一括クエリから受信した未加工の JSON:
[
{
"attributes" : {
"type" : "Contact",
"url" : "/services/data/v47.0/sobjects/Contact/0032v00002qXTBlAAO"
},
"Id" : "0032v00002qXTBlAAO",
"IsDeleted" : false,
"MasterRecordId" : null,
"AccountId" : "0012v00002RkgUVAAZ",
"LastName" : "Gonzalez",
"FirstName" : "Rose",
"Salutation" : "Ms.",
"Name" : "Rose Gonzalez",
"OtherStreet" : null,
"OtherCity" : null,
"OtherState" : null,
"OtherPostalCode" : null,
"OtherCountry" : null,
"OtherLatitude" : null,
"OtherLongitude" : null,
"OtherGeocodeAccuracy" : null,
"MailingStreet" : "313 Constitution Place\nAustin, TX 78767\nUSA",
"MailingCity" : null,
"MailingState" : null,
"MailingPostalCode" : null,
"MailingCountry" : null,
"MailingLatitude" : null,
"MailingLongitude" : null,
"MailingGeocodeAccuracy" : null,
"Phone" : "(512) 757-6000",
"Fax" : "(512) 757-9000",
"MobilePhone" : "(512) 757-9340",
"HomePhone" : null,
"OtherPhone" : null,
"AssistantPhone" : null,
"ReportsToId" : null,
"Email" : "rose@edge.com",
"Title" : "SVP, Procurement",
"Department" : "Procurement",
"AssistantName" : null,
"LeadSource" : "Trade Show",
"Birthdate" : "1967-07-14",
"Description" : null,
"OwnerId" : "0052v00000ajtG3AAI",
"CreatedDate" : 1564636138000,
"CreatedById" : "0052v00000ajtG3AAI",
"LastModifiedDate" : 1564636138000,
"LastModifiedById" : "0052v00000ajtG3AAI",
"SystemModstamp" : 1564636138000,
"LastActivityDate" : null,
"LastCURequestDate" : null,
"LastCUUpdateDate" : null,
"LastViewedDate" : 1573528066000,
"LastReferencedDate" : 1573528066000,
"EmailBouncedReason" : null,
"EmailBouncedDate" : null,
"IsEmailBounced" : false,
"PhotoUrl" : "/services/images/photo/0032v00002qXTBlAAO",
"Jigsaw" : null,
"JigsawContactId" : null,
"CleanStatus" : "Pending",
"IndividualId" : null,
"Level__c" : "Primary",
"Languages__c" : "English"
}
]
Kafka レコード値 :
{
"Id":"0032v00002qXTBlAAO",
"IsDeleted":{
"boolean":false
},
"MasterRecordId":null,
"AccountId":{
"string":"0012v00002RkgUVAAZ"
},
"LastName":{
"string":"Gonzalez"
},
"FirstName":{
"string":"Rose"
},
"Salutation":{
"string":"Ms."
},
"Name":{
"string":"Rose Gonzalez"
},
"OtherStreet":null,
"OtherCity":null,
"OtherState":null,
"OtherPostalCode":null,
"OtherCountry":null,
"OtherLatitude":null,
"OtherLongitude":null,
"OtherGeocodeAccuracy":null,
"MailingStreet":{
"string":"313 Constitution Place\nAustin, TX 78767\nUSA"
},
"MailingCity":null,
"MailingState":null,
"MailingPostalCode":null,
"MailingCountry":null,
"MailingLatitude":null,
"MailingLongitude":null,
"MailingGeocodeAccuracy":null,
"Phone":{
"string":"(512) 757-6000"
},
"Fax":{
"string":"(512) 757-9000"
},
"MobilePhone":{
"string":"(512) 757-9340"
},
"HomePhone":null,
"OtherPhone":null,
"AssistantPhone":null,
"ReportsToId":null,
"Email":{
"string":"rose@edge.com"
},
"Title":{
"string":"SVP, Procurement"
},
"Department":{
"string":"Procurement"
},
"AssistantName":null,
"LeadSource":{
"string":"Trade Show"
},
"Birthdate":{
"int":-903
},
"Description":null,
"OwnerId":{
"string":"0052v00000ajtG3AAI"
},
"CreatedDate":{
"long":1564636138000
},
"CreatedById":{
"string":"0052v00000ajtG3AAI"
},
"LastModifiedDate":{
"long":1564636138000
},
"LastModifiedById":{
"string":"0052v00000ajtG3AAI"
},
"SystemModstamp":{
"long":1564636138000
},
"LastActivityDate":null,
"LastCURequestDate":null,
"LastCUUpdateDate":null,
"LastViewedDate":{
"long":1573722558000
},
"LastReferencedDate":{
"long":1573722558000
},
"EmailBouncedReason":null,
"EmailBouncedDate":null,
"IsEmailBounced":{
"boolean":false
},
"PhotoUrl":{
"string":"/services/images/photo/0032v00002qXTBlAAO"
},
"Jigsaw":null,
"JigsawContactId":null,
"CleanStatus":{
"string":"Pending"
},
"IndividualId":null,
"Level__c":{
"string":"Primary"
},
"Languages__c":{
"string":"English"
}
}