RDSのデータをS3にエクスポートするLambdaという「あるある」パターン。DBはPostgreSQL、Python3.9、エラー処理抜き。
前提条件
-
RDSに接続するため、Lambdaの配置はVPCとする。
-
PostgreSQL接続時に外部ライブラリのpsycopg2が必要となる。レイヤーに登録してから定義すればよいがちょっとハマりどころがある。
-
上記に加えてaws_s3拡張機能のインストールが必要。
DBに接続して以下コマンドを実行する。
CREATE EXTENSION aws_s3CASCADE;
- RDS用のIAMロール作成
実行するLambda用のIAMロールとは別に、RDSにアタッチするロールを作成する。
必要なアクション
・s3.PutObject
・s3.AbortMultipartUpload
resourceとして格納先バケットを指定(arn:aws:s3:::バケット名/*)
ロールを作成したらDBインスタンスにアタッチする。RDSの対象DBの画面で「接続とセキュリティ」タブをスクロールして、「IAMロールの管理」を表示。作成したIAMロールと、機能:s3Exportを指定して「ロールの追加」。(簡単にできそうだと思ってたけど前提が結構面倒くさかった…)
ここまできて本題のコード。前回同様、認証情報はシークレットマネージャに格納して取得する方式。スキーマやテーブル名他名称部分は適宜置き換えで。あとDB接続時は本来with構文にする方が望ましい。
import json
import boto3
import psycopg2
from datetime import datetime, date, time, timedelta
# 認証情報をシークレットマネージャーから取得
def getCredentials():
credential = {}
secret_name = "postgres/mysecretname"
region_name = "ap-northeast-1"
client = boto3.client(
service_name='secretsmanager',
region_name=region_name
)
get_secret_value_response = client.get_secret_value(
SecretId=secret_name
)
secret = json.loads(get_secret_value_response['SecretString'])
credential['username'] = secret['username']
credential['password'] = secret['password']
credential['host'] = "RDS endpoint URL"
credential['db'] = "databasename"
return credential
def lambda_handler(event, context):
# バケットとテーブル定義
bucket = 'my bucket name'
tbl = 'mytbl'
# 日付情報取得
yesterday = datetime.combine(date.today()-timedelta(1),time())
today = datetime.combine(date.today(),time())
# s3パス定義
key1 = yesterday.strftime("%Y")
key2 = yesterday.strftime("%m")
key3 = yesterday.strftime("%%d")
s3_path = key1 + "/" + key2 + "/" + key3 + 'tbl-export.txt'
# DB接続
credential = getCredentials()
connection = psycopg2.connect(user=credential['username'], password=credential['password'], host=credential['host'], database=credential['db'])
cursor = connection.cursor()
# クエリ定義
query = f''''select * from aws_s3.query_export_to_s3(
'select * from public.{tbl}',
aws_commons.create_s3_uri(
'{bucket}',
'{s3_path}',
'ap-northeast-1'
)
))'''[1:-1]
# クエリ実行(エクスポートが実行される)
cursor.excute(query)
# 接続クローズ
cursor.close()
connection.commit()
参考までに、以下はDBから直接クエリ実行する場合。コードではpythonから実行するため、query変数を定義してヒアドキュメントでクエリを記述。また、f-stringsによる変数展開を行うため記述が多少異なる形になった。
select * from aws_s3.query_export_to_s3(
'select * from public.data_tbl',
aws_commons.create_s3_uri(
'bucketname',
'2023/09/01/rds_export_data.csv',
'ap-northeast-1'
)
);
ちなみにここではテーブルデータ全部を引っこ抜いているが、一定の時刻範囲で抽出したい場合はクエリを調整すればよいかと。(当然timestamp列が存在する前提)