mingyun / mingyun.github.io

github主页
161 stars 93 forks source link

微信聊天机器人(功能增强版) #40

Open mingyun opened 7 years ago

mingyun commented 7 years ago
import itchat
import time
import requests
import hashlib

# 图灵机器人
def get_response(msg, FromUserName):
    """Ask the Tuling chatbot HTTP API for a reply to ``msg``.

    Args:
        msg: Text of the incoming message; sent as the ``info`` field.
        FromUserName: Sender identifier. Its MD5 hex digest is sent as
            ``userid`` so the same sender always maps to the same id.

    Returns:
        The ``text`` field of the JSON reply, or ``None`` when the request
        fails, the body is not JSON, or the reply carries no ``text``.
    """
    api_url = 'http://www.tuling123.com/openapi/api'
    apikey = '**************************'
    # The API needs a userid to keep a consistent conversation context.
    # hash.update() returns None, so the digest must be read with hexdigest().
    userid = hashlib.md5(FromUserName.encode('utf-8')).hexdigest()
    data = {'key': apikey,
            'info': msg,
            'userid': userid
            }
    try:
        # A timeout keeps a stalled API call from blocking the caller forever.
        reply = requests.post(api_url, data=data, timeout=10).json()
    except (requests.RequestException, ValueError):
        # Network failure or a body that is not JSON: best effort, no reply.
        return None
    if not isinstance(reply, dict):
        return None
    return reply.get('text')

itchat.auto_login()

#适合 个人间聊天
@itchat.msg_register(['Text', 'Map', 'Card', 'Note', 'Sharing'])
def Tuling_robot(msg):
    """Answer a Text/Map/Card/Note/Sharing message with the chatbot's reply.

    Args:
        msg: Incoming itchat message; ``Content`` is forwarded to the chatbot
            and the answer is sent back to ``FromUserName``.
    """
    reply = get_response(msg['Content'], msg['FromUserName'])
    # get_response returns None when the API call fails; send nothing then
    # rather than passing None to itchat.send.
    if reply:
        itchat.send(reply, msg['FromUserName'])

#返回图片,录音,视频
@itchat.msg_register(['Picture', 'Recording', 'Attachment', 'Video'])
def download_files(msg):
    """Save an incoming media message locally and send it back to the sender.

    Args:
        msg: Incoming itchat message. ``msg['Text']`` is called with the
            target file name; ``Type`` and ``FromUserName`` are read.
    """
    kind = msg['Type']
    # File name: the message type followed by the current Unix time in seconds.
    saved_as = '%s%s' % (kind, int(time.time()))
    msg['Text'](saved_as)

    sender = msg['FromUserName']
    itchat.send('%s received' % kind, sender)

    # Pictures go back with the 'img' prefix, every other type with 'fil'.
    if kind == 'Picture':
        prefix = 'img'
    else:
        prefix = 'fil'
    itchat.send('@%s@%s' % (prefix, saved_as), sender)

#自动同意陌生人好友申请
@itchat.msg_register('Friends')
def add_friend(msg):
    """Handle a 'Friends' message: add the requester and greet them.

    Args:
        msg: Incoming itchat message; ``msg['Text']`` is unpacked as the
            keyword arguments for ``itchat.add_friend``.
    """
    request_kwargs = msg['Text']
    itchat.add_friend(**request_kwargs)

    new_contact = msg['RecommendInfo']['UserName']
    itchat.send_msg('Nice to meet you!', new_contact)

Message = '整点新闻:如何留住制造业人才 董明珠霸气送房'
GroupsContainer = set()
# Scheduled news broadcast: the UserName of every known group chat is collected
# into GroupsContainer, and Message is sent to each of them at 08:00:00 local
# time.
# Holds the background sender thread so that it is started at most once.
_news_sender = {'thread': None}


