Log Compaction

Log compaction ensures that Apache Kafka® will always retain at least the last known value for each message key within the log of data for a single topic partition. It addresses use cases such as restoring state after an application crash or system failure, or reloading caches after application restarts during operational maintenance. This topic discusses these use cases in more detail and describes how compaction works.

You should already be familiar with data retention, where old log data is discarded after a fixed period of time or when the log reaches some predetermined size. This works well for temporal event data such as logging, where each record stands alone. However, an important class of data streams is the log of changes to keyed, mutable data, such as changes to a database table.

For example, say there is a topic that contains user email addresses; every time a user updates their email address, this topic receives a message using the user ID as the primary key. Over a period of time, the following messages are sent for user ID 123, each corresponding to an email address change (messages for other IDs are omitted):

123 => bill@microsoft.com
        .
        .
        .
123 => bill@gatesfoundation.org
        .
        .
        .
123 => bill@gmail.com

Log compaction provides a granular retention mechanism so that at least the last update for each primary key is retained. In this example, bill@gmail.com would be retained. This guarantees that the log contains a full snapshot of the final value for every key, not just keys that changed recently. This means downstream consumers can restore their own state off this topic without requiring the retention of a complete log of all changes.
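The retention rule can be illustrated with a minimal sketch, assuming a partition is modeled as an in-memory list of (offset, key, value) tuples. The function name compact, the record layout, and the second user (ID 456) are hypothetical and not part of Kafka; real compaction works on segment files in the background.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical record layout: (offset, key, value)
Record = Tuple[int, str, Optional[str]]


def compact(log: List[Record]) -> List[Record]:
    """Keep only the most recent record for each key, preserving offset order."""
    latest_offset: Dict[str, int] = {}
    for offset, key, _ in log:
        latest_offset[key] = offset  # later records overwrite earlier ones
    return [rec for rec in log if latest_offset[rec[1]] == rec[0]]


log = [
    (0, "123", "bill@microsoft.com"),
    (1, "456", "ann@example.com"),  # hypothetical second user
    (2, "123", "bill@gatesfoundation.org"),
    (3, "123", "bill@gmail.com"),
]
compacted = compact(log)
```

In this sketch, compacted holds one record per key, and each surviving record keeps the offset it was originally written with.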

Following are some use cases where this is important:

  1. Database change subscriptions. You may have a data set in multiple data systems, and often one of these systems is a database. For example, you might have a cache, a search cluster, and Hadoop. If you are handling only the real-time updates, you need only the recent log, but if you want to be able to reload the cache or restore a failed search node, you may need a complete data set.
  2. Event sourcing. This application design colocates query processing with application design and uses a log of changes as the primary store for the application.
  3. Journaling for high-availability. A process that does local computation can be made fault-tolerant by logging out changes that it makes to its local state so another process can reload these changes and carry on if it should fail. A concrete example of this is handling counts, aggregations, and other “group by”-like processing in a stream processing system. Kafka Streams uses this feature for this purpose.

Important

Compacted topics must have records with keys in order to implement record retention.

Compaction in Kafka does not guarantee that only one record with a given key exists at any one time. There may be multiple records with the same key, including the tombstone, because compaction timing is non-deterministic. Compaction runs only when the topic partition satisfies certain conditions, such as exceeding the dirty ratio or having records in inactive segment files.

In each of these use cases, you primarily must handle the real-time feed of changes, but occasionally, when a machine crashes or data needs to be reloaded or reprocessed, you must do a full load. Log compaction enables feeding both of these needs off the same backing topic. This style of log usage is described in more detail in the blog post, The Log by Jay Kreps.

Put simply, if the system had infinite log retention and every change was logged, the state of the system at every moment from when it started would be captured. Using this log, the system could be restored to any point in time by replaying the first N records in the log. However, this hypothetical complete log is not practical for systems that update a single record many times, as the log will grow without bound. The simple log retention mechanism that discards old updates bounds space, but the log can no longer be used to restore the current state: replaying from the beginning of the log may not recreate the current state, because old updates may have been discarded.
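The idea of restoring state by replaying a prefix of a complete change log can be sketched as follows. This is an illustration only, assuming the log is an in-memory list of (key, new_value) changes; the function name state_after is hypothetical.

```python
from typing import Dict, List, Tuple

# Hypothetical change layout: (key, new_value)
Change = Tuple[str, str]


def state_after(changes: List[Change], n: int) -> Dict[str, str]:
    """Rebuild the key -> value state by replaying the first n changes."""
    state: Dict[str, str] = {}
    for key, value in changes[:n]:
        state[key] = value  # each change overwrites the previous value
    return state


changes = [
    ("123", "bill@microsoft.com"),
    ("123", "bill@gatesfoundation.org"),
    ("123", "bill@gmail.com"),
]
early = state_after(changes, 1)               # state after the first change
current = state_after(changes, len(changes))  # state after all changes
```

Replaying every change yields the current state, while a smaller n yields the state as it was at an earlier point. A compacted log supports only the first of these, which is the trade-off it makes to bound space.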

Log compaction is a mechanism to provide finer-grained per-record retention instead of coarser-grained time-based retention. Records with the same primary key are selectively removed when there is a more recent update. This way the log is guaranteed to have at least the last state for each key.

This retention policy can be set per-topic, so a single cluster can have some topics where retention is enforced by size or time and other topics where retention is enforced by compaction.

Unlike most log-structured storage systems, Kafka is built for subscription and organizes data for fast linear reads and writes. Kafka acts as a source-of-truth store, so it is useful even in situations where the upstream data source would not otherwise be replayable.

Compaction in action

The following image shows, at a high level, the logical structure of a Kafka log, with the offset for each message.

../../_images/log_cleaner_anatomy.png

The head of the log is identical to a traditional Kafka log. It has dense, sequential offsets and retains all messages. Log compaction adds an option for handling the tail of the log.

The image shows a log with a compacted tail. The messages in the tail of the log retain the original offset assigned when they were first written. Also, all offsets remain valid positions in the log, even if the message with that offset has been compacted away; in this case, the position is indistinguishable from the next higher offset that does appear in the log. For example, in the previous image, the offsets 36, 37, and 38 are all equivalent positions, and a read beginning at any of these offsets would return a message set beginning with 38.
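The "equivalent positions" behavior can be sketched as a lookup over the sorted offsets that survive compaction. This is a minimal illustration, not the broker's index implementation; the function name and the list of retained offsets are hypothetical, chosen only so that 36 and 37 are missing and 38 is present.

```python
from bisect import bisect_left
from typing import List, Optional


def first_offset_at_or_after(retained_offsets: List[int], position: int) -> Optional[int]:
    """Return the first retained offset >= position, or None if past the end.

    retained_offsets must be sorted in ascending order.
    """
    i = bisect_left(retained_offsets, position)
    return retained_offsets[i] if i < len(retained_offsets) else None


# Hypothetical offsets remaining after compaction (36 and 37 were removed).
retained = [26, 29, 35, 38, 39, 40]
starts = [first_offset_at_or_after(retained, p) for p in (36, 37, 38)]
```

All three requested positions resolve to offset 38, which is why a read beginning at any of them returns the same message set.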

Compaction enables deletes

Compaction also enables deletes. A message with a key and a null payload is treated as a delete from the log and is sometimes called a tombstone. This delete marker causes any prior message with that key to be removed, as would any newer message with that key. Delete markers are special in that they are themselves cleaned out of the log after a period of time to free up space. The point in time at which deletes are no longer retained is marked as the Delete Retention Point in the previous image.
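Tombstone handling can be sketched with a simplified model. The assumptions here are loud ones: records are in-memory tuples, a tombstone is a record whose value is None, and a tombstone's age is measured from its own write timestamp, which simplifies how the broker actually tracks the delete retention point. The names Record and compact_with_deletes, and the user with ID 456, are hypothetical.

```python
from typing import Dict, List, NamedTuple, Optional


class Record(NamedTuple):
    offset: int
    key: str
    value: Optional[str]  # None marks a tombstone (delete marker)
    timestamp_ms: int


def compact_with_deletes(
    log: List[Record], now_ms: int, delete_retention_ms: int
) -> List[Record]:
    """Keep the latest record per key; drop tombstones older than the retention."""
    latest: Dict[str, int] = {}
    for rec in log:
        latest[rec.key] = rec.offset
    kept = []
    for rec in log:
        if latest[rec.key] != rec.offset:
            continue  # superseded by a newer record with the same key
        if rec.value is None and now_ms - rec.timestamp_ms > delete_retention_ms:
            continue  # tombstone has passed the delete retention point
        kept.append(rec)
    return kept


DAY_MS = 24 * 60 * 60 * 1000
log = [
    Record(0, "123", "bill@gmail.com", 1_000),
    Record(1, "456", "ann@example.com", 2_000),  # hypothetical second user
    Record(2, "123", None, 3_000),               # tombstone for key 123
]
recent = compact_with_deletes(log, now_ms=4_000, delete_retention_ms=DAY_MS)
later = compact_with_deletes(log, now_ms=3_000 + DAY_MS + 1, delete_retention_ms=DAY_MS)
```

In the first pass the tombstone removes the earlier value for key 123 but is itself retained, so consumers can still observe the delete. In the later pass the tombstone has aged out and key 123 disappears from the log entirely.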

View of compaction

Compaction is done in the background by periodically recopying log segments. Cleaning does not block reads and can be throttled to use no more than a configurable amount of I/O throughput to avoid impacting producers and consumers. The actual process of compacting a log segment looks something like the following:

../../_images/log_compaction.png
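The recopying step can be sketched as a two-pass procedure over segments. This is a simplified model under stated assumptions, not the broker's cleaner: segments are in-memory lists of (offset, key, value) tuples, the last segment is treated as the active one and left untouched, and empty or small cleaned segments are not merged. The function name clean_segments is hypothetical.

```python
from typing import Dict, List, Tuple

Record = Tuple[int, str, str]  # hypothetical layout: (offset, key, value)
Segment = List[Record]


def clean_segments(segments: List[Segment]) -> List[Segment]:
    """Recopy every segment except the last (active) one, dropping superseded records."""
    if not segments:
        return []
    *inactive, active = segments
    # Pass 1: map each key to the highest offset seen in the inactive segments.
    last_offset: Dict[str, int] = {}
    for segment in inactive:
        for offset, key, _ in segment:
            last_offset[key] = offset
    # Pass 2: build new copies of the inactive segments; the originals are not
    # modified, so readers of the old segments are unaffected during cleaning.
    cleaned = [
        [rec for rec in segment if last_offset[rec[1]] == rec[0]]
        for segment in inactive
    ]
    return cleaned + [active]


segments = [
    [(0, "a", "1"), (1, "b", "1")],
    [(2, "a", "2"), (3, "c", "1")],
    [(4, "b", "2")],  # active segment, never cleaned
]
cleaned = clean_segments(segments)
```

Note that key b still appears twice in the result (offsets 1 and 4) because the newer record sits in the active segment. This mirrors the earlier point that compaction does not guarantee a single record per key at any given moment.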

Compaction guarantees

Log compaction guarantees the following:

  1. Any consumer that stays caught-up to the head of the log will see every message that is written; these messages will have sequential offsets. The topic’s min.compaction.lag.ms can be used to guarantee the minimum length of time that must pass after a message is written before it can be compacted. That is, it provides a lower bound on how long each message will remain in the (uncompacted) head. The topic’s max.compaction.lag.ms can be used to guarantee the maximum delay between the time a message is written and the time the message becomes eligible for compaction.
  2. Ordering of messages is always maintained. Compaction will never reorder messages, just remove some.
  3. The offset for a message never changes. It is the permanent identifier for a position in the log.
  4. Any consumer progressing from the start of the log will see at least the final state of all records in the order they were written. Additionally, all delete markers for deleted records will be seen, provided the consumer reaches the head of the log in a time period less than the topic’s delete.retention.ms setting (the default is 24 hours). In other words: since the removal of delete markers happens concurrently with reads, it is possible for a consumer to miss delete markers if it lags by more than delete.retention.ms.

Configure compaction

Log compaction is enabled by setting the cleanup policy, which is a broker-level setting. You can override this setting at the topic level.

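To enable log cleaning on a topic, set the cleanup policy to compact, either at creation time or using the alter command. The broker-level property name is shown here; the corresponding topic-level override is cleanup.policy.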

log.cleanup.policy=compact

The log cleaner can be configured to retain a minimum amount of the uncompacted “head” of the log. This is enabled by setting the compaction time lag.

log.cleaner.min.compaction.lag.ms

Use the min setting to prevent messages newer than a minimum message age from being subject to compaction. If not set, all log segments are eligible for compaction except for the last segment, meaning the one currently being written to. The active segment will not be compacted even if all of its messages are older than the minimum compaction time lag.

The log cleaner can be configured to ensure a maximum delay after which the uncompacted “head” of the log becomes eligible for log compaction.

log.cleaner.max.compaction.lag.ms

Use the max setting to prevent logs with low produce rates from remaining ineligible for compaction for an unbounded duration. If not set, logs that do not exceed min.cleanable.dirty.ratio are not compacted. Note that this compaction deadline is not a hard guarantee, since it is still subject to the availability of log cleaner threads and the actual compaction time. To track whether compaction is keeping up, monitor the uncleanable-partitions-count, max-clean-time-secs, and max-compaction-delay-secs metrics.
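How the min and max lag settings interact with the dirty ratio can be sketched as two small predicates. This is a simplified model, not the broker's scheduling logic: the function names and parameters are hypothetical, the dirty ratio is assumed to be dirty bytes divided by total bytes, and thread availability is ignored. The default values of 0 and 0.5 mirror the documented defaults of min.compaction.lag.ms and min.cleanable.dirty.ratio.

```python
from typing import Optional


def segment_is_eligible(
    is_active: bool, newest_record_age_ms: int, min_compaction_lag_ms: int = 0
) -> bool:
    """Min-lag rule: the active segment is never compacted, and an inactive
    segment qualifies only once its newest record has reached the minimum age."""
    if is_active:
        return False
    return newest_record_age_ms >= min_compaction_lag_ms


def log_needs_cleaning(
    dirty_bytes: int,
    total_bytes: int,
    oldest_dirty_record_age_ms: int,
    min_cleanable_dirty_ratio: float = 0.5,
    max_compaction_lag_ms: Optional[int] = None,
) -> bool:
    """Clean when the dirty ratio is exceeded, or when the max lag has passed."""
    if dirty_bytes <= 0 or total_bytes <= 0:
        return False  # nothing to clean
    if dirty_bytes / total_bytes > min_cleanable_dirty_ratio:
        return True
    if max_compaction_lag_ms is None:
        return False  # no deadline configured; the dirty ratio alone decides
    return oldest_dirty_record_age_ms > max_compaction_lag_ms
```

For example, a log that is only 10 percent dirty is skipped under the ratio rule alone, but log_needs_cleaning(10, 100, 10_000, max_compaction_lag_ms=5_000) returns True because the oldest uncompacted record has waited longer than the configured maximum lag.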

Confluent Tip

Read about cleaner configurations for Confluent Platform in Broker configuration.

Learn how to set the cleanup policy and retention settings for topics in Confluent Cloud with Topic settings for all cluster types.

Note

This website includes content developed at the Apache Software Foundation under the terms of the Apache License v2.