Python 处理NOAA气象数据

Posted by WT on March 25, 2024

问题

将NOAA中国区域的气象数据下载下来存入PostgreSQL

思路

根据NOAA全球气象数据按天记录数据的网页解析html,获得每个文件的文件名称,再加上url的前缀,组成下载链接,下载文件并解压缩。创建PostgreSQL数据表,并设置相关主键,将解压缩获得的csv文件中的中国站点数据插入到数据表中,并根据站点的经纬度数据反查所在的省市县。

步骤

运行环境:Python 3.6.13 PostgreSQL 16.2.1

按年下载NOAA气象数据

import requests
import os
from lxml import etree
import sys


#定义下载年份区间

years=[1985,2023]

print("downloading with requests")
tar_gz_url = 'https://www.ncei.noaa.gov/data/global-summary-of-the-day/archive/'
# 定义请求头部信息
headers = {'User-Agent': 'M'}
# 发送请求
resp= requests.get(tar_gz_url,headers=headers)
tar_gz_resp_text = resp.text
# 根据返回的text创建etree对象才能使用xpath分析
tar_gz_etree_element = etree.HTML(tar_gz_resp_text)
tar_gz_url_xpath = tar_gz_etree_element.xpath('//*/table/tr/td/a')
#for i in tar_gz_url_xpath:
    #print(i.xpath('@href'))
    #ste_name=i.xpath('@href')[0][:4]
    #print(ste_name)
for tar_gz in tar_gz_url_xpath[1:]:
    # 打印要下载的文件名
    # print(tar_gz.xpath('@href')[0])
    requests_url = tar_gz_url + tar_gz.xpath('@href')[0]
    file_name = tar_gz.xpath('@href')[0]
    year=int(file_name[:4])
    if year <years[0] or year >years[1]:
        continue
    # 打印拼接以后的下载链接
    # print(requests_url)
    # 通过下载链接创建对象r
    r = requests.get(requests_url)
    # 如果当前文件夹没有tar_gz目录则创建该目录
    if 'tar_gz' not in [x for x in os.listdir('.') if os.path.isdir(x)]:
        try:
            os.mkdir('tar_gz')
        except:
            print('创建文件夹失败')
    # 如果在目录tar_gz下已经有文件了则不重复下载,否则下载
    if file_name in [x for x in os.listdir('tar_gz')]:
        print('%s文件已下载'%(file_name))
    else:
        # 通过拼接的下载url下载文件,下载文件存储在目录tar_gz下
        with open(f'tar_gz/{file_name}', "wb") as code:
            code.write(r.content)
            print('下载文件%s成功'%(file_name))

解压缩文件

import tarfile
import os
def untar(fname, dirs):
    t = tarfile.open(fname)
    t.extractall(path = dirs)
 
# print(os.listdir('tar_gz'))
# t=tarfile.open('tar_gz/1929.tar.gz')
# t.extractall(path='tar_gz/1929')
 
def _decomp_tar_gz():
    # 遍历压缩包文件夹,获取的是所有压缩包文件名的list
    for tar_gz_file in os.listdir('tar_gz'):
        # print(tar_gz_file)
        # 把压缩的文件名使用.分割,取第一个元素作为解压缩文件的文件夹,例如文件2021.tar.gz则全部解压缩到文件夹tar_gz/2021下
        tar_gz_dir = tar_gz_file.split('.')[0]
        if os.path.isfile(f'tar_gz/{tar_gz_file}'):
            untar(f'tar_gz/{tar_gz_file}',f'tar_gz/{tar_gz_dir}')
_decomp_tar_gz()

PostgreSQL 数据表创建

字段注释来源于参考文献1

DROP TABLE IF EXISTS noaadata;
CREATE TABLE noaadata (
  station char(11) NOT NULL ,
  date date NOT NULL ,
  latitude float DEFAULT NULL ,
  longitude float DEFAULT NULL ,
  elevation  float DEFAULT NULL ,
  name  varchar(50) DEFAULT NULL ,
  temp float DEFAULT NULL ,
  temp_attributes int DEFAULT NULL ,
  dewp float DEFAULT NULL ,
  dewp_attributes int DEFAULT NULL ,
  slp float DEFAULT NULL ,
  slp_attributes int DEFAULT NULL ,
  stp float DEFAULT NULL ,
  stp_attributes int DEFAULT NULL ,
  visib float DEFAULT NULL ,
  visib_attributes int DEFAULT NULL ,
  wdsp float DEFAULT NULL ,
  wdsp_attributes int DEFAULT NULL ,
  mxspd float DEFAULT NULL ,
  gust float DEFAULT NULL ,
  max float DEFAULT NULL ,
  max_attributes varchar(10) DEFAULT NULL ,
  min float DEFAULT NULL ,
  min_attributes varchar(10) DEFAULT NULL ,
  prcp float DEFAULT 0 ,
  prcp_attributes char(1) DEFAULT NULL ,
  sndp float DEFAULT NULL ,
  frshtt char(6) DEFAULT '000000' ,
  PRIMARY KEY (station, date)
) ;
COMMENT ON TABLE noaadata IS '气象站点数据表';

