Initial commit

This commit is contained in:
Bluemangoo 2026-06-20 23:05:07 +08:00
commit 61b73d0bef
Signed by: Bluemangoo
GPG Key ID: F2F7E46880A1C4CF
5 changed files with 204 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
__pycache__
*.dll
*.so
*.dylib

23
__init__.py Normal file
View File

@ -0,0 +1,23 @@
import nonebot
from nonebot import require
require("nonebot_plugin_apscheduler")
from pathlib import Path
from nonebot_plugin_apscheduler import scheduler
from .config import pconfig
from .lib import rlib
from .sync import sync
driver = nonebot.get_driver()
@driver.on_startup
def startup():
if pconfig.proxy:
rlib.set_proxy(pconfig.proxy)
if pconfig.github_token:
rlib.set_ghp(pconfig.github_token)
rlib.boot(str(Path.cwd()), "nonebot_plugin_sekai_update_notify", pconfig.assets_studio_path)
scheduler.add_job(sync, "interval", seconds=10, id="sekai_update_sync", replace_existing=True)

22
config.py Normal file
View File

@ -0,0 +1,22 @@
from nonebot import get_driver, get_plugin_config
from pydantic import BaseModel
_nickname: str = ""
_proxy: str | None = None
try:
_proxy = next(iter(get_driver().config.proxy), None)
_nickname: str = next(iter(get_driver().config.nickname), "")
except:
pass
class Config(BaseModel):
assets_studio_path: str
github_token: str | None = None
@property
def proxy(self) -> str | None:
return _proxy
@property
def nickname(self) -> str:
return _nickname
pconfig: Config = get_plugin_config(Config)

102
lib.py Normal file
View File

@ -0,0 +1,102 @@
import ctypes
import json
import os
import sys
from typing import Optional, Dict, Any, List
def get_lib_filename(base_name):
if sys.platform == "win32":
return f"{base_name}.dll"
elif sys.platform == "darwin":
return f"lib{base_name}.dylib"
else:
return f"lib{base_name}.so"
class FetchData:
event_id: int
card_paths: List[str]
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'FetchData':
instance = cls()
instance.event_id = data.get("event_id", 0)
instance.card_paths = data.get("card_paths", [])
return instance
class SekaiSyncLib:
def __init__(self):
self.lib = ctypes.CDLL(os.path.join(os.path.dirname(__file__), get_lib_filename("sekai_sync_lib")))
self._setup_signatures()
def _setup_signatures(self):
# proxy(x: *const c_char)
self.lib.proxy.argtypes = [ctypes.c_char_p]
self.lib.proxy.restype = None
# ghp(x: *const c_char)
self.lib.ghp.argtypes = [ctypes.c_char_p]
self.lib.ghp.restype = None
# boot(cwd: *const c_char, name: *const c_char, as_path: *const c_char) -> i32
self.lib.boot.argtypes = [ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p]
self.lib.boot.restype = ctypes.c_int32
# fetch_data() -> *mut c_char
# ⚠️ 这里必须用 c_void_p不能用 c_char_p否则会丢失指针地址导致内存泄漏
self.lib.fetch_data.argtypes = []
self.lib.fetch_data.restype = ctypes.c_void_p
# free_string(s: *mut c_char)
self.lib.free_string.argtypes = [ctypes.c_void_p]
self.lib.free_string.restype = None
def _str_to_bytes(self, s: Optional[str]) -> bytes:
"""辅助函数:将 Python 字符串安全转换为 C 字符串字节"""
if s is None:
# 根据 Rust 端解析逻辑,传入空字符串将被视为 None
return b""
return s.encode('utf-8')
def set_proxy(self, proxy_url: Optional[str]):
"""设置代理,传 None 或空字符串可清除"""
self.lib.proxy(self._str_to_bytes(proxy_url))
def set_ghp(self, token: Optional[str]):
"""设置 GitHub Token传 None 或空字符串可清除"""
self.lib.ghp(self._str_to_bytes(token))
def boot(self, cwd: str, name: str, as_path: str) -> int:
"""
启动主循环与初始化状态
"""
return self.lib.boot(
self._str_to_bytes(cwd),
self._str_to_bytes(name),
self._str_to_bytes(as_path)
)
def fetch_data(self) -> Optional[FetchData]:
# 1. 获取裸指针
ptr = self.lib.fetch_data()
# 指针为空 (std::ptr::null_mut)
if not ptr:
return None
try:
# 2. 将 void_p 转换回 c_char_p 并读取里面的 bytes
json_bytes = ctypes.cast(ptr, ctypes.c_char_p).value
if not json_bytes:
return None
# 3. 解码并反序列化 JSON
json_str = json_bytes.decode('utf-8')
return FetchData.from_dict(json.loads(json_str))
finally:
# 4. 无论是否发生 JSON 解析异常,绝对保证释放 Rust 内存!
self.lib.free_string(ptr)
rlib = SekaiSyncLib()

53
sync.py Normal file
View File

@ -0,0 +1,53 @@
import json
from pathlib import Path
from nonebot import get_bot
from nonebot.adapters.onebot.v11 import Message, MessageSegment
from nonebot_plugin_apscheduler import scheduler
from .config import pconfig
from .lib import rlib
group_file = Path("config") / "nonebot_sekai_update_notify" / "group.json"
group_file.parent.mkdir(parents=True, exist_ok=True)
if not group_file.exists():
group_file.write_text("[]")
enabled_groups = json.loads(group_file.read_text())
def add_group(group_id: int):
if group_id not in enabled_groups:
enabled_groups.append(group_id)
group_file.write_text(json.dumps(enabled_groups))
def remove_group(group_id: int):
if group_id in enabled_groups:
enabled_groups.remove(group_id)
group_file.write_text(json.dumps(enabled_groups))
async def sync():
data = rlib.fetch_data()
if data is None:
return
message = Message()
inner = [MessageSegment.text(f"活动 {data.event_id} 的卡牌更新了!\n")]
for item in data.card_paths:
p = Path(item)
n = MessageSegment.image(file=item)
n.data["summary"] = f"[{p.name}]"
inner.append(n)
for node in inner:
message.append(MessageSegment.node_custom(
user_id=int(get_bot().self_id),
nickname=pconfig.nickname,
content=Message([node])
))
for group in enabled_groups:
await get_bot().send_group_msg(message=message, group_id=group)