Camus is a simple MapReduce job developed by LinkedIn to load data from Kafka into HDFS. It is capable of incrementally copying data from Kafka into HDFS such that every run of the MapReduce job picks up where the previous run left off. At LinkedIn, Camus is used to load billions of messages per day from Kafka into HDFS. Confluent’s version of Camus integrates with Confluent’s Schema Registry which ensures data compatibility when loading to HDFS as schemas are evolved. You can find the design and architecture of Camus in the design section .
- Automatic topic discovery: When a Camus job starts, it automatically fetches available topics from Zookeeper and offsets from Kafka and filters topics.
- Avro schema management: Camus integrates with Confluent’s Schema Registry to ensure compatibility as Avro schema evolves.
- Output partitioning: Camus automatically partitions the output based on the timestamp of each record.
- Fault tolerance: Camus saves previous Kafka ETL requests and topic partition offsets to HDFS to provide fault tolerance on Zookeeper and Kafka failures. It also uses temp work directory to ensure consistency between Kafka and HDFS.
- Customizability: Many components of Camus are customizable. Camus provides interfaces for customized implementations of message decoder, data writer, data partitioner and work allocator.
- Load balance: Camus evenly assigns data to MapReduce tasks based on the size of each topic partitions. Moreover, as Camus jobs use temp working directories, speculative execution can be effective for straggler migration.
- Low operation overhead: Camus offers configurations to balance contention between topics and to control the Camus job behavior in case of incompatible data. By default, Camus will not fail the MapReduce job in case of incompatible data.
These instructions assume you have already installed Confluent Platform and that you have access to a Hadoop Cluster. For installation and deployment of a single node Hadoop in pseudo-distributed mode, see this guide. Finally, you should have Kafka and the Schema Registry running .
The recommended way to run a Camus job is via a small wrapper script,
sets the environment variables and passes the arguments required to get all the jars deployed
correctly and ensures the Camus jars are given priority, which ensures compatibility across a
variety of Hadoop distributions.
# Assuming that you have hadoop on your PATH, HADOOP_CONF_DIR is properly configured, and that # schema.registry.url points to the correct address. $ bin/camus-run -D schema.registry.url=http://localhost:8081 -P etc/camus/camus.properties
If you need more control over how the job is executed, see the Deployment section for more details about required configuration.
Once the Camus job is successfully completed, a couple of Avro files are created under
the topic output directory in sub-directories for each topic and date partition.
One example of full filename is
The filename is
. separated format that embeds metadata as
You may use Hive or other tools to perform offline analysis on the ingested Avro files.
See the installation instructions for the Confluent Platform. Before starting a Camus job you must have Hadoop, Kafka, and the Schema Registry running. The Confluent Platform quickstart explains how to start Kafka and the Schema Registry locally for testing. See this guide to setup a single Hadoop node in pseudo-distributed mode.
Camus can be run from the command line. You will need to set some configurations either by specifying a
properties file on the classpath using
-p (filename), or an external properties file using
(path to local file system, or to hdfs),
or from the command line using
If the same property is set with multiple methods,
the order of precedence is command-line properties, external properties file and
classpath properties file. You can find a list of settings in configuration section .
The recommended deployment method is to use the
camus-run script to initiate the MapReduce job:
$ bin/camus-run -D <property=value> \ -P <path to external properties file> \ -p <path to properties file from classpath>
If you need more control you can run the job yourself, but will have to configure some parameters
and environment variables yourself. You may want to reuse the
bin/camus-config script to generate the
configs without running the job. That script should be sourced into your script
and sets up four environment variables:
HADOOP_CLASSPATHis updated to include the jars for Camus and its dependencies
CAMUS_LIBJARScontains a comma separated list of the jars for Camus and its dependencies, suitable for use as the value of the
CAMUS_JARis set to the primary Camus jar, which is the jar file you should pass as the first argument to
HADOOP_USER_CLASSPATH_FIRSTis set to true to ensure versions of libraries included with the Hadoop distribution do not conflict with versions required by Camus
If you do not use the
camus-config script, you will need to configure these
settings manually. If you install Camus via zip/tgz archive,
you can find Camus’s jar files under
share/java/camus/. If you install Camus via rpm or deb,
the Camus’s jar files under
/usr/share/java/camus/. After configuring the
appropriate settings, as listed here, you can run the job with a command like this:
# Assuming: # 1. hadoop is on your PATH # 2. HADOOP_CLASSPATH includes all the Camus jars # 3. HADOOP_USER_CLASSPATH_FIRST=true # 4. CAMUS_LIBJARS contains the comma-separated list of all Camus jars $ cd camus $ hadoop jar confluent-camus-$VERSION.jar com.linkedin.camus.etl.kafka.CamusJob \ -libjars $CAMUS_LIBJARS -D mapreduce.job.user.classpath.first=true \ -D <property=value> \ -P <path to external properties file> \ -p <path to properties file from classpath>
For some Hadoop distributions, you may be able to remove some of these settings to simplify the
command. Specifically, the
variable and the
mapreduce.job.user.classpath.first setting are
only required when jars included on the classpath by the Hadoop distribution are
too old to satisfy Camus’s requirements.
To build a development version of Camus, you need to get development version of the Schema Registry and its dependencies and install it into local Maven repository. Once the dependencies are installed, you can build Confluent version of Camus as follows:
$ git clone https://github.com/confluentinc/camus.git $ cd camus $ mvn clean package
- Hadoop: Camus works with both MRv1 and YARN. We recommend CDH 5.3.x or HDP 2.2.x.
- Kafka: 0.9.0.1 (0.9.0.1-cp1 recommended)
- Schema Registry: Confluent Schema Registry 2.0.1
The project is licensed under the Apache 2 license.