Run Remote Command on Ec2 Ultimate Solution

Keywords: AWS, EC2, System Manager, SSM, Python, Remote, Command

需求分析

出于多种原因, 往往是网络相关的原因, 很多时候代码必须要在 EC2 环境内执行. 而作为开发者, 如何像在本地电脑上执行 Python 自动化脚本一样在 EC2 环境内执行命令呢? 如果能做到这一点, 想象空间可以是无限的. 下面我们详细的拆解一下需求:

从具体执行的命令复杂度来看, 可以分为两类:

  1. 单条在 Terminal 内的命令. 例如 aws s3 ls.

  2. 以 Python 脚本形式存在的命令, 具体的命令的逻辑在 Python 脚本中被定义的. 这个脚本并不是事先准备好的, 换言之, 在执行脚本前我们要现将脚本上传到 EC2 环境内.

从对反馈的要求来看, 可以分为三类:

  1. 我只需要执行, 不需要反馈.

  2. 我需要知道执行的返回码是成功 (0) 还是失败 (非 0).

  3. 我不仅需要知道执行状态, 这个命令可能还会返回一些数据, 我还需要知道这个数据.

从命令的发起者来看,

  1. 只需要我的开发电脑能发起命令即可.

  2. 这个命令需要能被任何有权限的地方发起, 例如另一台 EC2, 一个 Lambda.

可以看出, 以上需求可以排列组合, 从而出现 2 * 3 * 2 = 12 种情况. 有没有一种解决方案能够同时满足这 12 种情况呢? 答案是肯定的, 我们将在下面的章节中详细的介绍.

探索可能的解决方案

我们对上面的需求来一条一条的分析, 看看这些需求后面的本质.

  • 单条在 Terminal 内的命令. 例如 aws s3 ls.

    这个没什么说的, 就是一条远程命令.

  • 以 Python 脚本形式存在的命令, 具体的命令的逻辑在 Python 脚本中被定义的. 这个脚本并不是事先准备好的, 换言之, 在执行脚本前我们要现将脚本上传到 EC2 环境内.

    这就意味着我们总得有一个简单, 可重复, 安全的方法将任意脚本上传到 EC2 环境内.

  • 我只需要执行, 不需要反馈

    这个没什么说的, 简单执行即可.

  • 我需要知道执行的返回码是成功 (0) 还是失败 (非 0)

    这就需要我们能捕获错误码 (return code)

  • 我不仅需要知道执行状态, 这个命令可能还会返回一些数据, 我还需要知道这个数据

    要么这个命令本身的设计就是会把返回数据写到 stdout, 那么我们只要能捕获 stdout 即可. 要么在运行时将数据上传到一个中间媒介, 例如 S3, 然后我们再从 S3 读取数据.

  • 只需要我的开发电脑能发起命令即可

    要么我的电脑能 SSH 到 EC2 上去. 要么我的电脑有一些相关的 AWS 权限. 这里的权限主要指的是 AWS System Manager Run Command 的权限. 这是一个 AWS 托管的服务器, 可以利用 SSM Agent 在 EC2 上执行任何命令.

  • 这个命令需要能被任何有权限的地方发起, 例如另一台 EC2, 一个 Lambda.

    这个发起方只要有上面说的 AWS System Manager Run Command 权限即可. 当然开发电脑也可以有这个权限.

好了, 我们现在对解决每一条需求都有个大概的概念了, 下一步我们来将这些方案组合成一个完整的解决方案. 但在这之前, 我们先来了解一下这里的核心技术 AWS SSM Run Command.

AWS SSM Run Command

AWS System Manager 是一个历史悠久的 AWS 服务, 主要用于批量管理 EC2 instance 虚拟机. 你可以将其理解为 AWS 版本的 Ansible. 而它的核心组件就是 System Manager Agent (SSM Agent), 本质上是一个服务端软件, 安装在 EC2 机器上, 属于系统服务的一部分. 而 AWS 内部对 EC2 的管理工作很多都是通过 SSM Agent 来进行的. 而”Run Command” 则是 SSM 的一项功能, 可以通过 SSM Agent 执行远程命令.

