项目灵感
本项目深受官方Q群机器人以及微信机器人的启发, 但不满足于现有的功能,
因此决定自己动手写一个微信机器人, 以实现更多功能
项目总目标
实现一个微信机器人,
能够实现以下功能:
接收并处理
展开查看
文本消息
系统消息
群聊新成员加入消息
好友验证消息
红包消息
转账消息
<li>图片消息
<li>语音消息
<li>视频消息
<li>文件消息
<li>位置消息
<li>群聊at消息
<li>私聊消息
<li>违禁消息
</ul>
发送
展开查看
文本消息
图片消息
语音消息
视频消息
文件消息
链接消息
群聊at消息
私聊消息
系统
展开查看
群管理系统
积分系统
违禁词管理系统
定时任务系统
插件系统
日志系统
数据统计系统
速率限制系统
热更新系统
GUI系统
项目基础
编程语言: Python 3.11.9
项目开发日志 (Week 1)
项目初始化
刚刚有灵感不久, 我便确定了项目的基础框架, 唯独微信框架拿捏不定,
眼前有数个微信协议可供使用:
网页版协议 (以 itchat 为代表)
iPad协议 (以 Wechaty
为代表)
hook (以 ntchat 和
wcferry
为代表)
综合分析之后,
我发现itchat 以及其所属的网页版微信协议已经不再维护,
较晚注册的微信账号也无法登录网页版; Wechaty 功能很全,
但需要购买Token, 仅有基础功能是开源的;
ntchat 也早在3年前也已经停止维护, 这些不符合项目需求,
因此我起初选择了近期仍在更新的wcferry 作为我的微信协议,
并参照其demo进行了从0的开发
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 项目结构 (以后的日志中仅贴出做修改的结构部分) LegendWechatBot ├─ config # 机器人主设置 │ ├─ config.py │ └─ config.yaml ├─ README.md ├─ main.py # 主启动程序 ├─ requirements.txt ├─ robot.py # 机器人核心程序 ├─ try.py # (非必要) 用来离线测试 └─ utils ├─ asyncEnsure.py # 消息发送频率限制 ├─ changeHandler.py # 热更新 ├─ decorators.py # 插件处理消息类型装饰器 ├─ dfa.py # 敏感词模块 ├─ eventManager.py # 事件管理器 ├─ LegendBot.py # 机器人核心类 ├─ LegendMsg.py # 消息归一化 ├─ logger.py # 日志模块 ├─ plugin.py # 插件模块 └─ singleton.py # 单例归一化
难点&技术突破
1. 热更新
由于我需要频繁地修改插件, 因此我需要实现热更新,
以便在修改插件后无需重启机器人即可生效
文件改变检测使用了 watchdog 库,
在检测到文件改变后, 会调用重启回调函数, 重启程序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 from watchdog.events import FileSystemEventHandler, FileSystemEventclass ConfigChangeHandler (FileSystemEventHandler ): def __init__ (self, restartCallback = restartProgram ): '''初始化配置文件变化处理器 :param restartCallback: 重启回调函数, 默认为None, 即不执行任何操作 ''' self .restartCallback = restartCallback self .lastTriggered = 0 self .cooldown = 2 self .waiting = False def onModified (self, event: FileSystemEvent ): if not event.is_directory: currentTime = time.time() if currentTime - self .last_triggered < self .cooldown: return filePath = Path(event.src_path).resolve() if (filePath.name == "config.yaml" or "plugins" in str (filePath) and filePath.suffix in ['.py' , '.yaml' ]): logger.info(f"检测到文件变化: {filePath} " ) self .last_triggered = currentTime if self .waiting: logger.info("检测到文件改变,正在重启..." ) self .waiting = False self .restartCallback()
重启程序方法
1 2 3 4 5 6 7 8 9 10 11 12 13 def restartProgram (observer=None ): """重启程序, 别处也可调用""" logger.info("正在重启程序..." ) if observer: observer.stop() try : import multiprocessing.resource_tracker multiprocessing.resource_tracker._resource_tracker.clear() except Exception as e: logger.warning(f"清理资源时出错: {e} " ) os.execv(sys.executable, [sys.executable] + sys.argv)
2. 插件管理
由于机器人项目中几乎所有功能都是插件化实现的,
因此我需要先实现插件管理功能
插件管理分为如下几个功能模块:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 class PluginBase (ABC ): """插件基类""" description: str = "暂无描述" author: str = "未知" version: str = "1.0.0" def __init__ (self ): self .enabled = False self ._scheduled_jobs = set () async def on_enable (self, bot=None ): """插件启用时调用""" for method_name in dir (self ): method = getattr (self , method_name) if hasattr (method, '_is_scheduled' ): job_id = getattr (method, '_job_id' ) trigger = getattr (method, '_schedule_trigger' ) trigger_args = getattr (method, '_schedule_args' ) add_job_safe(scheduler, job_id, method, bot, trigger, **trigger_args) self ._scheduled_jobs.add(job_id) if self ._scheduled_jobs: logger.success(f"插件 {self.__class__.__name__} 已加载定时任务: {self._scheduled_jobs} " ) async def on_disable (self ): """插件禁用时调用""" for job_id in self ._scheduled_jobs: remove_job_safe(scheduler, job_id) logger.info("已卸载定时任务: {}" , self ._scheduled_jobs) self ._scheduled_jobs.clear() async def async_init (self ): """插件异步初始化""" return
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 class PluginManager : def __init__ (self ): self .plugins: Dict [str , PluginBase] = {} self .plugin_classes: Dict [str , Type [PluginBase]] = {} self .plugin_info: Dict [str , dict ] = {} self .excluded_plugins = config.RobotConfig["disabledPlugins" ] async def load_plugin (self, bot: WechatAPIClient, plugin_class: Type [PluginBase], is_disabled: bool = False ) -> bool : """加载单个插件, 接受Type[PluginBase]""" try : plugin_name = plugin_class.__name__ if plugin_name in self .plugins: return False self .plugin_info[plugin_name] = { "name" : plugin_name, "description" : plugin_class.description, "author" : plugin_class.author, "version" : plugin_class.version, "enabled" : False , "class" : plugin_class } if is_disabled: return False plugin = plugin_class() EventManager.bind_instance(plugin) await plugin.on_enable(bot) await plugin.async_init() self .plugins[plugin_name] = plugin self .plugin_classes[plugin_name] = plugin_class self .plugin_info[plugin_name]["enabled" ] = True return True except : logger.error(f"加载插件时发生错误: {traceback.format_exc()} " ) return False async def unload_plugin (self, plugin_name: str ) -> bool : """卸载单个插件""" if plugin_name not in self .plugins: return False if plugin_name == "ManagePlugin" : logger.warning("ManagePlugin 不能被卸载" ) return False try : plugin = self .plugins[plugin_name] await plugin.on_disable() EventManager.unbind_instance(plugin) del self .plugins[plugin_name] del self .plugin_classes[plugin_name] if plugin_name in self .plugin_info.keys(): self .plugin_info[plugin_name]["enabled" ] = False return True except : logger.error(f"卸载插件 {plugin_name} 时发生错误: {traceback.format_exc()} " ) return False async def unload_all_plugins (self ) -> tuple [List [str ], List [str ]]: """卸载所有插件""" unloaded_plugins = [] failed_unloads = [] for plugin_name in list (self .plugins.keys()): if await self .unload_plugin(plugin_name): unloaded_plugins.append(plugin_name) else : failed_unloads.append(plugin_name) return unloaded_plugins, failed_unloads async def reload_plugin (self, bot: WechatAPIClient, plugin_name: str ) -> bool : """重载单个插件""" if plugin_name not in self .plugin_classes: return False if plugin_name == "ManagePlugin" : logger.warning("ManagePlugin 不能被重载" ) return False try : plugin_class = self .plugin_classes[plugin_name] module_name = plugin_class.__module__ if not await self .unload_plugin(plugin_name): return False module = importlib.import_module(module_name) importlib.reload(module) for name, obj in inspect.getmembers(module): if (inspect.isclass(obj) and issubclass (obj, PluginBase) and obj != PluginBase and obj.__name__ == plugin_name): return await self .load_plugin(bot, obj) return False except Exception as e: logger.error(f"重载插件 {plugin_name} 时发生错误: {e} " ) return False async def reload_all_plugins (self, bot: WechatAPIClient ) -> List [str ]: """重载所有插件 Returns: List[str]: 成功重载的插件名称列表 """ try : original_plugins = [name for name in self .plugins.keys() if name != "ManagePlugin" ] for plugin_name in original_plugins: await self .unload_plugin(plugin_name) for module_name in list (sys.modules.keys()): if module_name.startswith('plugins.' ) and not module_name.endswith('ManagePlugin' ): del sys.modules[module_name] return await self .load_plugins_from_directory(bot) except : logger.error(f"重载所有插件时发生错误: {traceback.format_exc()} " ) return []
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 async def load_plugins_from_directory (self, bot: WechatAPIClient, load_disabled_plugin: bool = True ) -> Union [List [str ], bool ]: """从plugins目录批量加载插件""" loaded_plugins = [] for dirname in os.listdir("plugins" ): if os.path.isdir(f"plugins/{dirname} " ) and os.path.exists(f"plugins/{dirname} /main.py" ): try : module = importlib.import_module(f"plugins.{dirname} .main" ) for name, obj in inspect.getmembers(module): if inspect.isclass(obj) and issubclass (obj, PluginBase) and obj != PluginBase: is_disabled = False if not load_disabled_plugin: is_disabled = obj.__name__ in self .excluded_plugins if await self .load_plugin(bot, obj, is_disabled=is_disabled): loaded_plugins.append(obj.__name__) except : logger.error(f"加载 {dirname} 时发生错误: {traceback.format_exc()} " ) return False return loaded_plugins async def load_plugin_from_directory (self, bot: WechatAPIClient, plugin_name: str ) -> bool : """从plugins目录加载单个插件 Args: bot: 机器人实例 plugin_name: 插件类名称(不是文件名) Returns: bool: 是否成功加载插件 """ found = False for dirname in os.listdir("plugins" ): try : if os.path.isdir(f"plugins/{dirname} " ) and os.path.exists(f"plugins/{dirname} /main.py" ): module = importlib.import_module(f"plugins.{dirname} .main" ) importlib.reload(module) for name, obj in inspect.getmembers(module): if (inspect.isclass(obj) and issubclass (obj, PluginBase) and obj != PluginBase and obj.__name__ == plugin_name): found = True return await self .load_plugin(bot, obj) except : logger.error(f"检查 {dirname} 时发生错误: {traceback.format_exc()} " ) continue if not found: logger.warning(f"未找到插件类 {plugin_name} " ) return False
1 2 3 4 5 6 7 8 9 10 11 12 def get_plugin_info (self, plugin_name: str = None ) -> Union [dict , List [dict ]]: """获取插件信息 Args: plugin_name: 插件名称, 如果为None则返回所有插件信息 Returns: 如果指定插件名,返回单个插件信息字典;否则返回所有插件信息列表 """ if plugin_name: return self .plugin_info.get(plugin_name) return list (self .plugin_info.values())
(todo) 插件依赖获取
(todo) 插件设置GUI
3. 定时任务
在微信机器人中, 有很多任务都需要定时执行, 比如每天定时发送报纸,
清空用户签到状态等 因此需要封装一个类让插件方便地规定定时任务,
在此项目中, 我选择了APScheduler 和装饰器来实现 >
APScheduler有一个特色表达式: Cron, 具体语法和用法请参阅 cron表达式
定时任务增删 1 2 3 4 5 6 7 8 9 10 11 12 13 14 def add_job_safe (scheduler: AsyncIOScheduler, job_id: str , func: Callable , bot, trigger: Union [str , CronTrigger, IntervalTrigger], **trigger_args ): """添加函数到定时任务中,如果存在则先删除现有的任务""" try : scheduler.remove_job(job_id) except : pass scheduler.add_job(func, trigger, args=[bot], id =job_id, **trigger_args) def remove_job_safe (scheduler: AsyncIOScheduler, job_id: str ): """从定时任务中移除任务""" try : scheduler.remove_job(job_id) except : pass
定时任务装饰器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import AsyncIOSchedulerfrom apscheduler.triggers.cron import CronTriggerfrom apscheduler.triggers.interval import IntervalTriggerscheduler = AsyncIOScheduler() def schedule (trigger: Union [str , CronTrigger, IntervalTrigger], **trigger_args ) -> Callable : """ 定时任务装饰器 例子: - @schedule('interval', seconds=30) - @schedule('cron', hour=8, minute=30, second=30) - @schedule('date', run_date='2024-01-01 00:00:00') """ def decorator (func: Callable ): job_id = f"{func.__module__} .{func.__qualname__} " @wraps(func ) async def wrapper (self, *args, **kwargs ): return await func(self , *args, **kwargs) setattr (wrapper, '_is_scheduled' , True ) setattr (wrapper, '_schedule_trigger' , trigger) setattr (wrapper, '_schedule_args' , trigger_args) setattr (wrapper, '_job_id' , job_id) return wrapper return decorator
4. 事件映射
在插件化程序中, 事件处理是核心部分, 需要定义一个类来管理事件,
并将事件映射到插件中
消息处理装饰器 >装饰器可以用来给函数添加额外的功能,
在LegendWeChatBot中, 使用装饰器来规范并简化插件处理消息的逻辑
在装饰器中添加了优先级和时间类型, 用来对不同类型的事件进行排序和筛选
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 def on_message (priority=50 ): """消息装饰器, 具体消息参见具体文件""" def decorator (func ): if callable (priority): f = priority setattr (f, '_event_type' , 'message' ) setattr (f, '_priority' , 50 ) return f setattr (func, '_event_type' , 'other_message' ) setattr (func, '_priority' , min (max (priority, 0 ), 99 )) return func return decorator if not callable (priority) else decorator(priority) '''demo''' @on_message async def demo (bot, message ): ...
事件映射管理 >缺对象吗? 来Python这儿找一个
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 import copyfrom typing import Callable , Dict , List class EventManager : _handlers: Dict [str , List [tuple [Callable , object , int ]]] = {} @classmethod def bind_instance (cls, instance: object ): """将实例绑定到对应的事件处理函数""" for method_name in dir (instance): method = getattr (instance, method_name) if hasattr (method, '_event_type' ): event_type = getattr (method, '_event_type' ) priority = getattr (method, '_priority' , 50 ) if event_type not in cls._handlers: cls._handlers[event_type] = [] cls._handlers[event_type].append((method, instance, priority)) cls._handlers[event_type].sort(key=lambda x: x[2 ], reverse=True ) @classmethod async def emit (cls, event_type: str , *args, **kwargs ) -> None : """触发事件""" if event_type not in cls._handlers: return api_client, message = args for handler, instance, priority in cls._handlers[event_type]: handler_args = (api_client, copy.deepcopy(message)) new_kwargs = {k: copy.deepcopy(v) for k, v in kwargs.items()} result = await handler(*handler_args, **new_kwargs) if isinstance (result, bool ): if not result: break else : continue @classmethod def unbind_instance (cls, instance: object ): """解绑实例的所有事件处理函数""" for event_type in cls._handlers: cls._handlers[event_type] = [ (handler, inst, priority) for handler, inst, priority in cls._handlers[event_type] if inst is not instance ]
定时任务管理
>定时任务管理器,用于管理插件中需要实现的定时任务,包括添加、删除、执行任务等操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 def schedule (trigger: Union [str , CronTrigger, IntervalTrigger], **trigger_args ) -> Callable : """ 定时任务装饰器 例子: - @schedule('interval', seconds=30) - @schedule('cron', hour=8, minute=30, second=30) - @schedule('date', run_date='2024-01-01 00:00:00') """ def decorator (func: Callable ): job_id = f"{func.__module__} .{func.__qualname__} " @wraps(func ) async def wrapper (self, *args, **kwargs ): return await func(self , *args, **kwargs) setattr (wrapper, '_is_scheduled' , True ) setattr (wrapper, '_schedule_trigger' , trigger) setattr (wrapper, '_schedule_args' , trigger_args) setattr (wrapper, '_job_id' , job_id) return wrapper return decorator def add_job_safe (scheduler: AsyncIOScheduler, job_id: str , func: Callable , bot, trigger: Union [str , CronTrigger, intervalTrigger], **trigger_args ): """添加函数到定时任务中,如果存在则先删除现有的任务""" try : scheduler.remove_job(job_id) except : pass scheduler.add_job(func, trigger, args=[bot], id =job_id, **trigger_args) def remove_job_safe (scheduler: AsyncIOScheduler, job_id: str ): """从定时任务中移除任务""" try : scheduler.remove_job(job_id) except : pass
todo list
整理微信数据库操作模块
初步添加功能
初步制定命令格式
完善开发和使用文档
项目已开源至 Github
,欢迎star和fork 若你觉得对你的开发有帮助, 或是对你的生活提供了方便,
欢迎来 爱发电 赞助
如果想一起开发或贡献插件等, 欢迎在相关标准制定后按照标准提交PR, 或 联系作者