Source code for muse.constraints

r"""Investment constraints.

Constraints on investments ensure that investments match given criteria. For
instance, a constraint could ensure that only so much of a new asset can be built
every year.

Functions to compute constraints should be registered via the decorator
:py:meth:`~muse.constraints.register_constraints`. This registration step makes it
possible for constraints to be declared in the TOML file.

Generally, LP solvers accept linear constraints defined as:

.. math::

    A x \leq b

with :math:`A` a matrix, :math:`x` the decision variables, and :math:`b` a vector.
However, these quantities are dimensionless. They do not have timeslices, assets,
replacement technologies, or any other dimensions that users have set up in their
model. The crux is to translate from MUSE's data structures to a consistent
dimensionless format.

In MUSE, users can register constraints functions that return fully dimensional
quantities. The matrix operator is split over the capacity decision variables and the
production decision variables:

.. math::

    A_c .* x_c + A_p .* x_p \leq b

The operator :math:`.*` means the standard elementwise multiplication of xarray,
including automatic broadcasting (adding missing dimensions by repeating the smaller
matrix along the missing dimension).  Constraint functions return the three quantities
:math:`A_c`, :math:`A_p`, and :math:`b`. These three quantities will often not have
the same dimensions. For instance, one might include timeslices where another does
not. The
transformation from :math:`A_c`, :math:`A_p`, :math:`b` to :math:`A` and :math:`b`
happens as described below.

- :math:`b` remains the same. It defines the rows of :math:`A`.
- :math:`x_c` and :math:`x_p` are concatenated one on top of the other and define the
  columns of :math:`A`.
- :math:`A` is split into a left submatrix for capacities and a right submatrix for
  production, following the concatenation of :math:`x_c` and :math:`x_p`.
- Any dimension in :math:`A_c .* x_c` (:math:`A_p .* x_p`) that is also in :math:`b`
  defines diagonal entries into the left (right) submatrix of :math:`A`.
- Any dimension in :math:`A_c .* x_c` (:math:`A_p .* x_p`) that is missing from
  :math:`b` is reduced by summation over a row in the left (right) submatrix of
  :math:`A`. In other words, those dimensions become part of a standard tensor
  reduction or matrix multiplication.

There are two additional rules. However, needing them likely indicates an
inefficient definition of :math:`A_c`, :math:`A_p`, and :math:`b`.

- Any dimension in :math:`A_c` (:math:`A_p`) that is neither in :math:`b` nor in
  :math:`x_c` (:math:`x_p`) is reduced by summation before the elementwise
  multiplication. For instance, if :math:`d` is such a dimension, present only in
  :math:`A_c`, then the problem becomes
  :math:`(\sum_d A_c) .* x_c + A_p .* x_p \leq b`.
- Any dimension missing from :math:`A_c .* x_c` (:math:`A_p .* x_p`) and present in
  :math:`b` is added by repeating the resulting row in :math:`A`.
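
The following sketch (with made-up dimension names and sizes, outside of any MUSE
data structure) illustrates the diagonal and summation rules with xarray's
broadcasting:

.. code-block:: python

    import numpy as np
    import xarray as xr

    x_c = xr.DataArray(np.ones((2, 3)), dims=("asset", "replacement"))
    A_c = xr.DataArray(np.arange(3.0), dims="replacement")
    b = xr.DataArray(np.zeros(2), dims="asset")

    product = x_c * A_c  # broadcasts to dims ("asset", "replacement")
    # "asset" also appears in b: it maps to diagonal entries of A.
    # "replacement" is missing from b: it is reduced by summation.
    assert product.sum("replacement").dims == b.dims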

Constraints are registered using the decorator
:py:meth:`~muse.constraints.register_constraints`. The decorated functions must have
the following signature:

.. code-block:: python

    @register_constraints
    def constraints(
        demand: xr.DataArray,
        assets: xr.Dataset,
        search_space: xr.DataArray,
        market: xr.Dataset,
        technologies: xr.Dataset,
        year: Optional[int] = None,
        **kwargs,
    ) -> Constraint:
        pass

demand:
    The demand for the sector's products. In practice, it is a demand share obtained
    from :py:mod:`~muse.demand_share`. It is a data-array with dimensions including
    `asset`, `commodity`, and `timeslice`.
assets:
    The capacity of the assets owned by the agent.
search_space:
    A matrix `asset` vs `replacement` technology defining which replacement technologies
    will be considered for each existing asset.
market:
    The market as obtained from the MCA.
technologies:
    Technodata characterizing the competing technologies.
year:
    The current year.
``**kwargs``:
    Any other parameter.
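
For instance, a hypothetical constraint capping each capacity decision variable at a
constant (the name ``capacity_cap`` and the ``limit`` keyword are illustrative only)
could be written as follows; the registration step fills in the missing
``production`` entry and the constraint's name:

.. code-block:: python

    @register_constraints
    def capacity_cap(
        demand, assets, search_space, market, technologies, limit=10.0, **kwargs
    ):
        # an upper bound x_c <= limit, elementwise over the search space
        return xr.Dataset(
            dict(
                capacity=xr.ones_like(search_space, dtype=float),
                b=xr.DataArray(limit),
            ),
            attrs=dict(kind=ConstraintKind.UPPER_BOUND),
        )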
"""

from __future__ import annotations

from dataclasses import dataclass
from enum import Enum, auto
from typing import (
    Any,
    Callable,
    List,
    Mapping,
    MutableMapping,
    Optional,
    Sequence,
    Text,
    Tuple,
    Union,
    cast,
)

import numpy as np
import pandas as pd
import xarray as xr
from mypy_extensions import KwArg

from muse.registration import registrator

CAPACITY_DIMS = "asset", "replacement", "region"
"""Default dimensions for capacity decision variables."""
PRODUCT_DIMS = "commodity", "timeslice", "region"
"""Default dimensions for product decision variables."""


class ConstraintKind(Enum):
    EQUALITY = auto()
    UPPER_BOUND = auto()
    LOWER_BOUND = auto()


Constraint = xr.Dataset
"""An investment constraint :math:`A * x ~ b`

Where :math:`~` is one of :math:`=,\\leq,\\geq`.

A constraint should contain a data-array `b` corresponding to right-hand-side vector
of the constraint. It should also contain a data-array `capacity` corresponding to the
left-hand-side matrix operator which will be applied to the capacity-related decision
variables.  It should contain a similar matrix `production` corresponding to
the left-hand-side matrix operator which will be applied to the production-related
decision variables. Should any of these three objects be missing, they default to the
scalar 0. Finally, the constraint should contain an attribute `kind` of type
:py:class:`ConstraintKind` defining the operation. If it is missing, it defaults to an
upper bound constraint.
"""


CONSTRAINT_SIGNATURE = Callable[
    [xr.DataArray, xr.Dataset, xr.DataArray, xr.Dataset, xr.Dataset, KwArg(Any)],
    Optional[Constraint],
]
"""Basic signature for functions producing constraints.

.. note::

    A constraint can return `None`, in which case it is ignored. This makes it simple to
    add constraints that are only used if some condition is met, e.g. minimum service
    conditions are defined in the technodata.
"""
CONSTRAINTS: MutableMapping[Text, CONSTRAINT_SIGNATURE] = {}
"""Registry of constraint functions."""


@registrator(registry=CONSTRAINTS)
def register_constraints(function: CONSTRAINT_SIGNATURE) -> CONSTRAINT_SIGNATURE:
    """Registers a constraint with MUSE.

    See :py:mod:`muse.constraints`.
    """
    from functools import wraps

    @wraps(function)
    def decorated(
        demand: xr.DataArray,
        assets: xr.Dataset,
        search_space: xr.DataArray,
        market: xr.Dataset,
        technologies: xr.Dataset,
        **kwargs,
    ) -> Optional[Constraint]:
        """Computes and standardizes a constraint."""
        constraint = function(  # type: ignore
            demand, assets, search_space, market, technologies, **kwargs
        )
        if constraint is not None:
            if "kind" not in constraint.attrs:
                constraint.attrs["kind"] = ConstraintKind.UPPER_BOUND
            if (
                "capacity" not in constraint.data_vars
                and "production" not in constraint.data_vars
            ):
                raise RuntimeError("Invalid constraint format")
            if "capacity" not in constraint.data_vars:
                constraint["capacity"] = 0
            if "production" not in constraint.data_vars:
                constraint["production"] = 0
            if "b" not in constraint.data_vars:
                constraint["b"] = 0
            if "name" not in constraint.data_vars and "name" not in constraint.attrs:
                constraint.attrs["name"] = function.__name__
            # ensure that the constraint and the search space match
            dims = [d for d in constraint.dims if d in search_space.dims]
            constraint = constraint.sel({k: search_space[k] for k in dims})
        return constraint

    return decorated
def factory(
    settings: Optional[
        Union[Text, Mapping, Sequence[Text], Sequence[Union[Text, Mapping]]]
    ] = None,
) -> Callable:
    """Creates a list of constraints from standard settings.

    The standard settings can be a string naming the constraint, a dictionary
    including at least "name", or a list of strings and dictionaries.
    """
    from functools import partial

    if settings is None:
        settings = (
            "max_production",
            "max_capacity_expansion",
            "demand",
            "search_space",
            "minimum_service",
        )

    def normalize(x) -> MutableMapping:
        return dict(name=x) if isinstance(x, Text) else x

    if isinstance(settings, (Text, Mapping)):
        settings = cast(Union[Sequence[Text], Sequence[Mapping]], [settings])
    parameters = [normalize(x) for x in settings]
    names = [x.pop("name") for x in parameters]
    constraint_closures = [
        partial(CONSTRAINTS[name], **param) for name, param in zip(names, parameters)
    ]

    def constraints(
        demand: xr.DataArray,
        assets: xr.Dataset,
        search_space: xr.DataArray,
        market: xr.Dataset,
        technologies: xr.Dataset,
        year: Optional[int] = None,
    ) -> List[Constraint]:
        if year is None:
            year = int(market.year.min())
        constraints = [
            function(demand, assets, search_space, market, technologies, year=year)
            for function in constraint_closures
        ]
        return [constraint for constraint in constraints if constraint is not None]

    return constraints
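
# Usage sketch (illustrative): ``factory`` accepts a constraint name, a mapping
# with at least a "name" key, or a mixed sequence thereof; calling it with no
# argument builds the default list above. For example:
#
#     constraints_fn = factory(["max_production", dict(name="demand")])
#     constraints = constraints_fn(demand, assets, search_space, market, technologies)
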
@register_constraints
def max_capacity_expansion(
    demand: xr.DataArray,
    assets: xr.Dataset,
    search_space: xr.DataArray,
    market: xr.Dataset,
    technologies: xr.Dataset,
    year: Optional[int] = None,
    forecast: Optional[int] = None,
    interpolation: Text = "linear",
) -> Constraint:
    r"""Max-capacity addition, max-capacity growth, and capacity limits constraints.

    Limits by how much the capacity of each technology owned by an agent can grow in
    a given year. This is a constraint on the agent's ability to invest in a
    technology.

    Let :math:`L_t^r(y)` be the total capacity limit for a given year, technology,
    and region. :math:`G_t^r(y)` is the maximum growth. And :math:`W_t^r(y)` is the
    maximum additional capacity. :math:`y=y_0` is the current year and :math:`y=y_1`
    is the year marking the end of the investment period.

    Let :math:`\mathcal{A}^{i, r}_{t, \iota}(y)` be the current assets, before
    investment, and let :math:`\Delta\mathcal{A}^{i,r}_t` be the future investments.
    Then the constraints on agent :math:`i` are given as:

    .. math::

        L_t^r(y_0) - \sum_\iota \mathcal{A}^{i, r}_{t, \iota}(y_1)
            \geq \Delta\mathcal{A}^{i,r}_t

        (y_1 - y_0 + 1) G_t^r(y_0) \sum_\iota \mathcal{A}^{i, r}_{t, \iota}(y_0)
            - \sum_\iota \mathcal{A}^{i, r}_{t, \iota}(y_1)
            \geq \Delta\mathcal{A}^{i,r}_t

        (y_1 - y_0) W_t^r(y_0) \geq \Delta\mathcal{A}^{i,r}_t

    The three constraints are combined into a single one which is returned as the
    maximum capacity expansion, :math:`\Gamma_t^{r, i}`. The maximum capacity
    expansion cannot impose negative investments:

    .. math::

        \Gamma_t^{r, i} \geq 0
    """
    from muse.utilities import filter_input, reduce_assets

    if year is None:
        year = int(market.year.min())
    if forecast is None and len(getattr(market, "year", [])) <= 1:
        forecast = 5
    elif forecast is None:
        forecast = next((int(u) for u in sorted(market.year - year) if u > 0))
    forecast_year = year + forecast

    capacity = (
        reduce_assets(
            assets.capacity,
            coords={"technology", "region"}.intersection(assets.capacity.coords),
        )
        .interp(year=[year, forecast_year], method=interpolation)
        .ffill("year")
    )

    # case with technology and region in asset dimension
    if capacity.region.dims != ():
        names = [u for u in capacity.asset.coords if capacity[u].dims == ("asset",)]
        index = pd.MultiIndex.from_arrays(
            [capacity[u].values for u in names], names=names
        )
        capacity = capacity.drop_vars(names)
        capacity["asset"] = index
        capacity = capacity.unstack("asset", fill_value=0).rename(
            technology=search_space.replacement.name
        )
    # case with only technology in asset dimension
    else:
        capacity = cast(xr.DataArray, capacity.set_index(asset="technology")).rename(
            asset=search_space.replacement.name
        )
        capacity = capacity.reindex_like(search_space.replacement, fill_value=0)

    replacement = search_space.replacement
    replacement = replacement.drop_vars(
        [u for u in replacement.coords if u not in replacement.dims]
    )
    techs = filter_input(
        technologies[
            ["max_capacity_addition", "max_capacity_growth", "total_capacity_limit"]
        ],
        technology=replacement,
        year=year,
    ).drop_vars("technology")

    regions = getattr(capacity, "region", None)
    if regions is not None and "region" in technologies.dims:
        techs = techs.sel(region=regions)

    add_cap = techs.max_capacity_addition * forecast

    limit = techs.total_capacity_limit
    forecasted = capacity.sel(year=forecast_year, drop=True)
    total_cap = (limit - forecasted).clip(min=0).rename("total_cap")

    max_growth = techs.max_capacity_growth
    initial = capacity.sel(year=year, drop=True)
    growth_cap = initial * (max_growth * forecast + 1) - forecasted

    growth_cap = growth_cap.where(growth_cap > 0, total_cap)
    zero_cap = add_cap.where(add_cap < total_cap, total_cap)
    with_growth = zero_cap.where(zero_cap < growth_cap, growth_cap)
    b = with_growth.where(initial > 0, zero_cap)

    if b.region.dims == ():
        capa = 1
    elif "dst_region" in b.dims:
        b = b.rename(region="src_region")
        capa = search_space.agent.region == b.src_region

    return xr.Dataset(
        dict(b=b, capacity=capa),
        attrs=dict(kind=ConstraintKind.UPPER_BOUND, name="max capacity expansion"),
    )
@register_constraints
def demand(
    demand: xr.DataArray,
    assets: xr.Dataset,
    search_space: xr.DataArray,
    market: xr.Dataset,
    technologies: xr.Dataset,
    year: Optional[int] = None,
    forecast: int = 5,
    interpolation: Text = "linear",
) -> Constraint:
    """Constrains production to meet demand."""
    from muse.commodities import is_enduse

    enduse = technologies.commodity.sel(commodity=is_enduse(technologies.comm_usage))
    b = demand.sel(commodity=demand.commodity.isin(enduse))
    if "region" in b.dims and "dst_region" in assets.dims:
        b = b.rename(region="dst_region")
    assert "year" not in b.dims
    return xr.Dataset(
        dict(b=b, production=1), attrs=dict(kind=ConstraintKind.LOWER_BOUND)
    )
@register_constraints
def search_space(
    demand: xr.DataArray,
    assets: xr.Dataset,
    search_space: xr.DataArray,
    market: xr.Dataset,
    technologies: xr.Dataset,
    year: Optional[int] = None,
    forecast: int = 5,
) -> Optional[Constraint]:
    """Removes disabled technologies."""
    if search_space.all():
        return None
    capacity = cast(xr.DataArray, 1 - 2 * cast(np.ndarray, search_space))
    b = xr.zeros_like(capacity)
    return xr.Dataset(
        dict(b=b, capacity=capacity), attrs=dict(kind=ConstraintKind.UPPER_BOUND)
    )
@register_constraints
def max_production(
    demand: xr.DataArray,
    assets: xr.Dataset,
    search_space: xr.DataArray,
    market: xr.Dataset,
    technologies: xr.Dataset,
    year: Optional[int] = None,
) -> Constraint:
    """Constructs constraint between capacity and maximum production.

    Constrains the production decision variable by the maximum production for a given
    capacity.
    """
    from xarray import ones_like, zeros_like

    from muse.commodities import is_enduse
    from muse.timeslices import QuantityType, convert_timeslice

    if year is None:
        year = int(market.year.min())
    commodities = technologies.commodity.sel(
        commodity=is_enduse(technologies.comm_usage)
    )
    replacement = search_space.replacement
    replacement = replacement.drop_vars(
        [u for u in replacement.coords if u not in replacement.dims]
    )
    kwargs = dict(technology=replacement, year=year, commodity=commodities)
    if "region" in search_space.coords and "region" in technologies.dims:
        kwargs["region"] = search_space.region
    techs = (
        technologies[["fixed_outputs", "utilization_factor"]]
        .sel(**kwargs)
        .drop_vars("technology")
    )
    capacity = convert_timeslice(
        techs.fixed_outputs * techs.utilization_factor,
        market.timeslice,
        QuantityType.EXTENSIVE,
    )
    if "asset" not in capacity.dims and "asset" in search_space.dims:
        capacity = capacity.expand_dims(asset=search_space.asset)
    production = ones_like(capacity)
    b = zeros_like(production)

    # Include max-addition constraint in max production to match region-dst_region
    if "dst_region" in assets.dims:
        b = b.expand_dims(dst_region=assets.dst_region)
        capacity = capacity.rename(region="src_region")
        production = production.rename(region="src_region")
        maxadd = technologies.max_capacity_addition.rename(region="src_region")
        if "year" in maxadd.dims:
            maxadd = maxadd.sel(year=year)
        maxadd = maxadd.rename(technology="replacement")
        maxadd = maxadd.where(maxadd == 0, 0.0)
        maxadd = maxadd.where(maxadd > 0, -1.0)
        capacity = capacity * maxadd
        production = production * maxadd
        b = b.rename(region="src_region")

    return xr.Dataset(
        dict(capacity=-cast(np.ndarray, capacity), production=production, b=b),
        attrs=dict(kind=ConstraintKind.UPPER_BOUND),
    )
@register_constraints
def minimum_service(
    demand: xr.DataArray,
    assets: xr.Dataset,
    search_space: xr.DataArray,
    market: xr.Dataset,
    technologies: xr.Dataset,
    year: Optional[int] = None,
) -> Optional[Constraint]:
    """Constructs constraint between capacity and minimum service."""
    from xarray import ones_like, zeros_like

    from muse.commodities import is_enduse
    from muse.timeslices import QuantityType, convert_timeslice

    if "minimum_service_factor" not in technologies.data_vars:
        return None
    if np.all(technologies["minimum_service_factor"] == 0):
        return None
    if year is None:
        year = int(market.year.min())
    commodities = technologies.commodity.sel(
        commodity=is_enduse(technologies.comm_usage)
    )
    replacement = search_space.replacement
    replacement = replacement.drop_vars(
        [u for u in replacement.coords if u not in replacement.dims]
    )
    kwargs = dict(technology=replacement, year=year, commodity=commodities)
    if "region" in search_space.coords and "region" in technologies.dims:
        kwargs["region"] = assets.region
    techs = (
        technologies[["fixed_outputs", "utilization_factor", "minimum_service_factor"]]
        .sel(**kwargs)
        .drop_vars("technology")
    )
    capacity = convert_timeslice(
        techs.fixed_outputs * techs.utilization_factor * techs.minimum_service_factor,
        market.timeslice,
        QuantityType.EXTENSIVE,
    )
    if "asset" not in capacity.dims:
        capacity = capacity.expand_dims(asset=search_space.asset)
    production = ones_like(capacity)
    b = zeros_like(production)
    return xr.Dataset(
        dict(capacity=-cast(np.ndarray, capacity), production=production, b=b),
        attrs=dict(kind=ConstraintKind.LOWER_BOUND),
    )
def lp_costs(
    technologies: xr.Dataset, costs: xr.DataArray, timeslices: xr.DataArray
) -> xr.Dataset:
    """Creates costs for solving with scipy's LP solver.

    Example:
        We can now construct example inputs to the function from the sample model. The
        costs will be a matrix where each asset has a candidate replacement
        technology.

        >>> from muse import examples
        >>> technologies = examples.technodata("residential", model="medium")
        >>> search_space = examples.search_space("residential", model="medium")
        >>> timeslices = examples.sector("residential", model="medium").timeslices
        >>> costs = (
        ...     search_space
        ...     * np.arange(np.prod(search_space.shape)).reshape(search_space.shape)
        ... )

        The function returns the LP vector split along capacity and production
        variables.

        >>> from muse.constraints import lp_costs
        >>> lpcosts = lp_costs(
        ...     technologies.sel(year=2020, region="R1"), costs, timeslices
        ... )
        >>> assert "capacity" in lpcosts.data_vars
        >>> assert "production" in lpcosts.data_vars

        The capacity costs correspond exactly to the input costs:

        >>> assert (costs == lpcosts.capacity).all()

        The production is zero in this context. It does not enter the cost function
        of the LP problem:

        >>> assert (lpcosts.production == 0).all()

        They should correspond to a data-array with dimensions ``(asset,
        replacement)`` (and possibly ``region`` as well).

        >>> lpcosts.capacity.dims
        ('asset', 'replacement')

        The production costs are zero by default. However, the production expands
        over not only the dimensions of the capacity, but also the ``timeslice``
        during which production occurs and the ``commodity`` produced.

        >>> lpcosts.production.dims
        ('timeslice', 'asset', 'replacement', 'commodity')
    """
    from xarray import zeros_like

    from muse.commodities import is_enduse
    from muse.timeslices import convert_timeslice

    assert "year" not in technologies.dims

    ts_costs = convert_timeslice(costs, timeslices)
    selection = dict(
        commodity=is_enduse(technologies.comm_usage),
        technology=technologies.technology.isin(costs.replacement),
    )
    if "region" in technologies.fixed_outputs.dims and "region" in ts_costs.coords:
        selection["region"] = ts_costs.region
    fouts = technologies.fixed_outputs.sel(selection).rename(technology="replacement")

    production = zeros_like(ts_costs * fouts)
    for dim in production.dims:
        if isinstance(production.get_index(dim), pd.MultiIndex):
            production[dim] = pd.Index(production.get_index(dim), tupleize_cols=False)
    return xr.Dataset(dict(capacity=costs, production=production))
def merge_lp(
    costs: xr.Dataset, *constraints: Constraint
) -> Tuple[xr.Dataset, List[Constraint]]:
    """Unify coordinate systems of costs and constraints.

    In practice, this function brings costs and constraints into a single xr.Dataset
    and then splits things up again. This ensures the dimensions are not only
    compatible, but also such that their order in memory is the same.
    """
    from xarray import merge

    data = merge(
        [costs]
        + [
            constraint.rename(
                b=f"b{i}", capacity=f"capacity{i}", production=f"production{i}"
            )
            for i, constraint in enumerate(constraints)
        ]
    )
    unified_costs = cast(xr.Dataset, data[["capacity", "production"]])
    unified_constraints = [
        xr.Dataset(
            {
                "capacity": data[f"capacity{i}"],
                "production": data[f"production{i}"],
                "b": data[f"b{i}"],
            },
            attrs=constraint.attrs,
        )
        for i, constraint in enumerate(constraints)
    ]
    return unified_costs, unified_constraints
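
# Usage sketch (illustrative): align an ``lp_costs`` dataset and constraints so
# that they share a single coordinate system and memory layout:
#
#     unified_costs, unified_constraints = merge_lp(lpcosts, *constraints)
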
def lp_constraint(constraint: Constraint, lpcosts: xr.Dataset) -> Constraint:
    """Transforms the constraint to LP data.

    The goal is to create from ``lpcosts.capacity``, ``constraint.capacity``, and
    ``constraint.b`` a 2d-matrix ``constraint`` vs ``decision variables``.

    #. The dimensions of ``constraint.b`` are the constraint dimensions. They are
       renamed ``"c(xxx)"``.
    #. The dimensions of ``lpcosts`` are the decision-variable dimensions. They are
       renamed ``"d(xxx)"``.
    #. ``set(b.dims).intersection(lpcosts.xxx.dims)`` are diagonal in constraint
       dimensions and decision variables dimension, with ``xxx`` the capacity or the
       production
    #. ``set(constraint.xxx.dims) - set(lpcosts.xxx.dims) - set(b.dims)`` are reduced
       by summation, with ``xxx`` the capacity or the production
    #. ``set(lpcosts.xxx.dims) - set(constraint.xxx.dims) - set(b.dims)`` are added
       for expansion, with ``xxx`` the capacity or the production

    See :py:func:`muse.constraints.lp_constraint_matrix` for a more detailed
    explanation of the transformations applied here.
    """
    constraint = constraint.copy(deep=False)
    for dim in constraint.dims:
        if isinstance(constraint.get_index(dim), pd.MultiIndex):
            constraint[dim] = pd.Index(constraint.get_index(dim), tupleize_cols=False)
    b = constraint.b.drop_vars(set(constraint.b.coords) - set(constraint.b.dims))
    b = b.rename({k: f"c({k})" for k in b.dims})
    capacity = lp_constraint_matrix(constraint.b, constraint.capacity, lpcosts.capacity)
    capacity = capacity.drop_vars(set(capacity.coords) - set(capacity.dims))
    production = lp_constraint_matrix(
        constraint.b, constraint.production, lpcosts.production
    )
    production = production.drop_vars(set(production.coords) - set(production.dims))
    return xr.Dataset(
        {"b": b, "capacity": capacity, "production": production},
        attrs=constraint.attrs,
    )
def lp_constraint_matrix(
    b: xr.DataArray, constraint: xr.DataArray, lpcosts: xr.DataArray
):
    """Transforms one constraint block into an lp matrix.

    The goal is to create from ``lpcosts``, ``constraint``, and ``b`` a 2d-matrix of
    constraints vs decision variables.

    #. The dimensions of ``b`` are the constraint dimensions. They are renamed
       ``"c(xxx)"``.
    #. The dimensions of ``lpcosts`` are the decision-variable dimensions. They are
       renamed ``"d(xxx)"``.
    #. ``set(b.dims).intersection(lpcosts.dims)`` are diagonal in constraint
       dimensions and decision variables dimension
    #. ``set(constraint.dims) - set(lpcosts.dims) - set(b.dims)`` are reduced by
       summation
    #. ``set(lpcosts.dims) - set(constraint.dims) - set(b.dims)`` are added for
       expansion
    #. ``set(b.dims) - set(constraint.dims) - set(lpcosts.dims)`` are added for
       expansion. Such dimensions only make sense if they consist of one point.

    The result is the constraint matrix, expanded, reduced and diagonalized for the
    conditions above.

    Example:
        Let's first set up a constraint and a cost matrix:

        >>> from muse import examples
        >>> from muse import constraints as cs
        >>> res = examples.sector("residential", model="medium")
        >>> technologies = res.technologies
        >>> market = examples.residential_market("medium")
        >>> search = examples.search_space("residential", model="medium")
        >>> assets = next(a.assets for a in res.agents if a.category == "retrofit")
        >>> demand = None  # not used in max production
        >>> constraint = cs.max_production(demand, assets, search, market,
        ...                                technologies)
        >>> lpcosts = cs.lp_costs(
        ...     (
        ...         technologies
        ...         .interp(year=market.year.min() + 5)
        ...         .drop_vars("year")
        ...         .sel(region=assets.region)
        ...     ),
        ...     costs=search * np.arange(np.prod(search.shape)).reshape(search.shape),
        ...     timeslices=market.timeslice,
        ... )

        For a simple example, we can first check the case where ``b`` is scalar. The
        result ought to be a single row of a matrix, or a vector with only decision
        variables:

        >>> from pytest import approx
        >>> result = cs.lp_constraint_matrix(
        ...     xr.DataArray(1), constraint.capacity, lpcosts.capacity
        ... )
        >>> assert result.values == approx(-1)
        >>> assert set(result.dims) == {f"d({x})" for x in lpcosts.capacity.dims}

        >>> result = cs.lp_constraint_matrix(
        ...     xr.DataArray(1), constraint.production, lpcosts.production
        ... )
        >>> assert set(result.dims) == {f"d({x})" for x in lpcosts.production.dims}
        >>> assert result.values == approx(1)

        As expected, the capacity vector is :math:`-1`, whereas the production vector
        is :math:`1`. These are the values the
        :py:func:`~muse.constraints.max_production` constraint is set up to create.

        Now, let's check the case where ``b`` is the one from the
        :py:func:`~muse.constraints.max_production` constraint. In that case, all the
        dimensions should end up as constraint dimensions: the production for each
        timeslice, region, asset, and replacement technology should not outstrip the
        capacity assigned for the asset and replacement technology.

        >>> result = cs.lp_constraint_matrix(
        ...     constraint.b, constraint.capacity, lpcosts.capacity
        ... )
        >>> decision_dims = {f"d({x})" for x in lpcosts.capacity.dims}
        >>> constraint_dims = {
        ...     f"c({x})"
        ...     for x in set(lpcosts.production.dims).union(constraint.b.dims)
        ... }
        >>> assert set(result.dims) == decision_dims.union(constraint_dims)

        The :py:func:`~muse.constraints.max_production` constraint on the production
        side is the identity matrix. We can easily check this by stacking the
        decision and constraint dimensions in the result:

        >>> result = cs.lp_constraint_matrix(
        ...     constraint.b, constraint.production, lpcosts.production
        ... )
        >>> decision_dims = {f"d({x})" for x in lpcosts.production.dims}
        >>> assert set(result.dims) == decision_dims.union(constraint_dims)
        >>> stacked = result.stack(d=sorted(decision_dims), c=sorted(constraint_dims))
        >>> assert stacked.shape[0] == stacked.shape[1]
        >>> assert stacked.values == approx(np.eye(stacked.shape[0]))
    """
    from functools import reduce

    from numpy import eye

    result = constraint.sum(set(constraint.dims) - set(lpcosts.dims) - set(b.dims))
    result = result.rename(
        {k: f"d({k})" for k in set(result.dims).intersection(lpcosts.dims)}
    )
    result = result.rename(
        {k: f"c({k})" for k in set(result.dims).intersection(b.dims)}
    )
    expand = set(lpcosts.dims) - set(constraint.dims) - set(b.dims)
    if expand == {"timeslice", "asset", "commodity"}:
        expand = ["asset", "timeslice", "commodity"]
    result = result.expand_dims({f"d({k})": lpcosts[k] for k in expand})
    expand = set(b.dims) - set(constraint.dims) - set(lpcosts.dims)
    result = result.expand_dims({f"c({k})": b[k] for k in expand})

    diag_dims = set(b.dims).intersection(lpcosts.dims)
    diag_dims = sorted(diag_dims)

    if diag_dims:

        def get_dimension(dim):
            if dim in b.dims:
                return b[dim].values
            if dim in lpcosts.dims:
                return lpcosts[dim].values
            return constraint[dim].values

        diagonal_submats = [
            xr.DataArray(
                eye(len(b[k])),
                coords={f"c({k})": get_dimension(k), f"d({k})": get_dimension(k)},
                dims=(f"c({k})", f"d({k})"),
            )
            for k in diag_dims
        ]
        result = result * reduce(xr.DataArray.__mul__, diagonal_submats)
    return result
@dataclass
class ScipyAdapter:
    """Creates the input for the scipy solvers.

    Example:
        Let's give a first simple example. The constraint
        :py:func:`~muse.constraints.max_capacity_expansion` limits how much each
        capacity can be expanded in a given year.

        >>> from muse import examples
        >>> from muse.quantities import maximum_production
        >>> from muse.timeslices import convert_timeslice
        >>> from muse import constraints as cs
        >>> res = examples.sector("residential", model="medium")
        >>> market = examples.residential_market("medium")
        >>> search = examples.search_space("residential", model="medium")
        >>> assets = next(a.assets for a in res.agents if a.category == "retrofit")
        >>> market_demand = 0.8 * maximum_production(
        ...     res.technologies.interp(year=2025),
        ...     convert_timeslice(
        ...         assets.capacity.sel(year=2025).groupby("technology").sum("asset"),
        ...         market.timeslice,
        ...     ),
        ... ).rename(technology="asset")
        >>> costs = search * np.arange(np.prod(search.shape)).reshape(search.shape)
        >>> constraint = cs.max_capacity_expansion(
        ...     market_demand, assets, search, market, res.technologies,
        ... )

        The constraint acts over capacity decision variables only:

        >>> assert constraint.production.data == np.array(0)
        >>> assert len(constraint.production.dims) == 0

        It is an upper bound for a straightforward sum over the capacities for a
        given technology. The matrix operator is simply the identity:

        >>> assert constraint.capacity.data == np.array(1)
        >>> assert len(constraint.capacity.dims) == 0

        And the upper bound is expanded over the replacement technologies, but not
        over the assets. Hence the assets will be summed over in the final
        constraint:

        >>> assert (constraint.b.data == np.array([50.0, 3.0, 3.0, 50.0])).all()
        >>> assert set(constraint.b.dims) == {"replacement"}
        >>> assert constraint.kind == cs.ConstraintKind.UPPER_BOUND

        As shown above, it does not bind the production decision variables. Hence,
        production is zero. The matrix operator for the capacity is simply the
        identity. Hence it can be inputted as the dimensionless scalar 1. The upper
        bound is simply the maximum for each replacement technology (and region, if
        that particular dimension exists in the problem).

        The LP problem then becomes:

        >>> technologies = res.technologies.interp(year=market.year.min() + 5)
        >>> inputs = cs.ScipyAdapter.factory(
        ...     technologies, costs, market.timeslice, constraint
        ... )

        The decision variables are always constrained between zero and infinity:

        >>> assert inputs.bounds == (0, np.inf)

        The problem is an upper-bound one. There are no equality constraints:

        >>> assert inputs.A_eq is None
        >>> assert inputs.b_eq is None

        The upper bound matrix and vector, and the costs are consistent in their
        dimensions:

        >>> assert inputs.c.ndim == 1
        >>> assert inputs.b_ub.ndim == 1
        >>> assert inputs.A_ub.ndim == 2
        >>> assert inputs.b_ub.size == inputs.A_ub.shape[0]
        >>> assert inputs.c.size == inputs.A_ub.shape[1]

        In practice, :py:func:`~muse.constraints.lp_costs` helps us define the
        decision variables (and ``c``). We can verify that the sizes are consistent:

        >>> lpcosts = cs.lp_costs(technologies, costs, market.timeslice)
        >>> capsize = lpcosts.capacity.size
        >>> prodsize = lpcosts.production.size
        >>> assert inputs.c.size == capsize + prodsize

        The upper bound itself is over each replacement technology:

        >>> assert inputs.b_ub.size == lpcosts.replacement.size

        The production decision variables are not involved:

        >>> from pytest import approx
        >>> assert inputs.A_ub[:, capsize:] == approx(0)

        The matrix for the capacity decision variables is a sum over assets for a
        given replacement technology. Hence, each row is constituted of zeros and
        ones and sums to the number of assets:

        >>> assert inputs.A_ub[:, :capsize].sum(axis=1) == approx(lpcosts.asset.size)
        >>> assert set(inputs.A_ub[:, :capsize].flatten()) == {0.0, 1.0}
    """

    c: np.ndarray
    to_muse: Callable[[np.ndarray], xr.Dataset]
    bounds: Tuple[Optional[float], Optional[float]] = (0, np.inf)
    A_ub: Optional[np.ndarray] = None
    b_ub: Optional[np.ndarray] = None
    A_eq: Optional[np.ndarray] = None
    b_eq: Optional[np.ndarray] = None

    @classmethod
    def factory(
        cls,
        technologies: xr.Dataset,
        costs: xr.DataArray,
        timeslices: pd.Index,
        *constraints: Constraint,
    ) -> ScipyAdapter:
        lpcosts = lp_costs(technologies, costs, timeslices)
        data = cls._unified_dataset(technologies, lpcosts, *constraints)
        capacities = cls._selected_quantity(data, "capacity")
        productions = cls._selected_quantity(data, "production")
        bs = cls._selected_quantity(data, "b")
        kwargs = cls._to_scipy_adapter(capacities, productions, bs, *constraints)

        def to_muse(x: np.ndarray) -> xr.Dataset:
            return ScipyAdapter._back_to_muse(x, capacities.costs, productions.costs)

        return ScipyAdapter(to_muse=to_muse, **kwargs)

    @property
    def kwargs(self):
        return {
            "c": self.c,
            "A_eq": self.A_eq,
            "b_eq": self.b_eq,
            "A_ub": self.A_ub,
            "b_ub": self.b_ub,
            "bounds": self.bounds,
        }

    @staticmethod
    def _unified_dataset(
        technologies: xr.Dataset, lpcosts: xr.Dataset, *constraints: Constraint
    ) -> xr.Dataset:
        """Creates single xr.Dataset from costs and constraints."""
        from xarray import merge

        assert "year" not in technologies.dims

        coords = sorted([k for k in lpcosts.dims])
        lpcosts_df = lpcosts.to_dataframe().reset_index().set_index(coords)
        slpcosts = lpcosts_df.to_xarray()  # sorted lpcosts.dims
        data = merge(
            [lpcosts.rename({k: f"d({k})" for k in slpcosts.dims})]
            + [
                lp_constraint(constraint, lpcosts).rename(
                    b=f"b{i}", capacity=f"capacity{i}", production=f"production{i}"
                )
                for i, constraint in enumerate(constraints)
            ]
        )
        for i, constraint in enumerate(constraints):
            if constraint.kind == ConstraintKind.LOWER_BOUND:
                data[f"b{i}"] = -data[f"b{i}"]  # type: ignore
                data[f"capacity{i}"] = -data[f"capacity{i}"]  # type: ignore
                data[f"production{i}"] = -data[f"production{i}"]  # type: ignore
        return data.transpose(*data.dims)

    @staticmethod
    def _selected_quantity(data: xr.Dataset, name: Text) -> xr.Dataset:
        result = cast(
            xr.Dataset, data[[u for u in data.data_vars if str(u).startswith(name)]]
        )
        return result.rename(
            {
                k: ("costs" if k == name else int(str(k).replace(name, "")))
                for k in result.data_vars
            }
        )

    @staticmethod
    def _to_scipy_adapter(
        capacities: xr.Dataset, productions: xr.Dataset, bs: xr.Dataset, *constraints
    ):
        def reshape(matrix: xr.DataArray) -> np.ndarray:
            # before building the LP we need to sort dimensions for consistency
            if list(matrix.dims) != sorted(matrix.dims):
                new_dims = sorted(matrix.dims)
                matrix = matrix.transpose(*new_dims)
            size = np.prod(
                [matrix[u].shape[0] for u in matrix.dims if str(u).startswith("c")]
            )
            return matrix.values.reshape((size, -1))

        def extract_bA(constraints, *kinds):
            indices = [i for i in range(len(bs)) if constraints[i].kind in kinds]
            capa_constraints = [reshape(capacities[i]) for i in indices]
            prod_constraints = [reshape(productions[i]) for i in indices]
            if capa_constraints:
                A: Optional[np.ndarray] = np.concatenate(
                    (
                        np.concatenate(capa_constraints, axis=0),
                        np.concatenate(prod_constraints, axis=0),
                    ),
                    axis=1,
                )
                b: Optional[np.ndarray] = np.concatenate(
                    [bs[i].stack(constraint=sorted(bs[i].dims)) for i in indices],
                    axis=0,
                )
            else:
                A = None
                b = None
            return A, b

        c = np.concatenate(
            (
                cast(np.ndarray, capacities["costs"].values).flatten(),
                cast(np.ndarray, productions["costs"].values).flatten(),
            ),
            axis=0,
        )
        A_ub, b_ub = extract_bA(
            constraints, ConstraintKind.UPPER_BOUND, ConstraintKind.LOWER_BOUND
        )
        A_eq, b_eq = extract_bA(constraints, ConstraintKind.EQUALITY)

        return {
            "c": c,
            "A_ub": A_ub,
            "b_ub": b_ub,
            "A_eq": A_eq,
            "b_eq": b_eq,
            "bounds": (0, np.inf),
        }

    @staticmethod
    def _back_to_muse_quantity(
        x: np.ndarray, template: Union[xr.DataArray, xr.Dataset]
    ) -> xr.DataArray:
        result = xr.DataArray(
            x.reshape(template.shape), coords=template.coords, dims=template.dims
        )
        return result.rename({k: str(k)[2:-1] for k in result.dims})

    @staticmethod
    def _back_to_muse(
        x: np.ndarray, capacity: xr.DataArray, production: xr.DataArray
    ) -> xr.Dataset:
        capa = ScipyAdapter._back_to_muse_quantity(x[: capacity.size], capacity)
        prod = ScipyAdapter._back_to_muse_quantity(x[capacity.size :], production)
        return xr.Dataset({"capacity": capa, "production": prod})
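
# Usage sketch (illustrative): the adapter's ``kwargs`` map directly onto
# :py:func:`scipy.optimize.linprog`, and ``to_muse`` converts the solution
# vector back to dimensional MUSE quantities:
#
#     from scipy.optimize import linprog
#
#     adapter = ScipyAdapter.factory(technologies, costs, timeslices, *constraints)
#     solution = linprog(**adapter.kwargs)
#     if solution.success:
#         result = adapter.to_muse(solution.x)  # dataset: capacity and production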