import os
import importlib
from pathlib import Path
import numpy as np
import operational_analysis.toolkits.met_data_processing as met
from operational_analysis.types import timeseries_table
from operational_analysis.toolkits import reanalysis_downloading
class ReanalysisData(object):
    """
    This class houses the different reanalysis data products and their related functions
    for use in the PRUF OA code. ReanalysisData holds a dict of TimeseriesTable objects in the
    _product attribute, keyed by reanalysis product name. The available keys (names) are listed
    in the _products attribute.
    """

    def __init__(self, engine="pandas"):
        """
        Create one empty TimeseriesTable per supported reanalysis product.

        Args:
            engine (:obj:`string`, optional): Backend passed to ``TimeseriesTable.factory``. When "spark",
                a SparkContext and SQLContext are also obtained via ``getOrCreate``. Defaults to "pandas".
        """
        self._products = ["merra2", "ncep2", "erai", "era5"]
        self._engine = engine
        self._product = {}
        for product in self._products:
            self._product[product] = timeseries_table.TimeseriesTable.factory(engine)
        if engine == "spark":
            # pyspark is only imported when the spark engine is requested
            self._sql = importlib.import_module("pyspark.sql")
            self._pyspark = importlib.import_module("pyspark")
            self._sc = self._pyspark.SparkContext.getOrCreate()
            self._sqlContext = self._sql.SQLContext.getOrCreate(self._sc)

    def load(self, path, name, format="csv", lat=None, lon=None, **kwargs):
        """
        This method loads reanalysis data into the ReanalysisData object using one of two formats. When format is
        "csv", reanalysis data will be imported from csv files in the specified path with file name
        "{name}_{product}.csv", where product is the reanalysis product name. When format is "planetos", data will
        be imported from csv files if they exist in the specified path. Otherwise, reanalysis data will be
        downloaded via the PlanetOS API (with ``calc_derived_vars=True``), saved as csv files in the specified path,
        and then loaded into the ReanalysisData object. Note that only "merra2" and "era5" reanalysis products are
        supported when using the "planetos" format.

        Args:
            path (:obj:`string`): Path where reanalysis files are or will be saved
            name (:obj:`string`): String used as a prefix when forming reanalysis data file names (e.g., the
                project name).
            format (:obj:`string`, optional): Format of the reanalysis data. If "csv", data will be imported from
                existing csv files. If "planetos", data will be downloaded via the PlanetOS API. Defaults to "csv".
            lat (:obj:`float`, optional): Latitude (degrees). Used when format = "planetos". Defaults to None.
            lon (:obj:`float`, optional): Longitude (degrees). Used when format = "planetos". Defaults to None.
            **kwargs: Optional keyword arguments passed to
                :py:func:`operational_analysis.toolkits.reanalysis_downloading.download_reanalysis_data_planetos`

        Raises:
            NotImplementedError: When a format other than "csv" or "planetos" is specified
            NotImplementedError: When the ReanalysisData object's engine is specified as "spark"
        """
        if self._engine == "pandas":
            if format == "csv":
                for product in self._products:
                    self._product[product].load(path, f"{name}_{product}")
            elif format == "planetos":
                # Only MERRA-2 and ERA5 are supported via PlanetOS. Iterate in the order of self._products
                # (not over a set) so downloads/loads happen in a deterministic order.
                planetos_products = [p for p in self._products if p in ("merra2", "era5")]
                for product in planetos_products:
                    filename = f"{name}_{product}"
                    # Download from PlanetOS only if the csv file doesn't already exist
                    if not (Path(path) / f"{filename}.csv").exists():
                        reanalysis_downloading.download_reanalysis_data_planetos(
                            product,
                            lat=lat,
                            lon=lon,
                            calc_derived_vars=True,
                            save_pathname=path,
                            save_filename=filename,
                            **kwargs,
                        )
                    self._product[product].load(path, filename)
            else:
                raise NotImplementedError(
                    'Not a valid format. Allowable formats are "csv" and "planetos".'
                )
        if self._engine == "spark":
            raise NotImplementedError("Spark version of this function is not yet implemented")

    def compute_derived_variables(
        self,
        products=None,
        u_col="u_ms",
        v_col="v_ms",
        temperature_col="temperature_K",
        surf_pres_col="surf_pres_Pa",
    ):
        """
        This method computes the derived variables wind speed, wind direction, and air density from reanalysis
        variables including the u and v wind speed components, temperature, and surface pressure for each reanalysis
        data product loaded in the ReanalysisData object. The derived variables are added to the reanalysis product
        dataframes as the columns "windspeed_ms", "winddirection_deg", and "rho_kgm-3".

        Args:
            products (:obj:`list`, optional): List of reanalysis products to compute derived variables for. If none are
                specified, all products for which dataframes exist will be used. Defaults to None.
            u_col (:obj:`string`, optional): Name of dataframe column containing eastward wind component in m/s.
                Defaults to "u_ms".
            v_col (:obj:`string`, optional): Name of dataframe column containing northward wind component in m/s.
                Defaults to "v_ms".
            temperature_col (:obj:`string`, optional): Name of dataframe column containing temperature in Kelvins.
                Defaults to "temperature_K".
            surf_pres_col (:obj:`string`, optional): Name of dataframe column containing surface pressure in
                Pascals. Defaults to "surf_pres_Pa".

        Raises:
            NotImplementedError: When the ReanalysisData object's engine is specified as "spark"
        """
        # Fail fast before touching any table when the engine is unsupported
        if self._engine == "spark":
            raise NotImplementedError("Spark version of this function is not yet implemented")
        if products is None:
            # By default, process reanalysis products for which a dataframe exists
            products = [p for p in self._products if self._product[p].df is not None]
        if self._engine == "pandas":
            for product in products:
                # Columns are added in place on the dataframe held by the product table
                df = self._product[product].df
                df["windspeed_ms"] = np.sqrt(df[u_col] ** 2 + df[v_col] ** 2)
                df["winddirection_deg"] = met.compute_wind_direction(df[u_col], df[v_col])
                df["rho_kgm-3"] = met.compute_air_density(df[temperature_col], df[surf_pres_col])

    def save(self, path, name):
        """
        Save each loaded product table to ``path`` under the name "{name}_{product}".

        Tables whose dataframe is None (i.e. products that were never loaded) are skipped.

        Args:
            path (:obj:`string`): Directory passed to each table's ``save`` method.
            name (:obj:`string`): Prefix used to form each product's file name.

        Raises:
            NotImplementedError: When the ReanalysisData object's engine is specified as "spark"
        """
        if self._engine == "spark":
            raise NotImplementedError("Spark version of this function is not yet implemented")
        if self._engine == "pandas":
            for product, table in self._product.items():
                if table.df is None:
                    continue
                table.save(path, f"{name}_{product}")

    def rename_columns(self, mapping):
        """
        Copy columns to new names in every loaded product dataframe.

        For each ``new_name, old_name`` pair in ``mapping`` (with ``new_name != old_name``), the column
        ``old_name`` is copied into ``new_name`` and ``old_name`` is then set to None. Products whose dataframe
        is None are skipped. A KeyError is raised if ``old_name`` is missing from a loaded dataframe.

        Args:
            mapping (:obj:`dict`): Mapping of new column name to existing column name.

        Raises:
            NotImplementedError: When the ReanalysisData object's engine is specified as "spark"
        """
        if self._engine == "spark":
            raise NotImplementedError("Spark version of this function is not yet implemented")
        for product in self._products:
            df = self._product[product].df
            if df is None:
                continue
            for new_col, old_col in mapping.items():
                if new_col != old_col:
                    df[new_col] = df[old_col]
                    df[old_col] = None

    def head(self):
        """
        Return the first rows of each loaded product dataframe.

        Returns:
            :obj:`dict`: Mapping of product name to ``df.head()`` for every product whose dataframe is not None.
        """
        return {
            product: self._product[product].df.head()
            for product in self._products
            if self._product[product].df is not None
        }