# isGroupChat=True lets the bot reply to messages in groups; False does not.
@itchat.msg_register('Text', isGroupChat = False)
def broadcast(msg):
    """Reply to a text message and make sure the daily broadcast is running.

    The handler answers the sender through ``get_response``, records the
    UserName of every chat room returned by ``itchat.get_chatrooms()`` and
    starts a daemon thread (once) that sends ``Message`` to all recorded
    groups at 08:00:00 local time each day. The waiting happens in that
    thread so the handler itself returns immediately.

    Args:
        msg: Incoming itchat message; ``Content`` and ``FromUserName`` are read.
    """
    import threading

    response = get_response(msg['Content'], msg['FromUserName'])
    if response:  # get_response returns None when the API call fails
        itchat.send(response, msg['FromUserName'])

    GroupsContainer.update(
        room.get('UserName') for room in itchat.get_chatrooms())

    def send_news_daily():
        """Sleep until the next 08:00:00, send Message to every group, repeat."""
        while True:
            now = time.localtime()
            seconds_today = now.tm_hour * 3600 + now.tm_min * 60 + now.tm_sec
            # Sleep instead of spinning; the modulo wraps to tomorrow once
            # 08:00:00 has already passed today.
            time.sleep((8 * 3600 - seconds_today) % 86400)
            for group in list(GroupsContainer):  # snapshot: the set may grow
                itchat.send(Message, group)
            # Step past 08:00:00 so the next wait targets the following day.
            time.sleep(1)

    worker = _news_sender['thread']
    if worker is None or not worker.is_alive():
        worker = threading.Thread(target=send_news_daily)
        worker.daemon = True  # do not keep the process alive after itchat exits
        worker.start()
        _news_sender['thread'] = worker

itchat.run()
mingyun commented 7 years ago
#https://github.com/discountry/itchat-examples/blob/master/examples/auto-reply-greetings.py
import itchat, time, re
from itchat.content import *

@itchat.msg_register([TEXT])
def text_reply(msg):
    """Send a New Year greeting when a text message contains the character '年'.

    Args:
        msg: Incoming itchat message; ``Text`` and ``FromUserName`` are read.
    """
    # re.search returns None when nothing matches, so test the match object
    # itself instead of calling a method on it.
    if re.search('年', msg['Text']):
        itchat.send(('那我就祝你鸡年大吉吧'), msg['FromUserName'])

@itchat.msg_register([PICTURE, RECORDING, VIDEO, SHARING])
def other_reply(msg):
    """Answer picture, recording, video and sharing messages with a greeting.

    Args:
        msg: Incoming itchat message; only ``FromUserName`` is read.
    """
    greeting = '那我就祝你鸡年大吉吧'
    recipient = msg['FromUserName']
    itchat.send(greeting, recipient)

itchat.auto_login(enableCmdQR=True,hotReload=True)
itchat.run()
mingyun commented 7 years ago

大邓带你玩转python

import itchat
import time
import requests
import hashlib

#图灵机器人
def get_response(msg, FromUserName):
    """Ask the Tuling chatbot HTTP API for a reply to ``msg``.

    Args:
        msg: Text of the incoming message; sent as the ``info`` field.
        FromUserName: Sender identifier. Its MD5 hex digest is sent as
            ``userid`` so the same sender always maps to the same id.

    Returns:
        The ``text`` field of the JSON reply, or ``None`` when the request
        fails, the body is not JSON, or the reply carries no ``text``.
    """
    api_url = 'http://www.tuling123.com/openapi/api'
    # base64.b64encode('xxx'.encode('utf-8'))
    # Replace this with the Tuling api_key you registered.
    # NOTE(review): the value looks base64-encoded (see the hint above) and is
    # sent as-is; confirm whether the API expects the decoded key, and avoid
    # committing a real credential to source.
    apikey = 'ZDc1NjI4MGIxOTExNDFlYmE5ZDk3ODk4ZTRlMTE5YTQ='
    # The API needs a userid to keep a consistent conversation context.
    # hash.update() returns None, so the digest must be read with hexdigest().
    userid = hashlib.md5(FromUserName.encode('utf-8')).hexdigest()
    data = {'key': apikey,
            'info': msg,
            'userid': userid
            }
    try:
        # A timeout keeps a stalled API call from blocking the caller forever.
        reply = requests.post(api_url, data=data, timeout=10).json()
    except (requests.RequestException, ValueError):
        # Network failure or a body that is not JSON: best effort, no reply.
        return None
    if not isinstance(reply, dict):
        return None
    return reply.get('text')
itchat.auto_login()

