Source code for mirai_core.bot

from typing import Union, List, Type, Dict
from datetime import timedelta
from pathlib import Path
from pydantic import parse_obj_as
import json
from functools import wraps
from .log import create_logger

from .models.Types import NewFriendRequestResponse, MemberJoinRequestResponse
from .models.Message import BotMessage, MessageChain, \
    Source, Image, Quote, Plain, BaseMessageComponent, FlashImage, At
from .models.Event import *
from .models.Entity import Friend, Group, GroupSetting, Member, MemberChangeableSetting
from .network import HttpClient
from .exceptions import AuthenticationException, MiraiException, NetworkException, SessionException

__ALL__ = [
    'Bot'
]


[docs]def retry_once(func): @wraps(func) async def wrapper(self, *args, **kwargs): try: return await func(self, *args, **kwargs) except (NetworkException, SessionException, AuthenticationException): self.logger.exception('Trying handshake due to the following exception') try: await self.handshake() return await func(self, *args, **kwargs) except (NetworkException, SessionException, AuthenticationException): self.logger.exception('Unable to handshake') return None return wrapper
[docs]class Bot: """ See https://github.com/mamoe/mirai-api-http for details """ def __init__(self, qq: int, host: str = '127.0.0.1', port: int = 8080, auth_key: str = 'abcdefgh', loop=None): self.qq = qq self.auth_key = auth_key self.base_url = f'http://{host}:{port}' self.loop = loop self.session = HttpClient(self.base_url, loop=self.loop) self.session_key = '' self.logger = create_logger('Bot')
[docs] async def handshake(self): """ Authenticate and verify the session_key Automatically called if session_key needs to be updated """ await self.auth() await self.verify()
[docs] async def auth(self) -> None: """ Post auth_key, and get session_key """ result = await self.session.post('/auth', data={'authKey': self.auth_key}) self.session_key = result.get('session')
[docs] async def verify(self) -> None: """ Post session_key to verify the session """ await self.session.post('/verify', data={ 'sessionKey': self.session_key, 'qq': self.qq })
[docs] async def release(self) -> None: """ Post session_key to release the session Needs to be called manually if Updater is not used """ await self.session.post('/release', data={ 'sessionKey': self.session_key, 'qq': self.qq })
@staticmethod def _handle_target_as(target: Union[Group, Friend, Member, int]): """ Internal use only, convert target to id :param target: Union[Group, Friend, Member, int] :return: id, int """ if isinstance(target, int): return target else: try: return target.id except: raise ValueError(f'target does not contain id attribute')
[docs] @retry_once async def send_message(self, target: Union[Friend, Member, Group, int], message_type: MessageType, message: Union[ MessageChain, BaseMessageComponent, List[BaseMessageComponent], str ] = '', temp_group: Optional[int] = None, quote_source: Union[int, Source] = None ) -> BotMessage: """ Send Group/Friend message, only keyword arguments are allowed Image ID is available in returned message if uploaded via file path :param target: Group, Member, Friend, int :param message_type: ChatType, specify the type of target :param temp_group: If message_type is Member and target is int, then temp group must be specified :param message: MessageChain, BaseMessageComponent, List of BaseMessageComponent or str, the content to send :param quote_source: int (the 64-bit int) or Source, the message to quote The purpose of this argument is to save image ids for future use. :return: BotMessage (contains message id) """ data = { 'sessionKey': self.session_key, } if message_type == MessageType.Friend: portal = '/sendFriendMessage' data['target'] = self._handle_target_as(target) elif message_type == MessageType.Temp: portal = '/sendTempMessage' if isinstance(target, int): data['qq'] = target if not isinstance(temp_group, int): raise ValueError('temp group must be specified if target is not Member type') data['group'] = temp_group else: data['group'] = target.group.id data['qq'] = target.id elif message_type == MessageType.Group: portal = '/sendGroupMessage' data['target'] = self._handle_target_as(target) else: raise ValueError('One of friend, member and group must not be empty') message_chain = await self._handle_message_chain(message, message_type) data['messageChain'] = message_chain if quote_source: if isinstance(quote_source, int): data['quote'] = quote_source elif isinstance(quote_source, Source): data['quote'] = quote_source.id result = await self.session.post(portal, data=data) bot_message = BotMessage.parse_obj(result) return bot_message
[docs] @retry_once async def recall(self, source: Union[Source, int]) -> None: """ Recall a message Success if no exception is raised :param source: int (the 64-bit int) or Source """ data = { 'sessionKey': self.session_key, } if isinstance(source, int): data['target'] = source elif isinstance(source, Source): data['target'] = source.id else: raise MiraiException('Invalid source argument') await self.session.post('/recall', data=data)
@property @retry_once async def groups(self) -> List[Group]: """ Get list of joined groups :return: List of Group """ params = { 'sessionKey': self.session_key, } result = await self.session.get('/groupList', params=params) return [Group.parse_obj(group_info) for group_info in result] @property @retry_once async def friends(self) -> List[Friend]: """ Get list of friends :return: List of Friend """ params = { 'sessionKey': self.session_key, } result = await self.session.get('/friendList', params=params) return [Friend.parse_obj(friend_info) for friend_info in result]
[docs] @retry_once async def get_members(self, target: Union[Group, int]) -> List[Member]: """ Get list of members of a group :param target: int or Group, the target group :return: List of Member """ if isinstance(target, int): group = target else: group = target.id params = { 'sessionKey': self.session_key, 'target': group } result = await self.session.get('/memberList', params=params) return [Member.parse_obj(member_info) for member_info in result]
[docs] @retry_once async def upload_image(self, message_type: MessageType, image_path: Union[Path, str]) -> Optional[Image]: """ Deprecated Upload a image to QQ server. The image between group and friend is not exchangeable This function can be called separately to acquire image uuids, or automatically if using LocalImage while sending :param message_type: MessageType, Friend, Group or Temp :param image_path: absolute path of the image :return: Image object """ if isinstance(image_path, str): image_path = Path(image_path) if not image_path.exists(): raise FileNotFoundError('Image not found.') data = { 'sessionKey': self.session_key, 'type': message_type.value } result = await self.session.upload('/uploadImage', file=image_path, data=data) return Image.parse_obj(result)
[docs] @retry_once async def fetch_message(self, count: int) -> List[BaseEvent]: """ Deprecated Fetch a list of messages This function is called automatically if using polling instead of websocket :param count: maximum count of one fetch :return: List of BaseEvent """ params = { 'sessionKey': self.session_key, 'count': count } result = await self.session.get('/fetchMessage', params=params) try: for index in range(len(result)): result[index] = self._parse_event(result[index]) except: self.logger.exception('Unhandled exception') return result
[docs] @retry_once async def mute_all(self, group: Union[Group, int]) -> None: """ Mute all non admin members in group Success if no exception is raised :param group: int or Group, the target group """ params = { 'sessionKey': self.session_key, 'target': Bot._handle_target_as(target=group) } await self.session.get('/muteAll', params=params)
[docs] @retry_once async def unmute_all(self, group: Union[Group, int]) -> None: """ Unmute all non admin members in group Success if no exception is raised :param group: int or Group, the target group """ params = { 'sessionKey': self.session_key, 'target': Bot._handle_target_as(target=group) } await self.session.get('/unmuteAll', params=params)
[docs] @retry_once async def get_member_info(self, group: Union[Group, int], member: Union[Member, int]) -> MemberChangeableSetting: """ Get the info of a member :param group: int or Group, target group :param member: int or Member, target member :return: MemberChangeableSetting """ params = { 'sessionKey': self.session_key, 'target': Bot._handle_target_as(target=group), 'memberId': self._handle_target_as(target=member) } result = await self.session.get('/memberInfo', params=params) return MemberChangeableSetting.parse_obj(result)
[docs] @retry_once async def get_bot_member_info(self, group: Union[Group, int]) -> MemberChangeableSetting: """ Get the info of this bot :param group: int or Group, target group :return: MemberChangeableSetting """ return await self.get_member_info(group, self.qq)
[docs] @retry_once async def set_member_info(self, group: Union[Group, int], member: Union[Member, int], setting: MemberChangeableSetting) -> None: """ Set the info of a member Success if no exception is raised :param group: int or Group, target group :param member: int or Member, target member :param setting: MemberChangeableSetting, the new settings """ data = { 'sessionKey': self.session_key, 'target': Bot._handle_target_as(target=group), 'memberId': self._handle_target_as(target=member), 'info': json.loads(setting.json(ensure_ascii=False)) } await self.session.post('/memberInfo', data=data)
[docs] @retry_once async def get_group_config(self, group: Union[Group, int]) -> GroupSetting: """ Get the group config of a group :param group: int or Group, target group :return: GroupSetting """ params = { 'sessionKey': self.session_key, 'target': Bot._handle_target_as(target=group), } result = await self.session.get('/groupConfig', params=params) return GroupSetting.parse_obj(result)
[docs] @retry_once async def set_group_config(self, group: Union[Group, int], config: GroupSetting) -> None: """ Set the group config of a group Success if no exception is raised :param group: int or Group, target group :param config: GroupSetting """ data = { 'sessionKey': self.session_key, 'target': Bot._handle_target_as(target=group), 'config': json.loads(config.json(ensure_ascii=False)) } await self.session.post('/groupConfig', data=data)
[docs] @retry_once async def mute(self, group: Union[Group, int], member: Union[Member, int], time: Union[timedelta, int]) -> None: """ Mute a member of a group Success if no exception is raised :param group: int or Group, target group :param member: int or Member, target member :param time: int or datetime.timedelta, must between 1 minutes and 30 days """ if isinstance(time, timedelta): time = int(time.total_seconds()) time = min(86400 * 30, max(60, time)) # time should between 1 minutes and 30 days data = { 'sessionKey': self.session_key, 'target': Bot._handle_target_as(target=group), 'MemberId': Bot._handle_target_as(target=member), 'time': time } await self.session.post('/mute', data=data)
[docs] @retry_once async def unmute(self, group: Union[Group, int], member: Union[Member, int]) -> None: """ Unmute a member of a group Success if no exception is raised :param group: int or Group, target group :param member: int or Member, target member """ data = { 'sessionKey': self.session_key, 'target': Bot._handle_target_as(target=group), 'MemberId': Bot._handle_target_as(target=member) } await self.session.post('/unmute', data=data)
[docs] @retry_once async def kick(self, group: Union[Group, int], member: Union[Member, int], message: str = '') -> None: """ Kick a member of a group :param group: int or Group, target group :param member: int or Member, target member :param message: string, message to the member """ data = { 'sessionKey': self.session_key, 'target': Bot._handle_target_as(target=group), 'MemberId': Bot._handle_target_as(target=member) } if message: data['msg'] = message await self.session.post('/kick', data=data)
[docs] @retry_once async def quit(self, group: Union[Group, int]): """ Quit a group :param group: int or Group, target group """ data = { 'sessionKey': self.session_key, 'target': Bot._handle_target_as(target=group) } await self.session.post('/quit', data=data)
[docs] @retry_once async def respond_request(self, request: Union[NewFriendRequestEvent, MemberJoinRequestEvent], response: Union[NewFriendRequestResponse, MemberJoinRequestResponse], message: str = ''): """ Respond NewFriendRequestEvent and MemberJoinRequestEvent :param request: NewFriendRequestEvent or MemberJoinRequestEvent :param response: NewFriendRequestResponse or MemberJoinRequestResponse :param message: text message for the response """ if isinstance(request, NewFriendRequestEvent): assert isinstance(response, (NewFriendRequestResponse, int)), f'Response type mismatch' response = response.value if isinstance(response, NewFriendRequestResponse) else response data = { 'sessionKey': self.session_key, 'eventId': request.requestId, 'fromId': request.supplicant, 'groupId': request.sourceGroup, 'operate': response, 'message': message } return await self.session.post('/resp/newFriendRequestEvent', data=data) elif isinstance(request, MemberJoinRequestEvent): assert isinstance(response, (MemberJoinRequestResponse, int)), f'Response type mismatch' response = response.value if isinstance(response, MemberJoinRequestResponse) else response data = { 'sessionKey': self.session_key, 'eventId': request.requestId, 'fromId': request.supplicant, 'groupId': request.sourceGroup, 'operate': response, 'message': message } return await self.session.post('/resp/memberJoinRequestEvent', data=data) else: raise TypeError(f'Unsupported event: {str(request)}')
async def _handle_message_component(self, message_component: BaseMessageComponent, message_type: MessageType) -> BaseMessageComponent: """ Internal use only Upload Image and get uuid for the image (only if the image is uploaded by path) :param message_component: BaseMessageComponent :param message_type: ImageType :return: BaseMessageComponent """ if not isinstance(message_component, (Image, FlashImage)): return message_component if message_component.imageId or message_component.url: return message_component image = await self.upload_image(message_type, message_component.path) message_component.imageId = image.imageId return message_component async def _handle_message_chain(self, message: Union[ MessageChain, BaseMessageComponent, List[BaseMessageComponent], str ], message_type: MessageType) -> MessageChain: """ Internal use only Convert MessageChain to json :param message: MessageChain :param message_type: the target chat type (to determine image upload args) :return: list """ if isinstance(message, MessageChain): return message elif isinstance(message, str): return MessageChain.parse_obj([Plain(text=message)]) elif isinstance(message, (BaseMessageComponent, tuple, list)): if isinstance(message, BaseMessageComponent): message = [message] return MessageChain.parse_obj([await self._handle_message_component(m, message_type=message_type) for m in message]) else: raise ValueError('Invalid message')
[docs] @retry_once async def get_config(self) -> dict: """ Get the config of http api :return: config """ params = { 'sessionKey': self.session_key } result = await self.session.get('/config', params=params) return result
[docs] @retry_once async def set_config(self, cache_size: int = 4096, enable_websocket: bool = True) -> None: """ Set the config of http api Success if no exception is raised :param cache_size: int, the size of message cache :param enable_websocket: bool, whether to enable websocket (will disable fetch_message accordingly) """ data = { 'sessionKey': self.session_key, 'cacheSize': cache_size, 'enableWebsocket': enable_websocket } await self.session.post('/config', data=data)
def _parse_event(self, result) -> BaseEvent: """ Internal use only Parse event or message from json to BaseEvent :param result: the json :return: BaseEvent """ try: result = parse_obj_as(Events, result) if isinstance(result, Message): # construct message chain # parse quote first if len(result.messageChain) > 2: first_component = result.messageChain[1] if isinstance(first_component, Quote): # FIXME: add the first two message part back try: if isinstance(result.messageChain[2], At): del result.messageChain[2] # delete duplicated at if len(result.messageChain) > 2: if isinstance(result.messageChain[2], Plain) and result.messageChain[2].text == ' ': del result.messageChain[2] # delete space after duplicated at except: self.logger.exception('Please open a github issue to report this error') except: self.logger.exception('Unhandled exception') return result def _websocket_handler(self, handler: callable) -> callable: """ Internal use only Wrap the handler, and convert json to BaseEvent :param handler: callable, the handler :return: wrapped handler """ async def _handler(result: Union[List, Dict]): """ an example handler for create_websocket :param result: json """ result = self._parse_event(result) await handler(result) return _handler
[docs] @retry_once async def create_websocket(self, handler, ws_close_handler=None, listen: str = 'all') -> None: """ Register callback for websocket. Once an BaseEvent or Message is received, the handler will be invoked :param handler: callable :param ws_close_handler: callable, websocket shutdown hook :param listen: 'all', 'event' or 'message' """ if listen not in ('all', 'event', 'message'): raise ValueError("listen must be one of 'all', 'event' or 'message'") if ws_close_handler is None: async def ws_close_handler(event): pass await self.session.websocket(f'/{listen}?sessionKey={self.session_key}', self._websocket_handler(handler), ws_close_handler)