Kinesis Firehose Delivery Stream Transformation¶
Keywords: AWS Kinesis, Firehose, Delivery Stream, Lambda, Transformation.
Let's walk through a typical real-time data stream transformation case:

1. Data is sent in batches to a Kinesis Data Stream via the put_records API.
2. A Kinesis Firehose Delivery Stream subscribes to the Kinesis Data Stream and receives the data.
3. Firehose invokes the AWS Lambda function that performs the transformation, and the Lambda receives the data.
4. The Lambda transforms the data and reshapes it into the format the Firehose programming model understands.
5. Firehose dumps the processed data to S3 or another destination.

Here we focus on steps #1, #3, and #4. For these three steps you have to prepare the right data structures and formats in the SDK; if the formats are wrong, the stages won't connect.
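Step #2 is pure wiring, so it is not covered in detail below, but for completeness here is a minimal boto3 sketch of how the Delivery Stream is attached to the Data Stream and to the transformation Lambda. All names and ARNs are hypothetical placeholders:

# -*- coding: utf-8 -*-
import boto3

fh_client = boto3.session.Session().client("firehose")

# All ARNs and names below are hypothetical placeholders.
fh_client.create_delivery_stream(
    DeliveryStreamName="sanhe-dev-delivery",
    DeliveryStreamType="KinesisStreamAsSource",
    KinesisStreamSourceConfiguration=dict(
        KinesisStreamARN="arn:aws:kinesis:us-east-1:111122223333:stream/sanhe-dev",
        RoleARN="arn:aws:iam::111122223333:role/firehose-role",
    ),
    ExtendedS3DestinationConfiguration=dict(
        RoleARN="arn:aws:iam::111122223333:role/firehose-role",
        BucketARN="arn:aws:s3:::my-bucket",
        # This is where the transformation Lambda (steps #3, #4) is attached.
        ProcessingConfiguration=dict(
            Enabled=True,
            Processors=[
                dict(
                    Type="Lambda",
                    Parameters=[
                        dict(
                            ParameterName="LambdaArn",
                            ParameterValue="arn:aws:lambda:us-east-1:111122223333:function:transform",
                        ),
                    ],
                ),
            ],
        ),
    ),
)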
1. Raw Data¶
The raw data is kept as Python dictionaries; one record is one dict.
{"id": "1", "value": random.randint(1, 100)}
{"id": "2", "value": random.randint(1, 100)}
{"id": "3", "value": random.randint(1, 100)}
Send the data to the Data Stream with put_records:
# -*- coding: utf-8 -*-
import boto3
import json
import random
from rich import print as rprint

boto_ses = boto3.session.Session()
k_client = boto_ses.client("kinesis")
stream_name = "sanhe-dev"

# Ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#Kinesis.Client.put_records
raw_records = [
    {"id": str(i), "value": random.randint(1, 100)}
    for i in range(21, 21 + 10)
]
# Each Kinesis record is a dict with a binary ``Data`` payload and a
# ``PartitionKey``. A trailing "\n" keeps records line-delimited downstream.
kin_records = [
    dict(
        Data=(json.dumps(raw_record) + "\n").encode("utf-8"),
        PartitionKey=raw_record["id"],
    )
    for raw_record in raw_records
]
response = k_client.put_records(
    Records=kin_records,
    StreamName=stream_name,
)
rprint(response)
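Note that put_records is not all-or-nothing: individual records can fail (for example due to throttling) while the call as a whole succeeds. A minimal retry sketch against the response above; the single-shot retry policy here is an assumption, production code should loop with backoff:

# Failed entries carry an ``ErrorCode`` instead of a ``SequenceNumber``;
# the response ``Records`` list is in the same order as the request.
if response["FailedRecordCount"] > 0:
    retry_records = [
        kin_record
        for kin_record, result in zip(kin_records, response["Records"])
        if "ErrorCode" in result
    ]
    # Naive single retry; real code should use exponential backoff.
    k_client.put_records(Records=retry_records, StreamName=stream_name)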
3, 4. Transformation Lambda Function¶
# -*- coding: utf-8 -*-
"""
The Lambda function that transforms records passing through Kinesis Firehose.
"""

import json
import base64


def lambda_handler(event, context):
    """
    ``event`` example::

        {
            "invocationId": "invocationIdExample",
            "deliveryStreamArn": "arn:aws:kinesis:EXAMPLE",
            "region": "us-east-1",
            "records": [
                {
                    "recordId": "49546986683135544286507457936321625675700192471156785154",
                    "approximateArrivalTimestamp": 1495072949453,
                    "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4="
                },
                ...
            ]
        }

    return example::

        {
            "records": [
                {
                    "recordId": "49546986683135544286507457936321625675700192471156785154",
                    "result": "Ok",
                    "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4="
                }
            ]
        }
    """
    output = []
    for record in event["records"]:
        print(record["recordId"])

        # Convert the base64-encoded payload back to the raw record.
        raw_record = json.loads(base64.b64decode(record["data"]))

        # Do custom processing on the payload here.
        transformed_record = raw_record
        transformed_record["value"] = transformed_record["value"] * 100

        # Convert the transformed record back to a base64-encoded payload.
        # ``b64encode`` returns bytes, which are not JSON serializable,
        # so decode to str before returning.
        output_record = {
            "recordId": record["recordId"],
            "result": "Ok",  # "Ok" | "Dropped" | "ProcessingFailed"
            "data": base64.b64encode(
                (json.dumps(transformed_record) + "\n").encode("utf-8")
            ).decode("utf-8"),
        }
        output.append(output_record)

    print("Successfully processed {} records.".format(len(event["records"])))
    return {"records": output}
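Before deploying, the handler can be exercised locally by hand-building an event in the shape Firehose sends. A minimal sketch, assuming lambda_handler from the code above is in scope; the invocation metadata values are placeholders:

# -*- coding: utf-8 -*-
import json
import base64

fake_event = {
    "invocationId": "test",
    "deliveryStreamArn": "arn:aws:kinesis:EXAMPLE",
    "region": "us-east-1",
    "records": [
        {
            "recordId": str(i),
            "approximateArrivalTimestamp": 1495072949453,
            "data": base64.b64encode(
                (json.dumps({"id": str(i), "value": i}) + "\n").encode("utf-8")
            ).decode("utf-8"),
        }
        for i in range(1, 4)
    ],
}

result = lambda_handler(fake_event, None)
for out_record in result["records"]:
    # Round-trip check: each value should come back multiplied by 100.
    print(json.loads(base64.b64decode(out_record["data"])))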
5. Data in Destination¶
Finally, the Delivery Stream decodes each record back to binary and concatenates them into one large binary object. Since our data is JSON-encoded strings, and the Lambda appended a "\n" to every record, the resulting file can be read as a plain-text, newline-delimited JSON file:

{"id": "1", "value": 3100}
{"id": "2", "value": 2800}
{"id": "3", "value": 4200}
Summary¶
Serialization Protocol
Kinesis uses a binary protocol. We can layer a JSON string protocol on top of it, as we did here, but the conversions are wasteful: binary payloads must be base64-encoded before they can be JSON-serialized, and after the JSON dump the result is binary again and has to pass through base64 once more. A binary serialization protocol, such as pickle or Google's Protocol Buffers / FlatBuffers, is more efficient, though it takes more programming effort.
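A small sketch of the overhead argument: base64 inflates binary payloads by roughly a third, which a binary protocol such as pickle avoids entirely. The record shape below is made up for illustration:

# -*- coding: utf-8 -*-
import json
import base64
import pickle

record = {"id": "1", "value": 42, "blob": b"\x00" * 300}

# JSON cannot carry bytes, so the blob must be base64-encoded first (~33% larger).
json_payload = json.dumps(
    {
        "id": record["id"],
        "value": record["value"],
        "blob": base64.b64encode(record["blob"]).decode("utf-8"),
    }
).encode("utf-8")

# pickle serializes the bytes directly, with no base64 round-trip.
pickle_payload = pickle.dumps(record)

print(len(json_payload), len(pickle_payload))  # json is noticeably larger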