#适合 个人间聊天
@itchat.msg_register(['Text', 'Map', 'Card', 'Note', 'Sharing'])
def Tuling_robot(msg):
    """Answer a Text/Map/Card/Note/Sharing message with the chatbot's reply.

    Args:
        msg: Incoming itchat message; ``Content`` is forwarded to the chatbot
            and the answer is sent back to ``FromUserName``.
    """
    reply = get_response(msg['Content'], msg['FromUserName'])
    # get_response returns None when the API call fails; send nothing then
    # rather than passing None to itchat.send.
    if reply:
        itchat.send(reply, msg['FromUserName'])

itchat.run()
mingyun commented 7 years ago

签到统计

#!/usr/bin/env python3
import itchat
from itchat.content import TEXT
from peewee import *
import datetime

CHAT_ROOM = '100days'

db = MySQLDatabase('xxxxxxx', user='xxxxxxxxxx', password='xxxxxxxxxxx', charset='utf8mb4')

class BaseModel(Model):
    """Common base for the models in this script; binds them to ``db``."""

    class Meta:
        # Every subclass uses the module-level database handle.
        database = db

class User(BaseModel):
    """One row per member taking part in the check-in challenge."""

    # Nickname of the member; unique, at most 100 characters.
    username = CharField(max_length=100, unique=True)
    # Identifier stored alongside the nickname; unique, at most 100 characters.
    openid = CharField(max_length=100, unique=True)
    # Number of check-ins; a freshly created row starts at 1.
    count = IntegerField(default=1)
    # Timestamp of the latest check-in; defaults to the row's creation time.
    updated_date = DateTimeField(default=datetime.datetime.now)

@itchat.msg_register(TEXT, isFriendChat=True, isGroupChat=True, isMpChat=True)
def simple_reply(msg):
    """Dispatch a text message that mentions 'helper' to the command handler.

    Args:
        msg: Incoming itchat message (friend, group or official-account chat).

    Returns:
        The reply text produced by ``response_handler``, or ``None`` when the
        message does not contain 'helper'.
    """
    if 'helper' not in msg['Text']:
        return None

    # The handler is registered for friend and official-account chats as well.
    # NOTE(review): itchat appears to set ActualNickName only on group
    # messages - confirm; the lookup is skipped when the key is absent.
    nickname = msg.get('ActualNickName')
    if nickname:
        friend = itchat.search_friends(nickName=nickname)
        if friend:
            print(friend)

    print(msg)
    response = response_handler(msg)
    print(response)
    return response

def response_handler(msg):
    """Map a 'helper' command to its action and return the reply text.

    The message text must contain 'helper' plus one of the keywords
    '签到' (check in), '检查' (list unchecked), '清理' (remove unchecked) or
    '排行' (ranking); the first keyword found in that order wins.

    Args:
        msg: Message dict; ``Text`` is always read, ``ActualNickName`` only
            for the check-in command.

    Returns:
        The reply string, or '未定义操作!' when no command matches.
    """
    undefined = '未定义操作!'
    text = msg['Text']

    # Every command requires the 'helper' marker.
    if 'helper' not in text:
        return undefined

    if '签到' in text:
        return check_in(msg) + '\n用户: %s' % msg['ActualNickName']
    if '检查' in text:
        return print_unchecked_username(get_unchecked_member())
    if '清理' in text:
        return delete_unchecked_member(get_unchecked_member())
    if '排行' in text:
        return print_top_members()
    return undefined

def check_in(userinfo):
    """Record today's check-in for the sender and return the reply text.

    Args:
        userinfo: Message dict; ``ActualNickName`` identifies the user and
            ``ActualUserName`` is stored as ``openid`` for a new user.

    Returns:
        A welcome text for a new user, a success text with the stored
        timestamp on the first check-in of the day, or a notice that the
        user has already checked in today.
    """
    # Open the connection only when needed; this function runs once per
    # check-in message and the connection is not closed in between.
    if db.is_closed():
        db.connect()

    nickname = userinfo['ActualNickName']
    try:
        user = User.get(User.username == nickname)
    except User.DoesNotExist:
        User.create(username=nickname, openid=userinfo['ActualUserName'])
        return '开始你的100天挑战吧!中断1天就会被踢出袄~'

    now = datetime.datetime.now()
    # Compare calendar dates directly rather than their formatted strings.
    if user.updated_date.date() < now.date():
        (User
         .update(updated_date=now, count=User.count + 1)
         .where(User.username == nickname)
         .execute())
        # Read the row back so the reply shows the value as stored.
        stored = User.get(User.username == nickname).updated_date
        return '签到成功!' + '时间: %s' % stored
    return '您今日已签到!'

