"""Functions and functors to compute macro-drivers."""
from __future__ import annotations
from abc import abstractmethod
from pathlib import Path
from typing import Callable, ClassVar, Mapping, Optional, Sequence, Text, Tuple, Union
from xarray import DataArray, Dataset
__all__ = [
"factory",
"Exponential",
"ExponentialAdj",
"Logistic",
"Loglog",
"LogisticSigmoid",
"Linear",
"endogenous_demand",
"register_regression",
]
# Filled in by ``register_regression``: every registered name variation is a
# key, and the value is the factory closure that builds the functor.
REGRESSION_FUNCTOR_CREATOR = {}
"""Dictionary of factory functions for creating regression functors."""
# Filled in by ``register_regression``: the key is the lower-cased functor
# class name, the value is the list of names it was registered under.
REGRESSION_FUNCTOR_NAMES = {}
"""Dictionary of alternative names for a given functor."""
class Regression(Callable):
    """Regression functors predicting demand from macro quantities.

    All regression functors are derived from this object.

    Examples:
        Creating a regression function can be done via its constructor, or
        through an input csv file holding the regression parameters:

        >>> from muse.regressions import Exponential
        >>> from muse.defaults import DATA_DIRECTORY
        >>> path_to_regression_params = DATA_DIRECTORY / "regressionparamaters.csv"
        >>> if path_to_regression_params.exists():
        ...     expo = Exponential.factory(path_to_regression_params)

        The regression function itself takes either two `xarray.DataArray` or a
        `xarray.Dataset` as input. In any case, it is given the gdp and
        population. These can be read from standard MUSE csv files:

        >>> from muse.readers import read_macro_drivers
        >>> from muse.defaults import DATA_DIRECTORY
        >>> path_to_macrodrivers = DATA_DIRECTORY / "Macrodrivers.csv"
        >>> if path_to_macrodrivers.exists():
        ...     macrodrivers = read_macro_drivers(path_to_macrodrivers)
        ...     demand = expo(macrodrivers, year=2010, forecast=5)
    """

    __mappings__: ClassVar = {}
    """Maps from input names to coefficient names.

    Maps the coefficients names in the class to their names in the input data
    tables. This class attribute must be overridden.
    """
    __regression__ = ""
    """Name of the regression function.

    This class attribute must be overridden.
    """

    def __init__(self, interpolation: Text = "linear", base_year: int = 2010, **kwargs):
        """Stores the settings and builds the coefficient dataset.

        Args:
            interpolation: interpolation method used when interpolating years.
            base_year: reference year for the start of the simulation.
            **kwargs: entries whose key is a coefficient name (a key of
                ``__mappings__``) become variables of ``coeffs``; every other
                entry is used as a ``sel`` filter on those coefficients.
        """
        super().__init__()
        self.interpolation = interpolation
        """Interpolation method when interpolating years"""
        self.base_year = base_year
        """Reference year for the start of the simulation."""
        attrs = {k: v for k, v in kwargs.items() if k in self.__mappings__}
        filters = {k: v for k, v in kwargs.items() if k not in self.__mappings__}
        self.coeffs = Dataset(attrs).sel(filters)
        """Coefficients of the regression function."""

    @abstractmethod
    def __call__(
        self,
        gdp_or_dataset: Union[DataArray, Dataset],
        population: Optional[DataArray] = None,
        year: Optional[Union[int, Sequence[int]]] = None,
        forecast: int = 5,
        **kwargs,
    ) -> DataArray:
        """Computes the demand; must be implemented by derived classes."""
        pass

    def sel(self, **filters) -> "Regression":
        """Regression over part of the data only.

        Returns a new functor of the same class whose coefficients are
        restricted with ``filters``; ``self`` is left untouched.
        """
        return self.__class__(
            interpolation=self.interpolation,
            base_year=self.base_year,
            **(self.coeffs.sel(filters).data_vars),
        )

    @staticmethod
    def _to_dataset(
        first: Union[DataArray, Dataset], population: Optional[DataArray]
    ) -> Dataset:
        """Packs the gdp and population into a single dataset.

        Args:
            first: either the gdp alone, or a dataset (expected to hold "gdp").
            population: if given, stored as "population", overriding any
                variable of that name already in ``first``.

        Returns:
            A dataset. When ``first`` is a dataset and ``population`` is given,
            a shallow copy is returned so that the caller's dataset is not
            modified by the override.
        """
        if not isinstance(first, Dataset):
            data = Dataset({"gdp": first})
        elif population is not None:
            # shallow copy: assigning below must not leak into the input
            data = first.copy()
        else:
            data = first
        if population is not None:
            data["population"] = population
        return data

    @staticmethod
    def _split_kwargs(data: Dataset, **kwargs) -> Tuple[Mapping, Mapping]:
        """Splits keyword arguments into dimension filters and the rest.

        Returns:
            A tuple ``(filters, attrs)``: ``filters`` holds the entries whose
            key is a dimension of ``data``, ``attrs`` holds all the others.
        """
        filters = {k: v for k, v in kwargs.items() if k in data.dims}
        attrs = {k: v for k, v in kwargs.items() if k not in data.dims}
        return filters, attrs

    @classmethod
    def factory(
        cls,
        regression_data: Union[Text, Path, Dataset],
        interpolation: Text = "linear",
        base_year: int = 2010,
        **filters,
    ) -> Regression:
        """Creates a regression function from standard muse input.

        Args:
            regression_data: path to a regression parameter file, or the
                dataset already read from it.
            interpolation: forwarded to the constructor.
            base_year: forwarded to the constructor.
            **filters: forwarded to the constructor, where they restrict the
                coefficients.
        """
        from muse.readers import read_regression_parameters

        # class invariants: derived classes must override both attributes
        assert cls.__mappings__
        assert cls.__regression__ != ""
        if isinstance(regression_data, (Text, Path)):
            regression_data = read_regression_parameters(regression_data)
        # Get the parameters of interest with a 'simple' name
        coeffs = Dataset({k: regression_data[v] for k, v in cls.__mappings__.items()})
        filters.update(coeffs.data_vars)
        return cls(interpolation=interpolation, base_year=base_year, **filters)
