以下過去記事の続き。この時はメール本文がJSON生データで送信された。これを、人間が状況を判別可能な状態までもっていきたい。
AWSイベント監視 - CloudTrail + EventBridge + Lambdaでメールカスタマイズ
とういことで、再度検証。使用したAWS各種サービスのリソースは前回と同様で、Lambda関数のコードだけ入れ替え。何度も同じようなことをやっていて何がなんだか分からなくなっているがもうヤケクソ。
lambda_function.py (イベント監視メール通知用)
import boto3
import json
import os
import re
from botocore.exceptions import ClientError
from datetime import datetime, timezone, timedelta
from dateutil import parser
print('Loading function')
sns_arn = os.environ['SNS_TOPIC_ARN']
def lambda_handler(event, context):
data = event
s = json.dumps(data)
e = json.loads(s)
print(e)
# eventから項目を抽出
dtl = e['detail'] #詳細(detail)を定義
e_type = e['detail-type'] # イベントタイプ 例:'AWS API Call via CloudTrail'
t = e['time'] # 発生時刻
evt_name = dtl['eventName'] # イベント名 例:DeleteBucket
evt_src = dtl['eventSource'] # イベントソース 例:s3.amazonaws.com
evt_usr = dtl['userIdentity']['arn'] # 実行ユーザ 例:arn:aws:iam::012345678910:user/hoge_user
evt_ip = dtl['sourceIPAddress'] # ソースIPアドレス #123.123.123.123
# 時刻変換
JST = timezone(timedelta(hours=+9), 'JST')
utcstr_parsed = parser.parse(t)
ux_time = utcstr_parsed.timestamp()
epoch = int(ux_time)
# unixタイムスタンプをJSTに変換。dtはこの時点でdatetime.datetime型
dt = datetime.fromtimestamp(epoch).replace(tzinfo=timezone.utc).astimezone(tz=JST)
# dtを整形
dt_str = dt.strftime('%Y-%m-%d %H:%M:%S')
# イベントリクエスト 例:{'bucketName': 'hoge-test-bucket-0001', 'Host': 's3-ap-northeast-1.amazonaws.com'}
evt_req_d = e['detail']['requestParameters']
req_str = json.dumps(evt_req_d)
evt_req = re.sub(r"[{}\"]", "", req_str)
# 件名整形
subject_str = "本番環境イベント監視 " + evt_name + " - " + evt_src
# メッセージ本文整形
fix_msg = "以下のイベントが発生しました" + "\n"
type_msg = "イベントタイプ:" + "\n" + e_type
time_msg = "発生時刻(JST):" + "\n" + dt_str
ename_msg = "イベント名:" + "\n" + evt_name
esrc_msg ="イベントソース:" "\n" + evt_src
ereq_msg ="イベントリクエスト:" "\n" + evt_req
eusr_msg ="実行ユーザ:" "\n" + evt_usr
eip_msg ="ソースIPアドレス:" "\n" + evt_ip
msg = fix_msg + "\n\n" + type_msg + "\n\n" + time_msg + "\n\n" + ename_msg + "\n\n" + esrc_msg + "\n\n" + ereq_msg + "\n\n" + eusr_msg + "\n\n" + eip_msg
try:
sns = boto3.client('sns')
#SNS Publish
publishResponse = sns.publish(
TopicArn = os.environ['SNS_TOPIC_ARN'],
Message = msg,
Subject = subject_str
)
except Exception as e:
print(e)
テスト用にサンプルデータを貼り付けて、エラーが出ない状態にまでもっていった。これでもういいかと思ったがやはりちゃんとしたメールが届くのが見たい…ということで、再びCloudTrailを有効にしてイベントを発生させることにした。前回無効にしたCloudTrailログ用の残骸バケットが残っていたのでそれを削除したら、まさにその行為がイベントとして検知され、メールが届いた。よしよし。
上記はテストデータ残骸の内容が表示されているが、実際のイベント発生時のメールも別途ちゃんと届いている。上記画面では「APIタイプ」が含まれているがこれはいらないと判断したため、この後コードから削除した。
ちなみに、JSON生データからkey:valueの抽出を行うときはこんなんでやった。JSONデータをファイルに保存してから:
>>> import json
>>> event_file = open('event_msg_sample.json','r')
>>> event = json.load(event_file)
>>> for k,v in event.items():
... print(k,v)
(追記)以下でもよい。先にこれをやって、階層が深くて分かりにくい場合は上記を併用すればいいかと。
>>> msg = json.dumps(event, indent=3)
>>> print(msg)