113 lines
3.3 KiB
Python
113 lines
3.3 KiB
Python
import asyncio
|
||
|
||
import aiohttp
|
||
from bs4 import BeautifulSoup
|
||
|
||
|
||
# import threading
|
||
|
||
class WeatherGet:
|
||
def __init__(self):
|
||
self.past_time = 0
|
||
self.latest_url = ""
|
||
|
||
async def get_china_satellite_weather_image(self) -> tuple[str, bool, str]:
|
||
"""
|
||
获取风云二号卫星的最新云图,以及更新状态。
|
||
:rtype: tuple[str, bool]
|
||
:rtype: tuple[str, bool]
|
||
:return: 最新云图的URL, bool 是否更新
|
||
"""
|
||
|
||
fy_2_url = "https://www.nmc.cn/publish/satellite/fy2.htm"
|
||
# web_meta_data = requests.get(fy_2_url).content\
|
||
|
||
# 异步连接获取图片页面准备解析。
|
||
async with aiohttp.ClientSession() as session:
|
||
async with session.get(fy_2_url) as response:
|
||
web_meta_data = await response.read()
|
||
|
||
print(web_meta_data)
|
||
|
||
soup = BeautifulSoup(web_meta_data, "lxml")
|
||
|
||
image = str(soup.find("img", attrs={"id": "imgpath"}))
|
||
time_element = soup.find("div", attrs={"class": "col-xs-12 time"})
|
||
|
||
# 应该会判断time_element是否为空,但是我的逻辑不是很清楚,因为我忘记当初写的逻辑了。所以存疑。
|
||
if time_element is None:
|
||
return "", False, "无法获取时间信息"
|
||
|
||
update_time = time_element.get_text() # 获取更新时间
|
||
|
||
if update_time != self.past_time: # update_time 和 past_time 是同源的,没有使用datetime初始化。
|
||
|
||
divided_parts = image.split()
|
||
for part in divided_parts:
|
||
|
||
if part.startswith("src="):
|
||
self.latest_url = part.split("=")[1]
|
||
|
||
# image = requests.get(self.latest_url[1:-1]).content
|
||
#
|
||
# with open(f"test.jpg", "wb") as f:
|
||
# f.write(image)
|
||
|
||
self.past_time = update_time
|
||
status = True
|
||
|
||
else:
|
||
status = False
|
||
|
||
return self.latest_url[1:-2], status, update_time
|
||
|
||
@staticmethod
|
||
async def request_content_sync(url: str):
|
||
"""
|
||
|
||
:param url:
|
||
:return:
|
||
"""
|
||
async with aiohttp.ClientSession() as session:
|
||
async with session.get(url) as response:
|
||
if response.status == 200:
|
||
return await response.read()
|
||
else:
|
||
return None
|
||
|
||
|
||
Utils = WeatherGet()
|
||
|
||
|
||
async def main():
|
||
image_url, status, update_time = await Utils.get_china_satellite_weather_image()
|
||
|
||
print(
|
||
image_url, status, update_time
|
||
)
|
||
|
||
temp_name = update_time.split(" ")
|
||
|
||
month = temp_name[0].split("/")[0]
|
||
day = temp_name[0].split("/")[1]
|
||
hour = temp_name[1].split(":")[0]
|
||
minute = temp_name[1].split(":")[1]
|
||
|
||
# content_aga = requests.get(image_url).content
|
||
# with open(f"data/fy_2/{month}-{day}-{hour}-{minute}.jpg", "wb") as f:
|
||
# f.write(content_aga)
|
||
|
||
if not image_url: # 如果没返回url
|
||
print(status)
|
||
content = await Utils.request_content_sync(image_url)
|
||
if not content:
|
||
with open(f"data/fy_2/{update_time}.jpg", "wb") as f:
|
||
f.write(content)
|
||
|
||
content = await Utils.request_content_sync(image_url)
|
||
with open(f"data/fy_2/{month}-{day}-{hour}-{minute}.jpg", "wb") as f:
|
||
f.write(content)
|
||
|
||
|
||
asyncio.run(main())
|