{ "cells": [ { "cell_type": "markdown", "source": [ "# TimeStream Quick Start" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%% md\n" } } }, { "cell_type": "markdown", "source": [ "## Python SDK\n", "\n", "[boto3](https://boto3.amazonaws.com/v1/documentation/api/latest/index.html) is the official AWS Python SDK. There are two important submodules:\n", "\n", "- [timestream-write](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/timestream-write.html): manage database / table, write data\n", "- [timestream-query](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/timestream-query.html): query the data" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%% md\n" } } }, { "cell_type": "code", "execution_count": 33, "outputs": [], "source": [ "import boto3\n", "\n", "boto_ses = boto3.session.Session()\n", "ts_write = boto_ses.client(\"timestream-write\")\n", "ts_query = boto_ses.client(\"timestream-query\")" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "markdown", "source": [ "## Create Database and Table\n", "\n", "[EN]\n", "\n", "AWS TimeStream DB is a CLOUD NATIVE time series database. It is based on fully decoupled data ingestion, storage, query processing architect that can scale independently with very small latency. Since it is a fully managed database, you don't need to provision any VM and Hardware resource to create a database or table. Basically you could create a database and ready to use in seconds.\n", "\n", "[CN]\n", "\n", "AWS TimeStream DB 是云原生数据库, 使用的是 存, 算 完全分离的架构, 这样可以实现几乎无延迟无限制的伸缩. 而作为一个云原生的产品, 创建 Database 和 Table 完全不需要临时启用机器和集群, 所以你可以随时创建和删除, 随时使用.\n", "\n", "TimeStream DB 是一个时序数据库, 不是 Schema Enforce 的, 只有 Data Model 的概念. 所以你创建表的时候无需指定 Schema. 这就跟 Data Lake 中用 Parquet 文件来存储数据的思路差不多, 虽然没有固定的 Schema, 但我们组织数据的时候还是有 Data Model 的.\n", "\n", "下面的代码给出了如何检查 Database / Table 是否存在, 以及创建它们的示例." ], "metadata": { "collapsed": false, "pycharm": { "name": "#%% md\n" } } }, { "cell_type": "code", "execution_count": 34, "outputs": [ { "data": { "text/plain": "TimeStream database \u001B[32m'timestream-quick-start'\u001B[0m already exist, do nothing\n", "text/html": "
TimeStream database 'timestream-quick-start' already exist, do nothing\n
\n" }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": "TimeStream table \u001B[32m'timestream-quick-start'\u001B[0m.\u001B[32m'weather-sensor'\u001B[0m already exist, do nothing\n", "text/html": "
TimeStream table 'timestream-quick-start'.'weather-sensor' already exist, do nothing\n
\n" }, "metadata": {}, "output_type": "display_data" } ], "source": [ "from rich import print\n", "\n", "database = \"timestream-quick-start\"\n", "table = \"weather-sensor\"\n", "\n", "\n", "def is_database_exists(database: str) -> bool:\n", " try:\n", " ts_write.describe_database(\n", " DatabaseName=database,\n", " )\n", " return True\n", " except Exception as e:\n", " if str(e.__class__.__name__) == \"ResourceNotFoundException\":\n", " return False\n", " else:\n", " raise e\n", "\n", "\n", "if is_database_exists(database) is False:\n", " print(f\"TimeStream database {database!r} not exist, create one ...\")\n", " res = ts_write.create_database(\n", " DatabaseName=database,\n", " )\n", "else:\n", " print(f\"TimeStream database {database!r} already exist, do nothing\")\n", "\n", "\n", "def is_table_exists(database: str, table: str) -> bool:\n", " try:\n", " ts_write.describe_table(\n", " DatabaseName=database,\n", " TableName=table,\n", " )\n", " return True\n", " except Exception as e:\n", " if str(e.__class__.__name__) == \"ResourceNotFoundException\":\n", " return False\n", " else:\n", " raise e\n", "\n", "\n", "if is_table_exists(database, table) is False:\n", " print(f\"TimeStream table {database!r}.{table!r} not exist, create one ...\")\n", " res = ts_write.create_table(\n", " DatabaseName=database,\n", " TableName=table,\n", " RetentionProperties=dict(\n", " MemoryStoreRetentionPeriodInHours=1,\n", " MagneticStoreRetentionPeriodInDays=1,\n", " )\n", " )\n", "else:\n", " print(f\"TimeStream table {database!r}.{table!r} already exist, do nothing\")\n" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "markdown", "source": [ "# Data Generator\n", "\n", "下面的例子里我们假设一个气象数据公司有 5 个传感器位于美国的四个角和中部. 不断地收集着温度和湿度数据.\n", "\n", "我们实现了一个 ``put_records`` 函数. 每次运行该函数, 随机选择一个传感器, 生成 10 条观测数据, 每条观测数据间隔 100 毫秒, 总共时间区间为 1 秒. 之后我们可以用一个 Python 脚本无限循环, 每 1 秒运行一次这个函数来模拟数据采集过程. 这里有一个 ``timestream-data-generator.py`` Python 脚本, 我们可以在后台运行这个程序, 然后回到这个 Jupyter Notebook 来运行一些 Query.\n", "\n", "注意事项:\n", "\n", "- Dimension 的 Value 的数据类型只能是 ``VARCHAR``. 如果你的数值是 ``BIGINT``, ``DOUBLE`` 之类的数值型, 而你又有可能用到大于小于之类的比较计算符, 那么你需要自行将原始数据转化成 ``VARCHAR`` 并且确保他们定长和有序, 这样你在查询中才可以对 Dimension 中的数据进行比较查询. 如果你不需要这些数值型的 Dimension 参与筛选数据, 而只是作为一个参考, 那么你可以跳过这一步.\n", "- 所有的 Time 都是代表着整数的字符串, 含义是从 1970-01-01 的 EPOCH 开始的 秒数 (或 微秒, 毫秒, 纳秒).\n", "- 在 ``write_records`` API 中所有的数值都要被编码为 String." ], "metadata": { "collapsed": false, "pycharm": { "name": "#%% md\n" } } }, { "cell_type": "code", "execution_count": 36, "outputs": [], "source": [ "import time\n", "import random\n", "from datetime import datetime, timezone\n", "\n", "EPOCH = datetime(1970, 1, 1)\n", "\n", "\n", "def utc_now() -> str:\n", " time.sleep(random.randint(50, 150) / 1000)\n", " return str(int((datetime.utcnow() - EPOCH).total_seconds() * 1000))\n", "\n", "\n", "class DataTypeEnum:\n", " DOUBLE = \"DOUBLE\"\n", " BIGINT = \"BIGINT\"\n", " VARCHAR = \"VARCHAR\"\n", " BOOLEAN = \"BOOLEAN\"\n", " TIMESTAMP = \"TIMESTAMP\"\n", " MULTI = \"MULTI\"\n", "\n", "\n", "class FieldEnum:\n", " temperature = \"temperature\"\n", " humidity = \"humidity\"\n", "\n", "\n", "# 定义 5 个传感器的 Dimension Data\n", "device_list = [\n", " dict(\n", " device_id=\"device-KS\",\n", " device_lat=\"039.045167\",\n", " device_lng=\"-094.580552\",\n", " ),\n", " dict(\n", " device_id=\"device-WA\",\n", " device_lat=\"047.516842\",\n", " device_lng=\"-120.556755\",\n", " ),\n", " dict(\n", " device_id=\"device-CA\",\n", " device_lat=\"037.351811\",\n", " device_lng=\"-119.870587\",\n", " ),\n", " dict(\n", " device_id=\"device-NY\",\n", " device_lat=\"042.965073\",\n", " device_lng=\"-075.073632\",\n", " ),\n", " dict(\n", " device_id=\"device-FL\",\n", " device_lat=\"028.049414\",\n", " device_lng=\"-081.641238\",\n", " ),\n", "]\n", "\n", "device_dimension_list = [\n", " [\n", " dict(\n", " Name=key,\n", " Value=value,\n", " DimensionValueType=DataTypeEnum.VARCHAR,\n", " )\n", " for key, value in data.items()\n", " ]\n", " for data in device_list\n", "]\n", "\n", "\n", "def put_records():\n", " \"\"\"\n", " 每次运行该函数, 随机选择一个传感器, 生成 10 条观测数据, 每条观测数据间隔 100 毫秒, 总共时间区间为 1 秒.\n", " \"\"\"\n", " try:\n", " res = ts_write.write_records(\n", " DatabaseName=database,\n", " TableName=table,\n", " CommonAttributes=dict(\n", " Dimensions=random.choice(device_dimension_list),\n", " ),\n", " Records=[\n", " dict(\n", " Time=utc_now(),\n", " TimeUnit=\"MILLISECONDS\",\n", " MeasureName=\"observation\",\n", " MeasureValueType=DataTypeEnum.MULTI,\n", " MeasureValues=[\n", " dict(\n", " Name=FieldEnum.temperature,\n", " Value=str(random.randint(32, 102)),\n", " Type=DataTypeEnum.BIGINT,\n", " ),\n", " dict(\n", " Name=FieldEnum.humidity,\n", " Value=str(random.randint(20, 80) / 100),\n", " Type=DataTypeEnum.DOUBLE,\n", " )\n", " ]\n", " )\n", " for _ in range(10)\n", " ]\n", " )\n", " print(res)\n", " except ts_write.exceptions.RejectedRecordsException as err:\n", " print(\"RejectedRecords: \", err)\n", " for rr in err.response[\"RejectedRecords\"]:\n", " print(\"Rejected Index \" + str(rr[\"RecordIndex\"]) + \": \" + rr[\"Reason\"])\n", " print(\"Other records were written successfully. \")" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "markdown", "source": [ "## Sample Query\n", "\n", "下面我们提供了两个用 SQL 进行查询的例子:\n", "\n", "1. 根据 Dimension 和 Measurement 用比较条件进行筛选\n", "2. 根据 DeviceId 做聚合查询, 计算区间内的温度最大值和湿度最小值" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%% md\n" } } }, { "cell_type": "code", "execution_count": 38, "outputs": [ { "data": { "text/plain": " device_lat device_id device_lng measure_name \\\n0 037.351811 device-CA -119.870587 observation \n1 037.351811 device-CA -119.870587 observation \n2 047.516842 device-WA -120.556755 observation \n3 047.516842 device-WA -120.556755 observation \n4 047.516842 device-WA -120.556755 observation \n5 047.516842 device-WA -120.556755 observation \n6 047.516842 device-WA -120.556755 observation \n7 047.516842 device-WA -120.556755 observation \n8 047.516842 device-WA -120.556755 observation \n9 037.351811 device-CA -119.870587 observation \n\n time temperature humidity \n0 2022-08-13 18:21:44.477000000 91 0.69 \n1 2022-08-13 18:21:44.233000000 90 0.75 \n2 2022-08-13 18:21:42.718000000 93 0.33 \n3 2022-08-13 18:21:42.036000000 90 0.3 \n4 2022-08-13 18:21:21.422000000 101 0.66 \n5 2022-08-13 18:21:21.131000000 96 0.22 \n6 2022-08-13 18:21:11.410000000 101 0.21 \n7 2022-08-13 18:21:10.459000000 91 0.37 \n8 2022-08-13 18:21:10.393000000 95 0.76 \n9 2022-08-13 18:21:08.616000000 90 0.3 ", "text/html": "
\n\n\n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n
device_latdevice_iddevice_lngmeasure_nametimetemperaturehumidity
0037.351811device-CA-119.870587observation2022-08-13 18:21:44.477000000910.69
1037.351811device-CA-119.870587observation2022-08-13 18:21:44.233000000900.75
2047.516842device-WA-120.556755observation2022-08-13 18:21:42.718000000930.33
3047.516842device-WA-120.556755observation2022-08-13 18:21:42.036000000900.3
4047.516842device-WA-120.556755observation2022-08-13 18:21:21.4220000001010.66
5047.516842device-WA-120.556755observation2022-08-13 18:21:21.131000000960.22
6047.516842device-WA-120.556755observation2022-08-13 18:21:11.4100000001010.21
7047.516842device-WA-120.556755observation2022-08-13 18:21:10.459000000910.37
8047.516842device-WA-120.556755observation2022-08-13 18:21:10.393000000950.76
9037.351811device-CA-119.870587observation2022-08-13 18:21:08.616000000900.3
\n
" }, "execution_count": 38, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import pandas as pd\n", "\n", "\n", "def run_query(query_str, limit=20) -> pd.DataFrame:\n", " res = ts_query.query(\n", " QueryString=query_str,\n", " MaxRows=limit,\n", " )\n", " columns = [dct[\"Name\"] for dct in res[\"ColumnInfo\"]]\n", " rows = [\n", " [\n", " dct[\"ScalarValue\"]\n", " for dct in row[\"Data\"]\n", " ]\n", " for row in res[\"Rows\"]\n", " ]\n", " df = pd.DataFrame(rows, columns=columns)\n", " return df\n", "\n", "# 获得 15 分钟以内, 位于美国西部 (经度小于 -100), 温度大于 88 F 的观测值\n", "query_str = \"\"\"\n", "SELECT *\n", "FROM \"timestream-quick-start\".\"weather-sensor\" t\n", "WHERE\n", " t.time between ago(15m) and now()\n", " AND t.device_lng >= '-100.000000'\n", " AND t.temperature >= 88\n", "ORDER BY time DESC\n", "LIMIT 10\n", "\"\"\"\n", "\n", "run_query(query_str)" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "code", "execution_count": 51, "outputs": [ { "data": { "text/plain": " device_id _col1 _col2\n0 device-FL 55 0.65\n1 device-WA 97 0.25\n2 device-CA 92 0.2\n3 device-KS 101 0.22", "text/html": "
\n\n\n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n
device_id_col1_col2
0device-FL550.65
1device-WA970.25
2device-CA920.2
3device-KS1010.22
\n
" }, "execution_count": 51, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 获得 10 秒 以内, 每个传感器观测到的温度最大值和湿度最小值\n", "query_str = \"\"\"\n", "SELECT\n", " t.device_id,\n", " max(t.temperature),\n", " min(t.humidity)\n", "FROM \"timestream-quick-start\".\"weather-sensor\" t\n", "WHERE\n", " t.time between ago(10s) and now()\n", "GROUP BY t.device_id\n", "\"\"\"\n", "\n", "run_query(query_str)" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "code", "execution_count": null, "outputs": [], "source": [], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.6" } }, "nbformat": 4, "nbformat_minor": 0 }