LegendWechatBot 项目进程 Week4(2025-03-24 ~ 2025-03-31)

系统功能更新

消息处理协程锁

由于在先前的版本中, 为了避免风控检测, 在处理消息前加了协程锁LegendSemaphore, 导致消息处理在有的时候会抽风堵塞, 因此在本次更新中, 移除了每条消息前的锁, 改为了在发送文字消息前的随机延迟等待

1
time.sleep(random.randint(1, 3) / random.randint(2, 10))

那协程锁白写了吗?当然不是, 改成AI消息处理限制, 防止api爆炸

消息处理逻辑优化

由于积分系统和黑白名单系统的加入, 因此在消息处理前增添了判断逻辑以及自动踢人

1
2
3
4
5
6
7
8
9
10
11
12
# utils/LegendBot.py
if (
(msg.from_group() and self.DB.get_chatroom_whitelist(to) and self.DB.get_black(msg.sender) <= config.RobotConfig['black']) # 群聊且满足条件
or (not msg.from_group() and self.DB.get_black(msg.sender) <= config.RobotConfig['black']) # 私聊且满足条件
or msg.sender in config.admin # 来自管理员
):
...

# 自动踢人
elif self.DB.get_black(msg.sender) > config.RobotConfig['black'] and msg.from_group():
self.bot.sendMsg('你坏事做尽, 被移除群聊, 欢迎找kanwuqing面议解封, 有偿解封所得分发给群友作精神补偿', to, at)
self.bot.del_chatroom_members(msg.roomid, msg.sender)

数据库积分相关方法更新

添加了积分相关的处理逻辑, 实现了积分的增删改查, 以及更新用户签到状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# database/LegendBotDB.py
# 查看积分
def get_points(self, wxid: str) -> int:
return self._execute_in_queue(self._get_points, wxid)

def _get_points(self, wxid: str) -> int:
session = self.DBSession()
try:
user = session.query(User).filter_by(wxid=wxid).first()
return user.points if user else 0
finally:
session.close()

# 增减积分
def add_points(self, wxid: str, num: int) -> bool:
"""Thread-safe point addition"""
return self._execute_in_queue(self._add_points, wxid, num)

def _add_points(self, wxid: str, num: int) -> bool:
"""Thread-safe point addition"""
session = self.DBSession()
try:
# Use UPDATE with atomic operation
result = session.execute(
update(User)
.where(User.wxid == wxid)
.values(points=User.points + num)
)
if result.rowcount == 0:
# User doesn't exist, create new
user = User(wxid=wxid, points=num)
session.add(user)
logger.info(f"数据库: 用户{wxid}积分增加{num}")
session.commit()
return True
except SQLAlchemyError as e:
session.rollback()
logger.error(f"数据库: 用户{wxid}积分增加失败, 错误: {e}")
return False
finally:
session.close()

# 设置积分
def set_points(self, wxid: str, num: int) -> bool:
"""Thread-safe point setting"""
return self._execute_in_queue(self._set_points, wxid, num)

def _set_points(self, wxid: str, num: int) -> bool:
"""Thread-safe point setting"""
session = self.DBSession()
try:
result = session.execute(
update(User)
.where(User.wxid == wxid)
.values(points=num)
)
if result.rowcount == 0:
user = User(wxid=wxid, points=num)
session.add(user)
logger.info(f"数据库: 用户{wxid}积分设置为{num}")
session.commit()
return True
except SQLAlchemyError as e:
session.rollback()
logger.error(f"数据库: 用户{wxid}积分设置失败, 错误: {e}")
return False
finally:
session.close()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# database/LegendBotDB.py
def get_signin_stat(self, wxid: str) -> datetime.datetime:
"""获取用户签到状态"""
return self._execute_in_queue(self._get_signin_stat, wxid)

def _get_signin_stat(self, wxid: str) -> datetime.datetime:
session = self.DBSession()
try:
user = session.query(User).filter_by(wxid=wxid).first()
if not user:
return [None, None, None]
return [user.lastSign, user.maxSign, user.fortune]
finally:
session.close()

def set_signin_stat(self, wxid: str, fortune: str) -> bool:
"""更新用户签到状态"""
return self._execute_in_queue(self._set_signin_stat, wxid, fortune)

def _set_signin_stat(self, wxid: str, fortune: str) -> bool:
session = self.DBSession()
try:
user = session.query(User).filter_by(wxid=wxid).first()
if user:
# 获取实际的 lastSign 值
last_sign = user.lastSign
max_sign = user.maxSign
points = user.points