简单来说我们选择 SSM Run Command 作为我们解决方案的核心技术是出于以下几点考量:

  • SSM Run Command 是受 IAM Role 权限保护的, 非常安全且灵活, 兼容于各种 AWS 服务, 使得我们可以在任何 AWS 服务内发起 SSM Run Command.

  • SSM Run Command 功能 免费, 且支持非常高的并发量.

  • SSM Run Command 可以捕获 Return Code, Stdout, Stderr, 使得我们可以满足上面的所有需求.

SSM Run Command 本身有一些限制.

  • 通过 API 发送的 Run Command 也是有限制的, 不能超过 100KB. 如果你需要发送大量数据, 那么你需要修改你的远程命令程序, 让它接受 S3 uri 为参数, 然后到 S3 uri 去读输入数据.

  • Stdout 是有大小限制的, API 最多显示 24000 个字符. 如果需要捕获大量数据, 那么你需要修改你的远程命令程序, 将结果保存在 S3 上.

这里我不详细展开说 SSM Run Command 这个功能, 建议先看看一下 Run Remote Command on EC2 via SSM 这边博文, 对其有个简单的了解

最终解决方案

  1. 对于运行单条 Terminal Command, 就直接用 SSM Run Command 即可.

  2. 对于运行复杂的 Python 脚本呢, 我们可以将在本地的 Python 脚本先上传到 S3, 然后用 Run Command 运行第一条命令 aws s3 cp s3://... /tmp/...script.py 将其下载到 EC2 上, 然后再指定 Python 解释器来执行该脚本. 如果该脚本是个命令行工具, 我们还能带上参数. 注意, 我们要确保这个 EC2 上预装了 aws cli.

  3. 如果我们需要捕获命令返回的结果, 那么我们要么自己能保证这条命令能在 Stdout 中返回一个结构化的数据 (注意, logging 可能会干扰到返回值), 例如 JSON, 要么能运行过程中的数据上传到 S3. 然后我们再从 S3 读取数据.

实际案例

script.py 这是我们想要在 EC2 上执行的命令

 1# -*- coding: utf-8 -*-
 2
 3import sys
 4import json
 5
 6
 7def run() -> dict:
 8    print("start")
 9    print("done")
10    return {
11        "python": sys.executable,
12        "weird_string": "\\a\nb\tc\"d'e@f#g:h/i"
13    }
14
15
16if __name__ == "__main__":
17    print(json.dumps(run()))

