.. _audit-logs:

Audit Logs
==========

Audit logs provide a way to capture, protect, and preserve authorization activity in topics in |ak| clusters on |cp| using |csa|. Specifically, audit logs record the runtime decisions of the permission checks that occur as users attempt to take actions that are protected by ACLs and RBAC. Auditable events are recorded in chronological order, although a consumer of audit log messages may fetch them out of order. Each auditable event includes information about who tried to do what, when they tried, and whether or not the system gave permission to proceed.

By default, audit logs are enabled and are managed by the inter-broker principal (typically, the user ``kafka``), who has expansive permissions.

The primary value of audit logs is that they provide data you can use to assess security risks in your local and remote |ak| clusters. They contain all of the information necessary to follow a user's interaction with those clusters, and provide a way to:

* Track user and application access across the platform
* Identify abnormal behavior and anomalies
* Proactively monitor and resolve security risks

You can use Splunk, S3, and other sink connectors to move your audit log data to a target platform for analysis.

**Prerequisite**

Before configuring and using audit logs, you must configure the :ref:`Confluent Server Authorizer `:

::

   authorizer.class.name=io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer

.. _auditable-events:

Auditable events
----------------

.. note:: These are event authorizations, so at the time of logging *the event is about to occur*. Also, users may attempt to authorize a task solely to see if they can perform the task, but not follow through with it. In these instances, the authorization is still captured in the audit log.

Each type of audit log event belongs to exactly one *event category*, and you can configure audit log routing rules to match specific event categories. Management and Authorize events are captured by default. You can configure audit logs to capture the following events:

+------------------------------------+--------------------------------------------+--------------------+----------------------+
| Event name | Description | Category | Captured by Default |
+====================================+============================================+====================+======================+
| mds.Authorize | An RBAC authorization is being requested | Authorize | Yes |
| | using MDS. | | |
+------------------------------------+--------------------------------------------+--------------------+----------------------+
| kafka.AlterConfigs | A |ak| configuration is being | Management | Yes |
| | altered/updated. | | |
+------------------------------------+--------------------------------------------+--------------------+----------------------+
| kafka.AlterPartitionReassignments | Reassignments for a topic partition are | Management | Yes |
| | being altered. | | |
+------------------------------------+--------------------------------------------+--------------------+----------------------+
| kafka.AlterReplicaLogDirs | Log directory of a replica of a partition | Management | Yes |
| | is being altered. | | |
+------------------------------------+--------------------------------------------+--------------------+----------------------+
| kafka.ApiVersions | Validates supported API versions.
| Management | Yes | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.CreateAcls | A |ak| broker ACL is being created. | Management | Yes | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.CreateDelegationToken | A delegation token is being created. | Management | Yes | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.CreatePartitions | Partitions are being added to a topic. | Management | Yes | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.CreateTopics | A topic is being created. | Management | Yes | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.DeleteAcls | A |ak| broker ACL is being deleted | Management | Yes | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.DeleteGroups | A consumer group is being deleted. | Management | Yes | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.DeleteRecords | Records are being deleted from a topic. | Management | Yes | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.DeleteTopics | Topics are being deleted. | Management | Yes | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.ElectLeaders | A replica is being elected as the leader | Management | Yes | | | of a topic partition. | | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.ExpireDelegationToken | The delegation token is being marked as | Management | Yes | | | expired. | | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.IncrementalAlterConfigs | A dynamic configuration of a |ak| broker | Management | Yes | | | is being altered. | | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.OffsetDelete | Committed offset for a partition in a | Management | Yes | | | consumer group is being deleted. | | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.RenewDelegationToken | A delegation token is being renewed. | Management | Yes | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.ReplicaStatus | Information about topic replication status | Management | No | | | is being requested. | | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.AddPartitionsToTxn | A partition is being added to a | Produce | No | | | transaction. | | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.EndTxn | A transaction is being completed. 
| Produce | No | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.InitProducerId | A transaction or idempotent write is being | Produce | No | | | initialized by a |ak| producer. | | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.Produce | A |ak| producer is writing a batch of | Produce | No | | | records to a topic. | | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.AddOffsetsToTxn | A producer is sending offsets to the | Consume | No | | | consumer group coordinator and marking | | | | | those offsets as part of the current | | | | | transaction. | | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.FetchConsumer | A |ak| consumer is reading a batch of | Consume | No | | | records from a topic. | | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.JoinGroup | A |ak| consumer is joining a consumer | Consume | No | | | group. | | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.LeaveGroup | A |ak| consumer is leaving a group. | Consume | No | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.ListOffsets | The offsets of a topic partition are being | Consume | No | | | requested. | | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.OffsetCommit | A consumer is committing offsets of a | Consume | No | | | partition that have been processed. | | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.OffsetFetch | Committed offsets of a consumer group are | Consume | No | | | being requested. | | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.SyncGroup | A |ak| consumer is participating in a | Consume | No | | | group rebalance. | | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.TxnOffsetCommit | Consumer offsets are being committed for a | Consume | No | | | consumer group within a transaction. | | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.ControlledShutdown | A broker is being shut down. | Interbroker | No | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.FetchFollower | A broker with a follower replica of a | Interbroker | No | | | partition is fetching records for | | | | | replication. | | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.LeaderAndIsr | Controller is sending leader and ISR | Interbroker | No | | | (in-sync replica) state to a broker. 
| | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.StopReplica | Replication is being stopped for the | Interbroker | No | | | replica of a topic partition. | | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.UpdateMetadata | Controller is sending new metadata to a | Interbroker | No | | | broker. | | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.WriteTxnMarkers | A broker is writing transaction markers | Interbroker | No | | | to update transaction state. | | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.DescribeAcls | Information about |ak| broker ACLs is | Describe | No | | | being requested. | | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.DescribeConfigs | Information about broker configuration is | Describe | No | | | being requested. | | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.DescribeDelegationToken | Information about a delegation token is | Describe | No | | | being requested. | | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.DescribeGroups | Information about consumer groups is | Describe | No | | | being requested. | | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.DescribeLogDirs | Information about replica log directories | Describe | No | | | is being requested. | | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.FindCoordinator | A |ak| consumer is requesting information | Describe | No | | | about its group coordinator. | | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.ListGroups | Information about consumer groups is | Describe | No | | | being requested. | | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.ListPartitionReassignments | Current partition reassignments are | Describe | No | | | being requested. | | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.Metadata | Topic metadata is being requested. | Describe | No | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.OffsetForLeaderEpoch | Last offsets corresponding to a leader | Describe | No | | | epoch are being requested. | | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ | kafka.Heartbeat | A consumer is letting the group know that | Heartbeat | No | | | it is still active. | | | +------------------------------------+--------------------------------------------+--------------------+----------------------+ .. 
_audit-log-content: Audit log content ----------------- The following example shows the contents of an audit log message that is returned from a cluster when a user creates a new topic: .. codewithvars:: json { "id": "889bdcd9-a378-4bfe-8860-180ef8efd208", "source": "crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q", "specversion": "0.3", "type": "io.confluent.kafka.server/authorization", "time": "2019-10-24T16:15:48.355Z", "datacontenttype": "application/json", "subject": "crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic", "confluentRouting": { "route": "confluent-audit-log-events" }, "data": { "serviceName": "crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q", "methodName": "kafka.CreateTopics", "resourceName": "crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic", "authenticationInfo": { "principal": "User:admin" }, "authorizationInfo": { "granted": true, "operation": "Create", "resourceType": "Topic", "resourceName": "app3-topic", "patternType": "LITERAL", "superUserAuthorization": true }, "request": { "correlation_id": "3", "client_id": "adminclient-6" }, "requestMetadata": { "client_address": "/127.0.0.1" } } } The first section of the audit log displays information about the log message itself. Think of it as the envelope in which the message is delivered; it contains the following audit log implementation details: +----------------------------------+-----------------------------------------------------------------+ | Audit log entry | Description | +==================================+=================================================================+ | id | A randomly-generated UUID that ensures uniqueness across all | | | sources. | +----------------------------------+-----------------------------------------------------------------+ | source | Identifies the |ak| cluster that made the decision about | | | whether or not the authorization was allowed. Always includes | | | the :ref:`CRN `, and is always the | | | same as the ``serviceName``. | +----------------------------------+-----------------------------------------------------------------+ | specversion | The version of CloudEvents in use | | | (`CloudEvents `__ is a vendor-neutral | | | specification that defines the format of event data). | +----------------------------------+-----------------------------------------------------------------+ | type | The type of event that occurred (authorization request). | +----------------------------------+-----------------------------------------------------------------+ | time | The time at which the event occurred. | +----------------------------------+-----------------------------------------------------------------+ | datacontenttype | Identifies the CloudEvent format that the audit data is | | | presented in (JSON). | +----------------------------------+-----------------------------------------------------------------+ | subject | Identifies the resource on which access has been allowed or | | | denied. | +----------------------------------+-----------------------------------------------------------------+ | confluentRouting | Identifies the topic to which the event is sent. Topic | | | routing is configurable. Refer to :ref:`configure-audit-logs` | | | for details. | +----------------------------------+-----------------------------------------------------------------+ The next section of the audit log displays data (``data``) you can use to assess security risks in your local and remote |ak| clusters. 
+----------------------------------+-----------------------------------------------------------------+ | Audit log entry | Description | +==================================+=================================================================+ | serviceName | Identifies the |ak| cluster that made the decision about | | | whether or not the authorization was allowed. Always includes | | | the :ref:`CRN `, and is always the | | | same as ``source``. | +----------------------------------+-----------------------------------------------------------------+ | methodName | The type of request that was made. For example, the | | | methodName above shows that the user attempted to create a | | | |ak| topic. This entry will include one of the event names | | | listed above, and display either ``mds`` (if it's an AUTHORIZE | | | event) or ``kafka`` (if any other event) before it. For | | | example: ``mds.Authorize``, ``kafka.Metadata``, | | | ``kafka.CreateTopics``. | +----------------------------------+-----------------------------------------------------------------+ | resourceName | The resource identifier (CRN) of the target of the request. For | | | example, the resourceName above indicates the user attempted to | | | create a topic in a uniquely-identified |ak| cluster. | +----------------------------------+-----------------------------------------------------------------+ | authenticationInfo | Identifies the user, service principal account, or principal | | | attempting the request. | +----------------------------------+-----------------------------------------------------------------+ | authorizationInfo | Includes details about the decision that was made to authorize | | | the request. The example above shows that the authorization | | | request to create a topic in a |ak| cluster was granted. | | | If RBAC is used, the resource and role (if any) are included. | | | If ACLs are used, the ACL information is also included. | +----------------------------------+-----------------------------------------------------------------+ | requestMetadata | Identifies how the authorization request was sent and shows the | | | requestor's IP address, if known. | +----------------------------------+-----------------------------------------------------------------+ .. _audit-log-content-examples: Audit log content examples ^^^^^^^^^^^^^^^^^^^^^^^^^^ The following example shows what an audit log message looks like when RBAC is enabled: .. codewithvars:: json { "id": "889bdcd9-a378-4bfe-8860-180ef8efd208", "source": "crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q", "specversion": "0.3", "type": "io.confluent.kafka.server/authorization", "time": "2019-10-24T16:15:48.355Z", "datacontenttype": "application/json", "subject": "crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic", "confluentRouting": { "route": "confluent-audit-log-events" }, "data": { "serviceName": "crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q", "methodName": "kafka.CreateTopics", "resourceName": "crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic", "authenticationInfo": { "principal": "User:resourceOwner1" }, "authorizationInfo": { "granted": true, "operation": "Create", "resourceType": "Topic", "resourceName": "app3-topic", "patternType": "LITERAL", "rbacAuthorization": { "role": "ResourceOwner", "scope": { "outerScope": [], "clusters": { "kafka-cluster": "j94C72q3Qpym0MJ9McEufQ" } } } } } } The following example shows what an audit log message looks like when you are using an ACL: .. 
codewithvars:: json

   {
     "id": "889bdcd9-a378-4bfe-8860-180ef8efd208",
     "source": "crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q",
     "specversion": "0.3",
     "type": "io.confluent.kafka.server/authorization",
     "time": "2019-10-24T16:15:48.355Z",
     "datacontenttype": "application/json",
     "subject": "crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic",
     "confluentRouting": {
       "route": "confluent-audit-log-events"
     },
     "data": {
       "serviceName": "crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q",
       "methodName": "kafka.CreateTopics",
       "resourceName": "crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic",
       "authenticationInfo": {
         "principal": "User:alice"
       },
       "authorizationInfo": {
         "granted": true,
         "operation": "DescribeConfigs",
         "resourceType": "Topic",
         "resourceName": "app3-topic",
         "patternType": "LITERAL",
         "aclAuthorization": {
           "permissionType": "ALLOW",
           "host": "*"
         }
       }
     }
   }

.. _confluent-resource-name:

Confluent Resource Name (CRN)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

CRNs provide a uniform way to uniquely identify resources referenced throughout |cp|, and are used in audit logs. Each CRN is a uniform resource identifier (URI) that uniquely identifies the associated Confluent resource and is prefixed with ``crn://``.

You can configure the authority (also known as the host name) of a CRN at the cluster level. The authority is empty by default, even when you use the Confluent |mds-long|, and it remains empty unless you specify an authority name using the ``confluent.authorizer.authority.name=`` option in ``server.properties``. If you have a single cluster or context, leaving the authority empty may be sufficient.

However, consider the scenario where you have multiple production clusters that all refer to a single MDS for RBAC, and you also have a test group of clusters with its own MDS. Because every production cluster uses the same MDS for RBAC, on each of those clusters you specify the authority as ``confluent.authorizer.authority.name=prod.mds.mycompany.com``. In your test environment you specify the authority as ``confluent.authorizer.authority.name=test.mds.mycompany.com``. If the production clusters include a topic named "travel" and your test cluster also has a topic named "travel" (with different data in each), the distinct authority lets you quickly and easily identify which cluster each "travel" topic resides in. Any time identical names are used in both your production and test clusters, the distinct authority provides a way to distinguish between the two.

We recommend setting the authority to the DNS name associated with your installation, which is an easy way to make CRNs more meaningful. All of the brokers in a logical group (for example, your company's staging environment, or the clusters managed by a single MDS) should specify the same ``authority.name`` in ``server.properties``, typically the hostname used to reach the MDS (for example, ``confluent.authorizer.authority.name=mds.mycompany.com``). Do not specify different ``authority.name`` values for individual clusters unless you are deliberately keeping them separate (for example, a separate production cluster, separate staging cluster, or separate test cluster).
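For example, here is a minimal sketch of the relevant ``server.properties`` setting, assuming a hypothetical staging environment whose clusters share an MDS reachable at ``mds.staging.mycompany.com``:

::

   # Hypothetical hostname: use the same value on every broker in the logical group,
   # typically the DNS name used to reach the shared MDS.
   confluent.authorizer.authority.name=mds.staging.mycompany.com

With this setting in place, every CRN emitted by these brokers carries the ``crn://mds.staging.mycompany.com/...`` prefix, so a ``travel`` topic in staging is immediately distinguishable from a ``travel`` topic in production or test.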
CRN formats vary by resource:

* |ak| cluster: ``/kafka=``
* Topic: ``/kafka=/topic=``
* Consumer group: ``/kafka=/group=``
* KSQL cluster: ``/kafka=/ksql=``
* Connect: ``/kafka=/connect=/connector=``

Here are examples of CRNs used to associate audit logs with specific Confluent resources:

* |ak| cluster: ``crn://mds.mycompany.com/kafka=rKtuRNiDQb2k9NMml6rLfA``
* Topic: ``crn://mds.mycompany.com/kafka=rKtuRNiDQb2k9NMml6rLfA/topic=app1-topic``
* Consumer group: ``crn://mds.mycompany.com/kafka=rKtuRNiDQb2k9NMml6rLfA/group=app1-consumer-group``
* KSQL cluster: ``crn://mds.mycompany.com/kafka=rKtuRNiDQb2k9NMml6rLfA/ksql=default_``
* Connect: ``crn://mds.mycompany.com/kafka=rKtuRNiDQb2k9NMml6rLfA/connect=ydfk/connector=sink-connector``

.. _configure-audit-logs:

Configuring audit logs
----------------------

Before setting up audit logs on your system, be aware of the following default settings:

**Enabled by default:**

* Topics: create and delete
* ACLs: create and delete
* Authorization requests related to RBAC
* Every event in the :ref:`Management category `

**Disabled by default:**

* Inter-broker communication
* Produce and consume-related events
* Heartbeat and Describe events

.. _default-audit-log-config:

Default audit log configuration
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Audit logs are enabled by default, and are managed by the inter-broker principal (typically, the user ``kafka``), who has expansive permissions. If you do not configure anything, audit logs are sent to a single topic in the local cluster, named ``confluent-audit-log-events``, that captures both "allowed" and "denied" authorization events. Default audit logs capture the :ref:`Management and Authorize categories of authorization events ` only.

The following example shows the default audit log configuration:

.. codewithvars:: json

   {
     "destinations": {
       "topics": {
         "confluent-audit-log-events": {
           "retention_ms": 7776000000
         }
       }
     },
     "default_topics": {
       "allowed": "confluent-audit-log-events",
       "denied": "confluent-audit-log-events"
     }
   }

For details about how to disable audit logging, refer to :ref:`disable-audit-logs`.

.. _custom-audit-log-config:

Advanced audit log configuration
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Most organizations have strict and specific security governance requirements and policies. In such cases, you can configure audit logs on a more granular level. Specifically, you can configure:

* The |ak| port over which to communicate with your audit log cluster
* Multiple topics to capture logs of differing importance
* Retention periods that serve the needs of your organization
* Excluded principals, to ensure performance is not compromised by excessively high message volumes
* Topic destination routes optimized for security and performance

When enabling audit logging for produce and consume, be very selective about which events you want logged, and configure logging for only the most sensitive topics.

.. figure:: ../images/audit-log-recommended-config.png
   :width: 600px

After enabling audit logs in |ak| ``server.properties``, set ``confluent.security.event.router.config`` to a JSON configuration similar to the one shown in the following example advanced audit log configuration:

..
codewithvars:: json { "destinations": { "bootstrap_servers": [ "audit.example.com:9092" ], "topics": { "confluent-audit-log-events_denied": { "retention_ms": 2592000000 }, "confluent-audit-log-events_secure": { "retention_ms": 7776000000 }, "confluent-audit-log-events": { "retention_ms": 2592000000 } } }, "default_topics": { "allowed": "confluent-audit-log-events", "denied": "confluent-audit-log-events" }, "excluded_principals": [ "User:Alice", "User:service_account_id" ], "routes": { "crn://mds1.example.com/kafka=*/topic=secure-*": { "produce": { "allowed": "confluent-audit-log-events_secure", "denied": "" }, "consume": { "allowed": "confluent-audit-log-events_secure", "denied": "confluent-audit-log-events_denied" } } } } Configure audit log destination """"""""""""""""""""""""""""""" The ``destinations`` option identifies the audit log cluster, which is provided by the bootstrap server. Use this setting to identify the communication channel between your audit log cluster and |ak|. You can use the ``bootstrap_server`` setting to deliver audit log messages to a specific cluster set aside for the sole purpose of retaining them. This ensures that no one can access or tamper with your organization’s audit logs, and enables you to selectively conduct more in-depth auditing of sensitive data, while keeping log volumes down for less sensitive data. If you deliver audit logs to another cluster, you must configure the connection to that cluster. Configure this connection as you would any producer writing to the cluster, using the prefix ``confluent.security.event.logger.exporter.kafka`` for the producer configuration keys, including the appropriate authentication information. For example, if you have a |ak| cluster listening on port 9092 of the host ``audit.example.com``, and that cluster accepts SCRAM-SHA-256 authentication and has a principal named ``confluent-audit`` that is allowed to connect and produce to the audit log topics, the configuration would look like the following: .. codewithvars:: json confluent.security.event.router.config=\ { \ "destinations": { \ "bootstrap_servers": ["audit.example.com:9092"], \ "topics": { \ "confluent-audit-log-events": { \ "retention_ms": 7776000000 } \ } \ }, \ "default_topics": { \ "allowed": "confluent-audit-log-events", \ "denied": "confluent-audit-log-events" \ } \ } confluent.security.event.logger.exporter.kafka.sasl.mechanism=SCRAM-SHA-256 confluent.security.event.logger.exporter.kafka.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="confluent-audit" \ password="secretP@ssword123"; Bootstrap servers may be provided in either the router configuration JSON or a producer configuration property; if they appear in both places, the router configuration takes precedence. Create audit log topics """"""""""""""""""""""" Most organizations create topics for specific logging purposes. You can send different kinds of audit log messages to multiple topics on a single |ak| cluster and specify a retention period for each topic. For example, if you send produce log messages and management log messages to the same topic, you cannot set the retention for the produce messages to one day while setting the retention for the management messages to 30 days. To achieve those retentions, you must send the messages to different topics. .. important:: The topic retention period you specify in the audit log configuration is enforced only if the audit log feature creates a topic that doesn't already exist. 
Do not attempt to edit this value in the configuration for a topic that already exists; any retention updates will not apply. See :ref:`audit-log-replication-retention` for information on how to use the CLI to revise the retention period for an existing audit log topic.

You must use the prefix ``confluent-audit-log-events`` for all topics. The following topics are specified in the example:

* ``confluent-audit-log-events_denied``: All log messages for denied consume requests to topics with names starting with the ``secure-`` prefix are sent to this topic.
* ``confluent-audit-log-events_secure``: All log messages for allowed produce and consume requests to topics that use the ``secure-`` prefix are sent to this topic.
* ``confluent-audit-log-events``: All log messages for allowed and denied Management and Authorize categories are sent to this topic.

You can include topics that have not yet been created in the topic configuration. If you do so, they will be created automatically if or when the audit log principal has the necessary permissions, and will use the retention period specified in the configuration file. However, you cannot modify the retention period of existing topics merely by editing this configuration file. For details about changing the retention period for existing topics, see :ref:`audit-log-replication-retention`.

Exclude principals
""""""""""""""""""

To configure an excluded principal, specify a user or service principal for whom audit logs will not be generated. In the configuration example, a trusted user, "Alice", whose sole responsibility is to read audit logs, is excluded. None of her authorization event requests will be logged.

You can control the volume of audit log messages with excluded principals. If produce and consume audit logging is turned on, reads from and writes to the audit log topics themselves generate additional audit logs, which means that each audit log read and each audit log write results in yet another audit log message. This can quickly grow out of control and adversely impact cluster performance. To keep audit logs from accumulating into an unmanageable volume, you should always include the following excluded principals in your configuration:

- A trusted principal responsible for writing to audit log topics. Make audit log writes the only task this principal performs. Do not grant additional permissions to this principal, and do not allow other principals to write to audit log topics.
- A trusted principal responsible for reading from audit log topics. Make audit log reads the only task this principal performs. Do not grant additional permissions to this principal, and do not allow other principals to read from audit log topics.

Another approach, which avoids unwanted audit log accumulation and the need to configure excluded principals altogether, is to direct audit logs to a secured cluster with limited access and to leave audit logging disabled on that cluster.

Route topics
""""""""""""

You can flexibly route audit log messages to different topics based on subject, category, and whether authorizations were allowed or denied. You can also specify routes for specific subjects, or use prefixes (for example, ``secure-*`` for sensitive authorization events) and wildcards to cover larger groups of resources.

The routing example above shows a CRN that contains wildcard patterns (``crn://mds1.example.com/kafka=*/topic=secure-*``): it matches any |ak| cluster (``kafka=*``) and any topic that uses the prefix created for highly sensitive topics (``topic=secure-*``).
All allowed produce authorization event logs matching this CRN pattern will be routed to the topic ``confluent-audit-log-events_secure``. All denied produce authorization event logs matching this CRN pattern will be routed nowhere, as specified by the empty string (``""``). All allowed consume authorization event logs matching this CRN pattern will be routed to the topic ``confluent-audit-log-events_secure``, and all denied consume authorization event logs matching this CRN pattern will be routed to the topic ``confluent-audit-log-events_denied``. .. _audit-log-security: Secure audit logs """"""""""""""""" Consider the following configuration recommendations for securing your audit logs: * For improved security, send audit logs to a different cluster. Limit permissions on the audit log cluster to ensure the audit log is trustworthy. * Create an audit log principal with limited permissions (for example, a user identified as ``confluent-audit``). The default is to use the broker principal, which has expansive permissions. To set up permissions for the audit log principal using an RBAC role binding: :: bin/confluent iam rolebinding create \ --principal User:confluent-audit \ --role DeveloperWrite \ --resource Topic:confluent-audit-log-events \ --prefix \ --kafka-cluster-id Alternatively, set the permissions using an ACL: :: bin/kafka-acls --bootstrap-server localhost:9092 \ --add \ --allow-principal User:confluent-audit \ --allow-host '*' \ --producer \ --topic confluent-audit-log-events \ --resource-pattern-type prefixed .. _audit-log-config-examples: Audit log configuration examples ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ This section includes additional audit log configuration examples for specific use cases. Advanced topic routing use cases """""""""""""""""""""""""""""""" The following configuration example shows how to route billing and payroll data in clusters. In this use case, all of the billing topics use the prefix ``billing-`` and all of the payroll topics use the prefix ``payroll-``. These two departments have different auditors who require detailed audit log information. To ensure that audit log messages relating to the specific topics are routed only to the correct auditor, use: .. codewithvars:: json { "destinations": { "bootstrap_servers": [ "audit.example.com:9092" ], "topics": { "confluent-audit-log-events": { "retention_ms": 2592000000 }, "confluent-audit-log-events_payroll": { "retention_ms": 2592000000 }, "confluent-audit-log-events_billing": { "retention_ms": 2592000000 } } }, "default_topics": { "allowed": "confluent-audit-log-events", "denied": "confluent-audit-log-events" }, "routes": { "crn://mds1.example.com/kafka=*/topic=billing-*": { "produce": { "allowed": "confluent-audit-log-events_billing", "denied": "confluent-audit-log-events_billing" }, "consume": { "allowed": "confluent-audit-log-events_billing", "denied": "confluent-audit-log-events_billing" }, "other": { "allowed": "confluent-audit-log-events_billing", "denied": "confluent-audit-log-events_billing" } }, "crn://mds1.example.com/kafka=*/topic=payroll-*": { "produce": { "allowed": "confluent-audit-log-events_payroll", "denied": "confluent-audit-log-events_payroll" }, "consume": { "allowed": "confluent-audit-log-events_payroll", "denied": "confluent-audit-log-events_payroll" }, "other": { "allowed": "confluent-audit-log-events_payroll", "denied": "confluent-audit-log-events_payroll" } } } } .. 
note:: Entries that display the ``"other"`` audit log event category should be interpreted as audit log ``"management"`` events. This example shows a configuration where users are frequently making random updates in the test cluster ``test-cluster-1``. To configure topic routing so that you don't see or receive any logs from this test cluster, but still receive logs from other clusters: .. codewithvars:: json { "destinations": { "bootstrap_servers": [ "audit.example.com:9092" ], "topics": { "confluent-audit-log-events": { "retention_ms": 2592000000 } } }, "default_topics": { "allowed": "confluent-audit-log-events", "denied": "confluent-audit-log-events" }, "routes": { "crn://mds1.example.com/kafka=test-cluster-1": { "other": { "allowed": "", "denied": "" }, "authorize": { "allowed": "", "denied": "" } } } } .. note:: Alternatively, you could specify ``confluent.security.event.logger.enable=false`` for this test cluster. This configuration example is for a test automation cluster that creates and deletes topics using names like ``test-1234``. You do not want to create any audit log messages for these topics, but still wish to generate audit log messages from other topics: .. codewithvars:: json { "destinations": { "bootstrap_servers": [ "audit.example.com:9092" ], "topics": { "confluent-audit-log-events": { "retention_ms": 2592000000 } } }, "default_topics": { "allowed": "confluent-audit-log-events", "denied": "confluent-audit-log-events" }, "routes": { "crn://mds1.example.com/kafka=*/topic=test-*": { "other": { "allowed": "", "denied": "" }, "authorize": { "allowed": "", "denied": "" } } } } .. note:: Entries that display the ``"other"`` audit log event category should be interpreted as audit log ``"management"`` events. In cases where you have frequent malicious attacks, you can create a specific topic for attackers. In this example that topic name is ``honeypot``. In this configuration, you set up audit logging exclusively for ``honeypot`` to see if you can find a usage pattern. While you might keep most logs for a day, in this case you are keeping logs with suspected hackers for one year: .. codewithvars:: json { "destinations": { "bootstrap_servers": [ "audit.example.com:9092" ], "topics": { "confluent-audit-log-events_users": { "retention_ms": 86400000 }, "confluent-audit-log-events_hackers": { "retention_ms": 31622400000 } } }, "default_topics": { "allowed": "confluent-audit-log-events_users", "denied": "confluent-audit-log-events_users" }, "routes": { "crn://mds1.example.com/kafka=*/topic=honeypot": { "produce": { "allowed": "confluent-audit-log-events_hackers", "denied": "confluent-audit-log-events_hackers" }, "consume": { "allowed": "confluent-audit-log-events_hackers", "denied": "confluent-audit-log-events_hackers" }, "other": { "allowed": "confluent-audit-log-events_hackers", "denied": "confluent-audit-log-events_hackers" } } } } .. _partition-audit-logs: Partitioning audit logs """"""""""""""""""""""" In cases where you want predictable ordering for messages on audit log topics, you can configure messages to go to a topic with a single partition: :: bin/kafka-topics --bootstrap-server localhost:9093 --command-config config/client.properties --create --topic confluent-audit-log-events-ordered --partitions 1 .. note:: If message volume is very high (particularly if produce or consume are enabled), this configuration may not be sustainable. In this case, you should direct audit log messages to one or more topics with additional partitions. 
Be aware that in such cases, consumers reading those messages may not receive them in the order in which they were sent. .. _supress-logs-internal-topics: Suppressing audit logs for internal topics """""""""""""""""""""""""""""""""""""""""" If you are considering audit logging for produce and consume, be sure to carefully consider the implications in terms of the number of audit log messages this may generate. If you can identify the topics or topic prefixes for which you require this level of detail, you can maintain a reasonable signal-to-noise ratio by specifying them and not auditing others. If you must enable audit logging for produce and consume, and are using a route similar to the following: .. codewithvars:: json "crn:///kafka=*/topic=*": { "produce": { "allowed": "confluent-audit-log-events", "denied": "confluent-audit-log-events" }, "consume": { "allowed": "confluent-audit-log-events", "denied": "confluent-audit-log-events" } } Be aware that |ak| and |cp| use internal topics for a variety of purposes that you may not need to capture. |ak| internal topics are prefixed with a ``_``, and |cp| internal topics are prefixed with ``_confluent-``. To exclude these prefixes, use the following configuration: .. codewithvars:: json "crn:///kafka=*/topic=_*": { "consume": { "allowed": "", "denied": "" }, "produce": { "allowed": "", "denied": "" }, "other": { "allowed": "", "denied": "" } } Optionally, you can use: .. codewithvars:: json "crn:///kafka=*/topic=_confluent-*": { "consume": { "allowed": "", "denied": "" }, "produce": { "allowed": "", "denied": "" }, "other": { "allowed": "", "denied": "" } } .. note:: In the preceding examples, entries that display the ``"other"`` audit log event category should be interpreted as audit log ``"management"`` events. If your produce or consume audit logs are using ``topic=*``, and are sending the audit logs to the same cluster (which we do not recommend), be sure to disable audit logging of the audit log topics themselves. Otherwise, your broker logs will return a large volume of warnings about feedback loops. .. codewithvars:: json "crn:///kafka=*/topic=confluent-audit-log-events*": { "consume": { "allowed": "", "denied": "" }, "produce": { "allowed": "", "denied": "" }, "other": { "allowed": "", "denied": "" } } .. _audit-log-replication-retention: Configuring replication factor and retention """""""""""""""""""""""""""""""""""""""""""" The following example shows how to use the CLI to create a topic that will be logged using a specific replication factor and retention period: :: bin/kafka-topics --bootstrap-server localhost:9092 \ --create --topic confluent-audit-log-events\ --replication-factor 3 \ --config retention.ms=7776000000 If you create a topic manually with a ``retention.ms`` that conflicts with the retention specified in the ``router.config`` then the manually-created setting is used. The default retention period is 90 days (7776000000 ms). .. note:: The retention period specified in the audit log configuration is only enforced if the audit log feature automatically creates a topic that doesn't already exist. Editing this value in the audit log configuration for a topic that already exists will not change the retention settings for that topic. The following example shows how to use the CLI to revise the retention period of an existing topic: :: bin/kafka-topics --bootstrap-server localhost:9092 \ --alter --topic confluent-audit-log-events\ --config retention.ms=12096000000000 .. 
_disable-audit-logs: Disabling audit logs -------------------- To disable audit logging in |ak| clusters, set the following in ``server.properties``: :: confluent.security.event.logger.enable=false Note that this disables all the structured audit logs, including for Management events. Authorization logs will still be written to log4j in |ak| format. For details, refer to :ref:`Kafka Logging `. .. _audit-logging-docker: Configuring audit logs in Docker -------------------------------- To configure Confluent Docker containers, pass properties to them using environment variables where the Docker container is run (typically in ``docker-compose.yml`` or in a Kubernetes configuration). For details about running the Confluent |ak| configuration in Docker refer to :ref:`kafka-config-docker`. To configure audit logging in Docker containers, determine which ``server.properties`` options you want to set in your audit logging configuration, and then modify them from lowercase dotted to uppercase with underscore characters, and prefix with ``KAFKA_``. The only difference between configuring audit logging in |cp| and Docker is the naming convention used. For example, the following fragment from ``docker-compose.yml`` shows how to configure ``confluent.security.event.router.config`` in a Docker container: :: version: '2' services: broker: image: confluentinc/cp-server:5.4.x-latest ... environment: KAFKA_AUTHORIZER_CLASS_NAME: io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer KAFKA_CONFLUENT_SECURITY_EVENT_ROUTER_CONFIG: "{\"routes\":{\"crn:///kafka=*/group=*\":{\"consume\":{\"allowed\":\"confluent-audit-log-events\",\"denied\":\"confluent-audit-log-events\"}},\"crn:///kafka=*/topic=*\":{\"produce\":{\"allowed\":\"confluent-audit-log-events\",\"denied\":\"confluent-audit-log-events\"},\"consume\":{\"allowed\":\"confluent-audit-log-events\",\"denied\":\"confluent-audit-log-events\"}}},\"destinations\":{\"topics\":{\"confluent-audit-log-events\":{\"retention_ms\":7776000000}}},\"default_topics\":{\"allowed\":\"confluent-audit-log-events\",\"denied\":\"confluent-audit-log-events\"},\"excluded_principals\":[\"User:kafka\",\"User:ANONYMOUS\"]}" .. _troubleshoot-audit-logs: Troubleshooting --------------- If the audit logging mechanism tries to write to the |ak| topic and doesn't succeed for any reason, it writes the JSON audit log message to log4j instead. You can specify where this message goes by configuring ``io.confluent.security.audit.log.fallback`` in |ak|'s broker ``log4j.properties`` in ``ce-kafka/config/``. Here is an example of the relevant ``log4j.properties`` content: :: log4j.appender.auditLogAppender=org.apache.log4j.DailyRollingFileAppender log4j.appender.auditLogAppender.DatePattern='.'yyyy-MM-dd-HH log4j.appender.auditLogAppender.File=${kafka.logs.dir}/metadata-service.log log4j.appender.auditLogAppender.layout=org.apache.log4j.PatternLayout log4j.appender.auditLogAppender.layout.ConversionPattern=[%d] %p %m (%c)%n # Fallback logger for audit logging. Used when the Kafka topics are initializing. 
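# The next two lines attach the appender defined above to the fallback audit logger at INFO level,
# and disable additivity so fallback audit events are not also duplicated into the main broker log.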
log4j.logger.io.confluent.security.audit.log.fallback=INFO, auditLogAppender log4j.additivity.io.confluent.security.audit.log.fallback=false If you find that audit log messages are not being created or routed to topics as you have configured them, check to ensure that the: - Audit log principal has been granted the required permissions - Principal can connect to the target cluster - Principal can create and produce to all of the topics You may need to temporarily increase log level: ``log4j.logger.io.confluent.security.audit=DEBUG``. .. _view-all-authorization-event-types: Viewing all authorization event types ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ If you need to view and debug authorization decisions on a cluster, you can configure audit logging to capture all authorization event types. You should only use this troubleshooting configuration for a single scenario because it will produce a large volume of messages very quickly. We recommend that you set the retention period to 30 minutes or less. This configuration does not capture heartbeats or traffic on internal topics (topics prefixed with ``_``). .. note:: In the following example, entries that display the ``"other"`` audit log event category should be interpreted as audit log ``"management"`` events. .. codewithvars:: json { "routes": { "crn://mds1.example.com/kafka=*": { "interbroker": { "allowed": "confluent-audit-log-events", "denied": "confluent-audit-log-events" }, "describe": { "allowed": "confluent-audit-log-events", "denied": "confluent-audit-log-events" }, "other": { "allowed": "confluent-audit-log-events", "denied": "confluent-audit-log-events" } }, "crn://mds1.example.com/kafka=*/group=*": { "consume": { "allowed": "confluent-audit-log-events", "denied": "confluent-audit-log-events" }, "describe": { "allowed": "confluent-audit-log-events", "denied": "confluent-audit-log-events" }, "other": { "allowed": "confluent-audit-log-events", "denied": "confluent-audit-log-events" } }, "crn://mds1.example.com/kafka=*/topic=*": { "produce": { "allowed": "confluent-audit-log-events", "denied": "confluent-audit-log-events" }, "consume": { "allowed": "confluent-audit-log-events", "denied": "confluent-audit-log-events" }, "describe": { "allowed": "confluent-audit-log-events", "denied": "confluent-audit-log-events" }, "other": { "allowed": "confluent-audit-log-events", "denied": "confluent-audit-log-events" } }, "crn://mds1.example.com/kafka=*/topic=_*": { "produce": { "allowed": "", "denied": "" }, "consume": { "allowed": "", "denied": "" }, "describe": { "allowed": "", "denied": "" } } }, "destinations": { "bootstrap_servers": [ "host1:port" ], "topics": { "confluent-audit-log-events": { "retention_ms": 7776000000 } } }, "default_topics": { "allowed": "confluent-audit-log-events", "denied": "confluent-audit-log-events" } } .. _view-audit-logs-on-the-fly: Viewing audit logs on the fly ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ During initial setup or troubleshooting, you can quickly and iteratively examine recent audit log entries using simple command line tools. This can aid in audit log troubleshooting, and also when troubleshooting role bindings for RBAC. First pipe your audit log topics into a local file. Grep works faster on your local file system. 
If you have the ``kafka-console-consumer`` installed locally and can directly consume from the audit log destination |ak| cluster, your command should look similar to the following: :: ./kafka-console-consumer --bootstrap-server auditlog.example.com:9092 --consumer-config ~/auditlog-consumer.properties --whitelist '^confluent-audit-log-events.*' > /tmp/streaming.audit.log If you don’t have direct access and must instead connect using a “jump box” (a machine or server on a network that you use to access and manage devices in a separate security zone), use a command similar to the following: :: ssh -tt -i ~/.ssh/theuser.key theuser@jumpbox './kafka-console-consumer --bootstrap-server auditlog.example.com:9092 --consumer-config ~/auditlog-consumer.properties --whitelist '"'"'^confluent-audit-log-events.*'"'"' ' > /tmp/streaming.audit.log Regardless of which method you use, at this point you can open another terminal and locally run ``tail -f /tmp/streaming.audit.log`` to view audit log messages on the fly. After you’ve gotten the audit logs, you can use `grep `__ and `jq `_ (or another utility) to examine them. For example: :: tail -f /tmp/streaming.audit.log | grep 'connect' | jq -c '[.time, .data.authenticationInfo.principal, .data.authorizationInfo.operation, .data.resourceName]'
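To focus on denied authorizations only, a small variation of the same command works. This is a sketch that assumes the message structure shown in :ref:`audit-log-content`, in particular the ``data.authorizationInfo.granted`` field:

::

   # Show only denied events, along with who was refused, the operation, and the resource
   tail -f /tmp/streaming.audit.log | jq -c 'select(.data.authorizationInfo.granted == false) | [.time, .data.authenticationInfo.principal, .data.authorizationInfo.operation, .data.resourceName]'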