# 计算新的 maxSign 和积分
new_max_sign = max_sign + 1 if last_sign and datetime.datetime.today().date() == (last_sign + datetime.timedelta(days=1)).date() else max_sign
new_points = points + config.RobotConfig['signPoint'] + min(10, new_max_sign)

# 更新用户数据
session.execute(
update(User)
.where(User.wxid == wxid)
.values(
fortune=fortune,
maxSign=new_max_sign,
lastSign=datetime.datetime.today(),
points=new_points
)
)
else:
# 如果用户不存在,则创建新用户
user = User(wxid=wxid, fortune=fortune, maxSign=1, lastSign=datetime.datetime.today(), points=11)
session.add(user)

logger.info(f"数据库: 用户{wxid}签到状态设置成功")
session.commit()
return [user.lastSign, user.maxSign, user.fortune]
except SQLAlchemyError as e:
session.rollback()
logger.error(f"数据库: 用户{wxid}状态设置失败, 错误: {e}")
return False
finally:
session.close()

插件更新

api版本

VVQuest

由于VVQuest项目作者Daniel提供了api版本, 为提升插件运行速度, 因此决定保留本地及其部署方式作备用与参考, 并将插件更新为api版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# plugins/VVQuest/main.py
async def VVQuest(self, bot, msg):
...
try:
"""这里用aiohttp很耗时不知道为什么"""
res = await run_sync(requests.get)(f'https://api.zvv.quest/search?q={query}&n=1', timeout=20)

logger.debug(res)
res = res.json()

bot.send_image(res['data'][0], to)
LegendBotDB().add_points(msg.sender, -1)

LegendBotDB().set_running(msg.sender, False)

"""超时则用本地搜索"""
except TimeoutError:
res = await run_sync(self.im.search)(query, 1)

if len(res) == 0:
bot.sendMsg('未找到相关表情包', to, at)
return

original_file = res[0].replace('\\', '/')
file_extension = os.path.splitext(original_file)[1]
hash_object = hashlib.md5(original_file.encode())
hashed_filename = hash_object.hexdigest() + file_extension
temp_dir = 'plugins/VVQuest/cache'
if not os.path.exists(temp_dir):
os.makedirs(temp_dir)
temp_file_path = os.path.join(temp_dir, hashed_filename)
shutil.copyfile(original_file, temp_file_path)

# 获取绝对路径
abs_temp_file_path = os.path.abspath(temp_file_path)

# 发送文件
bot.send_image(abs_temp_file_path, to)

# 删除临时文件
os.remove(abs_temp_file_path)

except Exception as e:
logger.error(e)
logger.error(traceback.format_exc())

肯德基疯狂星期四文案

每周都要吃肯德基的朋友有福了, 以后每周都有不同的理由让朋友v你50了

api来自pearAPI提供, 感谢

首先是肯德基文案获取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@on_text_message
async def kfc(self, bot: LegendWechatBot, msg: WxMsg):
if msg.from_group():
to, at = msg.roomid, msg.sender
else:
to, at = msg.sender, None

if not self.enable:
return

if msg.content == 'kfc':
async with aiohttp.ClientSession() as session:
url = f"https://api.pearktrue.cn/api/kfc?type=json"
async with session.get(url) as resp:
if resp.status != 200:
logger.warning(f"天气查询失败: {resp.status}")
return
rsp1 = await resp.json()
bot.sendMsg(rsp1['text'].replace('\\n', '\n'), to, at)

每周四还有定时发送功能😄

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@schedule('cron', day_of_week='thu', hour=17, minute=0, second=0, misfire_grace_time=None)
async def send_kfc(self, bot: LegendWechatBot):
if not self.enable:
return

for group in LegendBotDB().get_chatroom_list():
async with aiohttp.ClientSession() as session:
url = f"https://api.pearktrue.cn/api/kfc?type=json"
async with session.get(url) as resp:
if resp.status != 200:
logger.warning(f"天气查询失败: {resp.status}")
return
rsp1 = await resp.json()
bot.sendMsg(rsp1['text'], group)

天气预报

api来自free-api提供, 感谢

天气预报共实现了如下功能 - 查询天气

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# plugins/Weather/main.py
if msg.content.count(' ') == 1:
city, day = msg.content.split(' ')
else:
city, day = msg.content, 0
if day >= 3:
return

async with aiohttp.ClientSession() as session:
url = f"https://api.seniverse.com/v3/weather/daily.json?key={self.key}&location={city}&language=zh-Hans&unit=c"
async with session.get(url) as resp:
if resp.status != 200:
logger.warning(f"天气查询失败: {resp.status}")
return
rsp1 = await resp.json()

