ministep / SQL_DataAnalysis

SQL数据分析
9 stars 0 forks source link

python 通过经纬度地址查询附近是否有大学高校? #28

Open kemistep opened 4 years ago

kemistep commented 4 years ago

问题1: 已知经纬度信息,可以通过高德地图api接口将经纬度转换成地址,同时还需要判断该位置附近有没有高校。问题2: 共有30万次请求,如何在最短的时间内调用完毕并保存到数据库(存在IO读取的问题)?要求通过python实现。

经纬度地址信息都保存在mysql数据库

kemistep commented 4 years ago

补充资料: 地理/逆地理编码-API文档-开发指南-Web服务 API | 高德地图API

请求key,可以通过自己申请获取 或者 使用别人的key,github有很多公开的key,都是一群不注意个人保护的,不过可以供使用了

kemistep commented 4 years ago

代码解析如下

# -*- coding: utf-8 -*-
"""
逆地理编码
"""
# -*- coding: utf-8 -*-
"""
逆地理编码
"""
from py_function_tools import odps_read_sql,write_excle,write_to_database,hive_read_sql
from py_function_tools import odps_write_dataframe,odps_read_table,write_database_from_odps
import requests
import pymysql
from sqlalchemy import create_engine
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import json,re
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from concurrent import futures
import concurrent.futures

import random

# Pool of AMap Web-service API keys; get_regeo picks one at random for
# every request.
# NOTE(review): these credentials are committed in source -- consider
# loading them from configuration instead.
key_api_list = [
    '2edd8b08faa0e7104b52cbfeef6ab4fa',
    '009c5ac1720b955e8597b41d5052bc10',
    '16ff9e74345b3140bf13667b2992e93c',
]

# Single default key (same value as the first pool entry); not referenced
# by the functions shown in this file.
key_api = '2edd8b08faa0e7104b52cbfeef6ab4fa'

 ## 逆地理编码
def get_regeo(location:str,timeout)->tuple:
    """Reverse-geocode a coordinate via the AMap Web API and list nearby colleges.

    Args:
        location: Coordinate string sent as the ``location`` request
            parameter, e.g. ``'113.679287,23.632575'``.
        timeout: Timeout in seconds forwarded to ``requests.get``.

    Returns:
        A 4-tuple ``(address, addressComponent, pois, len_pois)`` where
        ``address`` is ``regeocode.formatted_address``, ``addressComponent``
        is ``regeocode.addressComponent``, ``pois`` holds at most the first
        three entries of ``regeocode.pois`` and ``len_pois`` is the total
        number of POIs the API returned.  The lookup is best-effort: if the
        request or the parsing fails, the error is printed and every field
        not yet extracted keeps its default (``None`` / ``0``).
    """
    address = None
    addressComponent = None
    pois = None
    len_pois = 0
    try:
        # The '?parameters' suffix in the API docs is only a placeholder;
        # requests builds the real query string from `params`.
        url = 'https://restapi.amap.com/v3/geocode/regeo'
        param = {
            'key': random.choice(key_api_list),
            'location': location,
            'poitype': '高等院校',  # POI type to return near the point
            'radius': '1000',  # search radius
            'extensions': 'all',  # result detail level; 'all' includes POIs
            'output': 'json',
        }
        response = requests.get(url, params=param, timeout=timeout)
        regeocode = response.json()['regeocode']

        address = regeocode['formatted_address']
        addressComponent = regeocode['addressComponent']
        data = regeocode['pois']
        len_pois = len(data)
        # Keep only a few POIs: enough to tell whether a college is nearby
        # without storing oversized values; slicing copes with short lists.
        pois = data[:3]
    except Exception as exc:
        # Deliberately broad so one bad coordinate cannot abort a batch,
        # but KeyboardInterrupt/SystemExit are no longer swallowed.
        print(location,'api解析失败',exc)
    return address,addressComponent,pois,len_pois

