Learn EventBridge by Doing

我个人推荐的方式是用 CodeCommit repository 中的 Git 事件来触发, 然后编写 Event Pattern 对其进行过滤, 然后将其发送给 Lambda Function. 这样可以很方便地通过操作 Repo 来触发事件, 然后在 Lambda 中查看 event.

 1import json
 2
 3
 4def jprint(data):
 5    text = "\n".join([
 6        "| " + line
 7        for line in json.dumps(data, indent=4).split("\n")
 8    ])
 9    print(text)
10
11
12def lambda_handler(event, context):
13    print("received event:")
14    jprint(event)
15    return {"status": "success"}
  1# -*- coding: utf-8 -*-
  2
  3import subprocess
  4import json
  5from datetime import datetime, timezone, timedelta
  6
  7from s3pathlib import S3Path, context
  8from pathlib_mate import Path
  9from boto_session_manager import BotoSesManager
 10from aws_console_url import AWSConsole
 11from rich import print as rprint
 12
 13# ------------------------------------------------------------------------------
 14# Put your config here
 15profile_name = "bmt_app_dev_us_east_1"
 16bsm = BotoSesManager(profile_name=profile_name)
 17codecommit_repo = "learn_event_bridge-project"
 18lambda_function_name = "learn_event_bridge"
 19lambda_function_role = f"arn:aws:iam::{bsm.aws_account_id}:role/lambda-power-user-role"
 20# ------------------------------------------------------------------------------
 21
 22
 23bsm = BotoSesManager(profile_name=profile_name)
 24context.attach_boto_session(bsm.boto_ses)
 25aws_console = AWSConsole(
 26    aws_account_id=bsm.aws_account_id, aws_region=bsm.aws_region, bsm=bsm
 27)
 28
 29bucket = f"{bsm.aws_account_id}-{bsm.aws_region}-artifacts"
 30s3dir_root = S3Path(bucket, "projects", "learn_event_bridge").to_dir()
 31s3path_source_zip = s3dir_root.joinpath("lambda", "source.zip")
 32
 33dir_here = Path.dir_here(__file__)
 34path_source_zip = dir_here.joinpath("source.zip")
 35
 36log_group_name = f"/aws/lambda/{lambda_function_name}"  # cloudwatch log group name
 37
 38
 39def build_source():
 40    path_source_zip.remove_if_exists()
 41    with path_source_zip.parent.temp_cwd():
 42        args = [
 43            "zip",
 44            f"{path_source_zip}",
 45            f"lambda_function.py",
 46            "-q",
 47        ]
 48        subprocess.run(args, check=True)
 49    s3path_source_zip.upload_file(path_source_zip, overwrite=True)
 50
 51
 52def deploy_lambda_function():
 53    console_url = aws_console.awslambda.get_function(lambda_function_name)
 54    print(f"preview lambda function: {console_url}")
 55    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda/client/get_function.html
 56    try:
 57        bsm.lambda_client.get_function(FunctionName=lambda_function_name)
 58        exists = True
 59    except Exception as e:
 60        if "Function not found" in str(e):
 61            exists = False
 62        else:
 63            raise e
 64
 65    if exists is False:
 66        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda/client/create_function.html
 67        bsm.lambda_client.create_function(
 68            FunctionName=lambda_function_name,
 69            Description="for aws event bridge testing",
 70            Runtime="python3.8",
 71            Code=dict(
 72                S3Bucket=s3path_source_zip.bucket,
 73                S3Key=s3path_source_zip.key,
 74            ),
 75            Handler="lambda_function.lambda_handler",
 76            MemorySize=128,
 77            Timeout=3,
 78            Role=lambda_function_role,
 79        )
 80    else:
 81        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda/client/update_function_code.html
 82        bsm.lambda_client.update_function_code(
 83            FunctionName=lambda_function_name,
 84            S3Bucket=s3path_source_zip.bucket,
 85            S3Key=s3path_source_zip.key,
 86        )
 87
 88
 89def deploy_event_rule():
 90    event_pattern = {
 91        "source": ["aws.codecommit"],
 92        "resources": [f"arn:aws:codecommit:us-east-1:{bsm.aws_account_id}:{codecommit_repo}"]
 93    }
 94    bsm.eventbridge_client.put_rule(
 95        Name="string",
 96        ScheduleExpression="string",
 97        EventPattern=json.dumps(event_pattern),
 98        State="ENABLED",
 99        Tags=[
100            {"Key": "string", "Value": "string"},
101        ],
102    )
103
104
105def print_lambda_log():
106    response = bsm.cloudwatchlogs_client.describe_log_streams(
107        logGroupName=log_group_name,
108        orderBy="LastEventTime",
109        descending=True,
110        limit=3,
111    )
112    logStreams = response.get("logStreams", [])
113    if len(logStreams) == 0:
114        print("no log stream found")
115        return
116    log_stream_name = logStreams[0]["logStreamName"]
117    print(f"log stream name: {log_stream_name}")
118
119    utcnow = datetime.utcnow().replace(tzinfo=timezone.utc)
120    fifteen_minutes_ago = utcnow - timedelta(minutes=15)
121
122    response = bsm.cloudwatchlogs_client.get_log_events(
123        logGroupName=log_group_name,
124        logStreamName=log_stream_name,
125        startTime=int(1000 * fifteen_minutes_ago.timestamp()),
126        limit=100,
127        startFromHead=True,
128    )
129    events = response.get("events", [])
130    text = "".join([event["message"] for event in events])
131    print(text)
132
133
134# build_source()
135# deploy_lambda_function()
136# print_lambda_log()