Optimize write throughput for Amazon Kinesis Data Streams


Amazon Kinesis Data Streams is used by many customers to capture, process, and store data streams at any scale. This level of scale is enabled by dividing each data stream into multiple shards. Each shard in a stream has a write throughput limit of 1 MB per second or 1,000 records per second. Whether your data streaming application is collecting clickstream data from a web application or recording telemetry data from billions of Internet of Things (IoT) devices, streaming applications are highly susceptible to a varying amount of data ingestion. Sometimes such a large and unexpected volume of data arrives when we least expect it. For instance, consider application logic with a retry mechanism when writing records to a Kinesis data stream. In case of a network failure, it’s common to buffer data locally and write it when connectivity is restored. Depending on the rate at which data is buffered and the duration of the connectivity issue, the local buffer can accumulate enough data to saturate the available write throughput quota of a Kinesis data stream.

When an application attempts to write more data than what is allowed, it receives write throughput exceeded errors. In some instances, not being able to handle these errors in a timely manner can result in data loss, unhappy customers, and other undesirable outcomes. In this post, we explore the typical reasons behind write throughput exceeded errors, along with methods to identify them. We then guide you on swift responses to these events and provide several solutions for mitigation. Lastly, we delve into how on-demand capacity mode can be valuable in addressing these errors.

Why do we get write throughput exceeded errors?

Write throughput exceeded errors are generally caused by three different scenarios:

  • The simplest is the case where the producer application is generating more data than the throughput available in the Kinesis data stream (the sum of all shards).
  • Next, we have the case where data distribution is not even across all shards, known as a hot shard issue.
  • Write throughput exceeded errors can also be caused by an application choosing a partition key to write records at a rate exceeding the throughput offered by a single shard. This situation is somewhat similar to a hot shard issue, but as we see later in this post, unlike a hot shard issue, you can’t solve this problem by adding more shards to the data stream. This behavior is commonly known as a hot key issue.

Before we discuss how to diagnose these issues, let’s look at how Kinesis data streams organize data and how that relates to write throughput exceeded errors.

A Kinesis data stream has one or more shards to store data. Each shard is assigned a key range in 128-bit integer space. If you view the details of a data stream using the describe-stream operation in the AWS Command Line Interface (AWS CLI), you can actually see this key range assignment:

$ aws kinesis describe-stream --stream-name my-data-stream
{
  "StreamDescription": {
    "Shards": [
      {
        "ShardId": "shardId-000000000000",
        "HashKeyRange": {
          "StartingHashKey": "0",
          "EndingHashKey": "85070591730234615865843651857942052863"
        }
      },
      {
        "ShardId": "shardId-000000000001",
        "HashKeyRange": {
          "StartingHashKey": "85070591730234615865843651857942052864",
          "EndingHashKey": "170141183460469231731687303715884105727"
        }
      }
    ]
  }
}

When a producer application invokes the PutRecord or PutRecords API, the service calculates an MD5 hash for the PartitionKey specified in the record. The resulting hash is used to determine which shard stores that record. You can take more control over this process by setting the ExplicitHashKey property in the PutRecord request to a hash key that falls within a specific shard’s key range. For instance, setting ExplicitHashKey to 0 guarantees that the record is written to shard ID shardId-000000000000 in the stream described in the preceding code snippet.
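The mapping from partition key to shard can be sketched in a few lines of Python. This is an illustration only: the helper names are hypothetical, the key ranges come from an assumed even split of the 128-bit key space rather than from a real stream, and the digest is assumed to be read as a big-endian unsigned integer. For a real stream, read the ranges from the describe-stream output.

```python
import hashlib

KEY_SPACE = 2 ** 128  # hash keys are 128-bit unsigned integers


def even_ranges(shard_count):
    """Hypothetical helper: split the key space into inclusive (start, end) ranges."""
    step = KEY_SPACE // shard_count
    ranges = []
    for i in range(shard_count):
        start = i * step
        # The last shard absorbs any remainder so the whole key space is covered.
        end = KEY_SPACE - 1 if i == shard_count - 1 else (i + 1) * step - 1
        ranges.append((start, end))
    return ranges


def hash_key(partition_key):
    """Interpret the MD5 digest of the partition key as a 128-bit integer."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def shard_index(hash_value, ranges):
    """Return the index of the shard whose key range contains hash_value."""
    for index, (start, end) in enumerate(ranges):
        if start <= hash_value <= end:
            return index
    raise ValueError("hash key is outside every shard range")


ranges = even_ranges(2)
print(shard_index(hash_key("/index.html"), ranges))  # shard chosen for this key
print(shard_index(0, ranges))                        # an explicit hash key of 0
```

Passing an explicit value to shard_index instead of hash_key(...) mirrors what ExplicitHashKey does: it skips the MD5 step and selects the shard directly.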

How partition keys are distributed across available shards plays a vital role in maximizing the available throughput in a Kinesis data stream. When the partition key being used is repeated frequently in a way that some keys are more frequent than others, shards storing those records will be utilized more. We also get the same net effect if we use ExplicitHashKey and our logic for choosing the hash key is biased towards a subset of shards.

Imagine you have a fleet of web servers logging performance metrics for each web request served into a Kinesis data stream with two shards, and you use the request URL as the partition key. Each time a request is served, the application makes a call to the PutRecord API carrying a 10-byte record. Let’s say that you have a total of 10 URLs and each receives 10 requests per second. Under these circumstances, the total throughput required for the workload is 1,000 bytes per second and 100 requests per second. If we assume perfect distribution of the 10 URLs across the two shards, each shard will receive 500 bytes per second and 50 requests per second.

Now imagine one of these URLs went viral and started receiving 1,000 requests per second. Although the situation is positive from a business perspective, you’re now on the brink of making users unhappy. After the page gained popularity, you’re now counting 1,040 requests per second for the shard storing the popular URL (1,000 + 10 * 4). At this point, you’ll receive write throughput exceeded errors from that shard. You’re throttled based on the requests per second quota because even with the increased requests, you’re still generating only approximately 11 KB of data per second.
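The arithmetic in this scenario can be checked with a short sketch. The constants restate the per-shard limits quoted earlier (with 1 MB taken as 1,048,576 bytes, an assumption) and the record size from the example; the variable names are illustrative.

```python
RECORD_SIZE_BYTES = 10
SHARD_RECORD_LIMIT = 1_000       # records per second per shard
SHARD_BYTE_LIMIT = 1024 * 1024   # 1 MB per second per shard


def shard_load(requests_per_url):
    """Return (records per second, bytes per second) for the URLs on one shard."""
    records = sum(requests_per_url)
    return records, records * RECORD_SIZE_BYTES


hot_shard = [1_000] + [10] * 4   # the viral URL plus four ordinary URLs
other_shard = [10] * 5           # five ordinary URLs

hot_records, hot_bytes = shard_load(hot_shard)
other_records, other_bytes = shard_load(other_shard)

print(hot_records, hot_bytes)                # 1040 records/s, 10400 bytes/s
print(hot_records > SHARD_RECORD_LIMIT)      # True: record quota exceeded
print(hot_bytes > SHARD_BYTE_LIMIT)          # False: byte quota barely touched
print(hot_bytes + other_bytes)               # 10900 bytes/s across the stream
```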

You can solve this problem either by using a UUID for each request as the partition key so that you share the total load across both shards, or by adding more shards to the Kinesis data stream. The method you choose depends on how you want to consume data. Changing the partition key to a UUID would be problematic if you want performance metrics from a given URL to always be processed by the same consumer instance or if you want to maintain the order of records on a per-URL basis.

Understanding the exact cause of write throughput exceeded errors is an important step in remediating them. In the next sections, we discuss how to identify the root cause and remediate this problem.

Identifying the cause of write throughput exceeded errors

The first step in solving a problem is knowing that it exists. You can use the WriteProvisionedThroughputExceeded metric in Amazon CloudWatch in this case. You can correlate the spikes in the WriteProvisionedThroughputExceeded metric with the IncomingBytes and IncomingRecords metrics to identify whether an application is getting throttled due to the size of data or the number of records written.

Let’s look at a few tests we performed in a stream with two shards to illustrate various scenarios. In this instance, with two shards in our stream, the total throughput available to our producer application is either 2 MB per second or 2,000 records per second.

In the first test, we ran a producer to write batches of 30 records, each being 100 KB, using the PutRecords API. As you can see in the graph on the left of the following figure, our WriteProvisionedThroughputExceeded error count went up. The graph on the right shows that we are reaching the 2 MB per second limit, but our incoming records rate is much lower than the 2,000 records per second limit (Kinesis metrics are published at 1-minute intervals, hence 125.8 MB and 120,000 records as upper limits).

Record size based throttling example

The following figure shows how the same three metrics changed when we changed the producer to write batches of 500 records, each being 50 bytes, in the second test. This time, we exceeded the 2,000 records per second throughput limit, but our incoming bytes rate is well under the limit.

Record count based throttling
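The comparison made in these two tests can be expressed as a small helper that takes the 1-minute sums of IncomingBytes and IncomingRecords and reports which quota the stream is close to. This is a rough sketch: the function name and the 90% threshold are assumptions chosen for illustration, and 1 MB is taken as 1,048,576 bytes, which matches the 125.8 figure above.

```python
MB = 1024 * 1024
RECORDS_PER_SECOND_PER_SHARD = 1_000


def throttling_dimension(bytes_per_minute, records_per_minute, shard_count,
                         threshold=0.9):
    """Return (causes, byte_utilization, record_utilization) for one minute.

    causes lists "bytes" and/or "records" when the 1-minute sum reaches
    the given fraction of the stream-wide quota for that minute.
    """
    byte_limit = shard_count * MB * 60
    record_limit = shard_count * RECORDS_PER_SECOND_PER_SHARD * 60
    byte_util = bytes_per_minute / byte_limit
    record_util = records_per_minute / record_limit

    causes = []
    if byte_util >= threshold:
        causes.append("bytes")
    if record_util >= threshold:
        causes.append("records")
    return causes, byte_util, record_util


# Values resembling the first test: large records, few of them.
print(throttling_dimension(125_000_000, 1_200, shard_count=2)[0])
# Values resembling the second test: many tiny records.
print(throttling_dimension(6_000_000, 119_000, shard_count=2)[0])
```

An empty result does not rule out throttling. An uneven key distribution or a bursty workload can trigger errors while the stream-wide 1-minute sums stay well under the quota.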

Now that we know that the problem exists, we should look for clues to see if we’re exceeding the overall throughput available in the stream or if we’re having a hot shard issue due to an imbalanced partition key distribution, as discussed earlier. One approach to this is to use enhanced shard-level metrics. Prior to our tests, we enabled enhanced shard-level metrics, and we can see in the following figure that both shards equally reached their quota in our first test.

Enhanced shard level metrics

We have seen Kinesis data streams containing thousands of shards, taking advantage of the scale that Kinesis Data Streams offers. However, plotting enhanced shard-level metrics on such a large stream may not provide an easy way to find out which shards are over-utilized. In that instance, it’s better to use CloudWatch Metrics Insights to run queries to view the top-n items, as shown in the following code (adjust the LIMIT 5 clause accordingly):

-- Show top 5 shards with highest incoming bytes
SELECT
SUM(IncomingBytes)
FROM "AWS/Kinesis"
GROUP BY ShardId, StreamName
ORDER BY MAX() DESC
LIMIT 5

-- Show top 5 shards with highest incoming records
SELECT
SUM(IncomingRecords)
FROM "AWS/Kinesis"
GROUP BY ShardId, StreamName
ORDER BY MAX() DESC
LIMIT 5

Enhanced shard-level metrics are not enabled by default. If you didn’t enable them and you want to perform root cause analysis after an incident, this option isn’t very helpful. In addition, you can only query the most recent 3 hours of data. Enhanced shard-level metrics also incur additional costs for CloudWatch metrics, and it may be cost prohibitive to have them always on in data streams with a lot of shards.

One interesting scenario is when the workload is bursty, which can make the resulting CloudWatch metrics graphs rather baffling. This is because Kinesis publishes CloudWatch metric data aggregated at 1-minute intervals. Consequently, although you can see write throughput exceeded errors, your incoming bytes/records graphs may still be within the limits. To illustrate this scenario, we changed our test to create a burst of writes exceeding the limits and then sleep for a few seconds. Then we repeated this cycle for several minutes to yield the graphs in the following figure, which show write throughput exceeded errors on the left, but the IncomingBytes and IncomingRecords graphs on the right seem fine.

Effect of data aggregated at 1-minute intervals
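A toy simulation shows why 1-minute aggregation can hide bursts. It models a single shard with a hard cap of 1,000 records in each second, which is a deliberate simplification of how the service enforces its quota; the burst size and interval are made-up values, not the ones used in our test.

```python
SHARD_RECORD_LIMIT = 1_000  # records per second per shard


def simulate_minute(burst_records, burst_every_seconds):
    """Return (throttled, accepted) record counts for one shard over 60 seconds.

    A burst of burst_records arrives every burst_every_seconds seconds and
    the producer stays idle in between.
    """
    throttled = 0
    accepted = 0
    for second in range(60):
        offered = burst_records if second % burst_every_seconds == 0 else 0
        accepted += min(offered, SHARD_RECORD_LIMIT)
        throttled += max(offered - SHARD_RECORD_LIMIT, 0)
    return throttled, accepted


throttled, accepted = simulate_minute(burst_records=3_000, burst_every_seconds=5)
minute_limit = SHARD_RECORD_LIMIT * 60

print(throttled)                 # records rejected during the bursts
print(accepted / minute_limit)   # fraction of the 1-minute quota actually used
```

In this model the shard rejects records in every burst, yet the accepted records add up to only a fraction of what the shard could take over a full minute, which is the picture the CloudWatch graphs present.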

To enhance the process of identifying write throughput exceeded errors, we developed a CLI tool called Kinesis Hot Shard Advisor (KHS). With KHS, you can view shard utilization when shard-level metrics are not enabled. This is particularly useful for investigating an issue retrospectively. It can also show the most frequently written keys to a particular shard. KHS reports shard utilization by reading records and aggregating them per second based on the ApproximateArrivalTimestamp in the record. Because of this, you can also understand shard utilization drivers during bursty write workloads.

By running the following command, we can get KHS to inspect the data that arrived in 1 minute during our first test and generate a report:

khs -stream my-data-stream -from "2023-06-22 17:35:00" -to "2023-06-22 17:36:00"

For the given time window, the summary section in the generated report shows the maximum bytes per second rate observed, total bytes ingested, maximum records per second observed, and the total number of records ingested for each shard.

KHS report summary

Choosing a shard ID in the first column displays a graph of incoming bytes and records for that shard. This is similar to the graph you get in CloudWatch metrics, except the KHS graph reports on a per-second basis. For instance, in the following figure, we can see how the producer was going through a series of bursty writes followed by a throttling event during our test case.

KHS shard level metrics display

Running the same command with the -aggregate-key option enables partition key distribution analysis. It generates an additional graph for each shard showing the key distribution, as shown in the following figure. For our test scenario, we can only see each key being used one time because we used a new UUID for each record.

KHS key distribution graph

Because KHS reports based on data stored in streams, it creates an enhanced fan-out consumer at startup to avoid using the read throughput quota available for other consumers. When the analysis is complete, it deletes that enhanced fan-out consumer.

Due to its nature of reading data streams, KHS can transfer a lot of data during analysis. For instance, assume you have a stream with 100 shards. If all of them are fully utilized during a 1-minute window specified using the -from and -to arguments, the host running KHS will receive at least 1 MB * 100 * 60 = 6,000 MB = approximately 6 GB of data. To avoid this kind of excessive data transfer and speed up the analysis process, we recommend first using the WriteProvisionedThroughputExceeded CloudWatch metric to identify a time period when you experienced throttling and then using a small window (such as 10 seconds) with KHS. You can also run KHS in an Amazon Elastic Compute Cloud (Amazon EC2) instance in the same AWS Region as your Kinesis data stream to minimize network latency during reads.

KHS is designed to run on a single machine to diagnose large-scale workloads. Using a naive in-memory counting algorithm (such as a hash map storing the partition key and count) for partition key distribution analysis could easily exhaust the available memory in the host system. Therefore, we use a probabilistic data structure called count-min-sketch to estimate the number of times a key has been used. As a result, the number you see in the report should be taken as an approximate value rather than an absolute value. After all, with this report, we just want to find out if there’s an imbalance in the keys written to a shard.
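For readers unfamiliar with the data structure, the following is a minimal count-min sketch in Python. It is a generic illustration of the idea, not the KHS implementation; the class name, the width and depth defaults, and the use of salted BLAKE2b as the family of hash functions are all assumptions made for this example.

```python
import hashlib


class CountMinSketch:
    """Fixed-size frequency estimator that may overcount but never undercounts."""

    def __init__(self, width=2048, depth=4):
        self.width = width
        self.depth = depth
        # One row of counters per hash function.
        self.rows = [[0] * width for _ in range(depth)]

    def _columns(self, key):
        """Yield (row, column) pairs, using the row number as the hash salt."""
        data = key.encode("utf-8")
        for row in range(self.depth):
            digest = hashlib.blake2b(
                data, digest_size=8, salt=row.to_bytes(8, "big")
            ).digest()
            yield row, int.from_bytes(digest, "big") % self.width

    def add(self, key, count=1):
        for row, col in self._columns(key):
            self.rows[row][col] += count

    def estimate(self, key):
        # Collisions only inflate counters, so the smallest one is the best guess.
        return min(self.rows[row][col] for row, col in self._columns(key))


sketch = CountMinSketch()
for _ in range(1_000):
    sketch.add("/viral-page")
sketch.add("/about", 3)

print(sketch.estimate("/viral-page"))  # at least 1000
print(sketch.estimate("/about"))       # at least 3
```

Memory use is fixed at width * depth counters regardless of how many distinct partition keys are seen, which is the property that matters when a stream carries millions of unique keys.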

Now that we understand what causes hot shards and how to identify them, let’s look at how to deal with this in producer applications and the remediation steps.

Remediation steps

Having producers retry writes is a step towards making our producers resilient to write throughput exceeded errors. Consider our earlier sample application logging performance metrics data for each web request served by a fleet of web servers. When implementing this retry mechanism, you should remember that records that are not yet written to the Kinesis data stream are held in the host system’s memory. The first issue with this is that if the host crashes before the records can be written, you’ll experience data loss. Scenarios such as tracking web request performance data might be more forgiving of this type of data loss than scenarios like financial transactions. You should evaluate the durability guarantees required for your application and employ techniques to achieve them.

The second issue is that records waiting to be written to the Kinesis data stream consume the host system’s memory. When you start getting throttled and have some retry logic in place, you should notice that your memory utilization goes up. A retry mechanism should have a way to avoid exhausting the host system’s memory.
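One way to keep retries from exhausting memory is to cap the retry buffer and back off between attempts. The sketch below is one possible shape for such a mechanism, not a recommendation for every workload: the class name, the drop-oldest policy, and the delay values are assumptions, and the send callable stands in for whatever code performs the actual write and reports whether it succeeded.

```python
import random
import time
from collections import deque


class BoundedRetryBuffer:
    """Holds unsent records in memory, up to a fixed number of records."""

    def __init__(self, send, max_records=10_000, base_delay=0.1,
                 max_delay=5.0, sleep=time.sleep):
        self._send = send            # callable(record) -> True if written
        self._pending = deque()
        self._max_records = max_records
        self._base_delay = base_delay
        self._max_delay = max_delay
        self._sleep = sleep
        self.dropped = 0             # records discarded to protect memory

    def offer(self, record):
        """Queue a record, discarding the oldest one if the buffer is full."""
        if len(self._pending) >= self._max_records:
            self._pending.popleft()
            self.dropped += 1
        self._pending.append(record)

    def flush(self, max_attempts=5):
        """Try to send queued records in order; return how many remain."""
        attempt = 0
        while self._pending and attempt < max_attempts:
            if self._send(self._pending[0]):
                self._pending.popleft()
                attempt = 0
            else:
                # Exponential backoff with full jitter between retries.
                delay = min(self._max_delay, self._base_delay * 2 ** attempt)
                self._sleep(random.uniform(0, delay))
                attempt += 1
        return len(self._pending)
```

Dropping the oldest record trades durability for bounded memory. Workloads that cannot tolerate loss would need a different policy, such as spilling to local disk or pushing back on the code that produces the records.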

With the appropriate retry logic in place, if you receive write throughput exceeded errors, you can use the methods we discussed earlier to identify the cause. After you identify the root cause, you can choose the appropriate remediation step:

  • If the producer application is exceeding the overall stream’s throughput, you can add more shards to the stream to increase its write throughput capacity. When adding shards, the Kinesis data stream makes the new shards available incrementally, minimizing the time that producers experience write throughput exceeded errors. To add shards to a stream, you can use the Kinesis console, the update-shard-count operation in the AWS CLI, the UpdateShardCount API through the AWS SDK, or the ShardCount property in the AWS CloudFormation template used to create the stream.
  • If the producer application is exceeding the throughput limit of some shards (hot shard issue), pick one of the following options based on consumer requirements:
    • If locality of data is required (records with the same partition key are always processed by the same consumer) or an order based on partition key is required, use the split-shard operation in the AWS CLI or the SplitShard API in the AWS SDK to split those shards.
    • If locality or order based on the current partition key is not required, change the partition key scheme to increase its distribution.
  • If the producer application is exceeding the throughput limit of a shard due to a single partition key (hot key issue), change the partition key scheme to increase its distribution.
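As one example of changing the partition key scheme, a producer can append a small random suffix (a salt) to a hot key so that its records hash to several different values. The helper names, the "#" separator, and the bucket count below are hypothetical; the right scheme depends on what your consumers need.

```python
import random


def salted_partition_key(base_key, salt_buckets, rng=random):
    """Spread one logical key over salt_buckets distinct partition keys."""
    return f"{base_key}#{rng.randrange(salt_buckets)}"


def base_key_of(partition_key):
    """Recover the logical key on the consumer side."""
    return partition_key.rsplit("#", 1)[0]


keys = {salted_partition_key("/viral-page", salt_buckets=8) for _ in range(1_000)}
print(sorted(keys))                      # up to 8 distinct partition keys
print({base_key_of(k) for k in keys})    # all map back to "/viral-page"
```

The trade-off is the same one described for UUIDs: records for the logical key no longer arrive in order on a single shard, so consumers that need per-key ordering or locality have to regroup them by the value returned from base_key_of.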

Kinesis Data Streams also has an on-demand capacity mode. In on-demand capacity mode, Kinesis Data Streams automatically scales streams when needed. Additionally, you can switch between on-demand and provisioned capacity modes without causing an outage. This could be particularly useful when you’re experiencing write throughput exceeded errors but require an immediate response to keep your application available to your users. In such instances, you can switch a provisioned capacity mode data stream to an on-demand data stream and let Kinesis Data Streams handle the required scale appropriately. You can then perform root cause analysis in the background and take corrective actions. Finally, if necessary, you can change the capacity mode back to provisioned.
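With the AWS SDK for Python (Boto3), the switch can be requested through the Kinesis client's update_stream_mode operation. The wrapper function below is a hypothetical convenience, and the ARN in the usage comment is a placeholder; the client is passed in so the function can be exercised without AWS credentials.

```python
def switch_to_on_demand(kinesis_client, stream_arn):
    """Ask Kinesis Data Streams to move the stream to on-demand capacity mode."""
    return kinesis_client.update_stream_mode(
        StreamARN=stream_arn,
        StreamModeDetails={"StreamMode": "ON_DEMAND"},
    )


# Usage (requires boto3 and AWS credentials; the ARN is a placeholder):
#
#   import boto3
#   switch_to_on_demand(
#       boto3.client("kinesis"),
#       "arn:aws:kinesis:us-east-1:111122223333:stream/my-data-stream",
#   )
```

Switching back later uses the same call with "PROVISIONED" as the StreamMode value.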

Conclusion

You should now have a solid understanding of the common causes of write throughput exceeded errors in Kinesis data streams, how to diagnose them, and what actions to take to appropriately deal with them. We hope that this post will help you make your Kinesis Data Streams applications more robust. If you are just starting with Kinesis Data Streams, we recommend referring to the Developer Guide.

If you have any questions or feedback, please leave them in the comments section.


About the Authors

Buddhike de Silva is a Senior Specialist Solutions Architect at Amazon Web Services. Buddhike helps customers run large scale streaming analytics workloads on AWS and make the best out of their cloud journey.

Nihar Sheth is a Senior Product Manager at Amazon Web Services. He is passionate about developing intuitive product experiences that solve complex customer problems and enable customers to achieve their business goals.