[docs]
def factory(
    regression_parameters: Union[Text, Path, Dataset],
    sector: Optional[Union[Text, Sequence[Text]]] = None,
) -> Regression:
    """Creates regression functor from standard MUSE data for given sector.

    Args:
        regression_parameters: path to the regression parameter file, or the
            dataset already read from it.
        sector: optional sector (or sectors) to select before building.

    Returns:
        The functor built by the creator registered for the function type.
        When the parameters hold more than one ``function_type`` entry, a plain
        function is returned instead: it calls one functor per function type
        and sums their outer-aligned results, missing values counting as zero.
    """
    from muse.readers import read_regression_parameters

    params = regression_parameters
    if isinstance(params, (Text, Path)):
        params = read_regression_parameters(params)
    if sector is not None:
        params = params.sel(sector=sector)

    if params.function_type.size <= 1:
        # a single function type: stored either as a scalar or as a 1-item array
        if params.function_type.dims == ():
            kind = str(params.function_type.values)
        else:
            kind = str(params.function_type[0].values)
        return REGRESSION_FUNCTOR_CREATOR[kind](params)

    # one functor per function type, built eagerly
    functors = []
    for kind, subset in params.groupby("function_type"):
        functors.append(REGRESSION_FUNCTOR_CREATOR[kind](subset))

    def regressions(*args, **kwargs):
        from xarray import align

        total = functors[0](*args, **kwargs)
        for functor in functors[1:]:
            contribution = functor(*args, **kwargs)
            lhs, rhs = align(total, contribution, join="outer")
            total = lhs.fillna(0) + rhs.fillna(0)
        return total

    return regressions