if 'status_code' in rsp1 and rsp1['status_code'] == "AP010006":
bot.sendMsg("城市名错误, 请重新输入", to, at)
return

rsp = rsp1["results"][0]["daily"][day]
upd = rsp1['results'][0]['last_update']
res = f"{city}{rsp['date']}天气, 更新于{upd}\n白天天气:{rsp['text_day']}, 夜间天气:{rsp['text_night']}\n最高温: {rsp['high']}, 最低温: {rsp['low']}\n降水概率: {rsp['precip']}%, 湿度: {rsp['humidity']}\n风力风向: {rsp['wind_direction']}{rsp['rainfall']}级, 风速: {rsp['wind_speed']}"

bot.sendMsg(res, to, at)
LegendBotDB().add_points(msg.sender, -1)

  • 预订天气预报

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    # plugins/Weather/main.py
    if msg.content.startswith("预报 ") and msg.from_group():
    city = msg.content[3:]

    async with aiohttp.ClientSession() as session:
    url = f"https://api.seniverse.com/v3/weather/daily.json?key={self.key}&location={city}&language=zh-Hans&unit=c"
    async with session.get(url) as resp:
    if resp.status != 200:
    logger.warning(f"天气查询失败: {resp.status}")
    return
    rsp1 = await resp.json()

    if 'status_code' in rsp1 and rsp1['status_code'] == "AP010006":
    bot.sendMsg("城市名错误, 请重新输入", to, at)
    return

    if to in self.subs:
    if city in self.subs[to]:
    bot.sendMsg("该城市已订阅, 请勿重复订阅", to, at)
    return
    if len(self.subs[to]) > 5:
    bot.sendMsg("该群聊已订阅5个城市, 请先取消订阅", to, at)
    return

    self.subs[to].append(city)
    with open('plugins/Weather/subs.json', 'w', encoding='utf-8') as f:
    json.dump(self.subs, f, ensure_ascii=False, indent=4)
    bot.sendMsg("订阅成功", to, at)
    return

    self.subs[to] = [city]

  • 取消预订天气预报

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    # plugins/Weather/main.py
    elif msg.content.startswith("td "):
    city = msg.content[3:]
    if to in self.subs:
    if city in self.subs[to]:
    self.subs[to].remove(city)
    with open('plugins/Weather/subs.json', 'w', encoding='utf-8') as f:
    json.dump(self.subs, f, ensure_ascii=False, indent=4)
    bot.sendMsg("取消订阅成功", to, at)
    return
    else:
    bot.sendMsg("该城市未订阅, 请先订阅", to, at)
    return
    else:
    bot.sendMsg("该群聊未订阅任何城市, 请先订阅", to, at)
    return

AI对话 (初版)

有了AI的接入, 机器人能真正意义上回答所有问题了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
"""这句很重要"""
async with sem['processAI']:

if msg.from_group():
to, at = msg.roomid, msg.sender
else:
to, at = msg.sender, None

if msg.content.startswith("ai"):
if msg.content == 'ai':
bot.sendMsg("与AI对话, 每次消耗3积分, 命令格式: `ai 模型名称(默认为deepseek V3) 内容`\n目前支持的模型:\n1.v3(deepseekV3), r1(deepseekR1)", to, at)
return

query = msg.content[3:]

if query.count(' ') == 1:
model, query = query.split(' ')

else:
model = "v3"

try:
LegendBotDB().set_running(msg.sender, True)
# 调用 OpenAI API
logger.debug(model)
response = await self.client.chat.completions.create(
model=self.models[model], # 使用的模型
messages=[
# {"role": "system", "content": "你是一个帮助用户回答问题的助手。"},
{"role": "user", "content": query}
],
)

logger.debug('AI已返回')

# 获取 API 返回的内容
result = response.choices[0].message.content.replace("\\n", "\n")

# 发送结果给用户
bot.sendMsg(f"{result}", to, at)
LegendBotDB().set_running(msg.sender, False)
LegendBotDB().add_points(msg.sender, -3)

except Exception as e:
logger.error(f"调用 OpenAI API 时发生错误: {e}")
logger.error(traceback.format_exc())
LegendBotDB().set_running(msg.sender, False)

签到查看运势

机器人的正常运营依赖于插件的正常使用, 插件的正常使用离不开积分的管控和激励, 每天的签到获取积分的同时, 还加入了运势来增加趣味性

