以下過去記事の続き。この時はメール本文がJSON生データで送信された。これを、人間が状況を判別可能な状態までもっていきたい。
AWSイベント監視 - CloudTrail + EventBridge + Lambdaでメールカスタマイズ

 

とういことで、再度検証。使用したAWS各種サービスのリソースは前回と同様で、Lambda関数のコードだけ入れ替え。何度も同じようなことをやっていて何がなんだか分からなくなっているがもうヤケクソ。

 

lambda_function.py (イベント監視メール通知用)

import boto3
import json
import os
import re
from botocore.exceptions import ClientError
from datetime import datetime, timezone, timedelta
from dateutil import parser

print('Loading function')

sns_arn = os.environ['SNS_TOPIC_ARN']

def lambda_handler(event, context):
    data = event
    s = json.dumps(data)
    e = json.loads(s)
    print(e)
    
    # eventから項目を抽出
    dtl = e['detail'] #詳細(detail)を定義
    e_type = e['detail-type'] # イベントタイプ 例:'AWS API Call via CloudTrail'
    t = e['time'] # 発生時刻
    evt_name = dtl['eventName'] # イベント名 例:DeleteBucket
    evt_src = dtl['eventSource'] # イベントソース 例:s3.amazonaws.com
    evt_usr = dtl['userIdentity']['arn'] # 実行ユーザ 例:arn:aws:iam::012345678910:user/hoge_user
    evt_ip = dtl['sourceIPAddress'] # ソースIPアドレス #123.123.123.123

    # 時刻変換
    JST = timezone(timedelta(hours=+9), 'JST')
    utcstr_parsed = parser.parse(t)
    ux_time = utcstr_parsed.timestamp()
    epoch = int(ux_time)
    # unixタイムスタンプをJSTに変換。dtはこの時点でdatetime.datetime型
    dt = datetime.fromtimestamp(epoch).replace(tzinfo=timezone.utc).astimezone(tz=JST)
    # dtを整形
    dt_str = dt.strftime('%Y-%m-%d %H:%M:%S')
    
    # イベントリクエスト 例:{'bucketName': 'hoge-test-bucket-0001', 'Host': 's3-ap-northeast-1.amazonaws.com'}
    evt_req_d = e['detail']['requestParameters']
    req_str = json.dumps(evt_req_d)
    evt_req = re.sub(r"[{}\"]", "", req_str)

    # 件名整形
    subject_str = "本番環境イベント監視 " + evt_name + " - " + evt_src

    # メッセージ本文整形
    fix_msg = "以下のイベントが発生しました" + "\n"
    type_msg = "イベントタイプ:" + "\n" + e_type
    time_msg = "発生時刻(JST):" + "\n" + dt_str
    ename_msg = "イベント名:" + "\n" + evt_name
    esrc_msg ="イベントソース:" "\n" + evt_src
    ereq_msg ="イベントリクエスト:" "\n" + evt_req
    eusr_msg ="実行ユーザ:" "\n" + evt_usr
    eip_msg ="ソースIPアドレス:" "\n" + evt_ip
    msg = fix_msg + "\n\n" + type_msg + "\n\n" + time_msg + "\n\n" + ename_msg + "\n\n" + esrc_msg + "\n\n" + ereq_msg + "\n\n" + eusr_msg + "\n\n" + eip_msg
    
    try:
        sns = boto3.client('sns')
        
        #SNS Publish
        publishResponse = sns.publish(
            TopicArn = os.environ['SNS_TOPIC_ARN'],
            Message = msg,
            Subject = subject_str
        )
    
    except Exception as e:
        print(e)

 

テスト用にサンプルデータを貼り付けて、エラーが出ない状態にまでもっていった。これでもういいかと思ったがやはりちゃんとしたメールが届くのが見たい…ということで、再びCloudTrailを有効にしてイベントを発生させることにした。前回無効にしたCloudTrailログ用の残骸バケットが残っていたのでそれを削除したら、まさにその行為がイベントとして検知され、メールが届いた。よしよし。

AWSイベントメールサンプル

 

上記はテストデータ残骸の内容が表示されているが、実際のイベント発生時のメールも別途ちゃんと届いている。上記画面では「APIタイプ」が含まれているがこれはいらないと判断したため、この後コードから削除した。

 

ちなみに、JSON生データからkey:valueの抽出を行うときはこんなんでやった。JSONデータをファイルに保存してから:

>>> import json
>>> event_file = open('event_msg_sample.json','r')
>>> event = json.load(event_file)
>>> for k,v in event.items():
...   print(k,v)

 

(追記)以下でもよい。先にこれをやって、階層が深くて分かりにくい場合は上記を併用すればいいかと。

>>> msg = json.dumps(event, indent=3)
>>> print(msg)

 

Park

Park


関連がありそうな記事