Marketo Source Connector for Confluent Platform

Marketo is a marketing automation platform that marketers use to manage campaigns and flows for potential customers and prospects. The Kafka Connect Marketo Source connector uses the Marketo REST API to copy data into Apache Kafka® from Marketo entities such as leads, programs, campaigns, staticLists, and all activities entities (for example, activities_add_to_nurture and activities_add_to_opportunity). You can find the list of supported Marketo entities in Supported Entities. Set the tasks.max configuration parameter to the number of entities configured in the connector's entity.names property, counting all activity types together as a single entity.
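The tasks.max rule above can be sketched in a few lines. This is only an illustration: the function name recommended_tasks_max is hypothetical, and the sketch assumes that every activity entity name starts with the prefix activities_, as in the examples on this page.

```python
def recommended_tasks_max(entity_names: str) -> int:
    """Count configured entities, treating all activities_* names as one entity."""
    groups = set()
    for name in entity_names.split(","):
        name = name.strip()
        if not name:
            continue  # ignore empty items such as a trailing comma
        # Assumption: every activity type shares the "activities_" prefix.
        groups.add("activities" if name.startswith("activities_") else name)
    return len(groups)


print(recommended_tasks_max(
    "activities_add_to_nurture,activities_add_to_opportunity,campaigns,leads"
))  # prints 3
```

The value passed in is the same comma-separated string that you would set in entity.names, so the result can be copied into tasks.max.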

Features

The Marketo Source Connector offers the following features:

  • Quick Turnaround: The Marketo connector ensures that data is synced quickly, with minimal lag, between your Marketo entities and the corresponding Kafka topics. The poll frequency for each entity is preconfigured within the connector based on the size of the table, and it adheres to the daily quota, rate, and concurrency limits prescribed by Marketo. Larger and more dynamic tables, like activities and leads, are pulled with a delay of 5 minutes between consecutive pulls. Mostly static assets with infrequent updates, like campaigns, programs, and staticLists, are pulled with a delay of 30 seconds between consecutive pulls.
  • At Least Once Delivery: The connector guarantees no loss of messages from Marketo to Kafka. Messages may be reprocessed because of task failures, which may cause duplication.
  • Schema Detection and Evolution: The connector supports automatic schema detection and backward-compatible schema evolution for asset entities such as campaigns, programs, and staticLists. For bulk export entities like leads and activities, records are produced with a CSV schema taken directly from the column headers of the export files.
  • Automatic Discovery of Bulk Export Entities: For leads, the connector automatically discovers the fields present in the leads entity, so you do not have to list them in the connector configuration. For activities entities, the connector discovers the activity type IDs of the activity types you provide. Here, activity types are the human-readable activity entity names, while activity type IDs are the numbers that Marketo assigns to the different activity types.
  • Real-time and Historical Lookup: The connector supports fetching past historical records for all the listed entities. To look up past data, specify the marketo.since configuration property.
  • Automatic Retries: If a connection error occurs between the API server and Kafka Connect, the connector may receive a non-OK response from the API server or no response at all. In such cases, the connector retries automatically with linear backoff, controlled by the max.retries and retry.backoff.ms configuration properties.
  • Error Handling: Marketo has a rich and elaborate list of error codes. The connector handles all retriable errors appropriately, in particular rate-limiting, authentication, and quota-limit-exceeded errors. For quota-limit-exceeded errors, the connector automatically schedules subsequent pull requests at the daily quota reset time (12:00 AM CST). For rate-limit errors, the connector uses exponential backoff with jitter to spread out the requests smoothly. The connector keeps all calls to the Marketo server instance within the prescribed limits, so you do not have to manage these limits yourself. It also stays within the bulk extract limits when calling the Marketo server for bulk export entities (leads and activities). On a non-retriable error, the connector task fails and requires manual intervention.
  • Throughput Tuning: Use the max.batch.size and max.poll.interval.ms configuration properties to fine-tune and improve overall throughput.
  • Downloaded Streaming of Bulk Export Entities: For leads and activities, the connector first downloads the export files and then processes them at a convenient pace, depending on the configured max.batch.size and max.poll.interval.ms properties. This decouples the download from the creation of Kafka records. The HTTP connection between the connector and the Marketo server instance is returned to the connection pool immediately after the download completes, instead of being held until all the records are streamed to Kafka. The export files are downloaded to temporary files, which the connector cleans up immediately after processing all the records.
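To make the two retry styles mentioned in the feature list concrete, here is a minimal sketch of linear backoff and of exponential backoff with jitter. It is not the connector's implementation: this page does not document the exact formulas, so the function names, the "full jitter" strategy, and the base_ms and cap_ms parameters are all assumptions chosen for illustration. Only max.retries and retry.backoff.ms correspond to properties named above.

```python
import random


def linear_backoff_ms(attempt: int, retry_backoff_ms: int) -> int:
    """Wait before retry number `attempt` (1-based); grows by a fixed step."""
    return attempt * retry_backoff_ms


def exponential_backoff_with_jitter_ms(attempt, base_ms, cap_ms, rng=random):
    """Assumed 'full jitter' variant: uniform in [0, min(cap_ms, base_ms * 2**attempt)]."""
    ceiling = min(cap_ms, base_ms * (2 ** attempt))
    return rng.uniform(0, ceiling)


# Hypothetical values standing in for max.retries and retry.backoff.ms.
max_retries = 3
retry_backoff_ms = 1000
print([linear_backoff_ms(n, retry_backoff_ms) for n in range(1, max_retries + 1)])
# prints [1000, 2000, 3000]
```

Linear backoff keeps the total wait predictable for short connection outages, while randomized exponential backoff spreads requests out so that several tasks hitting a rate limit do not retry at the same moment.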

Limitations

  • The connector currently doesn’t support multi-part download of export files; it fetches the entire export file in a single request. Future releases will add this feature.
  • Export files can be very large, especially when the connector is configured to pull historical data, so provide sufficient heap memory to the connector deployment. In our performance tests, the connector ran smoothly with 670 MB leads and 240 MB activities export files in a single run, using a maximum heap size of -Xmx8g. Peak heap utilization reached 5 GB. This usage will drop once multi-part file download is introduced in the connector.
  • The connector downloads the export files to temporary files on the disk of the worker node. Make sure that the worker nodes running connector tasks have enough disk space for the size of your historical data.
  • Each entity is processed by a single task because of the CDC (change data capture) nature of the data. Performance therefore does not scale with the number of tasks configured for the connector.

Supported Entities

The following Marketo entities are supported in this version of the Kafka Connect Marketo Source connector: campaigns, programs, staticLists, leads, and all types of activities. Activity types are listed in the Activity Type Filters list. For example, to fetch activities of type Add to Opportunity, include the entity name activities_add_to_opportunity in the entity.names configuration property, and the connector will do the rest.
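The mapping from a human-readable activity type to an entity name can be sketched as follows. The naming rule here (lowercase, words joined by underscores, prefixed with activities_) is inferred only from the two examples on this page, Add to Opportunity and Add to Nurture, and the helper name activity_entity_name is hypothetical. Check the Activity Type Filters list for the exact entity names.

```python
import re


def activity_entity_name(activity_type: str) -> str:
    """Build an entity name such as activities_add_to_opportunity (assumed rule)."""
    # Keep only alphanumeric words, lowercased, and join them with underscores.
    words = re.findall(r"[a-z0-9]+", activity_type.lower())
    return "activities_" + "_".join(words)


print(activity_entity_name("Add to Opportunity"))
# prints activities_add_to_opportunity
```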

Prerequisites

The following are required to run the Kafka Connect Marketo Source Connector:

  • Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
  • Kafka Connect: Confluent Platform 4.1.0 or above, or Kafka 1.1.0 or above
  • Java 1.8
  • Marketo API-Only User: Create an API-Only user for this connector to use when accessing the REST APIs. The process for creating an API-Only user, along with best practices, is described in Marketo Custom Services. Grant the API-Only user only READ-ONLY permissions on the entities it needs; the Marketo source connector doesn’t need any write permissions. The exact permissions required for the entities supported by this connector are listed in the Marketo Endpoint Permissions Reference.
  • Marketo Corona Support: To fetch the bulk export entities leads and activities with this connector, a feature called Corona must be enabled for your Marketo subscription. This feature enables incremental fetch of bulk export entities. The Marketo team should be able to enable Corona support for your account.

Install the Marketo Source Connector

You can install this connector by using the Confluent Hub client, or you can manually download the ZIP file.

Install the connector using Confluent Hub

Prerequisite
Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run the following command to install the latest connector version. The connector must be installed on every machine where Connect will run.

confluent-hub install confluentinc/kafka-connect-marketo:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-marketo:1.0.0-preview

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

Configuration Properties

For a complete list of configuration properties for this connector, see Marketo Source Connector Configuration Properties.

Note

For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues Confluent enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Quick Start

In this quick start guide, the Marketo Source Connector consumes records from the Marketo entities leads and campaigns and from two activities entities (activities_add_to_nurture and activities_add_to_opportunity), and sends the records to the respective Kafka topics marketo_leads, marketo_campaigns, and marketo_activities.

  1. Install the connector through the Confluent Hub Client.

    # run from your confluent platform installation directory
    confluent-hub install confluentinc/kafka-connect-marketo:latest
    
  2. Start the Confluent Platform.

    Tip

    The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

    confluent local services start
    
  3. Check the status of all services.

    confluent local services status
    
  4. Configure your connector by first creating a JSON file named marketo-configs.json with the following properties. Find the REST API endpoint URL by following the process described in Marketo REST API Quickstart. This endpoint URL is used in the marketo.url configuration key (shown below), but remove the path rest from the endpoint URL before using it in the connector configuration. The same quickstart also describes how to determine the OAuth client ID and client secret. Set tasks.max to 3 here, because there are three entity types: leads, campaigns, and activities.

    Substitute the <> placeholders with your own values, and leave the confluent.license value empty for an evaluation license.

    {
        "name": "marketo-connector",
        "config": {
            "connector.class": "io.confluent.connect.marketo.MarketoSourceConnector",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "false",
            "confluent.topic.bootstrap.servers": "127.0.0.1:9092",
            "confluent.topic.replication.factor": 1,
            "confluent.license": "<license>",
            "tasks.max": 3,
            "poll.interval.ms": 1000,
            "topic.name.pattern": "marketo_${entityName}",
            "marketo.url": "https://<instance-id>.mktorest.com/",
            "marketo.since": "2020-07-01T00:00:00+00:00",
            "entity.names": "activities_add_to_nurture,activities_add_to_opportunity,campaigns,leads",
            "oauth2.client.id": "<client_id>",
            "oauth2.client.secret": "<client_secret>"
        }
    }
    
  5. Start the Marketo Source connector by loading the connector’s configuration with the following command:

    Caution

    You must include a double dash (--) between the connector name and your flag. For more information, see this post.

    confluent local services connect connector load marketo-connector -- -d marketo-configs.json
    
  6. Confirm that the connector is in a RUNNING state.

    confluent local services connect connector status marketo-connector
    
  7. Create some lead, activity, and campaign records using the Marketo APIs. Use the POST or Bulk Import APIs of the appropriate entities to inject some sample records.

  8. Confirm that the messages from the leads, activities, and campaigns entities were delivered to the marketo_leads, marketo_activities, and marketo_campaigns Kafka topics, respectively. Note that delivery may take about a minute for assets (campaigns) and about 5 minutes or more for export entities (leads and activities), depending on how long the Marketo server instance takes to prepare the export file.

    confluent local services kafka consume marketo_leads -- --from-beginning
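Two details of the quick start configuration can be sketched in code: removing the rest path from the REST API endpoint URL before setting marketo.url, and seeing which topic each entity lands in under the topic.name.pattern used above. This is an illustration only. The helper names are hypothetical, the instance ID 123-ABC-456 is a made-up placeholder, and the rule that all activities_* entities resolve to the entity name activities is an assumption based on the marketo_activities topic named in this guide.

```python
from urllib.parse import urlsplit, urlunsplit


def marketo_base_url(rest_endpoint: str) -> str:
    """Drop a trailing /rest path segment from the REST API endpoint URL."""
    parts = urlsplit(rest_endpoint)
    path = parts.path.rstrip("/")
    if path.endswith("/rest"):
        path = path[: -len("/rest")]
    # Keep scheme and host, and end with a single slash as in the sample config.
    return urlunsplit((parts.scheme, parts.netloc, path + "/", "", ""))


def topic_for(entity_name: str, pattern: str = "marketo_${entityName}") -> str:
    """Resolve the topic name for an entity under the given pattern."""
    # Assumption: every activities_* entity shares the entity name "activities".
    entity = "activities" if entity_name.startswith("activities_") else entity_name
    return pattern.replace("${entityName}", entity)


print(marketo_base_url("https://123-ABC-456.mktorest.com/rest"))
# prints https://123-ABC-456.mktorest.com/
print(topic_for("activities_add_to_nurture"))
# prints marketo_activities
```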