Source code for muse.investments

"""Investment decision.

An investment determines which technologies to invest given a metric to
determine preferred technologies, a corresponding search space of technologies,
and the demand to fulfill.

Investments should be registered via the decorator `register_investment`. The
registration makes it possible to call investments dynamically through
`compute_investment`, by specifying the name of the investment. It is part of
MUSE's plugin platform.

Investments are not expected to modify any of their arguments. They should all
have the following signature:

.. code-block:: python

    @register_investment
    def investment(
        costs: xr.DataArray,
        search_space: xr.DataArray,
        technologies: xr.Dataset,
        constraints: List[Constraint],
        year: int,
        **kwargs
    ) -> xr.DataArray:
        pass

Arguments:
    costs: specifies for each `asset` which `replacement` technology should be invested
        in preferentially. This should be an integer or floating point array with
        dimensions `asset` and `replacement`.
    search_space: an `asset` by `replacement` matrix defining allowed and disallowed
        replacement technologies for each asset
    technologies: a dataset containing all constant data characterizing the
        technologies.
    constraints: a list of constraints as defined in :py:mod:`~muse.constraints`.
    year: the current year.

Returns:
    A data array with dimensions `asset` and `technology` specifying the amount
    of newly invested capacity.
"""

__all__ = [
    "adhoc_match_demand",
    "cliff_retirement_profile",
    "register_investment",
    "INVESTMENT_SIGNATURE",
]
from typing import (
    Any,
    Callable,
    List,
    Mapping,
    MutableMapping,
    Optional,
    Text,
    Union,
    cast,
)

import numpy as np
import xarray as xr
from mypy_extensions import KwArg

from muse.constraints import Constraint
from muse.errors import GrowthOfCapacityTooConstrained
from muse.outputs.cache import cache_quantity
from muse.registration import registrator

INVESTMENT_SIGNATURE = Callable[
    [xr.DataArray, xr.DataArray, xr.Dataset, List[Constraint], KwArg(Any)],
    Union[xr.DataArray, xr.Dataset],
]
"""Investment signature. """

INVESTMENTS: MutableMapping[Text, INVESTMENT_SIGNATURE] = {}
"""Dictionary of investment functions."""