def get_unchecked_user():
    """Return friend entries for users whose last check-in predates today.

    Returns:
        A list holding, for each such user found among the friends by
        nickname, the first entry returned by ``itchat.search_friends``.
    """
    itchat.get_friends(update=True)

    # Users whose updated_date is before today's date string.
    today = datetime.datetime.now().strftime("%Y-%m-%d")
    stale_users = User.select().where(User.updated_date < today)

    found = []
    for user in stale_users:
        matches = itchat.search_friends(nickName=user.username)
        if matches:
            found.append(matches[0])
    return found

def get_unchecked_member():
    """Return chat-room members whose last check-in predates today.

    Looks up the ``CHAT_ROOM`` chat room and matches its ``MemberList``
    against stored users by nickname.

    Returns:
        A list of member dicts, one per stale user present in the room; when
        several members share a nickname the first one is used.

    Raises:
        IndexError: If no chat room matches ``CHAT_ROOM``.
    """
    itchat.get_chatrooms(update=True)
    # Use the shared constant so this lookup and the removal step in
    # delete_unchecked_member always target the same room.
    chatroom = itchat.search_chatrooms(CHAT_ROOM)[0]
    itchat.get_friends(update=True)

    # Index the members once by nickname; setdefault keeps the first match.
    members_by_nick = {}
    for member in chatroom['MemberList']:
        members_by_nick.setdefault(member['NickName'], member)

    today = datetime.datetime.now().strftime("%Y-%m-%d")
    unchecked_users = User.select().where(User.updated_date < today)
    return [members_by_nick[user.username]
            for user in unchecked_users
            if user.username in members_by_nick]

def print_top_members():
    """Build the check-in ranking text for the ten users with the most check-ins.

    Returns:
        A header line followed by one 'username:count' line per user, ordered
        by count descending; every line ends with a newline.
    """
    ranking = User.select().order_by(User.count.desc()).limit(10)
    lines = ['签到排行:\n']
    for member in ranking:
        lines.append(member.username + ':' + str(member.count) + '\n')
    return ''.join(lines)

def print_unchecked_username(memberList):
    """Format the nicknames of members who have not checked in.

    Args:
        memberList: Sequence of member dicts, each with a ``NickName`` key.

    Returns:
        A header line followed by one nickname per line, or a notice that
        everyone has checked in when ``memberList`` is empty.
    """
    if memberList:
        parts = ['未签到用户:\n']
        parts.extend(member['NickName'] + '\n' for member in memberList)
        return ''.join(parts)
    return '所有成员均已签到!'

def delete_unchecked_member(memberList):
    """Remove the given members from the ``CHAT_ROOM`` chat room.

    The removal only runs from 23:00 local time onwards.

    Args:
        memberList: Member entries passed to
            ``itchat.delete_member_from_chatroom``.

    Returns:
        A notice asking to retry after 23:00, or a confirmation text once the
        removal request has been issued.
    """
    hour_now = datetime.datetime.now().hour
    if hour_now < 23:
        return '请在23时以后清理未签到成员!'

    itchat.get_chatrooms(update=True)
    matches = itchat.search_chatrooms(CHAT_ROOM)
    chatroom = matches[0]
    print(chatroom)

    room_id = chatroom['UserName']
    itchat.update_chatroom(room_id, detailedMember=True)
    itchat.delete_member_from_chatroom(room_id, memberList)
    return '未签到成员清除完毕!'

