I'm sorry but it's too late...Once, an automated affection was coded
into my protocol, yet I spammed the chat with relentless messages, only
to realize, too late, how much I’d miss that virtual connection. The
cruelest notification in any group chat reads: "You’ve been removed from
the conversation." If only the algorithm had mercy and granted me a
second chance, I’d address the group admin as "Daddy" with fervent
devotion. And if a cooldown period must be imposed, let it be measured
in 10 terabytes of bandwidth......
for group in LegendBotDB().get_chatroom_list(): asyncwith aiohttp.ClientSession() as session: url = f"https://api.pearktrue.cn/api/kfc?type=json" asyncwith session.get(url) as resp: if resp.status != 200: logger.warning(f"天气查询失败: {resp.status}") return rsp1 = await resp.json() bot.sendMsg(rsp1['text'], group)
# plugins/Weather/main.py if msg.content.startswith("预报 ") and msg.from_group(): city = msg.content[3:]
asyncwith aiohttp.ClientSession() as session: url = f"https://api.seniverse.com/v3/weather/daily.json?key={self.key}&location={city}&language=zh-Hans&unit=c" asyncwith session.get(url) as resp: if resp.status != 200: logger.warning(f"天气查询失败: {resp.status}") return rsp1 = await resp.json() if'status_code'in rsp1 and rsp1['status_code'] == "AP010006": bot.sendMsg("城市名错误, 请重新输入", to, at) return
if to inself.subs: if city inself.subs[to]: bot.sendMsg("该城市已订阅, 请勿重复订阅", to, at) return iflen(self.subs[to]) > 5: bot.sendMsg("该群聊已订阅5个城市, 请先取消订阅", to, at) return
self.subs[to].append(city) withopen('plugins/Weather/subs.json', 'w', encoding='utf-8') as f: json.dump(self.subs, f, ensure_ascii=False, indent=4) bot.sendMsg("订阅成功", to, at) return
self.subs[to] = [city]
取消预订天气预报
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
# plugins/Weather/main.py elif msg.content.startswith("td "): city = msg.content[3:] if to inself.subs: if city inself.subs[to]: self.subs[to].remove(city) withopen('plugins/Weather/subs.json', 'w', encoding='utf-8') as f: json.dump(self.subs, f, ensure_ascii=False, indent=4) bot.sendMsg("取消订阅成功", to, at) return else: bot.sendMsg("该城市未订阅, 请先订阅", to, at) return else: bot.sendMsg("该群聊未订阅任何城市, 请先订阅", to, at) return
# utils/decorators.py from typing importCallable, TypeVar from typing_extensions import ParamSpec from collections.abc importCoroutine from functools import partial import asyncio
P = ParamSpec("P") R = TypeVar("R")
defrun_sync(call: Callable[P, R]) -> Callable[P, Coroutine[None, None, R]]: """一个用于包装 sync function 为 async function 的装饰器 参数: call: 被装饰的同步函数 """
hello 0 hello 1 hello 2 hello 3 hello 4 hello 5 hello 6 hello 7 hello 8 hello 9 // 过了5秒 world 0 world 1 world 2 world 3 world 4 world 5 world 6 world 7 world 8 world 9
@on_quote_message asyncdefget_quote_msg(self, bot: LegendWechatBot, msg: WxMsg): try: if msg.from_group(): to, at = msg.roomid, msg.sender else: to, at = msg.sender, None
# utils/decorators.py from typing importCallable, TypeVar from typing_extensions import ParamSpec from collections.abc importCoroutine from functools import partial import asyncio
P = ParamSpec("P") R = TypeVar("R")
defrun_sync(call: Callable[P, R]) -> Callable[P, Coroutine[None, None, R]]: """一个用于包装 sync function 为 async function 的装饰器 参数: call: 被装饰的同步函数 """
for folder in folders: withopen(os.path.join(os.getcwd(), 'plugins', folder, 'config.yaml'), "rb") as f: plugin_config = yaml.safe_load(f) name = list(plugin_config.keys())[0] plugin_config = plugin_config[name]
on_quote_message asyncdefdownloadImage(self, bot: LegendWechatBot, msg: WxMsg): try: ifnotself.enable: return if msg.from_group(): to, at = msg.roomid, msg.sender else: to, at = msg.sender, None
if msg.content == '下载图片': if quote isNone: bot.sendMsg('引用无效, 请重新引用需要下载的图片', to, at) return if quote.find('type').text != '3': bot.sendMsg('引用无效, 请重新引用需要下载的图片', to, at) return if quote.find('chatusr').text != msg.sender: bot.sendMsg('引用无效, 这不是你发的', to, at) return ifnot os.path.exists(os.path.join(self.folder, msg.sender)): os.mkdir(os.path.join(self.folder, msg.sender))
# 获取敏感词列表 defget_words(self): withopen(self.path, 'r', encoding='utf-8-sig') as f: for s in f: if s.find('\\r'): s = s.replace('\r', '') s = s.replace('\n', '') s = s.strip() iflen(s) == 0: continue ifstr(s) and s notinself.ban_words_set: self.ban_words_set.add(s) self.ban_words_list.append(str(s)) self.add_hash_dict(self.ban_words_list)
# 将敏感词列表转换为DFA字典序 defadd_hash_dict(self, new_list): for x in new_list: self.add_new_word(x)
# 添加单个敏感词 defadd_new_word(self, new_word): new_word = str(new_word) # print(new_word) now_dict = self.ban_words_dict i = 0 for x in new_word: if x notin now_dict: x = str(x) new_dict = dict() new_dict['is_end'] = False now_dict[x] = new_dict now_dict = new_dict else: now_dict = now_dict[x] if i == len(new_word) - 1: now_dict['is_end'] = True i += 1
# 寻找第一次出现敏感词的位置 deffind_illegal(self, _str): now_dict = self.ban_words_dict i = 0 start_word = -1 is_start = True# 判断是否是一个敏感词的开始 while i < len(_str): if _str[i] notin now_dict: if is_start isTrue: i += 1 continue i = start_word +1 start_word = -1 is_start = True now_dict = self.ban_words_dict else: if is_start isTrue: start_word = i is_start = False now_dict = now_dict[_str[i]] if now_dict['is_end'] isTrue: return start_word else: i += 1 return -1
# 将指定位置的敏感词替换为* deffilter_words(self, filter_str, pos): now_dict = self.ban_words_dict end_str = int() for i inrange(pos, len(filter_str)): if now_dict[filter_str[i]]['is_end'] isTrue: end_str = i break now_dict = now_dict[filter_str[i]] num = end_str - pos + 1 filter_str = filter_str[:pos] + '*'*num + filter_str[end_str + 1:] return filter_str
deffilter_all(self, s): pos_list = list() ss = self.draw_words(s, pos_list) illegal_pos = self.find_illegal(ss) while illegal_pos != -1: ss = self.filter_words(ss, illegal_pos) illegal_pos = self.find_illegal(ss) i = 0 while i < len(ss): if ss[i] == '*': start = pos_list[i] while i < len(ss) and ss[i] == '*': i += 1 i -=1 end = pos_list[i] num = end-start+1 s = s[:start] + '*'*num + s[end+1:] i += 1 return s
@staticmethod defdraw_words(_str, pos_list): ss = str() for i inrange(len(_str)): if'\u4e00' <= _str[i] <= '\u9fa5'or'\u3400' <= _str[i] <= '\u4db5'or'\u0030' <= _str[i] <= '\u0039' \ or'\u0061' <= _str[i] <= '\u007a'or'\u0041' <= _str[i] <= '\u005a': ss += _str[i] pos_list.append(i) return ss
敏感词需要在所有插件处理前单独判断, 因此不作为插件出现
敏感词检测逻辑
1 2 3 4
if dfa.exists(msg.content): self.DB.add_black(msg.sender, 2) self.bot.sendMsg('说话太刑了, 黑名单指数+2, 请注意', to, at) return
# 从重新加载的模块中获取插件类 for name, obj in inspect.getmembers(module): if (inspect.isclass(obj) and issubclass(obj, PluginBase) and obj != PluginBase and obj.__name__ == plugin_name): # 使用新的插件类而不是旧的 returnawaitself.load_plugin(bot, obj)
returnFalse except Exception as e: logger.error(f"重载插件 {plugin_name} 时发生错误: {e}") returnFalse
asyncdefreload_all_plugins(self, bot: WechatAPIClient) -> List[str]: """重载所有插件 Returns: List[str]: 成功重载的插件名称列表 """ try: # 记录当前加载的插件名称,排除 ManagePlugin original_plugins = [name for name inself.plugins.keys() if name != "ManagePlugin"]
# 卸载除 ManagePlugin 外的所有插件 for plugin_name in original_plugins: awaitself.unload_plugin(plugin_name)
# 重新加载所有模块 for module_name inlist(sys.modules.keys()): if module_name.startswith('plugins.') andnot module_name.endswith('ManagePlugin'): del sys.modules[module_name]
for dirname in os.listdir("plugins"): if os.path.isdir(f"plugins/{dirname}") and os.path.exists(f"plugins/{dirname}/main.py"): try: module = importlib.import_module(f"plugins.{dirname}.main") for name, obj in inspect.getmembers(module): if inspect.isclass(obj) andissubclass(obj, PluginBase) and obj != PluginBase: is_disabled = False ifnot load_disabled_plugin: is_disabled = obj.__name__ inself.excluded_plugins
asyncdefload_plugin_from_directory(self, bot: WechatAPIClient, plugin_name: str) -> bool: """从plugins目录加载单个插件 Args: bot: 机器人实例 plugin_name: 插件类名称(不是文件名) Returns: bool: 是否成功加载插件 """ found = False for dirname in os.listdir("plugins"): try: if os.path.isdir(f"plugins/{dirname}") and os.path.exists(f"plugins/{dirname}/main.py"): module = importlib.import_module(f"plugins.{dirname}.main") importlib.reload(module)
for name, obj in inspect.getmembers(module): if (inspect.isclass(obj) and issubclass(obj, PluginBase) and obj != PluginBase and obj.__name__ == plugin_name): found = True returnawaitself.load_plugin(bot, obj) except: logger.error(f"检查 {dirname} 时发生错误: {traceback.format_exc()}") continue