1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
| import json import requests import config
class WeChat():
def __init__(self): self.wx = config.QIYEWEIXIN self.touser = self.wx['touser'] self.agentid = self.wx['agentid'] self.secret = self.wx['secret'] self.corpid = self.wx['corpid']
def get_access_token(self): ''' 获取access_token ''' response = requests.get( f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={self.corpid}&corpsecret={self.secret}") data = json.loads(response.text) return data['access_token']
def get_media_id(self, path): ''' 获取图片媒体文件id ''' if path != None: access_token = self.get_access_token() curl = f'https://qyapi.weixin.qq.com/cgi-bin/media/upload?access_token={access_token}&type=image' files = {'image': open(path, 'rb')} response = requests.post(curl, files=files) data = json.loads(response.text) return data['media_id']
@staticmethod def set_geshi(message_list): digest = '\n'.join(message_list) content = digest.replace('\n', '<br \>') return content, digest
def send_text(self, message_list): ''' 推送文本消息 ''' messages = WeChat.set_geshi(message_list)[1] text_dict = { "touser": self.touser, "msgtype": "text", "agentid": self.agentid, "text": { "content": messages }, "safe": 0, "enable_id_trans": 0, "enable_duplicate_check": 0, "duplicate_check_interval": 1800 }
self.send(text_dict)
def send_image(self, path=None): ''' 推送图片 ''' media_id = self.get_media_id(path) tupian_dict = { "touser": self.touser, "msgtype": "image", "agentid": self.agentid, "image": { "media_id": media_id }, "safe": 0, "enable_duplicate_check": 0, "duplicate_check_interval": 1800 }
self.send(tupian_dict)
def send_tuwen(self, title, message_list): ''' 推送图文消息 ''' messages, digest = WeChat.set_geshi(message_list) tuwen_dict = { "touser": self.touser, "msgtype": "mpnews", "agentid": self.agentid, "mpnews": { "articles": [ { "title": title, "thumb_media_id": self.wx['media_id'], "author": "雨园", "content": messages, "digest": digest, } ] }, "safe": 0, "enable_id_trans": 0, "enable_duplicate_check": 0, "duplicate_check_interval": 1800 }
self.send(tuwen_dict)
def send(self, dict): access_token = self.get_access_token() json_str = json.dumps(dict) res = requests.post( f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}", data=json_str)
if json.loads(res.text)['errmsg'] == 'ok': return 'ok'
|