Marketo Source Connector for Confluent Platform
Marketo is a marketing automation platform used predominantly by marketers to manage marketing campaigns and flows to potential customers and prospects.
The Kafka Connect Marketo Source connector copies data into Apache Kafka® from various Marketo entities, such as staticLists and all activities entities, using the Marketo REST API.
You can find the list of supported Marketo entities in Supported Entities.
Set the tasks.max configuration parameter to the number of entities configured in the connector's entity.names property, treating all activity types as a single entity.
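The counting rule above can be sketched in a few lines of Python. This is only an illustration, not connector code: the helper name tasks_max_for is hypothetical, and it assumes that entity names are comma-separated and that every activity entity name starts with the prefix activities_, as in the examples elsewhere in this document.

```python
def tasks_max_for(entity_names):
    """Count distinct entities, collapsing all activity types into one.

    Assumption: activity entities are named "activities" or "activities_<type>".
    """
    groups = set()
    for name in entity_names:
        name = name.strip()
        if not name:
            continue  # ignore empty items such as a trailing comma
        is_activity = name == "activities" or name.startswith("activities_")
        groups.add("activities" if is_activity else name)
    return len(groups)


entity_names = "leads,campaigns,activities_add_to_nurture,activities_add_to_opportunity"
print(tasks_max_for(entity_names.split(",")))  # prints 3
```

Here leads and campaigns count once each, and the two activity types together count as one entity, which gives 3.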
The Marketo Source Connector offers the following features:
- Quick Turnaround: The Marketo connector ensures that data between your Marketo entities and the corresponding Kafka topics is synced quickly, with minimal lag. The poll frequency for each entity is pre-configured within the connector, based on the size of the table and in keeping with the daily quota, rate, and concurrency limits prescribed by Marketo. Larger and more dynamic tables like leads are pulled with a delay of 5 minutes between consecutive pulls. Mostly static assets with infrequent updates, like staticLists, are pulled with a delay of 30 seconds between consecutive pulls.
- At Least Once Delivery: The connector guarantees no loss of messages from Marketo to Kafka. Messages may be reprocessed because of task failures, which may cause duplication.
- Schema Detection and Evolution: The connector supports automatic schema detection and backward-compatible schema evolution for asset entities such as staticLists. For bulk export entities like activities, records are produced with a CSV schema taken directly from the column headers of the export files.
- Automatic Discovery of Bulk Export Entities: For leads, the connector automatically discovers the fields present in leads entities, so the user does not have to provide them explicitly in the connector configuration. For activities entities, the connector discovers the activity type IDs of the activity types provided by the user. Here, activity types are human-readable activity entity names, while activity type IDs are the numbers that Marketo assigns to the different activity types.
- Real-time and Historical Lookup: The connector supports fetching all past historical records for all the listed entities. This historical lookup is enabled by specifying the marketo.since configuration parameter.
- Automatic Retries: In case of a connection error between the API server and Kafka Connect, the connector may receive a non-OK response from the API server or no response at all. In such cases, the connector can be made robust by using its automatic retry mechanism with linear backoff, which is controlled through configuration properties.
- Error Handling: Marketo has a rich and elaborate list of error codes. The connector handles all retriable errors appropriately, in particular rate-limiting, authentication, and quota-limit-exceeded errors. For quota-limit-exceeded errors, the connector automatically schedules subsequent pull requests at the daily quota reset time (12:00 AM CST). For rate-limit errors, the connector employs exponential backoff with jitter to spread out the requests smoothly. The connector keeps all calls to the Marketo server instance within the prescribed limits, so the user does not have to deal with these limits explicitly. The connector also keeps its calls within the bulk extract limits for bulk export entities (such as activities). In case of non-retriable errors, the connector task fails and manual intervention is required.
- Throughput Tuning: Configuration properties such as max.poll.interval.ms can be used to fine-tune and improve overall throughput.
- Downloaded Streaming of Bulk Export Entities: For bulk export entities such as activities, export files are first downloaded by the connector and then processed at a convenient pace (depending on configuration parameters such as max.poll.interval.ms). This decouples the download from the creation of Kafka records. The HTTP connection between the connector and the Marketo server instance is returned to the connection pool immediately after the download completes, instead of being held until all the records are streamed to Kafka. The export files are downloaded to temporary files, and the connector cleans them up immediately after processing all the records.
- The connector currently does not support multi-part download of export files; it fetches the entire export file in one go. Future releases will add this feature.
- Because export files can be very large, especially when the connector is configured to pull historical data, users are advised to provide sufficient heap memory to the connector deployment. In our performance tests, the connector ran smoothly with 670 MB leads and 240 MB activities export files in a single run, with a maximum heap size of -Xmx8g. Peak heap utilization reached 5 GB. This usage will be reduced once multi-part file download is introduced in the connector.
- The connector downloads export files to temporary files on the disk of the worker node, so make sure that the worker nodes running connector tasks have enough disk space for the size of the historical data.
- Each entity is handled by a single task, due to the CDC (change data capture) nature of the data. As a result, the performance of this connector does not scale with the number of tasks configured for it.
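The feature list above mentions two retry strategies: linear backoff for automatic retries and exponential backoff with jitter for rate-limit errors. The document does not give the connector's actual formulas, so the following Python sketch only contrasts the two ideas. The function names, the base and cap values, and the choice of the "full jitter" variant (a uniformly random delay up to the exponential ceiling) are all assumptions made for illustration.

```python
import random


def linear_backoff_ms(attempt, base_ms):
    """Linear backoff: the delay grows by a fixed step (base, 2*base, 3*base, ...)."""
    return base_ms * attempt


def exponential_backoff_with_jitter_ms(attempt, base_ms, cap_ms, rng=random):
    """Exponential backoff with full jitter.

    The ceiling doubles on every attempt (capped at cap_ms), and the actual
    delay is drawn uniformly from [0, ceiling] so that retries spread out.
    """
    ceiling = min(cap_ms, base_ms * 2 ** (attempt - 1))
    return rng.uniform(0, ceiling)


# Compare the two schedules for the first few attempts (values are hypothetical).
for attempt in range(1, 5):
    linear = linear_backoff_ms(attempt, base_ms=500)
    jittered = exponential_backoff_with_jitter_ms(attempt, base_ms=500, cap_ms=60_000)
    print(attempt, linear, round(jittered))
```

The jitter matters when many requests hit a rate limit at the same moment: without it, they would all retry at the same instant and hit the limit again.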
The following entities from Marketo are supported in this version of Kafka Connect Marketo Source connector:
leads and all types of activities. Activity types are listed in the Activity Type Filters list. For example, to fetch activities of type Add to Opportunity, include the entity name activities_add_to_opportunity in the entity.names configuration, and the connector will do the rest.
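The mapping from a human-readable activity type to an entity name can be sketched as follows. The rule used here (lowercase the name, replace runs of other characters with underscores, and prefix activities_) is inferred only from the examples in this document, such as Add to Opportunity becoming activities_add_to_opportunity. The helper name is hypothetical, and activity types that contain punctuation may be named differently by the connector, so check such names against the connector's own entity list.

```python
import re


def activity_entity_name(activity_type):
    """Derive an entity name from an activity type (inferred naming rule)."""
    # Lowercase, then collapse anything that is not a letter or digit into "_".
    slug = re.sub(r"[^a-z0-9]+", "_", activity_type.lower()).strip("_")
    return f"activities_{slug}"


print(activity_entity_name("Add to Opportunity"))  # prints activities_add_to_opportunity
```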
The following are required to run the Kafka Connect Marketo Source Connector:
- Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
- Kafka Connect: Confluent Platform 4.1.0 or above, or Kafka 1.1.0 or above
- Java 1.8
- Marketo API-Only User: Create an API-Only user for this connector to use when accessing the REST APIs. The process of creating an API-Only user and the best practices to follow are described in Marketo Custom Services. Grant the API-Only user only READ-ONLY permissions on the entities it needs; the Marketo source connector does not need any write permissions. The exact permissions needed for the entities supported by this connector can be found in the Marketo Endpoint Permissions Reference.
- Marketo Corona Support: For bulk export entities such as activities to be fetched by this connector, a feature called Corona needs to be enabled for your Marketo subscription. This feature enables incremental fetch of bulk export entities. The Marketo team should be able to enable Corona support for your account.
Install the Marketo Source Connector
You can install this connector by following the Confluent Hub instructions below, or you can manually download the ZIP file.
Install the connector using Confluent Hub
- Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run the following command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will run.
confluent-hub install confluentinc/kafka-connect-marketo:latest
You can install a specific version by replacing latest with a version number. For example:
confluent-hub install confluentinc/kafka-connect-marketo:1.0.0-preview
In this quick start guide, the Marketo Source Connector is used to consume records from Marketo entities, including activities entities of types activities_add_to_nurture and activities_add_to_opportunity, and to send the records to their respective Kafka topics.
Install the connector through the Confluent Hub Client.
# run from your confluent platform installation directory
confluent-hub install confluentinc/kafka-connect-marketo:latest
Start the Confluent Platform.
The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.
confluent local services start
Check the status of all services.
confluent local services status
Configure your connector by first creating a JSON file named marketo-configs.json with the following properties. Find the REST API endpoint URL by following the process described in Marketo REST API Quickstart. This endpoint URL is used in the marketo.url configuration key of the connector; remove the path rest from the endpoint URL before using it in the connector configuration. Refer to the same link for the process of determining the OAuth client ID and OAuth client secret.
tasks.max should be 3 here, as there are three entity types.
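As a rough illustration of the two points above (removing the rest path from the endpoint URL and setting tasks.max to 3), the following Python sketch assembles only the properties that this document names. The helper names and the example.mktorest.com URL are placeholders. The connector class, the OAuth client credentials, and whatever wrapper the load command expects around these properties are deliberately left out, because their exact keys are not shown here.

```python
import json


def strip_rest_path(endpoint_url):
    """Remove a trailing "/rest" path (with or without a final slash)."""
    url = endpoint_url.rstrip("/")
    if url.endswith("/rest"):
        url = url[: -len("/rest")]
    return url


def build_properties(endpoint_url, license_key=""):
    """Assemble the connector properties named in this document (partial)."""
    return {
        # Three entity types: leads, campaigns, and activities (counted once).
        "tasks.max": "3",
        "entity.names": "leads,campaigns,activities_add_to_nurture,activities_add_to_opportunity",
        "marketo.url": strip_rest_path(endpoint_url),
        # An empty license value is used for an evaluation license.
        "confluent.license": license_key,
    }


# Placeholder endpoint; substitute the REST API endpoint of your own instance.
print(json.dumps(build_properties("https://example.mktorest.com/rest"), indent=2))
```

The remaining required properties still need to be added by hand before the file can be loaded.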
// substitute <> with your config
"confluent.license": "<license>", // leave it empty for evaluation license
Start the Marketo Source connector by loading the connector’s configuration with the following command:
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local services connect connector load marketo-connector -- -d marketo-configs.json
Confirm that the connector is in a running state.
confluent local services connect connector status marketo-connector
Create some sample records, such as campaigns records, using Marketo APIs. Use the POST or Bulk Import APIs of the appropriate entities to inject the sample records.
Confirm that the messages from the entities, such as campaigns, were delivered to the respective topics, such as marketo_campaigns, in Kafka. Note that it may take about a minute for assets (campaigns) and about 5 minutes or more (depending on the time the Marketo server instance takes to prepare the export file) for export entities.
confluent local services kafka consume marketo_leads -- --from-beginning