TimeStream Quick Start

Python SDK

boto3 is the official AWS Python SDK. There are two important submodules:

[33]:
import boto3

boto_ses = boto3.session.Session()
ts_write = boto_ses.client("timestream-write")
ts_query = boto_ses.client("timestream-query")

Create Database and Table

[EN]

AWS TimeStream DB is a CLOUD NATIVE time series database. It is based on fully decoupled data ingestion, storage, query processing architect that can scale independently with very small latency. Since it is a fully managed database, you don’t need to provision any VM and Hardware resource to create a database or table. Basically you could create a database and ready to use in seconds.

[CN]

AWS TimeStream DB 是云原生数据库, 使用的是 存, 算 完全分离的架构, 这样可以实现几乎无延迟无限制的伸缩. 而作为一个云原生的产品, 创建 Database 和 Table 完全不需要临时启用机器和集群, 所以你可以随时创建和删除, 随时使用.

TimeStream DB 是一个时序数据库, 不是 Schema Enforce 的, 只有 Data Model 的概念. 所以你创建表的时候无需指定 Schema. 这就跟 Data Lake 中用 Parquet 文件来存储数据的思路差不多, 虽然没有固定的 Schema, 但我们组织数据的时候还是有 Data Model 的.

下面的代码给出了如何检查 Database / Table 是否存在, 以及创建它们的示例.

[34]:
from rich import print

database = "timestream-quick-start"
table = "weather-sensor"


def is_database_exists(database: str) -> bool:
    try:
        ts_write.describe_database(
            DatabaseName=database,
        )
        return True
    except Exception as e:
        if str(e.__class__.__name__) == "ResourceNotFoundException":
            return False
        else:
            raise e


if is_database_exists(database) is False:
    print(f"TimeStream database {database!r} not exist, create one ...")
    res = ts_write.create_database(
        DatabaseName=database,
    )
else:
    print(f"TimeStream database {database!r} already exist, do nothing")


def is_table_exists(database: str, table: str) -> bool:
    try:
        ts_write.describe_table(
            DatabaseName=database,
            TableName=table,
        )
        return True
    except Exception as e:
        if str(e.__class__.__name__) == "ResourceNotFoundException":
            return False
        else:
            raise e


if is_table_exists(database, table) is False:
    print(f"TimeStream table {database!r}.{table!r} not exist, create one ...")
    res = ts_write.create_table(
        DatabaseName=database,
        TableName=table,
        RetentionProperties=dict(
            MemoryStoreRetentionPeriodInHours=1,
            MagneticStoreRetentionPeriodInDays=1,
        )
    )
else:
    print(f"TimeStream table {database!r}.{table!r} already exist, do nothing")

TimeStream database 'timestream-quick-start' already exist, do nothing
TimeStream table 'timestream-quick-start'.'weather-sensor' already exist, do nothing

Data Generator

下面的例子里我们假设一个气象数据公司有 5 个传感器位于美国的四个角和中部. 不断地收集着温度和湿度数据.

我们实现了一个 put_records 函数. 每次运行该函数, 随机选择一个传感器, 生成 10 条观测数据, 每条观测数据间隔 100 毫秒, 总共时间区间为 1 秒. 之后我们可以用一个 Python 脚本无限循环, 每 1 秒运行一次这个函数来模拟数据采集过程. 这里有一个 timestream-data-generator.py Python 脚本, 我们可以在后台运行这个程序, 然后回到这个 Jupyter Notebook 来运行一些 Query.

注意事项:

  • Dimension 的 Value 的数据类型只能是 VARCHAR. 如果你的数值是 BIGINT, DOUBLE 之类的数值型, 而你又有可能用到大于小于之类的比较计算符, 那么你需要自行将原始数据转化成 VARCHAR 并且确保他们定长和有序, 这样你在查询中才可以对 Dimension 中的数据进行比较查询. 如果你不需要这些数值型的 Dimension 参与筛选数据, 而只是作为一个参考, 那么你可以跳过这一步.

  • 所有的 Time 都是代表着整数的字符串, 含义是从 1970-01-01 的 EPOCH 开始的 秒数 (或 微秒, 毫秒, 纳秒).

  • write_records API 中所有的数值都要被编码为 String.

[36]:
import time
import random
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1)


def utc_now() -> str:
    time.sleep(random.randint(50, 150) / 1000)
    return str(int((datetime.utcnow() - EPOCH).total_seconds() * 1000))


class DataTypeEnum:
    DOUBLE = "DOUBLE"
    BIGINT = "BIGINT"
    VARCHAR = "VARCHAR"
    BOOLEAN = "BOOLEAN"
    TIMESTAMP = "TIMESTAMP"
    MULTI = "MULTI"


class FieldEnum:
    temperature = "temperature"
    humidity = "humidity"


