LegendWechatBot 项目进程 Week1(2025-03-01 ~ 2025-03-09)

项目灵感

  • 本项目深受官方Q群机器人以及微信机器人的启发, 但不满足于现有的功能, 因此决定自己动手写一个微信机器人, 以实现更多功能

项目总目标

实现一个微信机器人, 能够实现以下功能:

  • 接收并处理
    展开查看
    • 文本消息
    • 系统消息
      • 群聊新成员加入消息
      • 好友验证消息
      • 红包消息
      • 转账消息
    •       <li>图片消息
            <li>语音消息
            <li>视频消息
            <li>文件消息
            <li>位置消息
            <li>群聊at消息
            <li>私聊消息
            <li>违禁消息
        </ul>
  • 发送
    展开查看
    • 文本消息
    • 图片消息
    • 语音消息
    • 视频消息
    • 文件消息
    • 链接消息
    • 群聊at消息
    • 私聊消息
  • 系统
    展开查看
    • 群管理系统
    • 积分系统
    • 违禁词管理系统
    • 定时任务系统
    • 插件系统
    • 日志系统
    • 数据统计系统
    • 速率限制系统
    • 热更新系统
    • GUI系统

项目基础

编程语言: Python 3.11.9

微信调用框架: WCFerry

数据库: SQLite

日志集成: loguru

定时任务框架: APScheduler

项目开发日志 (Week 1)

项目初始化

刚刚有灵感不久, 我便确定了项目的基础框架, 唯独微信框架拿捏不定, 眼前有数个微信协议可供使用:

网页版协议 (以 itchat 为代表)

iPad协议 (以 Wechaty 为代表)

hook (以 ntchatwcferry 为代表)

综合分析之后, 我发现itchat以及其所属的网页版微信协议已经不再维护, 较晚注册的微信账号也无法登录网页版; Wechaty功能很全, 但需要购买Token, 仅有基础功能是开源的; ntchat也早在3年前也已经停止维护, 这些不符合项目需求, 因此我起初选择了近期仍在更新的wcferry作为我的微信协议, 并参照其demo进行了从0的开发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
项目结构 (以后的日志中仅贴出做修改的结构部分)
LegendWechatBot
├─ config # 机器人主设置
│ ├─ config.py
│ └─ config.yaml
├─ README.md
├─ main.py # 主启动程序
├─ requirements.txt
├─ robot.py # 机器人核心程序
├─ try.py # (非必要) 用来离线测试
└─ utils
├─ asyncEnsure.py # 消息发送频率限制
├─ changeHandler.py # 热更新
├─ decorators.py # 插件处理消息类型装饰器
├─ dfa.py # 敏感词模块
├─ eventManager.py # 事件管理器
├─ LegendBot.py # 机器人核心类
├─ LegendMsg.py # 消息归一化
├─ logger.py # 日志模块
├─ plugin.py # 插件模块
└─ singleton.py # 单例归一化

难点&技术突破

1. 热更新

由于我需要频繁地修改插件, 因此我需要实现热更新, 以便在修改插件后无需重启机器人即可生效

文件改变检测使用了 watchdog 库, 在检测到文件改变后, 会调用重启回调函数, 重启程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#! utils/changeHandler.py
from watchdog.events import FileSystemEventHandler, FileSystemEvent

# 封装检测文件改变并判断是否重启的类
class ConfigChangeHandler(FileSystemEventHandler):
def __init__(self, restartCallback = restartProgram):
'''初始化配置文件变化处理器
:param restartCallback: 重启回调函数, 默认为None, 即不执行任何操作
'''
self.restartCallback = restartCallback
self.lastTriggered = 0
self.cooldown = 2 # 冷却时间(秒)
# 当发生错误时才会开始等待
self.waiting = False # 是否在等待文件改变

def onModified(self, event: FileSystemEvent):
if not event.is_directory:
# 判断是否在冷却时间内(防止抽风)
currentTime = time.time()
if currentTime - self.last_triggered < self.cooldown:
return