def get_row_api(row):
    """Annotate one record with the reverse-geocoding result of its ``location``.

    Written to be used as the callback of ``DataFrame.apply(..., axis=1)``.

    Args:
        row: Mapping-like record (e.g. a pandas row) with a ``'location'``
            entry; it is updated in place.

    Returns:
        The same ``row`` with four entries added: ``location_address``,
        ``location_addressComponent``, ``location_pois`` and
        ``location_pois_length``.  When the lookup raises, they are set to
        ``None``/``None``/``None``/``0`` so the record is still returned.
    """
    location_value = row['location']
    # Defaults that are written when the lookup fails.
    location_address = None
    location_addressComponent = None
    location_pois = None
    location_pois_length = 0
    try:
        (location_address,
         location_addressComponent,
         location_pois,
         location_pois_length) = get_regeo(location_value, timeout=60)
    except Exception:
        # Best-effort per row; KeyboardInterrupt/SystemExit still propagate.
        print('pandas失败,列值是',row)
    row['location_address'] = location_address
    row['location_addressComponent'] = location_addressComponent
    row['location_pois'] = location_pois
    row['location_pois_length'] = location_pois_length
    return row

def get_chuncksize(table_name,number):
    """Geocode one uid-suffix shard of ``table_name`` and write it back to ODPS.

    Reads the rows whose ``uid_end_number`` equals ``number``, runs
    ``get_row_api`` on every row, stringifies all cells and writes the frame
    to ``<table_name>_end_number_vv1_<number>``.

    Args:
        table_name: Source table; also the prefix of the output table name.
        number: Value of ``uid_end_number`` selecting the shard.

    Returns:
        ``'success'`` when the shard was processed and written, ``'fail'``
        when any step raised (the error is printed).
    """
    uid_end_number = number
    odps_table_name = table_name+'_end_number_vv1_'+str(uid_end_number)
    print('正在解析的table',table_name,'尾数是',uid_end_number)
    try:
        start_time = time.time()
        # Query the table the caller asked for instead of a hard-coded name.
        # NOTE(review): 'limit 10' caps every shard at 10 rows -- looks like
        # a debugging leftover; confirm before a full run.
        sql = """ select * from {table_name} 
            where uid_end_number = '{uid_end_number}' limit 10 """.format(
                table_name=table_name, uid_end_number=uid_end_number)
        df_source = odps_read_sql(sql)
        print(df_source.shape)
        # Work on a copy so the frame read from ODPS is left untouched;
        # axis=1 hands each row to get_row_api.
        df_result = df_source.copy().apply(get_row_api, axis=1)
        end_time = time.time()
        print("api解析程序耗时%f秒." % (end_time - start_time))
        # Stringify every cell before writing (dict/list results included).
        df_result = df_result.applymap(str)
        print('正在写入数据库:',odps_table_name)
        odps_write_dataframe(df=df_result, tb_name=odps_table_name)
        print('Successful---',odps_table_name)
    except Exception as exc:
        # Report the actual cause; KeyboardInterrupt/SystemExit propagate.
        print('解析失败或者保存失败',exc)
        return 'fail'
    return 'success'

def main():
    """Process the ten uid-suffix shards in a pool of three worker processes.

    Submits ``get_chuncksize(table_name, n)`` for ``n`` in ``0..9``, collects
    each returned status as the futures complete, prints the total elapsed
    time and returns the list of statuses (in completion order).  A future
    that raises is reported on stdout and contributes no entry.
    """
    started_at = time.time()
    table_name = 'tmp_active_user_location_last_v1'
    outcomes = []
    # Swap in ThreadPoolExecutor(max_workers=3) here to use threads instead.
    with ProcessPoolExecutor(max_workers=3) as pool:
        # Map every future back to the shard number it is working on.
        pending = {}
        for shard in range(0, 10):
            pending[pool.submit(get_chuncksize, table_name, shard)] = shard

        for finished in concurrent.futures.as_completed(pending):
            shard = pending[finished]
            try:
                outcomes.append(finished.result())
            except Exception as exc:
                print('%r generated an exception: %s' % (shard, exc))
        print(time.time() - started_at)
    return outcomes
