Marketo Source Connector for Confluent Platform

Caution

Preview connectors aren’t currently supported, nor are they recommended for production use.

Marketo is a marketing automation platform that marketers use to manage campaigns and flows to potential customers and prospects. The Kafka Connect Marketo Source connector uses the Marketo REST API to copy data into Apache Kafka® from Marketo entities such as leads, programs, campaigns, and staticLists, and from all activities entities, such as activities_add_to_nurture and activities_add_to_opportunity. For the list of supported Marketo entities, see Supported Entities. Set the tasks.max configuration parameter to the number of entities listed in the connector’s entity.names property, counting all activity types together as a single entity.
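The tasks.max rule above can be sketched in a few lines of Python. This is only an illustration of the counting rule, not connector code; the helper name required_tasks is hypothetical, and it assumes every activity entity name starts with the activities_ prefix, as in the examples in this document.

```python
def required_tasks(entity_names: str) -> int:
    """Count the entities in an entity.names value, treating all activities_* names as one."""
    names = [name.strip() for name in entity_names.split(",") if name.strip()]
    # Collapse every activities_* entry into one logical "activities" entity.
    groups = {"activities" if name.startswith("activities_") else name for name in names}
    return len(groups)


entity_names = "activities_add_to_nurture,activities_add_to_opportunity,campaigns,leads"
print(required_tasks(entity_names))  # 3: activities, campaigns, leads
```

With the entity list used in the quick start, the result is 3, which matches the tasks.max value in that configuration.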

Features

The Marketo Source connector includes the following features:

At least once delivery

The connector guarantees no loss of messages from Marketo to Kafka. Messages may be reprocessed because of task failures, which may cause duplication.

Supports one task

The Marketo Source connector runs one task per Marketo entity: each entity is covered by exactly one task.

Quick turnaround

The Marketo Source connector ensures that data between your Marketo entities and corresponding Kafka topics are synced quickly, with minimal lag. The poll frequency on each entity has been specifically pre-configured within the connector, based on the size of the table, adhering to the daily quota, rate and concurrency limits as prescribed by Marketo. Larger and more dynamic tables like activities and leads are pulled with an appropriate delay of 5 minutes between consecutive pulls. Mostly static assets, which have infrequent updates, like campaigns, programs and staticLists are pulled with a delay of 30 seconds between consecutive pulls.

Schema detection and evolution

The connector supports automatic schema detection and backward-compatible schema evolution for asset entities such as campaigns, programs, and staticLists. For bulk export entities like leads and activities, records are produced with a schema taken directly from the column headers of the CSV export files.

Automatic discovery of bulk export entities

For leads, the connector automatically discovers the fields present in leads entities, so you don’t have to provide them explicitly in the connector configuration. For activities entities, the connector discovers the activity type IDs of the activity types you provide. Here, activity types are human-readable activity entity names, while activity type IDs are the numbers that Marketo assigns to the different activity types.
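As a rough illustration of the activity type lookup described above, the following Python sketch maps configured activities_* entity names to activity type IDs. The helper names, the naming rule (lowercase words joined with underscores, as suggested by the Add to Opportunity example in this document), and the sample IDs are all assumptions made for illustration; the connector’s actual discovery logic isn’t shown in this document.

```python
def entity_name_for(activity_type_name: str) -> str:
    """Turn a human-readable activity type name into a connector-style entity name."""
    return "activities_" + "_".join(activity_type_name.lower().split())


def resolve_activity_type_ids(entity_names, activity_types):
    """Map each configured activities_* entity name to its activity type ID."""
    ids_by_entity = {entity_name_for(t["name"]): t["id"] for t in activity_types}
    return {
        name: ids_by_entity[name]
        for name in entity_names
        if name.startswith("activities_")
    }


# Hypothetical sample shaped like an activity type listing; the IDs are illustrative only.
sample_types = [
    {"id": 34, "name": "Add to Opportunity"},
    {"id": 113, "name": "Add to Nurture"},
]
configured = ["activities_add_to_nurture", "activities_add_to_opportunity", "leads"]
print(resolve_activity_type_ids(configured, sample_types))
```

In this sketch an unknown activity entity name raises a KeyError, which makes a misspelled entry in entity.names easy to spot.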

Real-time and historical lookup

The connector supports fetching all past historical records for all the listed entities. To look up past data, specify the marketo.since configuration parameter (see the configuration property marketo.since).
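The following sketch shows one way to produce a marketo.since value a given number of days in the past. It assumes the ISO 8601 format with a UTC offset that the quick start configuration in this document uses (for example, 2020-07-01T00:00:00+00:00); the helper name marketo_since is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def marketo_since(days_back: int, now: datetime) -> str:
    """Return a timestamp `days_back` days before `now`, truncated to midnight UTC."""
    start = now.astimezone(timezone.utc) - timedelta(days=days_back)
    start = start.replace(hour=0, minute=0, second=0, microsecond=0)
    return start.isoformat()


print(marketo_since(30, datetime(2020, 7, 31, 15, 45, tzinfo=timezone.utc)))
# 2020-07-01T00:00:00+00:00
```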

Automatic retries

If a connection error occurs between the API server and Kafka Connect, the connector may receive a non-OK response from the API server or no response at all. To make the connector robust in such cases, use its automatic retry mechanism with linear backoff, which you configure with the max.retries and retry.backoff.ms properties.
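To make the effect of these two properties concrete, here is a minimal sketch of a linear backoff schedule. It assumes that the nth retry waits n times retry.backoff.ms; that interpretation of “linear” is an assumption, and the connector’s exact schedule isn’t specified in this document.

```python
from typing import List


def linear_backoff_delays(max_retries: int, retry_backoff_ms: int) -> List[int]:
    """Return the assumed wait in milliseconds before each retry attempt."""
    # Assumption: the delay grows by one multiple of retry_backoff_ms per attempt.
    return [attempt * retry_backoff_ms for attempt in range(1, max_retries + 1)]


print(linear_backoff_delays(3, 1000))  # [1000, 2000, 3000]
```

Under this assumption, the total time spent waiting before the connector gives up grows quadratically with max.retries, which is worth keeping in mind when raising that value.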

Error handling

Marketo has a rich and elaborate list of error codes. The connector handles all retriable errors appropriately, in particular rate-limiting, authentication, and quota-limit-exceeded errors. For quota-limit-exceeded errors, the connector automatically schedules subsequent pull requests at the daily quota reset time (12:00 AM CST). For rate-limit errors, the connector employs exponential backoff with jitter to spread out the requests smoothly. The connector keeps all calls to the Marketo server instance within the prescribed limits, so you don’t have to deal with them explicitly. The connector also keeps its calls within the bulk extract limits for bulk export entities (leads and activities). If a non-retriable error occurs, the connector task fails and requires manual intervention.
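The two scheduling ideas above can be sketched as follows. This is not the connector’s implementation: the full-jitter formula, the base and cap values, and the fixed UTC-6 offset used for CST are all assumptions chosen for illustration, and the function names are hypothetical.

```python
import random
from datetime import datetime, timedelta, timezone
from typing import Optional

# Assumption: CST is treated as a fixed UTC-6 offset with no daylight saving.
CST = timezone(timedelta(hours=-6), "CST")


def jittered_delay_ms(attempt: int, base_ms: int = 1000, cap_ms: int = 60000,
                      rng: Optional[random.Random] = None) -> float:
    """Full-jitter exponential backoff: uniform in [0, min(cap_ms, base_ms * 2**attempt)]."""
    if rng is None:
        rng = random.Random()
    ceiling = min(cap_ms, base_ms * (2 ** attempt))
    return rng.uniform(0, ceiling)


def next_quota_reset(now: datetime) -> datetime:
    """Return the next 12:00 AM CST strictly after `now`, expressed in UTC."""
    local = now.astimezone(CST)
    midnight = local.replace(hour=0, minute=0, second=0, microsecond=0)
    return (midnight + timedelta(days=1)).astimezone(timezone.utc)


print(next_quota_reset(datetime(2020, 7, 1, 12, 0, tzinfo=timezone.utc)))
# 2020-07-02 06:00:00+00:00
```

Randomizing each delay keeps several tasks that hit the rate limit at the same moment from retrying in lockstep.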

Throughput tuning

You can fine-tune and improve overall throughput using the max.batch.size and max.poll.interval.ms configuration properties.

Downloaded streaming of bulk export entities

For leads and activities, the connector first downloads the export files and then processes them at a pace determined by the max.batch.size and max.poll.interval.ms configuration parameters. This decouples the download from the creation of Kafka records. The HTTP connection between the connector and the Marketo server instance is returned to the connection pool as soon as the download completes, instead of being held until all the records are streamed to Kafka. The export files are downloaded to temporary files, which the connector cleans up immediately after processing all the records.
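The download-then-process pattern described above can be sketched in Python as follows. This is a simplified illustration, not the connector’s code: the function name stream_export_file is hypothetical, writing a small local file stands in for the download step, and batching by a max_batch_size argument only mirrors the idea behind the max.batch.size property.

```python
import csv
import os
import tempfile
from typing import Dict, Iterator, List


def stream_export_file(path: str, max_batch_size: int) -> Iterator[List[Dict[str, str]]]:
    """Yield batches of records from a downloaded CSV export, then delete the file."""
    try:
        with open(path, newline="", encoding="utf-8") as handle:
            reader = csv.DictReader(handle)  # field names come from the CSV header row
            batch: List[Dict[str, str]] = []
            for row in reader:
                batch.append(dict(row))
                if len(batch) == max_batch_size:
                    yield batch
                    batch = []
            if batch:
                yield batch
    finally:
        os.remove(path)  # clean up the temporary file once processing ends


# Stand-in for the download step: write a small export to a temporary file.
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False,
                                 newline="", encoding="utf-8") as tmp:
    tmp.write("id,email\n1,a@example.com\n2,b@example.com\n3,c@example.com\n")
    export_path = tmp.name

for records in stream_export_file(export_path, max_batch_size=2):
    print(len(records), records[0]["id"])
```

Because the records are read from a local file, no network connection needs to stay open while the batches are handed off.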

Limitations

  • The connector doesn’t currently support multi-part download of export files; it fetches each export file in its entirety in one go. Future releases will add this feature.
  • Export files can be very large, especially when the connector is configured to pull historical data, so provide sufficient heap memory to the connector deployment. In our performance tests, the connector ran smoothly with 670 MB leads and 240 MB activities export files in a single run with a maximum heap size of -Xmx8g; peak heap utilization reached 5 GB. This usage will be reduced when multi-part file download is introduced in the connector.
  • The connector downloads export files to temporary files on the disk of the worker node, so make sure the worker nodes that run connector tasks have enough disk space for the size of your historical data.
  • Because of the change data capture (CDC) nature of the data, each entity is processed by a single task, so the connector’s performance doesn’t scale with the number of tasks configured for it.

Supported Entities

The following entities from Marketo are supported in this version of Kafka Connect Marketo Source connector: campaigns, programs, staticLists, leads and all types of activities. Activity types are listed in the Activity Type Filters list. For example, in order to fetch activities of type Add to Opportunity, just include the entity name activities_add_to_opportunity in the configuration entity.names, and the connector will do the rest.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

For license properties, see Confluent Platform license. For information about the license topic, see License topic configuration.

Configuration Properties

For a complete list of configuration properties for this connector, see Marketo Source Connector Configuration Properties.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Install the Marketo Source Connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.

  • Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later.

  • Kafka Connect: Confluent Platform 4.1.0 or later, or Kafka 1.1.0 or later.

  • Java 1.8.

  • An installation of the Confluent Hub Client. This is installed by default with Confluent Enterprise.

  • An installation of the latest connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent connect plugin install confluentinc/kafka-connect-marketo:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent connect plugin install confluentinc/kafka-connect-marketo:1.0.0-preview
    
  • Marketo API-Only User: You must create an API-Only user that will be used by the connector to access REST APIs. You can find the process to create an API-Only user, along with best practices to follow in the Marketo Custom Services documentation. You can provide read-only permissions on various entities, as needed, for the API-Only user you create. The Marketo Source connector doesn’t need any write permissions. For the permissions needed for the entities supported by this connector, see Marketo Endpoint Permissions Reference.

  • Marketo Corona Support: You must enable the Marketo Corona feature in your Marketo subscription for bulk export entities leads and activities to be fetched by the Marketo connector. This feature enables incremental fetch of bulk export entities. The Marketo team should be able to enable Corona support for your account.
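Before configuring the connector, it can help to confirm that the API-Only user’s credentials work. The sketch below requests an access token from the Marketo identity endpoint using the client credentials grant. It is a convenience check only and not something the connector requires; the function names are hypothetical, and the base URL, client ID, and client secret are values you supply.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen


def token_url(base_url: str, client_id: str, client_secret: str) -> str:
    """Build the identity token URL for a base URL such as https://<instance-id>.mktorest.com/."""
    query = urlencode({
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
    })
    return base_url.rstrip("/") + "/identity/oauth/token?" + query


def fetch_access_token(base_url: str, client_id: str, client_secret: str,
                       timeout: float = 10.0) -> str:
    """Request an access token; raises an error if the call or the credentials fail."""
    with urlopen(token_url(base_url, client_id, client_secret), timeout=timeout) as response:
        return json.load(response)["access_token"]
```

Calling fetch_access_token with your own values should return a token string if the API-Only user and its custom service are set up correctly.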

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

Quick Start

In this quick start, the Marketo Source connector consumes records from the Marketo entities leads and campaigns and from the activities entities activities_add_to_nurture and activities_add_to_opportunity, and sends them to the Kafka topics marketo_leads, marketo_campaigns, and marketo_activities, respectively.

  1. Install the connector through the Confluent Hub Client.

    # run from your confluent platform installation directory
    confluent connect plugin install confluentinc/kafka-connect-marketo:latest
    
  2. Start the Confluent Platform.

    confluent local services start
    
  3. Check the status of all services.

    confluent local services status
    
  4. Configure your connector by creating a JSON file named marketo-configs.json with the following properties. Replace each <...> placeholder with your own value; to use the evaluation license, leave confluent.license empty. To find the REST API endpoint URL and to determine the OAuth client ID and OAuth client secret, follow the process described in Marketo REST API Quickstart. Use the endpoint URL as the value of the marketo.url configuration key (as shown in the following example), but remove the path rest from the endpoint URL first. Set tasks.max to 3 here because there are three entity types: leads, campaigns, and activities.

    {
        "name": "marketo-connector",
        "config": {
            "connector.class": "io.confluent.connect.marketo.MarketoSourceConnector",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "false",
            "confluent.topic.bootstrap.servers": "127.0.0.1:9092",
            "confluent.topic.replication.factor": 1,
            "confluent.license": "<license>",
            "tasks.max": 3,
            "poll.interval.ms": 1000,
            "topic.name.pattern": "marketo_${entityName}",
            "marketo.url": "https://<instance-id>.mktorest.com/",
            "marketo.since": "2020-07-01T00:00:00+00:00",
            "entity.names": "activities_add_to_nurture,activities_add_to_opportunity,campaigns,leads",
            "oauth2.client.id": "<client_id>",
            "oauth2.client.secret": "<client_secret>"
        }
    }
    
  5. Start the Marketo Source connector by loading the connector’s configuration with the following command:

    Caution

    You must include a double dash (--) between the connector name and your flag. For more information, see this post.

    confluent local services connect connector load marketo-connector -- -d marketo-configs.json
    
  6. Confirm that the connector is in a RUNNING state.

    confluent local services connect connector status marketo-connector
    
  7. Create some leads, activities, and campaigns records using the Marketo APIs. Use the POST or Bulk Import APIs of the appropriate entities to inject some sample records.

  8. Confirm that the messages from the leads, activities, and campaigns entities were delivered to the marketo_leads, marketo_activities, and marketo_campaigns topics, respectively, in Kafka. Note that it may take about a minute for assets (campaigns) and about 5 minutes or more for export entities (leads and activities), depending on how long the Marketo server instance takes to prepare the export file.

    confluent local services kafka consume marketo_leads -- --from-beginning
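Two details from step 4 are easy to get wrong: removing the rest path from the endpoint URL, and knowing which topics the topic.name.pattern value produces. The following Python sketch illustrates both. The helper names are hypothetical, and the rule that all activities_* entities resolve to a single activities topic is inferred from the topic names used in this quick start rather than taken from the connector’s code.

```python
def marketo_url_from_endpoint(endpoint_url: str) -> str:
    """Strip the trailing `rest` path segment from a Marketo REST endpoint URL."""
    trimmed = endpoint_url.rstrip("/")
    if trimmed.endswith("/rest"):
        trimmed = trimmed[: -len("/rest")]
    return trimmed + "/"


def topic_for(entity_name: str, pattern: str = "marketo_${entityName}") -> str:
    """Resolve the topic for an entity; all activities_* entities share one topic."""
    base = "activities" if entity_name.startswith("activities_") else entity_name
    return pattern.replace("${entityName}", base)


entities = ["activities_add_to_nurture", "activities_add_to_opportunity", "campaigns", "leads"]
print(marketo_url_from_endpoint("https://<instance-id>.mktorest.com/rest"))
print(sorted({topic_for(entity) for entity in entities}))
```

The three resulting topic names are the ones to pass to the consume command in step 8.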