Closed: gsy0911 closed this issue 1 year ago
import logging
import pandas as pd
import requests
from bs4 import BeautifulSoup
import os
# Root logger (no name is passed to getLogger) with its level set to INFO.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# S3 object path templates filled positionally with str.format.
# Analysis results: bucket, type, span, dt (end date), method name.
ANALYSIS_PATH_FORMAT = "s3://{}/type={}/span={}/dt={}/method={}.csv"
# Recommended brands: bucket, type, span, dt (end date).
RECOMMENDED_PATH_FORMAT = "s3://{}/type={}/span={}/dt={}.csv"
# Kabutan stock page URL template; the placeholder receives the stock code.
KABUTAN_BRAND_INFO_URL = 'https://kabutan.jp/stock/?code={}'
class EndDateNoInArgumentException(Exception):
    """Raised when the incoming event does not contain an ``end_date`` key."""
class NoMethodResultsException(Exception):
    """Raised when the incoming event does not contain ``analysis_parallel_results``."""
def replace_str_for_company_performance(input_str: str) -> str:
    """
    Strip the HTML fragments that surround a company-performance table value.

    The opening ``<td>`` and the two unit suffixes (``倍`` and ``%`` wrapped in
    a ``<span>`` followed by ``</td>``) are removed, in that order.

    :param input_str: stringified table cell
    :return: the input with those fragments removed
    """
    unwanted_fragments = ("<td>", "<span>倍</span></td>", "<span>%</span></td>")
    cleaned = input_str
    for fragment in unwanted_fragments:
        cleaned = cleaned.replace(fragment, "")
    return cleaned
def replace_str_for_company_performance_index(input_str: str) -> str:
    """
    Strip the HTML fragments that surround a company-performance table header.

    Removes the ``<th scope="col">`` / ``</th>`` pair and the ``<abbr>`` tags
    used for the PER and PBR headers, leaving only the header label.

    :param input_str: stringified header cell
    :return: the input with those fragments removed
    """
    unwanted_fragments = (
        '<th scope="col">',
        "</th>",
        '<abbr title="Price Earnings Ratio">',
        '<abbr title="Price Book-value Ratio">',
        "</abbr>",
    )
    cleaned = input_str
    for fragment in unwanted_fragments:
        cleaned = cleaned.replace(fragment, "")
    return cleaned
def replace_str_for_stock_prices_trend(input_str: str) -> str:
    """
    Strip the HTML fragments that surround a stock-price-trend table cell.

    Removes the ``<td>`` / ``</td>`` pair, the closing ``</span>%`` suffix and
    the opening ``up`` / ``down`` span tags, in that order.

    :param input_str: stringified table cell
    :return: the input with those fragments removed
    """
    unwanted_fragments = (
        '<td>',
        "</td>",
        '</span>%',
        '<span class="up">',
        '<span class="down">',
    )
    cleaned = input_str
    for fragment in unwanted_fragments:
        cleaned = cleaned.replace(fragment, "")
    return cleaned
