Source code for muse.demand_share

"""Demand share computations.

The demand share splits a demand amongst agents. It is used within a sector to assign
part of the input MCA demand to each agent.

Demand shares functions should be registered via the decorator `register_demand_share`.

Demand share functions are not expected to modify any of their arguments. They
should all have the following signature:

.. code-block:: Python

    @register_demand_share
    def demand_share(
        agents: Sequence[AbstractAgent],
        demand: xr.Dataarray,
        technologies: xr.Dataset,
        **kwargs
    ) -> xr.DataArray:
        pass

Arguments:
    agents: a sequence of  agent relevant to the demand share procedure. The agent can
        be queried for parameters specific to the demand share procedure. For instance,
        :py:func`new_and_retro` will query the agents for the assets they own, the
        region they are contained with, their category (new or retrofit), etc...
    demand: DataArray of commodity demands for the current year and investment year.
    technologies: a dataset containing all constant data characterizing the
        technologies.
    kwargs: Any number of keyword arguments that can parametrize how the demand is
        shared. These keyword arguments can be modified from the TOML file.

Returns:
    The unmet consumption. Unless indicated, all agents will compete for a the full
    demand. However, if there exists a coordinate "agent" of dimension "asset" giving
    the :py:attr:`~muse.agents.agent.AbstractAgent.uuid` of the agent, then agents will
    only service that part of the demand.
"""

from __future__ import annotations

__all__ = [
    "DEMAND_SHARE_SIGNATURE",
    "factory",
    "new_and_retro",
    "register_demand_share",
    "unmet_demand",
    "unmet_forecasted_demand",
]

from collections.abc import Hashable, Mapping, MutableMapping, Sequence
from typing import (
    Any,
    Callable,
    cast,
)

import xarray as xr
from mypy_extensions import KwArg

from muse.agents import AbstractAgent
from muse.errors import (
    AgentWithNoAssetsInDemandShare,
    RetrofitAgentInStandardDemandShare,
)
from muse.registration import registrator
from muse.utilities import check_dimensions

DEMAND_SHARE_SIGNATURE = Callable[
    [Sequence[AbstractAgent], xr.DataArray, xr.Dataset, KwArg(Any)], xr.DataArray
]
"""Demand share signature."""

DEMAND_SHARE: MutableMapping[str, DEMAND_SHARE_SIGNATURE] = {}
"""Dictionary of demand share functions."""


[docs] @registrator(registry=DEMAND_SHARE, loglevel="info") def register_demand_share(function: DEMAND_SHARE_SIGNATURE): """Decorator to register a function as a demand share calculation.""" return function
def factory( settings: str | Mapping[str, Any] | None = None, ) -> DEMAND_SHARE_SIGNATURE: if settings is None or isinstance(settings, str): name = settings or "standard_demand" params: Mapping[str, Any] = {} else: name = settings.get("name", "standard_demand") params = {k: v for k, v in settings.items() if k != "name"} function = DEMAND_SHARE[name] keywords = dict(**params) def demand_share( agents: Sequence[AbstractAgent], demand: xr.DataArray, technologies: xr.Dataset, **kwargs, ) -> xr.DataArray: from copy import copy keyword_args = copy(keywords) keyword_args.update(**kwargs) # Check inputs check_dimensions( demand, ["commodity", "year", "timeslice", "region"], optional=["dst_region"], ) assert len(demand.year) == 2 check_dimensions( technologies, ["technology", "region"], optional=["timeslice", "commodity", "dst_region"], ) # Calculate demand share result = function(agents, demand, technologies, **keyword_args) # Check result check_dimensions( result, ["timeslice", "commodity"], optional=["asset", "region"] ) # TODO: asset should be required, but trade model is failing return result return cast(DEMAND_SHARE_SIGNATURE, demand_share)
[docs] @register_demand_share(name="new_and_retro") def new_and_retro( agents: Sequence[AbstractAgent], demand: xr.DataArray, technologies: xr.Dataset, timeslice_level: str | None = None, ) -> xr.DataArray: r"""Splits demand across new and retro agents. The input demand is split amongst both *new* and *retrofit* agents. *New* agents get a share of the increase in demand for the investment year, whereas *retrofit* agents are assigned a share of the demand that occurs from decommissioned assets. Args: agents: a list of all agents. This list should mainly be used to determine the type of an agent and the assets it owns. The agents will not be modified in any way. demand: commodity demands for the current year and investment year. technologies: quantities describing the technologies. timeslice_level: the timeslice level of the sector (e.g. "hour", "day") Pseudo-code: #. the capacity is reduced over agents and expanded over timeslices (extensive quantity) and aggregated over agents. Generally: .. math:: A_{a, s}^r = w_s\sum_i A_a^{r, i} with :math:`w_s` a weight associated with each timeslice and determined via :py:func:`muse.timeslices.distribute_timeslice`. #. An intermediate quantity, the :py:func:`unmet demand <muse.demand_share.unmet_demand>` :math:`U` is defined from :math:`P[\mathcal{M}, \mathcal{A}]`, a function giving the production for a given market :math:`\mathcal{M}`, the associated consumption :math:`\mathcal{C}`, and aggregate assets :math:`\mathcal{A}`: .. math:: U[\mathcal{M}, \mathcal{A}] = \max(\mathcal{C} - P[\mathcal{M}, \mathcal{A}], 0) where :math:`\max` operates element-wise, and indices have been dropped for simplicity. The resulting expression has the same indices as the consumption :math:`\mathcal{C}_{c, s}^r`. :math:`P` is the maximum production, given by <muse.quantities.maximum_production>`. #. the *new* demand :math:`N` is defined as: .. math:: N = \min\left( \mathcal{C}_{c, s}^r(y + \Delta y) - \mathcal{C}_{c, s}^r(y), U[\mathcal{M}^r(y + \Delta y), \mathcal{A}_{a, s}^r(y)] \right) #. the *retrofit* demand :math:`R` is defined from the identity .. math:: C_{c, s}^r(y + \Delta y) = P[\mathcal{M}^r(y+\Delta y), \mathcal{A}_{a, s}^r(y + \Delta y)] + N_{c, s}^r + R_{c, s}^r In other words, it is the share of the forecasted consumption that is serviced neither by the current assets still present in the investment year, nor by the *new* agent. #. then each *new* agent gets a share of :math:`N` proportional to it's share of the :py:func:`production <muse.quantities.maximum_production>`, :math:`P[\mathcal{A}_{a, s}^{r, i}(y)]`. Then the share of the demand for new agent :math:`i` is: .. math:: N_{c, s, t}^{i, r}(y) = N_{c, s}^r \frac{\sum_\iota P[\mathcal{A}_{s, t, \iota}^{r, i}(y)]} {\sum_{i, t, \iota}P[\mathcal{A}_{s, t, \iota}^{r, i}(y)]} #. similarly, each *retrofit* agent gets a share of :math:`N` proportional to it's share of the :py:func:`decommissioning demand <muse.quantities.decommissioning_demand>`, :math:`D^{r, i}_{t, c}`. Then the share of the demand for retrofit agent :math:`i` is: .. math:: R_{c, s, t}^{i, r}(y) = R_{c, s}^r \frac{\sum_\iota\mathcal{D}_{t, c, \iota}^{i, r}(y)} {\sum_{i, t, \iota}\mathcal{D}_{t, c, \iota}^{i, r}(y)} Note that in the last two steps, the assets owned by the agent are aggregated over the installation year. The effect is that the demand serviced by agents is disaggregated over each technology, rather than not over each *model* of each technology (asset). .. SeeAlso:: :ref:`indices`, :ref:`quantities`, :ref:`Agent investments<model, agent investment>`, :py:func:`~muse.quantities.decommissioning_demand`, :py:func:`~muse.quantities.maximum_production` """ from functools import partial from muse.commodities import is_enduse from muse.quantities import maximum_production from muse.utilities import ( agent_concatenation, broadcast_over_assets, interpolate_capacity, reduce_assets, ) current_year, investment_year = map(int, demand.year.values) def decommissioning(capacity, technologies): return decommissioning_demand( technologies=technologies, capacity=interpolate_capacity( capacity, year=[current_year, investment_year] ), timeslice_level=timeslice_level, ) capacity = interpolate_capacity( reduce_assets([u.assets.capacity for u in agents]), year=[current_year, investment_year], ) # Select technodata for assets technodata = broadcast_over_assets(technologies, capacity, installed_as_year=True) demands = new_and_retro_demands( capacity, demand, technodata, timeslice_level=timeslice_level, ) demands = demands.where( is_enduse(technologies.comm_usage.sel(commodity=demands.commodity)), 0 ) quantity = { agent.name: agent.quantity for agent in agents if agent.category != "retrofit" } for agent in agents: if agent.category == "retrofit": setattr(agent, "quantity", quantity[agent.name]) id_to_share: MutableMapping[Hashable, xr.DataArray] = {} for region in demands.region.values: retro_capacity: MutableMapping[Hashable, xr.DataArray] = { agent.uuid: interpolate_capacity( agent.assets.capacity, year=[current_year, investment_year] ) for agent in agents if agent.category == "retrofit" and agent.region == region } retro_technodata: MutableMapping[Hashable, xr.Dataset] = { agent_uuid: technodata.sel(asset=retro_capacity[agent_uuid].asset) for agent_uuid in retro_capacity.keys() } name_to_id = { (agent.name, agent.region): agent.uuid for agent in agents if agent.category == "retrofit" and agent.region == region } id_to_rquantity = { agent.uuid: (agent.name, agent.region, agent.quantity) for agent in agents if agent.category == "retrofit" and agent.region == region } retro_demands: MutableMapping[Hashable, xr.DataArray] = _inner_split( retro_capacity, retro_technodata, demands.retrofit.sel(region=region), decommissioning, id_to_rquantity, ) assert len(name_to_id) == len(retro_capacity) id_to_share.update(retro_demands) new_capacity: Mapping[Hashable, xr.DataArray] = { agent.uuid: retro_capacity[name_to_id[(agent.name, agent.region)]] # * agent.quantity for agent in agents if agent.category != "retrofit" and agent.region == region } new_technodata: MutableMapping[Hashable, xr.Dataset] = { agent_uuid: technodata.sel(asset=new_capacity[agent_uuid].asset) for agent_uuid in new_capacity.keys() } id_to_nquantity = { agent.uuid: (agent.name, agent.region, agent.quantity) for agent in agents if agent.category != "retrofit" and agent.region == region } new_demands = _inner_split( new_capacity, new_technodata, demands.new.sel(region=region), partial( maximum_production, year=current_year, timeslice_level=timeslice_level, ), id_to_nquantity, ) id_to_share.update(new_demands) result = cast(xr.DataArray, agent_concatenation(id_to_share)) return result
@register_demand_share(name="default") def standard_demand( agents: Sequence[AbstractAgent], demand: xr.DataArray, technologies: xr.Dataset, timeslice_level: str | None = None, ) -> xr.DataArray: r"""Splits demand across new agents. The input demand is split amongst *new* agents. *New* agents get a share of the increase in demand for the investment year, as well as the demand that occurs from decommissioned assets. Args: agents: a list of all agents. This list should mainly be used to determine the type of an agent and the assets it owns. The agents will not be modified in any way. demand: commodity demands for the current year and investment year. technologies: quantities describing the technologies. timeslice_level: the timeslice level of the sector (e.g. "hour", "day") """ from functools import partial from muse.commodities import is_enduse from muse.quantities import maximum_production from muse.utilities import ( agent_concatenation, broadcast_over_assets, interpolate_capacity, reduce_assets, ) current_year, investment_year = map(int, demand.year.values) def decommissioning(capacity, technologies): return decommissioning_demand( technologies=technologies, capacity=interpolate_capacity( capacity, year=[current_year, investment_year] ), timeslice_level=timeslice_level, ) # Make sure there are no retrofit agents for agent in agents: if agent.category == "retrofit": raise RetrofitAgentInStandardDemandShare() # Calculate existing capacity capacity = interpolate_capacity( reduce_assets([agent.assets.capacity for agent in agents]), year=[current_year, investment_year], ) # Select technodata for assets technodata = broadcast_over_assets(technologies, capacity, installed_as_year=True) # Calculate new and retrofit demands demands = new_and_retro_demands( capacity=capacity, demand=demand, technologies=technodata, timeslice_level=timeslice_level, ) # Only consider end-use commodities demands = demands.where( is_enduse(technologies.comm_usage.sel(commodity=demands.commodity)), 0 ) id_to_share: MutableMapping[Hashable, xr.DataArray] = {} for region in demands.region.values: # Calculate current capacity current_capacity: MutableMapping[Hashable, xr.DataArray] = { agent.uuid: interpolate_capacity( agent.assets.capacity, year=[current_year, investment_year] ) for agent in agents if agent.region == region } current_technodata: MutableMapping[Hashable, xr.Dataset] = { agent_uuid: technodata.sel(asset=current_capacity[agent_uuid].asset) for agent_uuid in current_capacity.keys() } # Split demands between agents id_to_quantity = { agent.uuid: (agent.name, agent.region, agent.quantity) for agent in agents if agent.region == region } retro_demands: MutableMapping[Hashable, xr.DataArray] = _inner_split( current_capacity, current_technodata, demands.retrofit.sel(region=region), decommissioning, id_to_quantity, ) new_demands = _inner_split( current_capacity, current_technodata, demands.new.sel(region=region), partial( maximum_production, year=current_year, timeslice_level=timeslice_level, ), id_to_quantity, ) # Sum new and retrofit demands total_demands = { k: new_demands[k] + retro_demands[k] for k in new_demands.keys() } id_to_share.update(total_demands) result = cast(xr.DataArray, agent_concatenation(id_to_share)) assert "year" not in result.dims return result
[docs] @register_demand_share(name="unmet_demand") def unmet_forecasted_demand( agents: Sequence[AbstractAgent], demand: xr.DataArray, technologies: xr.Dataset, timeslice_level: str | None = None, ) -> xr.DataArray: """Forecast demand that cannot be serviced by non-decommissioned current assets.""" from muse.commodities import is_enduse from muse.utilities import ( broadcast_over_assets, interpolate_capacity, reduce_assets, ) current_year, investment_year = map(int, demand.year.values) demand = demand.where( is_enduse(technologies.comm_usage.sel(commodity=demand.commodity)), 0 ) # Calculate existing capacity capacity = interpolate_capacity( reduce_assets([agent.assets.capacity for agent in agents]), year=[current_year, investment_year], ) # Select data for future years future_demand = demand.sel(year=investment_year, drop=True) future_capacity = capacity.sel(year=investment_year) # Select technology data for assets techs = broadcast_over_assets(technologies, capacity, installed_as_year=True) # Calculate unmet demand result = unmet_demand( demand=future_demand, capacity=future_capacity, technologies=techs, timeslice_level=timeslice_level, ) assert "year" not in result.dims return result
def _inner_split( assets: Mapping[Hashable, xr.DataArray], technologies: Mapping[Hashable, xr.DataSet], demand: xr.DataArray, method: Callable, quantity: Mapping, ) -> MutableMapping[Hashable, xr.DataArray]: r"""Compute share of the demand for a set of agents. The input ``demand`` is split between agents according to their share of the demand computed by ``method``. """ from numpy import logical_and # Find decrease in capacity production by each asset over time shares: Mapping[Hashable, xr.DataArray] = { key: method(capacity=capacity, technologies=technologies[key]) .groupby("technology") .sum("asset") .rename(technology="asset") for key, capacity in assets.items() } # Total decrease in production across assets try: summed_shares: xr.DataArray = xr.concat(shares.values(), dim="concat_dim").sum( "concat_dim" ) total: xr.DataArray = summed_shares.sum("asset") except AttributeError: raise AgentWithNoAssetsInDemandShare() # Calculates the demand divided by the number of assets times the number of agents # if the demand is bigger than zero and the total demand assigned with the "method" # function is zero. n_agents = len(quantity) n_assets = summed_shares.sizes["asset"] unassigned = (demand / (n_agents * n_assets)).where( logical_and(demand > 1e-12, total <= 1e-12), 0 ) totals = { key: (share / share.sum("asset")).fillna(0) for key, share in shares.items() } newshares = { key: (total * quantity[key][2] * demand).fillna(0) + unassigned * quantity[key][2] for key, total in totals.items() } return newshares
[docs] def unmet_demand( demand: xr.DataArray, capacity: xr.DataArray, technologies: xr.Dataset, timeslice_level: str | None = None, ): r"""Share of the demand that cannot be serviced by the existing assets. .. math:: U[\mathcal{M}, \mathcal{A}] = \max(\mathcal{C} - P[\mathcal{M}, \mathcal{A}], 0) :math:`\max` operates element-wise, and indices have been dropped for simplicity. The resulting expression has the same indices as the consumption :math:`\mathcal{C}_{c, s}^r`. :math:`P` is the maximum production, given by <muse.quantities.maximum_production>. """ from muse.quantities import maximum_production # Check inputs assert "year" not in technologies.dims assert "year" not in capacity.dims assert "year" not in demand.dims # Calculate maximum production by existing assets produced = maximum_production( capacity=capacity, technologies=technologies, timeslice_level=timeslice_level, ) # Total commodity production by summing over assets if "dst_region" in produced.dims: produced = produced.sum("asset").rename(dst_region="region") elif "region" in produced.coords and produced.region.dims: produced = produced.groupby("region").sum("asset") else: produced = produced.sum("asset") # Unmet demand is the difference between the consumption and the production _unmet_demand = (demand - produced).clip(min=0) assert "year" not in _unmet_demand.dims return _unmet_demand
def new_consumption( capacity: xr.DataArray, demand: xr.DataArray, technologies: xr.Dataset, timeslice_level: str | None = None, ) -> xr.DataArray: r"""Computes share of the demand attributed to new agents. The new agents service the demand that can be attributed specifically to growth and that cannot be serviced by existing assets. In other words: .. math:: N_{c, s}^r = \min\left( C_{c, s}^(y + \Delta y) - C_{c, s}^(y), C_{c, s}^(y + \Delta y) - P[\mathcal{M}(y + \Delta y), \mathcal{A}_{a, s}^r(y)] \right) Where :math:`P` the maximum production by existing assets, given by <muse.quantities.maximum_production>. """ from numpy import minimum # Check inputs assert len(demand.year) == 2 assert len(capacity.year) == 2 assert (demand.year.values == capacity.year.values).all() assert "year" not in technologies.dims current_year, investment_year = map(int, demand.year.values) # Select data for current/future years current_demand = demand.sel(year=current_year, drop=True) future_demand = demand.sel(year=investment_year, drop=True) future_capacity = capacity.sel(year=investment_year) # Calculate the increase in consumption over the investment period delta = (future_demand - current_demand).clip(min=0) missing = unmet_demand( demand=future_demand, capacity=future_capacity, technologies=technologies, timeslice_level=timeslice_level, ) consumption = minimum(delta, missing) assert "year" not in consumption.dims return consumption def new_and_retro_demands( capacity: xr.DataArray, demand: xr.DataArray, technologies: xr.Dataset, timeslice_level: str | None = None, ) -> xr.Dataset: """Splits demand into *new* and *retrofit* demand. The demand in the investment year is split three ways: #. the demand that can be serviced by the assets that will still be operational that year. #. the *new* demand is defined as the growth in consumption that cannot be serviced by existing assets in the current year, as computed in :py:func:`new_demand`. #. the retrofit demand is everything else. """ from numpy import minimum from muse.quantities import maximum_production # Check inputs assert len(demand.year) == 2 assert len(capacity.year) == 2 assert (demand.year.values == capacity.year.values).all() assert "year" not in technologies.dims investment_year = int(demand.year[1]) if hasattr(capacity, "region") and capacity.region.dims == (): capacity["region"] = ( "asset", [str(capacity.region.values)] * len(capacity.asset), ) # Calculate demand to allocate to "new" agents new_demand = new_consumption( capacity=capacity, demand=demand, technologies=technologies, timeslice_level=timeslice_level, ) # Maximum production in the investment year by existing assets service = ( maximum_production( technologies=technologies, capacity=capacity.sel(year=investment_year), timeslice_level=timeslice_level, ) .groupby("region") .sum("asset") ) # Existing asset should not execute beyond demand service = minimum(service, demand.sel(year=investment_year, drop=True)) # Leftover demand that cannot be serviced by existing assets or "new" agents retro_demand = ( demand.sel(year=investment_year, drop=True) - new_demand - service ).clip(min=0) assert "year" not in retro_demand.dims return xr.Dataset({"new": new_demand, "retrofit": retro_demand}) def decommissioning_demand( technologies: xr.Dataset, capacity: xr.DataArray, timeslice_level: str | None = None, ) -> xr.DataArray: r"""Computes demand from process decommissioning. Let :math:`M_t^r(y)` be the retrofit demand, :math:`^{(s)}\mathcal{D}_t^r(y)` be the decommissioning demand at the level of the sector, and :math:`A^r_{t, \iota}(y)` be the assets owned by the agent. Then, the decommissioning demand for agent :math:`i` is : .. math:: \mathcal{D}^{r, i}_{t, c}(y) = \sum_\iota \alpha_{t, \iota}^r \beta_{t, \iota, c}^r \left(A^{i, r}_{t, \iota}(y) - A^{i, r}_{t, \iota, c}(y + 1) \right) given the utilization factor :math:`\alpha_{t, \iota}` and the fixed output factor :math:`\beta_{t, \iota, c}`. Furthermore, decommissioning demand is non-zero only for end-use commodities. ncsearch-nohlsearch).. SeeAlso: :ref:`indices`, :ref:`quantities`, :py:func:`~muse.quantities.maximum_production` :py:func:`~muse.commodities.is_enduse` """ from muse.quantities import maximum_production assert len(capacity.year) == 2 assert "year" not in technologies.dims current_year, investment_year = map(int, capacity.year.values) # Calculate the decrease in capacity from the current year to future years capacity_decrease = capacity.sel(year=current_year) - capacity.sel( year=investment_year ) # Calculate production associated with this capacity result = maximum_production( technologies, capacity_decrease, timeslice_level=timeslice_level, ).clip(min=0) assert "year" not in result.dims return result