filePath = Path(event.src_path).resolve()
if (filePath.name == "config.yaml" or
"plugins" in str(filePath) and filePath.suffix in ['.py', '.yaml']):
logger.info(f"检测到文件变化: {filePath}")
self.last_triggered = currentTime
if self.waiting:
logger.info("检测到文件改变,正在重启...")
self.waiting = False
self.restartCallback()

重启程序方法

1
2
3
4
5
6
7
8
9
10
11
12
13
def restartProgram(observer=None):
"""重启程序, 别处也可调用"""
logger.info("正在重启程序...")
# 清理资源
if observer: # observer是watchdog中的对象
observer.stop()
try:
import multiprocessing.resource_tracker
multiprocessing.resource_tracker._resource_tracker.clear()
except Exception as e:
logger.warning(f"清理资源时出错: {e}")
# 重启程序
os.execv(sys.executable, [sys.executable] + sys.argv)

2. 插件管理

由于机器人项目中几乎所有功能都是插件化实现的, 因此我需要先实现插件管理功能

插件管理分为如下几个功能模块:

  • 插件基类 & 插件启用/禁用方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#! utils/plugin.py
class PluginBase(ABC):
"""插件基类"""

# 插件元数据
description: str = "暂无描述"
author: str = "未知"
version: str = "1.0.0"

def __init__(self):
self.enabled = False
self._scheduled_jobs = set()

async def on_enable(self, bot=None):
"""插件启用时调用"""

# 定时任务
for method_name in dir(self):
method = getattr(self, method_name)
if hasattr(method, '_is_scheduled'):
job_id = getattr(method, '_job_id')
trigger = getattr(method, '_schedule_trigger')
trigger_args = getattr(method, '_schedule_args')

add_job_safe(scheduler, job_id, method, bot, trigger, **trigger_args)
self._scheduled_jobs.add(job_id)
if self._scheduled_jobs:
logger.success(f"插件 {self.__class__.__name__} 已加载定时任务: {self._scheduled_jobs}")

async def on_disable(self):
"""插件禁用时调用"""

# 移除定时任务
for job_id in self._scheduled_jobs:
remove_job_safe(scheduler, job_id)
logger.info("已卸载定时任务: {}", self._scheduled_jobs)
self._scheduled_jobs.clear()

async def async_init(self):
"""插件异步初始化"""
return
  • 插件(批量)加载/卸载/重载
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
class PluginManager:
def __init__(self):
self.plugins: Dict[str, PluginBase] = {}
self.plugin_classes: Dict[str, Type[PluginBase]] = {}
self.plugin_info: Dict[str, dict] = {} # 新增:存储所有插件信息

self.excluded_plugins = config.RobotConfig["disabledPlugins"]

async def load_plugin(self, bot: WechatAPIClient, plugin_class: Type[PluginBase], is_disabled: bool = False) -> bool:
"""加载单个插件, 接受Type[PluginBase]"""
try:
plugin_name = plugin_class.__name__

# 防止重复加载插件
if plugin_name in self.plugins:
return False

# 记录插件信息,即使插件被禁用也会记录
self.plugin_info[plugin_name] = {
"name": plugin_name,
"description": plugin_class.description,
"author": plugin_class.author,
"version": plugin_class.version,
"enabled": False,
"class": plugin_class
}

# 如果插件被禁用则不加载
if is_disabled:
return False

plugin = plugin_class()
EventManager.bind_instance(plugin)
await plugin.on_enable(bot)
await plugin.async_init()
self.plugins[plugin_name] = plugin
self.plugin_classes[plugin_name] = plugin_class
self.plugin_info[plugin_name]["enabled"] = True
return True
except:
logger.error(f"加载插件时发生错误: {traceback.format_exc()}")
return False

async def unload_plugin(self, plugin_name: str) -> bool:
"""卸载单个插件"""
if plugin_name not in self.plugins:
return False