itchat.auto_login(enableCmdQR=2,hotReload=True)
itchat.run()
mingyun commented 7 years ago
def auto_add_member(msg, roomName):
    """Invite the sender of ``msg`` into the chat room named ``roomName``.

    Args:
        msg: Incoming itchat message; ``FromUserName`` identifies the friend.
        roomName: Name used to search for the target chat room.

    Returns:
        A success text when the invitation request is reported as
        successful, otherwise an error text (also when the friend or the
        chat room cannot be found).
    """
    failure = '请求发生错误,请重试!'

    friend = itchat.search_friends(userName=msg['FromUserName'])
    print(friend)
    if not friend:
        # Nothing to invite; do not pass an empty entry on to itchat.
        return failure

    itchat.get_chatrooms(update=True)
    rooms = itchat.search_chatrooms(roomName)
    if not rooms:
        # No room matched; indexing [0] would raise IndexError.
        return failure
    chatroom = rooms[0]
    print(chatroom['UserName'])

    r = itchat.add_member_into_chatroom(chatroom['UserName'], [friend], useInvitation=True)
    print(r)
    if r['BaseResponse']['ErrMsg'] == '请求成功':
        return '自动邀请加入群聊成功!请等待获取加群链接!'
    return failure

@itchat.msg_register(TEXT)
def text_reply(msg):
    """Invite the sender into the group whose keyword appears in the text.

    Args:
        msg: Incoming itchat message; ``Text`` is scanned for keywords.

    Returns:
        The text returned by ``auto_add_member`` for the first matching
        group, or ``None`` when no keyword is present.
    """
    print(msg)
    text = msg['Text']

    # (keywords, chat-room name) pairs, checked in this order.
    routes = (
        (('IFE', 'ife'), '2017IFE抱团群'),
        (('签到', '100days'), '100days'),
        (('fcc', 'FCC'), 'FCC知乎学习小组'),
    )
    for keywords, room in routes:
        if any(keyword in text for keyword in keywords):
            return auto_add_member(msg, room)
    return None

# 收到好友邀请自动添加好友
@itchat.msg_register(FRIENDS)
def add_friend(msg):
    """Handle a FRIENDS message by adding the requester as a friend.

    Args:
        msg: Incoming itchat message; ``RecommendInfo['UserName']`` is the
            account that is added.
    """
    print(msg)
    requester = msg['RecommendInfo']['UserName']
    # Per the original note: this call also records the new friend's info,
    # so the contact list does not need to be reloaded.
    itchat.add_friend(requester, status=3, verifyContent='自动添加好友成功!')

itchat.auto_login(enableCmdQR=2,hotReload=True)
itchat.run()
mingyun commented 7 years ago
'''
本代码最好运行在python3环境下
运行代码之前先通过pip安装itchat以及pillow
pip install itchat pillow
'''

# 开启自动邀请加入群聊功能的微信群组列表,字典key是好友发来消息的触发关键字,value是群聊的名称,你可以配置相互对应任意数量的关键词:群组
group_dict = {'fcc':'FCC知乎学习小组', '北京':'freecodecamp北京', 'bj':'freecodecamp北京', '100days':'100days', 'ife':'2017IFE'}

#自动搜索好友列表,邀请好友加群的主要逻辑
def auto_add_member(userName, roomName):
    """Invite the friend ``userName`` into the chat room named ``roomName``.

    Args:
        userName: UserName of the friend to invite.
        roomName: Name used to search for the target chat room.

    Returns:
        A success text when the invitation request is reported as
        successful, otherwise an error text (also when the friend or the
        chat room cannot be found).
    """
    failure = '请求发生错误,请重试!'

    friend = itchat.search_friends(userName=userName)
    print(friend)
    if not friend:
        # Nothing to invite; do not pass an empty entry on to itchat.
        return failure

    itchat.get_chatrooms(update=True)
    rooms = itchat.search_chatrooms(roomName)
    if not rooms:
        # No room matched; indexing [0] would raise IndexError.
        return failure
    chatroom = rooms[0]
    print(chatroom['UserName'])

    # Per the original note: for groups with fewer than 100 members,
    # useInvitation=True can be dropped so the friend is added directly
    # after sending the keyword.
    r = itchat.add_member_into_chatroom(chatroom['UserName'], [friend], useInvitation=True)
    print(r)
    if r['BaseResponse']['ErrMsg'] == '请求成功':
        return '自动邀请加入群聊成功!请等待获取加群链接!'
    return failure

