S3上のCSVファイルをRDS(PostgreSQL)にインポートするという「あるある」パターンのLambda。(Python3.9, エラー処理抜き)

RDSのデータをS3にエクスポートする逆パターンについては以下記事参照。
RDS(PostgreSQL)のデータをS3にエクスポートするLambda

 

S3イベントがトリガーでLambdaが起動し、S3バケット内のCSVを一時領域にダウンロードしてRDSにインポートという流れ。DB認証情報はSecretManagerに格納して取得している。(認証取得は環境変数セットでもいけるけど昨今の流れ的にはSecret格納が優勢ということで)

 

前提条件

  1. RDSに接続するため、Lambdaの配置はVPCとする。
  2. PostgreSQL接続時に外部ライブラリのpsycopg2が必要となる。Lambdaのレイヤーに登録してからコード内で定義する。(今回詳細手順は割愛)
import json
import boto3
import psycopg2

# SecretManagerからRDS/DB認証情報取得
# secret_name は実際に作成したシークレットを指定
def getCredentials():
    credential = {}
    
    secret_name = "secretname"
    region_name = "ap-northeast-1"
    
    client = boto3.client(
      service_name='secretsmanager',
      region_name=region_name
    )
    
    get_secret_value_response = client.get_secret_value(
      SecretId=secret_name
    )
    
    secret = json.loads(get_secret_value_response['SecretString'])
    
    credential['username'] = secret['username']
    credential['password'] = secret['password']
    credential['host'] = "RDS endpoint URL"
    credential['db'] = "databasename"
    
    return credential

def lambda_handler(event, context):

  #イベント全体の定義
  data = event
  s = json.dumps(data)
  e = json.loads(s)
  
  #イベントデータからS3部分抽出
  record = e['Records']
  s3_event_record = (record[0]['s3']
  
  #バケット名の定義
  bucket = s3_event_record['bucket']['name']
  print('bucket name:' + bucket)
  
  #オブジェクト名の定義(prefixを含む)
  key = s3_event_record['object']['key']
  print('prefix:' + key)
  
  #オブジェクトのファイル名のみ抽出(ダウンロード時に使用)
  path = key.split('/')
  file = path[-1] #区切り文字で区切った最後の文字列を取得
  tmpfile = '/tmp' + file
  
  #バケット上のCSVを/tmp/ファイル名.csvとして一時領域にダウンロード
  s3 = boto3.resource('s3')
  s3.Bucket(bucket).download_file(key, tmpfile)
  
  # DB接続
  credential = getCredentials()
  connection = psycopg2.connect(user=credential['username'], password=credential['password'], host=credential['host'], database=credential['db'])
  cursor = connection.cursor()
  
  #ダウンロードしたCSVをDBにインポート。sepは区切り文字指定。
  with open(tmpfile, 'r') as f:
    cursor.copy_from(f, 'data_tbl', sep=',')

  connection.commit()

 

参考
・SecretManager経由の接続
PostgreSQL with AWS Lambda using Python

・インポート部分
copy data from csv to postgresql using python

 


関連がありそうな記事