系统功能更新

消息处理协程锁

由于在先前的版本中, 为了避免风控检测, 在处理消息前加了协程锁LegendSemaphore, 导致消息处理在有的时候会抽风堵塞, 因此在本次更新中, 移除了每条消息前的锁, 改为了在发送文字消息前的随机延迟等待

1
time.sleep(random.randint(1, 3) / random.randint(2, 10))

那协程锁白写了吗?当然不是, 改成AI消息处理限制, 防止api爆炸

消息处理逻辑优化

由于积分系统和黑白名单系统的加入, 因此在消息处理前增添了判断逻辑以及自动踢人

1
2
3
4
5
6
7
8
9
10
11
12
# utils/LegendBot.py
if (
(msg.from_group() and self.DB.get_chatroom_whitelist(to) and self.DB.get_black(msg.sender) <= config.RobotConfig['black']) # 群聊且满足条件
or (not msg.from_group() and self.DB.get_black(msg.sender) <= config.RobotConfig['black']) # 私聊且满足条件
or msg.sender in config.admin # 来自管理员
):
...

# 自动踢人
elif self.DB.get_black(msg.sender) > config.RobotConfig['black'] and msg.from_group():
self.bot.sendMsg('你坏事做尽, 被移除群聊, 欢迎找kanwuqing面议解封, 有偿解封所得分发给群友作精神补偿', to, at)
self.bot.del_chatroom_members(msg.roomid, msg.sender)

数据库积分相关方法更新

添加了积分相关的处理逻辑, 实现了积分的增删改查, 以及更新用户签到状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# database/LegendBotDB.py
# 查看积分
def get_points(self, wxid: str) -> int:
return self._execute_in_queue(self._get_points, wxid)

def _get_points(self, wxid: str) -> int:
session = self.DBSession()
try:
user = session.query(User).filter_by(wxid=wxid).first()
return user.points if user else 0
finally:
session.close()

# 增减积分
def add_points(self, wxid: str, num: int) -> bool:
"""Thread-safe point addition"""
return self._execute_in_queue(self._add_points, wxid, num)

def _add_points(self, wxid: str, num: int) -> bool:
"""Thread-safe point addition"""
session = self.DBSession()
try:
# Use UPDATE with atomic operation
result = session.execute(
update(User)
.where(User.wxid == wxid)
.values(points=User.points + num)
)
if result.rowcount == 0:
# User doesn't exist, create new
user = User(wxid=wxid, points=num)
session.add(user)
logger.info(f"数据库: 用户{wxid}积分增加{num}")
session.commit()
return True
except SQLAlchemyError as e:
session.rollback()
logger.error(f"数据库: 用户{wxid}积分增加失败, 错误: {e}")
return False
finally:
session.close()

# 设置积分
def set_points(self, wxid: str, num: int) -> bool:
"""Thread-safe point setting"""
return self._execute_in_queue(self._set_points, wxid, num)

def _set_points(self, wxid: str, num: int) -> bool:
"""Thread-safe point setting"""
session = self.DBSession()
try:
result = session.execute(
update(User)
.where(User.wxid == wxid)
.values(points=num)
)
if result.rowcount == 0:
user = User(wxid=wxid, points=num)
session.add(user)
logger.info(f"数据库: 用户{wxid}积分设置为{num}")
session.commit()
return True
except SQLAlchemyError as e:
session.rollback()
logger.error(f"数据库: 用户{wxid}积分设置失败, 错误: {e}")
return False
finally:
session.close()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# database/LegendBotDB.py
def get_signin_stat(self, wxid: str) -> datetime.datetime:
"""获取用户签到状态"""
return self._execute_in_queue(self._get_signin_stat, wxid)

def _get_signin_stat(self, wxid: str) -> datetime.datetime:
session = self.DBSession()
try:
user = session.query(User).filter_by(wxid=wxid).first()
if not user:
return [None, None, None]
return [user.lastSign, user.maxSign, user.fortune]
finally:
session.close()

def set_signin_stat(self, wxid: str, fortune: str) -> bool:
"""更新用户签到状态"""
return self._execute_in_queue(self._set_signin_stat, wxid, fortune)

def _set_signin_stat(self, wxid: str, fortune: str) -> bool:
session = self.DBSession()
try:
user = session.query(User).filter_by(wxid=wxid).first()
if user:
# 获取实际的 lastSign 值
last_sign = user.lastSign
max_sign = user.maxSign
points = user.points

# 计算新的 maxSign 和积分
new_max_sign = max_sign + 1 if last_sign and datetime.datetime.today().date() == (last_sign + datetime.timedelta(days=1)).date() else max_sign
new_points = points + config.RobotConfig['signPoint'] + min(10, new_max_sign)

# 更新用户数据
session.execute(
update(User)
.where(User.wxid == wxid)
.values(
fortune=fortune,
maxSign=new_max_sign,
lastSign=datetime.datetime.today(),
points=new_points
)
)
else:
# 如果用户不存在,则创建新用户
user = User(wxid=wxid, fortune=fortune, maxSign=1, lastSign=datetime.datetime.today(), points=11)
session.add(user)

logger.info(f"数据库: 用户{wxid}签到状态设置成功")
session.commit()
return [user.lastSign, user.maxSign, user.fortune]
except SQLAlchemyError as e:
session.rollback()
logger.error(f"数据库: 用户{wxid}状态设置失败, 错误: {e}")
return False
finally:
session.close()

插件更新

api版本

VVQuest

由于VVQuest项目作者Daniel提供了api版本, 为提升插件运行速度, 因此决定保留本地及其部署方式作备用与参考, 并将插件更新为api版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# plugins/VVQuest/main.py
async def VVQuest(self, bot, msg):
...
try:
"""这里用aiohttp很耗时不知道为什么"""
res = await run_sync(requests.get)(f'https://api.zvv.quest/search?q={query}&n=1', timeout=20)

logger.debug(res)
res = res.json()

bot.send_image(res['data'][0], to)
LegendBotDB().add_points(msg.sender, -1)

LegendBotDB().set_running(msg.sender, False)

"""超时则用本地搜索"""
except TimeoutError:
res = await run_sync(self.im.search)(query, 1)

if len(res) == 0:
bot.sendMsg('未找到相关表情包', to, at)
return

original_file = res[0].replace('\\', '/')
file_extension = os.path.splitext(original_file)[1]
hash_object = hashlib.md5(original_file.encode())
hashed_filename = hash_object.hexdigest() + file_extension
temp_dir = 'plugins/VVQuest/cache'
if not os.path.exists(temp_dir):
os.makedirs(temp_dir)
temp_file_path = os.path.join(temp_dir, hashed_filename)
shutil.copyfile(original_file, temp_file_path)

# 获取绝对路径
abs_temp_file_path = os.path.abspath(temp_file_path)

# 发送文件
bot.send_image(abs_temp_file_path, to)

# 删除临时文件
os.remove(abs_temp_file_path)

except Exception as e:
logger.error(e)
logger.error(traceback.format_exc())

肯德基疯狂星期四文案

每周都要吃肯德基的朋友有福了, 以后每周都有不同的理由让朋友v你50了

api来自pearAPI提供, 感谢

首先是肯德基文案获取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@on_text_message
async def kfc(self, bot: LegendWechatBot, msg: WxMsg):
if msg.from_group():
to, at = msg.roomid, msg.sender
else:
to, at = msg.sender, None

if not self.enable:
return

if msg.content == 'kfc':
async with aiohttp.ClientSession() as session:
url = f"https://api.pearktrue.cn/api/kfc?type=json"
async with session.get(url) as resp:
if resp.status != 200:
logger.warning(f"天气查询失败: {resp.status}")
return
rsp1 = await resp.json()
bot.sendMsg(rsp1['text'].replace('\\n', '\n'), to, at)

每周四还有定时发送功能😄

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@schedule('cron', day_of_week='thu', hour=17, minute=0, second=0, misfire_grace_time=None)
async def send_kfc(self, bot: LegendWechatBot):
if not self.enable:
return

for group in LegendBotDB().get_chatroom_list():
async with aiohttp.ClientSession() as session:
url = f"https://api.pearktrue.cn/api/kfc?type=json"
async with session.get(url) as resp:
if resp.status != 200:
logger.warning(f"天气查询失败: {resp.status}")
return
rsp1 = await resp.json()
bot.sendMsg(rsp1['text'], group)

天气预报

api来自free-api提供, 感谢

天气预报共实现了如下功能 - 查询天气

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# plugins/Weather/main.py
if msg.content.count(' ') == 1:
city, day = msg.content.split(' ')
else:
city, day = msg.content, 0
if day >= 3:
return

async with aiohttp.ClientSession() as session:
url = f"https://api.seniverse.com/v3/weather/daily.json?key={self.key}&location={city}&language=zh-Hans&unit=c"
async with session.get(url) as resp:
if resp.status != 200:
logger.warning(f"天气查询失败: {resp.status}")
return
rsp1 = await resp.json()

if 'status_code' in rsp1 and rsp1['status_code'] == "AP010006":
bot.sendMsg("城市名错误, 请重新输入", to, at)
return

rsp = rsp1["results"][0]["daily"][day]
upd = rsp1['results'][0]['last_update']
res = f"{city}{rsp['date']}天气, 更新于{upd}\n白天天气:{rsp['text_day']}, 夜间天气:{rsp['text_night']}\n最高温: {rsp['high']}, 最低温: {rsp['low']}\n降水概率: {rsp['precip']}%, 湿度: {rsp['humidity']}\n风力风向: {rsp['wind_direction']}{rsp['rainfall']}级, 风速: {rsp['wind_speed']}"

bot.sendMsg(res, to, at)
LegendBotDB().add_points(msg.sender, -1)

  • 预订天气预报

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    # plugins/Weather/main.py
    if msg.content.startswith("预报 ") and msg.from_group():
    city = msg.content[3:]

    async with aiohttp.ClientSession() as session:
    url = f"https://api.seniverse.com/v3/weather/daily.json?key={self.key}&location={city}&language=zh-Hans&unit=c"
    async with session.get(url) as resp:
    if resp.status != 200:
    logger.warning(f"天气查询失败: {resp.status}")
    return
    rsp1 = await resp.json()

    if 'status_code' in rsp1 and rsp1['status_code'] == "AP010006":
    bot.sendMsg("城市名错误, 请重新输入", to, at)
    return

    if to in self.subs:
    if city in self.subs[to]:
    bot.sendMsg("该城市已订阅, 请勿重复订阅", to, at)
    return
    if len(self.subs[to]) > 5:
    bot.sendMsg("该群聊已订阅5个城市, 请先取消订阅", to, at)
    return

    self.subs[to].append(city)
    with open('plugins/Weather/subs.json', 'w', encoding='utf-8') as f:
    json.dump(self.subs, f, ensure_ascii=False, indent=4)
    bot.sendMsg("订阅成功", to, at)
    return

    self.subs[to] = [city]

  • 取消预订天气预报

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    # plugins/Weather/main.py
    elif msg.content.startswith("td "):
    city = msg.content[3:]
    if to in self.subs:
    if city in self.subs[to]:
    self.subs[to].remove(city)
    with open('plugins/Weather/subs.json', 'w', encoding='utf-8') as f:
    json.dump(self.subs, f, ensure_ascii=False, indent=4)
    bot.sendMsg("取消订阅成功", to, at)
    return
    else:
    bot.sendMsg("该城市未订阅, 请先订阅", to, at)
    return
    else:
    bot.sendMsg("该群聊未订阅任何城市, 请先订阅", to, at)
    return

AI对话 (初版)

有了AI的接入, 机器人能真正意义上回答所有问题了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
"""这句很重要"""
async with sem['processAI']:

if msg.from_group():
to, at = msg.roomid, msg.sender
else:
to, at = msg.sender, None

if msg.content.startswith("ai"):
if msg.content == 'ai':
bot.sendMsg("与AI对话, 每次消耗3积分, 命令格式: `ai 模型名称(默认为deepseek V3) 内容`\n目前支持的模型:\n1.v3(deepseekV3), r1(deepseekR1)", to, at)
return

query = msg.content[3:]

if query.count(' ') == 1:
model, query = query.split(' ')

else:
model = "v3"

try:
LegendBotDB().set_running(msg.sender, True)
# 调用 OpenAI API
logger.debug(model)
response = await self.client.chat.completions.create(
model=self.models[model], # 使用的模型
messages=[
# {"role": "system", "content": "你是一个帮助用户回答问题的助手。"},
{"role": "user", "content": query}
],
)

logger.debug('AI已返回')

# 获取 API 返回的内容
result = response.choices[0].message.content.replace("\\n", "\n")

# 发送结果给用户
bot.sendMsg(f"{result}", to, at)
LegendBotDB().set_running(msg.sender, False)
LegendBotDB().add_points(msg.sender, -3)

except Exception as e:
logger.error(f"调用 OpenAI API 时发生错误: {e}")
logger.error(traceback.format_exc())
LegendBotDB().set_running(msg.sender, False)

签到查看运势

机器人的正常运营依赖于插件的正常使用, 插件的正常使用离不开积分的管控和激励, 每天的签到获取积分的同时, 还加入了运势来增加趣味性

运势获取功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# plugins/SignUp/main.py
def get_fortune(self):
w_list = [3, 5, 11, 19, 20, 18, 10, 8, 4, 2]
w_sum = 0
for i in w_list:
w_sum += i
randVal = random.randint(0, w_sum)
rward = 0
for i in range(len(w_list)):
if randVal <= w_list[i]:
rward = i
break
randVal -= w_list[i]
# print('§' + names[rward] + '§')
i1 = random.randint(0, 9)
i2 = random.randint(0, 9)
while i1 == i2:
i2 = random.randint(0, 9)
i3 = random.randint(1, 8)
i4 = random.randint(1, 8)
while i3 == i4:
i4 = random.randint(1, 8)
if rward == 0 or rward == 9:
i3 = rward
i4 = rward
return self.names[rward] + '$' + self.good[i3][i1] + '$' + self.good_text[i3][i1] + '$' + self.good[i3][i2] + '$' + self.good_text[i3][i2] + '$' + self.bad[i4][i1] + '$' + self.bad_text[i4][i1] + '$' + self.bad[i4][i2] + '$' + self.bad_text[i4][i2]

def get_calender(self):
tg='癸甲乙丙丁戊己庚辛壬'
dz='亥子丑寅卯辰已午未申酉戌'
month_list = [None, '一月大', '二月平', '三月大', '四月小', '五月大', '六月小',
'七月大', '八月大', '九月小', '十月大', '十一月小', '十二月大']
week_list = [None, '星期一', '星期二', '星期三', '星期四', '星期五', '星期六', '星期日']
date = datetime.datetime.now().date()
year = date.year
month = month_list[date.month]
day = date.day
week = week_list[date.isoweekday()]
if week == '星期六' or week == '星期日':
color = 'red'
else:
color = 'green'
year1 = tg[(year - 3) % 10] + dz[(year - 3) % 12]
return {'year': year, 'year1': year1, 'month': month, 'day': day, 'week': week, 'color': color}
展开

在这里插一首歌吧

-

在两年前我就说过, 要让好多好多人看到我们一起构思的运势, 现在这个目标离我越来越近, 但你却离我愈发遥远了...

签到功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# plugins/SignUp/main.py
@on_text_message
async def processSignMsg(self, bot: LegendWechatBot, msg: WxMsg):

if not self.enable:
return

if msg.from_group():
to, at = msg.roomid, msg.sender
else:
to, at = msg.sender, None

try:
if msg.content == '签到':
lastSign, maxSign, fortune = LegendBotDB().get_signin_stat(msg.sender)
if lastSign and datetime.datetime.today().date() == lastSign.date():
bot.sendMsg('你已经签过到了', to, at)
return
else:
fortune = self.get_fortune()
lastSign = self.get_calender()
LegendBotDB().set_signin_stat(msg.sender, fortune)
_, maxSign, fortune = LegendBotDB().get_signin_stat(msg.sender)
fortune = fortune.split('$')

res = '签到成功!\n已连续签到%d天, 积分+%d, 目前积分: %d\n%s-农历%s年\n%s月%s日\n今日运势:%s\n宜:%s-%s\n宜:%s-%s\n忌:%s-%s\n忌:%s-%s\n输入`签到 查看运势/积分`可查询运势和积分' % tuple([maxSign, config.RobotConfig['signPoint'] + min(maxSign, 10), LegendBotDB().get_points(msg.sender), lastSign['year'], lastSign['year1'], lastSign["month"], lastSign["day"]] + fortune)

bot.sendMsg(res, to, at)
return
elif msg.content == '签到 查看积分':
bot.sendMsg('当前积分:%d' % LegendBotDB().get_points(msg.sender), to, at)
return
elif msg.content == '签到 查看运势':
lastSign, maxSign, fortune = LegendBotDB().get_signin_stat(msg.sender)
fortune = fortune.split('$')
res = '今日运势:%s\n宜:%s-%s\n宜:%s-%s\n忌:%s-%s\n忌:%s-%s' % tuple(fortune)

bot.sendMsg(res, to, at)
return

except Exception as e:
logger.warning('签到失败: %s' % e)
logger.error(traceback.format_exc())

todo list

  • 添加AI群聊自定义角色功能
  • 添加AI作图
  • AI聊天上下文功能
  • 完善文档

项目已开源至 Github ,欢迎star和fork 若你觉得对你的开发有帮助, 或是对你的生活提供了方便, 欢迎来 爱发电 赞助 爱发电 如果想一起开发或贡献插件等, 欢迎在相关标准制定后按照标准提交PR, 或 联系作者

Python相对引用详解

起因

受害者视角

假设有这样一个项目结构:

1
2
3
4
5
project/
├── main.py
└── sub/
├── __init__.py
└── sub.py
main.py中,我们希望导入sub模块,并调用sub模块中的函数。我们可能会尝试以下代码:
1
2
import sub.sub
sub.sub.func()
这样是没问题的, 然而如果我们想要把这个项目移到另一个项目中呢?

项目结构变成了这样

1
2
3
4
5
6
7
project_NB/
├── main1.py
└── project
├── main.py
└── sub/
├── __init__.py
└── sub.py
main1.py中,我们希望导入project模块中的main.py,并调用sub模块中的函数。我们可能会尝试以下代码:
1
2
from sub.main import func
func()
# 寄.

问题出在了哪儿?

在修改前后分别加上print(os.getcwd()), 发现原来先前工作目录和命令行的环境目录相同, 因此导入sub包时可以使用顶层文件夹的导入方式, 而在修改后, 命令行环境目录为project_NB, 而工作目录为project_NB/project, 因此无法找到sub包.

解决方案

既然问题是工作目录的问题, 那么我们只需要把工作目录修改为顶层文件夹即可

1
2
import os
os.chdir(os.path.dirname(os.path.abspath(__file__)))
再跑一遍

寄.

为啥?

原来是少些了一行, 没有设置sys中的路径变量

1
2
3
4
5
import sys
import os