-- 添加注释到列


  COMMENT ON COLUMN noaadata.station IS '站点ID';
  COMMENT ON COLUMN noaadata.date  IS '日期';
  COMMENT ON COLUMN noaadata.latitude IS '纬度';
  COMMENT ON COLUMN noaadata.longitude IS '经度';
  COMMENT ON COLUMN noaadata.elevation  IS '海拔';
  COMMENT ON COLUMN noaadata.name  IS '城市英文名';
  COMMENT ON COLUMN noaadata.temp IS '平均温度';
  COMMENT ON COLUMN noaadata.temp_attributes IS '计算平均温度的观测次数';
  COMMENT ON COLUMN noaadata.dewp IS '平均露点';
  COMMENT ON COLUMN noaadata.dewp_attributes IS '计算平均露点的观测次数';
  COMMENT ON COLUMN noaadata.slp IS '海平面压力';
  COMMENT ON COLUMN noaadata.slp_attributes IS '计算海平面压力的观测次数';
  COMMENT ON COLUMN noaadata.stp IS '当天平均站压';
  COMMENT ON COLUMN noaadata.stp_attributes IS '平均站压的观测次数';
  COMMENT ON COLUMN noaadata.visib IS '能见度';
  COMMENT ON COLUMN noaadata.visib_attributes IS '能见度观测次数';
  COMMENT ON COLUMN noaadata.wdsp IS '平均风速';
  COMMENT ON COLUMN noaadata.wdsp_attributes IS '计算平均风速的观测次数';
  COMMENT ON COLUMN noaadata.mxspd IS '最大持续风速';
  COMMENT ON COLUMN noaadata.gust IS '最大阵风';
  COMMENT ON COLUMN noaadata.max IS '最高气温';
  COMMENT ON COLUMN noaadata.max_attributes IS '空白表示最高明确温度而非小时数据';
  COMMENT ON COLUMN noaadata.min IS '最低气温';
  COMMENT ON COLUMN noaadata.min_attributes IS '空白表示最低明确温度而非小时数据';
  COMMENT ON COLUMN noaadata.prcp IS '降雨量';
  COMMENT ON COLUMN noaadata.prcp_attributes IS '取值ABCDEFGH分别代表A 1-6小时 B 2-6小时 C 3-6小时 D 4-6小时 E 1-12小时 F 2-12小时 G 1-24小时即全天 HI不完整降雨记录 一般值为G';
  COMMENT ON COLUMN noaadata.sndp IS '降雪量';
  COMMENT ON COLUMN noaadata.frshtt IS '发生气象事件的概率默认0代表未发生1代表发生,六位分别代表Fog雾Rain雨Snow雪Hail冰雹Thunder雷电Tornado龙卷风';
  
  
 --- 创建一个新表,保存站点经纬度对应的省市县数据 
DROP TABLE IF EXISTS stationinfo;
CREATE TABLE stationinfo (
  station_id char(11) NOT NULL ,
  name varchar(50) NULL ,
  latitude float DEFAULT NULL ,
  longitude float DEFAULT NULL ,
  country varchar(20) DEFAULT NULL ,
  province varchar(20) DEFAULT NULL ,
  city varchar(20) DEFAULT NULL ,
  district varchar(20) DEFAULT NULL ,
  PRIMARY KEY (station_id)
) ;

COMMENT ON TABLE stationinfo IS '气象站点行政区划';

COMMENT ON COLUMN stationinfo.station_id IS '站点ID';
COMMENT ON COLUMN stationinfo.name  IS '城市英文名';
COMMENT ON COLUMN stationinfo.latitude IS '纬度';
COMMENT ON COLUMN stationinfo.longitude IS '经度';
COMMENT ON COLUMN stationinfo.country IS '国家';
COMMENT ON COLUMN stationinfo.province IS '省';
COMMENT ON COLUMN stationinfo.city IS '城市';
COMMENT ON COLUMN stationinfo.district IS '县';

插入数据到NOAADATA表中

根据参考文献记录,NOAA全球气象数据csv文件名5为前缀的是中国区域的数据,插入数据只处理这些文件中的数据。

import csv
import os

import psycopg2
 
# 打开数据库连接
db = psycopg2.connect(host='localhost',
                      port='5432',
                      user='postgres',
                      password='postgres',
                      database='postgres')
 
# 创建游标对象

cursor = db.cursor()
 
# 插入数据库函数,传递参数为csv文件路径
def insert_data_from_csv(csv_file):
    f = csv.reader(open(csv_file,'r'))
    for i in f:
        # csv第一行是字段名,排除掉
        if i[0] == "STATION":
            continue
        sql = 'insert into noaadata values(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)'
        # print(i)
        # 插入到数据库
        try:
            cursor.execute(sql,list(i[n] for n in range(28)))
            
        except:
            print('插入数据错误')
       