# 定义 5 个传感器的 Dimension Data
device_list = [
    dict(
        device_id="device-KS",
        device_lat="039.045167",
        device_lng="-094.580552",
    ),
    dict(
        device_id="device-WA",
        device_lat="047.516842",
        device_lng="-120.556755",
    ),
    dict(
        device_id="device-CA",
        device_lat="037.351811",
        device_lng="-119.870587",
    ),
    dict(
        device_id="device-NY",
        device_lat="042.965073",
        device_lng="-075.073632",
    ),
    dict(
        device_id="device-FL",
        device_lat="028.049414",
        device_lng="-081.641238",
    ),
]

device_dimension_list = [
    [
        dict(
            Name=key,
            Value=value,
            DimensionValueType=DataTypeEnum.VARCHAR,
        )
        for key, value in data.items()
    ]
    for data in device_list
]


def put_records():
    """
    每次运行该函数, 随机选择一个传感器, 生成 10 条观测数据, 每条观测数据间隔 100 毫秒, 总共时间区间为 1 秒.
    """
    try:
        res = ts_write.write_records(
            DatabaseName=database,
            TableName=table,
            CommonAttributes=dict(
                Dimensions=random.choice(device_dimension_list),
            ),
            Records=[
                dict(
                    Time=utc_now(),
                    TimeUnit="MILLISECONDS",
                    MeasureName="observation",
                    MeasureValueType=DataTypeEnum.MULTI,
                    MeasureValues=[
                        dict(
                            Name=FieldEnum.temperature,
                            Value=str(random.randint(32, 102)),
                            Type=DataTypeEnum.BIGINT,
                        ),
                        dict(
                            Name=FieldEnum.humidity,
                            Value=str(random.randint(20, 80) / 100),
                            Type=DataTypeEnum.DOUBLE,
                        )
                    ]
                )
                for _ in range(10)
            ]
        )
        print(res)
    except ts_write.exceptions.RejectedRecordsException as err:
        print("RejectedRecords: ", err)
        for rr in err.response["RejectedRecords"]:
            print("Rejected Index " + str(rr["RecordIndex"]) + ": " + rr["Reason"])
        print("Other records were written successfully. ")

Sample Query

下面我们提供了两个用 SQL 进行查询的例子:

  1. 根据 Dimension 和 Measurement 用比较条件进行筛选

  2. 根据 DeviceId 做聚合查询, 计算区间内的温度最大值和湿度最小值

[38]:
import pandas as pd


def run_query(query_str, limit=20) -> pd.DataFrame:
    res = ts_query.query(
        QueryString=query_str,
        MaxRows=limit,
    )
    columns = [dct["Name"] for dct in res["ColumnInfo"]]
    rows = [
        [
            dct["ScalarValue"]
            for dct in row["Data"]
        ]
        for row in res["Rows"]
    ]
    df = pd.DataFrame(rows, columns=columns)
    return df

# 获得 15 分钟以内, 位于美国西部 (经度小于 -100), 温度大于 88 F 的观测值
query_str = """
SELECT *
FROM "timestream-quick-start"."weather-sensor" t
WHERE
    t.time between ago(15m) and now()
    AND t.device_lng >= '-100.000000'
    AND t.temperature >= 88
ORDER BY time DESC
LIMIT 10
"""

run_query(query_str)
[38]:
device_lat device_id device_lng measure_name time temperature humidity
0 037.351811 device-CA -119.870587 observation 2022-08-13 18:21:44.477000000 91 0.69
1 037.351811 device-CA -119.870587 observation 2022-08-13 18:21:44.233000000 90 0.75
2 047.516842 device-WA -120.556755 observation 2022-08-13 18:21:42.718000000 93 0.33
3 047.516842 device-WA -120.556755 observation 2022-08-13 18:21:42.036000000 90 0.3
4 047.516842 device-WA -120.556755 observation 2022-08-13 18:21:21.422000000 101 0.66
5 047.516842 device-WA -120.556755 observation 2022-08-13 18:21:21.131000000 96 0.22
6 047.516842 device-WA -120.556755 observation 2022-08-13 18:21:11.410000000 101 0.21
7 047.516842 device-WA -120.556755 observation 2022-08-13 18:21:10.459000000 91 0.37
8 047.516842 device-WA -120.556755 observation 2022-08-13 18:21:10.393000000 95 0.76
9 037.351811 device-CA -119.870587 observation 2022-08-13 18:21:08.616000000 90 0.3
[51]:
# 获得 10 秒 以内, 每个传感器观测到的温度最大值和湿度最小值
query_str = """
SELECT
    t.device_id,
    max(t.temperature),
    min(t.humidity)
FROM "timestream-quick-start"."weather-sensor" t
WHERE
    t.time between ago(10s) and now()
GROUP BY t.device_id
"""

run_query(query_str)
[51]:
device_id _col1 _col2
0 device-FL 55 0.65
1 device-WA 97 0.25
2 device-CA 92 0.2
3 device-KS 101 0.22
[ ]: