LegendWechatBot 项目进程 Week3(2025-03-17 ~ 2025-03-23)
系统功能更新
数据库线程安全
在Week2中, 已经实现了数据库的增删改查, 但是在多线程环境下,
需要保证数据库的线程安全,
防止多个线程同时操作数据库导致数据不一致的问题. 否则会报错:
sqlite3.OperationalError: database is locked
MessageDB
使用lock对数据库操作进行加锁, 保证数据库操作的线程安全
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23async def save_message(self, msg: WxMsg, self_wxid) -> bool:
"""异步保存消息到数据库"""
async with self._lock:
async with self._async_session_factory() as session:
try:
message = Message(
msg_id=msg.id,
type=msg.type,
xml=msg.xml,
content=msg.content,
extra=msg.extra,
sender=msg.sender,
roomid=msg.roomid,
is_at=msg.is_at(self_wxid),
timestamp=datetime.now()
)
session.add(message)
await session.commit()
return True
except Exception as e:
logger.error(f"保存消息失败: {str(e)}")
await session.rollback()
return False
LegendBotDB
使用队列来确保每个操作步骤不冲突 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24def _execute_in_queue(self, method, *args, **kwargs):
"""在队列中执行数据库操作"""
future = self.executor.submit(method, *args, **kwargs)
try:
return future.result(timeout=20) # 20秒超时
except Exception as e:
logger.error(f"数据库操作失败: {method.__name__} - {str(e)}")
raise
def demo(self, wxid: str) -> int:
return self._execute_in_queue(self._demo, wxid)
def _demo(self, wxid: str) -> int:
session = self.DBSession()
try:
user = session.query(User).filter_by(wxid=wxid).first()
if user:
...
except SQLAlchemyError as e:
session.rollback()
logger.error(f"数据库: 用户{wxid}状态获取失败, 错误: {e}")
return False
finally:
session.close()
重写数据库表
在之前的版本中, 由于使用的框架不同, 返回的字段也不尽相同,
此次针对Wcf框架更新了数据库User表格式
1
2
3
4
5
6
7
8
9
10class User(Base):
__tablename__ = 'user'
wxid = Column(String(20), primary_key=True, nullable=False, unique=True, index=True, autoincrement=False, comment='wxid')
points = Column(Integer, nullable=False, default=0, comment='points')
running = Column(Integer, nullable=False, default=False, comment='running')
signin_stat = Column(DateTime, nullable=False, default=datetime.datetime.fromtimestamp(0), comment='signin_stat')
signin_streak = Column(Integer, nullable=False, default=0, comment='signin_streak')
blacked = Column(Integer, nullable=False, default=-2, comment='black')
llm_thread_id = Column(JSON, nullable=False, default=lambda: {}, comment='llm_thread_id')
技术突破
引用消息解析
在机器人中, 涉及到不那么复杂的上下文时,
引用消息(类型为49)是一种非常优雅快速的解决方案. 在wcf框架中,
引用消息的返回分为主消息和被引用消息 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
async def get_quote_msg(self, bot: LegendWechatBot, msg: WxMsg):
try:
if msg.from_group():
to, at = msg.roomid, msg.sender
else:
to, at = msg.sender, None
# 必须用lxml-xml, 原因在Week2文章中已写明
bs = BeautifulSoup(msg.content, 'lxml-xml')
# 主消息
msg.content = bs.find('title').text
# 被引用消息
quote = bs.find('refermsg')
quote_type = quote.find('type') # 被引用的消息类型
quote_sender = quote.find('chatusr').text # 被引用消息的发送者
quote_id = int(quote.find('svrid').text) # 被引用消息的id
# 在数据库中根据id查询
quoteMsg = await MessageDB().get_messages(msg_id=quote_id)
if not quoteMsg:
return # 引用消息不存在
quoteMsg = quoteMsg[0]
sync->async
在机器人中, 有很多地方需要等待异步操作完成, 例如获取消息,
获取用户信息等, 在之前的版本中, 很多方法都是同步(sync)执行的,
导致程序运行缓慢, 也没有很好的办法将其完美转变成异步(async)执行.
在此次更新中, 封装了一个装饰器, 使任何函数都可以轻松地变成异步函数,
并完美呈现出异步效果 >爽飞了 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24# utils/decorators.py
from typing import Callable, TypeVar
from typing_extensions import ParamSpec
from collections.abc import Coroutine
from functools import partial
import asyncio
P = ParamSpec("P")
R = TypeVar("R")
def run_sync(call: Callable[P, R]) -> Callable[P, Coroutine[None, None, R]]:
"""一个用于包装 sync function 为 async function 的装饰器
参数:
call: 被装饰的同步函数
"""
async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
loop = asyncio.get_running_loop()
pfunc = partial(call, *args, **kwargs)
result = await loop.run_in_executor(None, pfunc)
return result
return _wrapper
本地包导入相关问题
这个问题特别特别特别烦银, 最后被我斩于马下, 详细解决方案请参见 本地包导入相关问题
新增功能
菜单功能
在机器人中, 有许多插件功能, 需要提供一个整合好的字典对象,
存放所有插件的公开信息, 方便用户查看和操作 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23# plugins/Menu/main.py
def load_folders(self):
self.info = {}
folders = os.listdir(os.path.join(os.path.abspath(__file__), '../..'))
folders.remove('Menu')
for folder in folders:
with open(os.path.join(os.getcwd(), 'plugins', folder, 'config.yaml'), "rb") as f:
plugin_config = yaml.safe_load(f)
name = list(plugin_config.keys())[0]
plugin_config = plugin_config[name]
# 如果插件同时包含简介, 基础命令以及启用, 才能被菜单识别
if 'description' not in list(plugin_config.keys()) or not plugin_config['enable'] or 'cmd' not in list(plugin_config.keys()):
logger.debug(folder)
continue
self.info[folder] = {
'name': name,
'cmd': plugin_config['cmd'],
'description': plugin_config["description"],
}
logger.debug(self.info)
图片系统
很早之前就想要做一个图片系统, 用来存放用户个人的图片,
方便用于其他插件,
现在在Meme插件的简介催促下终于实现了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63# plugins/ImageDeal/main.py
async def dealImage(self, bot: LegendWechatBot, msg: WxMsg):
if not self.enable:
return
if msg.from_group():
to, at = msg.roomid, msg.sender
else:
to, at = msg.sender, None
if msg.content == '图片':
bot.sendMsg('图片相关功能, 详细请发送具体命令前缀查看`下载图片` 下载图片\n`删除图片 图片名.后缀名` 删除图片\n`重命名图片 图片名.后缀名 新图片名.后缀名` 重命名图片', to, at)
#* 下载图片
if msg.content == '下载图片':
bot.sendMsg('下载图片, 用于制作表情包等插件功能\n命令格式: `下载图片`, 并引用需要下载的图片(必须是自己发的, 引用他人发的无效, 引用文件无效)\n每个人最多同时存在5张图片, 总大小不超过20MB', to, at)
return
#* 删除图片
if msg.content == '删除图片':
bot.sendMsg('删除图片, 命令格式: `删除图片 图片名.后缀名(在下载成功后返回)`', to, at)
return
if msg.content.startswith('删除图片 '):
msg.content = msg.content.replace('删除图片 ', '')
if not os.path.exists(os.path.join(self.folder, msg.sender, msg.content)):
bot.sendMsg('图片不存在', to, at)
return
os.remove(os.path.join(self.folder, msg.sender, msg.content))
bot.sendMsg('删除成功', to, at)
return
#* 重命名图片
if msg.content == '重命名图片':
bot.sendMsg('重命名图片, 命令格式: `重命名图片 图片名.后缀名(在下载成功后返回) 新图片名.后缀名(命名限制为windows限制, 另外不能有空格)`', to, at)
return
if msg.content.startswith('重命名图片 '):
msg.content = msg.content[6: ]
if not os.path.exists(os.path.join(self.folder, msg.sender, msg.content.split(' ')[0])):
bot.sendMsg('图片不存在', to, at)
return
if len(msg.content.split(' ')) != 2:
bot.sendMsg('命令格式错误', to, at)
return
if not os.path.exists(os.path.join(self.folder, msg.sender)):
os.mkdir(os.path.join(self.folder, msg.sender))
if os.path.exists(os.path.join(self.folder, msg.sender, msg.content.split(' ')[1])):
bot.sendMsg('新图片名已存在', to, at)
return
filename = msg.content.split(' ')[1]
if self.is_valid_filename(filename) and (filename.endswith('.jpg') or filename.endswith('.png')):
os.rename(os.path.join(self.folder, msg.sender, msg.content.split(' ')[0]), os.path.join(self.folder, msg.sender, msg.content.split(' ')[1]))
bot.sendMsg('重命名成功', to, at)
return
else:
bot.sendMsg('新图片名包含非法字符', to, at)
return
在重命名文件时, 要对文件名进行合法性检查, 防止注入或其他风险
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20# plugins/ImageDeal/main.py
def is_valid_filename(self, filename) -> bool:
# 检查文件名是否包含非法字符
if re.search(r'[<>:"/\\|?*]', filename):
return False
# 检查文件名是否以空格或句点结尾
if filename.endswith(' ') or filename.endswith('.'):
return False
# 检查文件名是否为保留名称
reserved_names = [
"CON", "PRN", "AUX", "NUL",
"COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7", "COM8", "COM9",
"LPT1", "LPT2", "LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9"
]
if filename.upper() in reserved_names:
return False
return True
除此之外, 下载文件需要在收到引用消息时执行 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55on_quote_message
async def downloadImage(self, bot: LegendWechatBot, msg: WxMsg):
try:
if not self.enable:
return
if msg.from_group():
to, at = msg.roomid, msg.sender
else:
to, at = msg.sender, None
bs = BeautifulSoup(msg.content, 'lxml-xml')
msg.content = bs.find('title').text
quote = bs.find('refermsg')
if msg.content == '下载图片':
if quote is None:
bot.sendMsg('引用无效, 请重新引用需要下载的图片', to, at)
return
if quote.find('type').text != '3':
bot.sendMsg('引用无效, 请重新引用需要下载的图片', to, at)
return
if quote.find('chatusr').text != msg.sender:
bot.sendMsg('引用无效, 这不是你发的', to, at)
return
if not os.path.exists(os.path.join(self.folder, msg.sender)):
os.mkdir(os.path.join(self.folder, msg.sender))
if len(os.listdir(os.path.join(self.folder, msg.sender))) > 5 or self.calcSize(msg.sender) > 20:
bot.sendMsg('图片数量或大小超过限制, 请删除一些图片', to, at)
return
msgId = int(quote.find('svrid').text)
logger.debug(f"msgId: {msgId}")
quoteMsg = await MessageDB().get_messages(msg_id=msgId)
if not quoteMsg:
bot.sendMsg('引用无效, 请重新引用需要下载的图片', to, at)
return
res = await run_sync(bot.download_image)(msgId, quoteMsg[0].extra, os.path.abspath(os.path.join(self.folder, msg.sender)), 10)
if not res:
bot.sendMsg('图片下载失败', to, at)
logger.warning(f"图片下载失败, msgId: {msgId}")
else:
bot.sendMsg(f'图片下载完成, 保存为{os.path.basename(res)}', to, at)
except Exception as e:
logger.error(f"图片下载失败, msgId: {msgId}, error: {e}, traceback: {traceback.format_exc()}")
bot.sendMsg('图片下载失败', to, at)
return
即便如此, 微信机器人终究是在自己机子上跑的, 不能用来被当作网盘,
人家网盘还有空间限制呢 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16def calcSize(self, wxid):
# 计算文件夹总大小
size = 0
for root, _, files in os.walk(os.path.join(self.folder, wxid)):
size += sum([os.path.getsize(os.path.join(root, name)) for name in files])
return size / 1024 / 1024
# 每天自动清理文件夹
async def _del(self):
if self.autoDel:
try:
shutil.rmtree(self.folder)
os.mkdir(self.folder)
except:
logger.error("删除文件夹失败")
表情包生成
算是一个比较大的插件了, 项目来自于 meme-generator,
自己做了下接口的整合, 还有那个烦人的相对引入问题😡
表情包生成相关的方法参见meme项目仓库 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28# plugins/Meme/meme_processor.py
def generate_meme(
key: str, images: list[str], texts: list[str], args: dict[str, Any], msg: WxMsg
) -> str:
try:
meme = get_meme(key)
except NoSuchMeme:
return f'表情 "{key}" 不存在!', None
imgs = []
# 主要改了这里, 防止注入, 并兼容ImageDeal插件
for image in images:
image = os.path.basename(image)
img: Path = Path().cwd() / 'plugins/ImageDeal/images' / msg.sender / image
if not Path(img).exists():
return f'图片路径 "{image}" 不存在!', None
imgs.append(img)
try:
result = meme(images=imgs, texts=texts, args=args)
content = result.getvalue()
ext = filetype.guess_extension(content)
filename = f"plugins/Meme/temp/result{uuid.uuid4()}.{ext}"
with open(filename, "wb") as f:
f.write(content)
return f'表情制作成功!', filename
except MemeGeneratorException as e:
return str(e), None
todo list
- api相关方法封装
- 订阅相关方法封装
- 部署文档
- 规则文档
- 使用文档
项目已开源至 Github ,欢迎star和fork 若你觉得对你的开发有帮助, 或是对你的生活提供了方便, 欢迎来 爱发电 赞助
如果想一起开发或贡献插件等, 欢迎在相关标准制定后按照标准提交PR, 或 联系作者