def _snake_case(name: str) -> str:
from re import sub
s1 = sub("(.)([A-Z][a-z]+)", r"\1_\2", name)
return sub("([a-z0-9])([A-Z])", r"\1_\2", s1).lower()
def _kebab_case(name: str) -> str:
from re import sub
s1 = sub("(.)([A-Z][a-z]+)", r"\1-\2", name)
return sub("([a-z0-9])([A-Z])", r"\1-\2", s1).lower()
[docs]
def register_regression(
    Functor: Regression = None, name: Optional[Text] = None
) -> Regression:
    """Registers a functor with MUSE regressions.

    Regression functors are registered with MUSE so that the functors can be
    created easily from their registered name.

    The name that the functor is registered with defaults to the snake_case
    version of the functor name. However, it can also be specified explicitly
    as a *keyword* argument. In any case, it must be unique amongst all
    registered regression functors.

    Args:
        Functor: the class (or callable) to register. When omitted, a
            decorator is returned so that ``name`` can be given as a keyword.
        name: optional extra name to register the functor under.

    Returns:
        ``Functor`` itself, unchanged, or a decorator when ``Functor`` is None.

    Raises:
        RuntimeError: if one of the name variations is already registered. In
            that case neither registry is modified.
    """
    from logging import getLogger
    from pathlib import Path

    from muse.registration import name_variations

    # allows specifying the registered name as a keyword argument
    if Functor is None:
        return lambda x: register_regression(x, name=name)

    logger = getLogger(__name__)

    def factory(file_or_dataset, *args, **kwargs):
        label = _kebab_case(name if name is not None else Functor.__name__)
        if isinstance(file_or_dataset, (Path, Text)):
            msg = "Creating regression functor {} from data in {}".format(
                label, file_or_dataset
            )
        else:
            msg = "Creating regression functor {} from dataset".format(label)
        logger.info(msg)
        # classes derived from Regression expose a ``factory`` classmethod;
        # anything else is called directly
        function = getattr(Functor, "factory", Functor)
        return function(file_or_dataset, *args, **kwargs)

    names = {_snake_case(a) for a in {Functor.__name__, name} if a is not None}

    # Validate every variation before touching the registries, so that a name
    # clash cannot leave a half-registered functor or wipe an existing alias
    # list. A variation repeated within this call counts as a clash as well.
    accepted = []
    for n in name_variations(*names):
        if n in REGRESSION_FUNCTOR_CREATOR or n in accepted:
            msg = "A regression with the name %s already exists" % n
            raise RuntimeError(msg)
        accepted.append(n)

    for n in accepted:
        REGRESSION_FUNCTOR_CREATOR[n] = factory
    REGRESSION_FUNCTOR_NAMES[Functor.__name__.lower()] = accepted
    return Functor
def regression_functor(
    mappings: Mapping[Text, Text], name: Optional[Text] = None
) -> Regression:
    """Creates a macro-driver functor from a function.

    The functions are transformed into classes inheriting from Regression.

    Arguments:
        mappings: a dictionary mapping from the functions expected coefficients
            (e.g. a, b, c) to the name in the input csv data tables (e.g.
            constant, GDPexp, GDPscale).
        name: name by which the function is referred to in the input data table.

    Returns:
        A decorator. Applied to a function, it returns a new class derived from
        ``Regression`` and named after the function, whose ``__call__``
        prepares the inputs and then delegates to the function.

    Raises:
        ValueError: (from the decorator) if the function name does not start
            with a capital letter.
    """
    from logging import getLogger

    def decorator(func):
        from functools import wraps

        if func.__name__[0] != func.__name__[0].upper():
            raise ValueError(
                "The function will be turned into a class. "
                "It's name should be capitalized."
            )
        name_ = (func.__name__ if name is None else name).lower()
        classname = func.__name__
        logger = getLogger(__name__)
        log = "Calling {} regression function".format(_kebab_case(classname))

        # the main function will transform the input so 'func' can deal with it
        @wraps(func)
        def __call__(
            self,
            gdp_or_dataset: Union[DataArray, Dataset],
            population: Optional[DataArray] = None,
            year: Optional[Union[int, Sequence[int]]] = None,
            forecast: Optional[Union[int, Sequence[int]]] = None,
            **kwargs,
        ):
            from numpy import ndarray

            logger.debug(log)
            data = self._to_dataset(gdp_or_dataset, population)
            filters, attrs = self._split_kwargs(data, **kwargs)

            # years onto which the input data is interpolated
            years = year
            if forecast is not None and year is None and "year" in data.dims:
                years = data.year
            if isinstance(forecast, (Sequence, ndarray)):
                forecast = DataArray(
                    forecast, coords={"forecast": forecast}, dims="forecast"
                )
            # NOTE(review): the forecast horizon is not added to ``years``;
            # the data is interpolated at the requested years themselves.
            # Confirm this is intended.

            data = data.sel(filters)
            if years is not None:
                data = data.interp(
                    year=years,
                    method=self.interpolation,
                    kwargs={"fill_value": "extrapolate"},
                )
            # each variable of the dataset is passed to 'func' by keyword
            attrs.update(**data.data_vars)

            # for the duration of the call, change coeffs to allow for
            # selections
            old_coeffs = self.coeffs
            try:
                filters = {k: v for k, v in kwargs.items() if k in self.coeffs.dims}
                self.coeffs = self.coeffs.sel(filters)
                # NOTE(review): ``forecast`` is forwarded even when it is None,
                # so a default declared by 'func' never applies.
                return func(self, year=year, forecast=forecast, **attrs)
            finally:
                self.coeffs = old_coeffs

        msg = """
        This function accepts extra keyword arguments to filter over the
        dimensions of the input data-array.
        Furthermore, the gdp and population can be passed as a single argument
        if the first argument (not including self) is a dataset, then it is
        expected it should hold both the gdp and population. If population is
        also given, it will override population in the dataset argument.
        """
        if __call__.__doc__ is None:
            __call__.__doc__ = "\n\n" + msg
        else:
            __call__.__doc__ += "Regression function: {}\n\n{}".format(name_, msg)

        doc = """Regression function: {name}
        This functor is a regression function registered with MUSE as '{name}'.
        """.format(name=name_)
        Self = type(
            classname,
            (Regression,),
            {
                "__regression__": name_,
                "__mappings__": mappings,
                "__call__": __call__,
                "__module__": func.__module__,
                "__doc__": doc,
            },
        )
        return Self

    return decorator
