|
- import os
- import time
- import re
- import io
- import threading
- import json
- import xml.dom.minidom
- import random
- import traceback
- import logging
- try:
- from httplib import BadStatusLine
- except ImportError:
- from http.client import BadStatusLine
-
- import requests
- from pyqrcode import QRCode
-
- from .. import config, utils
- from ..returnvalues import ReturnValue
- from ..storage.templates import wrap_user_dict
- from .contact import update_local_chatrooms, update_local_friends
- from .messages import produce_msg
-
- logger = logging.getLogger('itchat')
-
-
- def load_login(core):
- core.login = login
- core.get_QRuuid = get_QRuuid
- core.get_QR = get_QR
- core.check_login = check_login
- core.web_init = web_init
- core.show_mobile_login = show_mobile_login
- core.start_receiving = start_receiving
- core.get_msg = get_msg
- core.logout = logout
-
-
- def login(self, enableCmdQR=False, picDir=None, qrCallback=None,
- loginCallback=None, exitCallback=None):
- if self.alive or self.isLogging:
- logger.warning('itchat has already logged in.')
- return
- self.isLogging = True
- logger.info('Ready to login.')
- while self.isLogging:
- uuid = push_login(self)
- if uuid:
- qrStorage = io.BytesIO()
- else:
- logger.info('Getting uuid of QR code.')
- while not self.get_QRuuid():
- time.sleep(1)
- logger.info('Downloading QR code.')
- qrStorage = self.get_QR(enableCmdQR=enableCmdQR,
- picDir=picDir, qrCallback=qrCallback)
- # logger.info('Please scan the QR code to log in.')
- isLoggedIn = False
- while not isLoggedIn:
- status = self.check_login()
- if hasattr(qrCallback, '__call__'):
- qrCallback(uuid=self.uuid, status=status,
- qrcode=qrStorage.getvalue())
- if status == '200':
- isLoggedIn = True
- elif status == '201':
- if isLoggedIn is not None:
- logger.info('Please press confirm on your phone.')
- isLoggedIn = None
- time.sleep(7)
- time.sleep(0.5)
- elif status != '408':
- break
- if isLoggedIn:
- break
- elif self.isLogging:
- logger.info('Log in time out, reloading QR code.')
- else:
- return # log in process is stopped by user
- logger.info('Loading the contact, this may take a little while.')
- self.web_init()
- self.show_mobile_login()
- self.get_contact(True)
- if hasattr(loginCallback, '__call__'):
- r = loginCallback()
- else:
- # utils.clear_screen()
- if os.path.exists(picDir or config.DEFAULT_QR):
- os.remove(picDir or config.DEFAULT_QR)
- logger.info('Login successfully as %s' % self.storageClass.nickName)
- self.start_receiving(exitCallback)
- self.isLogging = False
-
-
- def push_login(core):
- cookiesDict = core.s.cookies.get_dict()
- if 'wxuin' in cookiesDict:
- url = '%s/cgi-bin/mmwebwx-bin/webwxpushloginurl?uin=%s' % (
- config.BASE_URL, cookiesDict['wxuin'])
- headers = {'User-Agent': config.USER_AGENT}
- r = core.s.get(url, headers=headers).json()
- if 'uuid' in r and r.get('ret') in (0, '0'):
- core.uuid = r['uuid']
- return r['uuid']
- return False
-
-
- def get_QRuuid(self):
- url = '%s/jslogin' % config.BASE_URL
- params = {
- 'appid': 'wx782c26e4c19acffb',
- 'fun': 'new',
- 'redirect_uri': 'https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxnewloginpage?mod=desktop',
- 'lang': 'zh_CN'}
- headers = {'User-Agent': config.USER_AGENT}
- r = self.s.get(url, params=params, headers=headers)
- regx = r'window.QRLogin.code = (\d+); window.QRLogin.uuid = "(\S+?)";'
- data = re.search(regx, r.text)
- if data and data.group(1) == '200':
- self.uuid = data.group(2)
- return self.uuid
-
-
- def get_QR(self, uuid=None, enableCmdQR=False, picDir=None, qrCallback=None):
- uuid = uuid or self.uuid
- picDir = picDir or config.DEFAULT_QR
- qrStorage = io.BytesIO()
- qrCode = QRCode('https://login.weixin.qq.com/l/' + uuid)
- qrCode.png(qrStorage, scale=10)
- if hasattr(qrCallback, '__call__'):
- qrCallback(uuid=uuid, status='0', qrcode=qrStorage.getvalue())
- else:
- with open(picDir, 'wb') as f:
- f.write(qrStorage.getvalue())
- if enableCmdQR:
- utils.print_cmd_qr(qrCode.text(1), enableCmdQR=enableCmdQR)
- else:
- utils.print_qr(picDir)
- return qrStorage
-
-
- def check_login(self, uuid=None):
- uuid = uuid or self.uuid
- url = '%s/cgi-bin/mmwebwx-bin/login' % config.BASE_URL
- localTime = int(time.time())
- params = 'loginicon=true&uuid=%s&tip=1&r=%s&_=%s' % (
- uuid, int(-localTime / 1579), localTime)
- headers = {'User-Agent': config.USER_AGENT}
- r = self.s.get(url, params=params, headers=headers)
- regx = r'window.code=(\d+)'
- data = re.search(regx, r.text)
- if data and data.group(1) == '200':
- if process_login_info(self, r.text):
- return '200'
- else:
- return '400'
- elif data:
- return data.group(1)
- else:
- return '400'
-
-
- def process_login_info(core, loginContent):
- ''' when finish login (scanning qrcode)
- * syncUrl and fileUploadingUrl will be fetched
- * deviceid and msgid will be generated
- * skey, wxsid, wxuin, pass_ticket will be fetched
- '''
- regx = r'window.redirect_uri="(\S+)";'
- core.loginInfo['url'] = re.search(regx, loginContent).group(1)
- headers = {'User-Agent': config.USER_AGENT,
- 'client-version': config.UOS_PATCH_CLIENT_VERSION,
- 'extspam': config.UOS_PATCH_EXTSPAM,
- 'referer': 'https://wx.qq.com/?&lang=zh_CN&target=t'
- }
- r = core.s.get(core.loginInfo['url'],
- headers=headers, allow_redirects=False)
- core.loginInfo['url'] = core.loginInfo['url'][:core.loginInfo['url'].rfind(
- '/')]
- for indexUrl, detailedUrl in (
- ("wx2.qq.com", ("file.wx2.qq.com", "webpush.wx2.qq.com")),
- ("wx8.qq.com", ("file.wx8.qq.com", "webpush.wx8.qq.com")),
- ("qq.com", ("file.wx.qq.com", "webpush.wx.qq.com")),
- ("web2.wechat.com", ("file.web2.wechat.com", "webpush.web2.wechat.com")),
- ("wechat.com", ("file.web.wechat.com", "webpush.web.wechat.com"))):
- fileUrl, syncUrl = ['https://%s/cgi-bin/mmwebwx-bin' %
- url for url in detailedUrl]
- if indexUrl in core.loginInfo['url']:
- core.loginInfo['fileUrl'], core.loginInfo['syncUrl'] = \
- fileUrl, syncUrl
- break
- else:
- core.loginInfo['fileUrl'] = core.loginInfo['syncUrl'] = core.loginInfo['url']
- core.loginInfo['deviceid'] = 'e' + repr(random.random())[2:17]
- core.loginInfo['logintime'] = int(time.time() * 1e3)
- core.loginInfo['BaseRequest'] = {}
- cookies = core.s.cookies.get_dict()
- res = re.findall('<skey>(.*?)</skey>', r.text, re.S)
- skey = res[0] if res else None
- res = re.findall(
- '<pass_ticket>(.*?)</pass_ticket>', r.text, re.S)
- pass_ticket = res[0] if res else None
- if skey is not None:
- core.loginInfo['skey'] = core.loginInfo['BaseRequest']['Skey'] = skey
- core.loginInfo['wxsid'] = core.loginInfo['BaseRequest']['Sid'] = cookies["wxsid"]
- core.loginInfo['wxuin'] = core.loginInfo['BaseRequest']['Uin'] = cookies["wxuin"]
- if pass_ticket is not None:
- core.loginInfo['pass_ticket'] = pass_ticket
- # A question : why pass_ticket == DeviceID ?
- # deviceID is only a randomly generated number
-
- # UOS PATCH By luvletter2333, Sun Feb 28 10:00 PM
- # for node in xml.dom.minidom.parseString(r.text).documentElement.childNodes:
- # if node.nodeName == 'skey':
- # core.loginInfo['skey'] = core.loginInfo['BaseRequest']['Skey'] = node.childNodes[0].data
- # elif node.nodeName == 'wxsid':
- # core.loginInfo['wxsid'] = core.loginInfo['BaseRequest']['Sid'] = node.childNodes[0].data
- # elif node.nodeName == 'wxuin':
- # core.loginInfo['wxuin'] = core.loginInfo['BaseRequest']['Uin'] = node.childNodes[0].data
- # elif node.nodeName == 'pass_ticket':
- # core.loginInfo['pass_ticket'] = core.loginInfo['BaseRequest']['DeviceID'] = node.childNodes[0].data
- if not all([key in core.loginInfo for key in ('skey', 'wxsid', 'wxuin', 'pass_ticket')]):
- logger.error(
- 'Your wechat account may be LIMITED to log in WEB wechat, error info:\n%s' % r.text)
- core.isLogging = False
- return False
- return True
-
-
- def web_init(self):
- url = '%s/webwxinit' % self.loginInfo['url']
- params = {
- 'r': int(-time.time() / 1579),
- 'pass_ticket': self.loginInfo['pass_ticket'], }
- data = {'BaseRequest': self.loginInfo['BaseRequest'], }
- headers = {
- 'ContentType': 'application/json; charset=UTF-8',
- 'User-Agent': config.USER_AGENT, }
- r = self.s.post(url, params=params, data=json.dumps(data), headers=headers)
- dic = json.loads(r.content.decode('utf-8', 'replace'))
- # deal with login info
- utils.emoji_formatter(dic['User'], 'NickName')
- self.loginInfo['InviteStartCount'] = int(dic['InviteStartCount'])
- self.loginInfo['User'] = wrap_user_dict(
- utils.struct_friend_info(dic['User']))
- self.memberList.append(self.loginInfo['User'])
- self.loginInfo['SyncKey'] = dic['SyncKey']
- self.loginInfo['synckey'] = '|'.join(['%s_%s' % (item['Key'], item['Val'])
- for item in dic['SyncKey']['List']])
- self.storageClass.userName = dic['User']['UserName']
- self.storageClass.nickName = dic['User']['NickName']
- # deal with contact list returned when init
- contactList = dic.get('ContactList', [])
- chatroomList, otherList = [], []
- for m in contactList:
- if m['Sex'] != 0:
- otherList.append(m)
- elif '@@' in m['UserName']:
- m['MemberList'] = [] # don't let dirty info pollute the list
- chatroomList.append(m)
- elif '@' in m['UserName']:
- # mp will be dealt in update_local_friends as well
- otherList.append(m)
- if chatroomList:
- update_local_chatrooms(self, chatroomList)
- if otherList:
- update_local_friends(self, otherList)
- return dic
-
-
- def show_mobile_login(self):
- url = '%s/webwxstatusnotify?lang=zh_CN&pass_ticket=%s' % (
- self.loginInfo['url'], self.loginInfo['pass_ticket'])
- data = {
- 'BaseRequest': self.loginInfo['BaseRequest'],
- 'Code': 3,
- 'FromUserName': self.storageClass.userName,
- 'ToUserName': self.storageClass.userName,
- 'ClientMsgId': int(time.time()), }
- headers = {
- 'ContentType': 'application/json; charset=UTF-8',
- 'User-Agent': config.USER_AGENT, }
- r = self.s.post(url, data=json.dumps(data), headers=headers)
- return ReturnValue(rawResponse=r)
-
-
- def start_receiving(self, exitCallback=None, getReceivingFnOnly=False):
- self.alive = True
-
- def maintain_loop():
- retryCount = 0
- while self.alive:
- try:
- i = sync_check(self)
- if i is None:
- self.alive = False
- elif i == '0':
- pass
- else:
- msgList, contactList = self.get_msg()
- if msgList:
- msgList = produce_msg(self, msgList)
- for msg in msgList:
- self.msgList.put(msg)
- if contactList:
- chatroomList, otherList = [], []
- for contact in contactList:
- if '@@' in contact['UserName']:
- chatroomList.append(contact)
- else:
- otherList.append(contact)
- chatroomMsg = update_local_chatrooms(
- self, chatroomList)
- chatroomMsg['User'] = self.loginInfo['User']
- self.msgList.put(chatroomMsg)
- update_local_friends(self, otherList)
- retryCount = 0
- except requests.exceptions.ReadTimeout:
- pass
- except:
- retryCount += 1
- logger.error(traceback.format_exc())
- if self.receivingRetryCount < retryCount:
- logger.error("Having tried %s times, but still failed. " % (
- retryCount) + "Stop trying...")
- self.alive = False
- else:
- time.sleep(1)
- self.logout()
- if hasattr(exitCallback, '__call__'):
- exitCallback()
- else:
- logger.info('LOG OUT!')
- if getReceivingFnOnly:
- return maintain_loop
- else:
- maintainThread = threading.Thread(target=maintain_loop)
- maintainThread.setDaemon(True)
- maintainThread.start()
-
-
- def sync_check(self):
- url = '%s/synccheck' % self.loginInfo.get('syncUrl', self.loginInfo['url'])
- params = {
- 'r': int(time.time() * 1000),
- 'skey': self.loginInfo['skey'],
- 'sid': self.loginInfo['wxsid'],
- 'uin': self.loginInfo['wxuin'],
- 'deviceid': self.loginInfo['deviceid'],
- 'synckey': self.loginInfo['synckey'],
- '_': self.loginInfo['logintime'], }
- headers = {'User-Agent': config.USER_AGENT}
- self.loginInfo['logintime'] += 1
- try:
- r = self.s.get(url, params=params, headers=headers,
- timeout=config.TIMEOUT)
- except requests.exceptions.ConnectionError as e:
- try:
- if not isinstance(e.args[0].args[1], BadStatusLine):
- raise
- # will return a package with status '0 -'
- # and value like:
- # 6f:00:8a:9c:09:74:e4:d8:e0:14:bf:96:3a:56:a0:64:1b:a4:25:5d:12:f4:31:a5:30:f1:c6:48:5f:c3:75:6a:99:93
- # seems like status of typing, but before I make further achievement code will remain like this
- return '2'
- except:
- raise
- r.raise_for_status()
- regx = r'window.synccheck={retcode:"(\d+)",selector:"(\d+)"}'
- pm = re.search(regx, r.text)
- if pm is None or pm.group(1) != '0':
- logger.error('Unexpected sync check result: %s' % r.text)
- return None
- return pm.group(2)
-
-
- def get_msg(self):
- self.loginInfo['deviceid'] = 'e' + repr(random.random())[2:17]
- url = '%s/webwxsync?sid=%s&skey=%s&pass_ticket=%s' % (
- self.loginInfo['url'], self.loginInfo['wxsid'],
- self.loginInfo['skey'], self.loginInfo['pass_ticket'])
- data = {
- 'BaseRequest': self.loginInfo['BaseRequest'],
- 'SyncKey': self.loginInfo['SyncKey'],
- 'rr': ~int(time.time()), }
- headers = {
- 'ContentType': 'application/json; charset=UTF-8',
- 'User-Agent': config.USER_AGENT}
- r = self.s.post(url, data=json.dumps(data),
- headers=headers, timeout=config.TIMEOUT)
- dic = json.loads(r.content.decode('utf-8', 'replace'))
- if dic['BaseResponse']['Ret'] != 0:
- return None, None
- self.loginInfo['SyncKey'] = dic['SyncKey']
- self.loginInfo['synckey'] = '|'.join(['%s_%s' % (item['Key'], item['Val'])
- for item in dic['SyncCheckKey']['List']])
- return dic['AddMsgList'], dic['ModContactList']
-
-
- def logout(self):
- if self.alive:
- url = '%s/webwxlogout' % self.loginInfo['url']
- params = {
- 'redirect': 1,
- 'type': 1,
- 'skey': self.loginInfo['skey'], }
- headers = {'User-Agent': config.USER_AGENT}
- self.s.get(url, params=params, headers=headers)
- self.alive = False
- self.isLogging = False
- self.s.cookies.clear()
- del self.chatroomList[:]
- del self.memberList[:]
- del self.mpList[:]
- return ReturnValue({'BaseResponse': {
- 'ErrMsg': 'logout successfully.',
- 'Ret': 0, }})
|