This commit is contained in:
Pyhtagodzilla 2025-08-16 12:21:54 +08:00
commit 4df0364c1f

112
main.py Normal file
View File

@ -0,0 +1,112 @@
import asyncio
import aiohttp
from bs4 import BeautifulSoup
# import threading
class WeatherGet:
def __init__(self):
self.past_time = 0
self.latest_url = ""
async def get_china_satellite_weather_image(self) -> tuple[str, bool, str]:
"""
获取风云二号卫星的最新云图以及更新状态
:rtype: tuple[str, bool]
:rtype: tuple[str, bool]
:return: 最新云图的URL, bool 是否更新
"""
fy_2_url = "https://www.nmc.cn/publish/satellite/fy2.htm"
# web_meta_data = requests.get(fy_2_url).content\
# 异步连接获取图片页面准备解析。
async with aiohttp.ClientSession() as session:
async with session.get(fy_2_url) as response:
web_meta_data = await response.read()
print(web_meta_data)
soup = BeautifulSoup(web_meta_data, "lxml")
image = str(soup.find("img", attrs={"id": "imgpath"}))
time_element = soup.find("div", attrs={"class": "col-xs-12 time"})
# 应该会判断time_element是否为空但是我的逻辑不是很清楚因为我忘记当初写的逻辑了。所以存疑。
if time_element is None:
return "", False, "无法获取时间信息"
update_time = time_element.get_text() # 获取更新时间
if update_time != self.past_time: # update_time 和 past_time 是同源的没有使用datetime初始化。
divided_parts = image.split()
for part in divided_parts:
if part.startswith("src="):
self.latest_url = part.split("=")[1]
# image = requests.get(self.latest_url[1:-1]).content
#
# with open(f"test.jpg", "wb") as f:
# f.write(image)
self.past_time = update_time
status = True
else:
status = False
return self.latest_url[1:-2], status, update_time
@staticmethod
async def request_content_sync(url: str):
"""
:param url:
:return:
"""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
return await response.read()
else:
return None
Utils = WeatherGet()
async def main():
image_url, status, update_time = await Utils.get_china_satellite_weather_image()
print(
image_url, status, update_time
)
temp_name = update_time.split(" ")
month = temp_name[0].split("/")[0]
day = temp_name[0].split("/")[1]
hour = temp_name[1].split(":")[0]
minute = temp_name[1].split(":")[1]
# content_aga = requests.get(image_url).content
# with open(f"data/fy_2/{month}-{day}-{hour}-{minute}.jpg", "wb") as f:
# f.write(content_aga)
if not image_url: # 如果没返回url
print(status)
content = await Utils.request_content_sync(image_url)
if not content:
with open(f"data/fy_2/{update_time}.jpg", "wb") as f:
f.write(content)
content = await Utils.request_content_sync(image_url)
with open(f"data/fy_2/{month}-{day}-{hour}-{minute}.jpg", "wb") as f:
f.write(content)
asyncio.run(main())