r"""Collection of demand-matching algorithms.
At it's simplest, the demand matching algorithm solves the following problem,
- given a demand for a commodity :math:`D_d`, with :math:`d\in\mathcal{D}`
- given processes to supply these commodities, with an associated cost per process,
:math:`C_{d, i}`, with :math:`i\in\mathcal{I}`
Match demand and supply while minimizing the associated cost.
.. math::
\min_{X} \sum_{d, i} C_{d,i} X_{d, i}
X_{d, i} \geq 0
\sum_o X_o \geq D_d
The basic algorithm proceeds as follows:
#. sort all costs :math:`C_{d, i}` across both :math:`d` and :math:`i`
#. for each cost :math:`c_0` in order:
#. find the set of indices :math:`\mathcal{C}\subseteq\mathcal{D}\cup\mathcal{I}`
for which
.. math::
\forall (d, i) \in \mathcal{C}\quad C_{d, i} == c_0
#. determine the partial result for the current cost
.. math::
\forall (d, i) \in \mathcal{C}\quad X_{d, i} = \frac{D_d}{|i\in\mathcal{C}|}
Where :math:`|i\in\mathcal{C}|` indicates the number of indices :math:`i` in
:math:`\mathcal{C}`.
However, in practice, the problem to solve often contains constraints, e.g. a constraint
on production :math:`\sum_d X_{d, i} \leq M_i`. The algorithms in this module try and
solve these constrained problems one way or another.
"""
__all__ = ["demand_matching"]
from typing import Optional, Set
from xarray import DataArray
[docs]
def demand_matching(
demand: DataArray,
cost: DataArray,
*constraints: DataArray,
protected_dims: Optional[Set] = None,
) -> DataArray:
r"""Demand matching over heterogeneous dimensions.
This algorithm enables demand matching while enforcing constraints on how much an
asset can produce. Any set of dimensions can be matched. The algorithm is general
with respect to the dimensions in demand and cost. It also enforces constraints over
sets of indices.
.. math::
\min_{X} \sum_{d, i} C_{d, i} X_{d, i}
X_{d, i} \geq 0
\sum_i X_{d, i} \geq D_d
M_{(d, i) \in \mathcal{R}^{(\alpha)}}^{(\alpha)}
\geq \sum_{(d, i)\notin\mathcal{R}^{(\alpha)}} X_{d, i}
Where :math:`\alpha` is an index running over constraints,
:math:`\mathcal{R}^{(\alpha)}\subseteq\mathcal{D}\cup\mathcal{I}` is a subset of
indices.
The algorithm proceeds as described in :py:mod:`muse.demand_matching`.
However, an extra step is added to ensure that the solutions falls within the
convex-hull formed by the constraints. This projects the current solution onto the
constraint. Hence, the solution will depend on the order in which the constraints
are given.
#. sort all costs :math:`C_{d, m}` across both :math:`d` and :math:`m`
#. for each cost :math:`c_0` in order:
#. find the set of indices :math:`\mathcal{C}`
.. math::
\mathcal{C}\subseteq\mathcal{D}\cup\mathcal{I}
\forall (d, i) \in \mathcal{C}\quad C_{d, i} == c_0
#. determine an interim partial result for the current cost
.. math::
\forall (d, i) \in \mathcal{C}\quad
\delta X_{d, i} = \frac{1}{|i\in\mathcal{C}|}\left(
D_d - \sum_{j \in \mathcal{I}} X_{d, j}\right)
Where :math:`|i\in\mathcal{C}|` indicates the number of :math:`i` indices in
:math:`\mathcal{C}`. The expression in the parenthesis is the currently
unserviced demand.
#. Loop over each constraint :math:`\alpha`. Below we drop the index
:math:`\alpha` over constraints for simplicity.
#. Determine the excess over the constraint:
.. math::
E_{(d, i) \in \mathcal{R}} = \max\left\{
0,
\sum_{(d, i)\notin\mathcal{R}}\left(
X_{d, i} + \delta X_{d, i}
\right) - M_{(d, i) \in \mathcal{R}}
\right\}
#. Correct :math:`\delta X` as follows:
.. math::
\forall (d, i) \in \mathcal{C}\cap\mathcal{R}\quad
\delta X\prime_{d, i} =
E_{(d, i)}
\frac{\delta X_{(d, i)}}{
\sum_{(e, j)\in \mathcal{C}\cap\mathcal{R}} \delta X_{(e,j)}
}
\forall (d, i) \notin \mathcal{R}, (d, i)\in\mathcal{C} \quad
\delta X\prime_{d, i} = 0
#. Set :math:`\delta X = \max(0, \delta X - \delta X\prime)`
A more complex problem would see independent dimensions for each quantity. In that,
case we can reduce to the original problem as shown here
.. math::
C_{d, i, c} = \min_cC\prime_{d, i, c}
D_d = \sum_{d\prime} D\prime_{d, d\prime}
M_r = \sum_m M\prime_{r, m}
X_{d, d\prime, i, m, c} =
\left(C\prime_{d, i, c} == C_{d, i}\right)
\frac{M\prime_{r, m}}{M_r} \frac{D\prime_{d, d\prime}}{D_d} X_{d, i}
A dimension could be shared by all quantities, in which case each point along that
dimension is treated as independent.
Similarly, if a dimension is shared only by the demand and a constraint but not by
the cost, then the problem can be reduced a set of problems independent along that
direction.
Arguments:
demand: Demand to match with production. It should have the same physical units
as `max_production`.
cost: Cost to minimize while fulfilling the demand.
*constraints: each item is a separate constraint :math:`M_r`.
protected_dims: Dimensions that will not be modified
Returns:
An array with the joint dimensionality of `max_production`, `cost`, and
`demand`, containing the supply that fulfills the demand. The units of this
supply are the same as `demand` and `max_production`.
"""
from pandas import MultiIndex
from xarray import Dataset
if protected_dims is None:
protected_dims = set()
# demand has extra dimensions unique to it
extra_dims = set(demand.dims).difference(
cost.dims, *(cons.dims for cons in constraints)
)
if extra_dims:
summed_demand = demand.sum(extra_dims)
demand_share = (demand / summed_demand).where(demand > 0, 0)
result = demand_matching(summed_demand, cost, *constraints)
return demand_share * result
# a constraint has dimensions unique to it
for i, constraint in enumerate(constraints):
others = [cons for j, cons in enumerate(constraints) if j != i]
extra_dims = set(constraint.dims).difference(
demand.dims, cost.dims, *(cons.dims for cons in others), protected_dims
)
if extra_dims:
summed_constraint = constraint.sum(extra_dims)
share = (constraint / summed_constraint).where(constraint > 0, 0)
others.insert(i, summed_constraint)
result = demand_matching(demand, cost, *others)
return share * result
# cost has extra dimensions unique to it
extra_dims = set(cost.dims).difference(
demand.dims, *(cons.dims for cons in constraints), protected_dims
)
if extra_dims:
mincost = cost.min(extra_dims)
result = demand_matching(demand, mincost, *constraints)
structure = cost == mincost
cost_share = (1 / structure.sum(extra_dims)).where(structure, 0)
return result * cost_share
# missing coordinates in max_production
# add a fake coordinate so that the individual items in coordinate-less dimension
# can be addressed at the end of the main loop.
ds = Dataset(
{
**{f"constraint{i}": cons for i, cons in enumerate(constraints)},
"cost": cost,
"demand": demand,
}
)
nocoords = set(ds.dims).difference(ds.coords.keys())
if nocoords:
for coord in nocoords:
ds.coords[coord] = coord, ds.coords[coord].values
result = demand_matching( # type: ignore
ds.demand, ds.cost, *(ds[f"constraint{i}"] for i in range(len(constraints)))
)
return result.drop_vars(nocoords)
# multi-index dimensions seem to make life complicated for groupby
# so drop them for the duration of the call.
# see https://github.com/pydata/xarray/issues/1603
multics = {
k: ds.coords[k] for k in ds.dims if isinstance(ds.get_index(k), MultiIndex)
}
if len(multics) > 0:
for k in multics:
ds.coords[k] = list(range(len(ds[k])))
result = demand_matching( # type: ignore
ds.demand, ds.cost, *(ds[f"constraint{i}"] for i in range(len(constraints)))
)
for k, v in multics.items():
result.coords[k] = v
return result
return _demand_matching_impl(demand, cost, *constraints)
def _demand_matching_impl(
demand: DataArray, cost: DataArray, *constraints: DataArray
) -> DataArray:
"""Implementation of demand matching.
It is expected that demand does not have dimension unique to itself, and that all
dimensions have coordinates. Input sanitization is performed in `demand_matching`
proper.
"""
from numpy import isnan, prod
from xarray import Dataset, align, full_like
assert not set(demand.dims).difference(
cost.dims, *(cons.dims for cons in constraints)
)
result = full_like(sum(constraints) + demand + cost, 0) # type: ignore
names = [f"constraint{i}" for i in range(len(constraints))]
data = Dataset(
{
"cost": cost,
"demand": demand.copy(deep=True),
**dict(zip(names, constraints)),
}
)
def expand_dims(x, like):
"""Add extra dims that are in ``like``."""
N = max(1, prod([len(like[d]) for d in like.dims if d not in x.dims]))
b = (x / N).expand_dims(**{d: like[d] for d in like.dims if d not in x.dims})
return b
def remove_dims(x, to):
"""Remove extra dims that are not in ``to``."""
extras = set(x.dims).difference(to.dims)
a = x.sum(extras) / (~isnan(x)).sum(extras)
return a
idims = [dim for dim in result.dims if dim not in demand.dims]
for _, same_cost in data.groupby("cost") if cost.dims else [(cost, data)]:
current = same_cost.drop_vars("cost").unstack()
assert (~isnan(result)).all()
delta_x = expand_dims(
(remove_dims(current.demand, demand) - result.sum(idims)).clip(0), current
)
for cname in names:
constraint = remove_dims(current[cname], data[cname])
delta_x, constraint = align(delta_x, constraint, join="outer", fill_value=0)
excess = (
result.sum(set(data.dims).difference(data[cname].dims))
+ delta_x.sum(set(data.dims).difference(data[cname].dims))
- constraint
).clip(min=0)
excess_share = (
excess
* (
delta_x / delta_x.sum(set(delta_x.dims).difference(constraint.dims))
).fillna(0)
).fillna(0)
delta_x = (expand_dims(delta_x, excess_share) - excess_share).clip(0)
result = sum(align(result, delta_x.fillna(0), fill_value=0, join="left"))
return result