# 遍历接压缩后的文件夹,获取csv文件,如果符合条件则把文件名作为参数传递给函数insert_data_from_csv插入数据
def insert_data():
    # 遍历文件夹tar_gz
    for tar_gz_dir in os.listdir('tar_gz'):
        # 如果是文件夹则继续遍历即只遍历下面的文件夹,排除压缩文件
        if os.path.isdir(f'tar_gz/{tar_gz_dir}'):
            # 排除压缩文件继续遍历文件夹,下面的遍历文件列表为csv文件
            for tar_gz_file in os.listdir(f'tar_gz/{tar_gz_dir}'):
                # 切割文件,如果csv文件是以5开头则代表是中国的气象站,则调用插入函数插入,否则不处理
                if f'tar_gz/{tar_gz_dir}/{tar_gz_file}'.split('/')[2][0] == '5':
                    print('是中国的气象站网数据库插入数据%s'%(f'tar_gz/{tar_gz_dir}/{tar_gz_file}'))
                    insert_data_from_csv(f'tar_gz/{tar_gz_dir}/{tar_gz_file}')
                    # 提交事务
                    db.commit()
                else:
                    print('外国气象站数据不处理')
insert_data();
 
# 释放游标及数据库连接
cursor.close()
db.close()

Postgres会拒绝文件中经纬度为空的数据插入,报“插入数据错误”的提示。对于里面数据为0的数据,会插入,需要自行删除

delete FROM noaadata  where latitude<0.1 OR longitude <0.1 ;

更新站点信息表

根据站点的经纬度信息,利用地图 WEB API提供的经纬度信息,可以解析出其所在的省、市、县。目前高德地图和百度地图对该服务采取了并发量和日调用量的显示,二者的并发限制均为30QPS,即每秒30次,日调用量高德为5000次/天,百度为300次/天。站点的信息大约600多个,因此,本方案选择了高德地图的API。

import json
import requests
 
# 高德API服务的Key
API_KEY = '你自己的Key'
 
# 逆地址解析接口
REVERSE_GEO_API = 'https://restapi.amap.com/v3/geocode/regeo'
 
# 调用逆地址解析服务
def reverse_geocode(lat: float, lng: float):
    params = {
        'key': API_KEY,
        'location': f'{lng},{lat}',  # 格式为'经度,纬度'
    }
    response = requests.get(REVERSE_GEO_API, params=params)
    if response.status_code == 200:
        data = json.loads(response.text)
        if data['status'] == '1':
            # 解析数据并返回结果
            print(data)
            address=data['regeocode']['addressComponent']
            return {
               item: address[item] for item in ['country', 'province', 'city', 'district']
                }
        else:
            return 'Error: ' + data['info']
    else:
        return 'Error: 请求失败'
 
# 示例使用
# 北京天安门的经纬度
address = reverse_geocode(36.55,106.15)
print(address)

由于每秒30次的并发限制,我写代码时加入了Sleep函数,限制每秒的并发次数。 插入数据时利用DISTINCT ON函数对重复站点进行了过滤,保证只插入一个站点。

import psycopg2
import time
 
# 打开数据库连接


def insert_station_info():
    db = psycopg2.connect(host='localhost',
                      port='5432',
                      user='postgres',
                      password='postgres',
                      database='postgres')
 
    # 创建游标对象

    cursor = db.cursor()
    #db = pymysql.connect(host=HOST, user=USER, passwd=PASSWORD, db=DB, charset=CHARSET)
    #cursor = db.cursor()
    # 查找数据库,如果station重复则保留一个结果
    station_number = cursor.execute('SELECT DISTINCT ON (station) * FROM noaadata ORDER BY station')
    # 所有查询结果
    station_result = cursor.fetchall()
    # 遍历所有结果,然后把数据插入info表,其中字段station_id name latitude longitude从原始表data取值不修改
    # 字段country province city district分别代表国家,省份,城市,县或区级信息是通过传递经纬度调用百度api取到的
    for i in station_result: 
        #print(baidu_api.get_location(i[2],i[3]))
        sql = 'insert into stationinfo values(%s, %s, %s, %s, %s, %s, %s, %s)'
        values = [i[0],i[5],i[2],i[3],reverse_geocode(i[2],i[3])['country'],reverse_geocode(i[2],i[3])['province'],reverse_geocode(i[2],i[3])['city'],reverse_geocode(i[2],i[3])['district']]
        print(values)
        time.sleep(0.1)
        cursor.execute(sql,values)
        db.commit()
    # 释放游标及数据库连接
    cursor.close()
    db.close()

insert_station_info()

引用文献:
[1] Python 获得NOAA全球开放气象数据
[2] NATIONAL CENTERS FOR ENVIRONMENTAL INFORMATION GLOBAL SURFACE SUMMARY OF DAY DATA (GSOD) (OVER 9000 WORLDWIDE STATIONS)
[3] Python数据库操作【四】—— PostgreSQL