Learn EventBridge by Doing¶
我个人推荐的方式是用 CodeCommit repository 中的 Git 事件来触发, 然后编写 Event Pattern 对其进行过滤, 然后将其发送给 Lambda Function. 这样可以很方便地通过操作 Repo 来触发事件, 然后在 Lambda 中查看 event.
1import json
2
3
4def jprint(data):
5 text = "\n".join([
6 "| " + line
7 for line in json.dumps(data, indent=4).split("\n")
8 ])
9 print(text)
10
11
12def lambda_handler(event, context):
13 print("received event:")
14 jprint(event)
15 return {"status": "success"}
1# -*- coding: utf-8 -*-
2
3import subprocess
4import json
5from datetime import datetime, timezone, timedelta
6
7from s3pathlib import S3Path, context
8from pathlib_mate import Path
9from boto_session_manager import BotoSesManager
10from aws_console_url import AWSConsole
11from rich import print as rprint
12
13# ------------------------------------------------------------------------------
14# Put your config here
15profile_name = "bmt_app_dev_us_east_1"
16bsm = BotoSesManager(profile_name=profile_name)
17codecommit_repo = "learn_event_bridge-project"
18lambda_function_name = "learn_event_bridge"
19lambda_function_role = f"arn:aws:iam::{bsm.aws_account_id}:role/lambda-power-user-role"
20# ------------------------------------------------------------------------------
21
22
23bsm = BotoSesManager(profile_name=profile_name)
24context.attach_boto_session(bsm.boto_ses)
25aws_console = AWSConsole(
26 aws_account_id=bsm.aws_account_id, aws_region=bsm.aws_region, bsm=bsm
27)
28
29bucket = f"{bsm.aws_account_id}-{bsm.aws_region}-artifacts"
30s3dir_root = S3Path(bucket, "projects", "learn_event_bridge").to_dir()
31s3path_source_zip = s3dir_root.joinpath("lambda", "source.zip")
32
33dir_here = Path.dir_here(__file__)
34path_source_zip = dir_here.joinpath("source.zip")
35
36log_group_name = f"/aws/lambda/{lambda_function_name}" # cloudwatch log group name
37
38
39def build_source():
40 path_source_zip.remove_if_exists()
41 with path_source_zip.parent.temp_cwd():
42 args = [
43 "zip",
44 f"{path_source_zip}",
45 f"lambda_function.py",
46 "-q",
47 ]
48 subprocess.run(args, check=True)
49 s3path_source_zip.upload_file(path_source_zip, overwrite=True)
50
51
52def deploy_lambda_function():
53 console_url = aws_console.awslambda.get_function(lambda_function_name)
54 print(f"preview lambda function: {console_url}")
55 # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda/client/get_function.html
56 try:
57 bsm.lambda_client.get_function(FunctionName=lambda_function_name)
58 exists = True
59 except Exception as e:
60 if "Function not found" in str(e):
61 exists = False
62 else:
63 raise e
64
65 if exists is False:
66 # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda/client/create_function.html
67 bsm.lambda_client.create_function(
68 FunctionName=lambda_function_name,
69 Description="for aws event bridge testing",
70 Runtime="python3.8",
71 Code=dict(
72 S3Bucket=s3path_source_zip.bucket,
73 S3Key=s3path_source_zip.key,
74 ),
75 Handler="lambda_function.lambda_handler",
76 MemorySize=128,
77 Timeout=3,
78 Role=lambda_function_role,
79 )
80 else:
81 # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda/client/update_function_code.html
82 bsm.lambda_client.update_function_code(
83 FunctionName=lambda_function_name,
84 S3Bucket=s3path_source_zip.bucket,
85 S3Key=s3path_source_zip.key,
86 )
87
88
89def deploy_event_rule():
90 event_pattern = {
91 "source": ["aws.codecommit"],
92 "resources": [f"arn:aws:codecommit:us-east-1:{bsm.aws_account_id}:{codecommit_repo}"]
93 }
94 bsm.eventbridge_client.put_rule(
95 Name="string",
96 ScheduleExpression="string",
97 EventPattern=json.dumps(event_pattern),
98 State="ENABLED",
99 Tags=[
100 {"Key": "string", "Value": "string"},
101 ],
102 )
103
104
105def print_lambda_log():
106 response = bsm.cloudwatchlogs_client.describe_log_streams(
107 logGroupName=log_group_name,
108 orderBy="LastEventTime",
109 descending=True,
110 limit=3,
111 )
112 logStreams = response.get("logStreams", [])
113 if len(logStreams) == 0:
114 print("no log stream found")
115 return
116 log_stream_name = logStreams[0]["logStreamName"]
117 print(f"log stream name: {log_stream_name}")
118
119 utcnow = datetime.utcnow().replace(tzinfo=timezone.utc)
120 fifteen_minutes_ago = utcnow - timedelta(minutes=15)
121
122 response = bsm.cloudwatchlogs_client.get_log_events(
123 logGroupName=log_group_name,
124 logStreamName=log_stream_name,
125 startTime=int(1000 * fifteen_minutes_ago.timestamp()),
126 limit=100,
127 startFromHead=True,
128 )
129 events = response.get("events", [])
130 text = "".join([event["message"] for event in events])
131 print(text)
132
133
134# build_source()
135# deploy_lambda_function()
136# print_lambda_log()