os.chdir(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

搞定

抬杠

那这时候就有人要问了, 改完之后我还要在顶层文件夹下运行代码呢, 这不是拆东墙补西墙吗?

你傻啊?找个变量暂存一下初始的, 引用完改回来不就得了

1
2
3
4
5
6
7
8
9
10
11
12
os1 = os.getcwd()
sys1 = sys.path
os.chdir(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

meme_processor = importlib.import_module("meme_processor")
self.handle_message = meme_processor.handle_message
if not os.path.exists('temp'):
os.mkdir('temp')

os.chdir(os1)
sys.path = sys1

demo请参阅 LegendWechatBot

SyncToAsync

方法实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# utils/decorators.py
from typing import Callable, TypeVar
from typing_extensions import ParamSpec
from collections.abc import Coroutine
from functools import partial
import asyncio

P = ParamSpec("P")
R = TypeVar("R")

def run_sync(call: Callable[P, R]) -> Callable[P, Coroutine[None, None, R]]:
"""一个用于包装 sync function 为 async function 的装饰器
参数:
call: 被装饰的同步函数
"""

@wraps(call)
async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
loop = asyncio.get_running_loop()
pfunc = partial(call, *args, **kwargs)
result = await loop.run_in_executor(None, pfunc)
return result

return _wrapper

demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from utils.decorators import run_sync
import asyncio, time

# 模拟处理消息耗时操作, 两条print是为了观察执行顺序
def w(i):
print("hello", i)
time.sleep(5)
print("world", i)

# 模拟消息处理
async def deal(i):
await run_sync(w)(str(i))

# 模拟消息队列
async def main():
for i in range(10):
asyncio.create_task(deal(i))
asyncio.run(main())

# 让程序不退出, 模拟接收消息
while 1:
pass

输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
hello 0
hello 1
hello 2
hello 3
hello 4
hello 5
hello 6
hello 7
hello 8
hello 9
// 过了5秒
world 0
world 1
world 2
world 3
world 4
world 5
world 6
world 7
world 8
world 9

其他方法

调用时既可以用await run_sync()()

也可以用asyncio.create_task(run_sync()())

总之, 把run_sync当作一个async函数即可

具体使用

请参阅 LegendWechatBot 的VVQuest&Meme插件

系统功能更新

数据库线程安全

在Week2中, 已经实现了数据库的增删改查, 但是在多线程环境下, 需要保证数据库的线程安全, 防止多个线程同时操作数据库导致数据不一致的问题. 否则会报错: sqlite3.OperationalError: database is locked

MessageDB

使用lock对数据库操作进行加锁, 保证数据库操作的线程安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
async def save_message(self, msg: WxMsg, self_wxid) -> bool:
"""异步保存消息到数据库"""
async with self._lock:
async with self._async_session_factory() as session:
try:
message = Message(
msg_id=msg.id,
type=msg.type,
xml=msg.xml,
content=msg.content,
extra=msg.extra,
sender=msg.sender,
roomid=msg.roomid,
is_at=msg.is_at(self_wxid),
timestamp=datetime.now()
)
session.add(message)
await session.commit()
return True
except Exception as e:
logger.error(f"保存消息失败: {str(e)}")
await session.rollback()
return False

LegendBotDB

使用队列来确保每个操作步骤不冲突

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def _execute_in_queue(self, method, *args, **kwargs):
"""在队列中执行数据库操作"""
future = self.executor.submit(method, *args, **kwargs)
try:
return future.result(timeout=20) # 20秒超时
except Exception as e:
logger.error(f"数据库操作失败: {method.__name__} - {str(e)}")
raise

def demo(self, wxid: str) -> int:
return self._execute_in_queue(self._demo, wxid)

def _demo(self, wxid: str) -> int:
session = self.DBSession()
try:
user = session.query(User).filter_by(wxid=wxid).first()
if user:
...
except SQLAlchemyError as e:
session.rollback()
logger.error(f"数据库: 用户{wxid}状态获取失败, 错误: {e}")
return False
finally:
session.close()


重写数据库表

在之前的版本中, 由于使用的框架不同, 返回的字段也不尽相同, 此次针对Wcf框架更新了数据库User表格式

1
2
3
4
5
6
7
8
9
10
class User(Base):
__tablename__ = 'user'

wxid = Column(String(20), primary_key=True, nullable=False, unique=True, index=True, autoincrement=False, comment='wxid')
points = Column(Integer, nullable=False, default=0, comment='points')
running = Column(Integer, nullable=False, default=False, comment='running')
signin_stat = Column(DateTime, nullable=False, default=datetime.datetime.fromtimestamp(0), comment='signin_stat')
signin_streak = Column(Integer, nullable=False, default=0, comment='signin_streak')
blacked = Column(Integer, nullable=False, default=-2, comment='black')
llm_thread_id = Column(JSON, nullable=False, default=lambda: {}, comment='llm_thread_id')

技术突破

引用消息解析

在机器人中, 涉及到不那么复杂的上下文时, 引用消息(类型为49)是一种非常优雅快速的解决方案. 在wcf框架中, 引用消息的返回分为主消息被引用消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@on_quote_message
async def get_quote_msg(self, bot: LegendWechatBot, msg: WxMsg):
try:
if msg.from_group():
to, at = msg.roomid, msg.sender
else:
to, at = msg.sender, None

# 必须用lxml-xml, 原因在Week2文章中已写明
bs = BeautifulSoup(msg.content, 'lxml-xml')

# 主消息
msg.content = bs.find('title').text

# 被引用消息
quote = bs.find('refermsg')
quote_type = quote.find('type') # 被引用的消息类型
quote_sender = quote.find('chatusr').text # 被引用消息的发送者
quote_id = int(quote.find('svrid').text) # 被引用消息的id

# 在数据库中根据id查询
quoteMsg = await MessageDB().get_messages(msg_id=quote_id)

if not quoteMsg:
return # 引用消息不存在
quoteMsg = quoteMsg[0]

sync->async

在机器人中, 有很多地方需要等待异步操作完成, 例如获取消息, 获取用户信息等, 在之前的版本中, 很多方法都是同步(sync)执行的, 导致程序运行缓慢, 也没有很好的办法将其完美转变成异步(async)执行. 在此次更新中, 封装了一个装饰器, 使任何函数都可以轻松地变成异步函数, 并完美呈现出异步效果 >爽飞了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# utils/decorators.py
from typing import Callable, TypeVar
from typing_extensions import ParamSpec
from collections.abc import Coroutine
from functools import partial
import asyncio

P = ParamSpec("P")
R = TypeVar("R")

def run_sync(call: Callable[P, R]) -> Callable[P, Coroutine[None, None, R]]:
"""一个用于包装 sync function 为 async function 的装饰器
参数:
call: 被装饰的同步函数
"""

@wraps(call)
async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
loop = asyncio.get_running_loop()
pfunc = partial(call, *args, **kwargs)
result = await loop.run_in_executor(None, pfunc)
return result

return _wrapper
使用demo请参见 同步转异步

本地包导入相关问题

这个问题特别特别特别烦银, 最后被我斩于马下, 详细解决方案请参见 本地包导入相关问题


新增功能

菜单功能

在机器人中, 有许多插件功能, 需要提供一个整合好的字典对象, 存放所有插件的公开信息, 方便用户查看和操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# plugins/Menu/main.py
def load_folders(self):
self.info = {}
folders = os.listdir(os.path.join(os.path.abspath(__file__), '../..'))
folders.remove('Menu')

for folder in folders:
with open(os.path.join(os.getcwd(), 'plugins', folder, 'config.yaml'), "rb") as f:
plugin_config = yaml.safe_load(f)
name = list(plugin_config.keys())[0]
plugin_config = plugin_config[name]

# 如果插件同时包含简介, 基础命令以及启用, 才能被菜单识别
if 'description' not in list(plugin_config.keys()) or not plugin_config['enable'] or 'cmd' not in list(plugin_config.keys()):
logger.debug(folder)
continue

self.info[folder] = {
'name': name,
'cmd': plugin_config['cmd'],
'description': plugin_config["description"],
}
logger.debug(self.info)

图片系统

很早之前就想要做一个图片系统, 用来存放用户个人的图片, 方便用于其他插件, 现在在Meme插件的简介催促下终于实现了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# plugins/ImageDeal/main.py
@on_text_message
async def dealImage(self, bot: LegendWechatBot, msg: WxMsg):
if not self.enable:
return

if msg.from_group():
to, at = msg.roomid, msg.sender
else:
to, at = msg.sender, None

if msg.content == '图片':
bot.sendMsg('图片相关功能, 详细请发送具体命令前缀查看`下载图片` 下载图片\n`删除图片 图片名.后缀名` 删除图片\n`重命名图片 图片名.后缀名 新图片名.后缀名` 重命名图片', to, at)

#* 下载图片
if msg.content == '下载图片':
bot.sendMsg('下载图片, 用于制作表情包等插件功能\n命令格式: `下载图片`, 并引用需要下载的图片(必须是自己发的, 引用他人发的无效, 引用文件无效)\n每个人最多同时存在5张图片, 总大小不超过20MB', to, at)
return

#* 删除图片
if msg.content == '删除图片':
bot.sendMsg('删除图片, 命令格式: `删除图片 图片名.后缀名(在下载成功后返回)`', to, at)
return
if msg.content.startswith('删除图片 '):
msg.content = msg.content.replace('删除图片 ', '')
if not os.path.exists(os.path.join(self.folder, msg.sender, msg.content)):
bot.sendMsg('图片不存在', to, at)
return

os.remove(os.path.join(self.folder, msg.sender, msg.content))
bot.sendMsg('删除成功', to, at)
return

#* 重命名图片
if msg.content == '重命名图片':
bot.sendMsg('重命名图片, 命令格式: `重命名图片 图片名.后缀名(在下载成功后返回) 新图片名.后缀名(命名限制为windows限制, 另外不能有空格)`', to, at)
return
if msg.content.startswith('重命名图片 '):
msg.content = msg.content[6: ]
if not os.path.exists(os.path.join(self.folder, msg.sender, msg.content.split(' ')[0])):
bot.sendMsg('图片不存在', to, at)
return

if len(msg.content.split(' ')) != 2:
bot.sendMsg('命令格式错误', to, at)
return

if not os.path.exists(os.path.join(self.folder, msg.sender)):
os.mkdir(os.path.join(self.folder, msg.sender))


if os.path.exists(os.path.join(self.folder, msg.sender, msg.content.split(' ')[1])):
bot.sendMsg('新图片名已存在', to, at)
return

filename = msg.content.split(' ')[1]
if self.is_valid_filename(filename) and (filename.endswith('.jpg') or filename.endswith('.png')):
os.rename(os.path.join(self.folder, msg.sender, msg.content.split(' ')[0]), os.path.join(self.folder, msg.sender, msg.content.split(' ')[1]))
bot.sendMsg('重命名成功', to, at)
return
else:
bot.sendMsg('新图片名包含非法字符', to, at)
return

在重命名文件时, 要对文件名进行合法性检查, 防止注入或其他风险

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# plugins/ImageDeal/main.py
def is_valid_filename(self, filename) -> bool:
# 检查文件名是否包含非法字符
if re.search(r'[<>:"/\\|?*]', filename):
return False

# 检查文件名是否以空格或句点结尾
if filename.endswith(' ') or filename.endswith('.'):
return False

# 检查文件名是否为保留名称
reserved_names = [
"CON", "PRN", "AUX", "NUL",
"COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7", "COM8", "COM9",
"LPT1", "LPT2", "LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9"
]
if filename.upper() in reserved_names:
return False

return True

除此之外, 下载文件需要在收到引用消息时执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
on_quote_message
async def downloadImage(self, bot: LegendWechatBot, msg: WxMsg):
try:
if not self.enable:
return

if msg.from_group():
to, at = msg.roomid, msg.sender
else:
to, at = msg.sender, None

bs = BeautifulSoup(msg.content, 'lxml-xml')

msg.content = bs.find('title').text
quote = bs.find('refermsg')

if msg.content == '下载图片':
if quote is None:
bot.sendMsg('引用无效, 请重新引用需要下载的图片', to, at)
return
if quote.find('type').text != '3':
bot.sendMsg('引用无效, 请重新引用需要下载的图片', to, at)
return
if quote.find('chatusr').text != msg.sender:
bot.sendMsg('引用无效, 这不是你发的', to, at)
return

if not os.path.exists(os.path.join(self.folder, msg.sender)):
os.mkdir(os.path.join(self.folder, msg.sender))

if len(os.listdir(os.path.join(self.folder, msg.sender))) > 5 or self.calcSize(msg.sender) > 20:
bot.sendMsg('图片数量或大小超过限制, 请删除一些图片', to, at)
return

msgId = int(quote.find('svrid').text)

logger.debug(f"msgId: {msgId}")

quoteMsg = await MessageDB().get_messages(msg_id=msgId)

if not quoteMsg:
bot.sendMsg('引用无效, 请重新引用需要下载的图片', to, at)
return

res = await run_sync(bot.download_image)(msgId, quoteMsg[0].extra, os.path.abspath(os.path.join(self.folder, msg.sender)), 10)

if not res:
bot.sendMsg('图片下载失败', to, at)
logger.warning(f"图片下载失败, msgId: {msgId}")
else:
bot.sendMsg(f'图片下载完成, 保存为{os.path.basename(res)}', to, at)
except Exception as e:
logger.error(f"图片下载失败, msgId: {msgId}, error: {e}, traceback: {traceback.format_exc()}")
bot.sendMsg('图片下载失败', to, at)
return

即便如此, 微信机器人终究是在自己机子上跑的, 不能用来被当作网盘, 人家网盘还有空间限制呢

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def calcSize(self, wxid):
# 计算文件夹总大小
size = 0
for root, _, files in os.walk(os.path.join(self.folder, wxid)):
size += sum([os.path.getsize(os.path.join(root, name)) for name in files])
return size / 1024 / 1024

# 每天自动清理文件夹
@schedule('interval', days=1)
async def _del(self):
if self.autoDel:
try:
shutil.rmtree(self.folder)
os.mkdir(self.folder)
except:
logger.error("删除文件夹失败")

表情包生成

算是一个比较大的插件了, 项目来自于 meme-generator, 自己做了下接口的整合, 还有那个烦人的相对引入问题😡 表情包生成相关的方法参见meme项目仓库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# plugins/Meme/meme_processor.py
def generate_meme(
key: str, images: list[str], texts: list[str], args: dict[str, Any], msg: WxMsg
) -> str:
try:
meme = get_meme(key)
except NoSuchMeme:
return f'表情 "{key}" 不存在!', None

imgs = []
# 主要改了这里, 防止注入, 并兼容ImageDeal插件
for image in images:
image = os.path.basename(image)
img: Path = Path().cwd() / 'plugins/ImageDeal/images' / msg.sender / image
if not Path(img).exists():
return f'图片路径 "{image}" 不存在!', None
imgs.append(img)

try:
result = meme(images=imgs, texts=texts, args=args)
content = result.getvalue()
ext = filetype.guess_extension(content)
filename = f"plugins/Meme/temp/result{uuid.uuid4()}.{ext}"
with open(filename, "wb") as f:
f.write(content)
return f'表情制作成功!', filename
except MemeGeneratorException as e:
return str(e), None


todo list

  • api相关方法封装
  • 订阅相关方法封装
  • 部署文档
  • 规则文档
  • 使用文档

项目已开源至 Github ,欢迎star和fork 若你觉得对你的开发有帮助, 或是对你的生活提供了方便, 欢迎来 爱发电 赞助 爱发电 如果想一起开发或贡献插件等, 欢迎在相关标准制定后按照标准提交PR, 或 联系作者

在下文中, LWB代表 LegendWechatBot, LBW同理

命令处理规范

在LBW的前身作品中, 我曾经尝试过使用NLP技术来处理用户的指令, 效果却不尽人意. 原因是NLP在处理中文指令时, 效果并不理想, 同时还会遇到各种意想不到的语法或者逻辑问题. 因此, 在LBW中, 我决定采用一种更为简单直接的方法来处理用户的指令, 即使用命令处理规范.

指令格式

在LBW插件中, 有两种指令格式, 分别是简略版和详细版. 其中简略版是每个插件都需要具备的, 而详细版则是供要求参数较复杂, 对于返回结果要求更高, 更精细化的用户使用

简略版

该格式受到Python函数调用格式启发

简略版指令格式如下:

1
插件别名 指令参数1 指令参数2 ... 指令参数n
其中插件别名是写在main.py的__init__定义的cmd变量中的, 每个插件别名数量需<=1

调用简略版指令需要私聊或@

例如:

1
@bot 天气 查询 上海 今天

详细版

详细版指令受命令行命令格式启发

详细版指令格式如下:

1
/指令 -参数1 参数值1 ... -参数n 参数值n
例如:
1
/weather -m pre -c 上海 --date 2025-03-10
>其中各个参数的位置可以颠倒

指令处理

简略版指令很好处理, 不断地startswith然后split即可

详细版指令由于不知道每个插件所需的参数, 因此需要使用正则表达式来预解析指令, 代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import re

def parse_command(command):
# 定义正则表达式模式
pattern = r'/(?P<command>\w+)(?:\s+-\w+\s+(?:"[^"]*"|\S+))*'
matches = re.finditer(pattern, command)

# 初始化结果字典
result = {}

for match in matches:
# 提取命令
cmd = match.group('command')
result['command'] = cmd

# 提取参数
params_pattern = r'-(?P<key>\w+)\s+(?P<value>"[^"]*"|\S+)'
params_matches = re.finditer(params_pattern, command)

params = {}
for param_match in params_matches:
key = param_match.group('key')
value = param_match.group('value')
# 去掉引号
if value.startswith('"') and value.endswith('"'):
value = value[1:-1]
params[key] = value

result['params'] = params

return result

项目新增结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
LegendWechatBot
├─ database # 机器人数据存储
│ ├─ keyval.db
│ ├─ keyvalDB.py
│ ├─ Legendbot.db
│ ├─ LegendBotDB.py # 用来存储好友以及群聊信息
│ ├─ message.db
│ ├─ messsagDB.py # 单独存储消息
│ └─ __init__.py
├─ docs
│ └─ ban.txt # 违禁词词库
├─ LICENSE
├─ logs # 存储运行日志
├─ plugins # 插件文件夹
│ ├─ AcceptFriend # 自动通过好友
│ │ ├─ config.yaml # 插件配置文件
│ │ ├─ main.py # 插件入口
│ │ └─ __init__.py # 使文件夹可被识别
│ ├─ GroupWelcome # 入群欢迎
│ │ ├─ config.yaml
│ │ ├─ main.py
│ │ └─ __init__.py
│ ├ TCM # 中药药方查询
│ │ ├─ config.yaml # 配置文件, 同时包括中药药方的决策树
│ │ ├─ main.py
│ │ └─ __init__.py
│ └─ VVQuest # 张维为表情包查询 (下周更新)
│ ├─ config.yaml # 插件配置文件
│ ├─ config
│ │ ├─ config.yaml # 项目配置文件
│ │ ├─ settings.py
│ │ └─ __init__.py
│ ├─ data
│ │ ├─ embeddings.pkl # api模型文件
│ │ ├─ embeddings_bge-large-zh-v1.5.pkl # 本地模型文件
│ │ ├─ images # 图片文件夹
│ │ ├─ image_dirs
│ │ └─ models
│ │ └─ bge-large-zh-v1.5 # 调用的预训练模型
│ ├─ LICENSE
│ ├─ README.md
│ ├─ services
│ │ ├─ embedding_service.py # 模型服务
│ │ ├─ image_search.py # 图片搜索服务
│ │ └─ utils.py # 其他方法
│ └─ main.py
└─ main.py # 主启动程序

插件规范

在LBW中, 我建议每个插件都需要遵循一定的规范, 以便更好地管理和维护. LBW的插件规范主要包括以下几个方面:

插件目录结构

每个插件都需要有一个独立的目录, 目录名即为插件名. 插件目录下需要包含以下文件:

  • config.yaml: 插件配置文件, 用于存储插件的配置信息.
  • __init__.py: 占位
  • main.py: 插件的主程序文件, 包含插件的入口函数和指令处理函数.

配置文件

配置文件用于存储插件的配置信息, 最外层为插件名, 其中比需包含 enable 字段用来管理该插件是否启用; 包含 description 以及 author 用来显示插件简介和作者; 包含 cmd 用来规定简略指令. 以下是为示例

1
2
3
4
5
6
7
8
TCM:
enable: true
description: 中药药方查询
author: Kanwuqing
cmd: 中药

otherSettings:
...

主程序文件

主程序文件是插件的入口函数和指令处理函数, 需要包含和插件名同名的类, 该类需继承自 plugin.py 中的 PluginBase 插件基类, 处理消息的函数需使用 on_message 装饰器并使用async关键字进行异步处理, 同时需要对简略指令进行SJ(特判), 以输出插件的详细指令用法, 以下为示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# plugins/Demo/main.py
from utils.decorators import on_system_message
from utils.plugin import PluginBase

class Demo(PluginBase):

def __init__(self):
super().__init__()

# 读取设置
with open("plugins/Demo/config.yaml", "rb") as f:
plugin_config = yaml.safe_load(f)

config = plugin_config["Demo"]
self.enable = config["enable"]
self.cmd = config["cmd"]

@on_system_message
async def demo(self, bot: LegendWechatBot, message: WxMsg):
if not self.enable:
return

if message.content == self.cmd:
bot.sendMsg('Demo使用方法', message.sender)
...

新增功能

数据库管理

有数据才有服务

在LegendWechatBot中, 核心维护增添的数据库共有两个: LegendBotDBmessageDB.

LegendBotDB

LegendBotDB是用于存储个性化用户和群聊信息的数据库 (插件相关的不存, 留给插件自己存)

LegendBotDB不会在每次启动时清空, 但提供了清空的方法 recreate (用于更新字段时)

Users 表主要需要维护的字段有: - id: 用户或群聊的唯一标识符 - points: 用户积分 - running: 用户或是否在运行大任务 - black: 用户是否在黑名单

Groups 表主要需要维护的字段有: - whitelist: 需要处理消息的群聊白名单

messageDB

messageDB是用于存储需要处理消息的数据库, 主要用于存储消息的发送者, 接收者, 消息内容, 消息时间, 消息类型, 消息状态等信息, 便于回溯

messageDB会在每次启动时清空

Messages 表主要需要维护的字段有: - id: 消息的唯一标识符 - from_id: 消息的发送者 - to_id: 消息的接收者 - content: 消息的内容 - timestamp: 消息的时间 - type: 消息的类型

自动添加好友

通过使用 bs4 解析好友验证消息(类型为37)content字段后可以得到 v3v4字段, 通过这两个字段可以调用微信接口从而实现自动添加好友

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# plugins/AcceptFriend/main.py
@on_other_message
async def handleRequest(self, bot: LegendWechatBot, msg: WxMsg):
# 如果功能未启用或消息类型不是37,则返回
if not self.enable or msg.type != 37:
return

# 使用BeautifulSoup解析消息内容
bs = BeautifulSoup(msg.content, 'lxml-xml')
# 查找消息元素
msg_ = bs.find('msg')
# 如果找到了消息元素
if msg_:
# 获取场景值
scene = int(msg_.get('scene'))
# 获取加密用户名
v3 = msg_.get('encryptusername')
# 获取票据
v4 = msg_.get('ticket')
# 获取用户名
userName = msg_.get('fromusername')
# 如果接受新好友请求成功
if bot.accept_new_friend(v3, v4, scene):
# 记录成功信息
logger.success(f'已接受好友请求: {userName}')
return
else:
# 记录失败信息
logger.warning(f'接受好友请求失败: {userName}')
return
else:
# 如果没有找到消息元素,则返回
return

在添加新好友后, 系统会发送一条消息(类型为10000), 消息内容为 你已添加了XXX,现在可以开始聊天了, 此时需要通过 bs4 解析消息内容, 获取到新好友的 idname, 同时发送欢迎消息

1
2
3
4
5
6
7
8
9
10
11
12
# plugins/AcceptFriend/main.py
@on_system_message
async def handleNewFriend(self, bot: LegendWechatBot, msg: WxMsg):
# 使用正则表达式匹配消息内容,获取新朋友的昵称
nickName = re.findall(r"你已添加了(.*),现在可以开始聊天了。", msg.content)
# 如果匹配成功,则将新朋友的昵称赋值给nickName变量
if nickName:
nickName = nickName[0]
# 如果新朋友的昵称存在,且启用了新朋友处理功能,且消息类型为10000或10002,则发送欢迎消息
if nickName and self.enable:
bot.sendMsg(f'你好呀, {nickName}', msg.sender, None)
return

群聊新成员欢迎

通过使用正则表达式解析消息内容, 获取到新成员的 idname, 并查询微信数据库, 获取到用户头像链接, 整合至欢迎消息中发送给新成员

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# plugins/GroupWelcome/main.py
@on_system_message
async def groupWelcome(self, bot: LegendWechatBot, message: WxMsg):
if not self.enable:
return

# 如果不是群聊消息,则返回
if not message.from_group():
return

# 如果消息类型不是10000或10002,则返回
if message.type != 10000 and message.type != 10002:
return

# 如果群聊不在白名单中,则返回
if not LegendBotDB().get_chatroom_whitelist(message.roomid):
return

# 如果消息内容中包含邀请加入群聊的信息,则解析成员信息
if re.findall(r'"(.*?)"邀请"(.*?)"加入了群聊', message.content): # 通过邀请加入群聊
new_members = self._parse_member_info(bot, message, "invitation")
elif re.findall(r'"(.*)"加入了群聊', message.content): # 直接加入群聊
new_members = self._parse_member_info(bot, message, "direct")
elif re.findall(r'"(.*?)"通过(.*?)加入群聊', message.content): # 通过邀请链接加入群聊
new_members = self._parse_member_info(bot, message, "inviters")
else:
logger.warning(f"未知的入群方式: ")
return

if not new_members:
return

for member in new_members:
wxid = member["wxid"]
nickname = member["nickname"]

now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 操作微信数据库, 从指定表里查询指定wxid的bigHeadImgUrl
img = bot.query_sql('MicroMsg.db', f'SELECT bigHeadImgUrl FROM ContactHeadImgUrl WHERE usrName="{wxid}";')
if img:
img = img[0]['bigHeadImgUrl']
else:
img = ''
logger.debug(f"{nickname} {str(wxid)} {img} 加入了群聊")
bot.send_rich_text(nickname, str(wxid), f"👏欢迎 {nickname} 加入群聊!🎉", f"⌚时间:{now}\n{self.welcome_message}\n🔗点击查看更多信息", 'https://kanwuqing.github.io', img, message.roomid)
return

@staticmethod
def _parse_member_info(bot: Wcf, message: WxMsg, link: str):
# 定义一个空列表,用于存储用户信息
user = []
# 直接加群
if link == "direct":
pattern = r'"(.*)"加入了群聊'
user = re.findall(pattern, message.content)[0].split("、")

# 匿名邀请加群
elif link == "inviters":
pattern = r'"(.*?)"通过(.*?)加入群聊'
user = re.findall(pattern, message.content)[0][0].split("、")

# 邀请加群
elif link == "invitation":
pattern = r'"(.*?)"邀请"(.*?)"加入了群聊'
user = re.findall(pattern, message.content)[0][1].split("、")

# 定义一个空列表,用于存储用户信息
users = []
# 暂停3秒
time.sleep(3)
# 获取群聊成员信息
mem = bot.get_chatroom_members(message.roomid)
# 打印群聊成员信息
logger.debug(mem)
# 遍历用户信息
for i in user:
# 将用户信息添加到users列表中
users.append({"wxid": get_key(mem, i), "nickname": i})
# 返回用户信息
return users

中药药方查询

在完成创新项目时, 我想到了通过微信来搭建中医问诊自动化的平台. 恰好本人在中医方面也略懂一些皮毛, 结合自己对于不同药方的病症的汇总与整理, 便通过决策树实现了这一功能.

  • 多轮对话实现方式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    # plugins\TCM\main.py
    self.users = {} # 正在问诊的用户
    if msg.content.startswith('q '):
    # 如果该用户已经在问诊中,则发送提示信息
    if msg.sender in self.users:
    bot.sendMsg("你已经处于一个问诊中了, 黑名单指数+1, 请注意", to, at)
    # 将该用户加入黑名单,黑名单指数+1
    await LegendBotDB.add_black(wxid=msg.sender, n=1)
    return

    # 更新问诊状态
    if msg.content in self.users[msg.sender]:
    self.users[msg.sender] = self.users[msg.sender][msg.content]

  • 决策树实现方式

    决策树

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    <!-- 以桂枝汤系为例 -->
    桂枝汤:
    q: 发热吗?(有/没有)
    有:
    q: 头部后颈有牵扯不适的感觉吗?(有/没有)
    有:
    q: 全身酸痛吗?(酸痛/不酸痛)
    酸痛:
    q: 平时有汗吗?(有/没有)
    有: 桂枝加葛根汤
    没有: 葛根汤
    不酸痛:
    q: 痉(挛), 角弓反张, 即头和下肢后弯, 躯干向前; 或是怕风/自发性出汗吗?(痉症/恶风有汗)
    痉症: 栝蒌桂枝汤
    恶风有汗: 桂枝汤
    没有:
    q: 平时体力体质好吗?(好/不好)
    好:
    q: 咽炎, 咽部难受; 或是自觉怕冷, 哪怕加衣服靠近暖气也不见好, 且同时发热?(咽炎/恶寒发热)
    咽炎: 桂枝二越婢一汤
    恶寒发热: 麻桂各半汤
    不好:
    q: 有时自觉怕冷, 哪怕加衣服靠近暖气也不见好, 有时发热; 或是肚子疼, 食欲不振?(往来寒热/腹痛)
    往来寒热: 柴胡桂枝汤
    腹痛: 小建中汤
    没有:
    q: 肚子疼吗?(疼/不疼)
    疼:
    q: 怕冷但是可通过穿衣服等操作缓解吗?(是/不是)
    是:
    q: 唾液多; 或是身体局部气血不通, 胸闷/手脚冷/痉(挛), 角弓反张, 即头和下肢后弯, 躯干向前等?(唾液多/气血不通)
    唾液多: 当归四逆加吴茱萸生姜汤
    气血不通: 当归四逆汤
    不是:
    q: 失眠吗?(有/没有)
    有: 桂枝加龙骨牡蛎汤
    没有:
    q: 肚子胀; 或是便秘?(肚子胀/便秘)
    肚子胀: 桂枝加芍药汤
    便秘: 桂枝加大黄汤
    不疼:
    q: 怕冷但是可通过穿衣服等操作缓解吗?(是/不是)
    是:
    q: 心烦; 或是小便困难, 四肢活动受限?(心烦/小便困难)
    心烦: 桂枝附子汤
    小便困难: 桂枝加附子汤
    不是:
    q: 有鼻涕, 较浓, 可能还会流到咽部; 或是皮肤表面有水肿, 有黄色的汗?(鼻涕/黄汗)
    鼻涕: 葛根汤加川芎辛夷
    黄汗: 桂枝加黄耆汤

  • 首轮问询中相关性匹配实现方式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    @staticmethod
    def find_most_similar_question(tree, user_input):
    # 遍历树形结构,返回所有问题及其路径
    def traverse_tree(tree, path=[]):
    # 定义一个空列表,用于存储问题及其路径
    questions = []
    # 遍历树形结构的每个节点
    for key, value in tree.items():
    # 如果节点的值是一个字典
    if isinstance(value, dict):
    # 如果字典中包含问题
    if 'q' in value:
    # 将问题及其路径添加到列表中
    questions.append((value['q'], path + [key]))
    # 递归调用traverse_tree函数,继续遍历子节点
    questions.extend(traverse_tree(value, path + [key]))
    # 返回问题及其路径的列表
    return questions

    questions = traverse_tree(tree)
    question_texts = [q[0] for q in questions]
    most_similar = difflib.get_close_matches(user_input, question_texts, n=3, cutoff=0.0)

    paths = []
    ques = []
    if most_similar:
    for question, path in questions:
    if question in most_similar:
    paths.append(path)
    ques.append(question)
    return paths, ques
    return [], []

张维为表情包查询

在开发过程中, 好友 Daniel 基于bge-large-zh中文嵌入模型模型开源了一个张维为表情包匹配网站 VVQuest, 我将其处理成插件形式, 删除了不需要的部分(如前端), 并将其集成到项目中.

  • 二次开发 首次运行前请自行下载bge-large-zh模型, 并将其放置在plugins/VVQuest/data/models目录下. 首次运行时会自动生成缓存, 请耐心等待

  • 代码出处及详细开发文档请参见 VVQuest

敏感词检测

由于基于微信平台, 并对公众开放, 所以需要针对某些损害公共利益以及环境的行为进行检测, 其中就包含了敏感词检测, 在开发过程中, 我编写了敏感词检测程序并集成了敏感词库存放在docs/ban.txt

  • 敏感词检测程序
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    class DFA:
    def __init__(self):
    self.ban_words_set = set()
    self.ban_words_list = []
    self.ban_words_dict = {}
    self.path = 'docs/ban.txt'
    self.get_words()

    # 获取敏感词列表
    def get_words(self):
    with open(self.path, 'r', encoding='utf-8-sig') as f:
    for s in f:
    if s.find('\\r'):
    s = s.replace('\r', '')
    s = s.replace('\n', '')
    s = s.strip()
    if len(s) == 0:
    continue
    if str(s) and s not in self.ban_words_set:
    self.ban_words_set.add(s)
    self.ban_words_list.append(str(s))
    self.add_hash_dict(self.ban_words_list)


    # 将敏感词列表转换为DFA字典序
    def add_hash_dict(self, new_list):
    for x in new_list:
    self.add_new_word(x)

    # 添加单个敏感词
    def add_new_word(self, new_word):
    new_word = str(new_word)
    # print(new_word)
    now_dict = self.ban_words_dict
    i = 0
    for x in new_word:
    if x not in now_dict:
    x = str(x)
    new_dict = dict()
    new_dict['is_end'] = False
    now_dict[x] = new_dict
    now_dict = new_dict
    else:
    now_dict = now_dict[x]
    if i == len(new_word) - 1:
    now_dict['is_end'] = True
    i += 1

    # 寻找第一次出现敏感词的位置
    def find_illegal(self, _str):
    now_dict = self.ban_words_dict
    i = 0
    start_word = -1
    is_start = True # 判断是否是一个敏感词的开始
    while i < len(_str):
    if _str[i] not in now_dict:
    if is_start is True:
    i += 1
    continue
    i = start_word +1
    start_word = -1
    is_start = True
    now_dict = self.ban_words_dict
    else:
    if is_start is True:
    start_word = i
    is_start = False
    now_dict = now_dict[_str[i]]
    if now_dict['is_end'] is True:
    return start_word
    else:
    i += 1
    return -1

    # 查找是否存在敏感词
    def exists(self, s):
    pos = self.find_illegal(s)
    if pos == -1:
    return False
    else:
    return True

    # 将指定位置的敏感词替换为*
    def filter_words(self, filter_str, pos):
    now_dict = self.ban_words_dict
    end_str = int()
    for i in range(pos, len(filter_str)):
    if now_dict[filter_str[i]]['is_end'] is True:
    end_str = i
    break
    now_dict = now_dict[filter_str[i]]
    num = end_str - pos + 1
    filter_str = filter_str[:pos] + '*'*num + filter_str[end_str + 1:]
    return filter_str

    def filter_all(self, s):
    pos_list = list()
    ss = self.draw_words(s, pos_list)
    illegal_pos = self.find_illegal(ss)
    while illegal_pos != -1:
    ss = self.filter_words(ss, illegal_pos)
    illegal_pos = self.find_illegal(ss)
    i = 0
    while i < len(ss):
    if ss[i] == '*':
    start = pos_list[i]
    while i < len(ss) and ss[i] == '*':
    i += 1
    i -=1
    end = pos_list[i]
    num = end-start+1
    s = s[:start] + '*'*num + s[end+1:]
    i += 1
    return s

    @staticmethod
    def draw_words(_str, pos_list):
    ss = str()
    for i in range(len(_str)):
    if '\u4e00' <= _str[i] <= '\u9fa5' or '\u3400' <= _str[i] <= '\u4db5' or '\u0030' <= _str[i] <= '\u0039' \
    or '\u0061' <= _str[i] <= '\u007a' or '\u0041' <= _str[i] <= '\u005a':
    ss += _str[i]
    pos_list.append(i)
    return ss

敏感词需要在所有插件处理前单独判断, 因此不作为插件出现

  • 敏感词检测逻辑
    1
    2
    3
    4
    if dfa.exists(msg.content):
    self.DB.add_black(msg.sender, 2)
    self.bot.sendMsg('说话太刑了, 黑名单指数+2, 请注意', to, at)
    return

社区贡献指南

🚩 提交代码时请使用标准化 Commit Message:

  • feat: 新功能
  • fix: Bug修复
  • docs: 文档更新
  • style: 代码格式
  • ref: 代码重构
  • perf: 性能优化
  • test: 测试相关
  • chore: 构建/工具变更

提交贡献步骤

  • Fork项目仓库:在GitHub上找到你想要贡献的项目,点击项目页面上的"Fork"按钮
  • 克隆仓库到本地:使用git clone命令将你Fork的项目克隆到本地
  • 创建新分支:在本地仓库中创建一个新分支进行你的工作,这是一个良好的实践。使用git checkout -b branch-name创建并切换到新分支
  • 提交更改:使用git add和git commit命令提交你的更改
  • 推送更改到GitHub:使用git push origin branch-name将更改推送回你的GitHub仓库
  • 创建Pull Request:在GitHub上,你会看到一个"Compare & pull request"按钮。点击它,填写PR的标题和描述,然后提交

todo list

  • 黑名单阈值, 达到阈值后直接不响应并踢出群聊
  • 群管理制度, 结合群众力量管理社区氛围
  • 实现引用消息相关的功能, 如下载引用的图片
  • 表情包生成插件

项目已开源至 Github ,欢迎star和fork 若你觉得对你的开发有帮助, 或是对你的生活提供了方便, 欢迎来 爱发电 赞助 爱发电 如果想一起开发或贡献插件等, 欢迎在相关标准制定后按照标准提交PR, 或 联系作者

项目灵感

  • 本项目深受官方Q群机器人以及微信机器人的启发, 但不满足于现有的功能, 因此决定自己动手写一个微信机器人, 以实现更多功能

项目总目标

实现一个微信机器人, 能够实现以下功能:

  • 接收并处理
    展开查看
    • 文本消息
    • 系统消息
      • 群聊新成员加入消息
      • 好友验证消息
      • 红包消息
      • 转账消息
    •       <li>图片消息
            <li>语音消息
            <li>视频消息
            <li>文件消息
            <li>位置消息
            <li>群聊at消息
            <li>私聊消息
            <li>违禁消息
        </ul>
  • 发送
    展开查看
    • 文本消息
    • 图片消息
    • 语音消息
    • 视频消息
    • 文件消息
    • 链接消息
    • 群聊at消息
    • 私聊消息
  • 系统
    展开查看
    • 群管理系统
    • 积分系统
    • 违禁词管理系统
    • 定时任务系统
    • 插件系统
    • 日志系统
    • 数据统计系统
    • 速率限制系统
    • 热更新系统
    • GUI系统

项目基础

编程语言: Python 3.11.9

微信调用框架: WCFerry

数据库: SQLite

日志集成: loguru

定时任务框架: APScheduler

项目开发日志 (Week 1)

项目初始化

刚刚有灵感不久, 我便确定了项目的基础框架, 唯独微信框架拿捏不定, 眼前有数个微信协议可供使用:

网页版协议 (以 itchat 为代表)

iPad协议 (以 Wechaty 为代表)

hook (以 ntchatwcferry 为代表)

综合分析之后, 我发现itchat以及其所属的网页版微信协议已经不再维护, 较晚注册的微信账号也无法登录网页版; Wechaty功能很全, 但需要购买Token, 仅有基础功能是开源的; ntchat也早在3年前也已经停止维护, 这些不符合项目需求, 因此我起初选择了近期仍在更新的wcferry作为我的微信协议, 并参照其demo进行了从0的开发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
项目结构 (以后的日志中仅贴出做修改的结构部分)
LegendWechatBot
├─ config # 机器人主设置
│ ├─ config.py
│ └─ config.yaml
├─ README.md
├─ main.py # 主启动程序
├─ requirements.txt
├─ robot.py # 机器人核心程序
├─ try.py # (非必要) 用来离线测试
└─ utils
├─ asyncEnsure.py # 消息发送频率限制
├─ changeHandler.py # 热更新
├─ decorators.py # 插件处理消息类型装饰器
├─ dfa.py # 敏感词模块
├─ eventManager.py # 事件管理器
├─ LegendBot.py # 机器人核心类
├─ LegendMsg.py # 消息归一化
├─ logger.py # 日志模块
├─ plugin.py # 插件模块
└─ singleton.py # 单例归一化

难点&技术突破

1. 热更新

由于我需要频繁地修改插件, 因此我需要实现热更新, 以便在修改插件后无需重启机器人即可生效

文件改变检测使用了 watchdog 库, 在检测到文件改变后, 会调用重启回调函数, 重启程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#! utils/changeHandler.py
from watchdog.events import FileSystemEventHandler, FileSystemEvent

# 封装检测文件改变并判断是否重启的类
class ConfigChangeHandler(FileSystemEventHandler):
def __init__(self, restartCallback = restartProgram):
'''初始化配置文件变化处理器
:param restartCallback: 重启回调函数, 默认为None, 即不执行任何操作
'''
self.restartCallback = restartCallback
self.lastTriggered = 0
self.cooldown = 2 # 冷却时间(秒)
# 当发生错误时才会开始等待
self.waiting = False # 是否在等待文件改变

def onModified(self, event: FileSystemEvent):
if not event.is_directory:
# 判断是否在冷却时间内(防止抽风)
currentTime = time.time()
if currentTime - self.last_triggered < self.cooldown:
return

filePath = Path(event.src_path).resolve()
if (filePath.name == "config.yaml" or
"plugins" in str(filePath) and filePath.suffix in ['.py', '.yaml']):
logger.info(f"检测到文件变化: {filePath}")
self.last_triggered = currentTime
if self.waiting:
logger.info("检测到文件改变,正在重启...")
self.waiting = False
self.restartCallback()

重启程序方法

1
2
3
4
5
6
7
8
9
10
11
12
13
def restartProgram(observer=None):
"""重启程序, 别处也可调用"""
logger.info("正在重启程序...")
# 清理资源
if observer: # observer是watchdog中的对象
observer.stop()
try:
import multiprocessing.resource_tracker
multiprocessing.resource_tracker._resource_tracker.clear()
except Exception as e:
logger.warning(f"清理资源时出错: {e}")
# 重启程序
os.execv(sys.executable, [sys.executable] + sys.argv)

2. 插件管理

由于机器人项目中几乎所有功能都是插件化实现的, 因此我需要先实现插件管理功能

插件管理分为如下几个功能模块:

  • 插件基类 & 插件启用/禁用方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#! utils/plugin.py
class PluginBase(ABC):
"""插件基类"""

# 插件元数据
description: str = "暂无描述"
author: str = "未知"
version: str = "1.0.0"

def __init__(self):
self.enabled = False
self._scheduled_jobs = set()

async def on_enable(self, bot=None):
"""插件启用时调用"""

# 定时任务
for method_name in dir(self):
method = getattr(self, method_name)
if hasattr(method, '_is_scheduled'):
job_id = getattr(method, '_job_id')
trigger = getattr(method, '_schedule_trigger')
trigger_args = getattr(method, '_schedule_args')

add_job_safe(scheduler, job_id, method, bot, trigger, **trigger_args)
self._scheduled_jobs.add(job_id)
if self._scheduled_jobs:
logger.success(f"插件 {self.__class__.__name__} 已加载定时任务: {self._scheduled_jobs}")

async def on_disable(self):
"""插件禁用时调用"""

# 移除定时任务
for job_id in self._scheduled_jobs:
remove_job_safe(scheduler, job_id)
logger.info("已卸载定时任务: {}", self._scheduled_jobs)
self._scheduled_jobs.clear()

async def async_init(self):
"""插件异步初始化"""
return
  • 插件(批量)加载/卸载/重载
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
class PluginManager:
def __init__(self):
self.plugins: Dict[str, PluginBase] = {}
self.plugin_classes: Dict[str, Type[PluginBase]] = {}
self.plugin_info: Dict[str, dict] = {} # 新增:存储所有插件信息

self.excluded_plugins = config.RobotConfig["disabledPlugins"]

async def load_plugin(self, bot: WechatAPIClient, plugin_class: Type[PluginBase], is_disabled: bool = False) -> bool:
"""加载单个插件, 接受Type[PluginBase]"""
try:
plugin_name = plugin_class.__name__

# 防止重复加载插件
if plugin_name in self.plugins:
return False

# 记录插件信息,即使插件被禁用也会记录
self.plugin_info[plugin_name] = {
"name": plugin_name,
"description": plugin_class.description,
"author": plugin_class.author,
"version": plugin_class.version,
"enabled": False,
"class": plugin_class
}

# 如果插件被禁用则不加载
if is_disabled:
return False

plugin = plugin_class()
EventManager.bind_instance(plugin)
await plugin.on_enable(bot)
await plugin.async_init()
self.plugins[plugin_name] = plugin
self.plugin_classes[plugin_name] = plugin_class
self.plugin_info[plugin_name]["enabled"] = True
return True
except:
logger.error(f"加载插件时发生错误: {traceback.format_exc()}")
return False

async def unload_plugin(self, plugin_name: str) -> bool:
"""卸载单个插件"""
if plugin_name not in self.plugins:
return False

# 防止卸载 ManagePlugin
if plugin_name == "ManagePlugin":
logger.warning("ManagePlugin 不能被卸载")
return False

try:
plugin = self.plugins[plugin_name]
await plugin.on_disable()
EventManager.unbind_instance(plugin)
del self.plugins[plugin_name]
del self.plugin_classes[plugin_name]
if plugin_name in self.plugin_info.keys():
self.plugin_info[plugin_name]["enabled"] = False
return True
except:
logger.error(f"卸载插件 {plugin_name} 时发生错误: {traceback.format_exc()}")
return False

async def unload_all_plugins(self) -> tuple[List[str], List[str]]:
"""卸载所有插件"""
unloaded_plugins = []
failed_unloads = []
for plugin_name in list(self.plugins.keys()):
if await self.unload_plugin(plugin_name):
unloaded_plugins.append(plugin_name)
else:
failed_unloads.append(plugin_name)
return unloaded_plugins, failed_unloads

async def reload_plugin(self, bot: WechatAPIClient, plugin_name: str) -> bool:
"""重载单个插件"""
if plugin_name not in self.plugin_classes:
return False

# 防止重载 ManagePlugin
if plugin_name == "ManagePlugin":
logger.warning("ManagePlugin 不能被重载")
return False

try:
# 获取插件类所在的模块
plugin_class = self.plugin_classes[plugin_name]
module_name = plugin_class.__module__

# 先卸载插件
if not await self.unload_plugin(plugin_name):
return False

# 重新导入模块
module = importlib.import_module(module_name)
importlib.reload(module)

# 从重新加载的模块中获取插件类
for name, obj in inspect.getmembers(module):
if (inspect.isclass(obj) and
issubclass(obj, PluginBase) and
obj != PluginBase and
obj.__name__ == plugin_name):
# 使用新的插件类而不是旧的
return await self.load_plugin(bot, obj)

return False
except Exception as e:
logger.error(f"重载插件 {plugin_name} 时发生错误: {e}")
return False

async def reload_all_plugins(self, bot: WechatAPIClient) -> List[str]:
"""重载所有插件

Returns:
List[str]: 成功重载的插件名称列表
"""
try:
# 记录当前加载的插件名称,排除 ManagePlugin
original_plugins = [name for name in self.plugins.keys() if name != "ManagePlugin"]

# 卸载除 ManagePlugin 外的所有插件
for plugin_name in original_plugins:
await self.unload_plugin(plugin_name)

# 重新加载所有模块
for module_name in list(sys.modules.keys()):
if module_name.startswith('plugins.') and not module_name.endswith('ManagePlugin'):
del sys.modules[module_name]

# 从目录重新加载插件
return await self.load_plugins_from_directory(bot)

except:
logger.error(f"重载所有插件时发生错误: {traceback.format_exc()}")
return []
  • 从文件夹中加载插件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
async def load_plugins_from_directory(self, bot: WechatAPIClient, load_disabled_plugin: bool = True) -> Union[List[str], bool]:
"""从plugins目录批量加载插件"""
loaded_plugins = []

for dirname in os.listdir("plugins"):
if os.path.isdir(f"plugins/{dirname}") and os.path.exists(f"plugins/{dirname}/main.py"):
try:
module = importlib.import_module(f"plugins.{dirname}.main")
for name, obj in inspect.getmembers(module):
if inspect.isclass(obj) and issubclass(obj, PluginBase) and obj != PluginBase:
is_disabled = False
if not load_disabled_plugin:
is_disabled = obj.__name__ in self.excluded_plugins

if await self.load_plugin(bot, obj, is_disabled=is_disabled):
loaded_plugins.append(obj.__name__)

except:
logger.error(f"加载 {dirname} 时发生错误: {traceback.format_exc()}")
return False

return loaded_plugins

async def load_plugin_from_directory(self, bot: WechatAPIClient, plugin_name: str) -> bool:
"""从plugins目录加载单个插件

Args:
bot: 机器人实例
plugin_name: 插件类名称(不是文件名)

Returns:
bool: 是否成功加载插件
"""
found = False
for dirname in os.listdir("plugins"):
try:
if os.path.isdir(f"plugins/{dirname}") and os.path.exists(f"plugins/{dirname}/main.py"):
module = importlib.import_module(f"plugins.{dirname}.main")
importlib.reload(module)

for name, obj in inspect.getmembers(module):
if (inspect.isclass(obj) and
issubclass(obj, PluginBase) and
obj != PluginBase and
obj.__name__ == plugin_name):
found = True
return await self.load_plugin(bot, obj)
except:
logger.error(f"检查 {dirname} 时发生错误: {traceback.format_exc()}")
continue

if not found:
logger.warning(f"未找到插件类 {plugin_name}")
return False
  • 插件信息获取
1
2
3
4
5
6
7
8
9
10
11
12
def get_plugin_info(self, plugin_name: str = None) -> Union[dict, List[dict]]:
"""获取插件信息

Args:
plugin_name: 插件名称, 如果为None则返回所有插件信息

Returns:
如果指定插件名,返回单个插件信息字典;否则返回所有插件信息列表
"""
if plugin_name:
return self.plugin_info.get(plugin_name)
return list(self.plugin_info.values())
  • (todo) 插件依赖获取
  • (todo) 插件设置GUI

3. 定时任务

在微信机器人中, 有很多任务都需要定时执行, 比如每天定时发送报纸, 清空用户签到状态等 因此需要封装一个类让插件方便地规定定时任务, 在此项目中, 我选择了APScheduler 和装饰器来实现 > APScheduler有一个特色表达式: Cron, 具体语法和用法请参阅 cron表达式

  • 定时任务增删

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    def add_job_safe(scheduler: AsyncIOScheduler, job_id: str, func: Callable, bot, trigger: Union[str, CronTrigger, IntervalTrigger], **trigger_args):
    """添加函数到定时任务中,如果存在则先删除现有的任务"""
    try:
    scheduler.remove_job(job_id)
    except:
    pass
    scheduler.add_job(func, trigger, args=[bot], id=job_id, **trigger_args)

    def remove_job_safe(scheduler: AsyncIOScheduler, job_id: str):
    """从定时任务中移除任务"""
    try:
    scheduler.remove_job(job_id)
    except:
    pass

  • 定时任务装饰器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    import AsyncIOScheduler
    from apscheduler.triggers.cron import CronTrigger
    from apscheduler.triggers.interval import IntervalTrigger

    scheduler = AsyncIOScheduler()


    def schedule(trigger: Union[str, CronTrigger, IntervalTrigger], **trigger_args) -> Callable:
    """
    定时任务装饰器

    例子:

    - @schedule('interval', seconds=30)
    - @schedule('cron', hour=8, minute=30, second=30)
    - @schedule('date', run_date='2024-01-01 00:00:00')
    """

    def decorator(func: Callable):
    job_id = f"{func.__module__}.{func.__qualname__}"

    @wraps(func)
    async def wrapper(self, *args, **kwargs):
    return await func(self, *args, **kwargs)

    setattr(wrapper, '_is_scheduled', True)
    setattr(wrapper, '_schedule_trigger', trigger)
    setattr(wrapper, '_schedule_args', trigger_args)
    setattr(wrapper, '_job_id', job_id)

    return wrapper

    return decorator

4. 事件映射

在插件化程序中, 事件处理是核心部分, 需要定义一个类来管理事件, 并将事件映射到插件中

  • 消息处理装饰器 >装饰器可以用来给函数添加额外的功能, 在LegendWeChatBot中, 使用装饰器来规范并简化插件处理消息的逻辑

在装饰器中添加了优先级和时间类型, 用来对不同类型的事件进行排序和筛选

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#! utils/decorators.py
def on_message(priority=50):
"""消息装饰器, 具体消息参见具体文件"""

def decorator(func):
if callable(priority):
f = priority
setattr(f, '_event_type', 'message')
setattr(f, '_priority', 50)
return f
setattr(func, '_event_type', 'other_message')
setattr(func, '_priority', min(max(priority, 0), 99))
return func

return decorator if not callable(priority) else decorator(priority)


'''demo'''
@on_message
async def demo(bot, message):
...

  • 事件映射管理 >缺对象吗? 来Python这儿找一个
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#! utils/eventManager.py
import copy
from typing import Callable, Dict, List

class EventManager:
_handlers: Dict[str, List[tuple[Callable, object, int]]] = {}

@classmethod
def bind_instance(cls, instance: object):
"""将实例绑定到对应的事件处理函数"""
for method_name in dir(instance):
method = getattr(instance, method_name)
if hasattr(method, '_event_type'):
event_type = getattr(method, '_event_type')
priority = getattr(method, '_priority', 50)

if event_type not in cls._handlers:
cls._handlers[event_type] = []
cls._handlers[event_type].append((method, instance, priority))
# 按优先级排序,优先级高的在前
cls._handlers[event_type].sort(key=lambda x: x[2], reverse=True)

@classmethod
async def emit(cls, event_type: str, *args, **kwargs) -> None:
"""触发事件"""
if event_type not in cls._handlers:
return

api_client, message = args
for handler, instance, priority in cls._handlers[event_type]:
# 只对 message 进行深拷贝,api_client 保持不变
handler_args = (api_client, copy.deepcopy(message))
new_kwargs = {k: copy.deepcopy(v) for k, v in kwargs.items()}

result = await handler(*handler_args, **new_kwargs)

if isinstance(result, bool):
# True 继续执行 False 停止执行
if not result:
break
else:
continue # 我也不知道你返回了个啥玩意,反正继续执行就是了

@classmethod
def unbind_instance(cls, instance: object):
"""解绑实例的所有事件处理函数"""
for event_type in cls._handlers:
cls._handlers[event_type] = [
(handler, inst, priority)
for handler, inst, priority in cls._handlers[event_type]
if inst is not instance
]
  • 定时任务管理 >定时任务管理器,用于管理插件中需要实现的定时任务,包括添加、删除、执行任务等操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#! utils/decorators.py
def schedule(trigger: Union[str, CronTrigger, IntervalTrigger], **trigger_args
) -> Callable:
"""
定时任务装饰器

例子:

- @schedule('interval', seconds=30)
- @schedule('cron', hour=8, minute=30, second=30)
- @schedule('date', run_date='2024-01-01 00:00:00')
"""

def decorator(func: Callable):
job_id = f"{func.__module__}.{func.__qualname__}"

@wraps(func)
async def wrapper(self, *args, **kwargs):
return await func(self, *args, **kwargs)

setattr(wrapper, '_is_scheduled', True)
setattr(wrapper, '_schedule_trigger', trigger)
setattr(wrapper, '_schedule_args', trigger_args)
setattr(wrapper, '_job_id', job_id)

return wrapper

return decorator


def add_job_safe(scheduler: AsyncIOScheduler, job_id: str, func: Callable, bot, trigger: Union[str, CronTrigger, intervalTrigger], **trigger_args):
"""添加函数到定时任务中,如果存在则先删除现有的任务"""
try:
scheduler.remove_job(job_id)
except:
pass
scheduler.add_job(func, trigger, args=[bot], id=job_id, **trigger_args)


def remove_job_safe(scheduler: AsyncIOScheduler, job_id: str):
"""从定时任务中移除任务"""
try:
scheduler.remove_job(job_id)
except:
pass

todo list

  • 整理微信数据库操作模块
  • 初步添加功能
  • 初步制定命令格式
  • 完善开发和使用文档

项目已开源至 Github ,欢迎star和fork 若你觉得对你的开发有帮助, 或是对你的生活提供了方便, 欢迎来 爱发电 赞助 爱发电 如果想一起开发或贡献插件等, 欢迎在相关标准制定后按照标准提交PR, 或 联系作者