# PrufPlantAnalysis
#
# This class defines key analytical routines for the PRUF/WRA Benchmarking
# standard operational assessment.
#
# The PrufPlantAnalysis object is a factory which instantiates either the Pandas, Dask, or Spark
# implementation depending on what the user prefers.
#
# The resulting object is loaded as a plugin into each PlantData object.
import random
import numpy as np
import pandas as pd
import statsmodels.api as sm
from tqdm import tqdm
from sklearn.metrics import r2_score, mean_squared_error
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import KFold
from operational_analysis import logging, logged_method_call
from operational_analysis.types import timeseries_table
from operational_analysis.toolkits import filters
from operational_analysis.toolkits import timeseries as tm
from operational_analysis.toolkits import unit_conversion as un
from operational_analysis.toolkits import met_data_processing as mt
from operational_analysis.toolkits.machine_learning_setup import MachineLearningSetup
logger = logging.getLogger(__name__)
[docs]def get_annual_values(data):
"""
This function returns annual summations of values in a pandas Series (or each column of a pandas DataFrame) with a
DatetimeIndex index starting from the first row. The purpose of the function is to correctly resample to annual
values when the first index does not fall on the beginning of the month.
Args:
data(:obj:`pandas.Series` or :obj:`pandas.DataFrame`): Input data with a DatetimeIndex index.
Returns:
:obj:`numpy.ndarray`: Array containing annual summations for each column of the input data.
"""
# shift time index to beginning of first month so resampling by 'MS' groups the data into full years
# starting from the beginning of the time series
ix_start = data.index[0]
month_start = ix_start.floor("d") + pd.offsets.MonthEnd(0) - pd.offsets.MonthBegin(1)
data.index = data.index - pd.Timedelta(ix_start - month_start)
return data.resample("12MS").sum().values
[docs]class MonteCarloAEP(object):
"""
A serial (Pandas-driven) implementation of the benchmark PRUF operational
analysis implementation. This module collects standard processing and
analysis methods for estimating plant level operational AEP and uncertainty.
The preprocessing should run in this order:
1. Process revenue meter energy - creates monthly/daily data frame, gets revenue meter on monthly/daily basis, and adds
data flag
2. Process loss estimates - add monthly/daily curtailment and availabilty losses to monthly/daily data frame
3. Process reanalysis data - add monthly/daily density-corrected wind speeds, temperature (if used) and wind direction (if used)
from several reanalysis products to the monthly data frame
4. Set up Monte Carlo - create the necessary Monte Carlo inputs to the OA process
5. Run AEP Monte Carlo - run the OA process iteratively to get distribution of AEP results
The end result is a distribution of AEP results which we use to assess expected AEP and associated uncertainty
"""
@logged_method_call
def __init__(
self,
plant,
reanal_products=["merra2", "ncep2", "erai", "era5"],
uncertainty_meter=0.005,
uncertainty_losses=0.05,
uncertainty_windiness=(10, 20),
uncertainty_loss_max=(10, 20),
outlier_detection=False,
uncertainty_outlier=(1, 3),
uncertainty_nan_energy=0.01,
time_resolution="M",
end_date_lt=None,
reg_model="lin",
ml_setup_kwargs={},
reg_temperature=False,
reg_winddirection=False,
):
"""
Initialize APE_MC analysis with data and parameters.
Args:
plant(:obj:`PlantData object`): PlantData object from which PlantAnalysis should draw data.
reanal_products(obj:`list`) : List of reanalysis products to use for Monte Carlo sampling. Defaults to ["merra2", "ncep2", "erai"].
uncertainty_meter(:obj:`float`): uncertainty on revenue meter data
uncertainty_losses(:obj:`float`): uncertainty on long-term losses
uncertainty_windiness(:obj:`tuple`): number of years to use for the windiness correction
uncertainty_loss_max(:obj:`tuple`): threshold for the combined availabilty and curtailment monthly loss threshold
outlier_detection(:obj:`bool`): whether to perform (True) or not (False - default) outlier detection filtering
uncertainty_outlier(:obj:`tuple`): min and max thresholds (Monte-Carlo sampled) for the outlier detection filter. At monthly resolution, this is the tuning constant for Huber’s t function for a robust linear regression. At daily/hourly resolution, this is the number of stdev of wind speed used as threshold for the bin filter.
uncertainty_nan_energy(:obj:`float`): threshold to flag days/months based on NaNs
time_resolution(:obj:`string`): whether to perform the AEP calculation at monthly ('M'), daily ('D') or hourly ('H') time resolution
end_date_lt(:obj:`string` or :obj:`pandas.Timestamp`): The last date to use for the long-term correction. Note that only the component of the date corresponding to the time_resolution argument is considered. If None, the end of the last complete month of reanalysis data will be used.
reg_model(:obj:`string`): which model to use for the regression ('lin' for linear, 'gam', 'gbm', 'etr'). At monthly time resolution only linear regression is allowed because of the reduced number of data points.
ml_setup_kwargs(:obj:`kwargs`): keyword arguments to MachineLearningSetup class
reg_temperature(:obj:`bool`): whether to include temperature (True) or not (False) as regression input
reg_winddirection(:obj:`bool`): whether to include wind direction (True) or not (False) as regression input
"""
logger.info("Initializing MonteCarloAEP Analysis Object")
self._aggregate = timeseries_table.TimeseriesTable.factory(plant._engine)
self._plant = plant # defined at runtime
self._reanal_products = reanal_products # set of reanalysis products to use
# Memo dictionaries help speed up computation
self.outlier_filtering = {} # Combinations of outlier filter results
self.long_term_sampling = {} # Combinations of long-term reanalysis data sampling
self.opt_model = {} # Optimized ML model hyperparameters for each reanalysis product
# Define relevant uncertainties, data ranges and max thresholds to be applied in Monte Carlo sampling
self.uncertainty_meter = np.float64(uncertainty_meter)
self.uncertainty_losses = np.float64(uncertainty_losses)
self.uncertainty_windiness = np.array(uncertainty_windiness, dtype=np.float64)
self.uncertainty_outlier = np.array(uncertainty_outlier, dtype=np.float64)
self.uncertainty_loss_max = np.array(uncertainty_loss_max, dtype=np.float64)
self.uncertainty_nan_energy = np.float64(uncertainty_nan_energy)
self.outlier_detection = outlier_detection
# Check that selected time resolution is allowed
if time_resolution not in ["M", "D", "H"]:
raise ValueError(
"time_res has to either be M (monthly, default) or D (daily) or H (hourly)"
)
self.time_resolution = time_resolution
self._resample_freq = {"M": "MS", "D": "D", "H": "H"}[self.time_resolution]
self._hours_in_res = {"M": 30 * 24, "D": 1 * 24, "H": 1}[self.time_resolution]
self._calendar_samples = {"M": 12, "D": 365, "H": 365 * 24}[self.time_resolution]
self.num_days_lt = (31, 28.25, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31)
if end_date_lt is not None:
self.end_date_lt = pd.to_datetime(end_date_lt).replace(minute=0) # drop minute field
else:
self.end_date_lt = end_date_lt
# Check that choices for regression inputs are allowed
if reg_temperature not in [True, False]:
raise ValueError(
"reg_temperature has to either be True (if temperature is considered in the regression), or False (if temperature is omitted"
)
if reg_winddirection not in [True, False]:
raise ValueError(
"reg_winddirection has to either be True (if wind direction is considered in the regression), or False (if wind direction is omitted"
)
self.reg_winddirection = reg_winddirection
self.reg_temperature = reg_temperature
# Build list of regression variables
self._rean_vars = []
if self.reg_temperature:
self._rean_vars += ["temperature_K"]
if self.reg_winddirection:
self._rean_vars += ["u_ms", "v_ms"]
# Check that selected regression model is allowed
if reg_model not in ["lin", "gbm", "etr", "gam"]:
raise ValueError(
"reg_model has to either be lin (Linear regression, default), gbm (Gradient boosting model), etr (Extra trees regressor) or gam (Generalized additive model)"
)
self.reg_model = reg_model
self.ml_setup_kwargs = ml_setup_kwargs
# Monthly data can only use robust linear regression because of limited number of data
if (time_resolution == "M") & (reg_model != "lin"):
raise ValueError("For monthly time resolution, only linear regression is allowed!")
# Run preprocessing step
self.calculate_aggregate_dataframe()
# Store start and end of period of record
self._start_por = self._aggregate.df.index.min()
self._end_por = self._aggregate.df.index.max()
# Create a data frame to store monthly/daily reanalysis data over plant period of record
self._reanalysis_por = self._aggregate.df.loc[
(self._aggregate.df.index >= self._start_por)
& (self._aggregate.df.index <= self._end_por)
]
@logged_method_call
def run(self, num_sim, reanal_subset=None):
"""
Perform pre-processing of data into an internal representation for which the analysis can run more quickly.
Args:
reanal_subset(:obj:`list`): list of str data indicating which reanalysis products to use in OA
num_sim(:obj:`int`): number of simulations to perform
Returns:
None
"""
self.num_sim = num_sim
if reanal_subset is None:
self.reanal_subset = self._reanal_products
else:
self.reanal_subset = reanal_subset
# Write parameters of run to the log file
logged_self_params = [
"uncertainty_meter",
"uncertainty_losses",
"uncertainty_loss_max",
"uncertainty_windiness",
"uncertainty_nan_energy",
"num_sim",
"reanal_subset",
]
logged_params = {name: getattr(self, name) for name in logged_self_params}
logger.info("Running with parameters: {}".format(logged_params))
# Start the computation
self.calculate_long_term_losses()
self.setup_monte_carlo_inputs()
self.results = self.run_AEP_monte_carlo()
# Log the completion of the run
logger.info("Run completed")
[docs] def plot_reanalysis_normalized_rolling_monthly_windspeed(self):
"""
Make a plot of annual average wind speeds from reanalysis data to show general trends for each
Highlight the period of record for plant data
Returns:
matplotlib.pyplot object
"""
import matplotlib.pyplot as plt
project = self._plant
# Define parameters needed for plot
min_val = 1 # Default parameter providing y-axis minimum for shaded plant POR region
max_val = 1 # Default parameter providing y-axis maximum for shaded plant POR region
por_start = self._aggregate.df.index[0] # Start of plant POR
por_end = self._aggregate.df.index[-1] # End of plant POR
plt.figure(figsize=(14, 6))
for key in self._reanal_products:
rean_df = project._reanalysis._product[key].df # Set reanalysis product
ann_mo_ws = (
rean_df.resample("MS")["ws_dens_corr"].mean().to_frame()
) # Take monthly average wind speed
ann_roll = ann_mo_ws.rolling(12).mean() # Calculate rolling 12-month average
ann_roll_norm = (
ann_roll["ws_dens_corr"] / ann_roll["ws_dens_corr"].mean()
) # Normalize rolling 12-month average
# Update min_val and max_val depending on range of data
if ann_roll_norm.min() < min_val:
min_val = ann_roll_norm.min()
if ann_roll_norm.max() > max_val:
max_val = ann_roll_norm.max()
# Plot wind speed
plt.plot(ann_roll_norm, label=key)
# Plot dotted line at y=1 (i.e. average wind speed)
plt.plot((ann_roll.index[0], ann_roll.index[-1]), (1, 1), "k--")
# Fill in plant POR region
plt.fill_between(
[por_start, por_end],
[min_val, min_val],
[max_val, max_val],
alpha=0.1,
label="Plant POR",
)
# Final touches to plot
plt.xlabel("Year")
plt.ylabel("Normalized wind speed")
plt.legend()
plt.tight_layout()
return plt
[docs] def plot_reanalysis_gross_energy_data(self, outlier_thres):
"""
Make a plot of gross energy vs wind speed for each reanalysis product,
with outliers highlighted
Args:
outlier_thres (float): outlier threshold (typical range of 1 to 4) which adjusts outlier sensitivity detection
Returns:
matplotlib.pyplot object
"""
import matplotlib.pyplot as plt
valid_aggregate = self._aggregate.df
plt.figure(figsize=(9, 9))
# Loop through each reanalysis product and make a scatterplot of monthly wind speed vs plant energy
for p in np.arange(0, len(list(self._reanal_products))):
col_name = self._reanal_products[p] # Reanalysis column in monthly data frame
# Plot
plt.subplot(2, 2, p + 1)
if (
self.time_resolution == "M"
): # Monthly case: apply robust linear regression for outliers detection
x = sm.add_constant(
valid_aggregate[col_name]
) # Define 'x'-values (constant needed for regression function)
y = (
valid_aggregate["gross_energy_gwh"] * 30 / valid_aggregate["num_days_expected"]
) # Normalize energy data to 30-days
rlm = sm.RLM(
y, x, M=sm.robust.norms.HuberT(t=outlier_thres)
) # Robust linear regression with HuberT algorithm (threshold equal to outlier_thres)
rlm_results = rlm.fit()
r2 = np.corrcoef(
x.loc[rlm_results.weights == 1, col_name], y[rlm_results.weights == 1]
)[
0, 1
] # Get R2 from valid data
# Continue plotting
plt.plot(
x.loc[rlm_results.weights != 1, col_name],
y[rlm_results.weights != 1],
"rx",
label="Outlier",
)
plt.plot(
x.loc[rlm_results.weights == 1, col_name],
y[rlm_results.weights == 1],
".",
label="Valid data",
)
plt.title(col_name + ", R2=" + str(np.round(r2, 3)))
plt.ylabel("30-day normalized gross energy (GWh)")
else: # Daily/hourly case: apply bin filter for outliers detection
x = valid_aggregate[col_name]
y = valid_aggregate["gross_energy_gwh"]
plant_capac = self._plant._plant_capacity / 1000.0 * self._hours_in_res
# Apply bin filter
flag = filters.bin_filter(
bin_col=y,
value_col=x,
bin_width=0.06 * plant_capac,
threshold=outlier_thres, # wind bin threshold (stdev outside the median)
center_type="median",
bin_min=0.01 * plant_capac,
bin_max=0.85 * plant_capac,
threshold_type="std",
direction="all", # both left and right (from the median)
)
# Continue plotting
plt.plot(
x.loc[flag],
y[flag],
"rx",
label="Outlier",
)
plt.plot(
x.loc[~flag],
y[~flag],
".",
label="Valid data",
)
if self.time_resolution == "D":
plt.ylabel("Daily gross energy (GWh)")
elif self.time_resolution == "H":
plt.ylabel("Hourly gross energy (GWh)")
plt.title(col_name)
plt.xlabel("Wind speed (m/s)")
plt.tight_layout()
return plt
[docs] def plot_result_aep_distributions(self):
"""
Plot a distribution of AEP values from the Monte-Carlo OA method
Returns:
matplotlib.pyplot object
"""
import matplotlib.pyplot as plt
fig = plt.figure(figsize=(14, 12))
sim_results = self.results
ax = fig.add_subplot(2, 2, 1)
ax.hist(sim_results["aep_GWh"], 40, density=1)
ax.text(
0.05,
0.9,
"AEP mean = " + str(np.round(sim_results["aep_GWh"].mean(), 1)) + " GWh/yr",
transform=ax.transAxes,
)
ax.text(
0.05,
0.8,
"AEP unc = "
+ str(np.round(sim_results["aep_GWh"].std() / sim_results["aep_GWh"].mean() * 100, 1))
+ "%",
transform=ax.transAxes,
)
plt.xlabel("AEP (GWh/yr)")
ax = fig.add_subplot(2, 2, 2)
ax.hist(sim_results["avail_pct"] * 100, 40, density=1)
ax.text(
0.05,
0.9,
"Mean = " + str(np.round((sim_results["avail_pct"].mean()) * 100, 1)) + " %",
transform=ax.transAxes,
)
plt.xlabel("Availability Loss (%)")
ax = fig.add_subplot(2, 2, 3)
ax.hist(sim_results["curt_pct"] * 100, 40, density=1)
ax.text(
0.05,
0.9,
"Mean: " + str(np.round((sim_results["curt_pct"].mean()) * 100, 2)) + " %",
transform=ax.transAxes,
)
plt.xlabel("Curtailment Loss (%)")
plt.tight_layout()
return plt
[docs] def plot_aep_boxplot(self, param, lab):
"""
Plot box plots of AEP results sliced by a specified Monte Carlo parameter
Args:
param(:obj:`list`): The Monte Carlo parameter on which to split the AEP results
lab(:obj:`str`): The name to use for the parameter when producing the figure
Returns:
(none)
"""
import matplotlib.pyplot as plt
sim_results = self.results
tmp_df = pd.DataFrame(data={"aep": sim_results.aep_GWh, "param": param})
tmp_df.boxplot(column="aep", by="param", figsize=(8, 6))
plt.ylabel("AEP (GWh/yr)")
plt.xlabel(lab)
plt.title("AEP estimates by %s" % lab)
plt.suptitle("")
plt.tight_layout()
return plt
[docs] def plot_aggregate_plant_data_timeseries(self):
"""
Plot timeseries of monthly/daily gross energy, availability and curtailment
Returns:
matplotlib.pyplot object
"""
import matplotlib.pyplot as plt
valid_aggregate = self._aggregate.df
plt.figure(figsize=(12, 9))
# Gross energy
plt.subplot(2, 1, 1)
plt.plot(valid_aggregate.gross_energy_gwh, ".-")
plt.grid("on")
plt.xlabel("Year")
plt.ylabel("Gross energy (GWh)")
# Availability and curtailment
plt.subplot(2, 1, 2)
plt.plot(valid_aggregate.availability_pct * 100, ".-", label="Availability")
plt.plot(valid_aggregate.curtailment_pct * 100, ".-", label="Curtailment")
plt.grid("on")
plt.xlabel("Year")
plt.ylabel("Loss (%)")
plt.legend()
plt.tight_layout()
return plt
@logged_method_call
def groupby_time_res(self, df):
"""
Group pandas dataframe based on the time resolution chosen in the calculation.
Args:
df(:obj:`dataframe`): dataframe that needs to be grouped based on time resolution used
Returns:
None
"""
if self.time_resolution == "M":
df_grouped = df.groupby(df.index.month).mean()
elif self.time_resolution == "D":
df_grouped = df.groupby([(df.index.month), (df.index.day)]).mean()
elif self.time_resolution == "H":
df_grouped = df.groupby([(df.index.month), (df.index.day), (df.index.hour)]).mean()
return df_grouped
@logged_method_call
def calculate_aggregate_dataframe(self):
"""
Perform pre-processing of the plant data to produce a monthly/daily data frame to be used in AEP analysis.
Args:
(None)
Returns:
(None)
"""
# Average to monthly/daily, quantify NaN data
self.process_revenue_meter_energy()
# Average to monthly/daily, quantify NaN data, merge with revenue meter energy data
self.process_loss_estimates()
# Density correct wind speeds, process temperature and wind direction, average to monthly/daily
self.process_reanalysis_data()
# Remove first and last reporting months if only partial month reported
# (only for monthly time resolution calculations)
if self.time_resolution == "M":
self.trim_monthly_df()
# Drop any data that have NaN gross energy values or NaN reanalysis data
self._aggregate.df = self._aggregate.df.dropna(
subset=["gross_energy_gwh"] + [product for product in self._reanal_products]
)
@logged_method_call
def process_revenue_meter_energy(self):
"""
Initial creation of monthly data frame:
1. Populate monthly/daily data frame with energy data summed from 10-min QC'd data
2. For each monthly/daily value, find percentage of NaN data used in creating it and flag if percentage is
greater than 0
Args:
(None)
Returns:
(None)
"""
df = getattr(self._plant, "meter").df # Get the meter data frame
# Create the monthly/daily data frame by summing meter energy
self._aggregate.df = (
df.resample(self._resample_freq)["energy_kwh"].sum() / 1e6
).to_frame() # Get monthly energy values in GWh
self._aggregate.df.rename(
columns={"energy_kwh": "energy_gwh"}, inplace=True
) # Rename kWh to MWh
# Determine how much 10-min data was missing for each year-month/daily energy value. Flag accordigly if any is missing
self._aggregate.df["energy_nan_perc"] = df.resample(self._resample_freq)[
"energy_kwh"
].apply(
tm.percent_nan
) # Get percentage of meter data that were NaN when summing to monthly/daily
if self.time_resolution == "M":
# Create a column with expected number of days per month (to be used when normalizing to 30-days for regression)
days_per_month = (pd.Series(self._aggregate.df.index)).dt.daysinmonth
days_per_month.index = self._aggregate.df.index
self._aggregate.df["num_days_expected"] = days_per_month
# Get actual number of days per month in the raw data
# (used when trimming beginning and end of monthly data frame)
# If meter data has higher resolution than monthly
if (self._plant._meter_freq == "1MS") | (self._plant._meter_freq == "1M"):
self._aggregate.df["num_days_actual"] = self._aggregate.df["num_days_expected"]
else:
self._aggregate.df["num_days_actual"] = df.resample("MS")["energy_kwh"].apply(
tm.num_days
)
@logged_method_call
def process_loss_estimates(self):
"""
Append availability and curtailment losses to monthly data frame
Args:
(None)
Returns:
(None)
"""
df = getattr(self._plant, "curtail").df
curt_aggregate = np.divide(
df.resample(self._resample_freq)[["availability_kwh", "curtailment_kwh"]].sum(), 1e6
) # Get sum of avail and curt losses in GWh
curt_aggregate.rename(
columns={"availability_kwh": "availability_gwh", "curtailment_kwh": "curtailment_gwh"},
inplace=True,
)
# Merge with revenue meter monthly/daily data
self._aggregate.df = self._aggregate.df.join(curt_aggregate)
# Add gross energy field
self._aggregate.df["gross_energy_gwh"] = un.compute_gross_energy(
self._aggregate.df["energy_gwh"],
self._aggregate.df["availability_gwh"],
self._aggregate.df["curtailment_gwh"],
"energy",
"energy",
)
# Calculate percentage-based losses
self._aggregate.df["availability_pct"] = np.divide(
self._aggregate.df["availability_gwh"], self._aggregate.df["gross_energy_gwh"]
)
self._aggregate.df["curtailment_pct"] = np.divide(
self._aggregate.df["curtailment_gwh"], self._aggregate.df["gross_energy_gwh"]
)
self._aggregate.df["avail_nan_perc"] = df.resample(self._resample_freq)[
"availability_kwh"
].apply(
tm.percent_nan
) # Get percentage of 10-min meter data that were NaN when summing to monthly/daily
self._aggregate.df["curt_nan_perc"] = df.resample(self._resample_freq)[
"curtailment_kwh"
].apply(
tm.percent_nan
) # Get percentage of 10-min meter data that were NaN when summing to monthly/daily
self._aggregate.df["nan_flag"] = False # Set flag to false by default
self._aggregate.df.loc[
(self._aggregate.df["energy_nan_perc"] > self.uncertainty_nan_energy)
| (self._aggregate.df["avail_nan_perc"] > self.uncertainty_nan_energy)
| (self._aggregate.df["curt_nan_perc"] > self.uncertainty_nan_energy),
"nan_flag",
] = True # If more than 1% of data are NaN, set flag to True
# By default, assume all reported losses are representative of long-term operational
self._aggregate.df["availability_typical"] = True
self._aggregate.df["curtailment_typical"] = True
# By default, assume combined availability and curtailment losses are below the threshold to be considered valid
self._aggregate.df["combined_loss_valid"] = True
@logged_method_call
def process_reanalysis_data(self):
"""
Process reanalysis data for use in PRUF plant analysis:
- calculate density-corrected wind speed and wind components
- get monthly/daily average wind speeds and components
- calculate monthly/daily average wind direction
- calculate monthly/daily average temperature
- append monthly/daily averages to monthly/daily energy data frame
Args:
(None)
Returns:
(None)
"""
# Identify start and end dates for long-term correction
# First find date range common to all reanalysis products and drop minute field of start date
start_date = max(
[self._plant._reanalysis._product[key].df.index.min() for key in self._reanal_products]
).replace(minute=0)
end_date = min(
[self._plant._reanalysis._product[key].df.index.max() for key in self._reanal_products]
)
# Next, update the start date to make sure it corresponds to a full time period
start_date_minus = start_date - pd.DateOffset(hours=1)
if (self.time_resolution == "M") & (start_date.month == start_date_minus.month):
# If not at the beginning of a month, use the beginning of the next month as the start date
start_date = start_date.replace(day=1, hour=0, minute=0) + pd.DateOffset(months=1)
elif (self.time_resolution == "D") & (start_date.day == start_date_minus.day):
# If not at the beginning of a day, use the beginning of the next day as the start date
start_date = start_date.replace(hour=0, minute=0) + pd.DateOffset(days=1)
# Now determine the end date based on either the user-defined end date or the end of the last full month
if self.end_date_lt is not None:
# If valid (before the last full time period in the reanalysis data), use the specified end date
end_date_lt_plus = self.end_date_lt + pd.DateOffset(hours=1)
if (self.time_resolution == "M") & (self.end_date_lt.month == end_date_lt_plus.month):
# If not at the end of a month, use the end of the month as the new end date
self.end_date_lt = (
self.end_date_lt.replace(day=1, hour=0, minute=0)
+ pd.DateOffset(months=1)
- pd.DateOffset(hours=1)
)
elif (self.time_resolution == "D") & (self.end_date_lt.day == end_date_lt_plus.day):
# If not at the end of a day, use the end of the day as the new end date
self.end_date_lt = self.end_date_lt.replace(hour=23, minute=0)
if self.end_date_lt > end_date:
raise ValueError(
"Invalid end date for long-term correction. The end date cannot exceed the last full time period (defined by the time resolution) in the provided reanalysis data."
)
else:
# replace end date
end_date = self.end_date_lt
else:
# If not at the end of a month, use the end of the previous month as the end date
if end_date.month == (end_date + pd.DateOffset(hours=1)).month:
end_date = end_date.replace(day=1, hour=0, minute=0) - pd.DateOffset(hours=1)
# Define empty data frame that spans our period of interest
self._reanalysis_aggregate = pd.DataFrame(
index=pd.date_range(start=start_date, end=end_date, freq=self._resample_freq),
dtype=float,
)
# Check if the date range covers the maximum number of years needed for the windiness correction
start_date_required = (
self._reanalysis_aggregate.index[-1]
+ self._reanalysis_aggregate.index.freq
- pd.offsets.DateOffset(years=self.uncertainty_windiness[1])
)
if self._reanalysis_aggregate.index[0] > start_date_required:
if self.end_date_lt is not None:
raise ValueError(
"Invalid end date argument for long-term correction. This end date does not provide enough reanalysis data for the long-term correction."
)
else:
raise ValueError(
"The date range of the provided reanalysis data is not long enough to perform the long-term correction."
)
# Now loop through the different reanalysis products, density-correct wind speeds, and take monthly averages
for key in self._reanal_products:
rean_df = self._plant._reanalysis._product[key].df
rean_df["ws_dens_corr"] = mt.air_density_adjusted_wind_speed(
rean_df["windspeed_ms"], rean_df["rho_kgm-3"]
) # Density correct wind speeds
self._reanalysis_aggregate[key] = rean_df.resample(self._resample_freq)[
"ws_dens_corr"
].mean() # .to_frame() # Get average wind speed by year-month
if self.reg_winddirection | self.reg_temperature:
namescol = [key + "_" + var for var in self._rean_vars]
self._reanalysis_aggregate[namescol] = (
rean_df[self._rean_vars].resample(self._resample_freq).mean()
)
if self.reg_winddirection: # if wind direction is considered as regression variable
self._reanalysis_aggregate[key + "_wd"] = np.rad2deg(
np.pi
- (
np.arctan2(
-self._reanalysis_aggregate[key + "_u_ms"],
self._reanalysis_aggregate[key + "_v_ms"],
)
)
) # Calculate wind direction
self._aggregate.df = self._aggregate.df.join(
self._reanalysis_aggregate
) # Merge monthly reanalysis data to monthly energy data frame
@logged_method_call
def trim_monthly_df(self):
"""
Remove first and/or last month of data if the raw data had an incomplete number of days
Args:
(None)
Returns:
(None)
"""
for p in self._aggregate.df.index[[0, -1]]: # Loop through 1st and last data entry
if (
self._aggregate.df.loc[p, "num_days_expected"]
!= self._aggregate.df.loc[p, "num_days_actual"]
):
self._aggregate.df.drop(p, inplace=True) # Drop the row from data frame
@logged_method_call
def calculate_long_term_losses(self):
"""
This function calculates long-term availability and curtailment losses based on the reported data grouped by the time resolution,
filtering for those data that are deemed representative of average plant performance.
Args:
(None)
Returns:
(None)
"""
df = self._aggregate.df
# isolate availabilty and curtailment values that are representative of average plant performance
avail_valid = df.loc[df["availability_typical"], "availability_pct"].to_frame()
curt_valid = df.loc[df["curtailment_typical"], "curtailment_pct"].to_frame()
# Now get average percentage losses by month or day
avail_long_term = self.groupby_time_res(avail_valid)["availability_pct"]
curt_long_term = self.groupby_time_res(curt_valid)["curtailment_pct"]
# Ensure there are 12 or 365 data points in long-term average. If not, throw an exception:
if avail_long_term.shape[0] < self._calendar_samples:
raise Exception(
"Not all calendar days/months represented in long-term availability calculation"
)
if curt_long_term.shape[0] < self._calendar_samples:
raise Exception(
"Not all calendar days/months represented in long-term curtailment calculation"
)
self.long_term_losses = (avail_long_term, curt_long_term)
@logged_method_call
def filter_outliers(self, n):
"""
This function filters outliers based on a combination of range filter, unresponsive sensor filter,
and window filter.
We use a memoized funciton to store the regression data in a dictionary for each combination as it
comes up in the Monte Carlo simulation. This saves significant computational time in not having to run
robust linear regression for each Monte Carlo iteration
Args:
n(:obj:`float`): Monte Carlo iteration
Returns:
:obj:`pandas.DataFrame`: Filtered monthly/daily data ready for linear regression
"""
reanal = self._run.reanalysis_product
# Check if valid data has already been calculated and stored. If so, just return it
if (reanal, self._run.loss_threshold) in self.outlier_filtering:
valid_data = self.outlier_filtering[(reanal, self._run.loss_threshold)]
return valid_data
# If valid data hasn't yet been stored in dictionary, determine the valid data
df = self._aggregate.df
# First set of filters checking combined losses and if the Nan data flag was on
df_sub = df.loc[
((df["availability_pct"] + df["curtailment_pct"]) < self._run.loss_threshold)
& (~df["nan_flag"]),
:,
]
# Set maximum range for using bin-filter, convert from MW to GWh
plant_capac = self._plant._plant_capacity / 1000.0 * self._hours_in_res
# Apply range filter to wind speed
df_sub = df_sub.assign(flag_range=filters.range_flag(df_sub[reanal], below=0, above=40))
if self.reg_temperature:
# Apply range filter to temperatre
df_sub = df_sub.assign(
flag_range_T=filters.range_flag(
df_sub[reanal + "_temperature_K"], below=200, above=320
)
)
# Apply window range filter
df_sub.loc[:, "flag_window"] = filters.window_range_flag(
window_col=df_sub[reanal],
window_start=5.0,
window_end=40,
value_col=df_sub["energy_gwh"],
value_min=0.02 * plant_capac,
value_max=1.2 * plant_capac,
)
if self.outlier_detection:
if self.time_resolution == "M":
# Monthly linear regression (i.e., few data points):
# filter outliers based on robust linear regression
# using Huber algorithm to flag outliers
X = sm.add_constant(df_sub[reanal]) # Reanalysis data with constant column
y = (
df_sub["gross_energy_gwh"] * 30 / df_sub["num_days_expected"]
) # Energy data (normalized to 30-days)
# Perform robust linear regression
rlm = sm.RLM(y, X, M=sm.robust.norms.HuberT(self._run.outlier_threshold))
rlm_results = rlm.fit()
# Define valid data as points in which the Huber algorithm returned a value of 1
df_sub.loc[:, "flag_outliers"] = rlm_results.weights != 1
else:
# Daily regressions (i.e., higher number of data points):
# Apply bin filter to catch outliers
df_sub.loc[:, "flag_outliers"] = filters.bin_filter(
bin_col=df_sub["gross_energy_gwh"],
value_col=df_sub[reanal],
bin_width=0.06 * plant_capac,
threshold=self._run.outlier_threshold, # wind bin threshold (multiplicative factor of std of <value_col> in bin)
center_type="median",
bin_min=0.01 * plant_capac,
bin_max=0.85 * plant_capac,
threshold_type="std",
direction="all", # both left and right (from the median)
)
else:
df_sub.loc[:, "flag_outliers"] = False
# Create a 'final' flag which is true if any of the previous flags are true
df_sub.loc[:, "flag_final"] = (
(df_sub.loc[:, "flag_range"])
| (df_sub.loc[:, "flag_window"])
| (df_sub.loc[:, "flag_outliers"])
)
if self.reg_temperature:
df_sub.loc[:, "flag_final"] = (df_sub.loc[:, "flag_final"]) | (
df_sub.loc[:, "flag_range_T"]
)
# Define valid data
valid_data = df_sub.loc[
~df_sub.loc[:, "flag_final"],
[reanal, "energy_gwh", "availability_gwh", "curtailment_gwh"],
]
if self.reg_winddirection:
valid_data_to_add = df_sub.loc[
~df_sub.loc[:, "flag_final"],
[reanal + "_wd", reanal + "_u_ms", reanal + "_v_ms"],
]
valid_data = pd.concat([valid_data, valid_data_to_add], axis=1)
if self.reg_temperature:
valid_data_to_add = df_sub.loc[
~df_sub.loc[:, "flag_final"], [reanal + "_temperature_K"]
]
valid_data = pd.concat([valid_data, valid_data_to_add], axis=1)
if self.time_resolution == "M":
valid_data_to_add = df_sub.loc[~df_sub.loc[:, "flag_final"], ["num_days_expected"]]
valid_data = pd.concat([valid_data, valid_data_to_add], axis=1)
# Update the dictionary
self.outlier_filtering[(reanal, self._run.loss_threshold)] = valid_data
# Return result
return valid_data
@logged_method_call
def set_regression_data(self, n):
"""
This will be called for each iteration of the Monte Carlo simulation and will do the following:
1. Randomly sample monthly/daily revenue meter, availabilty, and curtailment data based on specified uncertainties
and correlations
2. Randomly choose one reanalysis product
3. Calculate gross energy from randomzied energy data
4. Normalize gross energy to 30-day months
5. Filter results to remove months/days with NaN data and with combined losses that exceed the Monte Carlo
sampled max threhold
6. Return the wind speed and normalized gross energy to be used in the regression relationship
Args:
n(:obj:`int`): The Monte Carlo iteration number
Returns:
:obj:`pandas.Series`: Monte-Carlo sampled wind speeds and other variables (temperature, wind direction) if used in the regression
:obj:`pandas.Series`: Monte-Carlo sampled normalized gross energy
"""
# Get data to use in regression based on filtering result
reg_data = self.filter_outliers(n)
# Now monte carlo sample the data
mc_energy = (
reg_data["energy_gwh"] * self._run.metered_energy_fraction
) # Create new Monte-Carlo sampled data frame and sample energy data
mc_availability = (
reg_data["availability_gwh"] * self._run.loss_fraction
) # Calculate MC-generated availability
mc_curtailment = (
reg_data["curtailment_gwh"] * self._run.loss_fraction
) # Calculate MC-generated curtailment
# Calculate gorss energy and normalize to 30-days
mc_gross_energy = mc_energy + mc_availability + mc_curtailment
if self.time_resolution == "M":
num_days_expected = reg_data["num_days_expected"]
mc_gross_norm = (
mc_gross_energy * 30 / num_days_expected
) # Normalize gross energy to 30-day months
else:
mc_gross_norm = mc_gross_energy
# Set reanalysis product
reg_inputs = reg_data[
self._run.reanalysis_product
] # Copy wind speed data to Monte Carlo data frame
if self.reg_temperature: # if temperature is considered as regression variable
mc_temperature = reg_data[
self._run.reanalysis_product + "_temperature_K"
] # Copy temperature data to Monte Carlo data frame
reg_inputs = pd.concat([reg_inputs, mc_temperature], axis=1)
if self.reg_winddirection: # if wind direction is considered as regression variable
mc_wind_direction = reg_data[
self._run.reanalysis_product + "_wd"
] # Copy wind direction data to Monte Carlo data frame
reg_inputs = pd.concat([reg_inputs, np.sin(np.deg2rad(mc_wind_direction))], axis=1)
reg_inputs = pd.concat([reg_inputs, np.cos(np.deg2rad(mc_wind_direction))], axis=1)
reg_inputs = pd.concat([reg_inputs, mc_gross_norm], axis=1)
# Return values needed for regression
return reg_inputs # Return randomly sampled wind speed, wind direction, temperature and normalized gross energy
@logged_method_call
def run_regression(self, n):
"""
Run robust linear regression between Monte-Carlo generated monthly/daily gross energy,
wind speed, temperature and wind direction (if used)
Args:
n(:obj:`int`): The Monte Carlo iteration number
Returns:
:obj:`?`: trained regression model
"""
reg_data = self.set_regression_data(n) # Get regression data
# Bootstrap input data to incorporate some regression uncertainty
reg_data = np.array(reg_data.sample(frac=1.0, replace=True))
# Update Monte Carlo tracker fields
self._mc_num_points[n] = np.shape(reg_data)[0]
# Run regression. Note, the last column of reg_data is the target variable for the regression
# Linear regression
if self.reg_model == "lin":
reg = LinearRegression().fit(np.array(reg_data[:, 0:-1]), reg_data[:, -1])
predicted_y = reg.predict(np.array(reg_data[:, 0:-1]))
self._mc_slope[n, :] = reg.coef_
self._mc_intercept[n] = np.float64(reg.intercept_)
self._r2_score[n] = r2_score(reg_data[:, -1], predicted_y)
self._mse_score[n] = mean_squared_error(reg_data[:, -1], predicted_y)
return reg
# Machine learning models
else:
ml = MachineLearningSetup(self.reg_model, **self.ml_setup_kwargs)
# Memoized approach for optimized hyperparameters
if self._run.reanalysis_product in self.opt_model:
self.opt_model[(self._run.reanalysis_product)].fit(
np.array(reg_data[:, 0:-1]), reg_data[:, -1]
)
else: # optimize hyperparameters once for each reanalysis product
ml.hyper_optimize(
np.array(reg_data[:, 0:-1]),
reg_data[:, -1],
n_iter_search=20,
report=False,
cv=KFold(n_splits=5),
)
# Store optimized hyperparameters for each reanalysis product
self.opt_model[(self._run.reanalysis_product)] = ml.opt_model
predicted_y = self.opt_model[(self._run.reanalysis_product)].predict(
np.array(reg_data[:, 0:-1])
)
self._r2_score[n] = r2_score(reg_data[:, -1], predicted_y)
self._mse_score[n] = mean_squared_error(reg_data[:, -1], predicted_y)
return self.opt_model[(self._run.reanalysis_product)]
@logged_method_call
def run_AEP_monte_carlo(self):
"""
Loop through OA process a number of times and return array of AEP results each time
Returns:
:obj:`numpy.ndarray` Array of AEP, long-term avail, long-term curtailment calculations
"""
num_sim = self.num_sim
# Initialize arrays to store metrics and results
self._mc_num_points = np.empty(num_sim, dtype=np.float64)
self._r2_score = np.empty(num_sim, dtype=np.float64)
self._mse_score = np.empty(num_sim, dtype=np.float64)
num_vars = 1
if self.reg_winddirection:
num_vars = num_vars + 2
if self.reg_temperature:
num_vars = num_vars + 1
if self.reg_model == "lin":
self._mc_intercept = np.empty(num_sim, dtype=np.float64)
self._mc_slope = np.empty([num_sim, num_vars], dtype=np.float64)
aep_GWh = np.empty(num_sim)
avail_pct = np.empty(num_sim)
curt_pct = np.empty(num_sim)
lt_por_ratio = np.empty(num_sim)
iav = np.empty(num_sim)
# Loop through number of simulations, run regression each time, store AEP results
for n in tqdm(np.arange(num_sim)):
self._run = self._inputs.loc[n]
# Run regression
fitted_model = self.run_regression(n)
# Get long-term regression inputs
reg_inputs_lt = self.sample_long_term_reanalysis()
# Get long-term normalized gross energy by applying regression result to long-term monthly wind speeds
inputs = np.array(reg_inputs_lt)
if num_vars == 1:
inputs = inputs.reshape(-1, 1)
gross_lt = fitted_model.predict(inputs)
# Get POR gross energy by applying regression result to POR regression inputs
reg_inputs_por = [self._reanalysis_por[self._run.reanalysis_product]]
if self.reg_temperature:
reg_inputs_por += [
self._reanalysis_por[self._run.reanalysis_product + "_temperature_K"]
]
if self.reg_winddirection:
reg_inputs_por += [
np.sin(np.deg2rad(self._reanalysis_por[self._run.reanalysis_product + "_wd"]))
]
reg_inputs_por += [
np.cos(np.deg2rad(self._reanalysis_por[self._run.reanalysis_product + "_wd"]))
]
gross_por = fitted_model.predict(np.array(pd.concat(reg_inputs_por, axis=1)))
# Create padans dataframe for gross_por and group by calendar date to have a single full year
gross_por = self.groupby_time_res(
pd.DataFrame(
data=gross_por, index=self._reanalysis_por[self._run.reanalysis_product].index
)
)
if self.time_resolution == "M": # Undo normalization to 30-day months
# Shift the list of number of days per month to align with the reanalysis data
last_month = self._reanalysis_aggregate.index[-1].month
gross_lt = (
gross_lt
* np.tile(
np.roll(self.num_days_lt, 12 - last_month), self._run.num_years_windiness
)
/ 30
)
gross_por = np.array(gross_por).flatten() * self.num_days_lt / 30
# Annual values of lt gross energy, needed for IAV
reg_inputs_lt["gross_lt"] = gross_lt
# Annual resample starting on the first day in reg_inputs_lt
gross_lt_annual = get_annual_values(reg_inputs_lt["gross_lt"])
# Get long-term availability and curtailment losses, using gross_lt to weight individual monthly losses
[avail_lt_losses, curt_lt_losses] = self.sample_long_term_losses(
reg_inputs_lt["gross_lt"]
)
# Assign AEP, IAV, long-term availability, and long-term curtailment to output data frame
aep_GWh[n] = gross_lt.sum() / self._run.num_years_windiness * (1 - avail_lt_losses)
iav[n] = gross_lt_annual.std() / gross_lt_annual.mean()
avail_pct[n] = avail_lt_losses
curt_pct[n] = curt_lt_losses
lt_por_ratio[n] = (gross_lt.sum() / self._run.num_years_windiness) / gross_por.sum()
# Calculate mean IAV for gross energy
iav_avg = iav.mean()
# Apply IAV to AEP from single MC iterations
iav_nsim = np.random.normal(1, iav_avg, self.num_sim)
aep_GWh = aep_GWh * iav_nsim
lt_por_ratio = lt_por_ratio * iav_nsim
# Return final output
sim_results = pd.DataFrame(
index=np.arange(num_sim),
data={
"aep_GWh": aep_GWh,
"avail_pct": avail_pct,
"curt_pct": curt_pct,
"lt_por_ratio": lt_por_ratio,
"r2": self._r2_score,
"mse": self._mse_score,
"n_points": self._mc_num_points,
"iav": iav,
},
)
return sim_results
@logged_method_call
def sample_long_term_reanalysis(self):
"""
This function returns the long-term monthly/daily wind speeds based on the Monte-Carlo generated sample of:
1. The reanalysis product
2. The number of years to use in the long-term correction
Args:
(None)
Returns:
:obj:`pandas.DataFrame`: the windiness-corrected or 'long-term' monthly/daily wind speeds
"""
# Check if valid data has already been calculated and stored. If so, just return it
if (self._run.reanalysis_product, self._run.num_years_windiness) in self.long_term_sampling:
long_term_reg_inputs = self.long_term_sampling[
(self._run.reanalysis_product, self._run.num_years_windiness)
]
return long_term_reg_inputs.copy()
# Sample long-term wind speed values
ws_df = (
self._reanalysis_aggregate[self._run.reanalysis_product].to_frame().dropna()
) # Drop NA values from monthly/daily reanalysis data series
long_term_reg_inputs = ws_df[
ws_df.index[-1]
+ ws_df.index.freq
- pd.offsets.DateOffset(years=self._run.num_years_windiness) :
] # Get last 'x' years of data from reanalysis product
# Temperature and wind direction
namescol = [self._run.reanalysis_product + "_" + var for var in self._rean_vars]
long_term_temp = self._reanalysis_aggregate[namescol].dropna()[
ws_df.index[-1]
+ ws_df.index.freq
- pd.offsets.DateOffset(years=self._run.num_years_windiness) :
]
if self.reg_temperature:
long_term_reg_inputs = pd.concat(
[
long_term_reg_inputs,
long_term_temp[self._run.reanalysis_product + "_temperature_K"],
],
axis=1,
)
if self.reg_winddirection:
wd_aggregate = np.rad2deg(
np.pi
- np.arctan2(
-long_term_temp[self._run.reanalysis_product + "_u_ms"],
long_term_temp[self._run.reanalysis_product + "_v_ms"],
)
) # Calculate wind direction
long_term_reg_inputs = pd.concat(
[
long_term_reg_inputs,
np.sin(np.deg2rad(wd_aggregate)),
np.cos(np.deg2rad(wd_aggregate)),
],
axis=1,
)
# Store result in dictionary
self.long_term_sampling[
(self._run.reanalysis_product, self._run.num_years_windiness)
] = long_term_reg_inputs
# Return result
return long_term_reg_inputs.copy()
@logged_method_call
def sample_long_term_losses(self, gross_lt):
"""
This function calculates long-term availability and curtailment losses based on the Monte Carlo sampled
historical availability and curtailment data. To estimate long-term losses, average percentage monthly losses
are weighted by monthly long-term gross energy.
Args:
gross_lt(:obj:`pandas.Series`): Time series of long-term gross energy
Returns:
:obj:`float`: long-term availability loss expressed as fraction
:obj:`float`: long-term curtailment loss expressed as fraction
"""
mc_avail = self.long_term_losses[0] * self._run.loss_fraction
mc_curt = self.long_term_losses[1] * self._run.loss_fraction
# Calculate annualized monthly average long-term gross energy
# Rename axis to time to be consistent with mc_avail and mc_curt when combining variables
gross_lt_avg = self.groupby_time_res(gross_lt.rename_axis("time"))
# Estimate long-term losses by weighting monthly losses by long-term monthly gross energy
mc_avail_lt = (gross_lt_avg * mc_avail).sum() / gross_lt_avg.sum()
mc_curt_lt = (gross_lt_avg * mc_curt).sum() / gross_lt_avg.sum()
# Return long-term availabilty and curtailment
return mc_avail_lt, mc_curt_lt