kirin-ri opened this issue 6 months ago
// Event handler for perspective filter changes
function changePerspective() {
  const vals: string[] = [];
  $('input[class="form-check-input"]:checked').each(function () {
    vals.push($(this)[0].id);
  });
  const req = { perspectives: vals };
  (async () => {
    commonAjax
      .axios({ loading: true })
      .post('/api/digitalValueTreeInfo', req)
      .then((res) => {
        setDispElements(res.data.elements);
        setElementsStyle(res.data.style);
        setRefreshFlg(!refreshFlg);
      });
  })();
}
// Listener for the fullscreen toggle event
document.addEventListener('fullscreenchange', fullscreenchanged);
useEffect(() => {
  (async () => {
    const vals: string[] = [];
    const req = { perspectives: vals };
    commonAjax
      .axios({ loading: true })
      .post('/api/digitalValueTreeInfo', req)
      .then((res) => {
        setDispElements(res.data.elements);
        setElementsStyle(res.data.style);
        // Content height = window height minus header, page cover, and content header, with a 42px margin
        setContentHeight(
          window.innerHeight -
            document.getElementsByClassName('main-header')[0].clientHeight -
            document.getElementsByClassName('page-cover')[0].clientHeight -
            document.getElementsByClassName('content-header')[0].clientHeight -
            42,
        );
        setRefreshFlg(!refreshFlg);
      });
  })();
}, []);
| EQUIPMENT_ID | OPERATION_START_DATE | OPERATION_END_DATE | OPERATION_TIME |
| --- | --- | --- | --- |
| A | 2024-05-01 04:12:34 | 2024-05-01 05:45:12 | 1.5444 |
| A | 2024-05-02 10:00:16 | 2024-05-02 11:30:20 | 1.5028 |
| A | 2024-05-03 22:00:10 | 2024-05-03 23:59:59 | 1.9999 |
| A | 2024-05-04 00:00:00 | 2024-05-04 01:45:32 | 1.7589 |
| B | 2024-05-05 08:45:00 | 2024-05-05 12:00:00 | 3.2500 |
| B | 2024-05-06 14:20:34 | 2024-05-06 16:25:10 | 2.0764 |
| B | 2024-05-07 19:00:45 | 2024-05-07 21:10:20 | 2.1586 |
| C | 2024-05-08 04:30:12 | 2024-05-08 06:25:34 | 1.9256 |
| C | 2024-05-09 11:10:22 | 2024-05-09 13:50:16 | 2.6644 |
| C | 2024-05-10 23:10:18 | 2024-05-10 23:59:59 | 0.8314 |
| C | 2024-05-11 00:00:00 | 2024-05-11 02:00:12 | 2.0022 |
| A | 2024-05-12 05:12:24 | 2024-05-12 08:25:37 | 3.2222 |
| B | 2024-05-13 13:34:45 | 2024-05-13 15:45:58 | 2.1842 |
| B | 2024-05-14 18:14:12 | 2024-05-14 20:44:29 | 2.5047 |
| C | 2024-05-15 07:25:36 | 2024-05-15 09:35:42 | 2.1683 |
| EQUIPMENT_ID | OPERATION_START_DATE | OPERATION_END_DATE | OPERATION_TIME |
| --- | --- | --- | --- |
| 設備A | 2024-05-01 04:12:34 | 2024-05-01 05:45:12 | 1.5444 |
| 設備A | 2024-05-01 10:00:16 | 2024-05-01 11:30:20 | 1.5028 |
| 設備A | 2024-05-03 22:00:10 | 2024-05-03 23:59:59 | 1.9999 |
| 設備A | 2024-05-04 00:00:00 | 2024-05-04 01:45:32 | 1.7589 |
| 設備A | 2024-05-12 05:12:24 | 2024-05-12 08:25:37 | 3.2222 |
| 設備B | 2024-05-05 08:45:00 | 2024-05-05 12:00:00 | 3.25 |
| 設備B | 2024-05-06 14:20:34 | 2024-05-06 16:25:10 | 2.0764 |
| 設備B | 2024-05-07 19:00:45 | 2024-05-07 21:10:20 | 2.1586 |
| 設備B | 2024-05-13 13:34:45 | 2024-05-13 15:45:58 | 2.1842 |
| 設備B | 2024-05-13 18:14:12 | 2024-05-13 20:44:29 | 2.5047 |
| 設備C | 2024-05-08 04:30:12 | 2024-05-08 06:25:34 | 1.9256 |
| 設備C | 2024-05-09 11:10:22 | 2024-05-09 13:50:16 | 2.6644 |
| 設備C | 2024-05-10 23:10:18 | 2024-05-10 23:59:59 | 0.8314 |
| 設備C | 2024-05-11 00:00:00 | 2024-05-11 02:00:12 | 2.0022 |
| 設備C | 2024-05-15 07:25:36 | 2024-05-15 09:35:42 | 2.1683 |
| EQUIPMENT_ID | BREAKDOWN_TIME |
| --- | --- |
| 設備A | 2024-05-01 05:45:13 |
| 設備A | 2024-05-12 08:25:38 |
| 設備B | 2024-05-05 12:00:01 |
| 設備C | 2024-05-11 02:00:13 |
operation_data = [  # variable name assumed; these are the sample operation records above
    ['A', '2024-05-01 04:12:34', '2024-05-01 05:45:12', 1.5444],
    ['A', '2024-05-01 10:00:16', '2024-05-01 11:30:20', 1.5028],
    ['A', '2024-05-03 22:00:10', '2024-05-03 23:59:59', 1.9999],
    ['A', '2024-05-04 00:00:00', '2024-05-04 01:45:32', 1.7589],
    ['A', '2024-05-12 05:12:24', '2024-05-12 08:25:37', 3.2222],
    ['B', '2024-05-05 08:45:00', '2024-05-05 12:00:00', 3.25],
    ['B', '2024-05-06 14:20:34', '2024-05-06 16:25:10', 2.0764],
    ['B', '2024-05-07 19:00:45', '2024-05-07 21:10:20', 2.1586],
    ['B', '2024-05-13 13:34:45', '2024-05-13 15:45:58', 2.1842],
    ['B', '2024-05-13 18:14:12', '2024-05-13 20:44:29', 2.5047],
    ['C', '2024-05-08 04:30:12', '2024-05-08 06:25:34', 1.9256],
    ['C', '2024-05-09 11:10:22', '2024-05-09 13:50:16', 2.6644],
    ['C', '2024-05-10 23:10:18', '2024-05-10 23:59:59', 0.8314],
    ['C', '2024-05-11 00:00:00', '2024-05-11 02:00:12', 2.0022],
    ['C', '2024-05-15 07:25:36', '2024-05-15 09:35:42', 2.1683]
]
fault_data = [
    ['A', '2024-05-01 05:45:13'],
    ['A', '2024-05-12 08:25:38'],
    ['B', '2024-05-05 12:00:01'],
    ['C', '2024-05-11 02:00:13']
]
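If these samples need to be tried in a notebook first, a minimal sketch (assuming a running SparkSession named `spark` and the `operation_data` / `fault_data` lists above; column names mirror the Snowflake tables used later in this issue):

```python
# Build small test DataFrames from the sample lists above.
df_operation = spark.createDataFrame(
    operation_data,
    ["EQUIPMENT_ID", "OPERATION_START_DATE", "OPERATION_END_DATE", "OPERATION_TIME"],
)
df_fault = spark.createDataFrame(fault_data, ["EQUIPMENT_ID", "BREAKDOWN_TIME"])

df_operation.show(truncate=False)
df_fault.show(truncate=False)
```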
^[0-9]{4}-[0-9]{2}-[0-9]{2}$
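The pattern above presumably checks for a date-only string (YYYY-MM-DD). A minimal PySpark usage sketch, assuming it is meant to filter rows whose OPERATION_END_DATE has no time part:

```python
from pyspark.sql import functions as F

# Keep only rows whose OPERATION_END_DATE matches YYYY-MM-DD exactly.
df_date_only = df_operation.filter(
    F.col("OPERATION_END_DATE").rlike(r"^[0-9]{4}-[0-9]{2}-[0-9]{2}$")
)
```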
df_input_1 = df_input_1.groupBy("EQUIPMENT_ID", "OPERATION_END_DATE").agg( F.sum("OPERATION_TIME").alias("OPERATION_TIME_SUM") )
F.to_date(F.from_utc_timestamp(F.col("OPERATION_END_TIME"), "Asia/Tokyo"))
df_input_1 = d2s.readData(
# self.spark,
# self.dbutils,
spark,
dbutils,
sfDatabase="ECM_DEV",
sfSchema="PUBLIC",
query=f"""
SELECT
EQUIPMENT_ID,
OPERATION_END_DATE,
OPERATION_START_DATE,
OPERATION_TIME
FROM
ECM_OPERATION_TIME
WHERE
TO_TIMESTAMP(OPERATION_END_DATE) BETWEEN
TO_TIMESTAMP('{refdate}') AND TO_TIMESTAMP('{enddate}')
AND
OPERATION_END_DATE != ''
AND
OPERATION_END_DATE IS NOT NULL
""",
)
display(df_input_1)
# Failure count
df_input_2 = d2s.readData(
# self.spark,
# self.dbutils,
spark,
dbutils,
sfDatabase="ECM_DEV",
sfSchema="PUBLIC",
query=f"""
SELECT
BREAKDOWN_TIME,
EQUIPMENT_ID
FROM
ECM_BREAKDOWN_COUNT
WHERE
TO_TIMESTAMP(BREAKDOWN_TIME) BETWEEN
TO_TIMESTAMP('{refdate}') AND TO_TIMESTAMP('{enddate}')
AND
BREAKDOWN_TIME != ''
AND
BREAKDOWN_TIME IS NOT NULL
""",
)
display(df_input_2)
# Data processing
# Sum OPERATION_TIME per equipment ID and operation end date
df_input_1 = df_input_1.withColumn("OPERATION_END_DATE", F.to_date(F.from_utc_timestamp(F.col("OPERATION_END_DATE"), "Asia/Tokyo")))
df_input_1 = df_input_1.groupBy("EQUIPMENT_ID", "OPERATION_END_DATE").agg(
    F.sum("OPERATION_TIME").alias("OPERATION_TIME_SUM")
)
df_input_1 = df_input_1.withColumn("OPERATION_END_DATE_UTC", F.to_date("OPERATION_END_DATE"))
df_input_1 = df_input_1.withColumn("OPERATION_END_DATE", F.from_utc_timestamp(F.col("OPERATION_END_DATE"), "Asia/Tokyo"))
df_input_1 = df_input_1.withColumn("OPERATION_END_DATE", F.to_date(F.col("OPERATION_END_DATE")))
| BREAKDOWN_TIME | EQUIPMENT_ID |
| --- | --- |
| 2024-05-01T05:45:13Z | 設備A |
| 2024-05-01T09:48:24Z | 設備A |
| 2024-05-12T08:25:38Z | 設備A |
| 2024-05-05T12:00:01Z | 設備B |
| 2024-05-11T02:00:13Z | 設備C |
df_faults = df_faults.withColumn("BREAKDOWN_TIME", F.from_utc_timestamp(F.col("BREAKDOWN_TIME"), "Asia/Tokyo"))
df_faults = df_faults.withColumn("BREAKDOWN_DATE", F.to_date(F.col("BREAKDOWN_TIME")))
df_fault_counts = df_faults.groupBy("EQUIPMENT_ID", "BREAKDOWN_DATE").count().withColumnRenamed("count", "FAULT_COUNT")
df_combined = df_aggregated.join(df_fault_counts, (df_aggregated.EQUIPMENT_ID == df_fault_counts.EQUIPMENT_ID) & (df_aggregated.OPERATION_END_DATE_UTC == df_fault_counts.BREAKDOWN_DATE), "left")
df_combined = df_combined.withColumn("MTBF", F.col("OPERATION_TIME_SUM") / F.col("FAULT_COUNT"))
df_combined = df_combined.fillna({"FAULT_COUNT": 0, "MTBF": None})
df_combined_result = df_combined.select("EQUIPMENT_ID", "OPERATION_END_DATE_UTC", "OPERATION_TIME_SUM", "FAULT_COUNT", "MTBF") df_combined_result.show(truncate=False)
df_combined = df_combined.withColumn("MTBF", F.when(F.col("FAULT_COUNT") == 0, None).otherwise(F.col("OPERATION_TIME_SUM") / F.col("FAULT_COUNT")))
# Display the result
df_combined_result = df_combined.select("EQUIPMENT_ID", "OPERATION_END_DATE_UTC", "OPERATION_TIME_SUM", "FAULT_COUNT", "MTBF")
AnalysisException: Reference 'EQUIPMENT_ID' is ambiguous, could be: EQUIPMENT_ID, EQUIPMENT_ID.
df_combined_result = df_combined.select(
    F.col("a.EQUIPMENT_ID").alias("EQUIPMENT_ID"),
    F.col("a.OPERATION_END_DATE_UTC").alias("OPERATION_END_DATE_UTC"),
    F.col("a.OPERATION_TIME_SUM").alias("OPERATION_TIME_SUM"),
    F.col("b.FAULT_COUNT").alias("FAULT_COUNT"),
    F.col("MTBF").alias("MTBF")
)
df_aggregated["EQUIPMENT_ID"],
df_aggregated["OPERATION_END_DATE_UTC"],
df_aggregated["OPERATION_TIME_SUM"],
df_fault_counts["FAULT_COUNT"],
df_combined["MTBF"]
df_input_1 = df_input_1.withColumn("OPERATION_END_DATE_RESULT", F.to_date("OPERATION_END_DATE")) df_input_1 = df_input_1.withColumn("OPERATION_END_DATE",F.from_utc_timestamp(F.col("OPERATION_END_DATE"), "Asia/Tokyo")) df_input_1 = df_input_1.withColumn("OPERATION_END_DATE",F.to_date(F.col("OPERATION_END_DATE"))) df_input_1.select("EQUIPMENT_ID","OPERATION_END_DATE_RESULT").show() df_input_1 = df_input_1.groupBy("EQUIPMENT_ID", "OPERATION_END_DATE_RESULT").agg( F.sum("OPERATION_TIME").alias("OPERATION_TIME_SUM") ) display(df_input_1)
df_input_2 = df_input_2.withColumn("BREAKDOWN_TIME", F.from_utc_timestamp(F.col("BREAKDOWN_TIME"), "Asia/Tokyo")) df_input_2 = df_input_2.withColumn("BREAKDOWN_DATE", F.to_date(F.col("BREAKDOWN_TIME"))) df_input_2 = df_input_2.groupBy("EQUIPMENT_ID", "BREAKDOWN_DATE").count().withColumnRenamed("count", "FAULT_COUNT") df_combined = df_input_1.join(df_input_2, (df_input_1.EQUIPMENT_ID == df_input_2.EQUIPMENT_ID) & (df_input_1.OPERATION_END_DATE_RESULT == df_input_2.BREAKDOWN_DATE), "left") df_combined = df_combined.fillna({"FAULT_COUNT": 0}) df_combined = df_combined.withColumn("MTBF", F.when(F.col("FAULT_COUNT") == 0, None).otherwise(F.col("OPERATION_TIME_SUM") / F.col("FAULT_COUNT"))) df_combined_result = df_combined.select(df_input_1["EQUIPMENT_ID"], df_input_1["OPERATION_END_DATE_RESULT"], df_input_1["OPERATION_TIME_SUM"], df_input_2["FAULT_COUNT"], df_combined["MTBF"]) display(df_combined_result)
AnalysisException: Resolved attribute(s) FAULT_COUNT#2849L missing from EQUIPMENT_ID#2781,OPERATION_END_DATE_RESULT#2793,OPERATION_TIME_SUM#2830,EQUIPMENT_ID#2790,BREAKDOWN_DATE#2837,FAULT_COUNT#2871L,MTBF#2878 in operator !Project [EQUIPMENT_ID#2781, OPERATION_END_DATE_RESULT#2793, OPERATION_TIME_SUM#2830, FAULT_COUNT#2849L, MTBF#2878]. Attribute(s) with the same name appear in the operation: FAULT_COUNT. Please check if the right attribute(s) are used.;
df_input_1 = df_input_1.withColumn("OPERATION_END_DATE_RESULT", F.to_date("OPERATION_END_DATE")) df_input_1 = df_input_1.withColumn("OPERATION_END_DATE", F.from_utc_timestamp(F.col("OPERATION_END_DATE"), "Asia/Tokyo")) df_input_1 = df_input_1.withColumn("OPERATION_END_DATE", F.to_date(F.col("OPERATION_END_DATE")))
df_input_1 = df_input_1.groupBy("EQUIPMENT_ID", "OPERATION_END_DATE_RESULT").agg( F.sum("OPERATION_TIME").alias("OPERATION_TIME_SUM") )
df_input_2 = df_input_2.withColumn("BREAKDOWN_TIME", F.from_utc_timestamp(F.col("BREAKDOWN_TIME"), "Asia/Tokyo")) df_input_2 = df_input_2.withColumn("BREAKDOWN_DATE", F.to_date(F.col("BREAKDOWN_TIME")))
df_input_2 = df_input_2.groupBy("EQUIPMENT_ID", "BREAKDOWN_DATE").count().withColumnRenamed("count", "FAULT_COUNT")
df_combined = df_input_1.join(df_input_2, (df_input_1["EQUIPMENT_ID"] == df_input_2["EQUIPMENT_ID"]) & (df_input_1["OPERATION_END_DATE_RESULT"] == df_input_2["BREAKDOWN_DATE"]), "left" )
df_combined = df_combined.fillna({"FAULT_COUNT": 0})
df_combined = df_combined.withColumn("MTBF", F.when(F.col("FAULT_COUNT") == 0, None).otherwise(F.col("OPERATION_TIME_SUM") / F.col("FAULT_COUNT")))
df_combined_result = df_combined.select( df_input_1["EQUIPMENT_ID"], df_input_1["OPERATION_END_DATE_RESULT"], df_input_1["OPERATION_TIME_SUM"], df_combined["FAULT_COUNT"], df_combined["MTBF"] )
df_input_1 = df_input_1.withColumn("OPERATION_END_DATE_RESULT", F.to_date("OPERATION_END_DATE"))
df_input_1 = df_input_1.withColumn("OPERATION_END_DATE", F.from_utc_timestamp(F.col("OPERATION_END_DATE"), "Asia/Tokyo"))
df_input_1 = df_input_1.withColumn("OPERATION_END_DATE", F.to_date(F.col("OPERATION_END_DATE")))
df_input_1 = df_input_1.groupBy("EQUIPMENT_ID", "OPERATION_END_DATE_RESULT").agg(
F.sum("OPERATION_TIME").alias("OPERATION_TIME_SUM")
)
# Process the breakdown data
df_input_2 = df_input_2.withColumn("BREAKDOWN_TIME", F.from_utc_timestamp(F.col("BREAKDOWN_TIME"), "Asia/Tokyo"))
df_input_2 = df_input_2.withColumn("BREAKDOWN_DATE", F.to_date(F.col("BREAKDOWN_TIME")))
df_input_2 = df_input_2.groupBy("EQUIPMENT_ID", "BREAKDOWN_DATE").count().withColumnRenamed("count", "FAULT_COUNT")
# Join the operation time and breakdown data
df_combined = df_input_1.join(df_input_2,
(df_input_1["EQUIPMENT_ID"] == df_input_2["EQUIPMENT_ID"]) &
(df_input_1["OPERATION_END_DATE_RESULT"] == df_input_2["BREAKDOWN_DATE"]),
"left"
)
# Handle the case where the breakdown count is 0
df_combined = df_combined.fillna({"FAULT_COUNT": 0})
# Calculate MTBF
df_combined = df_combined.withColumn("MTBF", F.when(F.col("FAULT_COUNT") == 0, None).otherwise(F.col("OPERATION_TIME_SUM") / F.col("FAULT_COUNT")))
# Select the final columns and display the result
df_combined_result = df_combined.select(
df_input_1["EQUIPMENT_ID"],
df_input_1["OPERATION_END_DATE_RESULT"],
df_input_1["OPERATION_TIME_SUM"],
df_combined["FAULT_COUNT"],
df_combined["MTBF"]
)
import datetime
import sys
# Import the shared utility package
import d2s
# from quality_scrapRate.common import Task # added
from pyspark.sql import functions as F  # changed so static analysis passes; from here on, prefix these functions with F.
fromPeriod = '1900-01-01 00:00:00'
toPeriod = '2262-04-11 00:00:00'
# # Add a class named after the metric
# class quality_scrapRate(Task):
# def launch(self):
# Get the parameters (if no parameters are passed, i.e. nothing is specified, all records are targeted)
# args = sys.argv
# try:
# fromPeriod = args[1]
# except: # noqaE722
# fromPeriod = '1900-01-01 00:00:00'
# try:
# toPeriod = args[2]
# except: # noqaE722
# toPeriod = '2262-04-11 00:00:00'
# print(f"fromPeriod-----------------{fromPeriod}")
# print(f"toPeriod-----------------{toPeriod}")
# Calculate the reference dates
# today = datetime.datetime.today()
# thismonth = datetime.datetime(today.year, today.month, 1)
# if len(toPeriod) > 0:
enddate = datetime.datetime.strptime(toPeriod, '%Y-%m-%d %H:%M:%S')
# else:
# # Get the last day of the previous month
# enddate = thismonth + datetime.timedelta(days=-1)
# if len(fromPeriod) > 0:
refdate = datetime.datetime.strptime(fromPeriod, '%Y-%m-%d %H:%M:%S')
# else:
# refdate = datetime.datetime(enddate.year, enddate.month, 1)
# Preprocessing
# Data loading
# Equipment operation time
df_input_1 = d2s.readData(
# self.spark,
# self.dbutils,
spark,
dbutils,
sfDatabase="ECM_DEV",
sfSchema="PUBLIC",
query=f"""
SELECT
EQUIPMENT_ID,
OPERATION_END_DATE,
OPERATION_START_DATE,
OPERATION_TIME
FROM
ECM_OPERATION_TIME
WHERE
TO_TIMESTAMP(OPERATION_END_DATE) BETWEEN
TO_TIMESTAMP('{refdate}') AND TO_TIMESTAMP('{enddate}')
AND
OPERATION_END_DATE != ''
AND
OPERATION_END_DATE IS NOT NULL
""",
)
# Failure count
df_input_2 = d2s.readData(
# self.spark,
# self.dbutils,
spark,
dbutils,
sfDatabase="ECM_DEV",
sfSchema="PUBLIC",
query=f"""
SELECT
BREAKDOWN_TIME,
EQUIPMENT_ID
FROM
ECM_BREAKDOWN_COUNT
WHERE
TO_TIMESTAMP(BREAKDOWN_TIME) BETWEEN
TO_TIMESTAMP('{refdate}') AND TO_TIMESTAMP('{enddate}')
AND
BREAKDOWN_TIME != ''
AND
BREAKDOWN_TIME IS NOT NULL
""",
)
# Data processing
# Sum the equipment operation time table per equipment ID and operation end datetime
df_input_1 = df_input_1.withColumn("OPERATION_END_DATE_RESULT", F.to_date("OPERATION_END_DATE"))
df_input_1 = df_input_1.withColumn("OPERATION_END_DATE", F.from_utc_timestamp(F.col("OPERATION_END_DATE"), "Asia/Tokyo"))
df_input_1 = df_input_1.withColumn("OPERATION_END_DATE", F.to_date(F.col("OPERATION_END_DATE")))
df_input_1 = df_input_1.groupBy("EQUIPMENT_ID", "OPERATION_END_DATE_RESULT").agg(
F.sum("OPERATION_TIME").alias("OPERATION_TIME_SUM")
)
# Process the breakdown data
df_input_2 = df_input_2.withColumn("BREAKDOWN_TIME", F.from_utc_timestamp(F.col("BREAKDOWN_TIME"), "Asia/Tokyo"))
df_input_2 = df_input_2.withColumn("BREAKDOWN_DATE", F.to_date(F.col("BREAKDOWN_TIME")))
df_input_2 = df_input_2.groupBy("EQUIPMENT_ID", "BREAKDOWN_DATE").count().withColumnRenamed("count", "FAULT_COUNT")
# Join the operation time and breakdown data
df_combined = df_input_1.join(df_input_2,
(df_input_1["EQUIPMENT_ID"] == df_input_2["EQUIPMENT_ID"]) &
(df_input_1["OPERATION_END_DATE_RESULT"] == df_input_2["BREAKDOWN_DATE"]),
"left"
)
# Handle the case where the breakdown count is 0
df_combined = df_combined.fillna({"FAULT_COUNT": 0})
# Calculate MTBF
df_combined = df_combined.withColumn("MTBF", F.when(F.col("FAULT_COUNT") == 0, None).otherwise(F.col("OPERATION_TIME_SUM") / F.col("FAULT_COUNT")))
# Select the final columns and display the result
df_combined_result = df_combined.select(
df_input_1["EQUIPMENT_ID"],
df_input_1["OPERATION_END_DATE_RESULT"],
df_input_1["OPERATION_TIME_SUM"],
df_combined["FAULT_COUNT"],
df_combined["MTBF"]
)
display(df_combined_result)
# Sort
df = df.sort("PRODUCT_ID", "PRODUCTION_START_DATE", "PRODUCTION_END_DATE")
# Add a unique key for Anaplan
df = df.withColumn("UID",
F.concat_ws("_", "PRODUCT_ID", "PRODUCTION_START_DATE",
"PRODUCTION_END_DATE", "EXEC_DATE"))
display(df)
# # Data writing
# # saveMode can be set to Overwrite, Append, etc.
# d2s.writeData(
# df,
# self.dbutils,
# sfSchema="IND_ECM_DEV_SCHEMA",
# sfDatabase="IND_ECM_DEV_DB",
# dbtable="QUALITY_SCRAPRATE",
# saveMode="Append",
# )
# # Added
# def entrypoint(): # pragma: no cover
# task = quality_scrapRate() # metric name
# task.launch()
# # Added
# if __name__ == '__main__':
# entrypoint()
df_combined = df_combined.withColumn("FROM_PERIOD", F.lit(fromPeriod))
df_combined = df_combined.withColumn("TO_PERIOD", F.lit(toPeriod))
df_combined = df_combined.withColumn("TODAY", F.lit(today))
df_combined_result = df_combined.select( "EQUIPMENT_ID", "FROM_PERIOD", "TO_PERIOD", "TODAY", "MTBF", "UID" )
today = datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')
df_input_1 = df_input_1.withColumn("OPERATION_END_DATE", F.from_utc_timestamp(F.col("OPERATION_END_DATE"), "Asia/Tokyo"))
df_total_operation_time = df_input_1.groupBy("EQUIPMENT_ID").agg( F.sum("OPERATION_TIME").alias("TOTAL_OPERATION_TIME") )
df_input_2 = df_input_2.withColumn("BREAKDOWN_TIME", F.from_utc_timestamp(F.col("BREAKDOWN_TIME"), "Asia/Tokyo"))
df_total_fault_count = df_input_2.groupBy("EQUIPMENT_ID").count().withColumnRenamed("count", "TOTAL_FAULT_COUNT")
df_combined = df_total_operation_time.join(df_total_fault_count, "EQUIPMENT_ID", "left")
df_combined = df_combined.withColumn("MTBF", F.col("TOTAL_OPERATION_TIME") / F.col("TOTAL_FAULT_COUNT"))
df_combined = df_combined.withColumn("MTBF", F.when(F.col("TOTAL_FAULT_COUNT") == 0, None).otherwise(F.col("MTBF")))
df_combined = df_combined.withColumn("FROM_PERIOD", F.lit(fromPeriod)) df_combined = df_combined.withColumn("TO_PERIOD", F.lit(toPeriod)) df_combined = df_combined.withColumn("TODAY", F.lit(today))
df_combined = df_combined.withColumn("UID", F.concatws("", "EQUIPMENT_ID", F.lit(fromPeriod), F.lit(toPeriod), F.lit(today)))
df_combined_result = df_combined.select( "EQUIPMENT_ID", "FROM_PERIOD", "TO_PERIOD", "TODAY", "MTBF", "UID" )
df_input_1 = df_input_1.withColumn("OPERATION_END_DATE", F.from_utc_timestamp(F.col("OPERATION_END_DATE"), "Asia/Tokyo"))
# Total operation time per equipment
df_total_operation_time = df_input_1.groupBy("EQUIPMENT_ID").agg(
F.sum("OPERATION_TIME").alias("TOTAL_OPERATION_TIME")
)
# Process the breakdown data
df_input_2 = df_input_2.withColumn("BREAKDOWN_TIME", F.from_utc_timestamp(F.col("BREAKDOWN_TIME"), "Asia/Tokyo"))
# Count breakdowns per equipment
df_total_fault_count = df_input_2.groupBy("EQUIPMENT_ID").count().withColumnRenamed("count", "TOTAL_FAULT_COUNT")
# Join the total operation time with the breakdown counts
df_combined = df_total_operation_time.join(df_total_fault_count, "EQUIPMENT_ID", "left")
# Calculate the overall MTBF
df_combined = df_combined.withColumn("MTBF", F.col("TOTAL_OPERATION_TIME") / F.col("TOTAL_FAULT_COUNT"))
# If there are no breakdowns, set MTBF to None
df_combined = df_combined.withColumn("MTBF", F.when(F.col("TOTAL_FAULT_COUNT") == 0, None).otherwise(F.col("MTBF")))
# Add fromPeriod and toPeriod
df_combined = df_combined.withColumn("FROM_PERIOD", F.lit(fromPeriod))
df_combined = df_combined.withColumn("TO_PERIOD", F.lit(toPeriod))
df_combined = df_combined.withColumn("TODAY", F.lit(today))
# Generate the unique key
df_combined = df_combined.withColumn("UID", F.concat_ws("_", "EQUIPMENT_ID", F.lit(fromPeriod), F.lit(toPeriod), F.lit(today)))
# Select the final columns and display the result
df_combined_result = df_combined.select(
"EQUIPMENT_ID",
"FROM_PERIOD",
"TO_PERIOD",
"TODAY",
"MTBF",
"UID"
)
fromPeriod_date = fromPeriod.split(" ")[0]
toPeriod_date = toPeriod.split(" ")[0]
import datetime
import sys
# Import the shared utility package
import d2s
# from quality_scrapRate.common import Task # added
from pyspark.sql import functions as F  # changed so static analysis passes; from here on, prefix these functions with F.
fromPeriod = '2024-05-01 00:00:00'
toPeriod = '2024-05-04 00:00:00'
# # Add a class named after the metric
# class quality_scrapRate(Task):
# def launch(self):
# Get the parameters (if no parameters are passed, i.e. nothing is specified, all records are targeted)
# args = sys.argv
# try:
# fromPeriod = args[1]
# except: # noqaE722
# fromPeriod = '1900-01-01 00:00:00'
# try:
# toPeriod = args[2]
# except: # noqaE722
# toPeriod = '2262-04-11 00:00:00'
# print(f"fromPeriod-----------------{fromPeriod}")
# print(f"toPeriod-----------------{toPeriod}")
# Calculate the reference dates
# today = datetime.datetime.today()
# thismonth = datetime.datetime(today.year, today.month, 1)
# if len(toPeriod) > 0:
enddate = datetime.datetime.strptime(toPeriod, '%Y-%m-%d %H:%M:%S')
# else:
# # Get the last day of the previous month
# enddate = thismonth + datetime.timedelta(days=-1)
# if len(fromPeriod) > 0:
refdate = datetime.datetime.strptime(fromPeriod, '%Y-%m-%d %H:%M:%S')
# else:
# refdate = datetime.datetime(enddate.year, enddate.month, 1)
today = datetime.datetime.today().strftime('%Y-%m-%d')
# Preprocessing
# Data loading
# Equipment operation time
df_input_1 = d2s.readData(
# self.spark,
# self.dbutils,
spark,
dbutils,
sfDatabase="ECM_DEV",
sfSchema="PUBLIC",
query=f"""
SELECT
EQUIPMENT_ID,
OPERATION_END_DATE,
OPERATION_START_DATE,
OPERATION_TIME
FROM
ECM_OPERATION_TIME
WHERE
TO_TIMESTAMP(OPERATION_END_DATE) BETWEEN
TO_TIMESTAMP('{refdate}') AND TO_TIMESTAMP('{enddate}')
AND
OPERATION_END_DATE != ''
AND
OPERATION_END_DATE IS NOT NULL
""",
)
display(df_input_1)
# Failure count
df_input_2 = d2s.readData(
# self.spark,
# self.dbutils,
spark,
dbutils,
sfDatabase="ECM_DEV",
sfSchema="PUBLIC",
query=f"""
SELECT
BREAKDOWN_TIME,
EQUIPMENT_ID
FROM
ECM_BREAKDOWN_COUNT
WHERE
TO_TIMESTAMP(BREAKDOWN_TIME) BETWEEN
TO_TIMESTAMP('{refdate}') AND TO_TIMESTAMP('{enddate}')
AND
BREAKDOWN_TIME != ''
AND
BREAKDOWN_TIME IS NOT NULL
""",
)
display(df_input_2)
# Data processing
# Sum the equipment operation time table per equipment ID and operation end datetime
df_input_1 = df_input_1.withColumn("OPERATION_END_DATE", F.from_utc_timestamp(F.col("OPERATION_END_DATE"), "Asia/Tokyo"))
# Total operation time per equipment
df_total_operation_time = df_input_1.groupBy("EQUIPMENT_ID").agg(
F.sum("OPERATION_TIME").alias("TOTAL_OPERATION_TIME")
)
# Process the breakdown data
df_input_2 = df_input_2.withColumn("BREAKDOWN_TIME", F.from_utc_timestamp(F.col("BREAKDOWN_TIME"), "Asia/Tokyo"))
# Count breakdowns per equipment
df_total_fault_count = df_input_2.groupBy("EQUIPMENT_ID").count().withColumnRenamed("count", "TOTAL_FAULT_COUNT")
# Join the total operation time with the breakdown counts
df_combined = df_total_operation_time.join(df_total_fault_count, "EQUIPMENT_ID", "left")
# Calculate the overall MTBF
df_combined = df_combined.withColumn("MTBF", F.col("TOTAL_OPERATION_TIME") / F.col("TOTAL_FAULT_COUNT"))
# If there are no breakdowns, set MTBF to None
df_combined = df_combined.withColumn("MTBF", F.when(F.col("TOTAL_FAULT_COUNT") == 0, None).otherwise(F.col("MTBF")))
# Add fromPeriod and toPeriod
fromPeriod_date = fromPeriod.split(" ")[0]
toPeriod_date = toPeriod.split(" ")[0]
df_combined = df_combined.withColumn("TARGET_START_DATE", F.lit(fromPeriod_date))
df_combined = df_combined.withColumn("TARGET_END_DATE", F.lit(toPeriod_date))
df_combined = df_combined.withColumn("EXEC_DATE", F.lit(today))
# Generate the unique key
df_combined = df_combined.withColumn("UID", F.concat_ws("_", "EQUIPMENT_ID", "TARGET_START_DATE","TARGET_END_DATE","EXEC_DATE"))
# Select the final columns and display the result
df_combined_result = df_combined.select(
"EQUIPMENT_ID",
"TARGET_START_DATE",
"TARGET_END_DATE",
"EXEC_DATE",
"MTBF",
"UID"
)
df_combined_result = df_combined_result.sort("EQUIPMENT_ID", "TARGET_START_DATE", "TARGET_END_DATE")
display(df_combined_result)
# # Data writing
# # saveMode can be set to Overwrite, Append, etc.
# d2s.writeData(
# df,
# self.dbutils,
# sfSchema="IND_ECM_DEV_SCHEMA",
# sfDatabase="IND_ECM_DEV_DB",
# dbtable="QUALITY_SCRAPRATE",
# saveMode="Append",
# )
# # Added
# def entrypoint(): # pragma: no cover
# task = quality_scrapRate() # metric name
# task.launch()
# # Added
# if __name__ == '__main__':
# entrypoint()
Processing (PySpark), sketched in code after this list:
Inner-join the production count and scrap count data on product ID and reference datetime (records with no matching production/scrap data are excluded from the calculation).
Aggregate per product ID of the production count table, compute the sum of scrap counts / the sum of production counts × 100, and add it as a scrap rate column.
Add the aggregation start datetime and aggregation end datetime entered on the screen as columns.
Get the current date and add it as a processing date column.
For Anaplan, concatenate the product ID and the processing date with "_" and add it as a UID column.
Sort in ascending order of product ID.
Insert the calculated data into the scrap rate table.
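A rough end-to-end sketch of these steps follows. The frame and column names `df_production`, `df_scrap`, `REFERENCE_DATE`, `PRODUCTION_COUNT`, and `SCRAP_COUNT` are assumptions for illustration; only `PRODUCT_ID`, `EXEC_DATE`, `UID`, and the `fromPeriod` / `toPeriod` strings appear elsewhere in this issue.

```python
import datetime
from pyspark.sql import functions as F

# Inner join: rows without a matching production/scrap record are dropped.
df_joined = df_production.join(df_scrap, ["PRODUCT_ID", "REFERENCE_DATE"], "inner")

# Scrap rate per product: sum(scrap) / sum(production) * 100
df_rate = df_joined.groupBy("PRODUCT_ID").agg(
    (F.sum("SCRAP_COUNT") / F.sum("PRODUCTION_COUNT") * 100).alias("SCRAP_RATE")
)

# Aggregation period entered on the screen, plus the processing date
exec_date = datetime.datetime.today().strftime("%Y-%m-%d")
df_rate = (
    df_rate
    .withColumn("TARGET_START_DATE", F.lit(fromPeriod.split(" ")[0]))
    .withColumn("TARGET_END_DATE", F.lit(toPeriod.split(" ")[0]))
    .withColumn("EXEC_DATE", F.lit(exec_date))
)

# Anaplan unique key and final ordering; the result would then be written out
# (e.g. with d2s.writeData, as at the end of this issue).
df_rate = df_rate.withColumn("UID", F.concat_ws("_", "PRODUCT_ID", "EXEC_DATE"))
df_rate = df_rate.sort("PRODUCT_ID")
```

The MTBF script below follows the same pattern, swapping the product/scrap columns for equipment operation time and breakdown counts.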
# Data processing
# Sum the equipment operation time table per equipment ID and operation end datetime
df_operation = df_operation.withColumn("OPERATION_END_DATE",
F.from_utc_timestamp(
F.col("OPERATION_END_DATE"),
"Asia/Tokyo"))
# Total operation time per equipment
df_total_operation_time = df_operation.groupBy("EQUIPMENT_ID").agg(
F.sum("OPERATION_TIME").alias("TOTAL_OPERATION_TIME")
)
# Breakdown count per equipment
df_breakdown = df_breakdown.withColumn(
"BREAKDOWN_TIME", F.from_utc_timestamp(
F.col("BREAKDOWN_TIME"), "Asia/Tokyo"))
df_total_fault_count = df_breakdown.groupBy(
"EQUIPMENT_ID").count().withColumnRenamed(
"count", "TOTAL_FAULT_COUNT")
# Join the equipment operation time table with the breakdown count table
df_combined = df_total_operation_time.join(
df_total_fault_count, "EQUIPMENT_ID", "inner")
# Calculate MTBF
df_combined = df_combined.withColumn("MEAN_TIME_BETWEEN_FAILURES",
F.col("TOTAL_OPERATION_TIME") / F.col("TOTAL_FAULT_COUNT")) # noqa
# If the breakdown count is 0, set MTBF to None
df_combined = df_combined.withColumn("MEAN_TIME_BETWEEN_FAILURES",
F.when(F.col("TOTAL_FAULT_COUNT") == 0, None).otherwise(F.col("MEAN_TIME_BETWEEN_FAILURES"))) # noqa
# Create columns for the user-entered aggregation period and the processing datetime
nowdate = datetime.datetime.now().date().strftime('%Y-%m-%d')
enddate = enddate - datetime.timedelta(days=1)
df_combined = df_combined.withColumn(
"TARGET_START_DATE", F.lit(refdate.strftime('%Y-%m-%d'))).withColumn( # noqa
"TARGET_END_DATE", F.lit(enddate.strftime('%Y-%m-%d'))).withColumn( # noqa
"EXEC_DATE", F.lit(nowdate))
# If no aggregation period is specified, or the whole period is selected, use the oldest and newest dates in the data
if wholePeriodFlg:
oldest_date_row = df_operation.orderBy(
F.asc("OPERATION_END_DATE")).first()
oldest_date = oldest_date_row["OPERATION_END_DATE"]
formatted_oldest_date = oldest_date.strftime('%Y-%m-%d')
df_combined = df_combined.withColumn(
"TARGET_START_DATE", F.lit(formatted_oldest_date))
latest_date = df_operation.select(
"OPERATION_END_DATE").orderBy(F.desc(
"OPERATION_END_DATE")).first()["OPERATION_END_DATE"]
formatted_date = latest_date.strftime('%Y-%m-%d')
df_combined = df_combined.withColumn("TARGET_END_DATE",
F.lit(formatted_date))
# Unique key
df_combined = df_combined.withColumn("UID", F.concat_ws(
"_", "EQUIPMENT_ID",
"EXEC_DATE"))
# Sort the output table
df_combined_result = df_combined.select(
"EQUIPMENT_ID",
"TARGET_START_DATE",
"TARGET_END_DATE",
"EXEC_DATE",
"MEAN_TIME_BETWEEN_FAILURES",
"UID"
)
df_combined_result = df_combined_result.sort("EQUIPMENT_ID")
# Data writing
# saveMode can be set to Overwrite, Append, etc.
d2s.writeData(
df_combined_result,
self.dbutils,
sfSchema="IND_ECM_DEV_SCHEMA",
sfDatabase="IND_ECM_DEV_DB",
dbtable="CONSERVATION_MEANTIMEBETWEENFAILURES",
saveMode="Append",
)
<html xmlns:v="urn:schemas-microsoft-com:vml" xmlns:o="urn:schemas-microsoft-com:office:office" xmlns:x="urn:schemas-microsoft-com:office:excel" xmlns="http://www.w3.org/TR/REC-html40">
| EQUIPMENT_ID | OPERATION_START_DATE | OPERATION_END_DATE | OPERATION_TIME |
| --- | --- | --- | --- |
| 設備A | 2024-05-01T04:12:34Z | 2024-05-01T05:45:12Z | 1.5444 |
| 設備A | 2024-05-01T10:00:16Z | 2024-05-01T11:30:20Z | 1.5028 |
| 設備A | 2024-05-03T22:00:10Z | 2024-05-03T23:59:59Z | 1.9999 |
| 設備A | 2024-05-04T00:00:00Z | 2024-05-04T01:45:32Z | 1.7589 |
| 設備A | 2024-05-12T05:12:24Z | 2024-05-12T08:25:37Z | 3.2222 |
| 設備A | 2024-05-15T08:14:24Z | | |
| 設備B | 2024-05-05T08:45:00Z | 2024-05-05T12:00:00Z | 3.25 |
| 設備B | 2024-05-06T14:20:34Z | 2024-05-06T16:25:10Z | 2.0764 |
| 設備B | 2024-05-07T19:00:45Z | 2024-05-07T21:10:20Z | 2.1586 |
| 設備B | 2024-05-13T13:34:45Z | 2024-05-13T15:45:58Z | 2.1842 |
| 設備B | 2024-05-13T18:14:12Z | 2024-05-13T20:44:29Z | 2.5047 |
| 設備C | 2024-05-08T04:30:12Z | 2024-05-08T06:25:34Z | 1.9256 |
| 設備C | 2024-05-09T11:10:22Z | 2024-05-09T13:50:16Z | 2.6644 |
| 設備C | 2024-05-10T23:10:18Z | 2024-05-10T23:59:59Z | 0.8314 |
| 設備C | 2024-05-11T00:00:00Z | 2024-05-11T02:00:12Z | 2.0022 |
| 設備C | 2024-05-15T07:25:36Z | 2024-05-15T09:35:42Z | 2.1683 |