Kinesis - Path to Master¶
Keywords: AWS Kinesis
Basic¶
Kinesis Architecture¶
Kinesis vs Kafka¶
Advanced¶
Throughput Optimization¶
Optimize Producer - Choose the right Partition Key¶
choose a high-cardinality key, like order id or device id, when the number of distinct keys N >> n_shard
if you have to use a low-cardinality key for strong ordering purposes, you can append a deterministic suffix
if ordering doesn’t matter, you can append a random suffix
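A minimal sketch of the random-suffix approach (the helper name is hypothetical): a hot low-cardinality key is spread over a bounded set of partition keys, at the cost of per-key ordering.

```python
import random

def make_partition_key(base_key: str, n_suffixes: int = 8) -> str:
    """Spread a hot key over n_suffixes partition keys, e.g. 'device-1-3'.

    Records for the same base_key now land on up to n_suffixes shards,
    so ordering per base_key is no longer guaranteed.
    """
    return f"{base_key}-{random.randrange(n_suffixes)}"

key = make_partition_key("device-1")
```

A consumer that wants all records for `device-1` must then read every suffix variant of the key.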
Optimize Producer - Batch¶
without batch:

```python
import json

# kinesis_client = boto3.client("kinesis"), assumed to be created elsewhere
events = [
    {"device_id": 1, "time": "2022-01-01 00:01:00", "measurement": "temperature", "value": 76.1},
    {"device_id": 1, "time": "2022-01-01 00:02:00", "measurement": "temperature", "value": 76.2},
    {"device_id": 1, "time": "2022-01-01 00:03:00", "measurement": "temperature", "value": 76.3},
    ...
]

# one API call per event
for event in events:
    kinesis_client.put_record(
        ...
        Data=json.dumps(event).encode("utf-8"),
        PartitionKey=str(event["device_id"]),  # PartitionKey must be a string
    )
```
with batch:

```python
# a single API call; put_records accepts up to 500 records per call
kinesis_client.put_records(
    ...
    Records=[
        {
            "Data": json.dumps(event).encode("utf-8"),
            "PartitionKey": str(event["device_id"]),
        }
        for event in events
    ],
)
```
Optimize Producer - Aggregation¶
without aggregation:

```python
temperature_measurement_events = ...
humidity_measurement_events = ...
...

# one record per event, one put_records call per event type
kinesis_client.put_records(
    ...
    Records=[
        {
            "Data": json.dumps(event).encode("utf-8"),
            "PartitionKey": str(event["device_id"]),
        }
        for event in temperature_measurement_events
    ],
)
kinesis_client.put_records(
    ...
    Records=[
        {
            "Data": json.dumps(event).encode("utf-8"),
            "PartitionKey": str(event["device_id"]),
        }
        for event in humidity_measurement_events
    ],
)
```
with aggregation:

```python
import gzip

# pack many events into one record; gzip also shrinks the payload
kinesis_client.put_records(
    ...
    Records=[
        {
            "Data": gzip.compress(json.dumps(temperature_measurement_events).encode("utf-8")),
            "PartitionKey": str(temperature_measurement_events[0]["device_id"]),
        },
        {
            "Data": gzip.compress(json.dumps(humidity_measurement_events).encode("utf-8")),
            "PartitionKey": str(humidity_measurement_events[0]["device_id"]),
        },
        ...
    ],
)
```
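On the consumer side the aggregated record has to be unpacked again. A minimal sketch of the round trip using plain gzip + JSON (no real Kinesis call):

```python
import gzip
import json

events = [
    {"device_id": 1, "measurement": "temperature", "value": 76.1},
    {"device_id": 1, "measurement": "temperature", "value": 76.2},
]

# producer side: many events -> one compressed record payload
payload = gzip.compress(json.dumps(events).encode("utf-8"))

# consumer side: one record payload -> many events
decoded = json.loads(gzip.decompress(payload).decode("utf-8"))
assert decoded == events
```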
Optimize Consumer - Slow Consumer¶
Trade-off: each read must be “atomic”: a fetched batch is either fully processed or fully retried, so the checkpoint advances only after the whole batch succeeds.
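A minimal sketch of that checkpointing rule (the function names are illustrative, not a real KCL API): the checkpoint is saved once per batch, after every record succeeded, so a crash mid-batch causes a re-read of the whole batch instead of data loss.

```python
def consume_batch(records, process, save_checkpoint):
    """Process all records, then checkpoint once.

    If process() raises, no checkpoint is saved, and the whole batch
    is re-read on the next iteration (at-least-once semantics).
    """
    for record in records:
        process(record)
    save_checkpoint(records[-1]["sequence_number"])

done, checkpoints = [], []
consume_batch(
    [{"sequence_number": i, "data": f"e{i}"} for i in range(3)],
    process=lambda r: done.append(r["data"]),
    save_checkpoint=checkpoints.append,
)
# done == ["e0", "e1", "e2"], checkpoints == [2]
```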
Optimize Consumer - Aggregation First¶
Aggregate many records into one consumption action. Example: each event is a data change event from a DynamoDB stream (an update event); you can merge the changes before writing to the target database.
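A sketch of that merge, assuming each DynamoDB-stream-style change event carries a primary key and the new item image (field names here are illustrative): keeping only the latest change per key turns N events into one write per key.

```python
def merge_changes(change_events):
    """Keep only the latest change per primary key (later events win)."""
    latest = {}
    for event in change_events:
        latest[event["key"]] = event["new_image"]
    return latest

changes = [
    {"key": "user#1", "new_image": {"name": "a"}},
    {"key": "user#1", "new_image": {"name": "b"}},
    {"key": "user#2", "new_image": {"name": "c"}},
]
merged = merge_changes(changes)  # 3 events -> 2 writes
```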
Multi Tenant¶
What is Multi-Tenancy in the Context of a Message Streaming Middleware System:
Why Multi-Tenancy:
smooth the traffic
when the number of tenants (n_tenant) is too large, it is impossible to maintain independent infrastructure for each tenant
Multi-Tenancy Challenges:
one tenant’s consumer should not consume data owned by other tenants
one tenant’s slow consumer should not impact the entire system
Kinesis Quota:
There is no upper quota on the number of streams with the provisioned mode that you can have in an account.
The default shard quota is 500 shards per AWS account for the following AWS regions: US East (N. Virginia), US West (Oregon), and Europe (Ireland). For all other regions, the default shard quota is 200 shards per AWS account. This limit is only applicable for data streams with the provisioned capacity mode.
Solutions:
- Per Tenant Stream
- pro:
physical isolation
easy to add / remove tenant
producer / consumer of different tenants can be maintained and deployed independently
- con:
read / write IO waste
cannot handle a large number of tenants
- use case:
all tenants have small traffic, or large but predictable traffic
the number of tenants is not too large (e.g. stays within the default 500-shard quota)
- Per Tenant Consumer
- pro:
producer / consumer of different tenants can be maintained and deployed independently
- con:
read IO waste
- the GetRecords API is limited to 5 transactions per second per shard
you cannot have more than 5 consumers reading the same shard per second
if you have 300 consumers, each consumer can poll only once every 300 / 5 = 60 seconds
cannot fully utilize “aggregation”
use case:
- One Stream, One Consumer, logical isolation
- pro:
no waste
easy to scale for both producer / consumer
- con:
the code for all tenants is coupled together
an update for one tenant “may” have a global impact
cannot fully utilize “aggregation”
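The per-tenant-consumer delay above can be sketched as simple arithmetic (5 GetRecords transactions per second per shard is the documented shared-throughput limit; round-robin scheduling among consumers is an assumption):

```python
GET_RECORDS_TPS_PER_SHARD = 5  # documented Kinesis limit per shard

def min_poll_interval_seconds(n_consumers: int) -> float:
    """If n_consumers share one shard's 5 reads/sec round-robin,
    each consumer gets a turn only every n_consumers / 5 seconds."""
    return n_consumers / GET_RECORDS_TPS_PER_SHARD

interval = min_poll_interval_seconds(300)  # -> 60.0 seconds between polls
```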
Reference:
Kinesis Quota and Limit: https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
Stream Scalability¶
Failure Handling¶
Keys:
Store the shard iterator (checkpoint) in DynamoDB
Store failed records in a dead-letter Kinesis stream
Persist failed data in S3
Copy records from the dead-letter stream back to the app stream after the fix
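The keys above can be sketched as one per-record handler (the injected functions are hypothetical stand-ins; in practice they would wrap boto3 calls such as `put_record`, `put_item`, and `put_object`): a failed record is parked in the dead-letter stream and S3, so the checkpoint on the main stream can still advance.

```python
import json

def handle_record(record, process, checkpoint_put, dlq_put, s3_put):
    """Process one record; on failure, park it in the dead-letter
    stream and S3 so the main stream's checkpoint can still advance."""
    try:
        process(record)
    except Exception:
        payload = json.dumps(record)
        dlq_put(payload)  # failed record goes to the dead-letter stream
        s3_put(payload)   # and is also persisted durably in S3
    checkpoint_put(record["sequence_number"])  # advance either way

def fail(record):
    raise ValueError("boom")

dlq, s3, ckpt = [], [], []
handle_record(
    {"sequence_number": 1, "data": "bad"},
    process=fail,
    checkpoint_put=ckpt.append,
    dlq_put=dlq.append,
    s3_put=s3.append,
)
# dlq and s3 each hold the failed record; ckpt == [1]
```

After the bug is fixed, a small job can replay everything in the dead-letter stream back into the app stream.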