facebook / prophet

Tool for producing high quality forecasts for time series data that has multiple seasonality with linear or non-linear growth.
https://facebook.github.io/prophet
MIT License
18.25k stars 4.5k forks source link

Doing forecast with prophet in threads the fitting process will be stuck #2345

Open wdongdongde opened 1 year ago

wdongdongde commented 1 year ago
image

Then, in each thread, I initialize a new Prophet object:

image

After running for a period of time, the calculation process gets stuck.

wdongdongde commented 1 year ago

main part of source code is: ` import pytz import time import json import datetime import copy from datetime import timezone import numpy as np import pandas as pd import requests import traceback from prophet import Prophet from log_utils import logger

def get_str_time_from_ts(ts: int) -> str:
    """Format an epoch-seconds timestamp as 'YYYY-MM-DD HH:MM:SS' in Asia/Shanghai.

    Uses the stdlib ``zoneinfo`` (Python 3.9+) instead of the third-party
    ``pytz`` the original reached for; the output is identical because
    Asia/Shanghai is a fixed UTC+8 zone with no DST.
    """
    from zoneinfo import ZoneInfo  # local import: drops the pytz dependency
    dt = datetime.datetime.fromtimestamp(ts, ZoneInfo('Asia/Shanghai'))
    return dt.strftime('%Y-%m-%d %H:%M:%S')

def get_ts_from_str_time(strtime: str) -> int:
    """Parse a naive 'YYYY-MM-DD HH:MM:SS' string (Asia/Shanghai wall time)
    into an epoch-seconds timestamp.

    The string is first stamped as UTC and the result shifted back eight
    hours — equivalent to interpreting it as UTC+8.  The original author
    noted that converting via UTC avoids rounding surprises.
    """
    EIGHT_HOURS = 8 * 60 * 60
    # The incoming string carries no timezone information.
    naive = datetime.datetime.strptime(strtime, '%Y-%m-%d %H:%M:%S')
    as_utc = naive.replace(tzinfo=timezone.utc)
    return int(as_utc.timestamp()) - EIGHT_HOURS

def get_seconds_from_interval(interval):
    """Return the length of one sampling interval in seconds.

    The original branched on ``interval in ("min", "MS")`` but returned 60
    on every path; that always-60 behavior is preserved here with the dead
    branch removed.  NOTE(review): "MS" (pandas month-start?) is almost
    certainly not 60 seconds — confirm the intent before extending this.
    """
    return 60

def prophet_predict(point_num: int, interval: str, type: str, timeseries: list, now_time: str, params: dict, meta: dict = None): logger.info( "prophet_predict params: point_num:{},interval:{},type:{}, now_time:{},params:{}".format( point_num, interval, type, now_time, params))

这里是按间隔得到了预测的时间,point_num是自预测时间开始的点数,实际预测的点为当前时间到Now_time的点数+pointNum

total_num = (int(now_time) - int(timeseries[-1][0])) // 60 + point_num
length = len(timeseries)
logger.info("timeseries detail: length:{}, first three:{}, last three:{}".format(length, timeseries[0:3],
                                                                                 timeseries[length - 3:length]))
try:
    start = time.time()
    if str(timeseries[0][0], 'utf-8') == "" and int(now_time) > 0:
        data = pd.DataFrame()
        data["y"] = [t[1] for t in timeseries]
        data['y'] = data['y'].astype('float32')
        num = len(timeseries)
        now_time = int(now_time)
        start_time = now_time - (num - 1) * get_seconds_from_interval(interval)
        data['ds'] = pd.date_range(get_str_time_from_ts(start_time), get_str_time_from_ts(now_time), freq=interval)
        logger.info("generate data cost:{}".format(time.time() - start))
    else:
        data = pd.DataFrame(timeseries, columns=["ds", "y"])
        data['ds'] = data['ds'].astype('int')
        data['ds'] = data.apply(lambda x: get_str_time_from_ts(x['ds']), axis=1)
        data['y'] = data['y'].astype('float32')
        data['ds'] = pd.to_datetime(data["ds"])
        logger.info("data:{}".format(data))

    floor = None
    cap = None
    if params.get("floor") is not None and params.get("cap") is not None:
        floor = params.get("floor")
        cap = params.get("cap")
        data['floor'] = floor
        data['cap'] = cap
        params.pop('floor')
        params.pop('cap')
        params.update({'growth': 'logistic'})

    s = time.time()
    m = Prophet(**params)
    m.fit(data)
    future = m.make_future_dataframe(periods=total_num, freq=interval)
    if floor is not None and cap is not None:
        future['floor'] = floor
        future['cap'] = cap
    forecast = m.predict(future)
    logger.info("make forcast total_num:{}, cost:{}".format(total_num, time.time() - s))

    # 返回需要的结果
    size = len(forecast)
    yhat = get_timeseries_from_df(forecast[["ds", "yhat"]][size - point_num:size], start_time=now_time,
                                  value="yhat")
    yhat_upper = get_timeseries_from_df(forecast[["ds", "yhat_upper"]][size - point_num:size], start_time=now_time,
                                        value="yhat_upper")
    yhat_lower = get_timeseries_from_df(forecast[["ds", "yhat_lower"]][size - point_num:size], start_time=now_time,
                                        value="yhat_lower")
    logger.info("prophet_predict cost:{}".format(time.time() - start))
    logger.info("yhat:{}".format(yhat))
    logger.info("yhat_upper:{}".format(yhat_upper))
    logger.info("yhat_lower:{}".format(yhat_lower))

    # if type == "value-predict":
    #     if meta is None:
    #         return np.array([]), yhat, np.array([])
    # if type == "dynamic-thresh":
    #     if meta is None:
    #         return yhat_upper, np.array([]), yhat_lower
    #     else:
    #         data, url = gen_records(yhat_upper, yhat_lower, meta)
    #         push_prom(data, url)
except Exception as e:
    msg = traceback.format_exc()
    logger.error("background thread error:{}".format(msg))

def gen_records(yhat_upper, yhat_lower, meta):
    """Build push-ready records for the upper/lower threshold series.

    Returns ``(records, url)``: one dict per point carrying the timestamp
    in milliseconds ("ts"), the value ("v"), the metric name ("n") and a
    label map ("ls") whose "index" is "0" for upper, "1" for lower.
    """
    url = meta.get("url")
    metric = meta.get("metric")
    base_meta = meta.get("meta")
    if base_meta is None:
        base_meta = dict()

    # Independent label copies so the two series carry distinct indexes.
    upper_labels = copy.deepcopy(base_meta)
    upper_labels["index"] = "0"
    lower_labels = copy.deepcopy(base_meta)
    lower_labels["index"] = "1"

    data = []
    # All upper records first, then all lower — preserving the emit order.
    for series, labels in ((yhat_upper, upper_labels), (yhat_lower, lower_labels)):
        for point in series:
            data.append({
                "ts": point[0] * 1000,  # epoch seconds -> milliseconds
                "v": point[1],
                "n": metric,
                "ls": labels,
            })

    length = len(data)
    logger.info("data to push detail: length:{}, first three:{}, last three:{}".format(length, data[0:3],
                                                                                       data[length - 3:length]))
    return data, url

def push_prom(records: list, url="http://alert-store.bigo.sg:8881/insert/42/prometheus"):
    """No-op placeholder: would POST *records* to the insert endpoint at *url*."""
    return None

def get_timeseries_from_df(df, start_time, timestamp="ds", value="yhat"):
    """Convert forecast rows into an object array of ``[epoch_seconds, value]``.

    Negative forecast values are clamped to 0; rows whose timestamp falls
    before ``start_time`` are dropped (and logged).

    Fixes vs. original: the return statement referenced an undefined name
    ``timeserieslist`` (NameError — the local is ``timeseries_list``), and
    used ``np.object``, an alias removed in NumPy 1.24; both corrected
    without changing the interface.
    """
    # NOTE(review): mutates the caller's frame (column cast to str); callers
    # in this file pass throwaway slices, so this is tolerated.
    df[timestamp] = df[timestamp].astype('str')
    timeseries_list = []
    for index, row in df.iterrows():
        t = get_ts_from_str_time(row[timestamp])
        # Clamp negatives: sub-zero forecasts are not meaningful here.
        val = row[value] if row[value] >= 0 else 0
        if t >= int(start_time):
            timeseries_list.append([t, val])
        else:
            logger.info("time {} is smaller than :{}".format(t, start_time))
    return np.array(timeseries_list, dtype=object)

# If the timezone is wrong, the string-to-timestamp conversion will be off.
# return np.array([[get_ts_from_str_time(row[timestamp]), row[value] if row[value] >= 0 else 0] for index, row in df.iterrows()],
#     dtype=np.object_)

def get_str_timeseries_from_df(df, timestamp="ds", value="yhat"):
    """Return an object array of ``[timestamp_string, value_string]`` pairs."""
    # If the timezone is wrong, the string-to-timestamp conversion will be off.
    df[timestamp] = df[timestamp].astype('str')
    pairs = []
    for _index, row in df.iterrows():
        pairs.append([str(get_ts_from_str_time(row[timestamp])), str(row[value])])
    return np.array(pairs, dtype=np.object_)

if __name__ == "__main__":
    # Reproduction script: fire 50 threads that each fit a Prophet model on
    # the same input series.  (Dunders restored — the issue's markdown
    # stripped the underscores from __name__/__main__.)
    from concurrent.futures import ThreadPoolExecutor
    import threading

    def run(timeseries):
        # One worker: fixed prediction parameters, flat growth.
        point_num = 10
        interval = "min"
        type = "dynamic-thresh"
        now_time = "1652932800"
        params = {'growth': 'flat'}
        meta = {
            'url': 'http://alert-store.bigo.sg:8881/insert/42/prometheus',
            'metric': 'devops_aiops_dynamic_thresh_test'}

        prophet_predict(point_num, interval, type, timeseries, now_time, params, meta)

    # Load ~14k points of recorded traffic and re-encode both columns as
    # UTF-8 bytes, matching the production input format prophet_predict sees.
    df = pd.read_csv(
        "/Users/bigo/Projects/aiops/python_scripts/data/original/L7-traffic_setting.sharemasala.com.csv")
    ori_timeseries = df.values.tolist()[-14400:-1]
    timeseries = []
    for t in ori_timeseries:
        timeseries.append([bytes(str(int(t[0])), encoding='utf-8'), bytes(str(t[1]), encoding='utf-8')])

    # pool = ThreadPoolExecutor(max_workers=50)
    # for i in range(500):
    #     pool.submit(run, timeseries)
    start = time.time()
    thread_list = []
    for i in range(50):
        thread = threading.Thread(target=run,
                                  args=(timeseries,))
        thread_list.append(thread)

    for t in thread_list:
        t.daemon = True
        t.start()

    for t in thread_list:
        t.join()
    end = time.time()
    # "结束" = "finished"; runtime string kept as-is.
    print("结束:", end - start)

` result:

image
vesran commented 1 year ago

Bumping this — I've got a similar problem but couldn't figure out why. Any ideas?

RamiroGhilino commented 11 months ago

Similar problem, in my case using Process instead of Thread (tried multiprocessing and concurrent.futures). Any update?