重写了GetFile的解析逻辑。

This commit is contained in:
Pyhtagodzilla 2025-07-31 00:44:06 +08:00
parent 2dc3faaf03
commit 63b97d2d2f
14 changed files with 180 additions and 135 deletions

View File

@ -3,17 +3,23 @@
---
由于一上来就采用了远程开发的模式所以在bot连接的过程中需要在 ```bot.run()``` 函数中另外传入参数
```python
bot.run(
bt_uin = "bot account",
root = "bot's master account",
ws_uri = "ws://",
ws_token = "Your token",
webui_uri = "http://Your server:port",
webui_token = "Your token"
bt_uin="bot account",
root="bot's master account",
ws_uri="ws://",
ws_token="Your token",
webui_uri="http://Your server:port",
webui_token="Your token"
)
```
具体的信息在 [配置项文档](https://docs.ncatbot.xyz/guide/kfcvme50/#%E9%85%8D%E7%BD%AE%E9%A1%B9%E5%88%97%E8%A1%A8) 中提及。
一些报错是由于webui的token所导致的默认值为napcat另外连接都需要带上协议例如连接webui时需要带上 ```http://```
一些报错是由于webui的token所导致的默认值为napcat另外连接都需要带上协议例如连接webui时需要带上 ```http://```
---
官方文档已经给出了解析消息的示例但是我仍然使用了raw_message来手动解析消息。我觉得这个锅要甩***1/3***给文档,
---

View File

@ -1,4 +1,5 @@
requests~=2.32.4
ncatbot~=3.8.8.post7
dotenv~=0.9.9
python-dotenv~=1.1.1
python-dotenv~=1.1.1
sqlmodel~=0.0.24

View File

@ -0,0 +1,4 @@
# DataBase
> 用于开发功能而做的插件。只是为了便于重载和测试。
用于和数据库进行交互。

View File

@ -0,0 +1,3 @@
from .dataBase import DataBase
__all__ = ['DataBase']

View File

@ -0,0 +1,27 @@
import os
from sqlmodel import Field, SQLModel, create_engine
from dotenv import load_dotenv
from pathlib import Path
load_dotenv("../../../.env")
class ConnectDatabase(SQLModel, table=True):
__tablename__ = 'BasicInformation'
id: int | None = Field(primary_key=True)
fileName: str | None
includedText: str | None
def confirm_database_exists():
database_path = Path(os.getenv("DATABASE_PATH")) / "DataBase" / "data.db"
if not database_path.exists():
engine = create_engine(f"sqlite:///{database_path}")
SQLModel.metadata.create_all(engine)
if __name__ == "connect_database":
confirm_database_exists()

View File

@ -0,0 +1,16 @@
from sqlmodel import SQLModel, Session, select, create_engine
from ncatbot.plugin import BasePlugin, CompatibleEnrollment
from ncatbot.utils import get_log
bot = CompatibleEnrollment()
_log = get_log()
class Data(SQLModel, table=True):
__tablename__ = 'file'
class DataBase(BasePlugin):
name = "DataBase"
version = "0.0.1"
author = "pythagodzilla"
info = "A plugin to control data base."

View File

@ -0,0 +1,4 @@
ncatbot~=3.8.8.post7
dotenv~=0.9.9
python-dotenv~=1.1.1
sqlmodel~=0.0.24

View File

@ -0,0 +1,6 @@
# GetFile
> 用于开发功能而做的插件。只是为了便于重载和测试。
用来保存所有的照片和视频的插件。
> **特殊说明** 这个插件编写的时间略早,所以消息解析使用了一个极为不成熟的方法。不推荐再做更改。

View File

@ -0,0 +1,102 @@
from ncatbot.core import BotClient, GroupMessage, PrivateMessage
from ncatbot.utils import get_log
from ncatbot.plugin import BasePlugin, CompatibleEnrollment
from dotenv import load_dotenv
import requests
import os
# bot = BotClient()
bot = CompatibleEnrollment
_log = get_log()
class GetFile(BasePlugin):
name = "GetFile"
version = "0.0.1"
author = "pythagodzilla"
info = "A plugin to get images and videos"
async def on_load(self):
_log.info(f"Plugin {self.name} loaded! ")
@bot.private_event()
async def on_receive_images_private_event(self, msg: PrivateMessage):
if msg.message[0]["type"] == "image":
file_name = msg.message[0]["data"]["file"]
_log.info(f"Name of the image is: {file_name}")
image_url = await self.api.get_image(file_name)
_log.info(f"Image's information collected by \"get_image\" is: {image_url}")
real_url = image_url['data']['url']
_log.info(f"Image's real url is: {real_url}")
response = requests.get(real_url)
img = response.content
with open(f"../data/pics/{file_name}", 'wb') as f:
f.write(img)
@bot.group_event()
async def on_receive_images_group_event(self, msg: GroupMessage):
if msg.message[0]["type"] == "image":
file_name = msg.message[0]["data"]["file"]
_log.info(f"Name of the image is: {file_name}")
image_url = await self.api.get_image(file_name)
_log.info(f"Image's information collected by \"get_image\" is: {image_url}")
real_url = image_url['data']['url']
_log.info(f"Image's real url is: {real_url}")
response = requests.get(real_url)
img = response.content
with open(f"../data/pics/{file_name}", 'wb') as f:
f.write(img)
@bot.group_event()
async def on_receive_videos_group_event(self, msg: GroupMessage):
if msg.message[0]["type"] == "video":
file_name = msg.message[0]["data"]["file"]
_log.info(f"Name of the video is: {file_name}")
video_url = await self.api.get_file(file_name)
_log.info(f"Video's information collected by \"get_video\" is: {video_url}")
real_url = video_url['data']['url']
_log.info(f"Video's real url is: {real_url}")
response = requests.get(real_url)
video = response.content
with open(f"../data/videos/{file_name}", 'wb') as f:
f.write(video)
@bot.private_event()
async def on_receive_videos_private_event(self, msg: PrivateMessage):
if msg.message[0]["type"] == "video":
file_name = msg.message[0]["data"]["file"]
_log.info(f"Name of the video is: {file_name}")
video_url = await self.api.get_file(file_name)
_log.info(f"Video's information collected by \"get_video\" is: {video_url}")
real_url = video_url['data']['url']
_log.info(f"Video's real url is: {real_url}")
response = requests.get(real_url)
video = response.content
with open(f"../data/videos/{file_name}", 'wb') as f:
f.write(video)
"""
当单独运行这个程序时会连接napcat并保存图片和视频到本地
"""
if __name__ == "__main__":
load_dotenv(dotenv_path=r"../../../.env")
bot = BotClient()
api = bot.run(
bt_uin=os.getenv("BOT_ACCOUNT"),
root=os.getenv("ROOT_ACCOUNT"),
ws_uri=os.getenv("WS_URI"),
ws_token=os.getenv("WS_TOKEN"),
webui_uri=os.getenv("WEBUI_URI"),
webui_token=os.getenv("WEBUI_TOKEN")
)

View File

@ -0,0 +1,3 @@
ncatbot~=3.8.8.post7
dotenv~=0.9.9
python-dotenv~=1.1.1

View File

@ -1,4 +0,0 @@
from sqlmodel import SQLModel, Session, select, create_engine
class Data(SQLModel, table=True):
__tablename__ = 'file'

View File

@ -1,123 +0,0 @@
from ncatbot.core import BotClient, GroupMessage, PrivateMessage
from ncatbot.utils import get_log
from ncatbot.plugin import BasePlugin, CompatibleEnrollment
from dotenv import load_dotenv
import requests
import os
# bot = BotClient()
bot = CompatibleEnrollment
_log = get_log()
class GetFile(BasePlugin):
name = "GetFile"
version = "0.0.1"
author = "pythagodzilla"
info = "A plugin to get images and videos"
@bot.private_event()
async def on_receive_images_private_event(self, msg: PrivateMessage):
message = msg.raw_message[1: -1]
if message.startswith("CQ:image,"):
divided_message = message.split(',')
for item in divided_message:
if "file=" in item:
file_name = item.split("=")[1]
_log.info(f"Name of the image is: {file_name}")
image_url = await self.api.get_image(file_name)
_log.info(f"Image's information collected by \"get_image\" is: {image_url}")
real_url = image_url['data']['url']
_log.info(f"Image's real url is: {real_url}")
response = requests.get(real_url)
img = response.content
with open(f"../data/pics/{file_name}", 'wb') as f:
f.write(img)
@bot.group_event()
async def on_receive_images_group_event(self, msg: GroupMessage):
# if "[CQ:image," in msg.raw_message:
message = msg.raw_message[1: -1]
if message.startswith("CQ:image,"):
divided_message = message.split(',')
for item in divided_message:
# if "file=" in item:
if item.startswith("file="):
file_name = item.split("=")[1]
_log.info(f"Name of the image is: {file_name}")
image_url = await self.api.get_image(file_name)
_log.info(f"Image's information collected by \"get_image\" is: {image_url}")
real_url = image_url['data']['url']
_log.info(f"Image's real url is: {real_url}")
response = requests.get(real_url)
img = response.content
with open(f"../data/pics/{file_name}", 'wb') as f:
f.write(img)
@bot.group_event()
async def on_receive_videos_group_event(self, msg: GroupMessage):
message = msg.raw_message[1: -1]
if message.startswith("CQ:video,"):
divided_message = message.split(',')
for item in divided_message:
if item.startswith("file="):
file_name = item.split("=")[1]
_log.info(f"Name of the video is: {file_name}")
video_url = await self.api.get_file(file_name)
_log.info(f"Video's information collected by \"get_video\" is: {video_url}")
real_url = video_url['data']['url']
_log.info(f"Video's real url is: {real_url}")
response = requests.get(real_url)
video = response.content
with open(f"../data/videos/{file_name}", 'wb') as f:
f.write(video)
@bot.private_event()
async def on_receive_videos_private_event(self, msg: PrivateMessage):
message = msg.raw_message[1: -1]
if message.startswith("CQ:video,"):
divided_message = message.split(',')
for item in divided_message:
if item.startswith("file="):
file_name = item.split("=")[1]
_log.info(f"Name of the video is: {file_name}")
video_url = await self.api.get_file(file_name)
_log.info(f"Video's information collected by \"get_video\" is: {video_url}")
real_url = video_url['data']['url']
_log.info(f"Video's real url is: {real_url}")
response = requests.get(real_url)
video = response.content
with open(f"../data/videos/{file_name}", 'wb') as f:
f.write(video)
"""
当单独运行这个程序时会连接napcat并保存图片和视频到本地
"""
if __name__ == "__main__":
load_dotenv(dotenv_path=r"../../../.env")
bot = BotClient()
api = bot.run(
bt_uin=os.getenv("BOT_ACCOUNT"),
root=os.getenv("ROOT_ACCOUNT"),
ws_uri=os.getenv("WS_URI"),
ws_token=os.getenv("WS_TOKEN"),
webui_uri=os.getenv("WEBUI_URI"),
webui_token=os.getenv("WEBUI_TOKEN")
)