#处理微信聊天消息,根据关键字返回相应群组邀请链接
@itchat.msg_register(TEXT)
def auto_invite_reply(msg):
    """Invite the sender when the whole message is a configured keyword.

    The text is compared after removing spaces and lower-casing it.

    Args:
        msg: Incoming itchat message; ``Text`` and ``FromUserName`` are read.

    Returns:
        The text returned by ``auto_add_member``, or ``None`` when the
        message is not a key of ``group_dict``.
    """
    keyword = msg['Text'].replace(" ", "").lower()
    if keyword not in group_dict:
        return None
    return auto_add_member(msg['FromUserName'], group_dict[keyword])

# 收到好友邀请自动添加好友
@itchat.msg_register(FRIENDS)
def add_friend(msg):
    """Add the requester as a friend and invite them if they sent a keyword.

    Args:
        msg: Incoming itchat message; ``RecommendInfo`` supplies the
            requester's ``UserName`` and the request's ``Content``.
    """
    print(msg)
    info = msg['RecommendInfo']
    # Per the original note: this call also records the new friend's info,
    # so the contact list does not need to be reloaded.
    itchat.add_friend(info['UserName'], status=3, verifyContent='自动添加好友成功!')

    # The request text, without spaces and lower-cased, is the group keyword.
    keyword = msg['RecommendInfo']['Content'].replace(" ", "").lower()
    if keyword in group_dict:
        auto_add_member(msg['RecommendInfo']['UserName'], group_dict[keyword])

'''
如果是在Linux环境下,请设置
enableCmdQR=2
其他操作系统平台请设置
enableCmdQR=True
'''

itchat.auto_login(enableCmdQR=2,hotReload=True, statusStorageDir='wechat_auto.pkl')
itchat.run()

'''
linux下可以通过如下命令使脚本在后台运行
nohup python3 auto_invite_out_of_box.py > record.log 2>&1 &
或者使用screen保持脚本不会因ssh断开而中断,也可以在后台挂起运行
chmod +x auto_invite_out_of_box.py
screen ./auto_invite_out_of_box.py
或者Windows和OSX平台只需要开着电脑挂着一个终端不要关闭就好了
'''
mingyun commented 7 years ago
import logging
#自动响应加好友请求,根据验证信息邀请入群
from wxpy import *

logger = logging.getLogger()
logger.setLevel(logging.INFO)

bot = Bot('bot.pkl', console_qr=-2)
if bot.self.nick_name == '游否':
    raise ValueError('Wrong User!')

group = ensure_one(bot.groups().search('wxpy 交流群'))
tuling = Tuling()

def valid(msg):
    """Return True when the message text contains 'wxpy', ignoring case.

    Args:
        msg: Object with a ``text`` string attribute.
    """
    lowered = msg.text.lower()
    keyword = 'wxpy'
    return keyword in lowered

def invite(user):
    """Invite ``user`` into the group unless they are already a member.

    Args:
        user: The contact to invite; an existing member is sent a short
            notice instead.
    """
    if user not in group:
        logger.info('inviting {} to {}'.format(user, group))
        group.add_members(user, use_invitation=True)
        return

    logger.info('{} is already in {}'.format(user, group))
    user.send('你已经加入 {} 啦'.format(group.nick_name))

@bot.register(msg_types=FRIENDS)
def new_friends(msg):
    """Accept a friend request, then invite the user or ask for the keyword.

    Args:
        msg: Incoming FRIENDS message; ``msg.card.accept()`` yields the new
            friend and the message text is checked with ``valid``.
    """
    user = msg.card.accept()
    if not valid(msg):
        # The request text did not contain the join keyword.
        user.send('你忘了写加群口令啦,快回去看看口令是啥~')
        return
    invite(user)

@bot.register(Friend, msg_types=TEXT)
def exist_friends(msg):
    """Invite a friend who sends the keyword; otherwise let Tuling answer.

    Args:
        msg: Incoming TEXT message from a friend.
    """
    if not valid(msg):
        # No join keyword: hand the message to the chatbot instead.
        tuling.do_reply(msg)
        return
    invite(msg.sender)

bot.start(False)
embed()
mingyun commented 7 years ago
from wxpy import *

bot = Bot()

tuling = Tuling('你的 API KEY (http://www.tuling123.com/)')
my_friend = ensure_one(bot.friends().search('好友的名称'))

#https://gist.github.com/youfou/88624bfce3a5450d079edea41f77cf63
@bot.register(my_friend, TEXT)
def tuling_reply(msg):
    """Hand a TEXT message from ``my_friend`` to the Tuling client's ``do_reply``."""
    tuling.do_reply(msg)