if __name__ == "__main__":
    # Script entry point: run all shards, then report wall-clock time and
    # the per-shard statuses.
    start_time = time.time()
    result_list = main()
    elapsed = time.time() - start_time
    print("程序耗时%f秒." % elapsed)
    print(result_list)
kemistep commented 4 years ago

解析:

  1. 如何将经纬度解析成 地理地址;
  2. 如何在最短的时间,完成30万的解析;

针对地理解析,使用的是高德地图api:

import random

# Pool of AMap Web-service API keys; get_regeo picks one at random for
# every request.
# NOTE(review): these credentials are committed in source -- consider
# loading them from configuration instead.
key_api_list = [
    '2edd8b08faa0e7104b52cbfeef6ab4fa',
    '009c5ac1720b955e8597b41d5052bc10',
    '16ff9e74345b3140bf13667b2992e93c',
]

# Single default key (same value as the first pool entry); not referenced
# by the functions shown in this file.
key_api = '2edd8b08faa0e7104b52cbfeef6ab4fa'

 ## 逆地理编码
def get_regeo(location:str,timeout)->tuple:
    """Reverse-geocode a coordinate via the AMap Web API and list nearby colleges.

    Args:
        location: Coordinate string sent as the ``location`` request
            parameter, e.g. ``'113.679287,23.632575'``.
        timeout: Timeout in seconds forwarded to ``requests.get``.

    Returns:
        A 4-tuple ``(address, addressComponent, pois, len_pois)`` where
        ``address`` is ``regeocode.formatted_address``, ``addressComponent``
        is ``regeocode.addressComponent``, ``pois`` holds at most the first
        three entries of ``regeocode.pois`` and ``len_pois`` is the total
        number of POIs the API returned.  The lookup is best-effort: if the
        request or the parsing fails, the error is printed and every field
        not yet extracted keeps its default (``None`` / ``0``).
    """
    address = None
    addressComponent = None
    pois = None
    len_pois = 0
    try:
        # The '?parameters' suffix in the API docs is only a placeholder;
        # requests builds the real query string from `params`.
        url = 'https://restapi.amap.com/v3/geocode/regeo'
        param = {
            'key': random.choice(key_api_list),
            'location': location,
            'poitype': '高等院校',  # POI type to return near the point
            'radius': '1000',  # search radius
            'extensions': 'all',  # result detail level; 'all' includes POIs
            'output': 'json',
        }
        response = requests.get(url, params=param, timeout=timeout)
        regeocode = response.json()['regeocode']

        address = regeocode['formatted_address']
        addressComponent = regeocode['addressComponent']
        data = regeocode['pois']
        len_pois = len(data)
        # Keep only a few POIs: enough to tell whether a college is nearby
        # without storing oversized values; slicing copes with short lists.
        pois = data[:3]
    except Exception as exc:
        # Deliberately broad so one bad coordinate cannot abort a batch,
        # but KeyboardInterrupt/SystemExit are no longer swallowed.
        print(location,'api解析失败',exc)
    return address,addressComponent,pois,len_pois

说一说中间存在的问题:

  1. 经纬度,存在一定的概率解析失败,因此提前将值默认设置成None,再结合 try,except 避免失败;
  2. pois存在很多点,导致数据库存储失败,这里只是为了判定经纬度点是否有高校,因此只需要取几个就行了,截取一下;
kemistep commented 4 years ago

大量数据IO问题:

可以使用多线程、多进程解决这个问题;

由于使用的是pandas,希望能尽可能地还原pandas,保留原始数据的一一对应,因此采用的办法是 使用apply 和 分表处理;

  1. 将原始数据集根据用户的uid尾号拆分成10张子表,对每一个子表进行解析;
  2. 使用apply 函数,尽可能的还原数据集;
