from __future__ import annotations
from typing import (
Any,
Callable,
Hashable,
List,
MutableMapping,
Optional,
Sequence,
Text,
Tuple,
Union,
cast,
)
import numpy as np
import xarray as xr
from muse.agents import Agent
[docs]
class Subsector:
"""Agent group servicing a subset of the sectorial commodities."""
def __init__(
self,
agents: Sequence[Agent],
commodities: Sequence[Text],
demand_share: Optional[Callable] = None,
constraints: Optional[Callable] = None,
investment: Optional[Callable] = None,
name: Text = "subsector",
forecast: int = 5,
expand_market_prices: bool = False,
):
from muse import constraints as cs
from muse import demand_share as ds
from muse import investments as iv
self.agents: Sequence[Agent] = list(agents)
self.commodities: List[Text] = list(commodities)
self.demand_share = demand_share or ds.factory()
self.constraints = constraints or cs.factory()
self.investment = investment or iv.factory()
self.forecast = forecast
self.name = name
self.expand_market_prices = expand_market_prices
"""Whether to expand prices to include destination region.
If ``True``, the input market prices are expanded of the missing "dst_region"
dimension by setting them to the maximum between the source and destination
region.
"""
[docs]
def invest(
self,
technologies: xr.Dataset,
market: xr.Dataset,
time_period: int = 5,
current_year: Optional[int] = None,
) -> None:
if current_year is None:
current_year = market.year.min()
if self.expand_market_prices:
market = market.copy()
market["prices"] = np.maximum(
market.prices, market.prices.rename(region="dst_region")
)
for agent in self.agents:
agent.asset_housekeeping()
lp_problem = self.aggregate_lp(
technologies, market, time_period, current_year=current_year
)
if lp_problem is None:
return
years = technologies.year
techs = technologies.interp(year=years)
techs = techs.sel(year=current_year + time_period)
solution = self.investment(
search=lp_problem[0], technologies=techs, constraints=lp_problem[1]
)
self.assign_back_to_agents(technologies, solution, current_year, time_period)
[docs]
def assign_back_to_agents(
self,
technologies: xr.Dataset,
solution: xr.DataArray,
current_year: int,
time_period: int,
):
agents = {u.uuid: u for u in self.agents}
for uuid, assets in solution.groupby("agent"):
agents[uuid].add_investments(
technologies, assets, current_year, time_period
)
[docs]
def aggregate_lp(
self,
technologies: xr.Dataset,
market: xr.Dataset,
time_period: int = 5,
current_year: Optional[int] = None,
) -> Optional[Tuple[xr.Dataset, Sequence[xr.Dataset]]]:
from muse.utilities import agent_concatenation, reduce_assets
if current_year is None:
current_year = market.year.min()
demands = self.demand_share(
self.agents,
market,
technologies,
current_year=current_year,
forecast=self.forecast,
)
if "dst_region" in demands.dims:
msg = """
dst_region found in demand dimensions. This is unexpected. Demands
should only have a region dimension rather both a source and destination
dimension.
"""
raise ValueError(msg)
agent_market = market.copy()
assets = agent_concatenation(
{agent.uuid: agent.assets for agent in self.agents}
)
agent_market["capacity"] = (
reduce_assets(assets.capacity, coords=("region", "technology"))
.interp(year=market.year, method="linear", kwargs={"fill_value": 0.0})
.swap_dims(dict(asset="technology"))
)
agent_lps: MutableMapping[Hashable, xr.Dataset] = {}
for agent in self.agents:
if "agent" in demands.coords:
share = demands.sel(asset=demands.agent == agent.uuid)
else:
share = demands
result = agent.next(
technologies, agent_market, share, time_period=time_period
)
if result is not None:
agent_lps[agent.uuid] = result
if len(agent_lps) == 0:
return None
lps = cast(xr.Dataset, agent_concatenation(agent_lps, dim="agent"))
coords = {"agent", "technology", "region"}.intersection(assets.asset.coords)
constraints = self.constraints(
demand=demands,
assets=reduce_assets(assets, coords=coords).set_coords(coords),
search_space=lps.search_space,
market=market,
technologies=technologies,
year=current_year,
)
return lps, constraints
[docs]
@classmethod
def factory(
cls,
settings: Any,
technologies: xr.Dataset,
regions: Optional[Sequence[Text]] = None,
current_year: Optional[int] = None,
name: Text = "subsector",
) -> Subsector:
from muse import constraints as cs
from muse import demand_share as ds
from muse import investments as iv
from muse.agents import InvestingAgent, agents_factory
from muse.commodities import is_enduse
from muse.readers.toml import undo_damage
agents = agents_factory(
settings.agents,
settings.existing_capacity,
technologies=technologies,
regions=regions,
year=current_year or int(technologies.year.min()),
asset_threshhold=getattr(settings, "asset_threshhold", 1e-12),
# only used by self-investing agents
investment=getattr(settings, "lpsolver", "adhoc"),
forecast=getattr(settings, "forecast", 5),
)
# technologies can have nans where a commodity
# does not apply to a technology at all
# (i.e. hardcoal for a technology using hydrogen)
# check that all regions have technologies with at least one end-use output
for a in agents:
techs = a.filter_input(technologies, region=a.region)
outputs = techs.fixed_outputs.sel(
commodity=is_enduse(technologies.comm_usage)
)
msg = f"Subsector with {techs.technology.values[0]} for region {a.region} has no output commodities" # noqa: E501
if len(outputs) == 0:
raise RuntimeError(msg)
if np.sum(outputs) == 0.0:
raise RuntimeError(msg)
if hasattr(settings, "commodities"):
commodities = settings.commodities
else:
commodities = aggregate_enduses(
[agent.assets for agent in agents], technologies
)
# len(commodities) == 0 may happen only if
# we run only one region or all regions have no outputs
msg = f"Subsector with {techs.technology.values[0]} has no output commodities"
if len(commodities) == 0:
raise RuntimeError(msg)
demand_share = ds.factory(undo_damage(getattr(settings, "demand_share", None)))
constraints = cs.factory(getattr(settings, "constraints", None))
# only used by non-self-investing agents
investment = iv.factory(getattr(settings, "lpsolver", "scipy"))
forecast = getattr(settings, "forecast", 5)
expand_market_prices = getattr(settings, "expand_market_prices", None)
if expand_market_prices is None:
expand_market_prices = "dst_region" in technologies.dims and not any(
(isinstance(u, InvestingAgent) for u in agents)
)
return cls(
agents=agents,
commodities=commodities,
demand_share=demand_share,
constraints=constraints,
investment=investment,
forecast=forecast,
name=name,
expand_market_prices=expand_market_prices,
)
[docs]
def aggregate_enduses(
assets: Sequence[Union[xr.Dataset, xr.DataArray]], technologies: xr.Dataset
) -> Sequence[Text]:
"""Aggregate enduse commodities for input assets.
This function is meant as a helper to figure out the commodities attached to a group
of agents.
"""
from muse.commodities import is_enduse
techs = set.union(*(set(data.technology.values) for data in assets))
outputs = technologies.fixed_outputs.sel(
commodity=is_enduse(technologies.comm_usage), technology=list(techs)
)
return outputs.commodity.sel(
commodity=outputs.any([u for u in outputs.dims if u != "commodity"])
).values.tolist()