def get_specific_stock_info(code, *, timeout: float = 10.0) -> dict:
    """
    Scrape the Kabutan page of one stock code and return its key figures.

    Keys collected when the page has the expected layout: PER, PBR, 利回り,
    信用倍率, the 5/25/75/200日線 figures, brand_name, market, industry,
    purchase_unit and close. Values are the strings found on the page.

    :param code: stock code substituted into KABUTAN_BRAND_INFO_URL
    :param timeout: seconds to wait for the HTTP response before giving up
    :return: ``{}`` when the page cannot be fetched (network error or a
        non-200 status); ``{"code": code}`` when an expected section or tag
        is missing or the market is "東証E"; the partially filled dict when
        the trend table has fewer than four rows; otherwise the full dict
    """
    target_url = KABUTAN_BRAND_INFO_URL.format(code)
    # Fetch the page. A timeout is mandatory here: without it a stalled
    # connection would block the caller indefinitely.
    try:
        r = requests.get(target_url, timeout=timeout)
    except requests.RequestException as err:
        # Best-effort scraping: treat a network failure like a non-200 reply.
        logger.warning("failed to fetch %s: %s", target_url, err)
        return {}
    if r.status_code != 200:
        return {}

    # Parse with BeautifulSoup.
    whole_page = BeautifulSoup(r.text, 'lxml')
    # Dictionary that accumulates the stock information.
    stock_info_dict = dict()

    # {'PER': '10.6', 'PBR': '0.85', '利回り': '2.18', '信用倍率': '6.08'}
    company_performance_table = whole_page.find('div', id="stockinfo_i3")
    if company_performance_table is None:
        return {"code": code}
    headers = company_performance_table.find_all('th')
    cells = company_performance_table.find_all('td')
    for header, cell in zip(headers, cells):
        key = replace_str_for_company_performance_index(str(header))
        stock_info_dict[key] = replace_str_for_company_performance(str(cell))

    # {'5日線': '+0.80', '25日線': '+7.44', '75日線': '+15.74', '200日線': '+6.92'}
    stock_prices_trend = whole_page.find('div', class_="kabuka_trend")
    if stock_prices_trend is None:
        return {"code": code}
    stock_prices_trend_table = stock_prices_trend.find_all("tr")
    # Return what has been collected so far when the table is not in the
    # expected shape (rows 2 and 3 hold the labels and the values).
    if len(stock_prices_trend_table) < 4:
        return stock_info_dict
    headers = stock_prices_trend_table[2].find_all("td")
    cells = stock_prices_trend_table[3].find_all("td")
    for header, cell in zip(headers, cells):
        key = replace_str_for_stock_prices_trend(str(header))
        stock_info_dict[key] = replace_str_for_stock_prices_trend(str(cell))

    # {'brand_name': '**', 'market': 'market', 'purchase_unit': int, 'industry': '業種', 'close': '終値'}
    brand_basic_info = whole_page.find('div', class_="si_i1_1")
    if brand_basic_info is None:
        return {"code": code}
    brand_name_with_code = brand_basic_info.find("h2")
    if brand_name_with_code is None:
        return {"code": code}
    # Drop the <span> nested in the heading so only the brand name remains.
    # This must happen before the market lookup below, which takes the first
    # <span> still left inside the block.
    code_span = brand_name_with_code.find("span")
    if code_span is not None:
        code_span.extract()
    stock_info_dict['brand_name'] = brand_name_with_code.text
    market_span = brand_basic_info.find("span")
    if market_span is None:
        return {"code": code}
    stock_info_dict['market'] = market_span.text
    # Brands listed under "東証E" are skipped (presumably ETFs - confirm).
    if stock_info_dict['market'] == "東証E":
        return {"code": code}

    brand_basic_stock_info = whole_page.find('div', id="stockinfo_i2")
    if brand_basic_stock_info is None:
        return {"code": code}
    industry_link = brand_basic_stock_info.find("a")
    dd_tags = brand_basic_stock_info.find_all("dd")
    # The purchase unit is read from the second <dd>; bail out if the layout
    # does not provide it instead of raising IndexError / AttributeError.
    if industry_link is None or len(dd_tags) < 2:
        return {"code": code}
    stock_info_dict['industry'] = industry_link.text
    stock_info_dict['purchase_unit'] = dd_tags[1].text

    price_block = whole_page.find('div', class_="si_i1_2")
    if price_block is None:
        return {"code": code}
    close_span = price_block.find("span", class_="kabuka")
    if close_span is None:
        return {"code": code}
    # Strip the yen sign and thousands separators from the closing price.
    stock_info_dict['close'] = close_span.text.replace("円", "").replace(",", "")
    return stock_info_dict
def summation_all_results(x, method_info_list: list) -> float:
    """
    Compute the weighted sum of per-method scores for one record.

    :param x: mapping-like object indexed by method name (e.g. a DataFrame row)
    :param method_info_list: dicts carrying 'method_name' and 'method_ratio'
    :return: sum over all methods of ``method_ratio * x[method_name]``,
        starting from 0.0
    """
    total = 0.0
    for info in method_info_list:
        name, ratio = info['method_name'], info['method_ratio']
        # Explicit accumulation keeps the plain left-to-right float addition.
        total = total + ratio * x[name]
    return total
