Surviving Data Loss

by Ian Duffy, Nina Hanzlikova - 19 Dec 2017

Backing up Apache Kafka and Zookeeper to S3

What is Apache Kafka?

Apache Kafka is a distributed streaming platform used for building real-time data pipelines and streaming applications. It is horizontally scalable, fault-tolerant, and wicked fast. It runs in production in many companies.

Backups are important with any kind of data. Apache Kafka lowers this risk of data loss with replication across brokers. However, it is still necessary to have protection in place in the event of user error.

This post will demo and introduce tools that our small team of three at Zalando uses to backup and restore Apache Kafka and Zookeeper.

Backing up Apache Kafka

Getting started with Kafka Connect
Kafka Connect is a framework for connecting Kafka with external systems. Its purpose is to make it easy to add new systems to scalable and secure stream data pipelines.

By using a connector by Spredfast.com, backing up and restoring the contents of a topic to S3 becomes a trivial task.

Demo
Download the prerequisite

Checkout the following repository:

$ git clone https://github.com/imduffy15/kafka-env.git 


It contains a docker-compose file for bring up a Zookeeper, Kafka, and Kafka Connect locally.

Kafka Connect will load all jars put in the ./kafka-connect/jars directory. Go ahead and download the Spredfast.com kafka-connect-s3.jar

$ wget "http://dl.bintray.com/iduffy/maven/com/spredfast/kafka/connect/s3/kafka-connect-s3/0.4.2-zBuild/kafka-connect-s3-0.4.2-zBuild-shadow.jar" -O kafka-connect-s3.jar


Bring up the stack
To boot the stack, use docker-compose up

Create some data
Using the Kafka command line utilities, create a topic and a console producer:

$ kafka-topics --zookeeper localhost:2181 --create --topic example-topic --replication-factor 1 --partitions 1Created topic "example-topic".$ kafka-console-producer --topic example-topic --broker-list localhost:9092>hello world


Using a console consumer, confirm the data is successfully written:

$ kafka-console-consumer --topic example-topic --bootstrap-server localhost:9092 --from-beginning hello world


Backing-up
Create a bucket on S3 to store the backups:

$ aws s3api create-bucket --create-bucket-configuration LocationConstraint=eu-west-1 --region eu-west-1 --bucket example-kafka-backup-bucket



Create a bucket on S3 to store the backups:


$ cat << EOF > example-topic-backup-tasks.json
{
 "name": "example-topic-backup-tasks",
 "config": {
   "connector.class": "com.spredfast.kafka.connect.s3.sink.S3SinkConnector",
   "format.include.keys": "true",
   "topics": "example-topic",
   "tasks.max": "1",
   "format": "binary",
 "s3.bucket": "example-kafka-backup-bucket",
   "value.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
   "key.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
   "local.buffer.dir": "/tmp"
 }
}
EOF
curl -X POST -H "Content-Type: application/json" -H "Accept: application/json" -d @example-topic-backup-tasks.json /api/kafka-connect-1/connectors


(Check out the Spredfast documentation for more configuration options.)


After a few moments the backup task will begin. By listing the Kafka Consumer groups, one can identify the consumer group related to the backup task and query for its lag to determine if the backup is finished.

$ kafka-consumer-groups --bootstrap-server localhost:9092 --listNote: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

connect-example-topic-backup-task
$ kafka-consumer-groups --describe --bootstrap-server localhost:9092 --group connect-example-topic-backup-tasks


Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
example-topic                  0          1               1               0          consumer-5-e95f5858-5c2e-4474-bab9-8edfa722db21   /172.22.0.4                    consumer-5


The backup is completed when the lag reaches 0. On inspecting the S3 bucket, a folder of the raw backup data will be present.


Restoring
Let’s destroy all of the containers and start fresh:

$ docker-compose rm -f -v
$ docker-compose up


Re-create the topic:

$ kafka-topics --zookeeper localhost:2181 --create --topic example-topic --replication-factor 1 --partitions 1Created topic "example-topic".


Create a source with Kafka Connect:

