Issue opened by gsy0911 (3 years ago):
import json
from typing import Tuple, Union

import boto3
import pandas as pd
import s3fs


class FileManipulate:
    """Utility collection for reading, writing and listing JSON/CSV files on S3."""

    # Shared filesystem handle; anon=False means the default AWS credential chain is used.
    fs = s3fs.S3FileSystem(anon=False)

    @staticmethod
    def _decode_path(s3_full_path: str) -> Tuple[str, str]:
        """Split a full S3 URL into (bucket, key).

        maxsplit=3 is used so the key keeps its internal slashes:
        split_path[0] = "s3:", split_path[1] = "" (empty string),
        split_path[2] = bucket, split_path[3] = key.

        Parameters
        ----------
        s3_full_path: str
            Full path such as "s3://bucket/some/key.json".

        Returns
        -------
        Tuple[str, str]
            (bucket name, object key)
        """
        split_path = s3_full_path.split("/", maxsplit=3)
        return split_path[2], split_path[3]

    @staticmethod
    def _write_s3_file(s3bucket: str, prefix: str, body: str):
        """Upload a UTF-8 text body to s3://{s3bucket}/{prefix}.

        Args:
            s3bucket: target bucket name
            prefix: object key
            body: text content to store

        Returns:
            The boto3 put-object response dict.
        """
        s3 = boto3.resource('s3')
        bucket = s3.Bucket(s3bucket)
        obj = bucket.Object(f"{prefix}")
        response = obj.put(
            Body=body.encode('utf-8'),
            ContentEncoding='utf-8',
            # BUG FIX: was 'text/plane' — the correct MIME type is 'text/plain'
            ContentType='text/plain'
        )
        return response

    @staticmethod
    def write_json(s3_path: str, payload: dict):
        """Serialize ``payload`` as JSON and store it at ``s3_path``.

        Args:
            s3_path: full path such as "s3://bucket/some/key.json"
            payload: JSON-serializable dict

        Returns:
            The boto3 put-object response dict.
        """
        s3bucket, prefix = FileManipulate._decode_path(s3_path)
        return FileManipulate._write_s3_file(
            s3bucket=s3bucket, prefix=prefix, body=json.dumps(payload))

    @staticmethod
    def _read_s3_file(s3bucket: str, prefix: str) -> Union[dict, str]:
        """Read the raw body of s3://{s3bucket}/{prefix}.

        :params s3bucket: bucket name
        :params prefix: object key
        """
        s3 = boto3.resource('s3')
        content_object = s3.Object(s3bucket, prefix)
        return content_object.get()['Body'].read()

    @staticmethod
    def read_json(s3_path: str) -> dict:
        """Fetch and deserialize the JSON object stored at ``s3_path``.

        :params s3_path: full path such as "s3://bucket/some/key.json"
        """
        s3bucket, prefix = FileManipulate._decode_path(s3_path)
        return json.loads(FileManipulate._read_s3_file(s3bucket=s3bucket, prefix=prefix))

    @staticmethod
    def remove_s3_file(s3_path: str) -> bool:
        """Delete the object at ``s3_path``.

        Returns True on success, False when the object did not exist.
        """
        try:
            FileManipulate.fs.rm(s3_path)
            return True
        except FileNotFoundError:
            return False

    @staticmethod
    def _get_file_list(s3_parent_path: str, length: Union[int, None], suffix: str) -> list:
        """List sorted "s3://..." paths under ``s3_parent_path`` ending in ``suffix``.

        Shared implementation for get_json_file_list / get_csv_file_list,
        which previously duplicated this logic verbatim.

        ``length`` truncates the result when it is a positive int; ``None``
        or a non-positive value returns the full list (same behavior as the
        original per-suffix methods).
        """
        file_list = [
            f"s3://{f}"
            for f in FileManipulate.fs.ls(s3_parent_path, refresh=True)
            if f.endswith(suffix)
        ]
        file_list.sort()
        if length is not None and length > 0:
            return file_list[:length]
        return file_list

    @staticmethod
    def get_json_file_list(s3_parent_path: str, length: Union[int, None]) -> list:
        """List up to ``length`` ".json" files under ``s3_parent_path``, sorted."""
        return FileManipulate._get_file_list(s3_parent_path, length, ".json")

    @staticmethod
    def get_csv_file_list(s3_parent_path: str, length: Union[int, None]) -> list:
        """List up to ``length`` ".csv" files under ``s3_parent_path``, sorted."""
        return FileManipulate._get_file_list(s3_parent_path, length, ".csv")

    @staticmethod
    def json_file_list_to_df(file_list: list) -> pd.DataFrame:
        """Read every JSON file in ``file_list`` and concatenate them into one DataFrame."""
        df_list = [FileManipulate.json_file_to_df(f) for f in file_list]
        return pd.concat(df_list)

    @staticmethod
    def json_file_to_df(file_name: str) -> pd.DataFrame:
        """Load one JSON object from S3 into a single-row DataFrame.

        from_dict(orient="index").T turns a flat dict into one row whose
        columns are the dict keys.
        """
        tmp_dict = FileManipulate.read_json(file_name)
        return pd.DataFrame.from_dict(tmp_dict, orient="index").T