# 防止卸载 ManagePlugin
if plugin_name == "ManagePlugin":
logger.warning("ManagePlugin 不能被卸载")
return False

try:
plugin = self.plugins[plugin_name]
await plugin.on_disable()
EventManager.unbind_instance(plugin)
del self.plugins[plugin_name]
del self.plugin_classes[plugin_name]
if plugin_name in self.plugin_info.keys():
self.plugin_info[plugin_name]["enabled"] = False
return True
except:
logger.error(f"卸载插件 {plugin_name} 时发生错误: {traceback.format_exc()}")
return False

async def unload_all_plugins(self) -> tuple[List[str], List[str]]:
"""卸载所有插件"""
unloaded_plugins = []
failed_unloads = []
for plugin_name in list(self.plugins.keys()):
if await self.unload_plugin(plugin_name):
unloaded_plugins.append(plugin_name)
else:
failed_unloads.append(plugin_name)
return unloaded_plugins, failed_unloads

async def reload_plugin(self, bot: WechatAPIClient, plugin_name: str) -> bool:
"""重载单个插件"""
if plugin_name not in self.plugin_classes:
return False

# 防止重载 ManagePlugin
if plugin_name == "ManagePlugin":
logger.warning("ManagePlugin 不能被重载")
return False

try:
# 获取插件类所在的模块
plugin_class = self.plugin_classes[plugin_name]
module_name = plugin_class.__module__

# 先卸载插件
if not await self.unload_plugin(plugin_name):
return False

# 重新导入模块
module = importlib.import_module(module_name)
importlib.reload(module)

# 从重新加载的模块中获取插件类
for name, obj in inspect.getmembers(module):
if (inspect.isclass(obj) and
issubclass(obj, PluginBase) and
obj != PluginBase and
obj.__name__ == plugin_name):
# 使用新的插件类而不是旧的
return await self.load_plugin(bot, obj)

return False
except Exception as e:
logger.error(f"重载插件 {plugin_name} 时发生错误: {e}")
return False

async def reload_all_plugins(self, bot: WechatAPIClient) -> List[str]:
"""重载所有插件

Returns:
List[str]: 成功重载的插件名称列表
"""
try:
# 记录当前加载的插件名称,排除 ManagePlugin
original_plugins = [name for name in self.plugins.keys() if name != "ManagePlugin"]

# 卸载除 ManagePlugin 外的所有插件
for plugin_name in original_plugins:
await self.unload_plugin(plugin_name)

# 重新加载所有模块
for module_name in list(sys.modules.keys()):
if module_name.startswith('plugins.') and not module_name.endswith('ManagePlugin'):
del sys.modules[module_name]

# 从目录重新加载插件
return await self.load_plugins_from_directory(bot)

except:
logger.error(f"重载所有插件时发生错误: {traceback.format_exc()}")
return []
  • 从文件夹中加载插件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
async def load_plugins_from_directory(self, bot: WechatAPIClient, load_disabled_plugin: bool = True) -> Union[List[str], bool]:
"""从plugins目录批量加载插件"""
loaded_plugins = []

for dirname in os.listdir("plugins"):
if os.path.isdir(f"plugins/{dirname}") and os.path.exists(f"plugins/{dirname}/main.py"):
try:
module = importlib.import_module(f"plugins.{dirname}.main")
for name, obj in inspect.getmembers(module):
if inspect.isclass(obj) and issubclass(obj, PluginBase) and obj != PluginBase:
is_disabled = False
if not load_disabled_plugin:
is_disabled = obj.__name__ in self.excluded_plugins

if await self.load_plugin(bot, obj, is_disabled=is_disabled):
loaded_plugins.append(obj.__name__)

except:
logger.error(f"加载 {dirname} 时发生错误: {traceback.format_exc()}")
return False

return loaded_plugins