$ cat << EOF > example-restore.json
{
"name": "example-restore",
"config": {
"connector.class": "com.spredfast.kafka.connect.s3.source.S3SourceConnector",
"tasks.max": "1",
"topics": "example-topic",
"s3.bucket": "imduffy15-example-kafka-backup-bucket",
"key.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
"value.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
"format": "binary",
"format.include.keys": "true"
}
}
EOF
curl -X POST -H "Content-Type: application/json" -H "Accept: application/json" -d @example-restore.json /api/kafka-connect-1/connectors


(Check out the Spredfast documentation for more configuration options.)

The restore process should begin, in some time, the ‘hello world’ message will display when you run the Kafka console consumer again:

$ kafka-console-consumer --topic example-topic --bootstrap-server localhost:9092 --from-beginning 
hello world

Backing Up Zookeeper

Zookeeper’s role
Newer versions of Kafka ( >= version 0.10.x) use ZooKeeper in a small but still important coordination role. When new Kafka Brokers join the cluster, they use ZooKeeper to discover and connect to the other brokers. The cluster also uses ZooKeeper to elect the controller and track the controller epoch. Finally and perhaps most importantly, ZooKeeper stores the Kafka Broker topic partition mappings, which tracks the information stored on each broker. The data will still persist without these mappings, but won't be accessible or replicated.

Exhibitor
Exhibitor
is a popular supervisor system for ZooKeeper. It provides a number of housekeeping facilities for ZooKeeper as well as exposing a nice UI for exploring the stored data. It also provides some backup and restore capabilities for ZooKeeper out of the box. However, we should make sure we understand these features before relying on them.

On AWS, we run our Exhibitor brokers under the same stack. In this setup, when the stack auto scaling group is responsible for controlling when any of the Exhibitor instances are removed, it is relatively easy for multiple (or even all) Exhibitor brokers to be terminated at the same time. That’s why for our tests we set up an Exhibitor cluster and connected Kafka to it. We then indexed all the Kafka znodes to create an exhibitor backup. Finally, we tore down the Exhibitor stack and re-deployed it with the same config.

Unfortunately, after re-deploy, while the backup folder was definitely in S3, the new Exhibitor appliance did not recognise it as an existing index. With a bit of searching we found that this is actually the expected behaviour and the suggested solution is to read the S3 index and apply changes by hand.

Backing-up
Creating, deleting and re-assigning topics in Kafka is an uncommon occurrence for us, so we estimated that a daily backup task would be sufficient for our needs.

We came across Burry. Burry is a small tool which allows for snapshotting and restoring of a number of system critical stores, including ZooKeeper. It can save the snapshot dump locally or to various cloud storage options. Its backup dump is also conveniently organized along the znode structure making it very easy to work with manually if need be. Using this tool we set up a daily cron job on our production to get a full daily ZooKeeper snapshot and upload the resultant dump to an S3 bucket on AWS.

Restoring
Conveniently, Burry also works as a restore tool using a previous ZooKeeper snapshot. It will try to recreate the full snapshot znode structure and znode data. It also tries to be careful to preserve existing data, so if a znode exists it will not overwrite it.

But there is a catch. Some of the Kafka-created znodes are ephemeral and expected to expire when the Kafka Brokers disconnect. Currently Burry snapshots these as any other znodes, so restoring to a fresh Exhibitor cluster will recreate them. If we were to restore Zookeeper before restarting our Kafka brokers, we'd restore from the snapshot of the ephemeral znodes with information about the Kafka brokers in our previous cluster. If we then bring up our Kafka cluster, our new broker node IDs, which must be unique, would conflict with the IDs restored from our Zookeeper backup. In other words, we'd be unable to start up our Kafka brokers.

We can easily get around this problem by starting our new Zookeeper and new Kafka clusters before restoring the Zookeeper content from our backup. By doing this, the Kafka brokers will create their ephemeral znodes, and the Zookeeper restore will not overwrite these, and will go on to recreate the topic and partition assignment information. After restarting the Kafka brokers, the data stored on their persisted disks will once again be correctly mapped and available, and consumers will be able to resume processing from their last committed position.

Be part of Zalando Tech. We're hiring!

Similar blog posts