[docs]
@register_regression
@regression_functor({"a": "constant", "b": "GDPexp"})
def Exponential(
    self, gdp: DataArray, population: DataArray, *args, year: int = 0, **kwargs
) -> DataArray:
    """1e6 * a * population * e^(b * population / gdp)."""
    from numpy import exp

    coeffs = self.coeffs
    exponent = coeffs.b * population / gdp
    return 1e6 * coeffs.a * population * exp(exponent)
[docs]
@register_regression
@regression_functor({"a": "constant", "b": "GDPexp", "w": "timeEff"})
def ExponentialAdj(
    self,
    gdp: DataArray,
    population: DataArray,
    *args,
    year: Optional[Union[int, Sequence[int]]] = None,
    forecast: int = 5,
    n: int = 6,
    **kwargs,
) -> DataArray:
    """1e6 * a * pop * e^(b * pop / gdp) * (1 + w * t^n) / (1 + t^n).

    With t = year + forecast - base_year.

    Args:
        gdp: gross domestic product.
        population: population.
        year: year or years of interest; defaults to the base year. A list or
            tuple of years is converted to a data-array along "year".
        forecast: number of forecast years; None falls back to 5.
        n: exponent applied to the elapsed time.
    """
    from numpy import exp, power

    if year is None:
        year = self.base_year
    elif isinstance(year, Sequence):
        # a plain list/tuple cannot be added to an integer: label it by year
        year = DataArray(year, coords={"year": year}, dims="year")
    if forecast is None:
        # the generated ``__call__`` forwards ``forecast=None`` when the caller
        # omits it, which bypasses the default declared in the signature
        forecast = 5

    factor = 1e6 * self.coeffs.a * population
    unadjusted = factor * exp(self.coeffs.b * population / gdp)
    p = power(year + forecast - self.base_year, n)
    return unadjusted * (1 + self.coeffs.w * p) / (1 + p)
[docs]
@register_regression
@regression_functor({"a": "constant", "b": "GDPscale", "c": "GDPexp", "w": "timeEff"})
def Logistic(
    self, gdp: DataArray, population: DataArray, forecast: int = 5, n: int = 4, **kwargs
) -> DataArray:
    """1e6 * (1 + w * f^n) / (1 + f^n) * a * pop / (1 + b * e^(gdp * c / pop)).

    With f the number of forecast years.

    Args:
        gdp: gross domestic product.
        population: population.
        forecast: number of forecast years; None falls back to 5.
        n: exponent applied to the number of forecast years.
    """
    from numpy import exp, power

    if forecast is None:
        # the generated ``__call__`` forwards ``forecast=None`` when the caller
        # omits it, which bypasses the default declared in the signature
        forecast = 5

    a, b, c, w = self.coeffs.a, self.coeffs.b, self.coeffs.c, self.coeffs.w
    p = power(forecast, n)
    factor = 1e6 * a * population * (1 + w * p) / (1 + p)
    return factor / (1 + b * exp(gdp * c / population))
[docs]
@register_regression(name="log-log")
@regression_functor({"a": "constant", "b": "GDPexp"})
def Loglog(self, gdp: DataArray, population: DataArray, *args, **kwargs) -> DataArray:
    """1e6 * e^a * population * (gdp / population)^b."""
    from numpy import exp, power

    coeffs = self.coeffs
    prefactor = 1e6 * exp(coeffs.a) * population
    gdp_per_capita = gdp / population
    return prefactor * power(gdp_per_capita, coeffs.b)
