ministep / SQL_DataAnalysis

SQL数据分析
9 stars 0 forks source link

多线程采集股票数据,避免 IO 慢的问题 #38

Open kemistep opened 4 years ago

kemistep commented 4 years ago
#common 是自写的模块函数
from common.mysql_pipeline import MysqlDBPipeline
db = MysqlDBPipeline()
#采集股票历史数据
from common.stock.get_xueqiu_quote_stock import get_xueqiu_quote_stock_daily
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from concurrent import futures
import concurrent.futures

def main(max_workers=3, table_name='dim_stock_config'):
    """Fetch daily quote data for every configured stock using a thread pool.

    Reads the ``stock_code`` column of ``table_name`` through the module-level
    ``db`` pipeline, submits one ``get_xueqiu_quote_stock_daily`` call per
    code to a ``ThreadPoolExecutor``, and gathers the results as the futures
    finish. The elapsed time in seconds is printed once all futures are done.

    Args:
        max_workers: Number of worker threads in the pool.
        table_name: Table whose ``stock_code`` column lists the codes to fetch.

    Returns:
        A list with one entry per successful fetch, in completion order (not
        in the order of the stock codes). Codes whose fetch raised are
        reported on stdout and left out of the list.
    """
    begin_time = time.time()
    result_list = []
    df_v1 = db.read_query_table(table_name)
    stock_code_list = df_v1['stock_code'].tolist()
    # ProcessPoolExecutor can be swapped in here to use processes instead of
    # threads, provided the fetch function and its results are picklable.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its stock code so failures can be attributed.
        future_to_code = {
            executor.submit(get_xueqiu_quote_stock_daily, stock_code): stock_code
            for stock_code in stock_code_list
        }
        for future in concurrent.futures.as_completed(future_to_code):
            failed_code = future_to_code[future]
            try:
                info = future.result()
            except Exception as exc:
                # Best effort: one failing stock must not abort the others.
                print('%r generated an exception: %s' % (failed_code, exc))
            else:
                result_list.append(info)
        print(time.time() - begin_time)
    return result_list
if __name__ == "__main__":
    # Measure wall-clock time around the main() call.
    start_time = time.time()
    result_list = main()
    elapsed = time.time() - start_time
    print("程序耗时%f秒." % elapsed)
    print(result_list)
kemistep commented 4 years ago

这里是多线程部分的精华:

    # Excerpt of main(): submit one fetch per stock code to a pool of three
    # worker threads and gather the results as each future completes.
    # begin_time and result_list are defined earlier in main().
    with ThreadPoolExecutor(max_workers=3) as executor: # threads
    #with ProcessPoolExecutor(max_workers=3) as executor:  # processes
        # Map each future back to the stock code it was submitted for.
        future_to_url = {executor.submit(get_xueqiu_quote_stock_daily,stock_code): stock_code for stock_code in stock_code_list}
        # as_completed yields futures as they finish, not in submission order.
        for future in concurrent.futures.as_completed(future_to_url):
            location = future_to_url[future]
            try:
                # result() re-raises any exception raised inside the worker.
                info = future.result()
                result_list.append(info)
            except Exception as exc:
                # Report the failing stock code and carry on with the rest.
                print('%r generated an exception: %s' % (location, exc))
        # Seconds elapsed since begin_time, printed after every future is done.
        times = time.time() - begin_time
        print(times)