# Source code for pudl.validate
"""PUDL data validation functions and test case specifications.
Note that this module is being cannibalized and translated into dbt tests.
"""
import numpy as np
import pandas as pd
from dagster import AssetCheckResult
import pudl.logging_helpers
# [docs] (Sphinx viewcode marker — extraction artifact)
# Module-level logger, named after this module so log output can be filtered.
logger = pudl.logging_helpers.get_logger(__name__)
# [docs] (Sphinx viewcode marker — extraction artifact)
class ExcessiveNullRowsError(ValueError):
    """Exception raised when rows have excessive null values.

    The offending rows are retained on the exception so that callers can
    inspect exactly which records failed the null-fraction check.
    """

    def __init__(self, message: str, null_rows: pd.DataFrame):
        """Initialize the ExcessiveNullRowsError with a message and DataFrame of null rows.

        Args:
            message: Human-readable description of the failure.
            null_rows: The subset of the checked table whose rows exceeded
                the allowed fraction of null values.
        """
        super().__init__(message)
        # Keep the offending rows available for downstream debugging.
        self.null_rows = null_rows
# [docs] (Sphinx viewcode marker — extraction artifact)
def no_null_rows(
df: pd.DataFrame,
cols: list[str] | str = "all",
df_name: str = "",
max_null_fraction: float = 0.9,
) -> pd.DataFrame:
"""Check for rows with excessive missing values, usually due to a merge gone wrong.
Sum up the number of NA values in each row and the columns specified by ``cols``.
If the NA values make up more than ``max_null_fraction`` of the columns overall, the
row is considered Null and the check fails.
Args:
df: Table to check for null rows.
cols: Columns to check for excessive null value. If "all" check all columns.
df_name: Name of the dataframe, to aid in debugging/logging.
max_null_fraction: The maximum fraction of NA values allowed in any row.
Returns:
The input DataFrame, for use with DataFrame.pipe().
Raises:
ExcessiveNullRowsError: If the fraction of NA values in any row is greater than
``max_null_fraction``.
"""
if cols == "all":
cols = list(df.columns)
null_rows = df[cols].isna().sum(axis="columns") / len(cols) > max_null_fraction
if null_rows.any():
raise ExcessiveNullRowsError(
message=(
f"Found {null_rows.sum()} excessively null rows in {df_name}.\n"
f"{df[null_rows]}"
),
null_rows=df[null_rows],
)
return df
# [docs] (Sphinx viewcode marker — extraction artifact)
def group_mean_continuity_check(
    df: pd.DataFrame,
    thresholds: dict[str, float],
    groupby_col: str,
    n_outliers_allowed: int = 0,
) -> AssetCheckResult:
    """Check that certain variables don't vary too much on average between groups.

    Groups and sorts the data by ``groupby_col``, then takes the mean across
    each group. Useful for saying something like "the average water usage of
    cooling systems didn't jump by 10x from 2012-2013."

    Args:
        df: the df with the actual data
        thresholds: a mapping from column names to the ratio by which those
            columns are allowed to fluctuate from one group to the next.
        groupby_col: the column by which we will group the data.
        n_outliers_allowed: how many data points are allowed to be above the
            threshold.
    """
    checked_cols = list(thresholds)
    group_means = (
        df.loc[:, [groupby_col] + checked_cols].groupby(groupby_col, sort=True).mean()
    )
    # Absolute group-to-group fractional change; the first group has no
    # predecessor and is dropped by dropna().
    pct_change = group_means.pct_change().abs().dropna()
    # Column-wise comparison against each column's own threshold.
    discontinuity = pct_change >= thresholds
    metadata = {}
    for col in checked_cols:
        if discontinuity[col].sum() > 0:
            metadata[col] = {
                "top5": list(pct_change[col][discontinuity[col]].nlargest(n=5)),
                "threshold": thresholds[col],
            }
    passed = not bool((discontinuity.sum() > n_outliers_allowed).any())
    return AssetCheckResult(passed=passed, metadata=metadata)
# [docs] (Sphinx viewcode marker — extraction artifact)
def weighted_quantile(data: pd.Series, weights: pd.Series, quantile: float) -> float:
    """Calculate the weighted quantile of a Series or DataFrame column.

    This function allows us to take two columns from a :class:`pandas.DataFrame` one
    of which contains an observed value (data) like heat content per unit of fuel,
    and the other of which (weights) contains a quantity like quantity of fuel
    delivered which should be used to scale the importance of the observed value in
    an overall distribution, and calculate the values that the scaled distribution
    will have at various quantiles.

    Args:
        data: A series containing numeric data.
        weights: Weights to use in scaling the data. Must have the same length as data.
        quantile: A number between 0 and 1, representing the quantile at which we want
            to find the value of the weighted data.

    Returns:
        The value in the weighted data corresponding to the given quantile. If there
        are no values in the data, return :mod:`numpy.nan`.
    """
    if quantile < 0 or quantile > 1:
        raise ValueError("quantile must have a value between 0 and 1.")
    if len(data) != len(weights):
        raise ValueError("data and weights must have the same length")
    cleaned = (
        pd.DataFrame({"data": data, "weights": weights})
        .replace([np.inf, -np.inf], np.nan)
        .dropna()
    )
    # dbt weighted quantiles detour: pool the weights of repeated data values
    # (treated as an n-way tie) to match our weighted quantile definition in
    # dbt. See the "Migrate vs_bounds" issue on github for details:
    # https://github.com/catalyst-cooperative/pudl/issues/4106#issuecomment-2810774598
    pooled = (
        cleaned.groupby("data")
        .sum()
        .reset_index()
        # /end dbt weighted quantiles detour
        .sort_values(by="data")
    )
    cumulative_weight = pooled.weights.cumsum()
    # Sometimes new columns get added to the EIA data, and so they won't show
    # up in prior years, leaving us with no values at all.
    if len(cumulative_weight) == 0:
        return np.nan
    positions = (cumulative_weight - 0.5 * pooled.weights) / cumulative_weight.iloc[-1]
    return np.interp(quantile, positions, pooled.data)