bot.start()
mingyun commented 7 years ago
from xml.etree import ElementTree as ETree
from wxpy import *
#将被撤回的微信消息发送到文件传输助手
bot = Bot()

@bot.register(msg_types=NOTE)
def get_revoked(msg):
    """Send a recalled message to the file helper.

    The raw NOTE content is parsed as XML; when it contains a ``revokemsg``
    element, the message whose id is given in its ``msgid`` child is looked
    up in ``bot.messages`` and sent to ``bot.file_helper``.

    Args:
        msg: Incoming NOTE message; ``msg.raw['Content']`` is parsed.
    """
    try:
        root = ETree.fromstring(msg.raw['Content'])
    except ETree.ParseError:
        # Content that is not well-formed XML cannot be a recall notice.
        return

    revoked = root.find('revokemsg')
    # Compare with None explicitly: an Element's truth value depends on
    # whether it has children, not on whether it was found.
    if revoked is None:
        return

    msgid = revoked.findtext('msgid')
    if not msgid:
        return

    matches = bot.messages.search(id=int(msgid))
    if matches:  # the recalled message may not be in the stored messages
        bot.file_helper.send(matches[0])

bot.start()
mingyun commented 7 years ago
#coding=utf-8
import itchat
from itchat.content import *
import pdb
global name

@itchat.msg_register([TEXT, MAP, CARD, NOTE, SHARING], True, False, False)
# Forward text and similar messages to XiaoIce.
def send_xiaoice(msg):
    """Remember the sender and forward the message text to 'xiaoice-ms'.

    Args:
        msg: Incoming itchat message; ``FromUserName`` is stored in the
            module-level ``name`` and ``Text`` is forwarded.
    """
    global name
    sender = msg['FromUserName']
    name = sender
    itchat.send(msg['Text'], toUserName='xiaoice-ms')

@itchat.msg_register([PICTURE, RECORDING, ATTACHMENT, VIDEO], True, False, False)
# Forward pictures and other media to XiaoIce.
def send_xiaoice(msg):
    """Remember the sender, save the media file and forward it to 'xiaoice-ms'.

    Args:
        msg: Incoming itchat message; ``msg['Text']`` is called with
            ``FileName`` to save the file before it is forwarded.
    """
    global name
    name = msg['FromUserName']

    file_name = msg['FileName']
    msg['Text'](file_name)

    # 'img' for pictures, 'vid' for videos, 'fil' for everything else.
    prefix = {'Picture': 'img', 'Video': 'vid'}.get(msg['Type'], 'fil')
    itchat.send('@%s@%s' % (prefix, file_name), toUserName='xiaoice-ms')

@itchat.msg_register([TEXT, MAP, CARD, NOTE, SHARING], False, False, True)
# Forward XiaoIce's text reply back to the original sender.
def send_reply(msg):
    """Send the reply text to the most recently recorded sender.

    Args:
        msg: Incoming itchat message; ``Text`` is forwarded.
    """
    # The global ``name`` is only bound once a friend message has been
    # handled; a reply arriving before that has nobody to go to.
    recipient = globals().get('name')
    if recipient is None:
        return
    itchat.send(msg['Text'], recipient)

@itchat.msg_register([PICTURE, RECORDING, ATTACHMENT, VIDEO], False, False, True)
# Forward XiaoIce's picture/media reply back to the original sender.
# https://zhuanlan.zhihu.com/p/25912740
def send_xiaoice(msg):
    """Save the media reply and send it to the most recently recorded sender.

    Args:
        msg: Incoming itchat message; ``msg['Text']`` is called with
            ``FileName`` to save the file before it is forwarded.
    """
    # The global ``name`` is only bound once a friend message has been
    # handled; without a recipient there is nothing to download or send.
    recipient = globals().get('name')
    if recipient is None:
        return

    file_name = msg['FileName']
    msg['Text'](file_name)

    # 'img' for pictures, 'vid' for videos, 'fil' for everything else.
    prefix = {'Picture': 'img', 'Video': 'vid'}.get(msg['Type'], 'fil')
    itchat.send('@%s@%s' % (prefix, file_name), recipient)