async def load_plugin_from_directory(self, bot: WechatAPIClient, plugin_name: str) -> bool:
"""从plugins目录加载单个插件

Args:
bot: 机器人实例
plugin_name: 插件类名称(不是文件名)

Returns:
bool: 是否成功加载插件
"""
found = False
for dirname in os.listdir("plugins"):
try:
if os.path.isdir(f"plugins/{dirname}") and os.path.exists(f"plugins/{dirname}/main.py"):
module = importlib.import_module(f"plugins.{dirname}.main")
importlib.reload(module)

for name, obj in inspect.getmembers(module):
if (inspect.isclass(obj) and
issubclass(obj, PluginBase) and
obj != PluginBase and
obj.__name__ == plugin_name):
found = True
return await self.load_plugin(bot, obj)
except:
logger.error(f"检查 {dirname} 时发生错误: {traceback.format_exc()}")
continue

if not found:
logger.warning(f"未找到插件类 {plugin_name}")
return False
  • 插件信息获取
1
2
3
4
5
6
7
8
9
10
11
12
def get_plugin_info(self, plugin_name: str = None) -> Union[dict, List[dict]]:
"""获取插件信息

Args:
plugin_name: 插件名称, 如果为None则返回所有插件信息

Returns:
如果指定插件名,返回单个插件信息字典;否则返回所有插件信息列表
"""
if plugin_name:
return self.plugin_info.get(plugin_name)
return list(self.plugin_info.values())
  • (todo) 插件依赖获取
  • (todo) 插件设置GUI

3. 定时任务

在微信机器人中, 有很多任务都需要定时执行, 比如每天定时发送报纸, 清空用户签到状态等 因此需要封装一个类让插件方便地规定定时任务, 在此项目中, 我选择了APScheduler 和装饰器来实现 > APScheduler有一个特色表达式: Cron, 具体语法和用法请参阅 cron表达式

  • 定时任务增删

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    def add_job_safe(scheduler: AsyncIOScheduler, job_id: str, func: Callable, bot, trigger: Union[str, CronTrigger, IntervalTrigger], **trigger_args):
    """添加函数到定时任务中,如果存在则先删除现有的任务"""
    try:
    scheduler.remove_job(job_id)
    except:
    pass
    scheduler.add_job(func, trigger, args=[bot], id=job_id, **trigger_args)

    def remove_job_safe(scheduler: AsyncIOScheduler, job_id: str):
    """从定时任务中移除任务"""
    try:
    scheduler.remove_job(job_id)
    except:
    pass

  • 定时任务装饰器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    import AsyncIOScheduler
    from apscheduler.triggers.cron import CronTrigger
    from apscheduler.triggers.interval import IntervalTrigger

    scheduler = AsyncIOScheduler()


    def schedule(trigger: Union[str, CronTrigger, IntervalTrigger], **trigger_args) -> Callable:
    """
    定时任务装饰器

    例子:

    - @schedule('interval', seconds=30)
    - @schedule('cron', hour=8, minute=30, second=30)
    - @schedule('date', run_date='2024-01-01 00:00:00')
    """

    def decorator(func: Callable):
    job_id = f"{func.__module__}.{func.__qualname__}"

    @wraps(func)
    async def wrapper(self, *args, **kwargs):
    return await func(self, *args, **kwargs)

    setattr(wrapper, '_is_scheduled', True)
    setattr(wrapper, '_schedule_trigger', trigger)
    setattr(wrapper, '_schedule_args', trigger_args)
    setattr(wrapper, '_job_id', job_id)

    return wrapper

    return decorator

4. 事件映射

在插件化程序中, 事件处理是核心部分, 需要定义一个类来管理事件, 并将事件映射到插件中

  • 消息处理装饰器 >装饰器可以用来给函数添加额外的功能, 在LegendWeChatBot中, 使用装饰器来规范并简化插件处理消息的逻辑

在装饰器中添加了优先级和时间类型, 用来对不同类型的事件进行排序和筛选

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#! utils/decorators.py
def on_message(priority=50):
"""消息装饰器, 具体消息参见具体文件"""