send_command.py 这是一个库, 能让我们方便的调用 run command 命令

  1# -*- coding: utf-8 -*-
  2
  3import typing as T
  4import sys
  5import enum
  6import time
  7import itertools
  8import dataclasses
  9
 10if T.TYPE_CHECKING:
 11    from mypy_boto3_ssm.client import SSMClient
 12
 13
 14class Waiter:
 15    """
 16    Simple retry / polling with progressing status. Usage, it is common to check
 17    if a long-running job is done every X seconds and timeout in Y seconds.
 18    This class allow you to customize the polling interval and timeout,.
 19
 20    Example:
 21
 22    .. code-block:: python
 23
 24        print("before waiter")
 25
 26        for attempt, elapse in Waiter(
 27            delays=1,
 28            timeout=10,
 29            verbose=True,
 30        ):
 31            # check if should jump out of the polling loop
 32            if elapse >= 5:
 33                print("")
 34                break
 35
 36        print("after waiter")
 37    """
 38
 39    def __init__(
 40        self,
 41        delays: T.Union[int, float],
 42        timeout: T.Union[int, float],
 43        indent: int = 0,
 44        verbose: bool = True,
 45    ):
 46        self._delays = delays
 47        self.delays = itertools.repeat(delays)
 48        self.timeout = timeout
 49        self.tab = " " * indent
 50        self.verbose = verbose
 51
 52    def __iter__(self):
 53        if self.verbose:  # pragma: no cover
 54            sys.stdout.write(
 55                f"start waiter, polling every {self._delays} seconds, "
 56                f"timeout in {self.timeout} seconds.\n"
 57            )
 58            sys.stdout.flush()
 59            sys.stdout.write(
 60                f"\r{self.tab}on 0 th attempt, "
 61                f"elapsed 0 seconds, "
 62                f"remain {self.timeout} seconds ..."
 63            )
 64            sys.stdout.flush()
 65        start = time.time()
 66        end = start + self.timeout
 67        yield 0, 0
 68        for attempt, delay in enumerate(self.delays, 1):
 69            now = time.time()
 70            remaining = end - now
 71            if remaining < 0:
 72                raise TimeoutError(f"timed out in {self.timeout} seconds!")
 73            else:
 74                time.sleep(min(delay, remaining))
 75                elapsed = int(now - start + delay)
 76                if self.verbose:  # pragma: no cover
 77                    sys.stdout.write(
 78                        f"\r{self.tab}on {attempt} th attempt, "
 79                        f"elapsed {elapsed} seconds, "
 80                        f"remain {self.timeout - elapsed} seconds ..."
 81                    )
 82                    sys.stdout.flush()
 83                yield attempt, int(elapsed)
 84
 85
 86def send_command(
 87    ssm_client,
 88    instance_id: str,
 89    commands: T.List[str],
 90) -> str:
 91    """
 92    Reference:
 93
 94    - https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ssm/client/send_command.html
 95    """
 96    res = ssm_client.send_command(
 97        InstanceIds=[
 98            instance_id,
 99        ],
100        DocumentName="AWS-RunShellScript",
101        DocumentVersion="1",
102        Parameters={"commands": commands},
103    )
104    command_id = res["Command"]["CommandId"]
105    return command_id
106
107
108class CommandInvocationStatusEnum(str, enum.Enum):
109    """
110    Reference:
111
112    - get_command_invocation_
113    """
114
115    Pending = "Pending"
116    InProgress = "InProgress"
117    Delayed = "Delayed"
118    Success = "Success"
119    Cancelled = "Cancelled"
120    TimedOut = "TimedOut"
121    Failed = "Failed"
122    Cancelling = "Cancelling"
123
124
125@dataclasses.dataclass
126class CommandInvocation:
127    """
128    Represents a Command Invocation details returned from a
129    get_command_invocation_ API call.
130    """
131
132    CommandId: T.Optional[str] = dataclasses.field(default=None)
133    InstanceId: T.Optional[str] = dataclasses.field(default=None)
134    Comment: T.Optional[str] = dataclasses.field(default=None)
135    DocumentName: T.Optional[str] = dataclasses.field(default=None)
136    DocumentVersion: T.Optional[str] = dataclasses.field(default=None)
137    PluginName: T.Optional[str] = dataclasses.field(default=None)
138    ResponseCode: T.Optional[int] = dataclasses.field(default=None)
139    ExecutionStartDateTime: T.Optional[str] = dataclasses.field(default=None)
140    ExecutionElapsedTime: T.Optional[str] = dataclasses.field(default=None)
141    ExecutionEndDateTime: T.Optional[str] = dataclasses.field(default=None)
142    Status: T.Optional[str] = dataclasses.field(default=None)
143    StatusDetails: T.Optional[str] = dataclasses.field(default=None)
144    StandardOutputContent: T.Optional[str] = dataclasses.field(default=None)
145    StandardOutputUrl: T.Optional[str] = dataclasses.field(default=None)
146    StandardErrorContent: T.Optional[str] = dataclasses.field(default=None)
147    StandardErrorUrl: T.Optional[str] = dataclasses.field(default=None)
148    CloudWatchOutputConfig: T.Optional[dict] = dataclasses.field(default=None)
149
150    @classmethod
151    def from_get_command_invocation_response(
152        cls,
153        response: dict,
154    ) -> "CommandInvocation":
155        """
156        Reference:
157
158        - get_command_invocation_
159        """
160        kwargs = {
161            field.name: response.get(field.name) for field in dataclasses.fields(cls)
162        }
163        return cls(**kwargs)
164
165    @classmethod
166    def get(
167        cls,
168        ssm_client: "SSMClient",
169        command_id: str,
170        instance_id: str,
171    ) -> "CommandInvocation":
172        """
173        A wrapper around get_command_invocation_ API call.
174
175        Reference:
176
177        - get_command_invocation_
178        """
179        response = ssm_client.get_command_invocation(
180            CommandId=command_id,
181            InstanceId=instance_id,
182        )
183        return cls.from_get_command_invocation_response(response)
184
185    def to_dict(self) -> dict:
186        return dataclasses.asdict(self)
187
188
189def wait_until_command_succeeded(
190    ssm_client: "SSMClient",
191    command_id: str,
192    instance_id: str,
193    delays: int = 3,
194    timeout: int = 60,
195    verbose: bool = True,
196) -> CommandInvocation:
197    """
198    After you call send_command_ API, you can use this function to wait until
199    it succeeds. If it fails, it will raise an exception.
200
201    Reference:
202
203    - get_command_invocation_
204    """
205    for _ in Waiter(delays=delays, timeout=timeout, verbose=verbose):
206        command_invocation = CommandInvocation.get(
207            ssm_client=ssm_client,
208            command_id=command_id,
209            instance_id=instance_id,
210        )
211        if command_invocation.Status == CommandInvocationStatusEnum.Success.value:
212            sys.stdout.write("\n")
213            return command_invocation
214        elif command_invocation.Status in [
215            CommandInvocationStatusEnum.Cancelled.value,
216            CommandInvocationStatusEnum.TimedOut.value,
217            CommandInvocationStatusEnum.Failed.value,
218            CommandInvocationStatusEnum.Cancelling.value,
219        ]:
220            raise Exception(f"Command failed, status: {command_invocation.Status}")
221        else:
222            pass

