RDSのデータをS3にエクスポートするLambdaという「あるある」パターン。DBはPostgreSQL、Python3.9、エラー処理抜き。

 

前提条件

  1. RDSに接続するため、Lambdaの配置はVPCとする。

  2. PostgreSQL接続時に外部ライブラリのpsycopg2が必要となる。レイヤーに登録してから定義すればよいがちょっとハマりどころがある。

  3. 上記に加えてaws_s3拡張機能のインストールが必要。

DBに接続して以下コマンドを実行する。

CREATE EXTENSION aws_s3CASCADE;

 

  1. RDS用のIAMロール作成

実行するLambda用のIAMロールとは別に、RDSにアタッチするロールを作成する。

必要なアクション

・s3.PutObject
・s3.AbortMultipartUpload

resourceとして格納先バケットを指定(arn:aws:s3:::バケット名/*)

ロールを作成したらDBインスタンスにアタッチする。RDSの対象DBの画面で「接続とセキュリティ」タブをスクロールして、「IAMロールの管理」を表示。作成したIAMロールと、機能:s3Exportを指定して「ロールの追加」。(簡単にできそうだと思ってたけど前提が結構面倒くさかった…)

 

ここまできて本題のコード。前回同様、認証情報はシークレットマネージャに格納して取得する方式。スキーマやテーブル名他名称部分は適宜置き換えで。あとDB接続時は本来with構文にする方が望ましい。

 

import json
import boto3
import psycopg2
from datetime import datetime, date, time, timedelta

# 認証情報をシークレットマネージャーから取得
def getCredentials():
    credential = {}
    
    secret_name = "postgres/mysecretname"
    region_name = "ap-northeast-1"
    
    client = boto3.client(
      service_name='secretsmanager',
      region_name=region_name
    )
    
    get_secret_value_response = client.get_secret_value(
      SecretId=secret_name
    )
    
    secret = json.loads(get_secret_value_response['SecretString'])
    
    credential['username'] = secret['username']
    credential['password'] = secret['password']
    credential['host'] = "RDS endpoint URL"
    credential['db'] = "databasename"
    
    return credential

def lambda_handler(event, context):
  # バケットとテーブル定義
  bucket = 'my bucket name'
  tbl = 'mytbl'
  
  # 日付情報取得
  yesterday = datetime.combine(date.today()-timedelta(1),time())
  today = datetime.combine(date.today(),time())
  
  # s3パス定義
  key1 = yesterday.strftime("%Y")
  key2 = yesterday.strftime("%m")
  key3 = yesterday.strftime("%%d")
  s3_path = key1 + "/" + key2 + "/" + key3 + 'tbl-export.txt'
  
  # DB接続
  credential = getCredentials()
  connection = psycopg2.connect(user=credential['username'], password=credential['password'], host=credential['host'], database=credential['db'])
  cursor = connection.cursor()
  
  # クエリ定義
  query = f''''select * from aws_s3.query_export_to_s3(
    'select * from public.{tbl}',
    aws_commons.create_s3_uri(
      '{bucket}',
      '{s3_path}',
      'ap-northeast-1'
    )
  ))'''[1:-1]

  # クエリ実行(エクスポートが実行される)
  cursor.excute(query)
  
  # 接続クローズ
  cursor.close()
  connection.commit()

 

参考までに、以下はDBから直接クエリ実行する場合。コードではpythonから実行するため、query変数を定義してヒアドキュメントでクエリを記述。また、f-stringsによる変数展開を行うため記述が多少異なる形になった。

 

select * from aws_s3.query_export_to_s3(
    'select * from public.data_tbl',
    aws_commons.create_s3_uri(
      'bucketname',
      '2023/09/01/rds_export_data.csv',
      'ap-northeast-1'
    )
  );

 

ちなみにここではテーブルデータ全部を引っこ抜いているが、一定の時刻範囲で抽出したい場合はクエリを調整すればよいかと。(当然timestamp列が存在する前提)

 


関連がありそうな記事