[docs] @registrator(registry=INVESTMENTS, loglevel="info") def register_investment(function: INVESTMENT_SIGNATURE) -> INVESTMENT_SIGNATURE: """Decorator to register a function as an investment. The output of the function can be a DataArray, with the invested capacity, or a Dataset. In this case, it must contain a DataArray named "capacity" and, optionally, a DataArray named "production". Only the invested capacity DataArray is returned to the calling function. """ from functools import wraps @wraps(function) def decorated( costs: xr.DataArray, search_space: xr.DataArray, technologies: xr.Dataset, constraints: List[Constraint], **kwargs, ) -> xr.DataArray: result = function(costs, search_space, technologies, constraints, **kwargs) if isinstance(result, xr.Dataset): investment = result["capacity"].rename("investment") if "production" in result: cache_quantity(production=result["production"]) else: investment = result.rename("investment") cache_quantity(capacity=investment) return investment return decorated
def factory(settings: Optional[Union[Text, Mapping]] = None) -> Callable: from typing import Dict if settings is None: name = "match_demand" params: Dict = {} elif isinstance(settings, Text): name = settings params = {} else: name = settings["name"] params = {k: v for k, v in settings.items() if k != "name"} top = params.get("timeslice_op", "max") if isinstance(top, Text): if top.lower() == "max": def timeslice_op(x: xr.DataArray) -> xr.DataArray: from muse.timeslices import convert_timeslice return (x / convert_timeslice(xr.DataArray(1), x)).max("timeslice") elif top.lower() == "sum": def timeslice_op(x: xr.DataArray) -> xr.DataArray: return x.sum("timeslice") else: raise ValueError(f"Unknown timeslice transform {top}") params["timeslice_op"] = timeslice_op investment = INVESTMENTS[name] def compute_investment( search: xr.Dataset, technologies: xr.Dataset, constraints: List[Constraint], **kwargs, ) -> xr.DataArray: """Computes investment needed to fulfill demand. The return is a data array with two dimensions: (asset, replacement). """ from numpy import zeros if any(u == 0 for u in search.decision.shape): return xr.DataArray( zeros((len(search.asset), len(search.replacement))), coords={"asset": search.asset, "replacement": search.replacement}, dims=("asset", "replacement"), ) return investment( search.decision, search.search_space, technologies, constraints, **params, **kwargs, ).rename("investment") return compute_investment
[docs] def cliff_retirement_profile( technical_life: xr.DataArray, current_year: int = 0, protected: int = 0, interpolation: Text = "linear", **kwargs, ) -> xr.DataArray: """Cliff-like retirement profile from current year. Computes the retirement profile of all technologies in ``technical_life``. Assets with a technical life smaller than the input time-period should automatically be renewed. Hence, if ``technical_life <= protected``, then effectively, the technical life is rewritten as ``technical_life * n`` with ``n = int(protected // technical_life) + 1``. We could just return an array where each year is represented. Instead, to save memory, we return a compact view of the same where years where no change happens are removed. Arguments: technical_life: lifetimes for each technology current_year: current year protected: The technologies are assumed to be renewed between years `current_year` and `current_year + protected` interpolation: Interpolation type **kwargs: arguments by which to filter technical_life, if any. Returns: A boolean DataArray where each each element along the year dimension is true if the technology is still not retired for the given year. """ from muse.utilities import avoid_repetitions if kwargs: technical_life = technical_life.sel(**kwargs) if "year" in technical_life.dims: technical_life = technical_life.interp(year=current_year, method=interpolation) technical_life = (1 + protected // technical_life) * technical_life # type:ignore if len(technical_life) > 0: max_year = int(current_year + technical_life.max()) else: max_year = int(current_year + protected) allyears = xr.DataArray( range(current_year, max_year + 1), dims="year", coords={"year": range(current_year, max_year + 1)}, ) profile = allyears < (current_year + technical_life) # type: ignore # now we minimize the number of years needed to represent the profile fully # this is done by removing the central year of any three repeating year, ensuring # the removed year can be recovered by a linear interpolation. goodyears = avoid_repetitions(profile.astype(int)) return profile.sel(year=goodyears).astype(bool)
class LinearProblemError(RuntimeError): """Error returned for infeasible LP problems.""" def __init__(self, *args): super().__init__(*args) @register_investment(name=["adhoc"]) def adhoc_match_demand( costs: xr.DataArray, search_space: xr.DataArray, technologies: xr.Dataset, constraints: List[Constraint], year: int, timeslice_op: Optional[Callable[[xr.DataArray], xr.DataArray]] = None, ) -> xr.DataArray: from muse.demand_matching import demand_matching from muse.quantities import capacity_in_use, maximum_production from muse.timeslices import QuantityType, convert_timeslice demand = next((c for c in constraints if c.name == "demand")).b max_capacity = next( (c for c in constraints if c.name == "max capacity expansion") ).b max_prod = maximum_production( technologies, max_capacity, year=year, technology=costs.replacement, commodity=demand.commodity, ).drop_vars("technology") if "timeslice" in demand.dims and "timeslice" not in max_prod.dims: max_prod = convert_timeslice(max_prod, demand, QuantityType.EXTENSIVE) # Push disabled techs to last rank. # Any production assigned to them by the demand-matching algorithm will be removed. if "timeslice" in costs.dims and timeslice_op is not None: costs = costs.mean("timeslice").mean("asset") # timeslice_op(costs) minobj = costs.min() maxobj = costs.where(search_space, minobj).max("replacement") + 1 decision = costs.where(search_space, maxobj) production = demand_matching( demand.sel(asset=demand.asset.isin(search_space.asset)), decision, max_prod, ).where(search_space, 0) capacity = capacity_in_use( production, technologies, year=year, technology=production.replacement ).drop_vars("technology") if "timeslice" in capacity.dims and timeslice_op is not None: capacity = timeslice_op(capacity) result = xr.Dataset({"capacity": capacity, "production": production}) return result @register_investment(name=["scipy", "match_demand"]) def scipy_match_demand( costs: xr.DataArray, search_space: xr.DataArray, technologies: xr.Dataset, constraints: List[Constraint], year: Optional[int] = None, timeslice_op: Optional[Callable[[xr.DataArray], xr.DataArray]] = None, **options, ) -> xr.DataArray: from logging import getLogger from scipy.optimize import linprog from muse.constraints import ScipyAdapter df_technologies = technologies.to_dataframe().reset_index() if "timeslice" in costs.dims and timeslice_op is not None: costs = timeslice_op(costs) if "year" in technologies.dims and year is None: raise ValueError("Missing year argument") elif "year" in technologies.dims: techs = technologies.sel(year=year).drop_vars("year") else: techs = technologies timeslice = next((cs.timeslice for cs in constraints if "timeslice" in cs.dims)) adapter = ScipyAdapter.factory( techs, cast(np.ndarray, costs), timeslice, *constraints ) res = linprog(**adapter.kwargs, method="highs") if not res.success and (res.status != 0): res = linprog( **adapter.kwargs, method="highs-ipm", options={ "disp": True, "presolve": False, "dual_feasibility_tolerance": 1e-2, "primal_feasibility_tolerance": 1e-2, "ipm_optimality_tolerance": 1e-2, }, ) if not res.success: msg = ( res.message + "\n" + f"Error in sector containing {df_technologies.technology.unique()}" ) getLogger(__name__).critical(msg) raise GrowthOfCapacityTooConstrained return cast(Callable[[np.ndarray], xr.Dataset], adapter.to_muse)(res.x) @register_investment(name=["cvxopt"]) def cvxopt_match_demand( costs: xr.DataArray, search_space: xr.DataArray, technologies: xr.Dataset, constraints: List[Constraint], year: Optional[int] = None, timeslice_op: Optional[Callable[[xr.DataArray], xr.DataArray]] = None, **options, ) -> xr.DataArray: from importlib import import_module from logging import getLogger from muse.constraints import ScipyAdapter if "year" in technologies.dims and year is None: raise ValueError("Missing year argument") elif "year" in technologies.dims: techs = technologies.interp(year=year).drop_vars("year") else: techs = technologies def default_to_scipy(): return scipy_match_demand( costs, search_space, techs, constraints, timeslice_op=timeslice_op ) try: cvxopt = import_module("cvxopt") except ModuleNotFoundError: msg = ( "cvxopt is not installed\n" "It can be installed with `pip install cvxopt`\n" "Using the scipy linear solver instead." ) getLogger(__name__).critical(msg) return default_to_scipy() if "timeslice" in costs.dims and timeslice_op is not None: costs = timeslice_op(costs) timeslice = next((cs.timeslice for cs in constraints if "timeslice" in cs.dims)) adapter = ScipyAdapter.factory( techs, -cast(np.ndarray, costs), timeslice, *constraints ) G = np.zeros((0, adapter.c.size)) if adapter.A_ub is None else adapter.A_ub h = np.zeros((0,)) if adapter.b_ub is None else adapter.b_ub if adapter.bounds[0] != -np.inf and adapter.bounds[0] is not None: G = np.concatenate((G, -np.eye(adapter.c.size))) h = np.concatenate((h, np.full(adapter.c.size, adapter.bounds[0]))) if adapter.bounds[1] != np.inf and adapter.bounds[1] is not None: G = np.concatenate((G, np.eye(adapter.c.size))) h = np.concatenate((h, np.full(adapter.c.size, adapter.bounds[1]))) args = [adapter.c, G, h] if adapter.A_eq is not None: args += [adapter.A_eq, adapter.b_eq] res = cvxopt.solvers.lp(*map(cvxopt.matrix, args), **options) # type: ignore if res["status"] != "optimal": getLogger(__name__).info(res["status"]) if res["x"] is None: getLogger(__name__).critical("infeasible system") raise LinearProblemError("Infeasible system", res) solution = cast(Callable[[np.ndarray], xr.Dataset], adapter.to_muse)(list(res["x"])) return solution