Apache Kafka tools
Mihalis Tsoukalos explains how to install and create utilities that use Apache Kafka, with its producer and consumer streaming model.
Nowadays, quite a few data architectures involve both a database and Apache Kafka, which is a distributed streaming platform and the subject of this tutorial. You can also find Kafka described as a publish-subscribe messaging system, which is a fancy way of saying the same thing. As Kafka is optimised for speed, it doesn’t offer many of the facilities that relational databases offer their users and administrators, and Kafka commands are long and difficult to remember. Worried yet?
Don’t worry as here we’ll look at the basics of Kafka, including plenty of the commands and two Go utilities for working with it.
Installation
We’ll use Kafka from a Docker image. The main reason for this is that Kafka has many dependencies and parameters that complicate the installation process – but Docker images come ready to run and include all required dependencies.
For this tutorial we’ll use a Kafka Docker image offered by Landoop. This is one of the best that you can find because it enables you to work with Kafka as soon as you download it! Other Kafka Docker images, including the one offered by Confluent, require many customisations that demand time, which might discourage amateur users.
First, you will need to download the Kafka Docker image by executing the following command:
$ docker pull landoop/fast-data-dev:latest
If you ever want to update that image to the latest version, you can simply rerun the docker pull
command. Next, you will need to run the Kafka Docker image you downloaded in order to be able to connect to Kafka, its services and its ports:
$ docker run --rm --name=kafka-box -it -p 2181:2181 -p 3030:3030 -p 8081:8081 -p 8082:8082 -p 8083:8083 -p 9092:9092 -p 9581:9581 -p 9582:9582 -p 9583:9583 -p 9584:9584 -e ADV_HOST=127.0.0.1 landoop/fast-data-dev:latest
This command makes Kafka and the desired port numbers available to the world; put simply, you can connect to some Kafka-related services from your local machine without the need to connect to the Docker image first. Kafka Broker listens to port number 9092, whereas Zookeeper listens to port number 2181. Last, the Schema Registry listens to port number 8081 and Kafka Connect to port number 8083.
Because this Docker image is part of a bigger Landoop project, you can visit http://127.0.0.1:3030
after running it to display lots of handy information that will make your life much easier.
The following commands show how to start a UNIX shell, which in this case is bash, in the Docker image:
$ docker exec -it kafka-box bash
Note that this command connects to a running container named kafka-box, which was defined as a parameter of the docker run command. If you used a different name, you will need to change that string for the command to work.
The only disadvantage of using a Docker image is that after you shut down the Docker image, data and changes will be lost. The good thing is that there are workarounds to this problem; however, talking about them is beyond the scope of this tutorial. The biggest advantage of using a Docker image is that everything you do to that image, including deleting things and
stopping services, will not affect the image itself and your Linux machine. Thus, if you think there’s something wrong with the commands you’ve executed you can simply stop the running Docker image and restart a new one. If you really need data persistence and you aren’t familiar with Docker, you can always install Kafka on your Linux machine using your favourite package manager or download it and install it by following the instructions found at https://kafka.apache.org/quickstart.
You can list the available Kafka topics from your UNIX shell as follows:
$ docker run --rm -it --net=host landoop/fast-data-dev kafka-topics --zookeeper localhost:2181 --list
This command can also be executed while being connected to the bash shell of the Docker image:
$ kafka-topics --zookeeper localhost:2181 --list
You can find the version of Kafka you’re using by executing kafka-topics --version from within the Docker image. Figure 1 (see page 88) shows the output of some of the commands presented in this section. As you can see, the default Kafka installation of the Docker image that we used comes with many Kafka topics. The next section will show you how to create your own Kafka topics and insert your own data in them.
Adding data to Kafka
For the presented commands to work you will need to be connected to a bash shell in the Docker image, or have Kafka installed on your Linux machine. You can create a new Kafka topic named NEW-LXF-TOPIC , with three partitions, as follows:
$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic NEW-LXF-TOPIC
The command might look a little complex at first but this is how Kafka works. You will have to connect to the Zookeeper server and you will have to define all required parameters as command line arguments. You can increase the partitions of an existing topic as follows:
$ kafka-topics --zookeeper localhost:2181 --alter --topic NEW-LXF-TOPIC --partitions 16
However, if the new value is smaller than the old value, the number of partitions will not be reduced. You can insert data into a Kafka topic from your Linux shell as follows:
$ kafka-console-producer --broker-list localhost:9092 --topic NEW-LXF-TOPIC
After you execute this command, you will have to type the data you want. Please bear in mind that all records should have the same format, based on the format of the first record that has been written to a Kafka topic – consistency is key here because otherwise you will have difficulties and failures. Count the number of records that exist in all partitions of a Kafka topic as follows:
$ kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -1 --topic NEW-LXF-TOPIC | awk -F: '{print $3}' | awk 'BEGIN {sum=0} {sum+=$1} END {print sum}'
View that data in a topic on your screen as follows:
$ kafka-console-consumer --from-beginning --bootstrap-server localhost:9092 --topic NEW-LXF-TOPIC
Note that the aforementioned command will present the entire dataset from the selected topic and will keep waiting for new data; this means that you have to manually end the command by pressing Control+c. If you just want to display new data, you should omit the --from-beginning parameter from the kafka-console-consumer command. Lastly, if the topic is empty, you will see no output until you add records to that topic. To avoid manually terminating kafka-console-consumer, you can use the --timeout-ms parameter to define a time-out period of waiting for new data as follows:
$ kafka-console-consumer --from-beginning --bootstrap-server localhost:9092 --topic NEW-LXF-TOPIC --timeout-ms 100
Delete all data from a Kafka topic without deleting the topic itself as follows:
$ kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name NEW-LXF-TOPIC
$ kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name NEW-LXF-TOPIC
$ kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name NEW-LXF-TOPIC --delete-config retention.ms
The aforementioned commands need a little explaining because they don’t actually delete any data: Kafka purges data from topics. In order to purge all data from the Kafka topic, you need to change the retention time of that topic. The default retention time is seven days, so you need to change the retention time to one second, after which the messages from that topic will be deleted automatically. Then you can change the retention time of the topic back to seven days by deleting the existing value.
Should you wish to keep these messages longer, it would be better to create a Kafka consumer and store them to another database or place. Figure 2 (see page
89) shows the output of some of the commands presented in this section. Try adding and deleting topics and data to Kafka on your own.
Basic administration
It’s essential to know a few basic commands that enable you to administer a Kafka server. First we’ll learn how to delete an existing Kafka topic:
$ kafka-topics --zookeeper localhost:2181 --delete --topic LXF-TOPIC-TEST
Find more information about a specific Kafka topic as follows:
$ kafka-topics --describe --zookeeper localhost:2181 --topic NEW-LXF-TOPIC
The last command comes in very handy when you want to find out how many partitions a topic has. Enter the Zookeeper shell as follows:
$ zookeeper-shell localhost:2181
[zk: localhost:2181(CONNECTING) 0]
Lastly, you can check the performance of a Kafka topic as follows:
$ kafka-producer-perf-test --topic NEW-LXF-TOPIC --throughput 10000 --record-size 300 --num-records 20000 --producer-props bootstrap.servers=localhost:9092
The performance of a Kafka topic mainly depends on the number of partitions and the way these are distributed across different servers. Figure 3 (below) shows the output of some of the commands presented in this section. Note that the only way to safely backup Kafka is to have replication: there is no specific Kafka utility for backing up and restoring a topic. However, you can backup the Zookeeper State Data that is essential for Kafka to work; again, talking more about this is beyond the scope of this tutorial.
Creating a producer in Go
Now we’ll learn how to develop a Kafka producer written in Go. The Go producer will store data in Kafka using JSON records. As the Docker image makes some of its services and ports available to the rest of the world, you will not need to connect to the bash shell of the Docker image for the producer and the consumer Go utilities to work.
The producer.go utility writes a user-defined amount of random numbers that reside in a given range to a Kafka topic, which is also given as a parameter to the program. Therefore, you will need to provide the Go program with four parameters. The critical Go code of the producer.go utility is the following:
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
conn.WriteMessages(
    kafka.Message{Value: []byte(recordJSON)},
)
The first statement defines the Kafka server that we want to connect to, and the second statement is what writes records to the desired Kafka topic. Note that the format of the records is JSON, which means that you will have to use the json.Marshal() function in order to convert your data into JSON – this is the only difficult aspect of the producer.go utility.
At this point it’s a good idea to download the Go package that will be used for connecting to Kafka – without that package neither the Go producer nor the Go consumer will be able to work. This can be done by executing go get -u github.com/segmentio/kafka-go. You can find more information about the used Go library at https://github.com/segmentio/kafka-go.
After executing producer.go with the required parameters, you can go to your Kafka installation and look for the data, which can be done as follows:
$ docker exec -it kafka-box kafka-console-consumer --zookeeper localhost:2181 --topic
Creating a consumer in Go
Let’s turn our attention to creating a Kafka consumer written in Go (consumer.go) in order to read data from a Kafka topic. The critical Go code of the consumer.go utility is the following:
r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:   []string{"localhost:9092"},
    Topic:     topic,
    Partition: partition,
    MinBytes:  10e3,
    MaxBytes:  10e6,
})
r.SetOffset(0)
The kafka.ReaderConfig Go structure holds the details of the Kafka server we want to connect to, including the topic name and the partition number; as we are using a Kafka topic with a single partition, the partition number will be 0. Meanwhile, the SetOffset() function call specifies the offset from which the program will start reading data. If you want to read all data from a Kafka topic, use SetOffset(0).
Figure 5 (above) shows a part of the output of
consumer.go as executed from the UNIX shell of the local machine. Because consumer.go keeps waiting for data, you will have to end it by pressing Control+c or using kill.
Figure 6 (right) shows the entire Go code of consumer.go. The simplicity of the code is pretty impressive for such a handy utility. Note that you will need to have the Record structure correctly defined for consumer.go to work, which means that you should know the format of the JSON data that you expect to get from the Kafka topic in advance. If you want your Go code to dynamically understand the JSON data, you will need to use Go reflection, which is an advanced feature of Go.
Admittedly, as you’ve seen so far, Kafka is pretty
different – but this is not necessarily a bad thing. You will need some time to get used to Kafka and learn how to administer, use and develop applications for it, but you will be rewarded by its simplicity and speed.
When you have to store and process huge amounts of streaming messages, Kafka should be the first thing to come to mind – it’s great at storing and serving data fast. Additionally, Kafka is great at enabling you to get data from a supported source, filter it, process it and export it to another destination such as another database or service, an FTP site or a plain text file.
On the other hand, when you have to work with static data or small amounts of data, Kafka is still a viable solution that you should consider.