From 4df0364c1f6a58b3e81da9dccaf0e4f0c842c2e8 Mon Sep 17 00:00:00 2001 From: Pyhtagodzilla <1670671958@qq.com> Date: Sat, 16 Aug 2025 12:21:54 +0800 Subject: [PATCH] ooooo --- main.py | 112 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 main.py diff --git a/main.py b/main.py new file mode 100644 index 0000000..969dd5c --- /dev/null +++ b/main.py @@ -0,0 +1,112 @@ +import asyncio + +import aiohttp +from bs4 import BeautifulSoup + + +# import threading + +class WeatherGet: + def __init__(self): + self.past_time = 0 + self.latest_url = "" + + async def get_china_satellite_weather_image(self) -> tuple[str, bool, str]: + """ + 获取风云二号卫星的最新云图,以及更新状态。 + :rtype: tuple[str, bool] + :rtype: tuple[str, bool] + :return: 最新云图的URL, bool 是否更新 + """ + + fy_2_url = "https://www.nmc.cn/publish/satellite/fy2.htm" + # web_meta_data = requests.get(fy_2_url).content\ + + # 异步连接获取图片页面准备解析。 + async with aiohttp.ClientSession() as session: + async with session.get(fy_2_url) as response: + web_meta_data = await response.read() + + print(web_meta_data) + + soup = BeautifulSoup(web_meta_data, "lxml") + + image = str(soup.find("img", attrs={"id": "imgpath"})) + time_element = soup.find("div", attrs={"class": "col-xs-12 time"}) + + # 应该会判断time_element是否为空,但是我的逻辑不是很清楚,因为我忘记当初写的逻辑了。所以存疑。 + if time_element is None: + return "", False, "无法获取时间信息" + + update_time = time_element.get_text() # 获取更新时间 + + if update_time != self.past_time: # update_time 和 past_time 是同源的,没有使用datetime初始化。 + + divided_parts = image.split() + for part in divided_parts: + + if part.startswith("src="): + self.latest_url = part.split("=")[1] + + # image = requests.get(self.latest_url[1:-1]).content + # + # with open(f"test.jpg", "wb") as f: + # f.write(image) + + self.past_time = update_time + status = True + + else: + status = False + + return self.latest_url[1:-2], status, update_time + + @staticmethod + async def request_content_sync(url: str): + """ + + :param url: + :return: + """ + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + if response.status == 200: + return await response.read() + else: + return None + + +Utils = WeatherGet() + + +async def main(): + image_url, status, update_time = await Utils.get_china_satellite_weather_image() + + print( + image_url, status, update_time + ) + + temp_name = update_time.split(" ") + + month = temp_name[0].split("/")[0] + day = temp_name[0].split("/")[1] + hour = temp_name[1].split(":")[0] + minute = temp_name[1].split(":")[1] + + # content_aga = requests.get(image_url).content + # with open(f"data/fy_2/{month}-{day}-{hour}-{minute}.jpg", "wb") as f: + # f.write(content_aga) + + if not image_url: # 如果没返回url + print(status) + content = await Utils.request_content_sync(image_url) + if not content: + with open(f"data/fy_2/{update_time}.jpg", "wb") as f: + f.write(content) + + content = await Utils.request_content_sync(image_url) + with open(f"data/fy_2/{month}-{day}-{hour}-{minute}.jpg", "wb") as f: + f.write(content) + + +asyncio.run(main())