Linux Format

Apache Kafka tools

Mihalis Tsoukalos explains how to install and create utilities that use Apache Kafka, with its “producer” and “consumer” stream model.

- Mihalis Tsoukalos is a UNIX person and the author of Go Systems Programming and Mastering Go. You can reach him at https://www.mtsoukalos.eu and @mactsouk.


Nowadays, quite a few data architectures involve both a database and Apache Kafka, which is a distributed streaming platform and the subject of this tutorial. You can also find Kafka described as a publish-subscribe message system, which is just a fancy way of saying the same thing. As Kafka is optimised for speed, it doesn’t offer too many facilities to users or administrators – like the ones offered by relational databases – and Kafka commands are long and difficult to remember. Worried yet?

Don’t worry: here we’ll look at the basics of Kafka, including plenty of its commands, along with two Go utilities for working with it.

Installation

We’ll use Kafka from a Docker image. The main reason for this is that Kafka has many dependencies and parameters that complicate the installation process – but Docker images come ready to run and include all required dependencies.

For this tutorial we’ll use a Kafka Docker image offered by Landoop. This is one of the best that you can find because it enables you to work with Kafka as soon as you download it! Other Kafka Docker images, including the one offered by Confluent, require many customisations that demand time, which might discourage amateur users.

First, you will need to download the Kafka Docker image by executing the following command:

$ docker pull landoop/fast-data-dev:latest

If you ever want to update that image to the latest version, you can simply rerun the docker pull command. Next, you will need to run the Kafka Docker image you downloaded in order to be able to connect to Kafka, its services and its ports:

$ docker run --rm --name=kafka-box -it -p 2181:2181 -p 3030:3030 -p 8081:8081 -p 8082:8082 -p 8083:8083 -p 9092:9092 -p 9581:9581 -p 9582:9582 -p 9583:9583 -p 9584:9584 -e ADV_HOST=127.0.0.1 landoop/fast-data-dev:latest

This command makes Kafka and the desired port numbers available to the world; put simply, you can connect to some Kafka-related services from your local machine without the need to connect to the Docker image first. Kafka Broker listens to port number 9092, whereas Zookeeper listens to port number 2181. Last, the Schema Registry listens to port number 8081 and Kafka Connect to port number 8083.

Because this Docker image is part of a bigger Landoop project, you can visit http://127.0.0.1:3030 after running it to display lots of handy information that will make your life much easier.
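If you prefer the command line, you can also check from your local machine that some of these services are responding. For instance, the Schema Registry and Kafka Connect both expose REST APIs, so the following quick test should work once the container has finished starting up (both commands return a JSON list, which is empty on a fresh installation):

$ curl http://127.0.0.1:8081/subjects
$ curl http://127.0.0.1:8083/connectors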

The following command shows how to start a UNIX shell, which in this case is bash, in the Docker image:

$ docker exec -it kafka-box bash

Note that this command connects to a running Docker image named kafka-box, which was defined as a parameter of the docker run command. If you used a different name, you will need to change that string for the command to work.

The only disadvantage of using a Docker image is that after you shut down the Docker image, data and changes will be lost. The good news is that there are workarounds to this problem; however, discussing them is beyond the scope of this tutorial. The biggest advantage of using a Docker image is that everything you do to that image, including deleting things and stopping services, will not affect the image itself or your Linux machine. Thus, if you think there’s something wrong with the commands you’ve executed, you can simply stop the running Docker image and start a new one. If you really need data persistence and you aren’t familiar with Docker, you can always install Kafka on your Linux machine using your favourite package manager, or download and install it by following the instructions found at https://kafka.apache.org/quickstart.

You can list the available Kafka topics from your UNIX shell as follows:

$ docker run --rm -it --net=host landoop/fast-data-dev kafka-topics --zookeeper localhost:2181 --list

This command can also be executed while being connected to the bash shell of the Docker image:

$ kafka-topics --zookeeper localhost:2181 --list

You can find the version of Kafka you’re using by executing kafka-topics --version from within the Docker image. Figure 1 (see page 88) shows the output of some of the commands presented in this section. As you can see, the default Kafka installation of the Docker image that we used comes with many Kafka topics. The next section will show you how to create your own Kafka topics and insert your own data in them.

Adding data to Kafka

For the presented commands to work you will need to be connected to a bash shell in the Docker image, or have Kafka installed on your Linux machine. You can create a new Kafka topic named NEW-LXF-TOPIC, with three partitions, as follows:

$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic NEW-LXF-TOPIC

The command might look a little complex at first but this is how Kafka works. You will have to connect to the Zookeeper server and you will have to define all required parameters as command line arguments. You can increase the partitions of an existing topic as follows:

$ kafka-topics --zookeeper localhost:2181 --alter --topic NEW-LXF-TOPIC --partitions 16

However, if the new value is smaller than the old value, the number of partitions will not be reduced. You can insert data into a Kafka topic from your Linux shell as follows:

$ kafka-console-producer --broker-list localhost:9092 --topic NEW-LXF-TOPIC

After you execute this command, you will have to type the data you want. Please bear in mind that all records should have the same format, based on the format of the first record that has been written to a Kafka topic – consistency is key here because otherwise you will have difficulties and failures. Count the number of records that exist in all partitions of a Kafka topic as follows (the first command prints the latest offset of each partition, and the awk commands extract and sum those offsets):

$ kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -1 --topic NEW-LXF-TOPIC | awk -F: '{print $3}' | awk 'BEGIN{sum=0} {sum+=$1} END {print sum}'

View that data in a topic on your screen as follows:

$ kafka-console-consumer --from-beginning --bootstrap-server localhost:9092 --topic NEW-LXF-TOPIC

Note that the aforementioned command will present the entire dataset from the selected topic and will keep waiting for new data; this means that you have to end the command manually by pressing Control+c. If you just want to display new data, you should omit the --from-beginning parameter from the kafka-console-consumer command. Lastly, if the topic is empty, you will see no output until you add records to that topic. To avoid terminating kafka-console-consumer manually, you can use the --timeout-ms parameter to define a time-out period for waiting for new data as follows:

$ kafka-console-consumer --from-beginning --bootstrap-server localhost:9092 --topic NEW-LXF-TOPIC --timeout-ms 100

Delete all data from a Kafka topic without deleting the topic itself as follows:

$ kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name NEW-LXF-TOPIC

$ kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name NEW-LXF-TOPIC

$ kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name NEW-LXF-TOPIC --delete-config retention.ms

The aforementioned commands need a little explaining because they don’t actually delete any data: Kafka purges data from topics. In order to purge all data from the Kafka topic, you need to change the retention time of that topic. The default retention time is seven days, so you need to change the retention time to one second, after which the messages from that topic will be deleted automatically. Then you can change the retention time of the topic back to seven days by deleting the existing value.

Should you wish to keep these messages longer, it would be better to create a Kafka consumer and store them to another database or place. Figure 2 (see page 89) shows the output of some of the commands presented in this section. Try adding and deleting topics and data to Kafka on your own.

Basic administration

It’s essential to know a few basic commands that enable you to administer a Kafka server. First we’ll learn how to delete an existing Kafka topic:

$ kafka-topics --zookeeper localhost:2181 --delete --topic LXF-TOPIC-TEST

Find more information about a specific Kafka topic as follows:

$ kafka-topics --describe --zookeeper localhost:2181 --topic NEW-LXF-TOPIC

The last command comes in very handy when you want to find out how many partitions a topic has. Enter the Zookeeper shell as follows:

$ zookeeper-shell localhost:2181

[zk: localhost:2181(CONNECTING) 0]
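Once inside the Zookeeper shell, you can inspect the paths under which Kafka stores its metadata. For example, the following commands list the registered broker IDs and the known topics (the exact output depends on your installation):

ls /brokers/ids
ls /brokers/topics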

Lastly, you can check the performance of a Kafka topic as follows:

$ kafka-producer-perf-test --topic NEW-LXF-TOPIC --throughput 10000 --record-size 300 --num-records 20000 --producer-props bootstrap.servers="localhost:9092"

The performance of a Kafka topic mainly depends on the number of partitions and the way these are distributed across different servers. Figure 3 (below) shows the output of some of the commands presented in this section. Note that the only way to safely back up Kafka is to have replication: there is no specific Kafka utility for backing up and restoring a topic. However, you can back up the Zookeeper state data that is essential for Kafka to work; again, discussing this further is beyond the scope of this tutorial.

Creating a producer in Go

Now we’ll learn how to develop a Kafka producer written in Go. The Go producer will store data in Kafka using JSON records. As the Docker image makes some of its services and ports available to the rest of the world, you will not need to connect to the bash shell of the Docker image for the producer and the consumer Go utilities to work.

The producer.go utility writes a user-defined amount of random numbers that reside in a given range to a Kafka topic, which is also given as a parameter to the program. Therefore, you will need to provide the Go program with four parameters. The critical Go code of the producer.go utility is the following:

conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
conn.WriteMessages(
    kafka.Message{Value: []byte(recordJSON)},
)

The first statement defines the Kafka server that we want to connect to, and the second statement is what writes records to the desired Kafka topic. Note that the format of the records is JSON, which means that you will have to use the json.Marshal() function in order to convert your data into JSON – this is the only difficult aspect of the producer.go utility.

At this point it’s a good idea to download the Go package that will be used for connecting to Kafka – without that package, neither the Go producer nor the Go consumer will be able to work. This can be done by executing go get -u github.com/segmentio/kafka-go. You can find more information about the Go library used at https://github.com/segmentio/kafka-go.
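To give you a feel for the whole utility, the following is a minimal sketch of how producer.go could be put together with this package. The parameter order (topic, number of records, minimum value, maximum value) and the fields of the Record structure are assumptions made for this illustration – the real utility may differ in its details:

// producer.go – a minimal sketch, not the exact code of the utility.
// Assumed usage: go run producer.go TOPIC RECORDS MIN MAX
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "math/rand"
    "os"
    "strconv"
    "time"

    "github.com/segmentio/kafka-go"
)

// Record is the assumed JSON format written to the topic.
type Record struct {
    ID     int `json:"id"`
    Random int `json:"random"`
}

func main() {
    if len(os.Args) != 5 {
        fmt.Println("Usage: producer topic records min max")
        return
    }
    topic := os.Args[1]
    total, _ := strconv.Atoi(os.Args[2])
    min, _ := strconv.Atoi(os.Args[3])
    max, _ := strconv.Atoi(os.Args[4])
    rand.Seed(time.Now().UnixNano())

    // Connect to the leader of partition 0 of the topic on the local broker.
    partition := 0
    conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
    if err != nil {
        fmt.Println("DialLeader:", err)
        return
    }
    defer conn.Close()

    for i := 0; i < total; i++ {
        // Build a random record and convert it to JSON with json.Marshal().
        r := Record{ID: i, Random: min + rand.Intn(max-min+1)}
        recordJSON, _ := json.Marshal(r)
        if _, err := conn.WriteMessages(kafka.Message{Value: recordJSON}); err != nil {
            fmt.Println("WriteMessages:", err)
            return
        }
        // Print a dot for every 50 records written, as described in the text.
        if (i+1)%50 == 0 {
            fmt.Print(".")
        }
    }
    fmt.Println()
}

With these assumptions, go run producer.go NEW-LXF-TOPIC 1000 1 100 would write 1,000 random integers between 1 and 100 to NEW-LXF-TOPIC as JSON records.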

After executing producer.go with the required parameters, you can go to your Kafka installation and look for the data, which can be done as follows:

$ docker exec -it kafka-box kafka-console-consumer --zookeeper localhost:2181 --topic <topic-name>

Here, <topic-name> is the topic that you asked producer.go to write to. The output of the producer.go program itself is pretty simple, as it prints a dot character on the screen for every 50 records it writes to Kafka. Figure 4 (see page 90, top right) shows more about that process, as well as the output of the kafka-console-consumer command itself.

Creating a consumer in Go

Let’s turn our attention to creating a Kafka consumer written in Go (consumer.go) in order to read data from a Kafka topic. The critical Go code of the consumer.go utility is the following:

r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:   []string{"localhost:9092"},
    Topic:     topic,
    Partition: partition,
    MinBytes:  10e3,
    MaxBytes:  10e6,
})
r.SetOffset(0)

The kafka.ReaderConfig Go structure holds the details of the Kafka server we want to connect to, including the topic name and the partition number; as we are using a Kafka topic with a single partition, the partition number will be 0. Meanwhile, the SetOffset() function call specifies the offset from which the program will start reading data. If you want to read all data from a Kafka topic, use SetOffset(0).
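Before looking at the figures, here’s a compact, self-contained sketch of what a consumer built around that code might look like. Once again, the Record fields are hypothetical and have to match whatever JSON format your producer actually writes:

// consumer.go – a minimal sketch, assuming the same Record format
// that the producer sketch above writes to the topic.
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "os"

    "github.com/segmentio/kafka-go"
)

type Record struct {
    ID     int `json:"id"`
    Random int `json:"random"`
}

func main() {
    if len(os.Args) != 2 {
        fmt.Println("Usage: consumer topic")
        return
    }
    topic := os.Args[1]

    // Read from partition 0 of the topic on the local broker.
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers:   []string{"localhost:9092"},
        Topic:     topic,
        Partition: 0,
        MinBytes:  10e3,
        MaxBytes:  10e6,
    })
    defer r.Close()
    r.SetOffset(0) // start from the beginning of the partition

    for {
        m, err := r.ReadMessage(context.Background())
        if err != nil {
            break // the reader was closed or the broker went away
        }
        var rec Record
        if err := json.Unmarshal(m.Value, &rec); err != nil {
            fmt.Println("Cannot decode record:", err)
            continue
        }
        fmt.Printf("offset %d: %+v\n", m.Offset, rec)
    }
}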

Figure 5 (above) shows a part of the output of consumer.go as executed from the UNIX shell of the local machine. Because consumer.go keeps waiting for data, you will have to end it by pressing Control+c or using kill.

Figure 6 (right) shows the entire Go code of consumer.go. The simplicity of the code is pretty impressive for such a handy utility. Note that you will need to have the Record structure correctly defined for consumer.go to work, which means that you should know the format of the JSON data that you expect to get from the Kafka topic in advance. If you want your Go code to dynamically understand the JSON data, you will need to use Go reflection, which is an advanced feature of Go.

Admittedly, as you’ve seen so far, Kafka is pretty different – but this is not necessarily a bad thing. You will need some time to get used to Kafka and learn how to administer, use and develop applications for it, but you will be rewarded by its simplicity and speed.

When you have to store and process huge amounts of streaming messages, Kafka should be the first thing to come to mind – it’s great at storing and serving data fast. Additionally, Kafka is great at enabling you to get data from a supported source, filter it, process it and export it to another destination such as another database or service, an FTP site or a plain text file.

On the other hand, when you have to work with static data or small amounts of data, Kafka is still a viable solution that you should consider.

Figure 1: How you can list the topics in a Kafka server that runs on a Docker image in two ways, as well as how to find the version of the Kafka server you are using.

Figure 2: The use of some basic Kafka commands that enable you to list all available topics, define a new topic with the desired number of partitions, write data to a topic and read from a topic.

Figure 3: The use of some administrative commands that enable you to delete an existing Kafka topic, print information about a topic and connect to the Zookeeper shell.

Figure 4: The use of the producer.go utility as well as the results of its execution. As we are using a Docker image, all changes will be lost when we stop the Docker image from running.

Figure 5: The consumer.go utility in action. As consumer.go does not use Go reflection, it should know in advance the format of the data.

Figure 6: The entire Go code of consumer.go, which is a utility that reads data from a Kafka topic using the JSON format. Note that the program already knows the format of the JSON records it will receive.
