Install Confluent Operator and Confluent Platform

Complete the following steps to deploy Confluent Operator and Confluent Platform.

Important

  • Components must be installed in the order provided in the following steps.
  • Wait for all component services to start before installing the next component (see the example below for one way to check).
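
One way to check is to wait for each pod of the component you just installed to report Ready before you install the next one. The following is a minimal sketch; the pod names assume the default component names used in this guide (for example, zookeeper-0 through zookeeper-2) and the operator namespace:

    # Wait (up to 10 minutes) for the default ZooKeeper pods to become Ready.
    # Pod names are assumptions based on the default component names; adjust for your deployment.
    kubectl wait --for=condition=Ready pod/zookeeper-0 pod/zookeeper-1 pod/zookeeper-2 \
      -n operator --timeout=600s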

The examples in this guide are based on the following assumptions:

  • $VALUES_FILE refers to the configuration file you set up in Create the global configuration file. A minimal example of setting this variable is shown after this list.

  • To keep the examples in the Operator documentation simple and clear, all configuration parameters are specified in the configuration file ($VALUES_FILE). In production deployments, however, pass sensitive data to Helm with the --set or --set-file option rather than putting it in the file. For example:

    helm upgrade --install kafka \
      ./confluent-operator \
      --values $VALUES_FILE \
      --namespace operator \
      --set kafka.services.mds.ldap.authentication.simple.principal="cn=mds,dc=test,dc=com" \
      --set kafka.services.mds.ldap.authentication.simple.credentials="Developer!" \
      --set kafka.enabled=true
    
  • operator is the namespace that Confluent Platform is deployed in.

  • All commands are executed in the helm directory under the directory Confluent Operator was downloaded to.
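
The $VALUES_FILE variable referenced above is simply an environment variable that points to your provider configuration file. The path below is hypothetical; substitute the file you created in Create the global configuration file:

    # Hypothetical example path; use the configuration file you created earlier.
    export VALUES_FILE="$HOME/confluent-operator/helm/providers/gcp.yaml"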

Step 1. Install Confluent Operator

  1. Go to the helm directory under the directory where you downloaded the Confluent Operator bundle.

  2. Enter the following command (using the example operator namespace):

    helm upgrade --install \
      operator \
      ./confluent-operator \
      --values $VALUES_FILE \
      --namespace operator \
      --set operator.enabled=true
    

    The --set operator.enabled=true flag in the above command overrides the default setting of false. It enables and starts the component immediately after installation, which is necessary because Operator and Confluent Platform components must be started in a specific order.

    You should see output similar to the following example:

    NAME: operator
    LAST DEPLOYED: Tue Jan  7 17:47:04 2020
    NAMESPACE: operator
    STATUS: deployed
    REVISION: 1
    TEST SUITE: None
    NOTES:
    The Confluent Operator
    
    The Confluent Operator interacts with Kubernetes API to create statefulsets
    resources. The Confluent Operator runs three controllers, two component
    specific controllers for kubernetes by providing components specific Custom
    Resource Definition (CRD) (for Kafka and Zookeeper) and one controller for
    creating other statefulsets resources.
    
    1. Validate if Confluent Operator is running.
    
    kubectl get pods -n operator | grep cc-operator
    
    2. Validate if custom resource definition (CRD) is created.
    
    kubectl get crd | grep confluent
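
Before moving on, you can also wait for the Operator deployment to report as available. This is a minimal sketch; the deployment name cc-operator is an assumption based on the default pod name shown later in this guide:

    # Wait (up to 5 minutes) for the Operator deployment to become Available.
    # The deployment name cc-operator is assumed from the default installation.
    kubectl wait --for=condition=Available deployment/cc-operator -n operator --timeout=300s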
    

Step 2. Install ZooKeeper

  1. Enter the following command to verify that Operator is running:

    kubectl get pods -n operator
    
  2. After verifying that Operator is running, enter the following command:

    helm upgrade --install \
      zookeeper \
      ./confluent-operator \
      --values $VALUES_FILE \
      --namespace operator \
      --set zookeeper.enabled=true
    
    You should see output similar to the following example:

    NAME: zookeeper
    LAST DEPLOYED: Wed Jan  8 14:51:26 2020
    NAMESPACE: operator
    STATUS: deployed
    REVISION: 1
    TEST SUITE: None
    NOTES:
    Zookeeper Cluster Deployment
    
    Zookeeper cluster is deployed through CR.
    
      1. Validate if Zookeeper Custom Resource (CR) is created
    
         kubectl get zookeeper -n operator | grep zookeeper
    
      2. Check the status/events of CR: zookeeper
    
         kubectl describe zookeeper zookeeper -n operator
    
      3. Check if Zookeeper cluster is Ready
    
         kubectl get zookeeper zookeeper -ojson -n operator
    
         kubectl get zookeeper zookeeper -ojsonpath='{.status.phase}' -n operator
    
      4. Update/Upgrade Zookeeper Cluster
    
         The upgrade can be done either through the ``helm upgrade`` command or by editing the CR directly as below;
    
         kubectl edit zookeeper zookeeper  -n operator
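
If you prefer to script the wait between steps, you can poll the custom resource status instead of checking pods manually. This is a sketch only; the phase value to expect (RUNNING in this example) can vary by Operator version, so confirm it against the output of the jsonpath command shown in the notes above:

    # Poll the ZooKeeper custom resource until its status phase reports RUNNING.
    # RUNNING is an assumed value; verify it against your cluster's actual status output.
    until [ "$(kubectl get zookeeper zookeeper -ojsonpath='{.status.phase}' -n operator)" = "RUNNING" ]; do
      echo "Waiting for the ZooKeeper cluster to be ready..."
      sleep 10
    done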
    

Step 3. Install Kafka brokers

  1. Enter the following command to verify that all ZooKeeper services are running:

    kubectl get pods -n operator
    
  2. After verifying that all ZooKeeper services are running, enter the following command:

    helm upgrade --install \
      kafka \
      ./confluent-operator \
      --values $VALUES_FILE \
      --namespace operator \
      --set kafka.enabled=true
    
    You should see output similar to the following example:

    NAME: kafka
    LAST DEPLOYED: Wed Jan  8 15:07:46 2020
    NAMESPACE: operator
    STATUS: deployed
    REVISION: 1
    TEST SUITE: None
    NOTES:
    Kafka Cluster Deployment
    
    Kafka Cluster is deployed to kubernetes through CR Object
    
    
      1. Validate if Kafka Custom Resource (CR) is created
    
         kubectl get kafka -n operator | grep kafka
    
      2. Check the status/events of CR: kafka
    
         kubectl describe kafka kafka -n operator
    
      3. Check if Kafka cluster is Ready
    
         kubectl get kafka kafka -ojson -n operator
    
         kubectl get kafka kafka -ojsonpath='{.status.phase}' -n operator
    
    ... output omitted
    

Step 4. Install Schema Registry

  1. Enter the following command to verify that all Kafka services are running:

    kubectl get pods -n operator
    
  2. After verifying that all Kafka services are running, enter the following command:

    helm upgrade --install \
      schemaregistry \
      ./confluent-operator \
      --values $VALUES_FILE \
      --namespace operator \
      --set schemaregistry.enabled=true
    
    You should see output similar to the following example:

    NAME: schemaregistry
    LAST DEPLOYED: Thu Jan  9 15:51:21 2020
    NAMESPACE: operator
    STATUS: deployed
    REVISION: 1
    TEST SUITE: None
    NOTES:
    Schema Registry is deployed through PSC. Configure Schema Registry through REST Endpoint
    
      1. Validate if schema registry cluster is running
    
         kubectl get pods -n operator | grep schemaregistry
    
      2. Access
    
        Internal REST Endpoint : http://schemaregistry:8081  (Inside kubernetes)
    
        OR
    
        http://localhost:8081 (Inside Pod)
    
        More information about schema registry REST API can be found here,
    
        https://docs.confluent.io/current/schema-registry/docs/api.html
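
As a quick functional check, you can also query the Schema Registry REST API from inside one of its pods. This sketch assumes the default pod name schemaregistry-0 and that curl is available in the container image:

    # List registered subjects through the internal REST endpoint (returns [] on a fresh install).
    kubectl -n operator exec schemaregistry-0 -- curl -s http://localhost:8081/subjects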
    

Step 5. Install Kafka Connect

  1. Enter the following command to verify that all Schema Registry services are running:

    kubectl get pods -n operator
    
  2. After verifying that all Schema Registry services are running, enter the following command:

    helm upgrade --install \
      connectors \
      ./confluent-operator \
      --values $VALUES_FILE \
      --namespace operator \
      --set connect.enabled=true
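
Optionally, you can verify that the Connect REST API is responding from inside one of the Connect pods. This sketch assumes the default pod name connectors-0, the default Connect REST port 8083, and that curl is available in the container image:

    # List deployed connectors (returns [] on a fresh install).
    kubectl -n operator exec connectors-0 -- curl -s http://localhost:8083/connectors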
    

Step 6. Install Confluent Replicator

  1. Enter the following command to verify that all Connect services are running:

    kubectl get pods -n operator
    
  2. After verifying that all Connect services are running, enter the following command:

    helm upgrade --install \
      replicator \
      ./confluent-operator \
      --values $VALUES_FILE \
      --namespace operator \
      --set replicator.enabled=true
    

Step 7. Install Confluent Control Center

  1. Enter the following command to verify that all Replicator services are running:

    kubectl get pods -n operator
    
  2. After verifying that all Replicator services are running, enter the following command:

    helm upgrade --install \
      controlcenter \
      ./confluent-operator \
      --values $VALUES_FILE \
      --namespace operator \
      --set controlcenter.enabled=true
    

Step 8. Install ksqlDB

  1. Enter the following command to verify that all Confluent Control Center services are running:

    kubectl get pods -n operator
    
  2. After verifying that all Confluent Control Center services are running, enter the following command:

    helm upgrade --install \
      ksql \
      ./confluent-operator \
      --values $VALUES_FILE \
      --namespace operator \
      --set ksql.enabled=true
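
Optionally, you can confirm that the ksqlDB server is responding from inside one of its pods. This sketch assumes the default pod name ksql-0, the default ksqlDB listener port 8088, and that curl is available in the container image:

    # Query the ksqlDB server info endpoint.
    kubectl -n operator exec ksql-0 -- curl -s http://localhost:8088/info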
    

All components should now be installed and running. Verify by listing the pods in the operator namespace:

kubectl get pods -n operator

NAME                           READY   STATUS    RESTARTS   AGE
cc-operator-54f54c694d-qjb7w   1/1     Running   0          4h31m
connectors-0                   1/1     Running   0          4h15m
connectors-1                   1/1     Running   0          4h15m
controlcenter-0                1/1     Running   0          4h18m
kafka-0                        1/1     Running   0          4h20m
kafka-1                        1/1     Running   0          4h20m
kafka-2                        1/1     Running   0          4h20m
ksql-0                         1/1     Running   0          21m
ksql-1                         1/1     Running   0          21m
replicator-0                   1/1     Running   0          4h18m
replicator-1                   1/1     Running   0          4h18m
schemaregistry-0               1/1     Running   0          4h18m
schemaregistry-1               1/1     Running   0          4h18m
zookeeper-0                    1/1     Running   0          4h30m
zookeeper-1                    1/1     Running   0          4h30m
zookeeper-2                    1/1     Running   0          4h30m

Step 9. Test the deployment

Complete the following steps to test and validate your deployment.

Internal access validation

Complete the following steps to validate internal communication.

  1. On your local machine, enter the following command to display the Kafka cluster resource (using the example namespace operator). The output contains the bootstrap endpoint you need to complete internal validation.

    kubectl get kafka -n operator -oyaml
    

    The bootstrap endpoint is shown on the bootstrap.servers line.

    ... omitted
    
       internalClient: |-
          bootstrap.servers=kafka:9071
    
  2. On your local machine, use kubectl exec to start a bash session on one of the pods in the cluster. The example uses the default pod name kafka-0 on a Kafka cluster using the default name kafka.

    kubectl -n operator exec -it kafka-0 -- bash
    
  3. On the pod, create and populate a file named kafka.properties. There is no text editor installed in the containers, so use the cat command as shown below to create the file; the input ends and the file is saved when you enter the closing EOF line (or press CTRL+D).

    Note

    The example shows default SASL/PLAIN security parameters. A production environment requires additional security. See Configure Security with Confluent Operator for additional information.

    cat << EOF > kafka.properties
    bootstrap.servers=kafka:9071
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="test123";
    sasl.mechanism=PLAIN
    security.protocol=SASL_PLAINTEXT
    EOF
    
  4. On the pod, query the bootstrap server using the following command:

    kafka-broker-api-versions --command-config kafka.properties --bootstrap-server kafka:9071
    

    You should see output for each of the three Kafka brokers that resembles the following:

    kafka-1.kafka.operator.svc.cluster.local:9071 (id: 1 rack: 0) -> (
       Produce(0): 0 to 7 [usable: 7],
       Fetch(1): 0 to 10 [usable: 10],
       ListOffsets(2): 0 to 4 [usable: 4],
       Metadata(3): 0 to 7 [usable: 7],
       LeaderAndIsr(4): 0 to 1 [usable: 1],
       StopReplica(5): 0 [usable: 0],
       UpdateMetadata(6): 0 to 4 [usable: 4],
       ControlledShutdown(7): 0 to 1 [usable: 1],
       OffsetCommit(8): 0 to 6 [usable: 6],
       OffsetFetch(9): 0 to 5 [usable: 5],
       FindCoordinator(10): 0 to 2 [usable: 2],
       JoinGroup(11): 0 to 3 [usable: 3],
       Heartbeat(12): 0 to 2 [usable: 2],
    
    ... omitted
    

    This output validates internal communication within your cluster.
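
    If you want to exercise the produce and consume paths as well, you can create a small test topic from the same pod, reusing the kafka.properties file created above. This is a sketch only; the topic name internal-smoke-test is arbitrary:

    # On the pod: create a test topic, produce a few records, and read them back.
    kafka-topics --bootstrap-server kafka:9071 --command-config kafka.properties \
      --create --replication-factor 3 --partitions 1 --topic internal-smoke-test
    seq 10 | kafka-console-producer --topic internal-smoke-test \
      --broker-list kafka:9071 --producer.config kafka.properties
    kafka-console-consumer --from-beginning --topic internal-smoke-test \
      --bootstrap-server kafka:9071 --consumer.config kafka.properties --max-messages 10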

External access validation

Complete the following steps to validate external communication after you have enabled an external load balancer for Kafka and added DNS entries as described in Configure Kafka External Load Balancer. External access to the Kafka brokers is available only through an external load balancer.

Note

The examples use default Confluent Platform component names.

  1. On your local machine, download and start the Confluent Platform.

    You use the Kafka command-line tools bundled with Confluent Platform (for example, kafka-topics and kafka-console-producer) on your local machine to complete external validation.

  2. On your local machine, run the following command to get the bootstrap server endpoint for external clients.

    kubectl get kafka -n operator -oyaml
    

    In the example output below, the bootstrap server endpoint is kafka.<providerdomain>:9092.

    ... omitted
    
    externalClient: |-
       bootstrap.servers=kafka.<providerdomain>:9092
       sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="test123";
       sasl.mechanism=PLAIN
       security.protocol=SASL_PLAINTEXT
    
  3. On your local machine where you have the Confluent Platform running locally, create and populate a file named kafka.properties with the following content.

    bootstrap.servers=kafka.<providerdomain>:9092
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="test123";
    sasl.mechanism=PLAIN
    security.protocol=SASL_PLAINTEXT
    

    Note

    The example shows default SASL/PLAIN security parameters. A production environment requires additional security. See Configure Security with Confluent Operator for additional information.

  4. On your local machine, create a topic using the bootstrap endpoint kafka.<providerdomain>:9092. The example below creates a topic with 1 partition and 3 replicas.

    kafka-topics --bootstrap-server kafka.<providerdomain>:9092 \
    --command-config kafka.properties \
    --create --replication-factor 3 \
    --partitions 1 --topic example
    
  5. On your local machine, produce to the new topic using the bootstrap endpoint kafka.<providerdomain>:9092. Note that the bootstrap server load balancer is the only Kafka broker endpoint required because it provides gateway access to the load balancers for all Kafka brokers.

    seq 10000 | kafka-console-producer \
    --topic example --broker-list kafka.<providerdomain>:9092 \
    --producer.config kafka.properties
    
  6. In a new terminal on your local machine, from the directory where you saved kafka.properties, run the following command to consume from the new topic.

    kafka-console-consumer --from-beginning \
    --topic example --bootstrap-server kafka.<providerdomain>:9092 \
    --consumer.config kafka.properties
    

Successful completion of these steps validates external communication with your cluster.

External access validation of Confluent Control Center

Complete the following steps to access your Confluent Platform cluster using Control Center. Prior to the steps, enable an external load balancer for Confluent Control Center and add a DNS entry as described in Configure External Load Balancer.

  1. On your local machine, enter the following command to set up port forwarding to the default Confluent Control Center endpoint.

    kubectl port-forward svc/controlcenter 9021:9021 -n operator
    
  2. Connect to Control Center in a browser:

    http://localhost:9021/
    
  3. Log in to Control Center. Basic authentication credentials are set in the configuration file ($VALUES_FILE). In the example below, the user ID is admin and the password is Developer1.

    ##
    ## C3 authentication
    ##
    auth:
      basic:
        enabled: true
        ##
        ## map with key as user and value as password and role
        property:
          admin: Developer1,Administrators
          disallowed: no_access
    

Important

Basic authentication to Confluent Control Center can be used for development and testing. This authentication type is typically disabled in production environments, where LDAP is configured for user access instead. LDAP parameters are provided in the Control Center values file.