需求背景:
公司是做外贸服装的,在亚马逊平台上有多个地区店铺运营,运营人员需要参考地区的天气情况,上新的服装.所以需要能够获取全球任意地区的天气情况.还需要预测未来10-15天的天气情况.
选型API:
天气API中有大把免费的api,如:国内的心知天气,国际的雅虎,还有今天的主角:wunderground
最终选择了wunderground,原因:1,需求是全球任意地区的(国内API请求国外地区需要收费才能访问), 2.wunderground提供是信息最全,最丰富的天气api.雅虎提供的天气API信息非常之简略.
直入主题:
这里的免费api只是说测试账号每天有500次的免费请求,要是公司需求大的话,那么就需要付费了.官网价格
准备工作,你需要在官网注册一个账号,然后随意打开一个API的文档, 你会见到
http://api.wunderground.com/api/Your_Key/conditions/q/CA/San_Francisco.json
当中Your_Key的位置有一串key.请保管记住.
python代码:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 18年3月6日 下午1:53
# @Author : dongyouyuan
# @email : 15099977712@163.com
# @File : weatherApi.py
# @Software: PyCharm
#
# 通过全球天气预报API:http://api.wunderground.com(信息最多最全) 来获取信息
import requests
import json
import datetime
import time
import operator
def timeStamp_to_dataTime(timestamp, format='%Y-%m-%d'):
"""
时间戳转换成格式化的时间
:param timestamp: 时间戳
:param format: 格式
:return:
"""
if timestamp is None or not timestamp:
return ''
return datetime.datetime.fromtimestamp(int(timestamp)).strftime(format)
def dataTime_to_timeStamp(data_tiem, format='%Y-%m-%d %H:%M'):
"""
格式化时间格式转换成时间戳
:param data_tiem: 格式化时间
:param format: 格式
:return:
"""
if not data_tiem:
return 0
else:
try:
time_array = time.strptime(data_tiem, format)
time_stamp = int(time.mktime(time_array))
return time_stamp
except Exception as error_msg:
print(error_msg)
return 0
def get_recent_10_days(date):
"""
获取前10天的时间格式 20180306
:return: 返回列表
"""
date_list = list()
for i in range(1, 11):
tmp_date = (datetime.datetime.strptime(date, "%Y%m%d") - datetime.timedelta(days=i)).strftime('%Y%m%d')
date_list.append(tmp_date)
return date_list
def sorted_for_list_dict(list_dict, key, reverse=False):
"""
为列表排序,按照元素中dict的key
:param list_dict:
:param key:
:return:
"""
return sorted(list_dict, key=operator.itemgetter(key), reverse=reverse)
class Weather(object):
def __init__(self):
self.search_url = 'http://autocomplete.wunderground.com/aq'
self._url = 'http://api.wunderground.com/api/'
self._key = '' # Your_ key
self.url = self._url + self._key + '/'
self.timeout = 10
def search(self, query):
"""
搜索地名,返回结果列表
:param query:
:return:
"""
params = {"query": query}
headers = {"Content-Type": "application/json"}
r = requests.get(url=self.search_url, params=params, headers=headers, timeout=self.timeout)
if r.status_code == 200:
return r.json().get('RESULTS', None)
else:
print('请求失败,失败代码:{}'.format(r.status_code))
return None
def get_today(self, zmw):
"""
根据zmw码获取今天的天气信息
:param zmw:
:return:
"""
# temp 温度
# humidity 湿度
# weather 天气情况
# wind 风
# dewpoint 露点
# UV 紫外线强度
# pressure 压力
url = self.url + 'conditions' + '/q/zmw:{}.json'.format(zmw)
headers = {"Content-Type": "application/json"}
r = requests.get(url=url, headers=headers, timeout=self.timeout)
if r.status_code == 200:
# print(r.json())
return r.json().get('current_observation')
else:
print('请求失败,失败代码:{}'.format(r.status_code))
return None
def get_hourly_today(self, zmw):
"""
根据zmw码获取今天每小时的天气信息
:param zmw:
:return:
"""
url = self.url + 'hourly' + '/q/zmw:{}.json'.format(zmw)
headers = {"Content-Type": "application/json"}
r = requests.get(url=url, headers=headers, timeout=self.timeout)
if r.status_code == 200:
# print(r.json())
return r.json().get('hourly_forecast')
else:
print('请求失败,失败代码:{}'.format(r.status_code))
return None
def _get_forecast_10day(self, zmw):
"""
根据zmw码获取未来10天的天气预报
:param zmw:
:return:
"""
# snow 降雪 (白天,晚上,全天)
# wind 风 (最大,最小)
# humidity 湿度(最大,最小,平均)
# conditions 天气情况 icon icon_url
# qpf 降水量(白天,晚上,全天)
# low, high 温度(最高,最低)
# pop 降水概率
url = self.url + 'forecast10day' + '/q/zmw:{}.json'.format(zmw)
headers = {"Content-Type": "application/json"}
r = requests.get(url=url, headers=headers, timeout=self.timeout)
if r.status_code == 200:
return sorted_for_list_dict(r.json().get('forecast').get('simpleforecast').get('forecastday'), 'period')
else:
print('请求失败,失败代码:{}'.format(r.status_code))
return None
def get_forecast_10day(self, zmw):
"""
根据zmw码获取未来10天的天气预报(并且整理数据输出)
:param zmw:
:return:
"""
f_day_list = self._get_forecast_10day(zmw)
if f_day_list:
result_list = list()
for f_day in f_day_list:
forecast_day = dict()
date_time = "{}{}{}".format(f_day['date']['year'], f_day['date']['month'], f_day['date']['day'])
time_stamp = dataTime_to_timeStamp(date_time, format='%Y%m%d')
date_format = timeStamp_to_dataTime(time_stamp, format='%Y%m%d')
# 当天日期
forecast_day['date_format'] = date_format
# 风
wind = {
'ave': f_day.get('avewind', {}),
'max': f_day.get('maxwind', {})
}
forecast_day['wind'] = wind
# 雪
snow = {
'day': f_day.get('snow_day', {}),
'night': f_day.get('snow_night', {}),
'allday': f_day.get('snow_allday', {})
}
forecast_day['snow'] = snow
# 湿度
humidity = {
'min': f_day.get('minhumidity', 0),
'max': f_day.get('maxhumidity', 0),
'ave': f_day.get('avehumidity', 0)
}
forecast_day['humidity'] = humidity
# 天气情况
conditions = {
'icon_url': f_day.get('icon_url', ''),
'icon': f_day.get('icon', ''),
'text': f_day.get('conditions', ''),
'skyicon': f_day.get('skyicon', '')
}
forecast_day['conditions'] = conditions
# 气压
qpf = {
'day': f_day['qpf_day'],
'night': f_day['qpf_night'],
'allday': f_day['qpf_allday']
}
forecast_day['qpf'] = qpf
# 气温
temp = {
'low': f_day.get('low', {'celsius': '0', 'fahrenheit': '0'}),
'high': f_day.get('high', {'celsius': '0', 'fahrenheit': '0'})
}
forecast_day['temp'] = temp
forecast_day['pop'] = f_day.get('pop', 0)
forecast_day['date'] = f_day.get('date', {})
result_list.append(forecast_day)
return result_list
else:
return None
def get_hourly_10day(self, zmw):
"""
根据zmw码获取未来10天,每小时的天气预报
:param zmw:
:return:
"""
url = self.url + 'hourly10day' + '/q/zmw:{}.json'.format(zmw)
headers = {"Content-Type": "application/json"}
r = requests.get(url=url, headers=headers, timeout=self.timeout)
if r.status_code == 200:
# print(r.json())
return r.json()
else:
print('请求失败,失败代码:{}'.format(r.status_code))
return None
def _get_history(self, zmw, date):
"""
根据zmw码获取历史天气
:param zmw:
:param dates: 查询时间 20170608
:return:
"""
url = self.url + 'history_{}'.format(date) + '/q/zmw:{}.json'.format(zmw)
headers = {"Content-Type": "application/json"}
r = requests.get(url=url, headers=headers, timeout=self.timeout)
if r.status_code == 200:
# print(r.json())
return r.json()
else:
print('请求失败,失败代码:{}'.format(r.status_code))
return None
def get_history(self, zmw, date):
"""
根据zmw码获取历史天气(并且整理数据输出)
:param zmw:
:param dates: 查询时间 20170608
:return:
"""
h_day = self._get_history(zmw, date)
if h_day['history']['dailysummary']:
h_day = h_day['history']['dailysummary'][0]
history_day = dict()
# 当天日期
history_day['date_format'] = date
# 风
wind = {
'ave': {'dir': h_day.get('meanwdire', ''), 'degrees': h_day.get('meanwdird', ''),
'kph': h_day.get('meanwindspdi', ''), 'mph': h_day.get('meanwindspdm', '')},
'max': {'dir': '', 'degrees': '0', 'kph': '0', 'mph': '0'}
}
history_day['wind'] = wind
# 雪
snow = {
'day': {'cm': h_day.get('snowdepthm', ''), 'in': h_day.get('snowdepthi', '')},
'night': {'cm': '', 'in': ''},
'allday': {'cm': '', 'in': ''}
}
history_day['snow'] = snow
# 湿度
humidity = {
'min': h_day.get('minhumidity', ''),
'max': h_day.get('maxhumidity', ''),
'ave': h_day.get('humidity', '')
}
history_day['humidity'] = humidity
# 天气情况
conditions = {
'icon_url': '',
'icon': '',
'text': '',
'skyicon': '',
'tornado': h_day.get('tornado', '0'),
'fog': h_day.get('fog', '0'),
'rain': h_day.get('rain', '0'),
'snow': h_day.get('snow', '0'),
'thunder': h_day.get('thunder', '0')
}
history_day['conditions'] = conditions
# 气压
qpf = {
'day': {'mm': h_day.get('maxpressurem', '0.0'), 'in': h_day.get('maxpressurei', '0.0')},
'night': {'mm': h_day.get('minpressurem', '0.0'), 'in': h_day.get('minpressurei', '0.0')},
'allday': {'mm': h_day.get('meanpressurem', '0.0'), 'in': h_day.get('meanpressurei', '0.0')}
}
history_day['qpf'] = qpf
# 温度
temp = {
'low': {'celsius': h_day.get('mintempm', '0'), 'fahrenheit': h_day.get('mintempi', '0')},
'high': {'celsius': h_day.get('maxtempm', '0'), 'fahrenheit': h_day.get('maxtempi', '0')}
}
history_day['temp'] = temp
# 可见度
visi = {
'min': {'km': h_day.get('minvism', '0.0'), 'mi': h_day.get('minvisi', '0.0')},
'mean': {'km': h_day.get('meanvism', '0.0'), 'mi': h_day.get('meanvisi', '0.0')},
'max': {'km': h_day.get('maxvism', '0.0'), 'mi': h_day.get('maxvisi', '0.0')}
}
history_day['visi'] = visi
# 时间信息
history_day['date'] = h_day.get('date', {})
return history_day
else:
return None
def _get_history_10day(self, zmw, date):
"""
根据zmw码获取10天的历史天气(不包括传入天)
:param zmw:
:param dates: 查询时间 20170608
:return:
"""
date_list = get_recent_10_days(date)
result_list = list()
for history_date in date_list:
result_list.append(self._get_history(zmw=zmw, date=history_date))
return result_list
def get_history_10day(self, zmw, date):
"""
根据zmw码获取10天的历史天气(不包括传入天)(并且整理输出)
:param zmw:
:param dates: 查询时间 20170608
:return:
"""
date_list = get_recent_10_days(date)
result_list = list()
for history_date in date_list:
result_list.append(self.get_history(zmw=zmw, date=history_date))
return result_list
if __name__ == '__main__':
weather = Weather()
search_result = weather.search('Paris')
if search_result:
zmw = search_result[0].get('zmw')
if zmw:
print('*'*100)
# 获取未来10天的天气预告
f_10daty = weather.get_forecast_10day(zmw=zmw)
print(json.dumps(f_10daty, indent=4))
print(len(f_10daty))
# 获取某一天的历史天气
# h_10daty = weather.get_history(zmw=zmw, date='20180306')
# print(json.dumps(h_10daty, indent=4))
# print(json.dumps(h_10daty, indent=4))
# print(len(h_10daty))
# 获取某天前10天的历史天气(不包括传入天)
h_10daty = weather.get_history_10day(zmw=zmw, date='20180306')
print(json.dumps(h_10daty, indent=4))
print(len(h_10daty))
代码当中封装了Weather类,你需要把刚才注册得到的key放到类的key中去.
代码解析:
类中并没有按照官网API(/q/地区/地名.json)的方法去做,因为发现这样得出的结果往往不准确的.而是通过查询你需要的字符,你自己获得到zmw(这是个唯一的编码)
然后再通过此码去查询.然后在方法 get_forecast_10day(),get_history(), get_history_10day()封装了一下,原因是官网的不同api获取的天气信息所叫的名称不一样(变量名)
这样用起来有点麻烦,所以统一整理,封装了一下.
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/179556.html原文链接:https://javaforall.cn