March 19, 2021
How we scaled our logging stack by creating per-team budgets
The ELK Stack is a vital component of real-time observability at Plaid. All Plaid services can send logs to an AWS-hosted ElasticSearch cluster (ES for brevity), so these logs are available for querying in near real-time. Many teams rely on these logs to do their daily work. If the ElasticSearch cluster is unavailable, or logs are delayed, engineers can’t deploy their changes or debug existing issues, and the support team can’t address tickets filed by customers. One of the data infrastructure team’s key mandates is to maintain high availability of the ES cluster, and ensure that logs are up to date.
Our strategy in designing the logging stack was to optimize for a simple, seamless experience from the point of view of engineers writing and querying logs: engineers should be able to log the data they want, query it how they want, and assume that everything will just work. In the summer of 2019, we realized our original laissez-faire policy would not scale, and implemented safety rails to prevent resource-intensive queries from crashing ES. While these measures prevented query-related downtime, we had nothing protecting the cluster from surges in log ingestion volume. In early 2020, during a flurry of ElasticSearch outages, we decided we needed to address this problem in order to get ES reliability under control.
The Log Volume Problem
The above diagram shows our previous logging setup in action. In this setup, all services write logs to a single Kinesis stream. A single Logstash deployment then reads from that stream and writes to the ES cluster. The upstream services log at very different rates, with some large services logging up to 20k messages per second, while small services log only hundreds of messages per second.
This second diagram shows what happens during a significant logging spike. If Application B begins logging at a significantly higher volume than usual, Kinesis and Logstash will attempt to deliver that high log volume to ES. Under the significantly higher indexing burden, ES will be unable to keep up, causing an indexing slowdown for all application logs that share nodes with Application B’s logs, including Application A and Application C. Furthermore, ES nodes carrying Application B’s writable indices may drop out of the cluster, further delaying ingestion and lengthening the time to recovery. Since the majority of logs coming from Logstash now belong to Application B, and ES can’t index those logs quickly, Logstash’s output slows down or, in extreme cases, blocks completely. Even if we were to stop Application B from writing additional logs, it could take the cluster hours to clear the backlog and catch up on log ingestion, especially if nodes were lost during the surge.
One poorly placed log line in a high traffic service could cause this kind of volume increase and accidental DDoSing of our logging cluster. With no logging volume safety rails, we had to rely on the engineer’s judgement to anticipate how much volume a new log line would produce. A single engineer’s miscalculation could take down the logging stack for the whole company.
Exploring a Solution
In the early spring of 2020, it became clear that we had to do something. We began exploring solutions to prevent these accidental logging volume surges from taking down the entire ES cluster.
We were looking for a solution that would allow us to protect the ES cluster by putting a hard limit on the log volume it would be expected to ingest at any given time. This would allow us to scale the cluster appropriately, and provide reasonable guarantees on ingestion speed and uptime. We also wanted to sandbox services so that they couldn’t interfere with each other - if one service is experiencing a log volume spike, this shouldn’t affect other services. We considered two classes of solutions that would meet these requirements. I’ll be referring to them as the dropping solution and the queueing solution. Both solutions involve establishing a budget per service of N messages/second.
The dropping solution: track the log volume over time of each service and drop logs as necessary to ensure the volume does not on average exceed N messages/second.
The queueing solution: only emit N messages/second to ES, and queue any remaining messages with one queue per service. While the service’s rate exceeds N messages/second, the queue grows; once the rate falls below N, the queue drains over time.
These solutions each have their benefits and tradeoffs:
The dropping solution provides the benefit that manual intervention is never required to keep log volume under budget. In cases where the log volume will not reduce naturally, the queueing solution will require engineers to remove log lines in order to get the total log volume under budget and allow the queue to empty.
The queueing solution prevents unintentional data loss. If a spike in application traffic causes a service to go somewhat over budget, the dropping solution will drop logs. However, it’s not uncommon for services to temporarily spike in log volume - moderate, temporary increases in log volume most likely would not harm the ES cluster. Under the queueing solution, logs would become slightly delayed during a small surge and catch up afterwards.
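To make the tradeoff concrete, here is a minimal sketch that runs both policies over the same traffic. It is an illustration only: the traffic numbers and the budget are hypothetical, and the dropping policy is simplified to a strict per-second cap rather than an average over time.

```python
from dataclasses import dataclass


@dataclass
class Result:
    delivered: int    # messages that reached the cluster
    dropped: int      # messages discarded (dropping policy only)
    max_backlog: int  # deepest queue observed (queueing policy only)


def simulate_dropping(rates, budget):
    """Deliver at most `budget` messages each second and discard the rest."""
    delivered = dropped = 0
    for incoming in rates:
        sent = min(incoming, budget)
        delivered += sent
        dropped += incoming - sent
    return Result(delivered, dropped, 0)


def simulate_queueing(rates, budget):
    """Deliver at most `budget` messages each second and queue the rest."""
    delivered = backlog = max_backlog = 0
    for incoming in rates:
        backlog += incoming
        sent = min(backlog, budget)
        backlog -= sent
        delivered += sent
        max_backlog = max(max_backlog, backlog)
    return Result(delivered, 0, max_backlog)


# Hypothetical traffic: 5 s at 800 msg/s, a 3 s spike at 1,500 msg/s,
# then 10 s back at 800 msg/s. The budget N is 1,000 msg/s.
rates = [800] * 5 + [1500] * 3 + [800] * 10
budget = 1000

dropping = simulate_dropping(rates, budget)
queueing = simulate_queueing(rates, budget)
print(dropping)  # 1,500 messages are lost during the spike
print(queueing)  # nothing is lost; the backlog peaks at 1,500 and then drains
```

In this toy run the spike is small and short, which is exactly the case where dropping loses data for no real benefit and queueing costs only a few seconds of delay.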
Ultimately, we chose the queueing solution, as we felt it best met our needs. For many applications, delaying logs causes much less potential harm than dropping logs.
Implementing the Queueing Solution
Fortunately, the queueing solution was much simpler to implement and less dangerous to deploy. The dropping solution would have required introducing a new service into the logging stack to monitor volume and initiate data drops. This service’s dropping conditions would doubtless have needed tuning after deployment, and it would have made some erroneous log deletions in the meantime. The queueing solution could be implemented simply by configuring Logstash and Kinesis, and risked only log delay if the tuning was incorrect.
The above diagram shows the implementation of the queueing solution. Each service writes to a Kinesis stream provisioned with enough shards to handle its average throughput with leeway for a surge. Each Kinesis stream is read by a separate Logstash deployment. The number of pods in the Logstash deployment is less than or equal to the number of Kinesis shards, tuned such that Logstash can comfortably keep up with the stream. We use the Logstash Kinesis input plugin’s configurable value maxRecords, which sets a limit on how many records a consuming thread can retrieve per getRecords call, to cap the number of records Logstash can pull from Kinesis per second. Because Logstash runs one consuming thread per Kinesis shard, and maxRecords applies per thread, the budget for each stream is equal to maxRecords * number of Kinesis shards.
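The budget arithmetic can be sketched in a few lines. Note the assumption baked in: maxRecords limits a single getRecords call, so the per-second budget depends on how often each consuming thread polls. The sketch assumes roughly one call per second per shard, which is our illustrative default rather than a figure stated above, and the shard counts are hypothetical.

```python
import math


def stream_budget(max_records: int, shards: int, polls_per_second: float = 1.0) -> float:
    """Records/second one stream can deliver to ES.

    One consuming thread per shard, each fetching at most `max_records`
    per getRecords call. `polls_per_second` is an assumed polling rate.
    """
    if max_records <= 0 or shards <= 0 or polls_per_second <= 0:
        raise ValueError("all inputs must be positive")
    return max_records * shards * polls_per_second


def max_records_for_budget(budget: float, shards: int, polls_per_second: float = 1.0) -> int:
    """Largest maxRecords that keeps the stream at or under `budget`."""
    if budget <= 0 or shards <= 0 or polls_per_second <= 0:
        raise ValueError("all inputs must be positive")
    return math.floor(budget / (shards * polls_per_second))


# Hypothetical stream: 10 shards, target budget of 15,000 records/second.
print(max_records_for_budget(15_000, shards=10))  # 1500
print(stream_budget(1_500, shards=10))            # 15000.0

# Re-sharding to 15 shards without touching maxRecords would raise the
# budget, so maxRecords has to come down to hold it constant.
print(max_records_for_budget(15_000, shards=15))  # 1000
```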
If the stream is re-sharded, or we want to adjust the budget, we tune maxRecords. If Logstash is having trouble keeping up when the stream is under budget, we tune the number of Logstash pods, and if the number of pods is already equal to the number of Kinesis shards, we re-shard the stream.
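The tuning rules above amount to a small decision procedure. The sketch below encodes it as a function; the function name, its inputs, and the `Action` enum are hypothetical and exist only to illustrate the order in which the knobs get turned.

```python
from enum import Enum


class Action(Enum):
    TUNE_MAX_RECORDS = "tune maxRecords"
    ADD_LOGSTASH_PODS = "add Logstash pods"
    RESHARD_STREAM = "re-shard the stream"
    NONE = "no change needed"


def next_action(budget_or_shards_changed: bool, keeping_up: bool,
                pods: int, shards: int) -> Action:
    """Pick the next tuning step for one stream.

    `keeping_up` means Logstash keeps pace while the stream is under budget.
    """
    if pods > shards:
        raise ValueError("pods should not exceed the number of shards")
    if budget_or_shards_changed:
        # Budget = maxRecords * shards, so either change means re-deriving maxRecords.
        return Action.TUNE_MAX_RECORDS
    if keeping_up:
        return Action.NONE
    if pods < shards:
        return Action.ADD_LOGSTASH_PODS
    # Pods already match shards, so the only way to add consumers is more shards.
    return Action.RESHARD_STREAM


print(next_action(False, False, pods=4, shards=8).value)  # add Logstash pods
print(next_action(False, False, pods=8, shards=8).value)  # re-shard the stream
```

Re-sharding changes the shard count, so it is followed by another pass through the first branch to bring maxRecords back in line with the budget.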
This diagram shows the budgeting in action. Application B’s logging volume can increase drastically, but its Logstash deployment will only pull 15k records at a time from Kinesis and send them to ES. The Kinesis stream’s queue will begin to back up. Since the ES cluster is only receiving the maximum expected amount of logs, it can index as normal. Once the log that caused the surge is removed, the queue will drain and the logs for Application B will no longer be delayed. No other services will be impacted, and no logs need to be dropped.
In practice, our implementation of the queueing solution was not quite as simple as one-stream-per-service. Plaid has a lot of microservices, the majority of which only log a couple hundred messages per second. Total volume is dominated by a small number of mature, business-critical, high-traffic services. Giving every service its own Logstash deployment and Kinesis stream would’ve been wasteful and greatly slowed down service deployment. Therefore, we bundled services by team: services belonging to the same team write to the same Kinesis stream and share a single net budget. While this breaks the rule that a logging spike in one service must not interfere with other services, it does limit the blast radius of a logging spike to the services owned by a single team.
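A rough way to reason about how long a surge keeps logs delayed: the backlog grows at (surge rate - budget) while the surge lasts, and drains at (budget - steady rate) afterwards. The sketch below works through that arithmetic. Only the 15k budget comes from the example above; the surge rate, surge duration, and steady rate are hypothetical.

```python
import math


def backlog_after_surge(surge_rate: float, budget: float, surge_seconds: float) -> float:
    """Records queued in the stream when the surge ends."""
    return max(0.0, surge_rate - budget) * surge_seconds


def seconds_to_drain(backlog: float, budget: float, steady_rate: float) -> float:
    """Time for the queue to empty once the service is back at `steady_rate`."""
    if steady_rate >= budget:
        # Still at or over budget: the queue never empties on its own.
        return math.inf
    return backlog / (budget - steady_rate)


budget = 15_000        # records/second pulled by Logstash
surge_rate = 45_000    # hypothetical rate during the spike
surge_seconds = 600    # hypothetical 10-minute spike
steady_rate = 10_000   # hypothetical normal rate after the bad log line is removed

backlog = backlog_after_surge(surge_rate, budget, surge_seconds)
print(backlog)                                          # 18000000.0 records queued
print(seconds_to_drain(backlog, budget, steady_rate))   # 3600.0 seconds to catch up
```

The `math.inf` branch corresponds to the case described earlier where volume does not reduce naturally and someone has to remove log lines before the queue can empty.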
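The bundling can be pictured as a lookup from service to team, with the budget attached to the team rather than the service. The sketch below is illustrative only: the service names, team names, stream naming scheme, and budgets are all hypothetical.

```python
from collections import defaultdict

# Hypothetical ownership map and per-team budgets (records/second).
SERVICE_TO_TEAM = {
    "service-a": "team-x",
    "service-b": "team-x",
    "service-c": "team-y",
}
TEAM_BUDGETS = {"team-x": 15_000, "team-y": 2_000}


def stream_name(service: str) -> str:
    """Kinesis stream a service writes to: one stream per owning team."""
    return f"logs-{SERVICE_TO_TEAM[service]}"


def teams_over_budget(rates_by_service: dict) -> list:
    """Teams whose combined log rate exceeds their shared budget."""
    totals = defaultdict(int)
    for service, rate in rates_by_service.items():
        totals[SERVICE_TO_TEAM[service]] += rate
    return sorted(team for team, total in totals.items() if total > TEAM_BUDGETS[team])


# service-b spikes; service-a shares its stream and so shares the delay,
# while service-c on another team's stream is unaffected.
rates = {"service-a": 500, "service-b": 40_000, "service-c": 300}
print(stream_name("service-a"), stream_name("service-b"))  # logs-team-x logs-team-x
print(teams_over_budget(rates))                            # ['team-x']
```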
Since the rollout of the log budgeting project, no log volume spike has caused ES downtime or resulted in log delay that impacted more than one team. We solved the problem we set out to solve. But, like any new system, it has some drawbacks as well.
By using separate Kinesis and Logstash deployments per service group, we get less mileage out of each shard/pod we have deployed. In the old world, we provisioned the Logstash and Kinesis stack with enough ‘leeway’ to allow some services to increase their logging volume significantly, so long as they didn’t all spike at the same time. With each service group having their own Logstash and Kinesis stack, they also need their own ‘leeway’ provisioning. Unused Logstash pods in Application A’s deployment can no longer soak up a spike in Application B.
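The cost of losing shared leeway can be estimated with simple arithmetic. The sketch below assumes a simplified sizing model that we are introducing purely for illustration: the shared stack is sized for total baseline volume plus headroom for the largest few spikes happening at once, while isolated stacks each carry their own headroom. All numbers are hypothetical.

```python
def shared_capacity(baselines, spike_headrooms, simultaneous_spikes=1):
    """Capacity of one shared stack: every baseline plus headroom for
    only the largest `simultaneous_spikes` spikes at any one time."""
    largest = sorted(spike_headrooms, reverse=True)[:simultaneous_spikes]
    return sum(baselines) + sum(largest)


def isolated_capacity(baselines, spike_headrooms):
    """Capacity when every service group provisions its own headroom."""
    return sum(b + h for b, h in zip(baselines, spike_headrooms))


# Hypothetical service groups, in records/second.
baselines = [20_000, 5_000, 1_000]
headrooms = [10_000, 5_000, 1_000]

print(shared_capacity(baselines, headrooms))    # 36000
print(isolated_capacity(baselines, headrooms))  # 42000
```

Under this model the gap between the two figures is the headroom that used to be pooled and now sits idle in each group’s stack most of the time.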
As a result, we are now running more Kinesis shards and Logstash pods than we were previously. By growing the number of Logstash and Kinesis systems from 1 to N, we also had to multiply the number of Logstash and Kinesis monitoring and alerting rules by N.
Overall, the reliability pros of the log budgeting project well outweigh the cons. With the increased reliability of the ELK stack, engineers are able to move faster and deploy with more certainty. The project represents a solid step towards better observability in Plaid engineering.