运势获取功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# plugins/SignUp/main.py
def get_fortune(self):
w_list = [3, 5, 11, 19, 20, 18, 10, 8, 4, 2]
w_sum = 0
for i in w_list:
w_sum += i
randVal = random.randint(0, w_sum)
rward = 0
for i in range(len(w_list)):
if randVal <= w_list[i]:
rward = i
break
randVal -= w_list[i]
# print('§' + names[rward] + '§')
i1 = random.randint(0, 9)
i2 = random.randint(0, 9)
while i1 == i2:
i2 = random.randint(0, 9)
i3 = random.randint(1, 8)
i4 = random.randint(1, 8)
while i3 == i4:
i4 = random.randint(1, 8)
if rward == 0 or rward == 9:
i3 = rward
i4 = rward
return self.names[rward] + '$' + self.good[i3][i1] + '$' + self.good_text[i3][i1] + '$' + self.good[i3][i2] + '$' + self.good_text[i3][i2] + '$' + self.bad[i4][i1] + '$' + self.bad_text[i4][i1] + '$' + self.bad[i4][i2] + '$' + self.bad_text[i4][i2]

def get_calender(self):
tg='癸甲乙丙丁戊己庚辛壬'
dz='亥子丑寅卯辰已午未申酉戌'
month_list = [None, '一月大', '二月平', '三月大', '四月小', '五月大', '六月小',
'七月大', '八月大', '九月小', '十月大', '十一月小', '十二月大']
week_list = [None, '星期一', '星期二', '星期三', '星期四', '星期五', '星期六', '星期日']
date = datetime.datetime.now().date()
year = date.year
month = month_list[date.month]
day = date.day
week = week_list[date.isoweekday()]
if week == '星期六' or week == '星期日':
color = 'red'
else:
color = 'green'
year1 = tg[(year - 3) % 10] + dz[(year - 3) % 12]
return {'year': year, 'year1': year1, 'month': month, 'day': day, 'week': week, 'color': color}
展开

在这里插一首歌吧

-

在两年前我就说过, 要让好多好多人看到我们一起构思的运势, 现在这个目标离我越来越近, 但你却离我愈发遥远了...

签到功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# plugins/SignUp/main.py
@on_text_message
async def processSignMsg(self, bot: LegendWechatBot, msg: WxMsg):

if not self.enable:
return

if msg.from_group():
to, at = msg.roomid, msg.sender
else:
to, at = msg.sender, None

try:
if msg.content == '签到':
lastSign, maxSign, fortune = LegendBotDB().get_signin_stat(msg.sender)
if lastSign and datetime.datetime.today().date() == lastSign.date():
bot.sendMsg('你已经签过到了', to, at)
return
else:
fortune = self.get_fortune()
lastSign = self.get_calender()
LegendBotDB().set_signin_stat(msg.sender, fortune)
_, maxSign, fortune = LegendBotDB().get_signin_stat(msg.sender)
fortune = fortune.split('$')

res = '签到成功!\n已连续签到%d天, 积分+%d, 目前积分: %d\n%s-农历%s年\n%s月%s日\n今日运势:%s\n宜:%s-%s\n宜:%s-%s\n忌:%s-%s\n忌:%s-%s\n输入`签到 查看运势/积分`可查询运势和积分' % tuple([maxSign, config.RobotConfig['signPoint'] + min(maxSign, 10), LegendBotDB().get_points(msg.sender), lastSign['year'], lastSign['year1'], lastSign["month"], lastSign["day"]] + fortune)

bot.sendMsg(res, to, at)
return
elif msg.content == '签到 查看积分':
bot.sendMsg('当前积分:%d' % LegendBotDB().get_points(msg.sender), to, at)
return
elif msg.content == '签到 查看运势':
lastSign, maxSign, fortune = LegendBotDB().get_signin_stat(msg.sender)
fortune = fortune.split('$')
res = '今日运势:%s\n宜:%s-%s\n宜:%s-%s\n忌:%s-%s\n忌:%s-%s' % tuple(fortune)

bot.sendMsg(res, to, at)
return

except Exception as e:
logger.warning('签到失败: %s' % e)
logger.error(traceback.format_exc())

todo list

  • 添加AI群聊自定义角色功能
  • 添加AI作图
  • AI聊天上下文功能
  • 完善文档

项目已开源至 Github ,欢迎star和fork 若你觉得对你的开发有帮助, 或是对你的生活提供了方便, 欢迎来 爱发电 赞助 爱发电 如果想一起开发或贡献插件等, 欢迎在相关标准制定后按照标准提交PR, 或 联系作者