def summary_result(event: dict, *, logger=None):
    """
    Blend the per-method analysis scores and publish the top recommendations.

    For every entry of ``event['analysis_parallel_results']`` the method's CSV
    is read from S3 (columns ``code`` and ``score_sigmoid`` are used), the
    scores are outer-joined on ``code`` and combined into ``mixed_score`` with
    ``summation_all_results``. The 30 best codes are enriched with
    ``get_specific_stock_info``; rows without market information are dropped
    and the first 10 remaining rows are stored in ``event['code_list']`` and
    written as CSV to the "jp-recommended" S3 path.

    :param event: dict with 'end_date' and 'analysis_parallel_results'
        (a list of dicts with 'method_name' and 'method_ratio')
    :param logger: logger to use; the root logger at INFO level when omitted
    :return: the same ``event`` dict with 'code_list' added
    :raises EndDateNoInArgumentException: 'end_date' is not in ``event``
    :raises NoMethodResultsException: 'analysis_parallel_results' is missing
        or empty
    :raises KeyError: the 's3_bucket' environment variable is not set
    """
    if logger is None:
        logger = logging.getLogger()
        logger.setLevel(logging.INFO)
    logger.info(f"event := {event}")

    if 'end_date' not in event:
        raise EndDateNoInArgumentException("'end_date' is missing from event")
    if 'analysis_parallel_results' not in event:
        raise NoMethodResultsException("'analysis_parallel_results' is missing from event")
    end_date = event['end_date']
    analysis_parallel_result = event['analysis_parallel_results']
    # Without at least one method there is nothing to merge; fail with the
    # domain exception instead of an IndexError further down.
    if not analysis_parallel_result:
        raise NoMethodResultsException("'analysis_parallel_results' is empty")

    bucket = os.environ['s3_bucket']

    # Build one (code, <method_name>) frame per analysis method.
    stock_df_list = []
    for method_info in analysis_parallel_result:
        method_name = method_info['method_name']
        s3_path = ANALYSIS_PATH_FORMAT.format(bucket, "jp-analysis", "daily", end_date, method_name)
        logger.info(s3_path)
        tmp_df = pd.read_csv(s3_path)
        # Expose the score under the method's own name.
        tmp_df[method_name] = tmp_df['score_sigmoid']
        # Keep only the columns needed for the calculation.
        tmp_df = tmp_df[["code", method_name]]
        stock_df_list.append(tmp_df)
        logger.info(tmp_df.head())

    # Outer-join all methods on code; a code missing from a method scores 0.
    analysis_df = stock_df_list[0]
    for tmp_df in stock_df_list[1:]:
        logger.info(f"processing:= {tmp_df.head()}")
        analysis_df = analysis_df.merge(tmp_df, on="code", how="outer").fillna(0)

    analysis_df['mixed_score'] = analysis_df.apply(
        lambda row: summation_all_results(row, analysis_parallel_result), axis=1)
    logger.info(analysis_df.head())

    # Sort by the blended score, best first.
    analysis_df['mixed_score'] = analysis_df['mixed_score'].astype(float)
    analysis_df = analysis_df.sort_values('mixed_score', ascending=False)

    # Take the top 30 rows and attach market information to each of them.
    # copy() makes this an independent frame so the column assignments below
    # do not operate on a slice of the sorted frame.
    analysis_df = analysis_df.head(30).copy()
    analysis_df['data'] = analysis_df['code'].map(get_specific_stock_info)

    # {'PER': '10.6', 'PBR': '0.85', '利回り': '2.18', '信用倍率': '6.08'}
    # {'5日線': '+0.80', '25日線': '+7.44', '75日線': '+15.74', '200日線': '+6.92'}
    # {'brand_name': '**', 'market': 'market', 'purchase_unit': int, 'industry': '業種', 'close': '終値'}
    data_key_list = ['PER', 'PBR', '利回り', '信用倍率', '5日線', '25日線', '75日線', '200日線',
                     'brand_name', 'market', 'purchase_unit', 'industry', 'close']
    for data_key in data_key_list:
        analysis_df[data_key] = analysis_df['data'].map(lambda info, key=data_key: info.get(key, "-"))
    # The helper column is no longer needed once it has been expanded.
    analysis_df = analysis_df.drop('data', axis=1)
    # Drop the rows for which no market information could be obtained.
    analysis_df = analysis_df[analysis_df['market'] != "-"]

    # Publish the 10 best rows as a list of records.
    top_df = analysis_df.head(10)
    event['code_list'] = top_df.to_dict(orient='records')

    # Store the same rows on S3 for later evaluation.
    top_df.to_csv(RECOMMENDED_PATH_FORMAT.format(bucket, "jp-recommended", "daily", end_date), index=False)
    return event
完了の定義
以下の内容を追加する?