Run Remote Command on EC2 via SSM

Overview

在服务器上执行命令是一个非常普遍的需求. 通常我们有这么几种方法:

  • SSH 登录服务器, 然后在终端里敲命令.

  • 用远程执行工具, 例如 paramiko. 你需要管理好 SSH.

  • 用 Ansible 一类的自动化工具.

AWS 原生的 System Manager 服务可以用来来执行远程命令. 这种方法的好处有很多:

  • 无需管理 SSH.

  • 使用 IAM Role 权限管理, 非常安全.

  • 自动化程度高, 可以被嵌入或者编排成各种复杂的脚本.

  • 可以和 AWS 的其他服务联动.

本文我们就来看看如何用 AWS 的 System Manager 来执行远程命令.

How it Work

AWS 有一个历史悠久的服务 SSM (System Manager), 该服务对标的是 Ansible 之类的服务器运维工具, 用于批量管理虚拟机. 和 Ansible 用 SSH 来执行远程命令的方式不同, SSM 是通过在机器上安装 SSM Agent (一个由 AWS 维护的系统服务软件), 然后让 SSM Agent 将自己自动注册到 SSM Fleet Manager, 然后通过 IAM 鉴权, 然后用 AWS 内部的 API 与 SSM Agent 通信从而执行远程命令.

我们来看一看在启动一台由 SSM 管理的 EC2 的过程中, 到底发生了什么:

  • 启动机器, 启动操作系统以及系统服务, 其中系统服务就包括 SSM agent.

  • SSM gent 启动后就会调用 IAM 的权限, 尝试将自己注册到 SSM Fleet Manager 上.

  • 一旦注册成功, 你就可以用 SSM 来远程操纵 EC2 了.

从以上内容我们可以看出来, 安装 SSM Agent 至关重要. 所幸的事 AWS 官方提供的一些 AMI (主要是 Amazon Linux) 上会预装 SSM Agent. 包括 AWS 认证过的第三方软件提供商例如 RedHat, Ubuntu 等公司提供的 AMI 也会预装 SSM Agent 并开机自动启动. 但是你用的是你自己或是 Market place 上的 AMI, 里面没有预装 SSM Agent, 你就需要自己安装了. 我们这个项目用的是 Ubuntu Server 20.04, 里面已经预装了 SSM Agent, 所以我们无需做任何额外工作.

在你启动 EC2 的时候 (包括启动新的 EC2, 或是 Stop 之后再 Start, 或是 Reboot 都可以, 因为只要启动系统服务就可以了), 只要你的 IAM Role 里有这个 由 AWS 管理的 IAM Policy arn:aws:iam::aws:policy/service-role/AmazonSSMManagedInstanceCore, 或是你创建一个自己的 Policy 有同样的权限, 那么 SSM Agent 就会自动将自己注册到 SSM Fleet Manager. 虽然 Reference 中的官方文档用的 IAM Role 有特定的名字, 但其实什么名字都可以, 只要有对应的权限就可以.

Reference:

Manually Install SSM Agent on EC2

下面这些文档介绍了如何手动在 EC2 上安装 SSM Agent, 我并没有动手试过, 仅供参考.

一些有用的命令

你可以用 AWS CLI 来查看哪些 EC2 被注册到了 SSM 管理清单上, 你到 SSM Fleet Manager Console 中看也是一样的:

aws ssm describe-instance-information --output text --profile bmt_app_dev_us_east_1

你也可以 SSH 到 EC2 上运行如下命令来检查 SSM Agent 是否已经启用 (该项目基于 ubuntu server 20.04, 其他系统请参考 官方文档):

sudo systemctl status snap.amazon-ssm-agent.amazon-ssm-agent.service

用 SSM Agent 执行远程命令

下面这段代码展示了如何用 boto3 SDK 通过 SSM 运行远程命令.

 1# -*- coding: utf-8 -*-
 2
 3import boto3
 4
 5ssm_client = boto3.client("ssm")
 6
 7
 8def send_command(
 9    instance_id: str,
10    cmd: str,
11):
12    ssm_client.send_command(
13        InstanceIds=[
14            instance_id,
15        ],
16        DocumentName="AWS-RunShellScript",
17        DocumentVersion="1",
18        Parameters={
19            "commands": [
20                cmd,
21            ]
22        }
23    )
24
25
26send_command(
27    instance_id="i-1a2b3c4d",
28    cmd="echo 1a2b3c4d > ~/chore",
29)

