Introduction
Apache Kafka is a high-throughput, automatically sharded and replicated data streaming technology. It emerged from the deep tech minds at LinkedIn and is now developed largely by the startup Confluent, founded by its original authors.
Reading through their many excellent blog posts and documentation, it's clear that they had physical hardware in mind when designing and implementing Kafka clusters. I've found no guidelines about running Kafka in the cloud, so I thought I'd share my experiences with you here. Some (but hopefully not all) of this will only be useful to AWS and/or Kafka noobs.
AWS Topology
As you probably know, AWS spans geographic regions, with failure zones (availability zones, or AZs) within each region. My infrastructure is based in eu-west-1, which has three availability zones: a, b and c. Availability zone failures are quite rare and region failures rarer still (see the AWS Service Health Dashboard). The AWS Architecture Center publishes a useful schematic of this topology.
The service I designed was bound to one region and spread over 3 AZs, such that one AZ can go down without affecting availability.
Deploying Kafka
So how do we deploy a Kafka cluster such that one AZ can go down without affecting its availability? Kafka has two ways of replicating data: sharding/replicating and mirroring. Sharding/replicating is Kafka's natural running mode and pretty much its raison d'être. Mirroring (MirrorMaker) copies all data from a source cluster to a target cluster, much like classic RDBMS master-slave replication. So which to choose? (For the sake of brevity I will not discuss the deployment of ZooKeeper, which has been deployed as a 3-node cluster with 1 node in each AZ.)
Mirroring, with a cluster in each zone, has two drawbacks:
- Each zone has a full copy of the data, at 3x the cost.
- Consumers (Kafka-speak for subscribers) would need reconfiguring during a zone failure and would have to grapple with consumer offsets.
On the other hand, sharding/replication across zones doesn't seem to fit either, since it is intended for racks in close proximity. To decide which way to go, we need to understand a little more about AWS and Kafka.
It's all about latency!
Kafka has an interesting and rather clever replication strategy built around the ISR, or in-sync replica set. The ISR is a dynamic set that tracks which replicas are in sync, based on how long each partition follower has been lagging behind its leader. If a follower has not fully caught up with the leader at any point in the last replica.lag.time.max.ms milliseconds, it is ejected from the ISR. By default this is 10 seconds (later Kafka releases raised the default to 30 seconds). NB: the ISR includes the leader, which is somewhat confusing since the leader is in sync by definition!
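To make the ejection rule concrete, here is a simplified model in Python. It is not Kafka's broker code: Replica, compute_isr and the last_caught_up_ms field are hypothetical names, and the 10-second constant mirrors the default quoted above.

```python
from dataclasses import dataclass

# replica.lag.time.max.ms default at the time of writing (10 seconds).
REPLICA_LAG_TIME_MAX_MS = 10_000


@dataclass
class Replica:
    broker_id: int
    # Hypothetical field: wall-clock time (ms) at which this follower was
    # last fully caught up with the leader's log end offset.
    last_caught_up_ms: int


def compute_isr(leader_id, followers, now_ms, lag_max_ms=REPLICA_LAG_TIME_MAX_MS):
    """Return the broker ids in the ISR for one partition.

    The leader is always a member. A follower stays in only if it has been
    fully caught up at some point within the last lag_max_ms milliseconds.
    """
    isr = [leader_id]
    for follower in followers:
        if now_ms - follower.last_caught_up_ms <= lag_max_ms:
            isr.append(follower.broker_id)
    return isr
```

For example, with the leader on broker 1, a follower on broker 2 last caught up 5 seconds ago and one on broker 3 last caught up 20 seconds ago, `compute_isr(1, [Replica(2, 95_000), Replica(3, 80_000)], now_ms=100_000)` keeps brokers 1 and 2 and ejects broker 3.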
Apart from sickly machines and other local issues, latency across availability zones is the key to understanding whether replicating over AZs is feasible. According to AWS, availability zones within a region are connected by low-latency links. In tests (over hours, not days, unfortunately) I found latency to be of the order of milliseconds and, in a couple of instances, over a second. A nice study dating back some years can be found here. One would expect things to have only got better since then. In effect, you can treat AZs as if they were racks in a DC.
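The test harness used above isn't described, so as one hypothetical way to take this kind of measurement, the sketch below times TCP handshakes (roughly one network round trip each) from a host in one AZ to a listening port on a broker in another. tcp_connect_latencies_ms and summarise are illustrative names, not part of any Kafka tooling; pass an IP address rather than a hostname so DNS lookups are not included in the timings.

```python
import math
import socket
import statistics
import time


def tcp_connect_latencies_ms(host, port, samples=20, timeout=2.0):
    """Time `samples` TCP handshakes to host:port, returning milliseconds."""
    results = []
    for _ in range(samples):
        start = time.perf_counter()
        # create_connection returns once the three-way handshake completes.
        with socket.create_connection((host, port), timeout=timeout):
            pass
        results.append((time.perf_counter() - start) * 1000.0)
    return results


def summarise(latencies):
    """Min, median, nearest-rank p99 and max of a list of latencies."""
    ordered = sorted(latencies)
    p99_index = math.ceil(0.99 * len(ordered)) - 1
    return {
        "min": ordered[0],
        "median": statistics.median(ordered),
        "p99": ordered[p99_index],
        "max": ordered[-1],
    }
```

Running `summarise(tcp_connect_latencies_ms("10.0.1.23", 9092))` periodically over a long window (the address here is a placeholder) would reveal the occasional multi-hundred-millisecond outliers that matter when sizing replica.lag.time.max.ms.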
How many brokers?
For my cluster, we have a message rate of about 50,000 messages a second, with multiple consumer and producer jobs plus a MirrorMaker data archiver job. Your choice is between fewer, more powerful nodes and more, lower-powered nodes; Kafka will take advantage of either. In a sharded/replicated model, the more broker nodes the better: in an n-node cluster, a single node failure adds 1/(n^2-n) of the total cluster load to each surviving node, i.e. a 1/(n-1) increase on that node's own share. My cluster is thus 6 lower-powered nodes. Kafka loves memory and exploits the Unix filesystem cache, so the instance type chosen was r3.xlarge. I will describe my experience of cluster expansion in a future post, as initially we started out with a 3-node cluster.
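The failover arithmetic can be checked with exact fractions. This sketch assumes load is spread evenly across brokers (i.e. partition leadership is balanced); the function names are illustrative.

```python
from fractions import Fraction


def extra_load_after_one_failure(n):
    """Share of total cluster load each survivor picks up when 1 of n nodes fails.

    Each node carries 1/n of the load. The failed node's 1/n is spread over
    the n - 1 survivors, so each gains 1/(n(n - 1)) = 1/(n^2 - n).
    """
    if n < 2:
        raise ValueError("need at least two nodes to survive a failure")
    return Fraction(1, n * n - n)


def relative_increase(n):
    """The same extra load relative to a node's normal 1/n share: 1/(n - 1)."""
    return extra_load_after_one_failure(n) / Fraction(1, n)
```

For the original 3-node cluster each survivor takes on an extra 1/6 of the total load, a 50% jump in its own work; with 6 nodes it is 1/30 of the total, a 20% jump.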
Beware - Not Yet Rack Aware!
Unfortunately, Kafka has no rack awareness (yet; see this proposed fix), meaning that a partition's leader and followers could end up in the same AZ, which would be a bad thing in the event of a zone failure! Here I equate rack to AZ, as already discussed.
There are two strategies around this. The first is to over-replicate, such that NR (number of replicas) > NM (maximum number of nodes per zone); then no single zone can hold every replica of a partition. With 6 nodes there are 2 nodes per zone, so NR = 3 is required.
The second strategy is to monitor and manually assign your replicas to brokers in different AZs. The advantage here is that you can run with RF:2 if you really want to. With this strategy, you'll need to run a scheduled job that picks up new topics and any cluster changes (e.g. expansion) and then shuffles partitions to achieve the desired anti-affinity.
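The core of such a scheduled job could look like the following hypothetical sketch. It assumes you already know which AZ each broker lives in (e.g. from instance metadata); az_aware_assignment and to_reassignment_json are illustrative names. It emits the JSON shape accepted by `kafka-reassign-partitions.sh` via `--reassignment-json-file`, but it plans a fresh layout rather than minimising data movement, so review a plan before executing it.

```python
import json
from collections import defaultdict


def az_aware_assignment(topic, num_partitions, rf, broker_az):
    """Place each partition's replicas on brokers in distinct AZs.

    broker_az maps broker id -> AZ name. Partitions rotate through the AZs
    (and through brokers within each AZ) so leadership spreads evenly.
    """
    by_az = defaultdict(list)
    for broker, az in sorted(broker_az.items()):
        by_az[az].append(broker)
    zones = sorted(by_az)
    if rf > len(zones):
        raise ValueError("rf exceeds the number of AZs; replicas cannot be kept apart")
    partitions = []
    for p in range(num_partitions):
        replicas = []
        for r in range(rf):
            # Consecutive replicas land in consecutive (hence distinct) AZs.
            brokers = by_az[zones[(p + r) % len(zones)]]
            replicas.append(brokers[(p // len(zones)) % len(brokers)])
        partitions.append({"topic": topic, "partition": p, "replicas": replicas})
    return {"version": 1, "partitions": partitions}


def to_reassignment_json(plan):
    return json.dumps(plan, indent=2)
```

With brokers 1 and 4 in eu-west-1a, 2 and 5 in eu-west-1b, and 3 and 6 in eu-west-1c, six partitions at RF:2 get the replica lists [1, 2], [2, 3], [3, 1], [4, 5], [5, 6], [6, 4], so every partition spans two AZs and each broker leads one partition.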
As a corollary, it would not make sense to have 4 nodes with the over-replicating strategy. Why? Because NR increases by 50% (i.e. 2 to 3) while the cluster only grows by 33% (i.e. 3 to 4), so each broker ends up storing more data than before. You might as well rule out 5 too and go straight to 6.
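The over-replication rule and the node-count corollary can be tabulated with a short sketch. It assumes 3 AZs and brokers spread as evenly as possible, so the busiest AZ holds ceil(nodes / zones) brokers (NM); the function names are illustrative.

```python
import math


def min_replication_factor(nodes, zones=3):
    """Smallest NR with NR > NM, where NM = ceil(nodes / zones)."""
    return math.ceil(nodes / zones) + 1


def replica_share_per_node(nodes, zones=3):
    """Data each broker stores, in units of one full copy of the data set."""
    return min_replication_factor(nodes, zones) / nodes
```

For 3, 4, 5 and 6 nodes this gives NR = 2, 3, 3, 3 and a per-broker share of roughly 0.67, 0.75, 0.6 and 0.5: moving from 3 to 4 nodes makes every broker store more, 5 nodes helps only slightly, and 6 nodes halves the per-broker share.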
Chosen Design
RF:3, producer acks: -1, min in-sync replicas: 2. In other words, at least 2 nodes must be in the ISR to acknowledge writes, so a single node failure is tolerable. Monitoring the ISR is paramount: aim to keep the ISR at 3 or, stated another way, min.insync.replicas + 1. With this setup, a node dropping out of the ISR is tantamount to a node crashing, hence the caution about monitoring.
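How acks and min.insync.replicas interact can be modelled in a few lines. This is a simplified sketch, not broker code: with acks=-1 (equivalently acks=all) the leader waits for every current ISR member, and it rejects the write with a NotEnoughReplicas error once the ISR is smaller than min.insync.replicas. The function names are hypothetical.

```python
def write_outcome(isr_size, min_insync_replicas):
    """Outcome of an acks=-1 (acks=all) produce request for a given ISR size."""
    if isr_size >= min_insync_replicas:
        return "accepted"
    return "rejected: NotEnoughReplicas"


def spare_isr_members(isr_size, min_insync_replicas):
    """How many more replicas can leave the ISR while writes still succeed."""
    return max(0, isr_size - min_insync_replicas)
```

With min.insync.replicas = 2, an ISR of 3 leaves one spare member; once a node drops out the ISR is 2 and there is no slack left, which is why an ISR shrink deserves the same alerting as a crash.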
Code Appropriately For Your Chosen Design
With this setup, message delivery is as follows:
- Under normal running conditions: Once
- Under node/zone failure: possibly more than once for some messages.
Since offset management and message delivery are not atomic, a consumer may re-read a message via a replica during a failure. This should not affect many messages.
Code appropriately for your chosen design!
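One way to code for occasional redelivery is to make processing idempotent. The sketch below is a hypothetical helper (DedupingProcessor is not a Kafka API) that remembers the highest offset handled per (topic, partition) and skips anything at or below it. It assumes offsets within a partition only move forward; in production this state would need to be persisted atomically with the processing results, or replaced by deduplication on a business key.

```python
class DedupingProcessor:
    """Skip redelivered messages, tracking the last offset per (topic, partition)."""

    def __init__(self, handler):
        self.handler = handler
        self.last_offset = {}

    def process(self, topic, partition, offset, value):
        key = (topic, partition)
        if offset <= self.last_offset.get(key, -1):
            return False  # duplicate delivery: already handled
        self.handler(value)
        self.last_offset[key] = offset
        return True
```

Feeding the same (topic, partition, offset) twice invokes the handler only once, while offsets from other partitions are tracked independently.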
EBS
It has become something of a trend in the world of cloud computing and clustered databases (e.g. NoSQL) to treat data as a second-class citizen. You are actively discouraged from persisting data and told to reconstruct node data from surviving replicas in the event of failure. Wrong, wrong, wrong! Without digressing too much, it is more sensible to come up with an infrastructure build pattern that persists data volumes (EBS in the case of AWS). The two big drivers for doing this are:
- Guarding against user error. Unless you police your data system carefully, someone is going to create a data structure with RF:1!
- Reducing mean time to recovery. Recover your nodes in seconds/minutes not hours/days.
Pulling it all together
Our initial design looks like this:
Kafka nodes = 6 (2 per AZ)
ZooKeeper nodes = 3 (1 per AZ)
Producer acks = -1 (all in-sync replicas acknowledge write requests)
Replication Factor = 3 (which includes the leader)
Min in-sync replicas = 2 (to be implemented)
Offset management = Kafka (preferable to ZooKeeper)
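As a sanity check, the design summary can be encoded and validated against the rules discussed above. This is a hypothetical helper, not a Kafka tool; the dictionary keys are my own names, and it assumes brokers are spread as evenly as possible across AZs.

```python
import math

DESIGN = {
    "kafka_nodes": 6,
    "availability_zones": 3,
    "acks": -1,
    "replication_factor": 3,
    "min_insync_replicas": 2,
}


def check_design(d):
    """Return a list of rule violations (empty if the design is consistent)."""
    problems = []
    max_nodes_per_zone = math.ceil(d["kafka_nodes"] / d["availability_zones"])
    if d["replication_factor"] <= max_nodes_per_zone:
        problems.append("RF must exceed the maximum number of brokers in one AZ")
    if not 1 <= d["min_insync_replicas"] < d["replication_factor"]:
        problems.append("min.insync.replicas must be below RF to survive a node loss")
    if d["acks"] not in (-1, "all"):
        problems.append("min.insync.replicas is only enforced with acks=-1/all")
    return problems
```

The 6-node, RF:3 design passes; growing to 7 brokers without raising RF would be flagged, because one AZ would then hold 3 brokers.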
I hope you've found this post useful, and I'd be happy to receive your feedback. In future posts I'll talk about my other Kafka experiences, such as performance, consumer lag, expanding a Kafka cluster and MirrorMaker!