[docs]
@register_regression
@regression_functor(
    {"a": "constant", "b0": "GDPscaleLess", "b1": "GDPscaleGreater", "c": "GDPexp"}
)
def LogisticSigmoid(
    self,
    gdp: DataArray,
    population: DataArray,
    *args,
    year: Optional[Union[int, Sequence[int]]] = None,
    **kwargs,
) -> DataArray:
    """0.001 * (constant * pop + gdp * c / sqrt(1 + (gdp * scale / pop)^2)).

    The scale is coefficient ``b0`` for years before 2015 and ``b1`` from 2015
    onwards. It is 1 when the years are not a single integer and ``gdp`` has
    no "year" dimension.
    """
    from numpy import power

    coeffs = self.coeffs
    constant, c = coeffs.a, coeffs.c
    when = self.base_year if year is None else year

    if isinstance(when, int):
        scale = coeffs.b0 if when < 2015 else coeffs.b1
    elif when is None or "year" not in gdp.dims:
        scale = 1
    else:
        if isinstance(when, DataArray):
            years = when
        else:
            years = DataArray(when, coords={"year": when}, dims="year")
        scale = coeffs.b0.where(years < 2015, coeffs.b1)

    denominator = power(1 + power(gdp * scale / population, 2), 0.5)
    return 0.001 * (constant * population + gdp * c / denominator)
[docs]
@register_regression
class Linear(Regression):
    """a * population + scale * (gdp - gdp[base]/population[base] * population).

    ``base`` is the functor's base year. The scale is coefficient ``b0`` when
    ``year + forecast`` falls before ``__scaleyear__`` and ``b1`` otherwise;
    without usable years it is ``b0``.
    """

    __mappings__: ClassVar[dict[str, str]] = {
        "a": "constant",
        "b0": "GDPscaleLess",
        "b1": "GDPscaleGreater",
    }
    __regression__ = "linear"
    __scaleyear__ = 2015

    def __call__(
        self,
        gdp_or_dataset: Union[DataArray, Dataset],
        population: Optional[DataArray] = None,
        year: Optional[Union[int, Sequence[int]]] = None,
        forecast: int = 5,
        **kwargs,
    ) -> DataArray:
        """Evaluates the linear regression.

        Extra keyword arguments naming a dimension of the input data are used
        to select from both the data and the coefficients.
        """
        from logging import getLogger

        getLogger(__name__).debug("Calling linear regression function")

        dataset = self._to_dataset(gdp_or_dataset, population)
        selection, _ = self._split_kwargs(dataset, **kwargs)
        coeffs = self.coeffs.sel(**selection)
        dataset = dataset.sel(**selection)
        # years are only usable when given and when the data has a year axis
        with_years = year is not None and "year" in dataset.dims

        if isinstance(year, int):
            scale = coeffs.b0 if year + forecast < self.__scaleyear__ else coeffs.b1
        elif with_years:
            if isinstance(year, DataArray):
                years = year
            else:
                years = DataArray(year, coords={"year": year}, dims="year")
            before = years + forecast < self.__scaleyear__
            scale = coeffs.b0.where(before, coeffs.b1)
        else:
            scale = coeffs.b0

        # gdp per capita at the base year, taken before any interpolation
        reference = dataset.sel(year=self.base_year)
        gdpcap_offset = reference.gdp / reference.population

        if with_years:
            dataset = dataset.interp(year=year, method=self.interpolation)

        return coeffs.a * dataset.population + scale * (
            dataset.gdp - gdpcap_offset * dataset.population
        )
[docs]
def endogenous_demand(
    regression_parameters: Union[Text, Path, Dataset],
    drivers: Union[Text, Path, Dataset],
    sector: Optional[Union[Text, Sequence]] = None,
    **kwargs,
) -> Dataset:
    """Endogenous demand based on macro drivers and regression parameters.

    Args:
        regression_parameters: path to the regression parameters, or the
            dataset already read from it.
        drivers: path to the macro-drivers, or the dataset already read from it.
        sector: optional sector (or sectors) for which to build the regression.
        **kwargs: forwarded to the regression when it is called.
    """
    from muse.readers import read_macro_drivers

    # the regression is built first, the drivers are read afterwards
    demand_function = factory(regression_parameters, sector=sector)
    if isinstance(drivers, (Text, Path)):
        macro = read_macro_drivers(drivers)
    else:
        macro = drivers
    return demand_function(macro, **kwargs)