有了概念之后, 我们来看一个更高级的模块, 适用于生产环境的代码:

  1# -*- coding: utf-8 -*-
  2
  3"""
  4This module allow you to run remote command on EC2 instance via SSM in 'sync' mode.
  5The original ssm_client.send_command() is 'async' call, which means you have to
  6poll the status of the command execution via ssm_client.get_command_invocation().
  7This module hides the complexity of polling and provide a simple interface.
  8
  9Example:
 10
 11.. code-block:: python
 12
 13    import boto3
 14    from s3pathlib import S3Path
 15
 16    instance_id = "i-1a2b3c"
 17    commands = [
 18        "echo hello"
 19    ]
 20    ssm_client = boto3.client("ssm")
 21
 22    # make sure your EC2 has the IAM permission to write to this location
 23    s3dir_command_output = S3Path(f"s3://my-bucket/ssm-command-output/").to_dir()
 24
 25    res = ssm_client.send_command(
 26        InstanceIds=[instance_id],
 27        DocumentName="AWS-RunShellScript",
 28        DocumentVersion="1",
 29        Parameters={
 30            "commands": commands
 31        },
 32        OutputS3BucketName=s3dir_command_output.bucket,
 33        OutputS3KeyPrefix=s3dir_command_output.key,
 34    )
 35    command_id = res["Command"]["CommandId"]
 36
 37    wait_until_command_succeeded(
 38        ssm_client=ssm_client,
 39        command_id=command_id,
 40        instance_id=instance_id,
 41        delays=3,
 42        timeout=60,
 43        verbose=True,
 44    )
 45
 46    for s3path in (
 47        s3dir_command_output.joinpath(
 48            command_id,
 49            instance_id,
 50            "awsrunShellScript",
 51        )
 52        .to_dir()
 53        .iter_objects()
 54    ):
 55        print(f"--- {s3path.uri} ---")
 56        print(f"{s3path.read_text()}")
 57
 58
 59.. _send_command: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ssm/client/send_command.html
 60.. _get_command_invocation: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ssm/client/get_command_invocation.html
 61"""
 62
 63import typing as T
 64import sys
 65import enum
 66import time
 67import itertools
 68import dataclasses
 69
 70from func_args import resolve_kwargs, NOTHING
 71
 72if T.TYPE_CHECKING:
 73    from mypy_boto3_ssm.client import SSMClient
 74
 75
 76class Waiter:
 77    """
 78    Simple retry / poll with progress.
 79    """
 80
 81    def __init__(
 82        self,
 83        delays: T.Union[int, float],
 84        timeout: T.Union[int, float],
 85        indent: int = 0,
 86        verbose: bool = True,
 87    ):
 88        self.delays = itertools.repeat(delays)
 89        self.timeout = timeout
 90        self.tab = " " * indent
 91        self.verbose = verbose
 92
 93    def __iter__(self):
 94        start = time.time()
 95        end = start + self.timeout
 96        for attempt, delay in enumerate(self.delays, 1):
 97            now = time.time()
 98            remaining = end - now
 99            if remaining < 0:
