Open kirin-ri opened 5 months ago
data = [
    ("20240416", "KATATA", "JAPAN", "KATATA", "DONGGUAN", "D0202216011", "JB00306528", "D032590", "20240412", "20240501", 2222, "", "20240327"),
    ("20240416", "KATATA", "JAPAN", "DIC", "TEST", "D0202216011", "JB00306528", "D032590", "20240412", "20240511", 4321, "", "20240327"),  # target
    ("20240416", "KATATA", "JAPAN", "KATATA", "DONGGUAN", "D0202216012", "JB00305484", "TP00097324", "20240412", "20240501", 3333, "", "20240327"),  # target
    ("20240417", "KATATA", "JAPAN", "KATATA", "DONGGUAN", "D0202216011", "JB00306528", "D032590", "20240412", "20240501", 2222, "", "20240327"),  # target
    ("20240417", "KATATA", "JAPAN", "KATATA", "DONGGUAN", "D0202216011", "JB00302223", "D0322344230", "20240412", "20240501", 4444, "", "20240327"),
    ("20240417", "KATATA", "JAPAN", "DIC", "TEST", "D0202216011", "123131134", "D0322344230", "20240412", "20240504", 2341, "", "20240327"),  # target
    ("20240418", "KATATA", "JAPAN", "KATATA", "DONGGUAN", "D0202216011", "JB00302223", "D0322344230", "20240412", "20240501", 4444, "", "20240327"),
]
columns = ["DATE", "COMPANY_ID", "LOC_ID", "DEST_COMPANY_ID", "DEST_LOC_ID", "ITEM_CODE", "SALES_ORDER_NO", "CUSTOMER_PURCHASE_ORDER_NO", "ETD", "ETA", "QTY", "DELIVERY_SITE", "REGISTERED_DATE"]
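For anyone reproducing this locally, here is a minimal sketch (assuming a local or Databricks SparkSession named spark) that loads the sample rows above into a DataFrame:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # local or Databricks session assumed
df_sample = spark.createDataFrame(data, columns)
df_sample.show(truncate=False)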
WHERE TO_TIMESTAMP(DATE, 'YYYYMMDD') BETWEEN TO_TIMESTAMP('{startdate.strftime('%Y%m%d')}', 'YYYYMMDD') AND TO_TIMESTAMP('{refdate.strftime('%Y%m%d')}', 'YYYYMMDD')
from pyspark.sql.functions import col, to_timestamp, date_add, lit, concat_ws  # imports used below

df_spark = spark.sql(query)
# Convert DATE to a timestamp
df_spark = df_spark.withColumn('DATE', to_timestamp(col('DATE'), 'yyyyMMdd'))
# Add a next-day column
df_spark = df_spark.withColumn('next_day', date_add(col('DATE'), 1))
# Merge the DataFrame with itself (self-join)
merged_df = df_spark.alias('df1').join(
df_spark.alias('df2'),
(
(col('df1.next_day') == col('df2.DATE')) &
(col('df1.COMPANY_ID') == col('df2.COMPANY_ID')) &
(col('df1.LOC_ID') == col('df2.LOC_ID')) &
(col('df1.DEST_COMPANY_ID') == col('df2.DEST_COMPANY_ID')) &
(col('df1.DEST_LOC_ID') == col('df2.DEST_LOC_ID')) &
(col('df1.ITEM_CODE') == col('df2.ITEM_CODE')) &
(col('df1.SALES_ORDER_NO') == col('df2.SALES_ORDER_NO')) &
(col('df1.CUSTOMER_PURCHASE_ORDER_NO') == col('df2.CUSTOMER_PURCHASE_ORDER_NO')) &
(col('df1.ETD') == col('df2.ETD')) &
(col('df1.ETA') == col('df2.ETA')) &
(col('df1.QTY') == col('df2.QTY')) &
(col('df1.DELIVERY_SITE') == col('df2.DELIVERY_SITE')) &
(col('df1.REGISTERED_DATE') == col('df2.REGISTERED_DATE'))
),
how='left'
)
# Filter rows with no matching data on the next day
vanished_next_day = merged_df.filter(col('df2.DATE').isNull())
# Check that the next day falls within the specified date range
vanished_next_day = vanished_next_day.filter(col('df1.next_day').between(lit(startdate), lit(refdate)))
# Display the results (only records whose next day falls within the range)
filtered_results = vanished_next_day.select(
col('df1.next_day').alias('next_day'),
col('df1.ETA').alias('ETA'),
col('df1.QTY').alias('QTY')
)
# Show the results (the last few rows of the data)
filtered_results.show()
# Display using the Databricks display function
display(filtered_results)
Columns to add: STORAGE_PLACE, ORDER_RECIPIENT, REF_DATE
vanished_next_day = vanished_next_day.withColumn('STORAGE_PLACE', concat_ws('-', col('df1.COMPANY_ID'), col('df1.LOC_ID')))
vanished_next_day = vanished_next_day.withColumn('ORDER_RECIPIENT', concat_ws('-', col('df1.DEST_COMPANY_ID'), col('df1.DEST_LOC_ID')))
vanished_next_day = vanished_next_day.withColumn('REF_DATE', lit(startdate))
import datetime
import sys
import pandas as pd  # needed for pd.to_datetime below
# Import the shared utility package
import d2s
# from quality_scrapRate.common import Task  # added
from pyspark.sql import functions as F  # changed so static analysis passes; prefix all uses with F. from here on
# Prepare sample data: get the day before the target start date and the day after the target end date.
startdate = pd.to_datetime("2024-04-17")
enddate = pd.to_datetime("2024-04-18")
targetstartdate = startdate - pd.Timedelta(days=1)
df = d2s.readData(
# self.spark,
# self.dbutils,
spark,
dbutils,
sfDatabase="DKN_DEV",
sfSchema="PUBLIC",
query=f"""
SELECT
DATE,
COMPANY_ID,
LOC_ID,
DEST_COMPANY_ID,
DEST_LOC_ID,
ITEM_CODE,
QTY,
ETA
FROM
DKN_SO
WHERE
TO_TIMESTAMP(DATE, 'YYYYMMDD') BETWEEN TO_TIMESTAMP('{targetstartdate.strftime('%Y%m%d')}', 'YYYYMMDD')
AND TO_TIMESTAMP('{enddate.strftime('%Y%m%d')}', 'YYYYMMDD')
""",
)
# Convert DATE to a timestamp
df = df.withColumn('DATE', F.to_timestamp(F.col('DATE'), 'yyyyMMdd'))
# Add a next-day column
df = df.withColumn('next_day', F.date_add(F.col('DATE'), 1))
# Merge the DataFrame with itself (self-join)
merged_df = df.alias('df1').join(
df.alias('df2'),
(
(F.col('df1.next_day') == F.col('df2.DATE')) &
(F.col('df1.COMPANY_ID') == F.col('df2.COMPANY_ID')) &
(F.col('df1.LOC_ID') == F.col('df2.LOC_ID')) &
(F.col('df1.DEST_COMPANY_ID') == F.col('df2.DEST_COMPANY_ID')) &
(F.col('df1.DEST_LOC_ID') == F.col('df2.DEST_LOC_ID')) &
(F.col('df1.ITEM_CODE') == F.col('df2.ITEM_CODE')) &
(F.col('df1.ETA') == F.col('df2.ETA')) &
(F.col('df1.QTY') == F.col('df2.QTY'))
),
how='left'
)
# Filter rows with no matching data on the next day
vanished_next_day = merged_df.filter(F.col('df2.DATE').isNull())
# Check that the next day falls within the specified date range
vanished_next_day = vanished_next_day.filter(F.col('df1.next_day').between(F.lit(targetstartdate), F.lit(enddate)))
vanished_next_day = vanished_next_day.withColumn('STORAGE_PLACE', F.concat_ws('-', F.col('df1.COMPANY_ID'), F.col('df1.LOC_ID')))
vanished_next_day = vanished_next_day.withColumn('ORDER_RECIPIENT', F.concat_ws('-', F.col('df1.DEST_COMPANY_ID'), F.col('df1.DEST_LOC_ID')))
vanished_next_day = vanished_next_day.withColumn('REF_DATE', F.lit(startdate))
# Display the results (only records whose next day falls within the range)
filtered_results = vanished_next_day.select(
F.col('df1.next_day').alias('SHIP_DATE'),
F.col('df1.ETA').alias('ETA'),
F.col('df1.QTY').alias('QTY'),
F.col('STORAGE_PLACE'),
F.col('ORDER_RECIPIENT'),
F.col('REF_DATE')
)
filtered_results = filtered_results.orderBy(['SHIP_DATE','STORAGE_PLACE','ORDER_RECIPIENT'])
# Display using the Databricks display function
display(filtered_results)
SHIP_DATE ETA QTY STORAGE_PLACE ORDER_RECIPIENT REF_DATE
2024-04-17 20240415 24 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-17 20240415 18 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-17 20240416 18 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-17 20240419 20 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-17 20240417 48 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-17 20240422 20 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-17 20240423 20 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-17 20240417 24 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-17 20240425 20 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-17 20240426 20 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-17 20240410 20 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-17 20240416 1770 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-17 20240416 4950 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-17 20240416 48 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-17 20240411 20 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-17 20240418 20 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-17 20240416 150 KATATA-JAPAN DIL-YODOGAWA 2024-04-17T00:00:00.000+00:00
2024-04-17 20240501 2880 KATATA-JAPAN KATATA-DONGGUAN 2024-04-17T00:00:00.000+00:00
2024-04-18 20240409 50 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-18 20240415 48 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-18 20240411 60 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-18 20240417 240 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-18 20240416 24 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-18 20240415 20 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-18 20240411 48 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-18 20240403 75 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-18 20240417 32 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-18 20240417 48 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-18 20240416 20 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-18 20240412 48 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-18 20240417 200 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-18 20240401 30 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-18 20240417 16 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-18 20240410 48 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-18 20240417 240 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
2024-04-18 20240416 48 KATATA-JAPAN DIL-SHIGA 2024-04-17T00:00:00.000+00:00
df = df.withColumn('ETA', F.date_format(F.to_date(F.col('ETA'), 'yyyyMMdd'), 'yyyy-MM-dd'))
args = sys.argv
try:
fromPeriod = args[1]
except:  # noqa: E722
fromPeriod = ''
try:
toPeriod = args[2]
except:  # noqa: E722
toPeriod = ''
print(f"fromPeriod-----------------{fromPeriod}")
print(f"toPeriod-----------------{toPeriod}")
# Compute the reference date
from dateutil.relativedelta import relativedelta, SU, SA  # needed for the weekday arithmetic below
today = datetime.date.today()
if len(fromPeriod) > 0:
refdate = datetime.datetime.strptime(fromPeriod,
'%Y-%m-%d %H:%M:%S')
else:
# If today is Sunday
if datetime.datetime.today().weekday() == 6:
# Use last Sunday
refdate = today + relativedelta(weekday=SU(-1))
else:
# Use the Sunday before last
refdate = today + relativedelta(weekday=SU(-2))
startdate = refdate + datetime.timedelta(days=1)
if len(toPeriod) > 0:
enddate = datetime.datetime.strptime(toPeriod,
'%Y-%m-%d %H:%M:%S')
else:
# Use last Saturday
enddate = today + relativedelta(weekday=SA(-1))
print(f"startdate-----------------{startdate}")
print(f"enddate-----------------{enddate}")
merged_df = df.alias('df1').join(
df.alias('df2'),
(
(F.col('df1.next_day') == F.col('df2.DATE')) &
(F.col('df1.SALESORDERNO') == F.col('df2.SALESORDERNO'))
),
how='left'
)
# Filter disappeared orders and orders whose quantity decreased
disappeared_orders = merged_df.filter(F.col('df2.DATE').isNull())
reduced_qty_orders = merged_df.filter((F.col('df2.DATE').isNotNull()) & (F.col('df1.QTY') > F.col('df2.QTY')))
# Shape the results for disappeared orders
disappeared_orders = disappeared_orders.withColumn('STORAGE_PLACE', F.concat_ws('-', F.col('df1.COMPANY_ID'), F.col('df1.LOC_ID')))
disappeared_orders = disappeared_orders.withColumn('ORDER_RECIPIENT', F.concat_ws('-', F.col('df1.DEST_COMPANY_ID'), F.col('df1.DEST_LOC_ID')))
disappeared_orders = disappeared_orders.withColumn('REF_DATE', F.lit(startdate))
# Shape the results for orders with reduced quantity
reduced_qty_orders = reduced_qty_orders.withColumn('STORAGE_PLACE', F.concat_ws('-', F.col('df1.COMPANY_ID'), F.col('df1.LOC_ID')))
reduced_qty_orders = reduced_qty_orders.withColumn('ORDER_RECIPIENT', F.concat_ws('-', F.col('df1.DEST_COMPANY_ID'), F.col('df1.DEST_LOC_ID')))
reduced_qty_orders = reduced_qty_orders.withColumn('REF_DATE', F.lit(startdate))
reduced_qty_orders = reduced_qty_orders.withColumn('REDUCED_QTY', F.col('df1.QTY') - F.col('df2.QTY'))
# Union the disappeared orders and the reduced orders
result = disappeared_orders.select(
F.col('df1.next_day').alias('SHIP_DATE'),
F.col('df1.ETA').alias('ETA'),
F.col('df1.QTY').alias('QTY'),
F.col('STORAGE_PLACE'),
F.col('ORDER_RECIPIENT'),
F.col('REF_DATE')
).union(
reduced_qty_orders.select(
F.col('df1.next_day').alias('SHIP_DATE'),
F.col('df1.ETA').alias('ETA'),
F.col('REDUCED_QTY').alias('QTY'),
F.col('STORAGE_PLACE'),
F.col('ORDER_RECIPIENT'),
F.col('REF_DATE')
)
)
result = result.orderBy(['SHIP_DATE', 'STORAGE_PLACE', 'ORDER_RECIPIENT'])
# Display using the Databricks display function
display(result)
# Compute the difference when QTY has decreased
qty_diff_df = merged_df.withColumn('QTY_DIFF', F.when(F.col('df2.QTY').isNotNull() & (F.col('df1.QTY') > F.col('df2.QTY')), F.col('df1.QTY') - F.col('df2.QTY')).otherwise(None))
# Filter rows with no next-day data or whose QTY has decreased
vanished_or_decreased_df = qty_diff_df.filter((F.col('df2.DATE').isNull()) | (F.col('QTY_DIFF').isNotNull()))
# Check that the next day falls within the specified date range
vanished_or_decreased_df = vanished_or_decreased_df.filter(F.col('df1.next_day').between(F.lit(previous_day), F.lit(enddate)))
# Add the required columns
vanished_or_decreased_df = vanished_or_decreased_df.withColumn('STORAGE_PLACE', F.concat_ws('-', F.col('df1.COMPANY_ID'), F.col('df1.LOC_ID')))
vanished_or_decreased_df = vanished_or_decreased_df.withColumn('ORDER_RECIPIENT', F.concat_ws('-', F.col('df1.DEST_COMPANY_ID'), F.col('df1.DEST_LOC_ID')))
vanished_or_decreased_df = vanished_or_decreased_df.withColumn('REF_DATE', F.lit(startdate))
# Display the results (only records whose next day falls within the range)
filtered_results = vanished_or_decreased_df.select(
F.col('df1.next_day').alias('SHIP_DATE'),
F.col('df1.ETA').alias('ETA'),
F.when(F.col('QTY_DIFF').isNotNull(), F.col('QTY_DIFF')).otherwise(F.col('df1.QTY')).alias('QTY'),
F.col('STORAGE_PLACE'),
F.col('ORDER_RECIPIENT'),
F.col('REF_DATE')
)
filtered_results = filtered_results.orderBy(['SHIP_DATE','STORAGE_PLACE','ORDER_RECIPIENT'])
# Display using the Databricks display function
display(filtered_results)
# Compute the difference when QTY has decreased
merged_df = merged_df.withColumn('QTY_DIFF',
F.when(
F.col('df2.QTY').isNotNull() & (F.col('df1.QTY') > F.col('df2.QTY')),
F.col('df1.QTY') - F.col('df2.QTY')
).otherwise(F.col('df1.QTY'))
)
# Filter rows with no next-day data or whose QTY has decreased
vanished_or_decreased_df = merged_df.filter((F.col('df2.DATE').isNull()) | (F.col('QTY_DIFF') < F.col('df1.QTY')))
# Check that the next day falls within the specified date range
vanished_or_decreased_df = vanished_or_decreased_df.filter(F.col('df1.next_day').between(F.lit(previous_day), F.lit(enddate)))
# Add the required columns
vanished_or_decreased_df = vanished_or_decreased_df.withColumn('STORAGE_PLACE', F.concat_ws('-', F.col('df1.COMPANY_ID'), F.col('df1.LOC_ID')))
vanished_or_decreased_df = vanished_or_decreased_df.withColumn('ORDER_RECIPIENT', F.concat_ws('-', F.col('df1.DEST_COMPANY_ID'), F.col('df1.DEST_LOC_ID')))
vanished_or_decreased_df = vanished_or_decreased_df.withColumn('REF_DATE', F.lit(startdate))
# Display the results (only records whose next day falls within the range)
filtered_results = vanished_or_decreased_df.select(
F.col('df1.next_day').alias('SHIP_DATE'),
F.col('df1.ETA').alias('ETA'),
F.col('QTY_DIFF').alias('QTY'),
F.col('STORAGE_PLACE'),
F.col('ORDER_RECIPIENT'),
F.col('REF_DATE')
)
filtered_results = filtered_results.orderBy(['SHIP_DATE','STORAGE_PLACE','ORDER_RECIPIENT'])
# Display using the Databricks display function
display(filtered_results)
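As a quick sanity check of the when/otherwise logic above, here is a tiny self-contained sketch (hypothetical three-row input, local SparkSession assumed) showing how a vanished order reports its full quantity while a reduced order reports only the difference:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
# QTY1 = quantity on day N, QTY2 = quantity on day N+1 (None = the order vanished)
pairs = spark.createDataFrame([(100, 70), (50, None), (30, 30)], "QTY1 int, QTY2 int")
pairs = pairs.withColumn(
    "QTY_DIFF",
    F.when(F.col("QTY2").isNotNull() & (F.col("QTY1") > F.col("QTY2")), F.col("QTY1") - F.col("QTY2"))
)
pairs.withColumn(
    "REPORT_QTY",
    F.when(F.col("QTY_DIFF").isNotNull(), F.col("QTY_DIFF")).otherwise(F.col("QTY1"))
).show()
# Expected REPORT_QTY: 30 (reduced), 50 (vanished), 30 (unchanged; such rows are filtered out elsewhere)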
EQUIPMENT_ID,OPERATION_START_DATE,OPERATION_END_DATE,OPERATION_TIME
設備A,2024-05-01T04:12:34Z,2024-05-01T05:45:12Z,1.5444
設備A,2024-05-01T10:00:16Z,2024-05-01T11:30:20Z,1.5028
設備A,2024-05-03T22:00:10Z,2024-05-04T01:45:32Z,1.9999+1.7589
設備A,2024-05-12T05:12:24Z,2024-05-12T08:25:37Z,3.2222
設備A,2024-05-15T08:14:24Z,,
設備B,2024-05-05T08:45:00Z,2024-05-05T12:00:00Z,3.25
設備B,2024-05-06T14:20:34Z,2024-05-06T16:25:10Z,2.0764
設備B,2024-05-07T19:00:45Z,2024-05-07T21:10:20Z,2.1586
設備B,2024-05-13T13:34:45Z,2024-05-13T15:45:58Z,2.1842
設備B,2024-05-13T18:14:12Z,2024-05-13T20:44:29Z,2.5047
設備C,2024-05-08T04:30:12Z,2024-05-08T06:25:34Z,1.9256
設備C,2024-05-09T11:10:22Z,2024-05-09T13:50:16Z,2.6644
設備C,2024-05-10T23:10:18Z,2024-05-11T02:00:12Z,0.8314+2.0022
設備C,2024-05-15T07:25:36Z,2024-05-15T09:35:42Z,2.1683
EQUIPMENT_ID,OPERATION_START_DATE,OPERATION_END_DATE
設備A,2024-05-01T04:12:34Z,2024-05-01T05:45:12Z
設備A,2024-05-01T10:00:16Z,2024-05-01T11:30:20Z
設備A,2024-04-30T22:00:10Z,2024-05-01T01:45:32Z
設備A,2024-04-28T05:12:24Z,2024-05-01T08:25:37Z
設備A,2024-04-27T08:14:24Z,
設備B,2024-05-01T08:45:00Z,2024-05-01T12:00:00Z
設備B,2024-05-01T04:12:34Z,2024-05-01T05:45:12Z
SELECT EQUIPMENT_ID, OPERATION_START_DATE, OPERATION_END_DATE
FROM YOUR_TABLE
WHERE OPERATION_START_DATE >= '2024-05-01 00:00:00'
  AND OPERATION_END_DATE <= '2024-05-01 23:59:59'
  OR OPERATION_START_DATE <= '2024-05-01 23:59:59'
  AND OPERATION_END_DATE IS NULL
SELECT EQUIPMENT_ID, OPERATION_START_DATE, OPERATION_END_DATE
FROM YOUR_TABLE_NAME
WHERE (OPERATION_START_DATE <= '2024-05-01T23:59:59Z'
       AND (OPERATION_END_DATE >= '2024-05-01T00:00:00Z' OR OPERATION_END_DATE IS NULL))
   OR (OPERATION_START_DATE <= '2024-05-01T00:00:00Z' AND OPERATION_END_DATE IS NULL);
df = df.withColumn("OPERATION_START_DATE", col("OPERATION_START_DATE").cast(TimestampType())) df = df.withColumn("OPERATION_END_DATE", col("OPERATION_END_DATE").cast(TimestampType()))
filtered_df = df.withColumn("FILTERED_START_DATE", greatest(col("OPERATION_START_DATE"), lit("2024-05-01T00:00:00").cast(TimestampType()))) \ .withColumn("FILTERED_END_DATE", least(when(col("OPERATION_END_DATE").isNull(), lit("2024-05-01T23:59:59")).otherwise(col("OPERATION_END_DATE")), lit("2024-05-01T23:59:59").cast(TimestampType()))) \ .filter((col("OPERATION_START_DATE") <= lit("2024-05-01T23:59:59").cast(TimestampType())) & ((col("OPERATION_END_DATE") >= lit("2024-05-01T00:00:00").cast(TimestampType())) | col("OPERATION_END_DATE").isNull()))
result_df = filtered_df.withColumn("DURATION", (col("FILTERED_END_DATE").cast("long") - col("FILTERED_START_DATE").cast("long")) / 3600) \ .groupBy("EQUIPMENT_ID") \ .agg(_sum("DURATION").alias("TOTAL_OPERATION_HOURS"))
result_df.show()
# Filter and adjust the time window
filtered_df = df.withColumn(
"FILTERED_START_DATE",
greatest(col("OPERATION_START_DATE"), lit("2024-05-01T00:00:00Z").cast(TimestampType()))
).withColumn(
"FILTERED_END_DATE",
least(coalesce(col("OPERATION_END_DATE"), lit("2024-05-01T23:59:59Z").cast(TimestampType())), lit("2024-05-01T23:59:59Z").cast(TimestampType()))
).filter(
(col("OPERATION_START_DATE") <= lit("2024-05-01T23:59:59Z").cast(TimestampType())) &
((col("OPERATION_END_DATE") >= lit("2024-05-01T00:00:00Z").cast(TimestampType())) | col("OPERATION_END_DATE").isNull())
)
# Compute operation hours
result_df = filtered_df.withColumn(
"DURATION",
(col("FILTERED_END_DATE").cast("long") - col("FILTERED_START_DATE").cast("long")) / 3600
).groupBy("EQUIPMENT_ID").agg(
_sum("DURATION").alias("TOTAL_OPERATION_HOURS")
)
# Show the result
result_df.show()
from pyspark.sql.functions import col, lit, greatest, least, coalesce, sum as _sum
AnalysisException: cannot resolve 'greatest(OPERATION_START_DATE, CAST('2024-05-01T00:00:00Z' AS TIMESTAMP))' due to data type mismatch: The expressions should all have the same type, got GREATEST(string, timestamp).;
# Cast to timestamp type
df = df.withColumn("OPERATION_START_DATE", col("OPERATION_START_DATE").cast(TimestampType()))
df = df.withColumn("OPERATION_END_DATE", col("OPERATION_END_DATE").cast(TimestampType()))
# Filter and adjust the time window
filtered_df = df.withColumn(
"FILTERED_START_DATE",
greatest(col("OPERATION_START_DATE"), lit("2024-05-01T00:00:00Z").cast(TimestampType()))
).withColumn(
"FILTERED_END_DATE",
least(coalesce(col("OPERATION_END_DATE"), lit("2024-05-01T23:59:59Z").cast(TimestampType())), lit("2024-05-01T23:59:59Z").cast(TimestampType()))
).filter(
(col("OPERATION_START_DATE") <= lit("2024-05-01T23:59:59Z").cast(TimestampType())) &
((col("OPERATION_END_DATE") >= lit("2024-05-01T00:00:00Z").cast(TimestampType())) | col("OPERATION_END_DATE").isNull())
)
# Compute operation hours
result_df = filtered_df.withColumn(
"DURATION",
(col("FILTERED_END_DATE").cast("long") - col("FILTERED_START_DATE").cast("long")) / 3600
).groupBy("EQUIPMENT_ID").agg(
_sum("DURATION").alias("TOTAL_OPERATION_HOURS")
)
# Cast to timestamp type
df = df.withColumn("OPERATION_START_DATE", col("OPERATION_START_DATE").cast(TimestampType()))
df = df.withColumn("OPERATION_END_DATE", col("OPERATION_END_DATE").cast(TimestampType()))
# Filter and adjust the time window
filtered_df = df.withColumn(
"FILTERED_START_DATE",
greatest(col("OPERATION_START_DATE"), lit("2024-05-01T00:00:00Z").cast(TimestampType()))
).withColumn(
"FILTERED_END_DATE",
least(coalesce(col("OPERATION_END_DATE"), lit("2024-05-01T23:59:59Z").cast(TimestampType())), lit("2024-05-01T23:59:59Z").cast(TimestampType()))
).filter(
(col("OPERATION_START_DATE") <= lit("2024-05-01T23:59:59Z").cast(TimestampType())) &
((col("OPERATION_END_DATE") >= lit("2024-05-01T00:00:00Z").cast(TimestampType())) | col("OPERATION_END_DATE").isNull())
)
# Compute operation hours and display the result
result_df = filtered_df.withColumn(
"DURATION",
(col("FILTERED_END_DATE").cast("long") - col("FILTERED_START_DATE").cast("long")) / 3600
).groupBy("EQUIPMENT_ID").agg(
_sum("DURATION").alias("TOTAL_OPERATION_HOURS"),
min("FILTERED_START_DATE").alias("ADJUSTED_START_DATE"),
max("FILTERED_END_DATE").alias("ADJUSTED_END_DATE")
AttributeError: 'str' object has no attribute 'alias'
(Here min and max resolve to the Python builtins rather than pyspark.sql.functions.min/max, so min("FILTERED_START_DATE") returns a plain string; importing them as _min/_max, as in the next attempt, fixes this.)
# Cast to timestamp type
df = df.withColumn("OPERATION_START_DATE", col("OPERATION_START_DATE").cast(TimestampType()))
df = df.withColumn("OPERATION_END_DATE", col("OPERATION_END_DATE").cast(TimestampType()))
# Filter and adjust the time window
filtered_df = df.withColumn(
"FILTERED_START_DATE",
greatest(col("OPERATION_START_DATE"), lit("2024-05-01T00:00:00Z").cast(TimestampType()))
).withColumn(
"FILTERED_END_DATE",
least(coalesce(col("OPERATION_END_DATE"), lit("2024-05-01T23:59:59Z").cast(TimestampType())), lit("2024-05-01T23:59:59Z").cast(TimestampType()))
).filter(
(col("OPERATION_START_DATE") <= lit("2024-05-01T23:59:59Z").cast(TimestampType())) &
((col("OPERATION_END_DATE") >= lit("2024-05-01T00:00:00Z").cast(TimestampType())) | col("OPERATION_END_DATE").isNull())
)
# Compute operation hours and display the result
result_df = filtered_df.withColumn(
"DURATION",
(col("FILTERED_END_DATE").cast("long") - col("FILTERED_START_DATE").cast("long")) / 3600
).groupBy("EQUIPMENT_ID").agg(
_sum("DURATION").alias("TOTAL_OPERATION_HOURS"),
_min("FILTERED_START_DATE").alias("ADJUSTED_START_DATE"),
_max("FILTERED_END_DATE").alias("ADJUSTED_END_DATE")
)
from pyspark.sql.functions import col, lit, greatest, least, coalesce, sum as _sum, min as _min, max as _max
# Cast to timestamp type
df = df.withColumn("OPERATION_START_DATE", col("OPERATION_START_DATE").cast(TimestampType()))
df = df.withColumn("OPERATION_END_DATE", col("OPERATION_END_DATE").cast(TimestampType()))
# Filter and adjust the time window
filtered_df = df.withColumn(
"FILTERED_START_DATE",
greatest(col("OPERATION_START_DATE"), lit("2024-05-01T00:00:00Z").cast(TimestampType()))
).withColumn(
"FILTERED_END_DATE",
least(coalesce(col("OPERATION_END_DATE"), lit("2024-05-01T23:59:59Z").cast(TimestampType())), lit("2024-05-01T23:59:59Z").cast(TimestampType()))
).filter(
(col("OPERATION_START_DATE") <= lit("2024-05-01T23:59:59Z").cast(TimestampType())) &
((col("OPERATION_END_DATE") >= lit("2024-05-01T00:00:00Z").cast(TimestampType())) | col("OPERATION_END_DATE").isNull())
)
# Compute operation hours and display the result
result_df = filtered_df.withColumn(
"DURATION",
(col("FILTERED_END_DATE").cast("long") - col("FILTERED_START_DATE").cast("long")) / 3600
).select(
col("EQUIPMENT_ID"),
col("FILTERED_START_DATE").alias("ADJUSTED_START_DATE"),
col("FILTERED_END_DATE").alias("ADJUSTED_END_DATE"),
col("DURATION")
).groupBy("EQUIPMENT_ID").agg(
_sum("DURATION").alias("TOTAL_OPERATION_HOURS"),
_min("ADJUSTED_START_DATE").alias("ADJUSTED_START_DATE"),
_max("ADJUSTED_END_DATE").alias("ADJUSTED_END_DATE")
)
# Cast to timestamp type
df = df.withColumn("OPERATION_START_DATE", col("OPERATION_START_DATE").cast(TimestampType()))
df = df.withColumn("OPERATION_END_DATE", col("OPERATION_END_DATE").cast(TimestampType()))
# Filter and adjust the time window
filtered_df = df.withColumn(
"FILTERED_START_DATE",
greatest(col("OPERATION_START_DATE"), lit("2024-05-01T00:00:00Z").cast(TimestampType()))
).withColumn(
"FILTERED_END_DATE",
least(coalesce(col("OPERATION_END_DATE"), lit("2024-05-01T23:59:59Z").cast(TimestampType())), lit("2024-05-01T23:59:59Z").cast(TimestampType()))
).filter(
(col("OPERATION_START_DATE") <= lit("2024-05-01T23:59:59Z").cast(TimestampType())) &
((col("OPERATION_END_DATE") >= lit("2024-05-01T00:00:00Z").cast(TimestampType())) | col("OPERATION_END_DATE").isNull())
)
# Compute operation hours and display the result
result_df = filtered_df.withColumn(
"DURATION",
(col("FILTERED_END_DATE").cast("long") - col("FILTERED_START_DATE").cast("long")) / 3600
).groupBy("EQUIPMENT_ID").agg(
_sum("DURATION").alias("TOTAL_OPERATION_HOURS")
).withColumn(
"ADJUSTED_START_DATE", lit("2024-05-01T00:00:00Z").cast(TimestampType())
).withColumn(
"ADJUSTED_END_DATE", lit("2024-05-01T23:59:59Z").cast(TimestampType())
)
from datetime import datetime, timedelta  # assumed imports for this fragment

# Get the current date and compute the previous day
today = datetime.utcnow().date()
yesterday = today - timedelta(days=1)
start_time = datetime(yesterday.year, yesterday.month, yesterday.day, 0, 0, 0)
end_time = datetime(yesterday.year, yesterday.month, yesterday.day, 23, 59, 59)
# Convert to timestamp strings
start_time_str = start_time.strftime('%Y-%m-%dT%H:%M:%SZ')
end_time_str = end_time.strftime('%Y-%m-%dT%H:%M:%SZ')
This condition combines the following two main criteria to retrieve data for equipment that was operating on May 1, 2024:
Runs whose operating period overlaps May 1, 2024 at least partially.
Runs that started before the beginning of May 1, 2024 and have not yet ended.
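In other words, each run is clipped to the boundaries of the target day and only the clipped portion is counted, which is what the greatest/least combination does. A plain-Python sketch of that arithmetic (hypothetical helper, timezone-naive UTC datetimes assumed):

from datetime import datetime

DAY_START = datetime(2024, 5, 1, 0, 0, 0)
DAY_END = datetime(2024, 5, 1, 23, 59, 59)

def clipped_hours(start, end):
    """Hours of a run that fall inside the target day; end=None means the run is still open."""
    end = end or DAY_END                           # open-ended runs are cut off at the end of the day
    s, e = max(start, DAY_START), min(end, DAY_END)
    return max((e - s).total_seconds() / 3600, 0.0)

# A run that started the previous evening contributes only its May 1 portion:
print(round(clipped_hours(datetime(2024, 4, 30, 22, 0, 10), datetime(2024, 5, 1, 1, 45, 32)), 4))  # 1.7589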
import datetime
# Import the shared utility package
import d2s
from ecm_conv_raw_operation_time.common import Task  # added
from pyspark.sql import functions as F  # changed so static analysis passes; prefix all uses with F. from here on
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import col, lit, greatest
from pyspark.sql.functions import least, coalesce, sum as _sum
# Add a class named after the metric
class ecm_conv_raw_operation_time(Task):
def launch(self):
# Compute the reference date
# Get the current date and compute the previous day
today = datetime.datetime.today()
today = datetime.datetime(2024, 5, 2)  # hard-coded run date (appears to be a test override of the line above)
yesterday = today - datetime.timedelta(days=1)
start_time = datetime.datetime(yesterday.year, yesterday.month,
yesterday.day, 0, 0, 0)
end_time = datetime.datetime(yesterday.year, yesterday.month,
yesterday.day, 23, 59, 59)
# Convert to timestamp strings
start_time_str = start_time.strftime('%Y-%m-%dT%H:%M:%SZ')
end_time_str = end_time.strftime('%Y-%m-%dT%H:%M:%SZ')
print(start_time_str)
print(end_time_str)
# Preprocessing
# Load data
# Production quantity
df = d2s.readData(
self.spark,
self.dbutils,
sfDatabase="ECM_DEV",
sfSchema="PUBLIC",
query=f"""
SELECT
EQUIPMENT_ID,
OPERATION_START_DATE,
OPERATION_END_DATE,
OPERATION_TIME
FROM
ECM_OPERATION_TIME
WHERE
(OPERATION_START_DATE <= '{end_time_str}'
AND (OPERATION_END_DATE >= '{start_time_str}'
OR OPERATION_END_DATE IS NULL))
OR (OPERATION_START_DATE <= '{start_time_str}'
AND OPERATION_END_DATE IS NULL)
""",
)
# Data transformation
# Cast to timestamp type
df = df.withColumn("OPERATION_START_DATE", col(
"OPERATION_START_DATE").cast(TimestampType()))
df = df.withColumn("OPERATION_END_DATE", col(
"OPERATION_END_DATE").cast(TimestampType()))
# Filter and adjust the time window
filtered_df = df.withColumn(
"FILTERED_START_DATE",
greatest(col("OPERATION_START_DATE"), lit(start_time_str).cast(TimestampType()))
).withColumn(
"FILTERED_END_DATE",
least(coalesce(col("OPERATION_END_DATE"), lit(end_time_str).cast(TimestampType())),
lit(end_time_str).cast(TimestampType()))
).filter(
(col("OPERATION_START_DATE") <= lit(end_time_str).cast(TimestampType())) &
((col("OPERATION_END_DATE") >= lit(start_time_str).cast(TimestampType())) | col("OPERATION_END_DATE").isNull())
)
# Compute operation hours and shape the result
result_df = filtered_df.withColumn(
"DURATION",
(col("FILTERED_END_DATE").cast("long") - col("FILTERED_START_DATE").cast("long")) / 3600
).groupBy("EQUIPMENT_ID").agg(
_sum("DURATION").alias("OPERATION_TIME")
).withColumn(
"OPERATION_START_DATE", lit(start_time_str).cast(TimestampType())
).withColumn(
"OPERATION_END_DATE", lit(end_time_str).cast(TimestampType())
)
result_df = result_df.withColumn("EXEC_DATE", F.lit(today))
# Assign a unique key for Anaplan
result_df = result_df.withColumn("UID",
F.concat_ws("_",
"EQUIPMENT_ID",
"OPERATION_START_DATE",
"OPERATION_END_DATE",
"EXEC_DATE"))
# Sort
result_df = result_df.select("EQUIPMENT_ID",
"OPERATION_START_DATE",
"OPERATION_END_DATE",
"OPERATION_TIME",
"EXEC_DATE",
"UID").sort("UID")
# Write data
# saveMode accepts Overwrite, Append, etc.
d2s.writeData(
result_df,
self.dbutils,
sfSchema="IND_ECM_DEV_SCHEMA",
sfDatabase="IND_ECM_DEV_DB",
dbtable="CONV_RAW_OPERATION_TIME",
saveMode="Append",
)
# added
def entrypoint(): # pragma: no cover
task = ecm_conv_raw_operation_time()  # metric name
task.launch()
# added
if __name__ == '__main__':
entrypoint()
for UID, OPERATION_START_DATE, \
OPERATION_END_DATE, QTY, EXEC_DATE in sqlResult:
tmpData = {}
tmpLabel = UID.split("_")
tmpData["label"] = f"{tmpLabel[0]}"
tmpData["date"] = f"{OPERATION_START_DATE}"
tmpData["qty"] = QTY
data.append(tmpData)
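Each appended element ends up shaped roughly like this (values hypothetical; the label is the first underscore-separated part of UID, i.e. the equipment id), which is presumably what the chart component below consumes:

# {"label": "設備A", "date": "2024-05-01 00:00:00", "qty": 1.5444}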
/*
 * Metrics
 * Quantity chart
 */
export class MetricsQty extends Metrics {
constructor() {
super('qty');
}
render(
details: MetricsDetails,
stats: StatDef[],
extraOpts?: ExtraOptsHandler,
) {
// Maximum length of the chart's x-axis
const MAX_LEN = 3;
// Maximum number of chart labels
const MAX_LABELS = 6;
const builder = new ChartDataBuilder(
MAX_LEN,
MAX_LABELS,
stats,
details.data,
);
builder.newSeries(STAT_QTY, 'bar').addYScale({
type: 'linear',
position: 'left',
ticks: {
// max: 100,
min: 0,
// stepSize: 10,
},
title: { display: true, text: '[時間/回]' },
});
const op: {} = {
scales: builder.buildYScales(),
plugins: {
legend: { position: 'top' },
layout: {
padding: {
right: 200,
},
},
},
};
extraOpts?.call(this, op);
return (
<div className="card card-chart">
<div className="card-header">
<h3 className="card-title">{details?.graph_title[0]}</h3>
</div>
<div className="card-body">
<Chart
height={300}
width={600}
type="bar"
data={builder.buildData()}
options={op}
id="chart-key"
/>
</div>
</div>
);
}
}
EQUIPMENT_ID,OPERATION_START_DATE,OPERATION_END_DATE,OPERATION_TIME
設備A,2024-05-01T04:12:34Z,2024-05-01T05:45:12Z,1.5444
設備A,2024-05-01T10:00:16Z,2024-05-01T11:30:20Z,1.5028
設備A,2024-04-30T22:00:10Z,2024-05-01T01:45:32Z,3.75
設備A,2024-05-01T22:00:10Z,2024-05-02T01:45:32Z,3.75
設備A,2024-04-28T05:12:24Z,2024-04-30T08:25:37Z,75.2202
設備A,2024-04-27T08:14:24Z,,
設備A,2024-05-01T08:14:24Z,,
設備B,2024-05-01T08:45:00Z,2024-05-01T12:00:00Z,3.25
設備B,2024-05-01T04:12:34Z,2024-05-01T05:45:12Z,1.5444
設備B,2024-04-28T08:45:00Z,2024-04-28T12:00:00Z,3.25
設備B,2024-05-02T04:12:34Z,2024-05-02T05:45:12Z,1.5444
oldest_date_row = df_input_1.orderBy(
F.asc("OPERATION_START_DATE")).first()
oldest_date = oldest_date_row["OPERATION_START_DATE"]
formatted_oldest_date = oldest_date.strftime('%Y-%m-%d')
df_combined_result = df_combined_result.withColumn(
"TARGET_START_DATE", F.lit(formatted_oldest_date))
latest_date = df_input_1.select(
"OPERATION_START_DATE").orderBy(F.desc(
"OPERATION_START_DATE")).first()["OPERATION_START_DATE"]
formatted_date = latest_date.strftime('%Y-%m-%d')
df_combined_result = df_combined_result.withColumn("TARGET_END_DATE",
F.lit(formatted_date))
df_combined_result = df_combined_result.sort("EQUIPMENT_ID", "TARGET_START_DATE", "TARGET_END_DATE")
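As a possible simplification (just a sketch, not required), the oldest and latest OPERATION_START_DATE could be obtained in a single aggregation instead of two orderBy/first passes:

bounds = df_input_1.agg(
    F.min("OPERATION_START_DATE").alias("oldest"),
    F.max("OPERATION_START_DATE").alias("latest"),
).first()
df_combined_result = (
    df_combined_result
    .withColumn("TARGET_START_DATE", F.lit(bounds["oldest"].strftime("%Y-%m-%d")))
    .withColumn("TARGET_END_DATE", F.lit(bounds["latest"].strftime("%Y-%m-%d")))
    .sort("EQUIPMENT_ID", "TARGET_START_DATE", "TARGET_END_DATE")
)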