kirin-ri opened this issue 1 month ago
{
"physical_name": "PARTSDELIVERY_ONTIMEDELIVERYRATE_WEEKLY_DELAYDAYS",
"logical_name": "納期遵守率(ETD):週次",
"provide": false,
"tags": [
"SCM",
"部品納期管理",
"納期遵守率",
"週次"
],
"category": "部品納期管理",
"description": "サプライヤの部品納期回答精度(週次)として、遅延日数の平均値、最大値、最小値、中央値、標準偏差を算出",
"processing": "「ETD: 一次部品_発注計画_納期回答」.「出荷日」ー「ATD:一次サプライヤ_一次部品_出荷実績」.「出荷日」\nを計算し、カラム「遅延日数」に追加する。\nまた、この際に値が0未満となる場合は0とする\n<hr>\n「ATD:一次サプライヤ_一次部品_出荷実績」.「出荷元:一次サプライヤ」、\n「ATD:一次サプライヤ_一次部品_出荷実績」.「工場名」\nをキーに集計し、\n行数と「遅延日数」の最大値、最小値、平均値、中央値、標準偏差を算出して\n「行数」、\n「遅延日数:最大値」、\n「遅延日数:最小値」、\n「遅延日数:平均値」、\n「遅延日数:中央値」、\n「遅延日数:標準偏差」に格納する\n<hr>\n「納入日」の年月日を格納する\n<hr>\n集計キーとして\n「ATD:一次サプライヤ_一次部品_出荷実績」.「出荷日」\nを使用していないため、単体で「基準日」に格納する\n<hr>\nAnaplan用に\n「ATD:一次サプライヤ_一次部品_出荷実績」.「出荷元:一次サプライヤ」、\n「ATD:一次サプライヤ_一次部品_出荷実績」.「工場名」、\n「ATD:一次サプライヤ_一次部品_出荷実績」.「出荷日」の年月日\nを_区切りで結合し「UID」に格納する\n<hr>\n「ETDとATDの遅延日数(週次)」にINSERTする",
"graph_title": [
"平均値、最大値、最小値、中央値",
"平均値 ± 標準偏差"
],
"metrics_pattern": "max,min,avg,median,stddev",
"instant_calc_pattern": "yyyymmdd",
"cycle_group": "納期遵守率(ETD)",
"cycle": "週次",
"in_info": [
{
"schema": "DATASET_SCHEMA",
"table": "t1_priprt_shpt",
"column": [
{
"physical_name": "SUPPLIER_NAME",
"logical_name": "出荷元_一次サプライヤ",
"type": "文字列"
},
{
"physical_name": "FACTORY_NAME",
"logical_name": "出荷元_生産拠点",
"type": "文字列"
},
{
"physical_name": "PURCHASE_ORDER_NO",
"logical_name": "PO_注文No",
"type": "文字列"
},
{
"physical_name": "PARTS_NO",
"logical_name": "一次部品_部品番号",
"type": "文字列"
},
{
"physical_name": "SHIPMENT_ATD",
"logical_name": "納入日",
"type": "日付"
}
]
},
{
"schema": "DATASET_SCHEMA",
"table": "t1_priprt_fcsp_reply",
"column": [
{
"physical_name": "PURCHASE_ORDER_NO",
"logical_name": "PO_注文No",
"type": "文字列"
},
{
"physical_name": "FACTORY_NAME",
"logical_name": "発注元_生産拠点_工場",
"type": "文字列"
},
{
"physical_name": "REPLY_ETD",
"logical_name": "納入日",
"type": "日付"
}
]
}
],
"out_info": [
{
"schema": "METRICS_SCHEMA",
"table": "PARTSDELIVERY_ONTIMEDELIVERYRATE_WEEKLY_DELAYDAYS",
"column": [
{
"physical_name": "SUPPLIER_NAME",
"logical_name": "サプライヤ名",
"type": "文字列"
},
{
"physical_name": "FACTORY_NAME",
"logical_name": "工場名",
"type": "文字列"
},
{
"physical_name": "REF_DATE",
"logical_name": "基準日",
"type": "文字列"
},
{
"physical_name": "COUNT",
"logical_name": "件数",
"type": "数値"
},
{
"physical_name": "DELAY_DAYS_MAX",
"logical_name": "遅延日数:最大値",
"type": "数値"
},
{
"physical_name": "DELAY_DAYS_MIN",
"logical_name": "遅延日数:最小値",
"type": "数値"
},
{
"physical_name": "DELAY_DAYS_AVG",
"logical_name": "遅延日数:平均値",
"type": "数値"
},
{
"physical_name": "DELAY_DAYS_MEDIAN",
"logical_name": "遅延日数:中央値",
"type": "数値"
},
{
"physical_name": "DELAY_DAYS_STDDEV",
"logical_name": "遅延日数:標準偏差",
"type": "数値"
},
{
"physical_name": "DELAY_DAYS_STDDEV",
"logical_name": "ユニークキー",
"type": "文字列"
},
{
"physical_name": "JOB_ID",
"logical_name": "ジョブID",
"type": "文字列"
}
]
}
],
"in_mapping": null,
"calc_options": null,
"repository_name": "partsDelivery_ontimeDeliveryRate_weekly_delayDays",
"version": "0.0.1",
"last_update": "2023-05-15"
}
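For reference, the processing description above boils down to one grouped aggregation over the two in_info tables. Below is a minimal Snowflake sketch of that calculation using the physical names from in_info/out_info; the PURCHASE_ORDER_NO join key and the grouping by shipment date are assumptions the metadata does not state, and the ETD minus ATD order follows the processing text as written.

# Sketch only: the join key and the date grouping are assumptions, not confirmed by the metadata.
DELAY_DAYS_SQL = """
WITH delays AS (
    SELECT
        atd.SUPPLIER_NAME,
        atd.FACTORY_NAME,
        TO_CHAR(atd.SHIPMENT_ATD, 'YYYYMMDD') AS REF_DATE,
        -- ETD reply date minus ATD actual date, clipped at 0 as described in "processing"
        GREATEST(DATEDIFF('day', atd.SHIPMENT_ATD, etd.REPLY_ETD), 0) AS DELAY_DAYS
    FROM DATASET_SCHEMA.t1_priprt_shpt atd
    INNER JOIN DATASET_SCHEMA.t1_priprt_fcsp_reply etd
        ON atd.PURCHASE_ORDER_NO = etd.PURCHASE_ORDER_NO
)
SELECT
    SUPPLIER_NAME,
    FACTORY_NAME,
    REF_DATE,
    COUNT(*)           AS "COUNT",
    MAX(DELAY_DAYS)    AS DELAY_DAYS_MAX,
    MIN(DELAY_DAYS)    AS DELAY_DAYS_MIN,
    AVG(DELAY_DAYS)    AS DELAY_DAYS_AVG,
    MEDIAN(DELAY_DAYS) AS DELAY_DAYS_MEDIAN,
    STDDEV(DELAY_DAYS) AS DELAY_DAYS_STDDEV,
    SUPPLIER_NAME || '_' || FACTORY_NAME || '_' || REF_DATE AS UID
FROM delays
GROUP BY SUPPLIER_NAME, FACTORY_NAME, REF_DATE
"""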
from distutils.util import strtobool
import json
import os
import traceback
import logging
from tools.snowflakeAccessor import SnowflakeAccessor
from werkzeug.exceptions import InternalServerError
ENABLE_DPL_API = bool(strtobool(os.getenv("ENABLE_DPL_API", 'true')))
# logger初期化
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# APIメタ情報全件取得
def _getAllApiMeta():
logger.info('_getAllApiMeta start')
sf = SnowflakeAccessor()
out = []
# APIメタデータ取得クエリ
query = """
SELECT
A.API_PHYSICAL_NAME,
A.API_LOGICAL_NAME,
A.PROVIDE,
A.API_LINKAGE_ITEMS,
A.TAGS,
A.PRESET,
array_agg(
B.ID
)
WITHIN
GROUP(
ORDER BY B.ID
) RELATED_METRICS
FROM
API_META_INFO A
LEFT OUTER JOIN(
SELECT
tmp.ID,
item.value API_NAME
FROM
CATALOG_META_INFO tmp,
LATERAL FLATTEN(input => tmp.API_NAME) item
) B
ON
A.API_PHYSICAL_NAME = B.API_NAME
GROUP BY
A.API_PHYSICAL_NAME,
A.API_LOGICAL_NAME,
A.PROVIDE,
A.API_LINKAGE_ITEMS,
A.TAGS,
A.PRESET
ORDER BY
A.API_PHYSICAL_NAME
"""
try:
result = sf.execute(query)
for (physical_name,
logical_name,
provide,
items,
tags,
preset,
related_metrics) in result:
outinfo = []
column = []
try:
for item in json.loads(items):
outinfo.append(
{
"physicalName": item["PHYSICAL_NAME"],
"logicalName": item["LOGICAL_NAME"],
"type": item["TYPE"],
"required": item["REQUIRED"],
}
)
except TypeError:
outinfo.append(None)
column.append({"column": outinfo})
if tags == "[]" or not tags:
tags = '["その他"]'
out.append(
{
"related_metrics": json.loads(related_metrics),
"physical_name": physical_name,
"logical_name": logical_name,
"provide": provide,
"outinfo": column,
"tags": json.loads(tags),
"preset": preset
}
)
except Exception as e:
logger.error(f"{e}", exc_info=True)
logger.info('error _getAllApiMeta')
traceback.print_exc()
raise InternalServerError
finally:
sf.close()
logger.info('_getAllApiMeta end')
return out
# IN_MAPPING情報取得
def _getAllInMapping():
logger.info('_getAllInMapping start')
obj = SnowflakeAccessor()
res = []
try:
r = obj.execute(
"""
SELECT
IN_MAPPING
FROM
CATALOG_META_INFO
WHERE
IN_MAPPING <> to_variant({});
"""
)
for in_mapping in r:
res.append(json.loads(in_mapping[0]))
except Exception as e:
logger.error(f"{e}", exc_info=True)
logger.info('error _getAllInMapping')
traceback.print_exc()
raise InternalServerError
finally:
obj.close()
logger.info('_getAllInMapping end')
return res
# IN_MAPPINGから「必須の集計キー」と「計算処理で使用する項目」を抽出
def _getUndeletableItems(mappingInfo: list):
logger.info('_getUndeletableItems start')
tmpData = {}
try:
# マッピング情報ループ
for tmpMapping in mappingInfo:
# 各メトリクスのINPUT(API,メトリクス)分ループ
for tmpInput in tmpMapping:
tmpDataVal = []
if tmpInput['api'] is not None:
# 集計キー
for keyOrder in tmpInput['keyOrder']:
# 集計キーかつ必須項目
if keyOrder['required']:
# 既に作成されているAPIデータの場合はそちらに追加
if tmpInput['api'] in tmpData and keyOrder['physicalName'] not in tmpData[tmpInput['api']]:
tmpDataVal = tmpData[tmpInput['api']]
tmpDataVal.append(keyOrder['physicalName'])
elif keyOrder['physicalName'] not in tmpDataVal:
tmpDataVal.append(keyOrder['physicalName'])
# 計算処理で使用する項目
for value in tmpInput['values']:
# 既に作成されているAPIデータの場合はそちらに追加
if tmpInput['api'] in tmpData and value['inPhysicalName'] not in tmpData[tmpInput['api']]:
tmpDataVal = tmpData[tmpInput['api']]
tmpDataVal.append(value['inPhysicalName'])
elif value['inPhysicalName'] not in tmpDataVal:
tmpDataVal.append(value['inPhysicalName'])
if tmpDataVal:
tmpData[tmpInput['api']] = tmpDataVal
except Exception as e:
logger.info('error _getUndeletableItems')
logger.error(f"{e}", exc_info=True)
traceback.print_exc()
raise InternalServerError
logger.info('_getUndeletableItems end')
return tmpData
# 各API情報に削除不可カラム情報を追加
def _addUndeletableItemsInfo(out: list, undeletableItems: dict):
logger.info('_addUndeletableItemsInfo start')
try:
# 削除不可カラム情報ループ
for idx, apiInfo in enumerate(out):
if apiInfo['physical_name'] in undeletableItems:
out[idx]['undeletableCol'] = undeletableItems[apiInfo['physical_name']]
else:
out[idx]['undeletableCol'] = []
except Exception as e:
logger.info('error _addUndeletableItemsInfo')
logger.error(f"{e}", exc_info=True)
traceback.print_exc()
raise InternalServerError
logger.info('_addUndeletableItemsInfo end')
return out
# メイン処理
def getApiList():
try:
# 表示APIリスト取得
out = _getAllApiMeta()
# メトリクス処理マッピング情報取得
mappingInfo = _getAllInMapping()
# メトリクス処理マッピング情報から、「必須の集計キー」と「計算処理で使用する項目」を抽出
undeletableItems = _getUndeletableItems(mappingInfo)
# 各API情報に削除不可カラム情報を追加
out = _addUndeletableItemsInfo(out, undeletableItems)
except Exception:
logger.info('error getApiList')
raise InternalServerError
return {
"api": out,
"enableDplApi": ENABLE_DPL_API
}
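For orientation, the JSON that getApiList() returns looks roughly like the sketch below; the field values are invented, but the keys mirror exactly what the functions above assemble.

# Illustrative response shape only; the values here are made up.
example_response = {
    "enableDplApi": True,
    "api": [
        {
            "physical_name": "T1_PRIPRT_SHPT",
            "logical_name": "一次サプライヤ_一次部品_出荷実績",
            "provide": True,
            "related_metrics": [1, 5],
            "tags": ["SCM", "部品納期管理"],
            "preset": False,
            "outinfo": [{"column": [
                {"physicalName": "SUPPLIER_NAME", "logicalName": "出荷元_一次サプライヤ",
                 "type": "文字列", "required": True},
            ]}],
            "undeletableCol": ["SUPPLIER_NAME"],
        },
    ],
}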
from arango import ArangoClient
# APIメタ情報全件取得
def _getAllApiMeta():
logger.info('_getAllApiMeta start')
sf = SnowflakeAccessor()
out = []
# ArangoDBからカタログメタデータ取得
# ArangoDBクライアント初期化
client = ArangoClient()
# データベース接続
db = client.db('your_db_name', username='your_username', password='your_password')
collection = db.collection('metricsnode')
# ArangoDBからデータ取得
cursor = db.aql.execute('FOR doc IN metricsnode RETURN {id: doc._key, api_name: doc.physical_name}')
catalog_meta_data = []
for doc in cursor:
catalog_meta_data.append({
"id": doc["id"], # ArangoDBのドキュメントID
"api_name": doc["api_name"] # API物理名
})
# APIメタデータ取得クエリ (Snowflakeから)
query = """
SELECT
A.API_PHYSICAL_NAME,
A.API_LOGICAL_NAME,
A.PROVIDE,
A.API_LINKAGE_ITEMS,
A.TAGS,
A.PRESET
FROM
API_META_INFO A
ORDER BY
A.API_PHYSICAL_NAME
"""
try:
result = sf.execute(query)
for (physical_name,
logical_name,
provide,
items,
tags,
preset) in result:
# ArangoDBデータから関連メトリクスIDを取得
related_metrics = [
meta["id"] for meta in catalog_meta_data if meta["api_name"] == physical_name
]
outinfo = []
column = []
try:
for item in json.loads(items):
outinfo.append(
{
"physicalName": item["PHYSICAL_NAME"],
"logicalName": item["LOGICAL_NAME"],
"type": item["TYPE"],
"required": item["REQUIRED"],
}
)
except TypeError:
outinfo.append(None)
column.append({"column": outinfo})
if tags == "[]" or not tags:
tags = '["その他"]'
out.append(
{
"related_metrics": related_metrics, # ArangoDBからの関連メトリクスID
"physical_name": physical_name,
"logical_name": logical_name,
"provide": provide,
"outinfo": column,
"tags": json.loads(tags),
"preset": preset
}
)
except Exception as e:
logger.error(f"{e}", exc_info=True)
logger.info('error _getAllApiMeta')
traceback.print_exc()
raise InternalServerError
finally:
sf.close()
logger.info('_getAllApiMeta end')
return out
class ArangoDBAccessor(object):
def __init__(
self,
user=ARANGO_USER,
password=ARANGO_PWD,
database=ARANGO_DB,
url=ARANGO_URL
):
logger.info("--- ArangoDB connect start")
self.client = ArangoClient(hosts=url, verify_override=False)
self.db = self.client.db(database,
username=user,
password=password)
logger.info("--- ArangoDB connect complete")
def execute(self, aql: str, vars=None):
if vars:
self.cur = self.db.aql.execute(aql, bind_vars=vars)
else:
self.cur = self.db.aql.execute(aql)
return self.cur
def beginTransaction(self):
self.tran = self.db.begin_transaction()
def commitTransaction(self):
self.tran.commit_transaction()
def abortTransaction(self):
self.tran.abort_transaction()
def close(self):
if hasattr(self, 'cur'):
self.cur.close(ignore_missing=True)
self.client.close()
logger.info("--- ArangoDB close complete")
def _getAllApiMeta():
logger.info('_getAllApiMeta start')
sf = SnowflakeAccessor()
adb = ArangoDBAccessor() # ArangoDBAccessorをインスタンス化
out = []
# ArangoDBからカタログメタデータを取得
aql_query = 'FOR doc IN metricsnode RETURN {id: doc._key, api_name: doc.physical_name}'
try:
catalog_meta_data = []
arango_result = adb.execute(aql_query)
for doc in arango_result:
catalog_meta_data.append({
"id": doc["id"], # ArangoDBのドキュメントID
"api_name": doc["api_name"] # API物理名
})
except Exception as e:
logger.error(f"ArangoDB error: {e}", exc_info=True)
raise InternalServerError
# APIメタデータ取得クエリ (Snowflakeから)
query = """
SELECT
A.API_PHYSICAL_NAME,
A.API_LOGICAL_NAME,
A.PROVIDE,
A.API_LINKAGE_ITEMS,
A.TAGS,
A.PRESET
FROM
API_META_INFO A
ORDER BY
A.API_PHYSICAL_NAME
"""
try:
result = sf.execute(query)
for (physical_name,
logical_name,
provide,
items,
tags,
preset) in result:
# ArangoDBデータから関連メトリクスIDを取得
related_metrics = [
meta["id"] for meta in catalog_meta_data if meta["api_name"] == physical_name
]
outinfo = []
column = []
try:
for item in json.loads(items):
outinfo.append(
{
"physicalName": item["PHYSICAL_NAME"],
"logicalName": item["LOGICAL_NAME"],
"type": item["TYPE"],
"required": item["REQUIRED"],
}
)
except TypeError:
outinfo.append(None)
column.append({"column": outinfo})
if tags == "[]" or not tags:
tags = '["その他"]'
out.append(
{
"related_metrics": related_metrics, # ArangoDBからの関連メトリクスID
"physical_name": physical_name,
"logical_name": logical_name,
"provide": provide,
"outinfo": column,
"tags": json.loads(tags),
"preset": preset
}
)
except Exception as e:
logger.error(f"Snowflake error: {e}", exc_info=True)
logger.info('error _getAllApiMeta')
traceback.print_exc()
raise InternalServerError
finally:
sf.close()
adb.close() # ArangoDB接続をクローズ
logger.info('_getAllApiMeta end')
return out
(env) (base) q_li@vm-I-DNA-daas-2:~/Desktop/catalog-web-app/server$ flask run -p 5522
Usage: flask run [OPTIONS]
Try 'flask run --help' for help.
Error: While importing 'app', an ImportError was raised:
Traceback (most recent call last):
File "/home/uenv/q_li/Desktop/catalog-web-app/server/env/lib/python3.9/site-packages/flask/cli.py", line 218, in locate_app
__import__(module_name)
File "/home/uenv/q_li/Desktop/catalog-web-app/server/app.py", line 13, in <module>
from ap.customizeMetrics import customizeMetrics
File "/home/uenv/q_li/Desktop/catalog-web-app/server/ap/customizeMetrics.py", line 15, in <module>
from tools.metricsUtils import (
ImportError: cannot import name 'updateParam' from 'tools.metricsUtils' (/home/uenv/q_li/Desktop/catalog-web-app/server/tools/metricsUtils.py)
[2024-10-18 04:04:07,018] INFO in app: getMetricsList
2024-10-18 04:04:07,018 INFO -20241018-0a925efc-25c1-487d-9a97-b8d6b73653cb getMetricsList
2024-10-18 04:04:07,018 INFO -20241018-0a925efc-25c1-487d-9a97-b8d6b73653cb --- snowflake connect start
[2024-10-18 04:04:07,022] INFO in app: getApiList
2024-10-18 04:04:07,022 INFO -20241018-1692f176-48f8-469a-8066-6fc96c319fde getApiList
2024-10-18 04:04:07,022 INFO -20241018-1692f176-48f8-469a-8066-6fc96c319fde _getAllApiMeta start
2024-10-18 04:04:07,022 INFO -20241018-1692f176-48f8-469a-8066-6fc96c319fde --- snowflake connect start
2024-10-18 04:04:07,308 INFO -20241018-0a925efc-25c1-487d-9a97-b8d6b73653cb --- snowflake connect complete
2024-10-18 04:04:07,316 INFO -20241018-1692f176-48f8-469a-8066-6fc96c319fde --- snowflake connect complete
2024-10-18 04:04:07,316 INFO -20241018-1692f176-48f8-469a-8066-6fc96c319fde --- ArangoDB connect start
2024-10-18 04:04:07,316 INFO -20241018-1692f176-48f8-469a-8066-6fc96c319fde --- ArangoDB connect complete
/home/uenv/q_li/Desktop/catalog-web-app/server/env/lib/python3.9/site-packages/urllib3/connectionpool.py:1045: InsecureRequestWarning: Unverified HTTPS request is being made to host 'localhost'. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/1.26.x/advanced-usage.html#ssl-warnings
warnings.warn(
2024-10-18 04:04:07,340 ERROR -20241018-1692f176-48f8-469a-8066-6fc96c319fde ArangoDB error: [HTTP 404][ERR 1203] AQL: collection or view not found: metricsnode (while parsing)
Traceback (most recent call last):
File "/home/uenv/q_li/Desktop/catalog-web-app/server/ap/getApiList.py", line 28, in _getAllApiMeta
arango_result = adb.execute(aql_query)
File "/home/uenv/q_li/Desktop/catalog-web-app/server/tools/arangoDBAccessor.py", line 34, in execute
self.cur = self.db.aql.execute(aql)
File "/home/uenv/q_li/Desktop/catalog-web-app/server/env/lib/python3.9/site-packages/arango/aql.py", line 453, in execute
return self._execute(request, response_handler)
File "/home/uenv/q_li/Desktop/catalog-web-app/server/env/lib/python3.9/site-packages/arango/api.py", line 74, in _execute
return self._executor.execute(request, response_handler)
File "/home/uenv/q_li/Desktop/catalog-web-app/server/env/lib/python3.9/site-packages/arango/executor.py", line 66, in execute
return response_handler(resp)
File "/home/uenv/q_li/Desktop/catalog-web-app/server/env/lib/python3.9/site-packages/arango/aql.py", line 450, in response_handler
raise AQLQueryExecuteError(resp, request)
arango.exceptions.AQLQueryExecuteError: [HTTP 404][ERR 1203] AQL: collection or view not found: metricsnode (while parsing)
2024-10-18 04:04:07,341 INFO -20241018-1692f176-48f8-469a-8066-6fc96c319fde error getApiList
2024-10-18 04:04:07,344 INFO - 127.0.0.1 - - [18/Oct/2024 04:04:07] "GET /api/api HTTP/1.1" 500 -
[2024-10-18 04:04:07,349] INFO in app: getApiList
2024-10-18 04:04:07,349 INFO -20241018-7e4a1f16-5d6f-4908-99bf-8f3d53766ba4 getApiList
2024-10-18 04:04:07,349 INFO -20241018-7e4a1f16-5d6f-4908-99bf-8f3d53766ba4 _getAllApiMeta start
2024-10-18 04:04:07,349 INFO -20241018-7e4a1f16-5d6f-4908-99bf-8f3d53766ba4 --- snowflake connect start
2024-10-18 04:04:07,432 INFO -20241018-0a925efc-25c1-487d-9a97-b8d6b73653cb --- ArangoDB connect start
2024-10-18 04:04:07,432 INFO -20241018-0a925efc-25c1-487d-9a97-b8d6b73653cb --- ArangoDB connect complete
/home/uenv/q_li/Desktop/catalog-web-app/server/env/lib/python3.9/site-packages/urllib3/connectionpool.py:1045: InsecureRequestWarning: Unverified HTTPS request is being made to host 'localhost'. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/1.26.x/advanced-usage.html#ssl-warnings
warnings.warn(
2024-10-18 04:04:07,463 INFO -20241018-0a925efc-25c1-487d-9a97-b8d6b73653cb --- ArangoDB close complete
2024-10-18 04:04:07,514 INFO -20241018-7e4a1f16-5d6f-4908-99bf-8f3d53766ba4 --- snowflake connect complete
2024-10-18 04:04:07,514 INFO -20241018-7e4a1f16-5d6f-4908-99bf-8f3d53766ba4 --- ArangoDB connect start
2024-10-18 04:04:07,514 INFO -20241018-7e4a1f16-5d6f-4908-99bf-8f3d53766ba4 --- ArangoDB connect complete
2024-10-18 04:04:07,526 INFO - 127.0.0.1 - - [18/Oct/2024 04:04:07] "GET /api/metrics HTTP/1.1" 200 -
/home/uenv/q_li/Desktop/catalog-web-app/server/env/lib/python3.9/site-packages/urllib3/connectionpool.py:1045: InsecureRequestWarning: Unverified HTTPS request is being made to host 'localhost'. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/1.26.x/advanced-usage.html#ssl-warnings
warnings.warn(
[2024-10-18 04:04:07,531] INFO in app: getMetricsList
2024-10-18 04:04:07,531 INFO -20241018-0b73e809-cbb2-4782-9838-aa39095228ba getMetricsList
2024-10-18 04:04:07,531 INFO -20241018-0b73e809-cbb2-4782-9838-aa39095228ba --- snowflake connect start
2024-10-18 04:04:07,534 ERROR -20241018-7e4a1f16-5d6f-4908-99bf-8f3d53766ba4 ArangoDB error: [HTTP 404][ERR 1203] AQL: collection or view not found: metricsnode (while parsing)
Traceback (most recent call last):
File "/home/uenv/q_li/Desktop/catalog-web-app/server/ap/getApiList.py", line 28, in _getAllApiMeta
arango_result = adb.execute(aql_query)
File "/home/uenv/q_li/Desktop/catalog-web-app/server/tools/arangoDBAccessor.py", line 34, in execute
self.cur = self.db.aql.execute(aql)
File "/home/uenv/q_li/Desktop/catalog-web-app/server/env/lib/python3.9/site-packages/arango/aql.py", line 453, in execute
return self._execute(request, response_handler)
File "/home/uenv/q_li/Desktop/catalog-web-app/server/env/lib/python3.9/site-packages/arango/api.py", line 74, in _execute
return self._executor.execute(request, response_handler)
File "/home/uenv/q_li/Desktop/catalog-web-app/server/env/lib/python3.9/site-packages/arango/executor.py", line 66, in execute
return response_handler(resp)
File "/home/uenv/q_li/Desktop/catalog-web-app/server/env/lib/python3.9/site-packages/arango/aql.py", line 450, in response_handler
raise AQLQueryExecuteError(resp, request)
arango.exceptions.AQLQueryExecuteError: [HTTP 404][ERR 1203] AQL: collection or view not found: metricsnode (while parsing)
2024-10-18 04:04:07,535 INFO -20241018-7e4a1f16-5d6f-4908-99bf-8f3d53766ba4 error getApiList
2024-10-18 04:04:07,538 INFO - 127.0.0.1 - - [18/Oct/2024 04:04:07] "GET /api/api HTTP/1.1" 500 -
2024-10-18 04:04:07,754 INFO -20241018-0b73e809-cbb2-4782-9838-aa39095228ba --- snowflake connect complete
2024-10-18 04:04:07,896 INFO -20241018-0b73e809-cbb2-4782-9838-aa39095228ba --- ArangoDB connect start
2024-10-18 04:04:07,897 INFO -20241018-0b73e809-cbb2-4782-9838-aa39095228ba --- ArangoDB connect complete
/home/uenv/q_li/Desktop/catalog-web-app/server/env/lib/python3.9/site-packages/urllib3/connectionpool.py:1045: InsecureRequestWarning: Unverified HTTPS request is being made to host 'localhost'. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/1.26.x/advanced-usage.html#ssl-warnings
warnings.warn(
2024-10-18 04:04:07,918 INFO -20241018-0b73e809-cbb2-4782-9838-aa39095228ba --- ArangoDB close complete
2024-10-18 04:04:08,011 INFO - 127.0.0.1 - - [18/Oct/2024 04:04:08] "GET /api/metrics HTTP/1.1" 200 -
2024-10-18 04:05:24,436 ERROR -20241018-dc114898-ea49-4149-9db4-77de883dde81 'keyOrder'
Traceback (most recent call last):
File "/home/uenv/q_li/Desktop/catalog-web-app/server/ap/getApiList.py", line 146, in _getUndeletableItems
for keyOrder in tmpInput['keyOrder']:
KeyError: 'keyOrder'
Traceback (most recent call last):
File "/home/uenv/q_li/Desktop/catalog-web-app/server/ap/getApiList.py", line 146, in _getUndeletableItems
for keyOrder in tmpInput['keyOrder']:
KeyError: 'keyOrder'
2024-10-18 04:05:24,437 INFO -20241018-dc114898-ea49-4149-9db4-77de883dde81 error getApiList
2024-10-18 04:05:24,438 INFO - 127.0.0.1 - - [18/Oct/2024 04:05:24] "GET /api/api HTTP/1.1" 500 -
[2024-10-18 04:05:24,443] INFO in app: getApiList
2024-10-18 04:05:24,443 INFO -20241018-c44e3dae-ff17-4962-a123-e6dff8b357eb getApiList
2024-10-18 04:05:24,443 INFO -20241018-c44e3dae-ff17-4962-a123-e6dff8b357eb _getAllApiMeta start
2024-10-18 04:05:24,443 INFO -20241018-c44e3dae-ff17-4962-a123-e6dff8b357eb --- snowflake connect start
2024-10-18 04:05:24,606 INFO -20241018-c44e3dae-ff17-4962-a123-e6dff8b357eb --- snowflake connect complete
2024-10-18 04:05:24,606 INFO -20241018-c44e3dae-ff17-4962-a123-e6dff8b357eb --- ArangoDB connect start
2024-10-18 04:05:24,606 INFO -20241018-c44e3dae-ff17-4962-a123-e6dff8b357eb --- ArangoDB connect complete
/home/uenv/q_li/Desktop/catalog-web-app/server/env/lib/python3.9/site-packages/urllib3/connectionpool.py:1045: InsecureRequestWarning: Unverified HTTPS request is being made to host 'localhost'. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/1.26.x/advanced-usage.html#ssl-warnings
warnings.warn(
2024-10-18 04:05:24,765 INFO -20241018-c44e3dae-ff17-4962-a123-e6dff8b357eb --- ArangoDB close complete
2024-10-18 04:05:24,765 INFO -20241018-c44e3dae-ff17-4962-a123-e6dff8b357eb _getAllApiMeta end
2024-10-18 04:05:24,766 INFO -20241018-c44e3dae-ff17-4962-a123-e6dff8b357eb _getAllInMapping start
2024-10-18 04:05:24,766 INFO -20241018-c44e3dae-ff17-4962-a123-e6dff8b357eb --- snowflake connect start
2024-10-18 04:05:24,921 INFO -20241018-c44e3dae-ff17-4962-a123-e6dff8b357eb --- snowflake connect complete
2024-10-18 04:05:25,069 INFO -20241018-c44e3dae-ff17-4962-a123-e6dff8b357eb _getAllInMapping end
2024-10-18 04:05:25,069 INFO -20241018-c44e3dae-ff17-4962-a123-e6dff8b357eb _getUndeletableItems start
2024-10-18 04:05:25,069 INFO -20241018-c44e3dae-ff17-4962-a123-e6dff8b357eb error _getUndeletableItems
2024-10-18 04:05:25,069 ERROR -20241018-c44e3dae-ff17-4962-a123-e6dff8b357eb 'keyOrder'
Traceback (most recent call last):
File "/home/uenv/q_li/Desktop/catalog-web-app/server/ap/getApiList.py", line 146, in _getUndeletableItems
for keyOrder in tmpInput['keyOrder']:
KeyError: 'keyOrder'
Traceback (most recent call last):
File "/home/uenv/q_li/Desktop/catalog-web-app/server/ap/getApiList.py", line 146, in _getUndeletableItems
for keyOrder in tmpInput['keyOrder']:
KeyError: 'keyOrder'
2024-10-18 04:05:25,070 INFO -20241018-c44e3dae-ff17-4962-a123-e6dff8b357eb error getApiList
2024-10-18 04:05:25,071 INFO - 127.0.0.1 - - [18/Oct/2024 04:05:25] "GET /api/api HTTP/1.1" 500 -
[2024-10-18 04:13:17,722] INFO in app: getApiList
2024-10-18 04:13:17,722 INFO -20241018-3c781708-598f-4f0f-b024-62aca3420909 getApiList
2024-10-18 04:13:17,722 INFO -20241018-3c781708-598f-4f0f-b024-62aca3420909 _getAllApiMeta start
2024-10-18 04:13:17,722 INFO -20241018-3c781708-598f-4f0f-b024-62aca3420909 --- snowflake connect start
2024-10-18 04:13:17,942 INFO -20241018-3c781708-598f-4f0f-b024-62aca3420909 --- snowflake connect complete
2024-10-18 04:13:17,943 INFO -20241018-3c781708-598f-4f0f-b024-62aca3420909 --- ArangoDB connect start
2024-10-18 04:13:17,943 INFO -20241018-3c781708-598f-4f0f-b024-62aca3420909 --- ArangoDB connect complete
/home/uenv/q_li/Desktop/catalog-web-app/server/env/lib/python3.9/site-packages/urllib3/connectionpool.py:1045: InsecureRequestWarning: Unverified HTTPS request is being made to host 'localhost'. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/1.26.x/advanced-usage.html#ssl-warnings
warnings.warn(
2024-10-18 04:13:18,109 INFO -20241018-3c781708-598f-4f0f-b024-62aca3420909 --- ArangoDB close complete
2024-10-18 04:13:18,110 INFO -20241018-3c781708-598f-4f0f-b024-62aca3420909 _getAllApiMeta end
2024-10-18 04:13:18,110 INFO -20241018-3c781708-598f-4f0f-b024-62aca3420909 _getAllInMapping start
2024-10-18 04:13:18,110 INFO -20241018-3c781708-598f-4f0f-b024-62aca3420909 --- snowflake connect start
2024-10-18 04:13:18,304 INFO -20241018-3c781708-598f-4f0f-b024-62aca3420909 --- snowflake connect complete
2024-10-18 04:13:18,450 INFO -20241018-3c781708-598f-4f0f-b024-62aca3420909 _getAllInMapping end
2024-10-18 04:13:18,450 INFO -20241018-3c781708-598f-4f0f-b024-62aca3420909 _getUndeletableItems start
2024-10-18 04:13:18,450 INFO -20241018-3c781708-598f-4f0f-b024-62aca3420909 error _getUndeletableItems
2024-10-18 04:13:18,450 ERROR -20241018-3c781708-598f-4f0f-b024-62aca3420909 'keyOrder'
Traceback (most recent call last):
File "/home/uenv/q_li/Desktop/catalog-web-app/server/ap/getApiList.py", line 146, in _getUndeletableItems
for keyOrder in tmpInput['keyOrder']:
KeyError: 'keyOrder'
Traceback (most recent call last):
File "/home/uenv/q_li/Desktop/catalog-web-app/server/ap/getApiList.py", line 146, in _getUndeletableItems
for keyOrder in tmpInput['keyOrder']:
KeyError: 'keyOrder'
2024-10-18 04:13:18,450 INFO -20241018-3c781708-598f-4f0f-b024-62aca3420909 error getApiList
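The 404 above ([ERR 1203] collection or view not found: metricsnode) simply means the AQL referenced a collection name that does not exist in the connected database. A quick way to check the real name from the same connection is shown below (a sketch using the ArangoDBAccessor above; collections() and has_collection() are standard python-arango calls).

adb = ArangoDBAccessor()
try:
    # list the non-system collections in the connected database
    names = [c["name"] for c in adb.db.collections() if not c["name"].startswith("_")]
    print(names)                                 # expected to contain 'metrics_node'
    print(adb.db.has_collection("metricsnode"))  # False here, hence the 404
finally:
    adb.close()

The corrected module below queries the metrics_node collection instead and also guards the IN_MAPPING parsing.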
import json
import logging
import os
import traceback
from distutils.util import strtobool
from tools.arangoDBAccessor import ArangoDBAccessor
from tools.snowflakeAccessor import SnowflakeAccessor
from werkzeug.exceptions import InternalServerError
ENABLE_DPL_API = bool(strtobool(os.getenv("ENABLE_DPL_API", 'true')))
# logger初期化
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# APIメタ情報全件取得
def _getAllApiMeta():
logger.info('_getAllApiMeta start')
sf = SnowflakeAccessor()
adb = ArangoDBAccessor() # ArangoDBAccessorをインスタンス化
out = []
# ArangoDBからカタログメタデータを取得
aql_query = 'FOR doc IN metrics_node RETURN {id: doc._key, api_name: doc.physical_name}'
try:
catalog_meta_data = []
arango_result = adb.execute(aql_query)
for doc in arango_result:
catalog_meta_data.append({
"id": doc["id"], # ArangoDBのドキュメントID
"api_name": doc["api_name"] # API物理名
})
except Exception as e:
logger.error(f"ArangoDB error: {e}", exc_info=True)
raise InternalServerError
# APIメタデータ取得クエリ (Snowflakeから)
query = """
SELECT
A.API_PHYSICAL_NAME,
A.API_LOGICAL_NAME,
A.PROVIDE,
A.API_LINKAGE_ITEMS,
A.TAGS,
A.PRESET
FROM
API_META_INFO A
ORDER BY
A.API_PHYSICAL_NAME
"""
try:
result = sf.execute(query)
for (physical_name,
logical_name,
provide,
items,
tags,
preset) in result:
# ArangoDBデータから関連メトリクスIDを取得
related_metrics = [
meta["id"] for meta in catalog_meta_data if meta["api_name"] == physical_name
]
outinfo = []
column = []
try:
for item in json.loads(items):
outinfo.append(
{
"physicalName": item["PHYSICAL_NAME"],
"logicalName": item["LOGICAL_NAME"],
"type": item["TYPE"],
"required": item["REQUIRED"],
}
)
except TypeError:
outinfo.append(None)
column.append({"column": outinfo})
if tags == "[]" or not tags:
tags = '["その他"]'
out.append(
{
"related_metrics": related_metrics, # ArangoDBからの関連メトリクスID
"physical_name": physical_name,
"logical_name": logical_name,
"provide": provide,
"outinfo": column,
"tags": json.loads(tags),
"preset": preset
}
)
except Exception as e:
logger.error(f"Snowflake error: {e}", exc_info=True)
logger.info('error _getAllApiMeta')
traceback.print_exc()
raise InternalServerError
finally:
sf.close()
adb.close() # ArangoDB接続をクローズ
logger.info('_getAllApiMeta end')
return out
# IN_MAPPING情報取得
def _getAllInMapping():
logger.info('_getAllInMapping start')
obj = SnowflakeAccessor()
res = []
try:
r = obj.execute(
"""
SELECT
IN_MAPPING
FROM
CATALOG_META_INFO
WHERE
IN_MAPPING <> to_variant({});
"""
)
for in_mapping in r:
res.append(json.loads(in_mapping[0]))
except Exception as e:
logger.error(f"{e}", exc_info=True)
logger.info('error _getAllInMapping')
traceback.print_exc()
raise InternalServerError
finally:
obj.close()
logger.info('_getAllInMapping end')
return res
# IN_MAPPINGから「必須の集計キー」と「計算処理で使用する項目」を抽出
def _getUndeletableItems(mappingInfo: list):
logger.info('_getUndeletableItems start')
tmpData = {}
try:
# マッピング情報ループ
for tmpMapping in mappingInfo:
# 各メトリクスのINPUT(API,メトリクス)分ループ
for tmpInput in tmpMapping:
tmpDataVal = []
if tmpInput['api'] is not None:
# 集計キー
for keyOrder in tmpInput['keyOrder']:
# 集計キーかつ必須項目
if keyOrder['required']:
# 既に作成されているAPIデータの場合はそちらに追加
if tmpInput['api'] in tmpData and keyOrder['physicalName'] not in tmpData[tmpInput['api']]:
tmpDataVal = tmpData[tmpInput['api']]
tmpDataVal.append(keyOrder['physicalName'])
elif keyOrder['physicalName'] not in tmpDataVal:
tmpDataVal.append(keyOrder['physicalName'])
# 計算処理で使用する項目
for value in tmpInput['values']:
# 既に作成されているAPIデータの場合はそちらに追加
if tmpInput['api'] in tmpData and value['inPhysicalName'] not in tmpData[tmpInput['api']]:
tmpDataVal = tmpData[tmpInput['api']]
tmpDataVal.append(value['inPhysicalName'])
elif value['inPhysicalName'] not in tmpDataVal:
tmpDataVal.append(value['inPhysicalName'])
if tmpDataVal:
tmpData[tmpInput['api']] = tmpDataVal
except Exception as e:
logger.info('error _getUndeletableItems')
logger.error(f"{e}", exc_info=True)
traceback.print_exc()
raise InternalServerError
logger.info('_getUndeletableItems end')
return tmpData
# 各API情報に削除不可カラム情報を追加
def _addUndeletableItemsInfo(out: list, undeletableItems: dict):
logger.info('_addUndeletableItemsInfo start')
try:
# 削除不可カラム情報ループ
for idx, apiInfo in enumerate(out):
if apiInfo['physical_name'] in undeletableItems:
out[idx]['undeletableCol'] = undeletableItems[apiInfo['physical_name']]
else:
out[idx]['undeletableCol'] = []
except Exception as e:
logger.info('error _addUndeletableItemsInfo')
logger.error(f"{e}", exc_info=True)
traceback.print_exc()
raise InternalServerError
logger.info('_addUndeletableItemsInfo end')
return out
# メイン処理
def getApiList():
try:
# 表示APIリスト取得
out = _getAllApiMeta()
# メトリクス処理マッピング情報取得
mappingInfo = _getAllInMapping()
# メトリクス処理マッピング情報から、「必須の集計キー」と「計算処理で使用する項目」を抽出
undeletableItems = _getUndeletableItems(mappingInfo)
# 各API情報に削除不可カラム情報を追加
out = _addUndeletableItemsInfo(out, undeletableItems)
except Exception:
logger.info('error getApiList')
raise InternalServerError
return {
"api": out,
"enableDplApi": ENABLE_DPL_API
}
# IN_MAPPINGから「必須の集計キー」と「計算処理で使用する項目」を抽出
def _getUndeletableItems(mappingInfo: list):
logger.info('_getUndeletableItems start')
tmpData = {}
try:
# マッピング情報ループ
for tmpMapping in mappingInfo:
# 各メトリクスのINPUT(API,メトリクス)分ループ
for tmpInput in tmpMapping:
tmpDataVal = []
if tmpInput['api'] is not None:
# 'keyOrder'キーが存在するか確認
if 'keyOrder' in tmpInput:
# 集計キー
for keyOrder in tmpInput['keyOrder']:
# 集計キーかつ必須項目
if keyOrder['required']:
# 既に作成されているAPIデータの場合はそちらに追加
if tmpInput['api'] in tmpData and keyOrder['physicalName'] not in tmpData[tmpInput['api']]:
tmpDataVal = tmpData[tmpInput['api']]
tmpDataVal.append(keyOrder['physicalName'])
elif keyOrder['physicalName'] not in tmpDataVal:
tmpDataVal.append(keyOrder['physicalName'])
# 'values'キーが存在するか確認
if 'values' in tmpInput:
# 計算処理で使用する項目
for value in tmpInput['values']:
# 既に作成されているAPIデータの場合はそちらに追加
if tmpInput['api'] in tmpData and value['inPhysicalName'] not in tmpData[tmpInput['api']]:
tmpDataVal = tmpData[tmpInput['api']]
tmpDataVal.append(value['inPhysicalName'])
elif value['inPhysicalName'] not in tmpDataVal:
tmpDataVal.append(value['inPhysicalName'])
if tmpDataVal:
tmpData[tmpInput['api']] = tmpDataVal
except Exception as e:
logger.info('error _getUndeletableItems')
logger.error(f"{e}", exc_info=True)
traceback.print_exc()
raise InternalServerError
logger.info('_getUndeletableItems end')
return tmpData
2024-10-18 04:15:34,313 INFO -20241018-2f207d4d-6a90-487c-9834-ab1fca7f2c7d --- ArangoDB connect complete
/home/uenv/q_li/Desktop/catalog-web-app/server/env/lib/python3.9/site-packages/urllib3/connectionpool.py:1045: InsecureRequestWarning: Unverified HTTPS request is being made to host 'localhost'. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/1.26.x/advanced-usage.html#ssl-warnings
warnings.warn(
2024-10-18 04:15:34,788 INFO -20241018-2f207d4d-6a90-487c-9834-ab1fca7f2c7d --- ArangoDB close complete
2024-10-18 04:15:34,789 INFO -20241018-2f207d4d-6a90-487c-9834-ab1fca7f2c7d _getAllApiMeta end
2024-10-18 04:15:34,789 INFO -20241018-2f207d4d-6a90-487c-9834-ab1fca7f2c7d _getAllInMapping start
2024-10-18 04:15:34,789 INFO -20241018-2f207d4d-6a90-487c-9834-ab1fca7f2c7d --- snowflake connect start
2024-10-18 04:15:34,870 INFO -20241018-2f207d4d-6a90-487c-9834-ab1fca7f2c7d --- snowflake connect complete
2024-10-18 04:15:34,995 INFO -20241018-2f207d4d-6a90-487c-9834-ab1fca7f2c7d _getAllInMapping end
2024-10-18 04:15:34,995 INFO -20241018-2f207d4d-6a90-487c-9834-ab1fca7f2c7d _getUndeletableItems start
2024-10-18 04:15:34,995 INFO -20241018-2f207d4d-6a90-487c-9834-ab1fca7f2c7d error _getUndeletableItems
2024-10-18 04:15:34,995 ERROR -20241018-2f207d4d-6a90-487c-9834-ab1fca7f2c7d 'inPhysicalName'
Traceback (most recent call last):
File "/home/uenv/q_li/Desktop/catalog-web-app/server/ap/getApiList.py", line 167, in _getUndeletableItems
elif value['inPhysicalName'] not in tmpDataVal:
KeyError: 'inPhysicalName'
Traceback (most recent call last):
File "/home/uenv/q_li/Desktop/catalog-web-app/server/ap/getApiList.py", line 167, in _getUndeletableItems
elif value['inPhysicalName'] not in tmpDataVal:
KeyError: 'inPhysicalName'
2024-10-18 04:15:34,995 INFO -20241018-2f207d4d-6a90-487c-9834-ab1fca7f2c7d error getApiList
# IN_MAPPINGから「必須の集計キー」と「計算処理で使用する項目」を抽出
def _getUndeletableItems(mappingInfo: list):
logger.info('_getUndeletableItems start')
tmpData = {}
try:
# マッピング情報ループ
for tmpMapping in mappingInfo:
# 各メトリクスのINPUT(API,メトリクス)分ループ
for tmpInput in tmpMapping:
tmpDataVal = []
if tmpInput['api'] is not None:
# 'keyOrder'キーが存在するか確認
if 'keyOrder' in tmpInput:
# 集計キー
for keyOrder in tmpInput['keyOrder']:
# 集計キーかつ必須項目
if keyOrder['required']:
# 既に作成されているAPIデータの場合はそちらに追加
if tmpInput['api'] in tmpData and keyOrder['physicalName'] not in tmpData[tmpInput['api']]:
tmpDataVal = tmpData[tmpInput['api']]
tmpDataVal.append(keyOrder['physicalName'])
elif keyOrder['physicalName'] not in tmpDataVal:
tmpDataVal.append(keyOrder['physicalName'])
# 'values'キーが存在するか確認
if 'values' in tmpInput:
# 計算処理で使用する項目
for value in tmpInput['values']:
# 'inPhysicalName'キーが存在するか確認
if 'inPhysicalName' in value:
# 既に作成されているAPIデータの場合はそちらに追加
if tmpInput['api'] in tmpData and value['inPhysicalName'] not in tmpData[tmpInput['api']]:
tmpDataVal = tmpData[tmpInput['api']]
tmpDataVal.append(value['inPhysicalName'])
elif value['inPhysicalName'] not in tmpDataVal:
tmpDataVal.append(value['inPhysicalName'])
if tmpDataVal:
tmpData[tmpInput['api']] = tmpDataVal
except Exception as e:
logger.info('error _getUndeletableItems')
logger.error(f"{e}", exc_info=True)
traceback.print_exc()
raise InternalServerError
logger.info('_getUndeletableItems end')
return tmpData
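For reference, the KeyErrors come from IN_MAPPING entries that do not carry every key. A hypothetical entry illustrating the optional parts that the guards now tolerate (the field values are invented):

# Hypothetical IN_MAPPING content: 'keyOrder', 'values' and 'inPhysicalName'
# can each be absent, which is exactly what the added checks handle.
example_in_mapping = [
    {
        "api": "t1_priprt_shpt",
        "table": "t1_priprt_shpt",
        "keyOrder": [
            {"physicalName": "SUPPLIER_NAME", "logicalName": "サプライヤ名", "required": True},
        ],
        "values": [
            {"inPhysicalName": "SHIPMENT_ATD", "inLogicalName": "納入日"},
            {"constant": 0},  # an entry without 'inPhysicalName' caused the last KeyError
        ],
    },
    {"api": None},  # metrics inputs have no API and are skipped
]

print(_getUndeletableItems([example_in_mapping]))
# -> {'t1_priprt_shpt': ['SUPPLIER_NAME', 'SHIPMENT_ATD']}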
# This program is an implementation sample
import json
import sys
from datetime import datetime
import regex
from moz_sql_parser import parse
from snowflakeAccessor import SnowflakeAccessor
# Snowflake type list
typeList = {
"NUMBER": "数値",
"DECIMAL": "数値",
"NUMERIC": "数値",
"INT": "数値",
"INTEGER": "数値",
"FLOAT": "数値",
"DOUBLE": "数値",
"REAL": "数値",
"FIXED": "数値",
"VARCHAR": "文字列",
"STRING": "文字列",
"TEXT": "文字列",
"BOOLEAN": "論理値",
"DATE": "日付",
"DATETIME": "日付",
"TIMESTAMP": "日付",
}
# 型論理名→Snowflakeデータ型置換用
logTypeList = {
"数値": "NUMBER",
"文字列": "VARCHAR",
"論理値": "BOOLEAN",
"日付": "DATE",
}
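Note that the two maps are not symmetric: several Snowflake types collapse onto a single logical name, so mapping a type to its logical name and back normalizes it to one canonical physical type. A small illustration:

# Many physical types share one logical name ...
assert typeList["DECIMAL"] == "数値" and typeList["FLOAT"] == "数値"
# ... so a round trip lands on the canonical type for that logical name.
assert logTypeList[typeList["DECIMAL"]] == "NUMBER"
assert logTypeList[typeList["TIMESTAMP"]] == "DATE"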
def main(metrics: str, env: str, project: str, deployCheck: str):
try:
print("---------- start!")
# input database name
inputDb = f"{project}_{env}".upper()
# metrics path
if "startpack" == project:
targetPath = f"{metrics}/{metrics.lower()}/tasks/{metrics}.py"
verPath = f"{metrics}/{metrics.lower()}/__init__.py"
# Static analysis measures
dbtable = metrics.upper()
else:
# excProject = path.split('__')[1]
# targetPath = f"{path}/{excProject.lower()}/tasks/{excProject}.py"
# verPath = f"{path}/{excProject.lower()}/__init__.py"
targetPath = f"{metrics}/{metrics.lower()}/tasks/{metrics}.py"
verPath = f"{metrics}/{metrics.lower()}/__init__.py"
# Static analysis measures
# dbtable = excProject.upper()
dbtable = metrics.replace(f'{project}_', '').upper()
with open(targetPath) as f:
t = f.read()
t = t.replace('\n', ' ').replace(' ', "")
print("---------- output")
dbtblList = []
pat = r"(writeData).*?(?<rec>\((?:[^\(\)]+|(?&rec))*\))"
mi = regex.finditer(pat, t)
for m in mi:
# print(m.group())
mm = m.group()
sfSchema = (regex.search(r"sfSchema=\"(.*?)\"", mm).group(1))
sfDatabase = (regex.search(r"sfDatabase=\"(.*?)\"", mm).group(1))
tmp = (regex.search(r"dbtable=\"(.*?)\"", mm).group(1))
if (tmp != dbtable):
dbtable = tmp
dbtblList.append(dbtable)
if (len(dbtblList) != 1):
dbtable = dbtblList[0]
print(f"Table: {dbtable}")
# デプロイ必要有無判定確認フラグがTrueの場合はOutのデプロイ状況をメタから取得し、デプロイ済またはメタが存在しない場合はデプロイ必要有の判定で返却
if deployCheck == "true":
outProvides = _getOutProvideInfo(database=sfDatabase,
schema=sfSchema,
table=dbtable)
deployFlag = all(value for value in outProvides) or not outProvides
# Jenkins shellで提供情報取得用
print(f"---------- deploy_flag:{deployFlag}")
return
# Metrics Existence Check
# If the target exists, skip subsequent processing
existVer = _metricsExistCheck(database=sfDatabase,
schema=sfSchema, table=dbtable)
print(sfDatabase, sfSchema, dbtable)
allApi = _getAllApi(database=sfDatabase, schema=sfSchema)
inMapping = _inMappingExistCheck(database=sfDatabase,
schema=sfSchema, table=dbtable)
outputInfo = _getOutputInfo(database=sfDatabase,
schema=sfSchema, table=dbtable)
# get target version
with open(verPath) as f:
v = f.read()
v = v.replace('\n', ' ').replace(' ', "")
pat = '"(.*)"'
m = regex.search(pat, v)
tarVer = m.group(1)
# 対象メトリクスのIN,OUT提供情報取得
providsInfo = _getInOutProvideInfo(
dbtable, sfDatabase, sfSchema, project, allApi)
# check
updPat = 0
if (existVer):
if (existVer == tarVer):
print("---------- metrics data already exists")
return
else:
ev = existVer.split(".")
tv = tarVer.split(".")
if (int(ev[0]) < int(tv[0])):
print("---------- update in/out")
updPat = 1
elif (int(ev[1]) < int(tv[1])):
print("---------- update in")
updPat = 2
else:
print("---------- metrics data already exists")
return
print(f"--- output:{dbtable}")
if (len(dbtblList) == 1):
# 出力テーブルが単一の場合
if dbtable in providsInfo["outProvides"].keys():
showColFlg = providsInfo["outProvides"][dbtable]
else:
# 既存Out情報に含まれない場合(新規作成)
showColFlg = True
res = _getColType(table=dbtable,
database=sfDatabase,
schema=sfSchema,
showColFlg=showColFlg,
refType="metrics")
sqlOut = _makeOutputList(database=sfDatabase,
schema=sfSchema,
table=dbtable,
outList=res,
inMapping=inMapping,
outputInfo=outputInfo)
else:
# 出力テーブルが複数の場合
sqlOut = []
for tbl in dbtblList:
if tbl in providsInfo["outProvides"].keys():
showColFlg = providsInfo["outProvides"][tbl]
else:
# 既存Out情報に含まれない場合(新規作成)
showColFlg = True
res = _getColType(table=tbl,
database=sfDatabase,
schema=sfSchema,
showColFlg=showColFlg,
refType="metrics")
tmpOut = _makeOutputList(database=sfDatabase,
schema=sfSchema,
table=tbl,
outList=res,
inMapping=inMapping,
outputInfo=outputInfo)
sqlOut = sqlOut+tmpOut
print("---------- input")
pat = r"(readData).*?(?<rec>\((?:[^\(\)]+|(?&rec))*\))"
mi = regex.finditer(pat, t)
inputList = []
refInfo = {}
# Get columns of tables up to the first level and joined tables
for m in mi:
mm = m.group()
inSfSchema = (regex.search(r"sfSchema=\"(.*?)\"", mm).group(1))
if inSfDatabase := regex.search(r"sfDatabase=\"(.*?)\"", mm):
inSfDatabase = inSfDatabase.group(1)
query = regex.search(r"query=f?\"\"\"(.*?)\"\"\"", mm).group(1)
query = query.replace(
" ON", " ON ").replace(
"AS ", " AS ").replace(
"ORDER BY", " ORDER BY").replace(
"GROUP BY", " GROUP BY")
tmp = json.loads(json.dumps(parse(query)))
resAnalysis = _sqlAnalysis(tmp)
for r in resAnalysis:
inputList.append(r)
for input in inputList:
keys = list(input.keys())
for key in keys:
if key not in refInfo.keys():
refInfo[key] = {"database": inSfDatabase,
"schema": inSfSchema}
# Get column type
apiList = []
for input in inputList:
keys = list(input.keys())
for key in keys:
inSfDatabase = refInfo[key]["database"]
inSfSchema = refInfo[key]["schema"]
if key in providsInfo["inProvides"].keys():
showColFlg = providsInfo["inProvides"][key]
else:
# 既存In情報に含まれない場合(Inテーブルを変更した場合)
showColFlg = True
if (inSfDatabase):
if key.replace(f"{project}_".upper(), '').lower() in allApi: # noqa
apiList.append(key.replace(f"{project}_".upper(), '').lower()) # noqa
refType = "api"
else:
apiList.append("")
refType = "metrics"
if showColFlg:
resTypes = _getColType(table=key,
database=inSfDatabase,
schema=inSfSchema,
showColFlg=showColFlg,
refType=refType)
else:
# 未デプロイのINについては画面から項目が変わることはないため、既存データを取得し設定する
resTypes = _getColType(table=key,
database=sfDatabase,
schema=sfSchema,
showColFlg=showColFlg,
refType=refType)
else:
if key.replace(f"{project}_".upper(), '').lower() in allApi: # noqa
refType = "api"
else:
refType = "metrics"
if showColFlg:
resTypes = _getColType(table=key,
database=inputDb,
showColFlg=showColFlg,
refType=refType)
else:
# 未デプロイのINについては画面から項目が変わることはないため、既存データを取得し設定する
resTypes = _getColType(table=key,
database=sfDatabase,
schema=sfSchema,
showColFlg=showColFlg,
refType=refType)
apiList.append("")
for add in input.get(key):
add["type"] = resTypes.get(add["name"])
if (inSfDatabase):
add["db"] = inSfDatabase
add["schema"] = inSfSchema
else:
add["db"] = inputDb
add["schema"] = "PUBLIC"
print(f"Api: {apiList}")
apiMetaInfo = _getApiMetaInfo(database=sfDatabase,
schema=sfSchema, table=apiList)
sqlIn = _makeInputList(inputList=inputList, apiMetaInfo=apiMetaInfo,
inMapping=inMapping, project=project)
for i, value in enumerate(sqlIn):
if value["table"].replace(f"{project}_".upper(), '').lower() in allApi: # noqa
sqlIn[i]["table"] = value["table"].replace(f"{project}_".upper(), '') # noqa
else:
# メトリクスのINはアッパーに変換
sqlIn[i]["table"] = value["table"].replace(f"{project}_".upper(), '').upper() # noqa
if (1 == updPat):
_updateMetaDataInOut(sfDatabase, sfSchema, sqlIn,
sqlOut, apiList, dbtable, tarVer)
_updateProvideData(sfDatabase, sfSchema, sqlIn, sqlOut, dbtable)
elif (2 == updPat):
_updateMetaDataInput(sfDatabase, sfSchema, sqlIn,
apiList, dbtable, tarVer)
_updateProvideData(sfDatabase, sfSchema, sqlIn, sqlOut, dbtable)
else:
_insertMetaData(sfDatabase, sfSchema, sqlIn,
sqlOut, apiList, dbtable, tarVer)
_insertProvideData(sfDatabase, sfSchema, sqlIn, sqlOut, dbtable)
print("---------- finish!")
except Exception as e:
print(f"processing analysys error:{e}")
raise e
# Check if metrics information exists
def _metricsExistCheck(database: str, schema: str, table: str):
try:
obj = SnowflakeAccessor(database=database, schema=schema)
query = f'''
SELECT
VERSION
FROM
CATALOG_META_INFO
WHERE
PHYSICAL_NAME = '{table}'
LIMIT 1;
'''
res = obj.execute(query)
data = None
for i in res:
data = i[0]
return data
except Exception as e:
print(f"processing analysys error:{e}")
raise e
finally:
obj.close()
# Check if inMapping exists
def _inMappingExistCheck(database: str, schema: str, table: str):
try:
obj = SnowflakeAccessor(database=database, schema=schema)
query = f'''
SELECT
IN_MAPPING
FROM
CATALOG_META_INFO
WHERE
PHYSICAL_NAME = '{table}'
LIMIT 1;
'''
res = obj.execute(query)
data = None
for i in res:
data = i[0]
return data
except Exception as e:
print(f"processing analysys error:{e}")
raise e
finally:
obj.close()
# Get inputInfo
def _getApiName(database: str, schema: str, table: str):
try:
obj = SnowflakeAccessor(database=database, schema=schema)
query = f'''
SELECT
API_NAME
FROM
CATALOG_META_INFO
WHERE
PHYSICAL_NAME = '{table}'
LIMIT 1;
'''
res = obj.execute(query)
data = None
for i in res:
data = i[0]
return data
except Exception as e:
print(f"processing analysys error:{e}")
raise e
finally:
obj.close()
# Get all inputInfo
def _getAllApi(database: str, schema: str):
try:
obj = SnowflakeAccessor(database=database, schema=schema)
query = '''
SELECT
API_PHYSICAL_NAME
FROM
API_META_INFO;
'''
res = obj.execute(query)
data = []
for i in res:
data.append(i[0].lower())
return data
except Exception as e:
print(f"processing analysys error:{e}")
raise e
finally:
obj.close()
# Get inputInfo
def _getApiMetaInfo(database: str, schema: str, table: str):
try:
result = []
obj = SnowflakeAccessor(database=database, schema=schema)
for t in table:
query = f'''
SELECT
API_LINKAGE_ITEMS
FROM
API_META_INFO
WHERE
API_PHYSICAL_NAME = '{t}'
LIMIT 1;
'''
res = obj.execute(query)
data = {}
for i in res:
data["table"] = t
data["value"] = i[0]
result.append(data)
return result
except Exception as e:
print(f"processing analysys error:{e}")
raise e
finally:
obj.close()
# Get outputInfo
def _getOutputInfo(database: str, schema: str, table: str):
try:
obj = SnowflakeAccessor(database=database, schema=schema)
query = f'''
SELECT
OUT_INFO
FROM
CATALOG_META_INFO
WHERE
PHYSICAL_NAME = '{table}'
LIMIT 1;
'''
res = obj.execute(query)
data = None
for i in res:
data = i[0]
return data
except Exception as e:
print(f"processing analysys error:{e}")
raise e
finally:
obj.close()
# Analysis of SQL passed in JSON format
def _sqlAnalysis(query: json):
try:
colList = []
tableList = []
tableName = {}
# Get column and source table aliases
select = query["select"]
table = query["from"]
if "with" in query:
withQs = query["with"]
else:
withQs = []
if "value" in select:
if "distinct" in select["value"]:
# SELECTにdistinctが含まれる場合の処理
for colTmp in select["value"]["distinct"]:
items = {}
val = colTmp["value"]
if type(val) is str:
splt = val.split('.')
if val != splt[0]:
items["value"] = splt[1]
items["name"] = splt[0]
else:
items["value"] = val
colList.append(items)
elif "count" in select["value"]:
# SELECTにcountが含まれる場合の処理
for colTmp in select["value"]["count"]:
items = {}
# val = colTmp["value"]
if type(colTmp) is str:
splt = colTmp.split('.')
if colTmp != splt[0]:
items["value"] = splt[1]
items["name"] = splt[0]
else:
items["value"] = colTmp
colList.append(items)
else:
for col in select:
items = {}
val = col["value"]
if type(val) is str:
splt = val.split('.')
if val != splt[0]:
items["value"] = splt[1]
items["name"] = splt[0]
else:
items["value"] = val
colList.append(items)
# Get table name and alias
if type(table) is str:
tableName["table"] = table
tableList.append(tableName)
else:
if type(table[0]) is str:
tableName["table"] = table[0]
tableList.append(tableName)
else:
tableName["table"] = table[0]["value"]
tableName["name"] = table[0]["name"]
tableList.append(tableName)
# Get inner joined table name
for join in table[1:]:
if "inner join" in join:
if type(join["inner join"]) is str:
if "using" in join:
for withQ in withQs:
if withQ["name"] == join["inner join"]:
withTable = withQ["value"]["from"]
tableName = {}
tableName["table"] = withQ["value"]["from"]
if tableName not in tableList:
tableList.append(tableName)
else:
tableName = {}
tableName["table"] = join["inner join"]
tableList.append(tableName)
else:
if type(join["inner join"]["value"]) is str:
tableName = {}
tableName["table"] = join["inner join"]["value"]
tableName["name"] = join["inner join"]["name"]
tableList.append(tableName)
# get using info
if "using" in join:
usingCols = join["using"]
if type(usingCols) is str:
for withQ in withQs:
if withQ["value"]["from"] == withTable:
for withSelectCol in withQ["value"]["select"]: # noqa
if "name" in withSelectCol:
if withSelectCol["name"] == usingCols: # noqa
items = {}
if "max" in withSelectCol["value"]: # noqa
if "to_timestamp" in withSelectCol["value"]["max"]: # noqa
items["value"] = withSelectCol["value"]["max"]["to_timestamp"] # noqa
items["name"] = withTable # noqa
colList.append(items)
else:
items["value"] = withSelectCol["value"] # noqa
items["name"] = withTable
colList.append(items)
else:
if withSelectCol["value"] == usingCols: # noqa
items = {}
if "max" in withSelectCol["value"]: # noqa
if "to_timestamp" in withSelectCol["value"]["max"]: # noqa
items["value"] = withSelectCol["value"]["max"]["to_timestamp"] # noqa
items["name"] = withTable # noqa
colList.append(items)
else:
items["value"] = withSelectCol["value"] # noqa
items["name"] = withTable
colList.append(items)
else:
for usingCol in usingCols:
for withQ in withQs:
if withQ["value"]["from"] == withTable:
for withSelectCol in withQ["value"]["select"]: # noqa
if "name" in withSelectCol:
if withSelectCol["name"] == usingCol: # noqa
items = {}
if "max" in withSelectCol["value"]: # noqa
if "to_timestamp" in withSelectCol["value"]["max"]: # noqa
items["value"] = withSelectCol["value"]["max"]["to_timestamp"] # noqa
items["name"] = withTable # noqa
colList.append(
items)
else:
items["value"] = withSelectCol["value"] # noqa
items["name"] = withTable # noqa
colList.append(items)
else:
if withSelectCol["value"] == usingCol: # noqa
items = {}
if "max" in withSelectCol["value"]: # noqa
if "to_timestamp" in withSelectCol["value"]["max"]: # noqa
items["value"] = withSelectCol["value"]["max"]["to_timestamp"] # noqa
items["name"] = withTable # noqa
colList.append(
items)
else:
items["value"] = withSelectCol["value"] # noqa
items["name"] = withTable # noqa
colList.append(items)
else:
# get on info
if "and" in join["on"]:
ands = join["on"]["and"]
for a in ands:
# left
items = {}
left = a["eq"][0].split(".")
items["value"] = left[1]
items["name"] = left[0]
colList.append(items)
# right
items = {}
if "to_timestamp" in a["eq"][1]:
right = a["eq"][1]["to_timestamp"].split(
".")
items["value"] = right[1]
items["name"] = right[0]
colList.append(items)
else:
right = a["eq"][1].split(".")
items["value"] = right[1]
items["name"] = right[0]
colList.append(items)
else:
ands = join["on"]
# left
items = {}
left = ands["eq"][0].split(".")
items["value"] = left[1]
items["name"] = left[0]
colList.append(items)
# right
items = {}
if "to_timestamp" in ands["eq"][1]:
right = ands["eq"][1]["to_timestamp"].split(
".")
items["value"] = right[1]
items["name"] = right[0]
colList.append(items)
else:
right = ands["eq"][1].split(".")
items["value"] = right[1]
items["name"] = right[0]
colList.append(items)
elif "left join" in join:
if type(join["left join"]) is str:
if "using" in join:
for withQ in withQs:
if withQ["name"] == join["left join"]:
withTable = withQ["value"]["from"]
tableName = {}
tableName["table"] = withQ["value"]["from"]
if tableName not in tableList:
tableList.append(tableName)
else:
tableName = {}
tableName["table"] = join["left join"]
tableList.append(tableName)
else:
if type(join["left join"]["value"]) is str:
tableName = {}
tableName["table"] = join["left join"]["value"]
tableName["name"] = join["left join"]["name"]
tableList.append(tableName)
# get using info
if "using" in join:
usingCols = join["using"]
if type(usingCols) is str:
for withQ in withQs:
if withQ["value"]["from"] == withTable:
for withSelectCol in withQ["value"]["select"]: # noqa
if "name" in withSelectCol:
if withSelectCol["name"] == usingCols: # noqa
items = {}
if "max" in withSelectCol["value"]: # noqa
if "to_timestamp" in withSelectCol["value"]["max"]: # noqa
items["value"] = withSelectCol["value"]["max"]["to_timestamp"] # noqa
items["name"] = withTable # noqa
colList.append(items)
else:
items["value"] = withSelectCol["value"] # noqa
items["name"] = withTable
colList.append(items)
else:
if withSelectCol["value"] == usingCols: # noqa
items = {}
if "max" in withSelectCol["value"]: # noqa
if "to_timestamp" in withSelectCol["value"]["max"]: # noqa
items["value"] = withSelectCol["value"]["max"]["to_timestamp"] # noqa
items["name"] = withTable # noqa
colList.append(items)
else:
items["value"] = withSelectCol["value"] # noqa
items["name"] = withTable
colList.append(items)
else:
for usingCol in usingCols:
for withQ in withQs:
if withQ["value"]["from"] == withTable:
for withSelectCol in withQ["value"]["select"]: # noqa
if "name" in withSelectCol:
if withSelectCol["name"] == usingCol: # noqa
items = {}
if "max" in withSelectCol["value"]: # noqa
if "to_timestamp" in withSelectCol["value"]["max"]: # noqa
items["value"] = withSelectCol["value"]["max"]["to_timestamp"] # noqa
items["name"] = withTable # noqa
colList.append(
items)
else:
items["value"] = withSelectCol["value"] # noqa
items["name"] = withTable # noqa
colList.append(items)
else:
if withSelectCol["value"] == usingCol: # noqa
items = {}
if "max" in withSelectCol["value"]: # noqa
if "to_timestamp" in withSelectCol["value"]["max"]: # noqa
items["value"] = withSelectCol["value"]["max"]["to_timestamp"] # noqa
items["name"] = withTable # noqa
colList.append(
items)
else:
items["value"] = withSelectCol["value"] # noqa
items["name"] = withTable # noqa
colList.append(items)
else:
# get on info
if "and" in join["on"]:
ands = join["on"]["and"]
for a in ands:
# left
items = {}
left = a["eq"][0].split(".")
items["value"] = left[1]
items["name"] = left[0]
colList.append(items)
# right
items = {}
if "to_timestamp" in a["eq"][1]:
right = a["eq"][1]["to_timestamp"].split(
".")
items["value"] = right[1]
items["name"] = right[0]
colList.append(items)
else:
right = a["eq"][1].split(".")
items["value"] = right[1]
items["name"] = right[0]
colList.append(items)
else:
ands = join["on"]
# left
items = {}
left = ands["eq"][0].split(".")
items["value"] = left[1]
items["name"] = left[0]
colList.append(items)
# right
items = {}
if "to_timestamp" in ands["eq"][1]:
right = ands["eq"][1]["to_timestamp"].split(
".")
items["value"] = right[1]
items["name"] = right[0]
colList.append(items)
else:
right = ands["eq"][1].split(".")
items["value"] = right[1]
items["name"] = right[0]
colList.append(items)
# de-duplication
colList = [dict(s) for s in set(frozenset(d.items()) for d in colList)]
# Join tables and columns with aliases
resultL = []
for t in tableList:
resCol = None
reData = []
resultD = {}
if "name" in t:
resCol = [d.get('value') for d in colList if d.get('name') == t["name"]] # noqa
for r in resCol:
tmpCol = {}
tmpCol["name"] = r
reData.append(tmpCol)
resultD[t["table"]] = reData
resultL.append(resultD)
else:
for r in [d.get('value') for d in colList]:
tmpCol = {}
tmpCol["name"] = r
reData.append(tmpCol)
resultD[t["table"]] = reData
resultL.append(resultD)
return resultL
except Exception as e:
print(f"processing analysys error:{e}")
raise e
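To make the traversal above concrete, this is the kind of structure _sqlAnalysis consumes and what it returns for a simple two-table join. The parsed dict is hand-written in roughly the shape moz_sql_parser produces; identifier casing and the column order of the result may differ because of the set-based de-duplication.

# Hand-written input imitating moz_sql_parser output for:
#   SELECT A.PARTS_NO, B.REPLY_ETD FROM T1_PRIPRT_SHPT A
#   INNER JOIN T1_PRIPRT_FCSP_REPLY B ON A.PURCHASE_ORDER_NO = B.PURCHASE_ORDER_NO
parsed = {
    "select": [{"value": "A.PARTS_NO"}, {"value": "B.REPLY_ETD"}],
    "from": [
        {"value": "T1_PRIPRT_SHPT", "name": "A"},
        {
            "inner join": {"value": "T1_PRIPRT_FCSP_REPLY", "name": "B"},
            "on": {"eq": ["A.PURCHASE_ORDER_NO", "B.PURCHASE_ORDER_NO"]},
        },
    ],
}

print(_sqlAnalysis(parsed))
# -> [{'T1_PRIPRT_SHPT': [{'name': 'PARTS_NO'}, {'name': 'PURCHASE_ORDER_NO'}]},
#     {'T1_PRIPRT_FCSP_REPLY': [{'name': 'REPLY_ETD'}, {'name': 'PURCHASE_ORDER_NO'}]}]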
# 対象メトリクスのIN,OUT提供情報取得
def _getInOutProvideInfo(metrics: str,
database: str,
schema: str,
project: str,
allApi: list):
try:
retProvidsInfo = {"inProvides": {}, "outProvides": {}}
obj = SnowflakeAccessor(database=database, schema=schema)
query = f'''
SELECT
A.IN_PROVIDE,
A.OUT_PROVIDE
FROM
CATALOG_PROVIDE_INFO A
INNER JOIN
CATALOG_META_INFO B
ON
A.ID = B.ID
WHERE
B.PHYSICAL_NAME = '{metrics}';
'''
result = obj.execute(query)
for inProvides, outProvides in result:
for inProvide in json.loads(inProvides):
if "startpack" == project:
retProvidsInfo["inProvides"][inProvide["table"].upper()] = inProvide["provide"] # noqa
else:
if inProvide['table'].upper().replace(f"{project}_".upper(), '').lower() in allApi: # noqa
# データセットのIN情報には先頭に個社を付ける
retProvidsInfo["inProvides"][f"{project}_{inProvide['table']}".upper()] = inProvide["provide"] # noqa
else:
# メトリクスのIN情報には先頭に個社を付けない
retProvidsInfo["inProvides"][inProvide["table"].upper()] = inProvide["provide"] # noqa
for outProvide in json.loads(outProvides):
retProvidsInfo["outProvides"][outProvide["table"].upper()] = outProvide["provide"] # noqa
return retProvidsInfo
except Exception as e:
print(f"processing analysys error:{e}")
raise e
finally:
obj.close()
# 対象メトリクスのOUT提供情報取得
def _getOutProvideInfo(database: str,
schema: str,
table: str):
try:
retOutProvids = []
obj = SnowflakeAccessor(database=database, schema=schema)
query = f'''
SELECT
A.OUT_PROVIDE
FROM
CATALOG_PROVIDE_INFO A
INNER JOIN
CATALOG_META_INFO B
ON
A.ID = B.ID
WHERE
B.PHYSICAL_NAME = '{table}';
'''
result = obj.execute(query)
for tmp in result:
for outProvides in tmp:
for outProvide in json.loads(outProvides):
retOutProvids.append(outProvide["provide"])
return retOutProvids
except Exception as e:
print(f"processing analysys error:{e}")
raise e
finally:
obj.close()
# Returns the specified column type based on table information
def _getColType(table: str,
database: str,
showColFlg: bool,
refType: str,
schema=""):
try:
if (schema):
obj = SnowflakeAccessor(database=database, schema=schema)
else:
obj = SnowflakeAccessor(database=database)
view = {}
if showColFlg:
# デプロイ済、またはメタ情報にない、またはInテーブルを変更した場合のIN,OUTについては実テーブルまたはビューからカラム情報を取得する
result = obj.execute(f"show columns in table {table};")
for r in result:
pri = []
for i in r:
pri.append(i)
colName = pri[2]
colType = json.loads(pri[3])["type"]
view[colName] = colType
return view
else:
# 未デプロイのIN,OUTについては画面から項目が変わることはないため、既存データを取得する
if refType == "api":
query = f'''
SELECT
API_LINKAGE_ITEMS
FROM
API_META_INFO
WHERE
API_PHYSICAL_NAME = '{table}';
'''
result = obj.execute(query)
for r in result:
view[json.loads(r)["PHYSICAL_NAME"]] = json.loads(r)["TYPE"].upper() # noqa
return view
elif refType == "metrics":
query = f'''
SELECT
ITEM.value:column as COLUMNS
FROM
CATALOG_META_INFO,
LATERAL FLATTEN(input => CATALOG_META_INFO.OUT_INFO) AS ITEM
WHERE
ITEM.value:table = '{table}';
''' # noqa
result = obj.execute(query)
for r in result:
for tmp1 in r:
for tmp2 in json.loads(tmp1):
view[tmp2["physicalName"]] = logTypeList[tmp2["type"]] # noqa
return view
else:
raise Exception("Reference type is not specified.")
except Exception as e:
print(f"processing analysys error:{e}")
raise e
finally:
obj.close()
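# The helper above is assumed to return a column-name -> column-type mapping, e.g.
# (hypothetical columns): {"COL_A": "TEXT", "COL_B": "NUMBER"}.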
# Create an In item for the insert SQL
def _makeInputList(inputList: list, apiMetaInfo: list,
inMapping: str, project: str):
try:
In = []
if inMapping is not None:
inMappingStr = json.loads(inMapping)
else:
inMappingStr = []
for i in inputList:
tmp = {}
col = []
for j in i[list(i.keys())[0]]:
for k in apiMetaInfo:
if k["table"].upper() in list(i.keys())[0]:
apiMetaInfoStr = json.loads(k["value"])
for x in apiMetaInfoStr:
if j["name"] == x["PHYSICAL_NAME"]:
c = {}
c["logicalName"] = x["LOGICAL_NAME"]
c["physicalName"] = j["name"]
c["type"] = typeList.get(j["type"], '未定義')
col.append(c)
if inMappingStr:
for m in inMappingStr:
if m["table"].upper() in list(i.keys())[0]:
for n in m["keyOrder"]:
if j["name"] == n["physicalName"]:
c = {}
c["logicalName"] = n["logicalName"]
c["physicalName"] = j["name"]
c["type"] = typeList.get(j["type"], '未定義')
col.append(c)
for n in m["values"]:
if j["name"] == n["inPhysicalName"]:
c = {}
c["logicalName"] = n["inLogicalName"]
c["physicalName"] = j["name"]
c["type"] = typeList.get(j["type"], '未定義')
col.append(c)
tmp["db"] = i[list(i.keys())[0]][0]["db"]
tmp["schema"] = i[list(i.keys())[0]][0]["schema"]
tmp["table"] = list(i.keys())[0].replace(f"{project}_".upper(),
'').lower()
tmp["column"] = get_unique_list(col)
In.append(tmp)
return In
except Exception as e:
print(f"processing analysys error:{e}")
raise e
# Create an Out item for the insert SQL
def _makeOutputList(database: str, schema: str, table: str,
outList: list, inMapping: str, outputInfo: str):
try:
if outputInfo is not None:
outputInfoStr = json.loads(outputInfo)
else:
outputInfoStr = []
if inMapping is not None:
inMappingStr = json.loads(inMapping)
else:
inMappingStr = []
Out = []
if inMappingStr:
tmp = {}
col = []
for i in outputInfoStr:
for j in i["column"]:
for k in list(outList.keys()):
if k == j["physicalName"]:
c = {}
c["logicalName"] = j["logicalName"]
c["physicalName"] = k
c["type"] = typeList.get(outList[k], '未定義')
col.append(c)
for i in inMappingStr:
for j in i["keyOrder"]:
for co in col:
if co["physicalName"] == j["as"]:
col.remove(co)
for k in list(outList.keys()):
if k == j["as"]:
c = {}
c["logicalName"] = j["asLogicalName"]
c["physicalName"] = k
c["type"] = typeList.get(outList[k], '未定義')
col.append(c)
if table == i["table"].upper():
for j in i["values"]:
for k in list(outList.keys()):
if k == j["inPhysicalName"]:
c = {}
c["logicalName"] = j["inLogicalName"]
c["physicalName"] = k
c["type"] = typeList.get(outList[k], '未定義')
col.append(c)
tmp["db"] = database
tmp["schema"] = schema
tmp["table"] = table
tmp["column"] = get_unique_list(col)
Out.append(tmp)
else:
tmp = {}
col = []
for i in outputInfoStr:
for j in i["column"]:
for k in list(outList.keys()):
if k == j["physicalName"]:
c = {}
c["logicalName"] = j["logicalName"]
c["physicalName"] = k
c["type"] = typeList.get(outList[k], '未定義')
col.append(c)
tmp["db"] = database
tmp["schema"] = schema
tmp["table"] = table
tmp["column"] = get_unique_list(col)
Out.append(tmp)
return Out
except Exception as e:
print(f"processing analysys error:{e}")
raise e
def get_unique_list(seq):
try:
seen = []
return [x for x in seq if x not in seen and not seen.append(x)]
except Exception as e:
print(f"processing analysys error:{e}")
raise e
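# A small usage sketch (hypothetical data): get_unique_list drops duplicate dicts while
# preserving order, which set() alone cannot do because dicts are unhashable.
#   get_unique_list([{"a": 1}, {"a": 1}, {"b": 2}])  ->  [{"a": 1}, {"b": 2}]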
# SQL Insert
def _insertMetaData(database: str, schema: str, In: list,
Out: list, Api: list, metrics: str, ver: str):
try:
date = datetime.today().strftime("%Y-%m-%d")
obj = SnowflakeAccessor(database=database, schema=schema)
query = f'''
INSERT INTO CATALOG_META_INFO(
ID,PHYSICAL_NAME,CATEGORY,DESCRIPTION,PROCESSING,
TAGS,GRAPH_TITLE,METRICS_PATTERN,LAST_UPDATE,
VERSION,IN_INFO,CUSTOM_IN,OUT_INFO,API_NAME
)
SELECT
'{metrics}',
'{metrics}',
'<カテゴリ>',
'<概要>',
'<メトリクス処理内容>',
to_variant({['<タグ>']}),
to_variant({['<グラフタイトル>']}),
'<メトリクスパターン>',
'{date}',
'{ver}',
to_variant({In}),
null,
to_variant({Out}),
to_variant({Api})
;
'''
obj.execute(query)
except Exception as e:
print(f"processing analysys error:{e}")
raise e
finally:
obj.close()
# SQL Update In/Out
def _updateMetaDataInOut(database: str, schema: str, In: list,
Out: list, Api: list, metrics: str, ver: str):
try:
date = datetime.today().strftime("%Y-%m-%d")
obj = SnowflakeAccessor(database=database, schema=schema)
query = f'''
UPDATE CATALOG_META_INFO SET
LAST_UPDATE='{date}',
VERSION='{ver}',
IN_INFO=to_variant({In}),
OUT_INFO=to_variant({Out}),
API_NAME=to_variant({Api})
WHERE PHYSICAL_NAME = '{metrics}'
;
'''
obj.execute(query)
except Exception as e:
print(f"processing analysys error:{e}")
raise e
finally:
obj.close()
# SQL Update Input
def _updateMetaDataInput(database: str, schema: str, In: list,
Api: list, metrics: str, ver: str):
try:
date = datetime.today().strftime("%Y-%m-%d")
obj = SnowflakeAccessor(database=database, schema=schema)
query = f'''
UPDATE CATALOG_META_INFO SET
LAST_UPDATE='{date}',
VERSION='{ver}',
IN_INFO=to_variant({In}),
API_NAME=to_variant({Api})
WHERE PHYSICAL_NAME = '{metrics}'
;
'''
obj.execute(query)
except Exception as e:
print(f"processing analysys error:{e}")
raise e
finally:
obj.close()
# SQL Insert CATALOG_PROVIDE_INFO
def _insertProvideData(database: str, schema: str,
In: list, Out: list, metrics: str):
try:
# make InputData
InData = []
for item in In:
tmp = {}
tmp["table"] = item["table"]
tmp["provide"] = True
InData.append(tmp)
# make OutputData
OutData = []
for item in Out:
tmp = {}
tmp["table"] = item["table"]
tmp["provide"] = True
OutData.append(tmp)
obj = SnowflakeAccessor(database=database, schema=schema)
query = f'''
INSERT INTO CATALOG_PROVIDE_INFO(
ID ,IN_PROVIDE, OUT_PROVIDE
)
SELECT
'{metrics}',
to_variant({InData}),
to_variant({OutData})
;
'''
obj.execute(query)
except Exception as e:
print(f"processing analysys error:{e}")
raise e
finally:
obj.close()
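# Shape of the IN_PROVIDE / OUT_PROVIDE variants written above (hypothetical tables):
# every referenced table is registered as provided.
#   [{"table": "some_in_table", "provide": True}, {"table": "another_in_table", "provide": True}]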
# SQL Update CATALOG_PROVIDE_INFO
def _updateProvideData(database: str, schema: str,
In: list, Out: list, metrics: str):
    # Update only when a table name has changed.
    # Calls from dataset item editing can include datasets or metrics that are not yet provided,
    # so everything must not be blindly marked as provided.
    # Calls from dataset item editing never change table names.
    # For metrics deploy / modification / copy, every referenced table is already provided,
    # so marking them as provided unconditionally is safe.
    # The pre-update table information is not available, so a diff-based update of table names
    # alone is not possible (the provide info can only be set to a fixed value); hence this check.
try:
obj = SnowflakeAccessor(database=database, schema=schema)
# make InputData
InData = []
for item in In:
tmp = {}
tmp["table"] = item["table"]
tmp["provide"] = True
InData.append(tmp)
# make OutputData
OutData = []
for item in Out:
tmp = {}
tmp["table"] = item["table"]
tmp["provide"] = True
OutData.append(tmp)
# get id & provide info
metricsId = ''
        inProvidePrev = []  # IN provide info before the update
        outProvidePrev = []  # OUT provide info before the update
query = f'''
SELECT
A.ID,
A.IN_PROVIDE,
                A.OUT_PROVIDE
FROM
CATALOG_PROVIDE_INFO A
INNER JOIN
CATALOG_META_INFO B
ON
A.ID = B.ID
WHERE
B.PHYSICAL_NAME = '{metrics}';
'''
result = obj.execute(query)
for id, inPrev, outPrev in result:
metricsId = id
inProvidePrev = json.loads(inPrev)
outProvidePrev = json.loads(outPrev)
        # Collect the "table" value of each element in inProvidePrev
        inProvidePrevTable = [d["table"] for d in inProvidePrev]
        # Collect the "table" value of each element in outProvidePrev
        outProvidePrevTable = [d["table"] for d in outProvidePrev]
        # Collect the "table" value of each element in InData
        InDataTable = [d["table"] for d in InData]
        # Collect the "table" value of each element in OutData
        OutDataTable = [d["table"] for d in OutData]
        # Tables in InData that are not in inProvidePrev
        inDiff = list(set(InDataTable) - set(inProvidePrevTable))
        # Tables in OutData that are not in outProvidePrev
        outDiff = list(set(OutDataTable) - set(outProvidePrevTable))
        # Update only when a referenced table or view has changed
if (inDiff or outDiff):
# update provide
print(f"--- update provide . target metricsId:{metricsId}")
query = f'''
UPDATE
CATALOG_PROVIDE_INFO
SET
IN_PROVIDE = to_variant({InData}),
OUT_PROVIDE = to_variant({OutData})
WHERE ID = '{metricsId}'
;
'''
obj.execute(query)
except Exception as e:
print(f"processing analysys error:{e}")
raise e
finally:
obj.close()
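# Sketch of the table-name diff check above (hypothetical values): the UPDATE is issued
# only when a referenced table appears that was not in the previous provide info.
#   prev = ["TABLE_A"]; new = ["TABLE_A", "TABLE_B"]
#   set(new) - set(prev) -> {"TABLE_B"}  (non-empty, so CATALOG_PROVIDE_INFO is updated)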
if __name__ == "__main__":
args = sys.argv
main(metrics=args[1], env=args[2], project=args[3], deployCheck=args[4])
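# Assumed command-line usage (the script name below is hypothetical):
#   python metricsMetaRegister.py <metrics_physical_name> <env> <project> <deployCheck>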
from tools.arangoDBAccessor import ArangoDBAccessor
# Check if metrics information exists (ArangoDB version)
def _metricsExistCheck(database: str, schema: str, table: str):
try:
obj = ArangoDBAccessor()
query = '''
FOR doc IN CATALOG_META_INFO
FILTER doc.physical_name == @table
LIMIT 1
RETURN doc.version
'''
bind_vars = {'table': table}
res = obj.execute(query, vars=bind_vars)
data = None
for doc in res:
data = doc
return data
except Exception as e:
print(f"processing analysis error:{e}")
raise e
finally:
obj.close()
# Check if inMapping exists (ArangoDB version)
def _inMappingExistCheck(database: str, schema: str, table: str):
try:
obj = ArangoDBAccessor()
query = '''
FOR doc IN CATALOG_META_INFO
FILTER doc.physical_name == @table
LIMIT 1
RETURN doc.in_mapping
'''
bind_vars = {'table': table}
res = obj.execute(query, vars=bind_vars)
data = None
for doc in res:
data = doc
return data
except Exception as e:
print(f"processing analysis error:{e}")
raise e
finally:
obj.close()
# Get outputInfo (ArangoDB version)
def _getOutputInfo(database: str, schema: str, table: str):
try:
obj = ArangoDBAccessor()
query = '''
FOR doc IN CATALOG_META_INFO
FILTER doc.physical_name == @table
LIMIT 1
RETURN doc.out_info
'''
bind_vars = {'table': table}
res = obj.execute(query, vars=bind_vars)
data = None
for doc in res:
data = doc
return data
except Exception as e:
print(f"processing analysis error:{e}")
raise e
finally:
obj.close()
# Get inputInfo (ArangoDB version)
def _getApiMetaInfo(database: str, schema: str, table: str):
try:
obj = ArangoDBAccessor()
result = []
for t in table:
query = '''
FOR doc IN API_META_INFO
FILTER doc.api_physical_name == @t
LIMIT 1
RETURN doc.api_linkage_items
'''
bind_vars = {'t': t}
res = obj.execute(query, vars=bind_vars)
data = {}
for doc in res:
data["table"] = t
data["value"] = doc
result.append(data)
return result
except Exception as e:
print(f"processing analysis error:{e}")
raise e
finally:
obj.close()
# Get IN/OUT provide info for the target metrics (ArangoDB version)
def _getInOutProvideInfo(metrics: str,
database: str,
schema: str,
project: str,
allApi: list):
try:
retProvidsInfo = {"inProvides": {}, "outProvides": {}}
obj = ArangoDBAccessor()
query = '''
FOR a IN CATALOG_PROVIDE_INFO
FOR b IN CATALOG_META_INFO
FILTER a.id == b.id
FILTER b.physical_name == @metrics
RETURN {
in_provide: a.in_provide,
out_provide: a.out_provide
}
'''
bind_vars = {'metrics': metrics}
result = obj.execute(query, vars=bind_vars)
for doc in result:
inProvides = doc['in_provide']
outProvides = doc['out_provide']
for inProvide in inProvides:
if "startpack" == project:
retProvidsInfo["inProvides"][inProvide["table"].upper()] = inProvide["provide"]
else:
if inProvide['table'].upper().replace(f"{project}_".upper(), '').lower() in allApi:
retProvidsInfo["inProvides"][f"{project}_{inProvide['table']}".upper()] = inProvide["provide"]
else:
retProvidsInfo["inProvides"][inProvide["table"].upper()] = inProvide["provide"]
for outProvide in outProvides:
retProvidsInfo["outProvides"][outProvide["table"].upper()] = outProvide["provide"]
return retProvidsInfo
except Exception as e:
print(f"processing analysis error:{e}")
raise e
finally:
obj.close()
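# The helper above is assumed to return, per referenced table, whether it is provided
# (hypothetical tables):
#   {"inProvides": {"SOME_IN_TABLE": True}, "outProvides": {"SOME_OUT_TABLE": False}}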
# Get OUT provide info for the target metrics (ArangoDB version)
def _getOutProvideInfo(database: str,
schema: str,
table: str):
try:
retOutProvids = []
obj = ArangoDBAccessor()
query = '''
FOR a IN CATALOG_PROVIDE_INFO
FOR b IN CATALOG_META_INFO
FILTER a.id == b.id
FILTER b.physical_name == @table
RETURN a.out_provide
'''
bind_vars = {'table': table}
result = obj.execute(query, vars=bind_vars)
for doc in result:
for outProvide in doc:
retOutProvids.append(outProvide["provide"])
return retOutProvids
except Exception as e:
print(f"processing analysis error:{e}")
raise e
finally:
obj.close()
from tools.arangoDBAccessor import ArangoDBAccessor
# Check if metrics information exists (ArangoDB version)
def _metricsExistCheck(table: str):
try:
obj = ArangoDBAccessor()
query = '''
FOR doc IN metrics_node
FILTER doc.physical_name == @table
LIMIT 1
RETURN doc.version
'''
bind_vars = {'table': table}
res = obj.execute(query, vars=bind_vars)
data = None
for doc in res:
data = doc
return data
except Exception as e:
print(f"processing analysis error:{e}")
raise e
finally:
obj.close()
# Check if inMapping exists (ArangoDB version)
def _inMappingExistCheck(table: str):
try:
obj = ArangoDBAccessor()
query = '''
FOR doc IN metrics_node
FILTER doc.physical_name == @table
LIMIT 1
RETURN doc.in_mapping
'''
bind_vars = {'table': table}
res = obj.execute(query, vars=bind_vars)
data = None
for doc in res:
data = doc
return data
except Exception as e:
print(f"processing analysis error:{e}")
raise e
finally:
obj.close()
# Get outputInfo (ArangoDB version)
def _getOutputInfo(table: str):
try:
obj = ArangoDBAccessor()
query = '''
FOR doc IN metrics_node
FILTER doc.physical_name == @table
LIMIT 1
RETURN doc.out_info
'''
bind_vars = {'table': table}
res = obj.execute(query, vars=bind_vars)
data = None
for doc in res:
data = doc
return data
except Exception as e:
print(f"processing analysis error:{e}")
raise e
finally:
obj.close()
# Get inputInfo (ArangoDB version)
def _getApiMetaInfo(table: str):
try:
obj = ArangoDBAccessor()
result = []
for t in table:
query = '''
FOR doc IN metrics_node
FILTER doc.physical_name == @t
LIMIT 1
RETURN doc.api_linkage_items
'''
bind_vars = {'t': t}
res = obj.execute(query, vars=bind_vars)
data = {}
for doc in res:
data["table"] = t
data["value"] = doc
result.append(data)
return result
except Exception as e:
print(f"processing analysis error:{e}")
raise e
finally:
obj.close()
# Get IN/OUT provide info for the target metrics (ArangoDB version)
def _getInOutProvideInfo(metrics: str):
try:
retProvidsInfo = {"inProvides": {}, "outProvides": {}}
obj = ArangoDBAccessor()
query = '''
FOR doc IN metrics_node
FILTER doc.physical_name == @metrics
RETURN {
in_provide: doc.in_info,
out_provide: doc.out_info
}
'''
bind_vars = {'metrics': metrics}
result = obj.execute(query, vars=bind_vars)
for doc in result:
inProvides = doc['in_provide']
outProvides = doc['out_provide']
for inProvide in inProvides:
retProvidsInfo["inProvides"][inProvide["table"].upper()] = inProvide["provide"]
for outProvide in outProvides:
retProvidsInfo["outProvides"][outProvide["table"].upper()] = outProvide["provide"]
return retProvidsInfo
except Exception as e:
print(f"processing analysis error:{e}")
raise e
finally:
obj.close()
# Get OUT provide info for the target metrics (ArangoDB version)
def _getOutProvideInfo(table: str):
try:
retOutProvids = []
obj = ArangoDBAccessor()
query = '''
FOR doc IN metrics_node
FILTER doc.physical_name == @table
RETURN doc.out_info
'''
bind_vars = {'table': table}
result = obj.execute(query, vars=bind_vars)
for doc in result:
for outProvide in doc:
retOutProvids.append(outProvide["provide"])
return retOutProvids
except Exception as e:
print(f"processing analysis error:{e}")
raise e
finally:
obj.close()
# Get API name (ArangoDB version)
def _getApiName(table: str):
try:
obj = ArangoDBAccessor()
query = '''
FOR doc IN metrics_node
FILTER doc.physical_name == @table
LIMIT 1
RETURN doc.api_name
'''
bind_vars = {'table': table}
res = obj.execute(query, vars=bind_vars)
data = None
for doc in res:
data = doc
return data
except Exception as e:
print(f"processing analysis error:{e}")
raise e
finally:
obj.close()
def _getOutProvideInfo(database: str,
schema: str,
table: str):
try:
retOutProvids = []
obj = SnowflakeAccessor(database=database, schema=schema)
query = f'''
SELECT
                A.OUT_PROVIDE
FROM
CATALOG_PROVIDE_INFO A
INNER JOIN
CATALOG_META_INFO B
ON
A.ID = B.ID
WHERE
B.PHYSICAL_NAME = '{table}';
'''
result = obj.execute(query)
for tmp in result:
for outProvides in tmp:
for outProvide in json.loads(outProvides):
retOutProvids.append(outProvide["provide"])
return retOutProvids
except Exception as e:
print(f"processing analysys error:{e}")
raise e
finally:
obj.close()
# Get OUT_PROVIDE information from ArangoDB
def _getOutProvideInfo(table: str):
try:
retOutProvids = []
        obj = ArangoDBAccessor()  # use the ArangoDB connector
        query = '''
            FOR doc IN metrics_node
            FILTER doc.physical_name == @table
            RETURN doc.provide
        '''
        bind_vars = {'table': table}  # filter on the table name
        result = obj.execute(query, vars=bind_vars)
        # extract the provide information
        for doc in result:
            retOutProvids.append(doc)  # append the provide value directly to the return list
return retOutProvids
except Exception as e:
print(f"processing analysis error:{e}")
raise e
finally:
obj.close()
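# The fragment below appears to be the ArangoDB replacement for the
# refType == "metrics" branch of _getColType above.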
elif refType == "metrics":
query = '''
FOR doc IN metrics_node
FILTER doc.physical_name == @table
LIMIT 1
RETURN doc.out_info
'''
bind_vars = {'table': table}
result = obj.execute(query, vars=bind_vars)
for doc in result:
for out_info in doc:
for column in out_info["column"]:
view[column["physical_name"]] = logTypeList[column["type"]]
return view
def _insertMetaData(database: str, schema: str, In: list,
Out: list, Api: list, metrics: str, ver: str):
try:
date = datetime.today().strftime("%Y-%m-%d")
obj = SnowflakeAccessor(database=database, schema=schema)
query = f'''
INSERT INTO CATALOG_META_INFO(
ID,PHYSICAL_NAME,CATEGORY,DESCRIPTION,PROCESSING,
TAGS,GRAPH_TITLE,METRICS_PATTERN,LAST_UPDATE,
VERSION,IN_INFO,CUSTOM_IN,OUT_INFO,API_NAME
)
SELECT
'{metrics}',
'{metrics}',
'<カテゴリ>',
'<概要>',
'<メトリクス処理内容>',
to_variant({['<タグ>']}),
to_variant({['<グラフタイトル>']}),
'<メトリクスパターン>',
'{date}',
'{ver}',
to_variant({In}),
null,
to_variant({Out}),
to_variant({Api})
;
'''
obj.execute(query)
except Exception as e:
print(f"processing analysys error:{e}")
raise e
finally:
obj.close()
def _insertMetaData(database: str, schema: str, In: list,
Out: list, Api: list, metrics: str, ver: str):
try:
date = datetime.today().strftime("%Y-%m-%d")
        obj = ArangoDBAccessor()  # use the ArangoDBAccessor
        # build the document to insert
        data = {
            "physical_name": metrics,
            "logical_name": metrics,  # adjust as needed for the logical name
            "category": "<カテゴリ>",  # can be passed in dynamically or changed here
            "description": "<概要>",
            "processing": "<メトリクス処理内容>",
            "tags": ['<タグ>'],  # tags field
            "graph_title": ['<グラフタイトル>'],  # graph titles
            "metrics_pattern": "<メトリクスパターン>",
            "last_update": date,
            "version": ver,
            "in_info": In,  # IN info passed in
            "custom_in": None,  # kept as None; change if needed
            "out_info": Out,  # OUT info passed in
            "api_name": Api  # API info passed in
        }
        # AQL insert query
        query = '''
            INSERT @data INTO metrics_node
        '''
        bind_vars = {'data': data}
        # run the insert
        obj.execute(query, vars=bind_vars)
except Exception as e:
print(f"processing analysis error: {e}")
raise e
finally:
obj.close()
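# Note: unlike the f-string SQL used by the Snowflake helpers, the AQL version above passes
# the document through a bind variable (@data), so values are not interpolated into the query text.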
def _updateMetaDataInOut(database: str, schema: str, In: list,
Out: list, Api: list, metrics: str, ver: str):
try:
date = datetime.today().strftime("%Y-%m-%d")
obj = SnowflakeAccessor(database=database, schema=schema)
query = f'''
UPDATE CATALOG_META_INFO SET
LAST_UPDATE='{date}',
VERSION='{ver}',
IN_INFO=to_variant({In}),
OUT_INFO=to_variant({Out}),
API_NAME=to_variant({Api})
WHERE PHYSICAL_NAME = '{metrics}'
;
'''
obj.execute(query)
except Exception as e:
print(f"processing analysys error:{e}")
raise e
finally:
obj.close()
def _updateMetaDataInOut(database: str, schema: str, In: list,
Out: list, Api: list, metrics: str, ver: str):
try:
date = datetime.today().strftime("%Y-%m-%d")
        obj = ArangoDBAccessor()  # use the ArangoDBAccessor
        # build the document with the fields to update
        update_data = {
            "last_update": date,
            "version": ver,
            "in_info": In,
            "out_info": Out,
            "api_name": Api
        }
        # AQL update query
        query = '''
            FOR doc IN metrics_node
            FILTER doc.physical_name == @metrics
            UPDATE doc WITH @update_data IN metrics_node
        '''
        bind_vars = {
            'metrics': metrics,  # filter on physical_name
            'update_data': update_data  # data to update
        }
        # run the update
obj.execute(query, vars=bind_vars)
except Exception as e:
print(f"processing analysis error: {e}")
raise e
finally:
obj.close()
def _updateMetaDataInput(database: str, schema: str, In: list,
Api: list, metrics: str, ver: str):
try:
date = datetime.today().strftime("%Y-%m-%d")
obj = SnowflakeAccessor(database=database, schema=schema)
query = f'''
UPDATE CATALOG_META_INFO SET
LAST_UPDATE='{date}',
VERSION='{ver}',
IN_INFO=to_variant({In}),
API_NAME=to_variant({Api})
WHERE PHYSICAL_NAME = '{metrics}'
;
'''
obj.execute(query)
except Exception as e:
print(f"processing analysys error:{e}")
raise e
finally:
obj.close()
def _updateMetaDataInput(database: str, schema: str, In: list,
Api: list, metrics: str, ver: str):
try:
date = datetime.today().strftime("%Y-%m-%d")
        obj = ArangoDBAccessor()  # use the ArangoDBAccessor
        # build the document with the fields to update
        update_data = {
            "last_update": date,
            "version": ver,
            "in_info": In,
            "api_name": Api
        }
        # AQL update query
        query = '''
            FOR doc IN metrics_node
            FILTER doc.physical_name == @metrics
            UPDATE doc WITH @update_data IN metrics_node
        '''
        bind_vars = {
            'metrics': metrics,  # filter on physical_name
            'update_data': update_data  # data to update
        }
        # run the update
obj.execute(query, vars=bind_vars)
except Exception as e:
print(f"processing analysis error: {e}")
raise e
finally:
obj.close()