def decorator(func):
if callable(priority):
f = priority
setattr(f, '_event_type', 'message')
setattr(f, '_priority', 50)
return f
setattr(func, '_event_type', 'other_message')
setattr(func, '_priority', min(max(priority, 0), 99))
return func

return decorator if not callable(priority) else decorator(priority)


'''demo'''
@on_message
async def demo(bot, message):
...

  • 事件映射管理 >缺对象吗? 来Python这儿找一个
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#! utils/eventManager.py
import copy
from typing import Callable, Dict, List

class EventManager:
_handlers: Dict[str, List[tuple[Callable, object, int]]] = {}

@classmethod
def bind_instance(cls, instance: object):
"""将实例绑定到对应的事件处理函数"""
for method_name in dir(instance):
method = getattr(instance, method_name)
if hasattr(method, '_event_type'):
event_type = getattr(method, '_event_type')
priority = getattr(method, '_priority', 50)

if event_type not in cls._handlers:
cls._handlers[event_type] = []
cls._handlers[event_type].append((method, instance, priority))
# 按优先级排序,优先级高的在前
cls._handlers[event_type].sort(key=lambda x: x[2], reverse=True)

@classmethod
async def emit(cls, event_type: str, *args, **kwargs) -> None:
"""触发事件"""
if event_type not in cls._handlers:
return

api_client, message = args
for handler, instance, priority in cls._handlers[event_type]:
# 只对 message 进行深拷贝,api_client 保持不变
handler_args = (api_client, copy.deepcopy(message))
new_kwargs = {k: copy.deepcopy(v) for k, v in kwargs.items()}

result = await handler(*handler_args, **new_kwargs)

if isinstance(result, bool):
# True 继续执行 False 停止执行
if not result:
break
else:
continue # 我也不知道你返回了个啥玩意,反正继续执行就是了

@classmethod
def unbind_instance(cls, instance: object):
"""解绑实例的所有事件处理函数"""
for event_type in cls._handlers:
cls._handlers[event_type] = [
(handler, inst, priority)
for handler, inst, priority in cls._handlers[event_type]
if inst is not instance
]
  • 定时任务管理 >定时任务管理器,用于管理插件中需要实现的定时任务,包括添加、删除、执行任务等操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#! utils/decorators.py
def schedule(trigger: Union[str, CronTrigger, IntervalTrigger], **trigger_args
) -> Callable:
"""
定时任务装饰器

例子:

- @schedule('interval', seconds=30)
- @schedule('cron', hour=8, minute=30, second=30)
- @schedule('date', run_date='2024-01-01 00:00:00')
"""

def decorator(func: Callable):
job_id = f"{func.__module__}.{func.__qualname__}"

@wraps(func)
async def wrapper(self, *args, **kwargs):
return await func(self, *args, **kwargs)

setattr(wrapper, '_is_scheduled', True)
setattr(wrapper, '_schedule_trigger', trigger)
setattr(wrapper, '_schedule_args', trigger_args)
setattr(wrapper, '_job_id', job_id)

return wrapper

return decorator


def add_job_safe(scheduler: AsyncIOScheduler, job_id: str, func: Callable, bot, trigger: Union[str, CronTrigger, intervalTrigger], **trigger_args):
"""添加函数到定时任务中,如果存在则先删除现有的任务"""
try:
scheduler.remove_job(job_id)
except:
pass
scheduler.add_job(func, trigger, args=[bot], id=job_id, **trigger_args)


def remove_job_safe(scheduler: AsyncIOScheduler, job_id: str):
"""从定时任务中移除任务"""
try:
scheduler.remove_job(job_id)
except:
pass

todo list

  • 整理微信数据库操作模块
  • 初步添加功能
  • 初步制定命令格式
  • 完善开发和使用文档

项目已开源至 Github ,欢迎star和fork 若你觉得对你的开发有帮助, 或是对你的生活提供了方便, 欢迎来 爱发电 赞助 爱发电 如果想一起开发或贡献插件等, 欢迎在相关标准制定后按照标准提交PR, 或 联系作者