Weather/main.py
Pyhtagodzilla 4df0364c1f ooooo
2025-08-16 12:21:54 +08:00

113 lines
3.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import aiohttp
from bs4 import BeautifulSoup
# import threading
class WeatherGet:
def __init__(self):
self.past_time = 0
self.latest_url = ""
async def get_china_satellite_weather_image(self) -> tuple[str, bool, str]:
"""
获取风云二号卫星的最新云图,以及更新状态。
:rtype: tuple[str, bool]
:rtype: tuple[str, bool]
:return: 最新云图的URL, bool 是否更新
"""
fy_2_url = "https://www.nmc.cn/publish/satellite/fy2.htm"
# web_meta_data = requests.get(fy_2_url).content\
# 异步连接获取图片页面准备解析。
async with aiohttp.ClientSession() as session:
async with session.get(fy_2_url) as response:
web_meta_data = await response.read()
print(web_meta_data)
soup = BeautifulSoup(web_meta_data, "lxml")
image = str(soup.find("img", attrs={"id": "imgpath"}))
time_element = soup.find("div", attrs={"class": "col-xs-12 time"})
# 应该会判断time_element是否为空但是我的逻辑不是很清楚因为我忘记当初写的逻辑了。所以存疑。
if time_element is None:
return "", False, "无法获取时间信息"
update_time = time_element.get_text() # 获取更新时间
if update_time != self.past_time: # update_time 和 past_time 是同源的没有使用datetime初始化。
divided_parts = image.split()
for part in divided_parts:
if part.startswith("src="):
self.latest_url = part.split("=")[1]
# image = requests.get(self.latest_url[1:-1]).content
#
# with open(f"test.jpg", "wb") as f:
# f.write(image)
self.past_time = update_time
status = True
else:
status = False
return self.latest_url[1:-2], status, update_time
@staticmethod
async def request_content_sync(url: str):
"""
:param url:
:return:
"""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
return await response.read()
else:
return None
Utils = WeatherGet()
async def main():
image_url, status, update_time = await Utils.get_china_satellite_weather_image()
print(
image_url, status, update_time
)
temp_name = update_time.split(" ")
month = temp_name[0].split("/")[0]
day = temp_name[0].split("/")[1]
hour = temp_name[1].split(":")[0]
minute = temp_name[1].split(":")[1]
# content_aga = requests.get(image_url).content
# with open(f"data/fy_2/{month}-{day}-{hour}-{minute}.jpg", "wb") as f:
# f.write(content_aga)
if not image_url: # 如果没返回url
print(status)
content = await Utils.request_content_sync(image_url)
if not content:
with open(f"data/fy_2/{update_time}.jpg", "wb") as f:
f.write(content)
content = await Utils.request_content_sync(image_url)
with open(f"data/fy_2/{month}-{day}-{hour}-{minute}.jpg", "wb") as f:
f.write(content)
asyncio.run(main())