
[python] Crawler framework #51


chenyi852 commented 3 years ago

Code source

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2020/5/11 0011 14:24
# @File : python_demo.py

import json
import os
import random
import re
import requests
import threading
import time
import logging

from lxml import etree
from queue import Queue, Empty
from datetime import datetime
from openpyxl import Workbook

# Log setup; enable when debugging

logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("chardet").setLevel(logging.WARNING)
logging.getLogger("requests").setLevel(logging.WARNING)

# Only the first basicConfig() call takes effect; pick one level and
# leave the others commented out.
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s: %(message)s')
# logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s: %(message)s')
# logging.basicConfig(level=logging.WARNING, format='%(asctime)s - %(name)s - %(levelname)s: %(message)s')
# logging.basicConfig(level=logging.ERROR, format='%(asctime)s - %(name)s - %(levelname)s: %(message)s')

LOGGER = logging.getLogger('python_demo')
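
The AllProxy and UserAgent classes used below are not included in the snippet. A minimal stub sketch follows so the file runs standalone; the method names are taken from the call sites below, but the bodies are placeholder assumptions, not the original implementation:

class AllProxy:
    """Stub: replace with a real proxy-pool client.

    proxy_type semantics (from the comments below): '' = crawl from the
    local IP, 'new' = freshly fetched proxy IP, 'pool' = proxy IP from
    a shared pool.
    """
    def __init__(self, pool_type='only_new'):
        self.pool_type = pool_type

    def get_proxy(self, proxy_type=''):
        # '' means crawl from the local IP: no proxy at all
        if proxy_type == '':
            return None
        # A real implementation would fetch a live proxy; this address
        # is a placeholder
        ip_port = '127.0.0.1:8888'
        return ip_port, {'http': 'http://' + ip_port, 'https': 'http://' + ip_port}

class UserAgent:
    """Stub: replace with a real user-agent rotator."""
    def get_random_user_agent(self):
        return ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                'AppleWebKit/537.36 (KHTML, like Gecko) '
                'Chrome/83.0.4103.61 Safari/537.36')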

class PythonDemo:
    """
    Millisecond timestamp used by many sites:
    cts = int(time.time() * 1000) - 5000
    """

    def __init__(self):

        # Suppress the warning raised when certificate verification is disabled
        requests.packages.urllib3.disable_warnings(
            requests.packages.urllib3.exceptions.InsecureRequestWarning)

        # Proxy interface
        self.all_proxy = AllProxy(pool_type='only_new')

        # User-agent interface
        self.user_agent = UserAgent()

        # Queue of items to fetch
        self.get_queue = Queue()

        # Queue of data to save
        self.save_queue = Queue()

        thread_list = []

        # proxy_type: '' = fetch from the local IP, 'new' = use a freshly
        # fetched proxy IP, 'pool' = use a proxy IP from the proxy pool
        for i in range(10):
            t_shop = threading.Thread(target=self.demo_thread, args=(i, ''))
            thread_list.append(t_shop)

        for i in range(1):
            t_shop = threading.Thread(target=self.save_thread)
            thread_list.append(t_shop)

        for t in thread_list:
            # Daemon threads are not essential: they exit when the main
            # thread exits (t.setDaemon() is deprecated since Python 3.10)
            t.daemon = True
            t.start()

    def demo(self, get_data, proxy, proxy_type=''):
        """ Fetch one item of data.
        """

        print(get_data)

        # Number of attempts
        for i in range(5):

            # Build the request URL
            url = get_data
            print(url)

            headers = {
                'Accept': '*/*',
                'Accept-Encoding': 'gzip, deflate, br',
                'Accept-Language': 'zh-CN,zh;q=0.9',
                'Referer': '',
                'Sec-Fetch-Mode': 'no-cors',
                'Sec-Fetch-Site': 'same-site',
                # "User-Agent": 'Mozilla/5.0 (Linux; Android 5.1.1; KIW-AL10 Build/HONORKIW-AL10) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/35.0.1916.138 Mobile Safari/537.36 T7/6.4 baidubrowser/6.4.14.0 (Baidu; P1 5.1.1)',
                "User-Agent": self.user_agent.get_random_user_agent(),
                'Cookie': ''
            }

            # Convert the proxy into the form requests expects
            if proxy is not None:
                ip_port, proxies = proxy[0], proxy[1]
            else:
                ip_port, proxies = None, {}

            try:
                res = requests.get(url, headers=headers, proxies=proxies,
                                   verify=False, timeout=10)
                print(res.status_code)
                if res.status_code == 200:
                    # print(res.text)

                    # Parse an HTML response (use this branch for HTML pages)
                    html = etree.HTML(res.text)
                    if html is not None:
                        e_data = html.xpath('//li[@post_type="0"]/a/@href')
                        # process the HTML data here

                    # Parse a JSON response (use this branch for JSON APIs;
                    # json.loads() raises on an HTML body, so keep only the
                    # branch that matches the target site)
                    json_data = json.loads(res.text)
                    print(json_data)
                    # process the JSON data here

                    # Success: stop retrying
                    break

                else:
                    # Back off, or fetch a new proxy
                    if proxy_type == '':
                        # time.sleep(random.randint(2, 5))
                        time.sleep(1)
                    else:
                        proxy = self.all_proxy.get_proxy(proxy_type)

            except Exception as err:
                print(err)
                # Back off, or fetch a new proxy
                if proxy_type == '':
                    # time.sleep(random.randint(2, 5))
                    time.sleep(1)
                else:
                    proxy = self.all_proxy.get_proxy(proxy_type)

        # Return the (possibly refreshed) proxy so the caller can reuse it
        return proxy

    def demo_thread(self, index, proxy_type=''):
        """ Standard worker thread; the queue usually holds URLs, or
        parameters from which a URL can be built directly.
        """

        proxy = self.all_proxy.get_proxy(proxy_type)

        while True:
            try:
                get_data = self.get_queue.get()
                self.get_queue.task_done()

                proxy = self.demo(get_data, proxy, proxy_type=proxy_type)
                time.sleep(random.randint(2, 5))

            except Exception as err:
                LOGGER.exception(err)
                time.sleep(1)

    def save_thread(self):
        """ Save the data to spreadsheet files, splitting across multiple
        files when there is too much data.
        """

        # Base file name
        file_name_basic = 'demo_file'
        # Index of the current file
        file_index = 1

        wb = Workbook()
        ws = wb.active
        ws_cnt = 0
        # Maximum number of rows per file
        ws_cnt_max = 20000
        # Current file name
        file_name = file_name_basic + "_%s.xlsx" % file_index
        file_index += 1

        while True:
            try:
                time.sleep(10)

                # Drain everything currently queued
                while True:
                    try:
                        data = self.save_queue.get_nowait()
                        self.save_queue.task_done()
                        ws.append(data)
                        ws_cnt += 1
                    except Empty:
                        break

                # Write the workbook to disk
                wb.save(file_name)
                print('save_data', ws_cnt)

                # Start a new file once the row limit is reached
                if ws_cnt >= ws_cnt_max:

                    wb = Workbook()
                    ws = wb.active
                    ws_cnt = 0
                    file_name = file_name_basic + "_%s.xlsx" % file_index
                    file_index += 1

            except Exception as err:
                # LOGGER.exception(err)
                print(err)
                time.sleep(1)
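
The snippet never instantiates the class or fills get_queue, and since the workers are daemon threads the process would exit immediately. A minimal driver sketch, assuming each queue item is a ready-made URL (the example.com URL is a placeholder, not a real target):

if __name__ == '__main__':
    spider = PythonDemo()

    # Feed the fetch queue; each item is whatever demo() expects,
    # here a ready-made URL
    for page in range(1, 4):
        spider.get_queue.put('https://example.com/list?page=%s' % page)

    # join() returns once every item has been dequeued (task_done() is
    # called right after get()), so wait a little longer for the last
    # fetches to finish and for save_thread's next 10-second flush
    spider.get_queue.join()
    time.sleep(15)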