Kinesis Firehose Delivery Stream Transformation

Keywords: AWS Kinesis, Firehose, Delivery Stream, Lambda, Transform, Transformation.

Let's look at a typical real-time data stream transformation use case:

  1. Data is sent to the Kinesis Data Stream in batches via the put_records API

  2. The Kinesis Firehose Delivery Stream, which subscribes to the Kinesis Data Stream, receives the data

  3. Firehose invokes the AWS Lambda function that performs the Transformation, and the Lambda receives the data

  4. The Lambda transforms the data and reshapes it into the format that the Firehose Programming Model understands

  5. Firehose dumps the processed data to S3 or another Destination

Here we focus on steps #1, #3, and #4. In these three steps you have to prepare the data structures and formats in the SDK yourself; if the formats do not match, the stages will not connect. (For how the Delivery Stream itself is wired to the source stream and the Lambda, see the sketch right below.)
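For reference, the wiring between the Data Stream, the Delivery Stream, and the transformation Lambda (steps #2 and #3) can be set up with boto3 roughly like this. This is only a sketch: every ARN and name below is a placeholder, and the required IAM roles are assumed to already exist. In practice you might also do this in the console or with infrastructure-as-code.

# -*- coding: utf-8 -*-

import boto3

firehose_client = boto3.client("firehose")

# all ARNs and names below are placeholders for illustration only
firehose_client.create_delivery_stream(
    DeliveryStreamName="sanhe-dev-delivery",
    DeliveryStreamType="KinesisStreamAsSource",
    # step #2: subscribe the Delivery Stream to the Data Stream
    KinesisStreamSourceConfiguration=dict(
        KinesisStreamARN="arn:aws:kinesis:us-east-1:111122223333:stream/sanhe-dev",
        RoleARN="arn:aws:iam::111122223333:role/firehose-source-role",
    ),
    ExtendedS3DestinationConfiguration=dict(
        RoleARN="arn:aws:iam::111122223333:role/firehose-delivery-role",
        BucketARN="arn:aws:s3:::my-firehose-output-bucket",
        # step #3: tell Firehose to invoke the transformation Lambda
        # for each batch of records (a qualified function ARN is required)
        ProcessingConfiguration=dict(
            Enabled=True,
            Processors=[
                dict(
                    Type="Lambda",
                    Parameters=[
                        dict(
                            ParameterName="LambdaArn",
                            ParameterValue="arn:aws:lambda:us-east-1:111122223333:function:my-transform-fn:$LATEST",
                        ),
                    ],
                ),
            ],
        ),
    ),
)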

1. Raw Data

The raw data is stored as Python dictionaries. One record is one dict:

{"id": "1", "value": random.randint(1, 100)}
{"id": "2", "value": random.randint(1, 100)}
{"id": "3", "value": random.randint(1, 100)}

Then put_records sends the data to the Data Stream:

# -*- coding: utf-8 -*-

import boto3
import json
import random
from rich import print as rprint

boto_ses = boto3.session.Session()
k_client = boto_ses.client("kinesis")

stream_name = "sanhe-dev"

# Ref: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#Kinesis.Client.put_records
raw_records = [
    {"id": str(i), "value": random.randint(1, 100)}
    for i in range(21, 21+10)
]

# Kinesis payloads must be binary; append "\n" to each JSON record so the
# records stay separable after Firehose concatenates them at the destination
kin_records = [
    dict(
        Data=(json.dumps(raw_record) + "\n").encode("utf-8"),
        # the partition key determines which shard receives the record
        PartitionKey=raw_record["id"],
    )
    for raw_record in raw_records
]

response = k_client.put_records(
    Records=kin_records,
    StreamName=stream_name,
)

rprint(response)
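Note that put_records is not all-or-nothing: individual records can fail (for example, due to throttling) while the call as a whole succeeds. A minimal sketch of inspecting the response and resending only the failed records, continuing the script above (production code should add exponential backoff and a retry limit):

# failed entries in response["Records"] carry an "ErrorCode"
# instead of a "SequenceNumber"
if response["FailedRecordCount"] > 0:
    failed_records = [
        kin_record
        for kin_record, result in zip(kin_records, response["Records"])
        if "ErrorCode" in result
    ]
    response = k_client.put_records(
        Records=failed_records,
        StreamName=stream_name,
    )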

3, 4. Transformation Lambda Function

# -*- coding: utf-8 -*-

"""
The lambda function that transform records pass through kinesis firehouse
"""

import json
import base64


def lambda_handler(event, context):
    """
    ``event`` example::

        {
            "invocationId": "invocationIdExample",
            "deliveryStreamArn": "arn:aws:kinesis:EXAMPLE",
            "region": "us-east-1",
            "records": [
                {
                    "recordId": "49546986683135544286507457936321625675700192471156785154",
                    "approximateArrivalTimestamp": 1495072949453,
                    "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4="
                },
                ...
            ]
        }

    return example::

        {
            "records": [
                {
                    "recordId": "49546986683135544286507457936321625675700192471156785154",
                    "result": "OK",
                    "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4="
                }
            ]
        }
    """
    output = []

    for record in event["records"]:
        print(record["recordId"])
        # convert the data back to raw data
        raw_record = json.loads(base64.b64decode(record["data"].encode("utf-8")))

        # Do custom processing on the payload here; as an example we
        # multiply the value by 100
        transformed_record = raw_record
        transformed_record["value"] = transformed_record["value"] * 100

        # re-encode the transformed record; the returned event has to be
        # JSON serializable, so decode the base64 bytes back to str
        output_record = {
            "recordId": record["recordId"],
            "result": "Ok",  # "Ok" | "Dropped" | "ProcessingFailed"
            "data": base64.b64encode(
                (json.dumps(transformed_record) + "\n").encode("utf-8")
            ).decode("utf-8"),
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))

    return {"records": output}

5. Data in Destination

Finally, the Delivery Stream decodes each record back to binary and concatenates all of them into one large binary file.

Since our data is actually a JSON-encoded string, the resulting file can be read as a plain-text JSON file. Because the Lambda appends "\n" to every record, the output is newline-delimited JSON:

{"id": "1", "value": 3100}
{"id": "2", "value": 2800}
{"id": "3", "value": 4200}

Had we not appended the newline, the records would be glued together as {"id": "1", "value": 3100}{"id": "2", "value": 2800}..., which is much harder to split back apart.

Summary

Serialization Protocol

Kinesis itself uses a binary protocol. We can use a JSON string protocol, as we did here, but the conversions are inefficient: to serialize binary data as JSON you must first base64-encode it, and after the JSON is dumped back to binary it goes through base64 encoding once more on its way through the Lambda interface. This wastes a lot of resources. A binary serialization protocol, such as pickle or Google's Protocol Buffers / FlatBuffers, would be considerably more efficient, though it is a bit more work to program against.
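A quick, illustrative comparison of the two approaches. The exact sizes vary with Python version and payload, but the roughly 33% inflation from base64 is inherent:

# -*- coding: utf-8 -*-

import os
import json
import base64
import pickle

# a record carrying a 16 KB binary payload
record = {"id": "1", "payload": os.urandom(16 * 1024)}

# JSON cannot hold raw bytes, so the payload has to be base64-encoded
# first, which inflates binary data by roughly 33%
json_blob = json.dumps({
    "id": record["id"],
    "payload": base64.b64encode(record["payload"]).decode("ascii"),
}).encode("utf-8")

# a binary protocol like pickle stores the bytes as-is
pickle_blob = pickle.dumps(record)

print(len(json_blob))    # roughly 21,900 bytes
print(len(pickle_blob))  # roughly 16,400 bytes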