test.py 这是我们的最终代码, 实现了我们的解决方案.

 1# -*- coding: utf-8 -*-
 2
 3import typing as T
 4import time
 5import json
 6import uuid
 7from pathlib_mate import Path
 8from s3pathlib import S3Path
 9from boto_session_manager import BotoSesManager
10from send_command import (
11    send_command,
12    wait_until_command_succeeded,
13)
14from rich import print as rprint
15
16bsm = BotoSesManager(profile_name="bmt_app_dev_us_east_1")
17
18
19def run(
20    bsm: BotoSesManager,
21    instance_id: str,
22    path_python: Path,
23    code: str,
24    s3_path: S3Path,
25    args: T.List[str],
26):
27    """
28    这是我们解决方案的主函数
29
30    :param bsm: boto session manager 对象 (你可以不要这个对象, 直接用 ssm_client, s3_client 即可)
31    :param instance_id: EC2 instance id
32    :param path_python: 位于 EC2 上的 Python 解释器路径, 你可以选择用哪个 Python 解释器来运行这个命令
33    :param code: 你要在 EC2 上执行的脚本的源代码的字符串
34    :param s3_path: 你要将这个源代码上传到 S3 的哪里
35    :param args: 这个 Python 脚本有没有额外的参数, 如果有, 请用列表的形式列出来, 就像你
36        写 subprocess.run([...]) 一样.
37    """
38    s3path.write_text(code)
39
40    # 生成一个随机的路径, 用于存放代码
41    path_code = f"/tmp/{uuid.uuid4().hex}.py"
42    # 用 aws cli 将代码下载到本地, 并且过滤掉日志
43    command1 = f"/home/ubuntu/.pyenv/shims/aws s3 cp {s3_path.uri} {path_code} 2>&1 > /dev/null"
44    # 组装最终命令
45    args_ = [
46        f"{path_python}",
47        f"{path_code}",
48    ]
49    args_.extend(args)
50    command2 = " ".join(args_)
51    print(command1)
52    print(command2)
53    # 用 SSM 远程执行该命令
54    command_id = send_command(
55        ssm_client=bsm.ssm_client,
56        instance_id=instance_id,
57        commands=[
58            command1,
59            command2,
60        ],
61    )
62    time.sleep(1) # 一定要等待 1 秒, 不然你立刻 get 是 get 不到的
63    command_invocation = wait_until_command_succeeded(
64        ssm_client=bsm.ssm_client,
65        command_id=command_id,
66        instance_id=instance_id,
67    )
68    rprint(command_invocation)
69    # 解析 return code 和 standard output, parse 我们脚本输出的 JSON
70    print(command_invocation.ResponseCode)
71    lines = command_invocation.StandardOutputContent.splitlines()
72    output_data = json.loads(lines[-1])
73    rprint(output_data)
74
75
76instance_id = "i-00f591fc972902fc5"
77path_python = Path("/home/ubuntu/.pyenv/shims/python")
78code = Path("script.py").read_text()
79s3path = S3Path(
80    f"s3://{bsm.aws_account_id}-{bsm.aws_region}-data/projects/dev-exp-share/script.py"
81)
82args = []
83run(
84    bsm=bsm,
85    instance_id=instance_id,
86    path_python=path_python,
87    code=code,
88    s3_path=s3path,
89    args=[],
90)