Source code for mirai_core.network

from typing import Dict
import aiohttp
from aiohttp import client_exceptions
from .log import create_logger
from io import BytesIO

from .exceptions import AuthenticationException, NetworkException, ServerException, \
    UnknownTargetException, PrivilegeException, BadRequestException, MiraiException, SessionException


error_code = {
                1: lambda: AuthenticationException('Incorrect authKey'),
                2: lambda: AuthenticationException('Bot does not exist, please login in console'),
                3: lambda: SessionException('Session does not exist or has expired'),
                4: lambda: AuthenticationException('Session is not verified'),
                5: lambda: UnknownTargetException('Message target does not exist'),
                10: lambda: PrivilegeException('Bot does not have corresponding privilege'),
                400: lambda: BadRequestException('Bad Request, please check arguments/url'),
            }


[docs]class HttpClient: """ Internal use only HttpClient implemented by aiohttp """ DEFAULT_TIMEOUT = 5 @staticmethod async def _check_response(result: aiohttp.ClientResponse, url, method) -> Dict: """ Check url response, and raise exceptions :param result: http response :param url: url to show in the log :param method: 'post', 'get' :return: json decoded result """ if result.status != 200: raise ServerException(f'{url} {method} failed, status code: {result.status}') result = await result.json() if not isinstance(result, dict): return result status_code = result.get('code') if method == 'post': if status_code is None: raise ServerException('Empty response') if status_code == 0: # normal return result elif method == 'get': if status_code is None or status_code == 0: return result if status_code in error_code: raise error_code[status_code]() else: raise MiraiException('HTTP API updated, please upgrade python-mirai-core') def __init__(self, base_url: str, timeout=DEFAULT_TIMEOUT, loop=None): self.base_url = base_url self.timeout = aiohttp.ClientTimeout(timeout) self.session = aiohttp.ClientSession(timeout=self.timeout, loop=loop) self.logger = create_logger('Network') self.loop = loop
[docs] async def get(self, url, headers=None, params=None): """ send http get request :param url: the sub url :param headers: request headers :param params: get params :return: json decoded response """ if url != '/fetchMessage': self.logger.debug(f'get {url} with params: {str(params)}') try: response = await self.session.get(self.base_url + url, headers=headers, params=params) except client_exceptions.ClientConnectorError: raise NetworkException('Unable to reach Mirai console') return await HttpClient._check_response(response, url, 'get')
[docs] async def post(self, url, headers=None, data=None): """ send http post request :param url: the sub url :param headers: request headers :param data: post params :return: json decoded response """ self.logger.debug(f'post {url} with data: {str(data)}') try: response = await self.session.post(self.base_url + url, headers=headers, json=data) except client_exceptions.ClientConnectorError: raise NetworkException('Unable to reach Mirai console') return await HttpClient._check_response(response, url, 'post')
[docs] async def upload(self, url, headers=None, data=None, file: str = None): """ upload using multipart upload :param url: the sub url :param headers: request headers :param data: post params :param file: file to attach :return: json decoded response """ if data is None: data = dict() data['img'] = BytesIO(open(str(file.absolute()), 'rb').read()) self.logger.debug(f'upload {url} with file: {file}') try: response = await self.session.post(self.base_url + url, headers=headers, data=data) except client_exceptions.ClientConnectorError: raise NetworkException('Unable to reach Mirai console') self.logger.debug(f'Image uploaded: {response.text}') return await response.json()
[docs] async def websocket(self, url: str, handler: callable, ws_close_handler: callable): """ Create websocket subscriber to url :param url: the sub url :param handler: request headers :param ws_close_handler: callback for connection close """ try: ws = await self.session.ws_connect(self.base_url + url) self.logger.debug('Websocket established') while True: msg = await ws.receive() if msg.type == aiohttp.WSMsgType.TEXT: self.logger.debug(f'Websocket received {msg}') await handler(msg.json()) elif msg.type == aiohttp.WSMsgType.CLOSED: self.logger.debug('Websocket closed') await ws_close_handler() return else: self.logger.warning(f'Received unexpected type: {msg.type}') except client_exceptions.ClientConnectorError: raise NetworkException('Unable to reach Mirai console')
[docs] async def close(self): """ Close session """ await self.session.close()