S3上のCSVファイルをRDS(PostgreSQL)にインポートするという「あるある」パターンのLambda。(Python3.9, エラー処理抜き)
RDSのデータをS3にエクスポートする逆パターンについては以下記事参照。
RDS(PostgreSQL)のデータをS3にエクスポートするLambda
S3イベントがトリガーでLambdaが起動し、S3バケット内のCSVを一時領域にダウンロードしてRDSにインポートという流れ。DB認証情報はSecretManagerに格納して取得している。(認証取得は環境変数セットでもいけるけど昨今の流れ的にはSecret格納が優勢ということで)
前提条件
- RDSに接続するため、Lambdaの配置はVPCとする。
- PostgreSQL接続時に外部ライブラリのpsycopg2が必要となる。Lambdaのレイヤーに登録してからコード内で定義する。(今回詳細手順は割愛)
import json
import boto3
import psycopg2
# SecretManagerからRDS/DB認証情報取得
# secret_name は実際に作成したシークレットを指定
def getCredentials():
credential = {}
secret_name = "secretname"
region_name = "ap-northeast-1"
client = boto3.client(
service_name='secretsmanager',
region_name=region_name
)
get_secret_value_response = client.get_secret_value(
SecretId=secret_name
)
secret = json.loads(get_secret_value_response['SecretString'])
credential['username'] = secret['username']
credential['password'] = secret['password']
credential['host'] = "RDS endpoint URL"
credential['db'] = "databasename"
return credential
def lambda_handler(event, context):
#イベント全体の定義
data = event
s = json.dumps(data)
e = json.loads(s)
#イベントデータからS3部分抽出
record = e['Records']
s3_event_record = (record[0]['s3']
#バケット名の定義
bucket = s3_event_record['bucket']['name']
print('bucket name:' + bucket)
#オブジェクト名の定義(prefixを含む)
key = s3_event_record['object']['key']
print('prefix:' + key)
#オブジェクトのファイル名のみ抽出(ダウンロード時に使用)
path = key.split('/')
file = path[-1] #区切り文字で区切った最後の文字列を取得
tmpfile = '/tmp' + file
#バケット上のCSVを/tmp/ファイル名.csvとして一時領域にダウンロード
s3 = boto3.resource('s3')
s3.Bucket(bucket).download_file(key, tmpfile)
# DB接続
credential = getCredentials()
connection = psycopg2.connect(user=credential['username'], password=credential['password'], host=credential['host'], database=credential['db'])
cursor = connection.cursor()
#ダウンロードしたCSVをDBにインポート。sepは区切り文字指定。
with open(tmpfile, 'r') as f:
cursor.copy_from(f, 'data_tbl', sep=',')
connection.commit()
参考
・SecretManager経由の接続
PostgreSQL with AWS Lambda using Python
・インポート部分
copy data from csv to postgresql using python