100                raise TimeoutError(f"timed out in {self.timeout} seconds!")
101            else:
102                time.sleep(min(delay, remaining))
103                elapsed = int(now - start + delay)
104                if self.verbose:
105                    sys.stdout.write(
106                        f"\r{self.tab}on {attempt} th attempt, "
107                        f"elapsed {elapsed} seconds, "
108                        f"remain {self.timeout - elapsed} seconds ..."
109                    )
110                    sys.stdout.flush()
111                yield attempt, int(elapsed)
112
113
114class CommandInvocationStatusEnum(str, enum.Enum):
115    Pending = "Pending"
116    InProgress = "InProgress"
117    Delayed = "Delayed"
118    Success = "Success"
119    Cancelled = "Cancelled"
120    TimedOut = "TimedOut"
121    Failed = "Failed"
122    Cancelling = "Cancelling"
123
124
125@dataclasses.dataclass
126class CommandInvocation:
127    """
128    Reference:
129
130    - get_command_invocation_
131    """
132
133    CommandId: T.Optional[str] = dataclasses.field(default=None)
134    InstanceId: T.Optional[str] = dataclasses.field(default=None)
135    Comment: T.Optional[str] = dataclasses.field(default=None)
136    DocumentName: T.Optional[str] = dataclasses.field(default=None)
137    DocumentVersion: T.Optional[str] = dataclasses.field(default=None)
138    PluginName: T.Optional[str] = dataclasses.field(default=None)
139    ResponseCode: T.Optional[int] = dataclasses.field(default=None)
140    ExecutionStartDateTime: T.Optional[str] = dataclasses.field(default=None)
141    ExecutionElapsedTime: T.Optional[str] = dataclasses.field(default=None)
142    ExecutionEndDateTime: T.Optional[str] = dataclasses.field(default=None)
143    Status: T.Optional[str] = dataclasses.field(default=None)
144    StatusDetails: T.Optional[str] = dataclasses.field(default=None)
145    StandardOutputContent: T.Optional[str] = dataclasses.field(default=None)
146    StandardOutputUrl: T.Optional[str] = dataclasses.field(default=None)
147    StandardErrorContent: T.Optional[str] = dataclasses.field(default=None)
148    StandardErrorUrl: T.Optional[str] = dataclasses.field(default=None)
149    CloudWatchOutputConfig: T.Optional[dict] = dataclasses.field(default=None)
150
151    @classmethod
152    def from_get_command_invocation_response(
153        cls, response: dict
154    ) -> "CommandInvocation":
155        """
156        Reference:
157
158        - get_command_invocation_
159        """
160        kwargs = {
161            field.name: response.get(field.name) for field in dataclasses.fields(cls)
162        }
163        return cls(**kwargs)
164
165    @classmethod
166    def get(
167        cls,
168        ssm_client: "SSMClient",
169        command_id: str,
170        instance_id: str,
171        plugin_name: T.Optional[str] = NOTHING,
172    ) -> "CommandInvocation":
173        """
174        Reference:
175
176        - get_command_invocation_
177        """
178        response = ssm_client.get_command_invocation(
179            **resolve_kwargs(
180                CommandId=command_id,
181                InstanceId=instance_id,
182                PluginName=plugin_name,
183            )
184        )
185        return cls.from_get_command_invocation_response(response)
186
187
188def wait_until_command_succeeded(
189    ssm_client: "SSMClient",
190    command_id: str,
191    instance_id: str,
192    plugin_name: T.Optional[str] = NOTHING,
193    delays: int = 3,
194    timeout: int = 60,
195    verbose: bool = True,
196):
197    """
198    Reference:
199
200    - get_command_invocation_
201    """
202    for _ in Waiter(delays=delays, timeout=timeout, verbose=verbose):
203        command_invocation = CommandInvocation.get(
204            ssm_client=ssm_client,
205            command_id=command_id,
206            instance_id=instance_id,
207            plugin_name=plugin_name,
208        )
209        if command_invocation.Status == CommandInvocationStatusEnum.Success.value:
210            if verbose:
211                print("")
212            break
213        elif command_invocation.Status in [
214            CommandInvocationStatusEnum.Cancelled.value,
215            CommandInvocationStatusEnum.TimedOut.value,
216            CommandInvocationStatusEnum.Failed.value,
217            CommandInvocationStatusEnum.Cancelling.value,
218        ]:
219            raise Exception(f"Command failed, status: {command_invocation.Status}")
220        else:
221            pass
222
223
224if __name__ == "__main__":
225    import boto3
226    from s3pathlib import S3Path
227
228    instance_id = "i-1a2b3c"
229    commands = ["echo hello"]
230    ssm_client = boto3.client("ssm")
231
232    # make sure your EC2 has the IAM permission to write to this location
233    s3dir_command_output = S3Path(f"s3://my-bucket/ssm-command-output/").to_dir()
234
235    res = ssm_client.send_command(
236        InstanceIds=[instance_id],
237        DocumentName="AWS-RunShellScript",
238        DocumentVersion="1",
239        Parameters={"commands": commands},
240        OutputS3BucketName=s3dir_command_output.bucket,
241        OutputS3KeyPrefix=s3dir_command_output.key,
242    )
243    command_id = res["Command"]["CommandId"]
244
245    wait_until_command_succeeded(
246        ssm_client=ssm_client,
247        command_id=command_id,
248        instance_id=instance_id,
249        delays=3,
250        timeout=60,
251        verbose=True,
252    )
253
254    for s3path in (
255        s3dir_command_output.joinpath(
256            command_id,
257            instance_id,
258            "awsrunShellScript",
259        )
260        .to_dir()
261        .iter_objects()
262    ):
263        print(f"--- {s3path.uri} ---")
264        print(f"{s3path.read_text()}")

Reference:

总结

  1. 在创建 EC2 之前就要配置好你的 IAM Role.

  2. 确保你给 EC2 的 IAM Role 有这个 AmazonSSMManagedInstanceCore IAM Policy.

  3. 启动 EC2 的时候使用这个 IAM Role. 如果启动的时候忘记给 IAM Role, 那么你可以启动后指定 IAM Role 然后重启即可.

  4. 然后就可以用 SSM 的 API 来远程执行命令了.

Remote Command 还能用来干什么

很多自动化脚本由于网络连接的缘故是必须要在 EC2 上运行的. 所以我们可以在世界的任意地点用 SSM agent 来执行远程命令. 而而关于传输数据, 我建议通过 S3 做媒介, 让 EC2 将命令执行后的数据写入到 S3 上. 这样你就可以在任意地点读取这些数据了.