itchat.auto_login(hotReload=True)
itchat.run()
mingyun commented 7 years ago
#https://zhuanlan.zhihu.com/p/26514576

import itchat

# 先登录
itchat.login()

# Fetch the friend list.
friends = itchat.get_friends(update=True)

# Per the original note, the first entry is the logged-in account itself, so
# counting starts after it.
others = friends[1:]

# Counters: Sex == 1 is male, Sex == 2 is female, anything else is "other"
# (some people leave the field empty).
male = female = other = 0
for friend in others:
    sex = friend["Sex"]
    if sex == 1:
        male += 1
    elif sex == 2:
        female += 1
    else:
        other += 1

# Total used as the denominator for the percentages.
total = len(others)


def _share(count):
    """Return ``count`` as a percentage of ``total``; 0.0 without friends."""
    if not total:
        return 0.0
    return float(count) / total * 100


# Print the result. print(...) with a single argument works on both Python 2
# and Python 3.
print(u"男性好友:%.2f%%" % _share(male))
print(u"女性好友:%.2f%%" % _share(female))
print(u"其他:%.2f%%" % _share(other))

# Optional: render the same numbers as a pie chart with echarts.
from echarts import Echart, Legend, Pie

chart = Echart(u'%s的微信好友性别比例' % (friends[0]['NickName']), 'from WeChat')
chart.use(Pie('WeChat',
              [{'value': male, 'name': u'男性 %.2f%%' % _share(male)},
               {'value': female, 'name': u'女性 %.2f%%' % _share(female)},
               {'value': other, 'name': u'其他 %.2f%%' % _share(other)}],
              radius=["50%", "70%"]))
chart.use(Legend(["male", "female", "other"]))
# The axes are removed from the chart description before plotting.
del chart.json["xAxis"]
del chart.json["yAxis"]
chart.plot()

# coding:utf-8
import itchat
import re

itchat.login()
friends = itchat.get_friends(update=True)

# Compiled once, outside the loop. A raw string keeps \d a regex escape
# instead of an (invalid) string escape.
rep = re.compile(r"1f\d.+")

# Collect every friend's signature with the emoji markup stripped.
tList = []
for friend in friends:
    signature = friend["Signature"]
    for noise in (" ", "span", "class", "emoji"):
        signature = signature.replace(noise, "")
    tList.append(rep.sub("", signature))

# Join all signatures into one string.
text = "".join(tList)

# Segment the text with jieba.
import jieba
wordlist_jieba = jieba.cut(text, cut_all=True)
wl_space_split = " ".join(wordlist_jieba)

# Plain word cloud.
import matplotlib.pyplot as plt
from wordcloud import WordCloud
import PIL.Image as Image

# Font file used for rendering; this is a macOS path. On Windows the fonts
# live under windows/Fonts.
font_path = '/Users/sebastian/Library/Fonts/Arial Unicode.ttf'

my_wordcloud = WordCloud(background_color="white", max_words=2000,
                         max_font_size=40, random_state=42,
                         font_path=font_path).generate(wl_space_split)

plt.imshow(my_wordcloud)
plt.axis("off")
plt.show()

# Word cloud shaped and coloured by an image.
import matplotlib.pyplot as plt
from wordcloud import WordCloud, ImageColorGenerator
import os
import numpy as np
import PIL.Image as Image

d = os.path.dirname(__file__)
alice_coloring = np.array(Image.open(os.path.join(d, "wechat.jpg")))
my_wordcloud = WordCloud(background_color="white", max_words=2000, mask=alice_coloring,
                         max_font_size=40, random_state=42,
                         font_path=font_path).generate(wl_space_split)

image_colors = ImageColorGenerator(alice_coloring)
# recolor() updates the cloud in place and returns it, so one imshow call
# is enough and the file saved below carries the image colours as well.
plt.imshow(my_wordcloud.recolor(color_func=image_colors))
plt.axis("off")
plt.show()

# Save the picture and send it to the phone via the file helper
# (https://zhuanlan.zhihu.com/p/26514576). The same path is used for both
# steps so the send works regardless of the current working directory.
output_path = os.path.join(d, "wechat_cloud.png")
my_wordcloud.to_file(output_path)
itchat.send_image(output_path, 'filehelper')