Marketo Source Connector for Confluent Platform¶
Caution
Preview connectors aren’t currently supported, nor are they recommended for production use.
Marketo is a marketing automation platform used predominantly by marketers to
manage marketing campaigns and flows to potential customers and prospects. The
Kafka Connect Marketo Source connector copies data into Apache Kafka® from various
Marketo entities such as leads, programs, campaigns, staticLists, and all
activities entities like activities_add_to_nurture,
activities_add_to_opportunity, and more, using the Marketo REST API. You can
find the list of supported Marketo entities in Supported Entities.
Set the tasks.max configuration parameter to the number of entities configured
in the entity.names property of the connector, treating all activity types as a
single entity.
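The counting rule above can be sketched in a few lines of Python. This is only an illustration, not part of the connector: the helper name recommended_tasks_max is hypothetical, and it assumes that every activity entity name starts with "activities", as the examples in this document do.

```python
def recommended_tasks_max(entity_names: str) -> int:
    """Count entities for tasks.max, collapsing all activity types into one."""
    groups = set()
    for name in entity_names.split(","):
        name = name.strip()
        if not name:
            continue
        # Assumption: every activity entity name starts with "activities".
        groups.add("activities" if name.startswith("activities") else name)
    return len(groups)


print(recommended_tasks_max(
    "activities_add_to_nurture,activities_add_to_opportunity,campaigns,leads"
))  # prints 3
```

The two activity types collapse into one group, so the result matches the tasks.max value of 3 used in the quick start.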
Features¶
The Marketo Source connector includes the following features:
- At least once delivery
- Supports one task
- Quick turnaround
- Schema detection and evolution
- Automatic discovery of bulk export entities
- Real-time and historical lookup
- Automatic retries
- Error handling
- Throughput tuning
- Downloaded streaming of bulk export entities
At least once delivery¶
The connector guarantees no loss of messages from Marketo to Kafka. Messages may be reprocessed because of task failures, which may cause duplication.
Supports one task¶
The Marketo Source connector supports running only one task per entity: each Marketo entity is covered by exactly one task.
Quick turnaround¶
The Marketo Source connector ensures that data between your Marketo entities and
corresponding Kafka topics are synced quickly, with minimal lag. The poll
frequency on each entity has been specifically pre-configured within the
connector, based on the size of the table, adhering to the daily quota, rate and
concurrency limits as prescribed by Marketo.
Larger and more dynamic tables like activities and leads are pulled with a
delay of 5 minutes between consecutive pulls. Mostly static assets, which have
infrequent updates, like campaigns, programs and staticLists, are pulled with a
delay of 30 seconds between consecutive pulls.
Schema detection and evolution¶
The connector supports automatic schema detection and backward compatible schema
evolution for asset entities such as campaigns, programs and staticLists. For
bulk export entities like leads and activities, records are produced with a CSV
schema taken directly from the column headers provided by the export files.
Automatic discovery of bulk export entities¶
For leads, the fields present in leads entities are automatically discovered by
the connector, and the user does not have to provide them explicitly as part of
the connector configuration. For activities entities, the connector discovers
the activity type IDs of the activity types provided by the user. Here, activity
types are human-readable activity entity names, while activity type IDs are the
numbers that Marketo assigns to the different activity types.
Real-time and historical lookup¶
The connector supports fetching all the past historical records for all the
listed entities. This past lookup of data can be achieved by specifying the
marketo.since configuration parameter (see the marketo.since configuration
property).
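As a small illustration, the sample marketo.since value used in the quick start configuration (2020-07-01T00:00:00+00:00) is an ISO 8601 timestamp with a UTC offset. The sketch below produces a value in that shape from a Python datetime. The helper name marketo_since is hypothetical, and whether the connector accepts other timestamp formats is not stated here.

```python
from datetime import datetime, timezone


def marketo_since(start: datetime) -> str:
    """Format a timezone-aware datetime like the quick start sample value."""
    if start.tzinfo is None:
        raise ValueError("start must be timezone-aware")
    # Normalize to UTC and drop sub-second precision.
    return start.astimezone(timezone.utc).isoformat(timespec="seconds")


print(marketo_since(datetime(2020, 7, 1, tzinfo=timezone.utc)))
# prints 2020-07-01T00:00:00+00:00
```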
Automatic retries¶
In case of a connection error between the API server and Kafka Connect, the
connector may receive a non-OK response from the API server or no response at
all. In such cases, the connector can be made robust using the automatic retry
mechanism with linear backoff, which is controlled by the max.retries and
retry.backoff.ms configuration properties.
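The retry behavior described above can be pictured with the following minimal sketch. It is not the connector's implementation: it assumes that "linear backoff" means the wait before retry number n is n times retry.backoff.ms, and the function name and the use of ConnectionError are illustrative choices.

```python
import time


def call_with_linear_backoff(request, max_retries, retry_backoff_ms,
                             sleep=time.sleep):
    """Call request(); on ConnectionError retry up to max_retries times."""
    attempt = 0
    while True:
        try:
            return request()
        except ConnectionError:
            attempt += 1
            if attempt > max_retries:
                raise  # retries exhausted, surface the last error
            # Assumed linear backoff: the wait grows by a fixed step per retry.
            sleep(attempt * retry_backoff_ms / 1000.0)
```

Passing a custom sleep function makes the waits easy to observe in a test without actually pausing.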
Error handling¶
Marketo has a rich and elaborate list of error codes. The connector handles all
the retriable errors appropriately, especially rate-limiting, authentication,
and quota-limit exceeded errors. For quota-limit exceeded errors, the connector
automatically schedules subsequent pull requests at the daily quota reset time
(12:00 AM CST). For rate-limit errors, the connector employs exponential
backoff with appropriate jitter to smoothly spread out the requests. The
connector keeps all calls to the Marketo server instance within the prescribed
limits, so you do not have to deal with these limits explicitly. The connector
also places calls to the Marketo server within the bulk extract limits for bulk
export entities (leads and activities). In case of non-retriable errors, the
connector task fails and requires manual intervention.
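The two scheduling ideas above, exponential backoff with jitter and waiting for the daily quota reset, can be sketched as follows. This is an illustration under stated assumptions rather than the connector's code: the base and cap values are made up, the "full jitter" variant is one common choice among several, and CST is treated as a fixed UTC-6 offset with no daylight saving adjustment.

```python
import random
from datetime import datetime, timedelta, timezone

# Assumption: CST is a fixed UTC-6 offset, ignoring daylight saving time.
CST = timezone(timedelta(hours=-6), "CST")


def backoff_with_jitter_ms(attempt, base_ms=1000, cap_ms=60000,
                           rng=random.random):
    """Full jitter: a random wait in [0, min(cap_ms, base_ms * 2**attempt))."""
    ceiling = min(cap_ms, base_ms * (2 ** attempt))
    return rng() * ceiling


def next_quota_reset(now):
    """Return the next 12:00 AM CST strictly after the aware datetime now."""
    local = now.astimezone(CST)
    midnight = local.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight + timedelta(days=1)
```

Randomizing the wait spreads retries from concurrent requests over time instead of letting them all fire at the same instant.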
Throughput tuning¶
You can fine-tune and improve overall throughput using the max.batch.size and
max.poll.interval.ms configuration properties.
Downloaded streaming of bulk export entities¶
For leads and activities, export files are first downloaded by the connector,
and then processed at a convenient pace (depending upon the configured
max.batch.size and max.poll.interval.ms configuration parameters). This
decouples the download from the creation of Kafka records. The HTTP connection
between the connector and the Marketo server instance is returned to the
connection pool immediately after the download completes, instead of being held
until all the records are streamed to Kafka. The export files are downloaded to
temporary files, and the connector cleans them up immediately after processing
all the records.
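The download-then-process pattern described above can be sketched as below. This is a simplified illustration, not the connector's implementation: the function names are hypothetical, an in-memory stream stands in for the HTTP response, and only the batch size (not the poll interval) is modeled.

```python
import csv
import io
import os
import shutil
import tempfile


def spool_to_temp_file(response_stream):
    """Copy the whole export stream to a temporary file and return its path."""
    with tempfile.NamedTemporaryFile("wb", suffix=".csv", delete=False) as tmp:
        shutil.copyfileobj(response_stream, tmp)
        return tmp.name


def iter_batches(path, max_batch_size):
    """Yield lists of at most max_batch_size rows, then delete the file."""
    try:
        with open(path, newline="", encoding="utf-8") as handle:
            batch = []
            for row in csv.DictReader(handle):
                batch.append(row)
                if len(batch) == max_batch_size:
                    yield batch
                    batch = []
            if batch:
                yield batch
    finally:
        os.remove(path)  # clean up once processing ends


# Stand-in for a downloaded export file (sample data, not real Marketo output).
sample = io.BytesIO(b"id,email\n1,a@example.com\n2,b@example.com\n3,c@example.com\n")
path = spool_to_temp_file(sample)
for batch in iter_batches(path, max_batch_size=2):
    print(len(batch))  # prints 2, then 1
```

Because the stream is fully copied before any row is parsed, the source connection can be released as soon as spool_to_temp_file returns.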
Limitations¶
- The connector currently doesn’t support multi-part download of export files. Future releases will contain this feature. Currently, the connector fetches the entire export file in one go.
- Because export files can be very large, especially when the connector is configured to pull historical data, you should provide sufficient heap memory to the connector deployment. In our performance tests, the connector achieved smooth performance for 670MB leads and 240MB activities export files in a single run with a maximum heap size of -Xmx8g. Peak heap utilization reached 5GB. This usage will be reduced with the introduction of multi-part file download in the connector.
- The connector downloads the export files to temporary files on the disk of the worker node. Make sure the worker nodes that run connector tasks have enough disk space for the size of the historical data.
- Each entity is processed by a single task, due to the CDC (change data capture) nature of the data. As a result, the performance of this connector does not scale with the number of tasks configured for it.
Supported Entities¶
The following entities from Marketo are supported in this version of the Kafka Connect Marketo Source connector: campaigns, programs, staticLists, leads and all types of activities.
Activity types are listed in the Activity Type Filters list. For example, to fetch activities of type Add to Opportunity, include the entity name activities_add_to_opportunity in the entity.names configuration, and the connector will do the rest.
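The naming in the example above suggests a simple convention: lowercase the activity type name, join its words with underscores, and add the activities_ prefix. The sketch below encodes that convention. It is an assumption inferred from the two entity names shown in this document (activities_add_to_opportunity and activities_add_to_nurture), so check the Activity Type Filters list for names that contain punctuation or other special characters.

```python
def activity_entity_name(activity_type: str) -> str:
    """Derive an entity name from a human-readable activity type name."""
    # Assumed convention: lowercase words joined by "_" after "activities_".
    words = activity_type.strip().lower().split()
    return "activities_" + "_".join(words)


print(activity_entity_name("Add to Opportunity"))
# prints activities_add_to_opportunity
```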
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription which includes Confluent enterprise license keys to subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
For license properties, see Confluent Platform license. For information about the license topic, see License topic configuration.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for Marketo Source Connector for Confluent Platform.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Install the Marketo Source Connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
You must install the connector on every machine where Connect will run.
- Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later.
- Kafka Connect: Confluent Platform 4.1.0 or later, or Kafka 1.1.0 or later.
- Java 1.8.
- An installation of the Confluent Hub Client. This is installed by default with Confluent Enterprise.
- An installation of the latest (latest) connector version. To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:
  confluent connect plugin install confluentinc/kafka-connect-marketo:latest
  You can install a specific version by replacing latest with a version number as shown in the following example:
  confluent connect plugin install confluentinc/kafka-connect-marketo:1.0.0-preview
- Marketo API-Only User: You must create an API-Only user that will be used by the connector to access REST APIs. You can find the process to create an API-Only user, along with best practices to follow in the Marketo Custom Services documentation. You can provide read-only permissions on various entities, as needed, for the API-Only user you create. The Marketo Source connector doesn’t need any write permissions. For the permissions needed for the entities supported by this connector, see Marketo Endpoint Permissions Reference.
- Marketo Corona Support: You must enable the Marketo Corona feature in your Marketo subscription for the bulk export entities leads and activities to be fetched by the Marketo connector. This feature enables incremental fetch of bulk export entities. The Marketo team should be able to enable Corona support for your account.
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Quick Start¶
In this quick start guide, the Marketo Source connector is used to consume records from the Marketo entities leads, campaigns, and activities (of types activities_add_to_nurture and activities_add_to_opportunity) and send the records to the respective Kafka topics named marketo_leads, marketo_campaigns and marketo_activities.
Install the connector through the Confluent Hub Client.
# run from your confluent platform installation directory
confluent connect plugin install confluentinc/kafka-connect-marketo:latest
Start the Confluent Platform.
confluent local services start
Check the status of all services.
confluent local services connect connector status
Configure your connector by first creating a JSON file named marketo-configs.json with the following properties. Find the REST API endpoint URL from the process described in Marketo REST API Quickstart. This endpoint URL is used in the marketo.url configuration key (as shown in the following example) of the connector, but ensure you remove the path rest from the endpoint URL before using it in connector configurations. To determine the OAuth client ID and OAuth client secret, see Marketo REST API Quickstart. tasks.max should be 3 here as there are three entity types: leads, campaigns and activities. Substitute the values in <> with your configuration, and leave confluent.license empty for an evaluation license.
{
  "name": "marketo-connector",
  "config": {
    "connector.class": "io.confluent.connect.marketo.MarketoSourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "confluent.topic.bootstrap.servers": "127.0.0.1:9092",
    "confluent.topic.replication.factor": 1,
    "confluent.license": "<license>",
    "tasks.max": 3,
    "poll.interval.ms": 1000,
    "topic.name.pattern": "marketo_${entityName}",
    "marketo.url": "https://<instance-id>.mktorest.com/",
    "marketo.since": "2020-07-01T00:00:00+00:00",
    "entity.names": "activities_add_to_nurture,activities_add_to_opportunity,campaigns,leads",
    "oauth2.client.id": "<client_id>",
    "oauth2.client.secret": "<client_secret>"
  }
}
Start the Marketo Source connector by loading the connector’s configuration with the following command:
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local services connect connector load marketo-connector -- -d marketo-configs.json
Confirm that the connector is in a RUNNING state.
confluent local services connect connector status marketo-connector
Create some leads, activities and campaigns records using Marketo APIs. Use POST or Bulk Import APIs of appropriate entities to inject some sample records.
Confirm the messages from entities leads, activities, and campaigns were delivered to the marketo_leads, marketo_activities and marketo_campaigns topics respectively, in Kafka. Note that it may take about a minute for assets (campaigns) and about 5 minutes or more (depending upon the time the Marketo server instance takes to prepare the export file) for export entities (leads and activities).
confluent local services kafka consume marketo_leads -- --from-beginning