Apache Kafka Confluent cluster Dockerization (Part-2)

Ankur Garg
Aug 1, 2020

You can find the earlier part of this article here:

Now, Kafka Broker

Apache Kafka is most commonly described as a distributed streaming platform. It was originally developed at LinkedIn, open-sourced in early 2011, and is now maintained under the Apache Software Foundation.
It is a publish-subscribe messaging system and a robust queue that can handle a high volume of data and lets you pass messages from one endpoint to another.

A streaming platform has three key capabilities:

- Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
- Store streams of records in a fault-tolerant durable way.
- Process streams of records as they occur.

A distributed streaming platform means that its data is spread over the servers in the Kafka cluster, with each server handling data and requests for a share of the partitions. Because each partition is replicated across a configurable number of servers, the system is fault tolerant, highly available, and horizontally scalable.
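To make the partition and replication idea concrete, here is a minimal Python sketch that spreads partition replicas across brokers round-robin. It is a simplification for illustration only and not Kafka's actual assignment algorithm; the function names `assign_replicas` and `surviving_partitions` are hypothetical.

```python
def assign_replicas(broker_ids, num_partitions, replication_factor):
    """Toy round-robin placement: partition p gets replicas on consecutive brokers."""
    if replication_factor > len(broker_ids):
        raise ValueError("replication factor cannot exceed the number of brokers")
    assignment = {}
    for p in range(num_partitions):
        assignment[p] = [
            broker_ids[(p + r) % len(broker_ids)] for r in range(replication_factor)
        ]
    return assignment


def surviving_partitions(assignment, failed_broker):
    """Partitions that still have at least one replica after one broker fails."""
    return [
        p for p, replicas in assignment.items()
        if any(b != failed_broker for b in replicas)
    ]


if __name__ == "__main__":
    # Three brokers, as in the cluster built in this article.
    plan = assign_replicas([1, 2, 3], num_partitions=6, replication_factor=2)
    for partition, replicas in plan.items():
        print(f"partition {partition}: replicas on brokers {replicas}")
    print("still available if broker 1 fails:", surviving_partitions(plan, 1))
```

With a replication factor of 2 on three brokers, every partition keeps a replica when any single broker goes down, which is the fault tolerance described above.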

Let’s set up a Kafka broker cluster in Docker.
As with ZooKeeper, we are setting up a 3-node Kafka cluster.
Let the 3 servers be:
server1 (10.0.0.101)
server2 (10.0.0.102)
server3 (10.0.0.103)

Server1 (10.0.0.101) docker-compose file should be like this:

---
version: '3.2'
services:
  kafka:
    image: <kafka image>
    hostname: kafka1
    ports:
      - 9092:9092
    environment:
      KAFKA_HEAP_OPTS: "-Xms1024M -Xmx1024M"
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://10.0.0.101:9092
      KAFKA_ZOOKEEPER_CONNECT: 10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181
      KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: 1
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092
      KAFKA_LOG_RETENTION_HOURS: 48
    volumes:
      - /home/centos/kafka/:/var/lib/kafka/data/

Explanation of the properties used:

- KAFKA_BROKER_ID: The broker ID for this server. If unset, a unique broker ID is generated. Each node in a cluster must have a unique broker ID.
- KAFKA_LISTENERS: A comma-separated list of listeners, that is, the host/IP and port combinations that Kafka binds to.
- KAFKA_ADVERTISED_LISTENERS: The listeners published to ZooKeeper for clients to use. If this is not set, the value of listeners is used.
- KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: Enables or disables support metrics delivery.
- KAFKA_METRIC_REPORTERS: The metrics reporter collects various metrics from the Kafka cluster. The Confluent Metrics Reporter is necessary for health monitoring and for Confluent Auto Data Balancer to operate.
- KAFKA_CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: Bootstrap servers of the Kafka cluster to which metrics are published. The metrics cluster may be different from the cluster(s) whose metrics are being collected.
- KAFKA_LOG_RETENTION_HOURS: The number of hours to keep a log file before deleting it. The default is 168 hours.
- Volumes: Volumes are the mechanism for persisting data generated and used by Docker containers. Inside the container, Kafka writes its data to the /var/lib/kafka/data/ directory; here we map that directory to /home/centos/kafka/ on the host machine so the data survives container restarts.
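Only a few of these values change from node to node: the broker ID and the advertised listener (plus the container hostname, which is not an environment variable). A small Python sketch can make that explicit by generating the environment for each broker from the server list used in this article. The helper names `broker_environment` and `differing_keys` are hypothetical, and the sketch covers only a subset of the variables.

```python
# Broker ID -> host IP, taken from the three servers used in this article.
SERVERS = {1: "10.0.0.101", 2: "10.0.0.102", 3: "10.0.0.103"}


def broker_environment(broker_id, servers=SERVERS, port=9092, zk_port=2181):
    """Build the per-broker environment values (subset, for illustration)."""
    host = servers[broker_id]
    hosts = [servers[i] for i in sorted(servers)]
    return {
        "KAFKA_BROKER_ID": str(broker_id),
        "KAFKA_LISTENERS": f"PLAINTEXT://:{port}",
        "KAFKA_ADVERTISED_LISTENERS": f"PLAINTEXT://{host}:{port}",
        "KAFKA_ZOOKEEPER_CONNECT": ",".join(f"{h}:{zk_port}" for h in hosts),
        "KAFKA_CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS": ",".join(
            f"{h}:{port}" for h in hosts
        ),
        "KAFKA_LOG_RETENTION_HOURS": "48",
    }


def differing_keys(envs):
    """Return the variable names whose values are not identical on every node."""
    keys = set().union(*envs)
    return sorted(k for k in keys if len({e.get(k) for e in envs}) > 1)


if __name__ == "__main__":
    envs = [broker_environment(i) for i in sorted(SERVERS)]
    print("per-node settings:", differing_keys(envs))
    for env in envs:
        print(env["KAFKA_BROKER_ID"], env["KAFKA_ADVERTISED_LISTENERS"])
```

Checking the generated values against each compose file is a quick way to catch copy-paste slips such as a duplicated broker ID or a wrong advertised IP.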

Server2 (10.0.0.102) Docker compose file:

---
version: '3.2'
services:
  kafka:
    image: <kafka image>
    hostname: kafka2
    ports:
      - 9092:9092
    environment:
      KAFKA_HEAP_OPTS: "-Xms1024M -Xmx1024M"
      KAFKA_BROKER_ID: 2
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://10.0.0.102:9092
      KAFKA_ZOOKEEPER_CONNECT: 10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181
      KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: 1
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092
      KAFKA_LOG_RETENTION_HOURS: 48
    volumes:
      - /home/centos/kafka/:/var/lib/kafka/data/

Server3 (10.0.0.103) Docker compose file:

---
version: '3.2'
services:
  kafka:
    image: <kafka image>
    hostname: kafka3
    ports:
      - 9092:9092
    environment:
      KAFKA_HEAP_OPTS: "-Xms1024M -Xmx1024M"
      KAFKA_BROKER_ID: 3
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://10.0.0.103:9092
      KAFKA_ZOOKEEPER_CONNECT: 10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181
      KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: 1
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092
      KAFKA_LOG_RETENTION_HOURS: 48
    volumes:
      - /home/centos/kafka/:/var/lib/kafka/data/

Next is the Schema Registry.

Schema Registry provides a serving layer for your metadata. It provides a RESTful interface for storing and retrieving Avro schemas. It stores a versioned history of all schemas, provides multiple compatibility settings, and allows the evolution of schemas according to the configured compatibility setting. It provides serializers that plug into Kafka clients that handle schema storage and retrieval for Kafka messages that are sent in the Avro format.
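As a rough illustration of that RESTful interface, the Python sketch below builds (but does not send) a request that registers an Avro schema under a subject. It assumes Schema Registry is reachable at http://10.0.0.101:8081, the address used in this article; the subject name `orders-value`, the example schema, and the helper `build_register_request` are hypothetical.

```python
import json
import urllib.request


def build_register_request(base_url, subject, avro_schema):
    """Build a POST request that registers a new schema version for a subject."""
    # The REST API expects the Avro schema as a JSON-encoded string
    # inside the "schema" field of the request body.
    payload = json.dumps({"schema": json.dumps(avro_schema)}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/subjects/{subject}/versions",
        data=payload,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )


if __name__ == "__main__":
    schema = {
        "type": "record",
        "name": "Order",  # hypothetical record, for illustration only
        "fields": [{"name": "id", "type": "long"}],
    }
    request = build_register_request("http://10.0.0.101:8081", "orders-value", schema)
    print(request.get_method(), request.full_url)
    # To actually send it once the service is running:
    #   with urllib.request.urlopen(request) as response:
    #       print(response.read().decode("utf-8"))
```

Each successful registration of a changed schema adds a new version under the subject, which is how the versioned history mentioned above accumulates.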

Let’s set up Schema Registry in Docker on server1 (10.0.0.101).
The docker-compose.yml looks like this:

---
version: '3.2'
services:
  schema-registry:
    image: <schema-registry image>
    container_name: schema-registry
    ports:
      - 8081:8081
    environment:
      SCHEMA_REGISTRY_HEAP_OPTS: "-Xmx512M"
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://10.0.0.101:9092,PLAINTEXT://10.0.0.102:9092,PLAINTEXT://10.0.0.103:9092

Properties used:

- SCHEMA_REGISTRY_KAFKASTORE_TOPIC: Schema Registry stores all schemas in the Kafka topic named by kafkastore.topic. The value is a string.
- SCHEMA_REGISTRY_LISTENERS: A comma-separated list of listeners that accept API requests.
- SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: The ZooKeeper URL of the Kafka cluster, used for ZooKeeper-based leader election. If the Schema Registry Security Plugin is installed and configured to use ACLs, this property must be used.
- SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: The list of Kafka brokers to connect to. It is used for Kafka-based leader election.
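The environment variable names follow from the underlying property names. Assuming the image uses the Confluent Docker naming convention (a component prefix, upper case, periods replaced by underscores), the mapping for simple names can be sketched in Python as follows. The helper `env_to_property` is hypothetical and deliberately ignores the extra escaping rules for property names that themselves contain dashes or underscores.

```python
def env_to_property(env_name, prefix):
    """Map e.g. SCHEMA_REGISTRY_KAFKASTORE_TOPIC -> kafkastore.topic (simple names only)."""
    if not env_name.startswith(prefix):
        raise ValueError(f"{env_name} does not start with {prefix}")
    # Strip the component prefix, lower-case, and turn underscores back into periods.
    return env_name[len(prefix):].lower().replace("_", ".")


if __name__ == "__main__":
    names = [
        "SCHEMA_REGISTRY_KAFKASTORE_TOPIC",
        "SCHEMA_REGISTRY_LISTENERS",
        "SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL",
        "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS",
    ]
    for name in names:
        print(f"{name} -> {env_to_property(name, 'SCHEMA_REGISTRY_')}")
```

Under this convention, a period in a variable name is not translated, which is why the variable names use underscores throughout. Variables such as SCHEMA_REGISTRY_HEAP_OPTS are JVM settings rather than configuration properties, so the mapping does not apply to them.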

Next, we have the ksqlDB cluster:

ksqlDB (formerly KSQL) is a SQL streaming engine for Apache Kafka: an event streaming database purpose-built to help developers create stream processing applications on top of Kafka using SQL statements. It is a distributed service, so a cluster can have multiple ksqlDB nodes, each processing a portion of the input data from the input topic(s) and producing a portion of the output data to the output topic(s). ksqlDB does not rely on a quorum, so an even number of nodes is fine; size the cluster according to traffic.

Let’s set up ksqlDB in Docker on server1 (10.0.0.101) and server2 (10.0.0.102).

The docker-compose file on server1 (10.0.0.101) looks like this:

---
version: '3.2'
services:
  ksql:
    image: <image ksql>
    container_name: ksql
    ports:
      - 8088:8088
    environment:
      KSQL_HEAP_OPTS: "-Xmx2048M"
      KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://10.0.0.101:8081
      KSQL_SERVICE_ID: _confluent-ksql-cluster_
      KSQL_BOOTSTRAP_SERVERS: 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092

Properties used are:

- KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: If set to true, the processing log topic is created automatically at startup.
- KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: If set to true, ksqlDB automatically creates a processing log stream when it starts up.
- KSQL_LISTENERS: A list of URIs, including the protocol, that the ksqlDB server listens on.
- KSQL_KSQL_SCHEMA_REGISTRY_URL: The Schema Registry URL that ksqlDB connects to.
- KSQL_SERVICE_ID: The service ID of the ksqlDB server, which defines ksqlDB cluster membership. If multiple ksqlDB servers connect to the same Kafka cluster (that is, the same bootstrap servers) and have the same KSQL_SERVICE_ID, they form a ksqlDB cluster and share the workload.
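The membership rule for KSQL_SERVICE_ID can be modelled with a short Python sketch: servers that share both the bootstrap servers and the service ID end up in the same group. This only mimics the rule for illustration and says nothing about how ksqlDB coordinates internally; the function `group_into_clusters` and the server names are hypothetical.

```python
from collections import defaultdict

BOOTSTRAP = "10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092"


def group_into_clusters(servers):
    """Group servers by (bootstrap servers, service ID), the membership rule above."""
    clusters = defaultdict(list)
    for server in servers:
        # Using the set of bootstrap addresses as a stand-in for "same Kafka cluster".
        key = (frozenset(server["bootstrap_servers"].split(",")), server["service_id"])
        clusters[key].append(server["name"])
    return sorted(sorted(members) for members in clusters.values())


if __name__ == "__main__":
    servers = [
        {"name": "ksql-server1", "bootstrap_servers": BOOTSTRAP,
         "service_id": "_confluent-ksql-cluster_"},
        {"name": "ksql-server2", "bootstrap_servers": BOOTSTRAP,
         "service_id": "_confluent-ksql-cluster_"},
        {"name": "ksql-other", "bootstrap_servers": BOOTSTRAP,
         "service_id": "_another-service_"},
    ]
    for members in group_into_clusters(servers):
        print(members)
```

A server with a different service ID stays on its own even though it talks to the same brokers, so the ID has to match exactly on every node that should share the workload.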

docker-compose file on server2 (10.0.0.102):

---
version: '3.2'
services:
  ksql:
    image: <image ksql>
    container_name: ksql
    ports:
      - 8088:8088
    environment:
      KSQL_HEAP_OPTS: "-Xmx2048M"
      KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://10.0.0.101:8081
      KSQL_SERVICE_ID: _confluent-ksql-cluster_
      KSQL_BOOTSTRAP_SERVERS: 10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092

In the next part of this article, we will set up Kafka REST Proxy and a Kafka Connect worker cluster in Docker.

You can find the remaining parts of this article here:

References:

https://docs.confluent.io/current/installation/configuration/broker-configs.html
https://docs.confluent.io/current/schema-registry/installation/config.html

Until next time…