def get_row_api(row):
    """Annotate one record with the reverse-geocoding result of its ``location``.

    Written to be used as the callback of ``DataFrame.apply(..., axis=1)``.

    Args:
        row: Mapping-like record (e.g. a pandas row) with a ``'location'``
            entry; it is updated in place.

    Returns:
        The same ``row`` with four entries added: ``location_address``,
        ``location_addressComponent``, ``location_pois`` and
        ``location_pois_length``.  When the lookup raises, they are set to
        ``None``/``None``/``None``/``0`` so the record is still returned.
    """
    location_value = row['location']
    # Defaults that are written when the lookup fails.
    location_address = None
    location_addressComponent = None
    location_pois = None
    location_pois_length = 0
    try:
        (location_address,
         location_addressComponent,
         location_pois,
         location_pois_length) = get_regeo(location_value, timeout=60)
    except Exception:
        # Best-effort per row; KeyboardInterrupt/SystemExit still propagate.
        print('pandas失败,列值是',row)
    row['location_address'] = location_address
    row['location_addressComponent'] = location_addressComponent
    row['location_pois'] = location_pois
    row['location_pois_length'] = location_pois_length
    return row

##df_vv2 = df_vv1.apply(get_row_api, axis=1) # axis=1 is important to use the row itself

单个子表的处理过程如下

def get_chuncksize(table_name,number):
    """Geocode one uid-suffix shard of ``table_name`` and write it back to ODPS.

    Reads the rows whose ``uid_end_number`` equals ``number``, runs
    ``get_row_api`` on every row, stringifies all cells and writes the frame
    to ``<table_name>_end_number_vv1_<number>``.

    Args:
        table_name: Source table; also the prefix of the output table name.
        number: Value of ``uid_end_number`` selecting the shard.

    Returns:
        ``'success'`` when the shard was processed and written, ``'fail'``
        when any step raised (the error is printed).
    """
    uid_end_number = number
    odps_table_name = table_name+'_end_number_vv1_'+str(uid_end_number)
    print('正在解析的table',table_name,'尾数是',uid_end_number)
    try:
        start_time = time.time()
        # Query the table the caller asked for instead of a hard-coded name.
        # NOTE(review): 'limit 10' caps every shard at 10 rows -- looks like
        # a debugging leftover; confirm before a full run.
        sql = """ select * from {table_name} 
            where uid_end_number = '{uid_end_number}' limit 10 """.format(
                table_name=table_name, uid_end_number=uid_end_number)
        df_source = odps_read_sql(sql)
        print(df_source.shape)
        # Work on a copy so the frame read from ODPS is left untouched;
        # axis=1 hands each row to get_row_api.
        df_result = df_source.copy().apply(get_row_api, axis=1)
        end_time = time.time()
        print("api解析程序耗时%f秒." % (end_time - start_time))
        # Stringify every cell before writing (dict/list results included).
        df_result = df_result.applymap(str)
        print('正在写入数据库:',odps_table_name)
        odps_write_dataframe(df=df_result, tb_name=odps_table_name)
        print('Successful---',odps_table_name)
    except Exception as exc:
        # Report the actual cause; KeyboardInterrupt/SystemExit propagate.
        print('解析失败或者保存失败',exc)
        return 'fail'
    return 'success'

多个子表使用进程池(ProcessPoolExecutor)并行处理(也可以切换为线程池 ThreadPoolExecutor),其代码如下


def main():
    """Process the ten uid-suffix shards in a pool of three worker processes.

    Submits ``get_chuncksize(table_name, n)`` for ``n`` in ``0..9``, collects
    each returned status as the futures complete, prints the total elapsed
    time and returns the list of statuses (in completion order).  A future
    that raises is reported on stdout and contributes no entry.
    """
    started_at = time.time()
    table_name = 'tmp_active_user_location_last_v1'
    outcomes = []
    # Swap in ThreadPoolExecutor(max_workers=3) here to use threads instead.
    with ProcessPoolExecutor(max_workers=3) as pool:
        # Map every future back to the shard number it is working on.
        pending = {}
        for shard in range(0, 10):
            pending[pool.submit(get_chuncksize, table_name, shard)] = shard

        for finished in concurrent.futures.as_completed(pending):
            shard = pending[finished]
            try:
                outcomes.append(finished.result())
            except Exception as exc:
                print('%r generated an exception: %s' % (shard, exc))
        print(time.time() - started_at)
    return outcomes
kemistep commented 4 years ago

这是最终存入到数据库的结果: image