LegendWechatBot 项目进程 Week5(2025-04-01 ~ 2025-04-07)
系统功能更新
日志翻新
日志内容优化
由于日志系统过于简陋, 且可读性极低, 甚至中英文夹杂, 现在重新设计日志系统, 对于插件数据库以及系统日志都做了优化 旧版日志(部分还能入眼的):
1 | 2025-03-28 21:31:08 | INFO | LegendBotDB:495 - Database: Set chatroom 49981891388@chatroom whitelist successfully |
1 | 2025-04-06 12:27:30 | INFO | LegendBot:79 - 收到消息: 1, glm img 一只可爱的小猫 |
文件写入优化
讲个笑话, 之前的文件写入模式是w...我就说为啥天天日志都那么少
1 | # utils/logger.py |
消息缩略图保存
在保存视频时, 需要视频的缩略图(thumb)信息, 于是顺手在保存消息的方法中加入了这一功能, 并更新了数据库表
1 | database/messageDB.py |
腐竹个人功能(管理与消息)支持加入
在机器人运行过程中,
有许多管理命令优先级特别高(比如重启程序等系统命令),
而往往这些消息内容都比较简单,
但是不能受到积分、黑名单、消息频率限制的限制,
于是单独加入了该类消息处理逻辑,
以及存储对应的方法的文件ignore.py
但现在还有点小屎山, todolist中会提到
加载部分 由于ignore.py默认不存在, 所以需要加入异常处理
1
2
3
4
5
6try:
import utils.ignore as ignore
except ModuleNotFoundError:
flag = False
else:
flag = True判断部分
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27# LegendBot.py
"""来自管理员先走一遍判断"""
if msg.sender in config.admin:
logger.info(f"管理员消息: {msg.content}")
if msg.content == '重启程序':
restartProgram()
return
elif msg.content == '结束':
self.bot.cleanup()
os._exit(0)
elif '加群' in msg.content:
self.DB.set_chatroom_whitelist(to, True)
self.bot.sendMsg('已添加到白名单', to, at)
return
elif flag:
if msg.content == 'tcp':
links = await ignore.fetch_info_from_website()
if links:
self.bot.sendMsg(links, to, at)
return
else:
self.bot.sendMsg('获取 TCP 地址失败', to, at)
return
发送消息速率限制
在机器人中, 有一些消息是高频的,
比如积分、黑名单、消息频率限制等, 也有一些错误信息是sendMsg
方法中加入了消息发送速率限制, 核心原理是:
发送消息前将用户的状态设为running, 并等待随机的时间, 发送后再设置回来,
这样用户要是高频发送消息一样会被增加黑名单指数
1 | # utils/LegendBot.py |
技术突破
glm调用
这周最大的工作量便是增加了通过调用智谱清言来理解图片/视频, 生成图片/视频的功能了, 改了很多版, 前前后后想了很多方法, 也遇到了很多棘手的问题, 言简意赅描述下经历吧
阶段一: openaiSDK
在编写AI聊天插件时, 直接使用了openai的轮子, 效果喜人, 于是便想能不能用openai的SDK来调用GLM, 然而在编写到视频生成时, 发现openai没有视频生成接口...
阶段二: GLM SDK
由于是智谱清言官方的SDK, 接口方面自然是完整的, 然而其对于异步调用的支持非常不友好(没有相关的方法)
阶段三: GLM API
思来想去只能用最朴素的方法了: 直接放弃SDK, 自己实现
原先的想法是用aiohttp, 就像调用肯德基疯狂星期四api接口那样, 请求体配置一下完美, 然而在测试时发现: 卡住了?!
不信邪, 换成openai异步调用调试具备接口的功能, 发现还是
寄 这下又多了一个不能用openai接口的理由
aiohttp不行, 那用自己写的run_sync, 包装requests同步调用总行了吧
寄
run_sync都不行, 难道是调用本身出问题了?
try.py
里单独测试一波同步调用, 快得飞起
同步调用没问题, 换成异步就不行, 哪种来了都不行, 新时代科幻小说出版了
目前的推测是可能智谱清言对异步请求有自己独特的检测方法, 但是为什么图像理解就可以异步呢?
破案了
在图像理解的说明文档中, 它的目录结构是这样的
别杠,
我知道有人要杠, 啊煮啵煮啵,
你这个是聊天大模型的页面啊,
那是因为图像理解目录不长这样(废话), 聊天模型和图像理解模型一样的,
都是生成文字, 支持异步
而在图片生成目录中, 它的目录结构是这样的
这下问题就很清晰了, 智谱清言本身不支持图像的异步生成, 至于为什么会卡住, 那我只能说, 我不到啊
最终解决方案
有句话说得好,
asyncio
解决不了的事情, 就交给threading
来干, 还解决不了, 那就交给multiprocessing
来干
既然asyncio
使出浑身解数都解决不了,
那就交给threading
来干吧
思路是: 在接收到生成命令的时候, 先经过异步方法对消息进行预处理,
提取参数信息, 然后将参数转变为字典, 加到生成任务的队列中,
然后由threading
来处理生成任务, 并返回给用户
补充说明: 图像生成和视频生成代码略有不同, 虽然同样是同步调用, 但图像生成会直接返回结果, 但是视频生成返回的是任务id, 需要等待一段时间去自己查询, 但都是子线程中同步操作可以解决的
- 子线程处理池, 生成队列以及处理线程的初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38def __init__(self):
...
self.message_queue = Queue()
self.img_executor = ThreadPoolExecutor(max_workers=5)
self.video_executor = ThreadPoolExecutor(max_workers=3)
self.process = Thread(target=self.process_queue)
self.process.start()
signal.signal(signal.SIGINT, self.process.join)
def process_queue(self):
"""持续处理队列中的消息"""
while self.enable:
try:
# 从队列中获取消息
message = self.message_queue.get(timeout=1) # 等待消息,超时为 1 秒
method = message['method']
to, at = message['to'], message['at']
prompt = message['prompt']
msg = message['msg']
bot = message['bot']
if method == 'generate_image':
size = message['size']
# 提交任务到线程池
self.img_executor.submit(self.generate_image, bot, prompt, size, to, at, msg)
self.message_queue.task_done() # 标记任务完成
else:
audio = message['audio']
base = message['base']
self.video_executor.submit(self.generate_video, bot, prompt, audio, base, to, at, msg)
self.message_queue.task_done() # 标记任务完成
except Empty:
pass
except Exception:
pass
*值得一提的是, 在process_queue方法中,
由于while循环
的执行,
会导致Ctrl+C
退出程序不可用,
于是加入了signal
捕捉信号
1
2
3
4
5
6
7
8
9
10
# robot.py
async def run():
...
def signal_handler(signum, frame):
logger.info("收到终止信号,正在关闭...")
bot.cleanup()
os._exit(0)
signal.signal(signal.SIGINT, signal_handler)
...
插件更新
GLM 智谱清言插件
主要处理逻辑与方法在前文已提及, 这里仅作代码说明
消息处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177# plugins/GLM/main.py
#* 理解(OK)
if msg.content.startswith('glm v4'):
#* 说明
if msg.content == 'glm v4':
bot.sendMsg('图像视频理解相关功能\n`glm v4 local 多媒体名`传入本地多媒体\n`glm v4 url 链接`传入网络多媒体', to, at)
return
model = 'v4'
#* 本地多媒体
if msg.content.startswith('glm v4 local '):
image = msg.content.split(' ')[3]
image = os.path.basename(image)
img: Path = Path().cwd() / 'plugins/ImageDeal/images' / msg.sender / image
if not img.exists():
bot.sendMsg('多媒体不存在', to, at)
return
if img.suffix == '.mp4':
mode = 'video'
else:
mode = 'image'
LegendBotDB().set_running(msg.sender, True)
with open(img, 'rb') as f:
base = base64.b64encode(f.read()).decode('utf-8')
rsp = await self.GLM_V4(base, mode)
if rsp:
bot.sendMsg(rsp.get('choices')[0].get('message').get('content'), to, at)
else:
bot.sendMsg('请求失败', to, at)
LegendBotDB().add_points(msg.sender, -3)
LegendBotDB().set_running(msg.sender, False)
return
#* 网络链接
elif msg.content.startswith('glm v4 url '):
url = msg.content.split(' ')[3]
mode = 'video' if (url.endswith('.mp4') or url.endswith('.wav')) else 'image'
LegendBotDB().set_running(msg.sender, True)
rsp = await self.client.chat.completions.create(
model=self.model[model],
messages=[
{
"role": "user",
"content": [
{
"type": f"{mode}_url",
f"{mode}_url": {
"url": url
}
},
{
"type": "text",
"text": "请描述这个图片" if mode == 'image' else "请描述这个视频"
}
]
}
]
)
bot.sendMsg(rsp.choices[0].message.content, to, at)
LegendBotDB().add_points(msg.sender, -3)
LegendBotDB().set_running(msg.sender, False)
return
#* 图片生成
if msg.content.startswith('glm img'):
#* 说明
if msg.content == 'glm img':
bot.sendMsg('图片生成相关功能\n`glm img 要求 尺寸\n其中尺寸可在以下范围中选择:1024x1024,768x1344,864x1152,1344x768,1152x864,1440x720,720x1440, 默认为1024x1024`', to, at)
return
if msg.content.startswith('glm img '):
msg.content = msg.content[8:]
if msg.content.count(' ') == 1:
prompt, size = msg.content.split(' ')
else:
prompt = msg.content
size = '1024x1024'
if size not in ['1024x1024', '768x1344', '864x1152', '1344x768', '1152x864', '1440x720', '720x1440']:
size = '1024x1024'
self.message_queue.put({
"bot": bot,
"prompt": prompt,
"size": size,
"method": "generate_image",
"to": to,
"at": at,
'msg': msg
})
bot.sendMsg("已接收图片生成请求,正在处理...", to, at)
return
#* 视频生成
if msg.content.startswith("glm video"):
# 将消息添加到队列
#* 说明
if msg.content == "glm video":
bot.sendMsg(
"视频生成相关功能\n"
"`glm video 要求 --audio 音频需求 --mode local/url 图片基础`\n"
"音频需求可选择1或0, 1为需要音频, 0为不需要音频, 默认为0\n"
"若有视频创作的图片基础, 则选择local(本地)或url(网络链接), 图片基础部分与`glm v4`格式相同, 默认无基础",
to,
at,
)
return
try:
# 解析指令内容
msg.content = msg.content[10:] # 去掉 "glm video " 前缀
parts = msg.content.split(" ")
# 默认值
prompt = parts[0] # 视频生成的要求
audio = "0" # 默认不需要音频
base_type = None
base_value = None
# 解析参数
if "--audio" in parts:
audio_index = parts.index("--audio") + 1
if audio_index < len(parts):
audio = parts[audio_index]
if "--mode" in parts:
mode_index = parts.index("--mode") + 1
if mode_index < len(parts):
base_type = parts[mode_index]
if base_type not in ["local", "url"]:
bot.sendMsg("模式参数错误,请选择 local 或 url", to, at)
return
# 获取图片基础
base_value_index = mode_index + 1
if base_value_index < len(parts):
base_value = parts[base_value_index]
logger.debug(f"解析指令: prompt={prompt}, audio={audio}, base_type={base_type}, base_value={base_value}")
# 检查图片基础部分
if base_type == "local":
base_path = os.path.basename(base_value)
img: Path = Path().cwd() / "plugins/ImageDeal/images" / msg.sender / base_path
if not img.exists():
bot.sendMsg("本地图片基础不存在", to, at)
return
with open(img, "rb") as f:
base = base64.b64encode(f.read()).decode("utf-8")
elif base_type == "url":
base = base_value
else:
base = None
# 将消息添加到队列
self.message_queue.put(
{
"bot": bot,
"prompt": prompt,
"audio": audio,
"base": base,
"method": "generate_video",
"to": to,
"at": at,
'msg': msg
}
)
bot.sendMsg("已接收视频生成请求,正在处理...", to, at)队列处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30# plugins/GLM/main.py
def process_queue(self):
"""持续处理队列中的消息"""
while self.enable:
try:
# 从队列中获取消息
message = self.message_queue.get(timeout=1) # 等待消息,超时为 1 秒
method = message['method']
to, at = message['to'], message['at']
prompt = message['prompt']
msg = message['msg']
bot = message['bot']
if method == 'generate_image':
size = message['size']
# 提交任务到线程池
self.img_executor.submit(self.generate_image, bot, prompt, size, to, at, msg)
self.message_queue.task_done() # 标记任务完成
else:
audio = message['audio']
base = message['base']
self.video_executor.submit(self.generate_video, bot, prompt, audio, base, to, at, msg)
self.message_queue.task_done() # 标记任务完成
except Empty:
pass
except Exception:
pass图像理解
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68# plugins/GLM/main.py
async def GLM_V4(self, base, mode):
"""调用视频生成 API"""
async with sem['GLM-4V-Flash']: # 使用信号量限制并发
try:
async with aiohttp.ClientSession() as session:
# 假设视频生成 API 的 URL 和请求格式如下
api_url = "https://open.bigmodel.cn/api/paas/v4/chat/completions"
if mode == 'video':
payload = {
'model': 'glm-4v-flash',
'messages': [
{
"role": "user",
"content": [
{
"type": "video_url",
"video_url": {
"url" : base
}
},
{
"type": "text",
"text": "请仔细描述这个视频"
}
]
}
]
}
else:
payload = {
'model': 'glm-4v-flash',
'messages': [
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {
"url" : base
}
},
{
"type": "text",
"text": "请仔细描述这个视频"
}
]
}
]
}
headers = {
"Authorization": f"Bearer {self.key}",
"Content-Type": "application/json",
"Connection": "close", # 显式关闭连接
}
async with session.post(api_url, json=payload, headers=headers) as response:
if response.status == 200:
result = await response.json()
return result
else:
logger.error(f"API 请求失败,状态码: {response.status}, {response.text()}")
return None
except Exception as e:
logger.error(f"调用 API 时发生错误: {e}")
logger.error(traceback.format_exc())图像生成
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86# plugins/GLM/main.py
def generate_image(self, bot: LegendWechatBot, prompt, size, to, at, msg: WxMsg):
"""生成图片"""
try:
LegendBotDB().set_running(msg.sender, True)
logger.debug(f"生成图片: {prompt}")
api_url = "https://open.bigmodel.cn/api/paas/v4/images/generations"
payload = {
"model": "cogview-3-flash", # 确保模型名称正确
"prompt": prompt, # 确保输入内容符合要求
"size": size, # 确保尺寸符合要求
}
headers = {
"Authorization": f"Bearer {self.key}",
"Content-Type": "application/json",
"Connection": "close", # 显式关闭连接
}
# 同步请求
response = requests.post(api_url, json=payload, headers=headers, timeout=30)
logger.debug(f"状态码: {response.status_code}")
if response.status_code == 200:
result = response.json()
bot.send_image(result['data'][0]['url'], to)
LegendBotDB().set_running(msg.sender, False)
LegendBotDB().add_points(msg.sender, -4)
else:
logger.error(f"API 请求失败,状态码: {response.status_code}, 错误信息: {response.text}")
LegendBotDB().set_running(msg.sender, False)
except Exception as e:
logger.error(f"调用 API 时发生错误: {e}")
logger.error(traceback.format_exc())
def generate_video(self, bot: LegendWechatBot, prompt, audio, base, to, at, msg: WxMsg):
"""生成视频"""
try:
LegendBotDB().set_running(msg.sender, True)
logger.debug(f"生成视频: {prompt}")
api_url = "https://open.bigmodel.cn/api/paas/v4/videos/generations"
payload = {
"model": "cogvideox-flash", # 确保模型名称正确
"prompt": prompt, # 确保输入内容符合要求
"with_audio": True if audio == '1' else False,
"image_url": base
}
headers = {
"Authorization": f"Bearer {self.key}",
"Content-Type": "application/json",
"Connection": "close", # 显式关闭连接
}
rsp = requests.post(api_url, json=payload, headers=headers, timeout=30)
if rsp.status_code == 200:
rsp = rsp.json()
else:
logger.error(f"API 请求失败,状态码: {rsp.status_code}, 错误信息: {rsp.text}")
LegendBotDB().set_running(msg.sender, False)
return
task_id = rsp['id']
api_url = f'https://open.bigmodel.cn/api/paas/v4/async-result/{task_id}'
payload = {
"id": task_id,
}
status = requests.get(api_url, json=payload, headers=headers, timeout=30)
if status.status_code == 200:
status = status.json()
else:
logger.error(f"API 请求失败,状态码: {status.status_code}, 错误信息: {status.text}")
LegendBotDB().set_running(msg.sender, False)
return
while status['task_status'] != 'SUCCESS':
time.sleep(5)
status = requests.get(api_url, json=payload, headers=headers, timeout=30)
if status.status_code == 200:
status = status.json()
else:
logger.error(f"API 请求失败,状态码: {status.status_code}, 错误信息: {status.text}")
LegendBotDB().set_running(msg.sender, False)
return
bot.send_image(status['video_result'][0]['url'], to)
LegendBotDB().set_running(msg.sender, False)
LegendBotDB().add_points(msg.sender, -5)
except Exception as e:
logger.error(f"调用 API 时发生错误: {e}")
logger.error(traceback.format_exc())
ImageDeal下载视频
由于有插件需要用到本地视频, 那顺手加上视频下载功能了, 只不过在后续调用中需要判断后缀名(一般情况微信下载的图片后缀名为jpg, 视频后缀名为mp4)
1 | # plugins/ImageDeal/main.py |
todo list
- 继续优化日志
- 新增管理员命令插件, 用来存放非系统管理员命令
- 完善用户与开发文档
- 新增成语接龙插件, 看图猜成语插件
- 添加积分判断功能
项目已开源至 Github ,欢迎star和fork 若你觉得对你的开发有帮助, 或是对你的生活提供了方便, 欢迎来 爱发电 赞助
如果想一起开发或贡献插件等, 欢迎在相关标准制定后按照标准提交PR, 或 联系作者