.. title:: Manage Kafka Streams Application Topics in Confluent Platform .. meta:: :description: Learn about the differences between different kinds of topics in Kafka Streams applicaitons. .. _streams_developer-guide_topics: Manage |kstreams| Application Topics in |cp| -------------------------------------------- A |kstreams| application continuously reads from |ak-tm| topics, processes the read data, and then writes the processing results back into |ak| topics. The application may also auto-create other |ak| topics in the |ak| brokers, for example state store changelogs topics. This section describes the differences these topic types and how to manage the topics and your applications. |kstreams| distinguishes between :ref:`user topics ` and :ref:`internal topics `. .. _streams_developer-guide_topics-user: User topics ^^^^^^^^^^^ User topics exist externally to an application and are read from or written to by the application, including: Input topics Topics that are specified via source processors in the application's topology; e.g. via ``StreamsBuilder#stream()``, ``StreamsBuilder#table()`` and ``Topology#addSource()``. Output topics Topics that are specified via sink processors in the application's topology; e.g. via ``KStream#to()``, ``KTable.to()`` and ``Topology#addSink()``. Intermediate topics Topics that are both input and output topics of the application's topology. User topics must be created and manually managed ahead of time (e.g., via the :ref:`topic tools `). If user topics are shared among multiple applications for reading and writing, the application users must coordinate topic management. If user topics are centrally managed, application users would not need to manage topics themselves but simply obtain access to them. Don't use the auto-create topic feature on the brokers to create user topics, for the following reasons: - Auto-creation of topics may be disabled in your |ak| cluster. - Auto-creation automatically applies the default topic settings, like the replicaton factor. These default settings may not be what you want for certain output topics, for example, ``auto.create.topics.enable=true`` in the :ref:`Kafka broker configuration `. .. _streams_developer-guide_topics-internal: Internal topics ^^^^^^^^^^^^^^^ Internal topics are used internally by the |kstreams| application while executing, for example the changelog topics for state stores. These topics are created by the application and are only used by that stream application. From the broker perspective, internal topics are regular topics, in contrast to "broker internal" topics like ``__consumer_offsets`` or ``__transaction_state``. If security is enabled on the |ak| brokers, you must grant the underlying clients admin permissions so that they can create internal topics set. For more information, see :ref:`streams_developer-guide_security`. The internal topics follow the naming convention ``--``, but this convention isn't guaranteed for future releases. For more information about configuring parameters for internal topics, see :ref:`Internal topic parameters `. If automatic topic creation has been disabled, |kstreams| applications will continue to work. |kstreams| applications use the Admin Client, so internal topics are still created. Even if automatic topic creation is enabled, internal topics will be created with a specific number of partitions and a replication factor as specified in the ``StreamsConfig``. The following settings apply to the default configuration for internal topics: - For all internal topics, ``message.timestamp.type`` is set to ``CreateTime``. - For internal repartition topics, the compaction policy is ``delete`` and the retention time is ``-1`` (infinite). - For internal changelog topics for key-value stores, the compaction policy is ``compact``. - For internal changelog topics for windowed key-value stores, the compaction policy is ``delete,compact``. The retention time is set to 24 hours plus your setting for the windowed store. - For internal changelog topics for versioned state stores, the cleanup policy is ``compact``, and ``min.compaction.lag.ms`` is set to 24 hours plus the store's ``historyRetentionMs`` value. .. include:: ../../.hidden/docs-common/home/includes/ak-share.rst