from __future__ import annotations
from typing import (
Any,
Callable,
Iterator,
Mapping,
Optional,
Sequence,
Text,
Tuple,
Union,
cast,
)
import pandas as pd
import xarray as xr
from muse.agents import AbstractAgent
from muse.production import PRODUCTION_SIGNATURE
from muse.sectors.abstract import AbstractSector
from muse.sectors.register import register_sector
from muse.sectors.subsector import Subsector
[docs]
@register_sector(name="default")
class Sector(AbstractSector): # type: ignore
"""Base class for all sectors."""
[docs]
@classmethod
def factory(cls, name: Text, settings: Any) -> Sector:
from muse.interactions import factory as interaction_factory
from muse.outputs.sector import factory as ofactory
from muse.production import factory as pfactory
from muse.readers import read_timeslices
from muse.readers.toml import read_technodata
from muse.utilities import nametuple_to_dict
sector_settings = getattr(settings.sectors, name)._asdict()
for attribute in ("name", "type", "priority", "path"):
sector_settings.pop(attribute, None)
timeslices = read_timeslices(
sector_settings.pop("timeslice_levels", None)
).get_index("timeslice")
technologies = read_technodata(settings, name, settings.time_framework)
if "subsectors" not in sector_settings:
raise RuntimeError(f"Missing 'subsectors' section in sector {name}")
if len(sector_settings["subsectors"]._asdict()) == 0:
raise RuntimeError(f"Empty 'subsectors' section in sector {name}")
subsectors = [
Subsector.factory(
subsec_settings,
technologies,
regions=settings.regions,
current_year=int(min(settings.time_framework)),
name=subsec_name,
)
for subsec_name, subsec_settings in sector_settings.pop("subsectors")
._asdict()
.items()
]
are_disjoint_commodities = sum((len(s.commodities) for s in subsectors)) == len(
set().union(*(set(s.commodities) for s in subsectors)) # type: ignore
)
if not are_disjoint_commodities:
raise RuntimeError("Subsector commodities are not disjoint")
outputs = ofactory(*sector_settings.pop("outputs", []), sector_name=name)
supply_args = sector_settings.pop(
"supply", sector_settings.pop("dispatch_production", {})
)
if isinstance(supply_args, Text):
supply_args = {"name": supply_args}
else:
supply_args = nametuple_to_dict(supply_args)
supply = pfactory(**supply_args)
interactions = interaction_factory(sector_settings.pop("interactions", None))
for attr in ("technodata", "commodities_out", "commodities_in"):
sector_settings.pop(attr, None)
return cls(
name,
technologies,
subsectors=subsectors,
timeslices=timeslices,
supply_prod=supply,
outputs=outputs,
interactions=interactions,
**sector_settings,
)
def __init__(
self,
name: Text,
technologies: xr.Dataset,
subsectors: Sequence[Subsector] = [],
timeslices: Optional[pd.MultiIndex] = None,
technodata_timeslices: xr.Dataset = None,
interactions: Optional[Callable[[Sequence[AbstractAgent]], None]] = None,
interpolation: Text = "linear",
outputs: Optional[Callable] = None,
supply_prod: Optional[PRODUCTION_SIGNATURE] = None,
):
from muse.interactions import factory as interaction_factory
from muse.outputs.sector import factory as ofactory
from muse.production import maximum_production
self.name: Text = name
"""Name of the sector."""
self.subsectors: Sequence[Subsector] = list(subsectors)
"""Subsectors controlled by this object."""
self.technologies: xr.Dataset = technologies
"""Parameters describing the sector's technologies."""
self.timeslices: Optional[pd.MultiIndex] = timeslices
"""Timeslice at which this sector operates.
If None, it will operate using the timeslice of the input market.
"""
self.interpolation: Mapping[Text, Any] = {
"method": interpolation,
"kwargs": {"fill_value": "extrapolate"},
}
"""Interpolation method and arguments when computing years."""
if interactions is None:
interactions = interaction_factory()
self.interactions = interactions
"""Interactions between agents.
Called right before computing new investments, this function should manage any
interactions between agents, e.g. passing assets from *new* agents to *retro*
agents, and maket make-up from *retro* to *new*.
Defaults to doing nothing.
The function takes the sequence of agents as input, and returns nothing. It is
expected to modify the agents in-place.
See Also
--------
:py:mod:`muse.interactions` contains MUSE's base interactions
"""
self.outputs: Callable = (
cast(Callable, ofactory()) if outputs is None else outputs
)
"""A function for outputting data for post-mortem analysis."""
self.supply_prod = (
supply_prod if supply_prod is not None else maximum_production
)
""" Computes production as used to return the supply to the MCA.
It can be anything registered with
:py:func:`@register_production<muse.production.register_production>`.
"""
@property
def forecast(self):
"""Maximum forecast horizon across agents.
If no agents with a "forecast" attribute are found, defaults to 5. It cannot be
lower than 1 year.
"""
forecasts = [
getattr(agent, "forecast")
for agent in self.agents
if hasattr(agent, "forecast")
]
if len(forecasts) == 0:
return 5
return max(1, max(forecasts))
[docs]
def next(
self,
mca_market: xr.Dataset,
time_period: Optional[int] = None,
current_year: Optional[int] = None,
) -> xr.Dataset:
"""Advance sector by one time period.
Args:
mca_market:
Market with ``demand``, ``supply``, and ``prices``.
time_period:
Length of the time period in the framework. Defaults to the range of
``mca_market.year``.
current_year: Current year of the simulation
Returns:
A market containing the ``supply`` offered by the sector, it's attendant
``consumption`` of fuels and materials and the associated ``costs``.
"""
from logging import getLogger
def group_assets(x: xr.DataArray) -> xr.DataArray:
return xr.Dataset(dict(x=x)).groupby("region").sum("asset").x
if time_period is None:
time_period = int(mca_market.year.max() - mca_market.year.min())
if current_year is None:
current_year = int(mca_market.year.min())
getLogger(__name__).info(f"Running {self.name} for year {current_year}")
# > to sector timeslice
market = self.convert_market_timeslice(
mca_market.sel(
commodity=self.technologies.commodity, region=self.technologies.region
).interp(
year=sorted(
{
current_year,
current_year + time_period,
current_year + self.forecast,
}
),
**self.interpolation,
),
self.timeslices,
)
# > agent interactions
self.interactions(list(self.agents))
# > investment
years = sorted(
set(
market.year.data.tolist()
+ self.capacity.installed.data.tolist()
+ self.technologies.year.data.tolist()
)
)
technologies = self.technologies.interp(year=years, **self.interpolation)
for subsector in self.subsectors:
subsector.invest(
technologies, market, time_period=time_period, current_year=current_year
)
# > output to mca
supply, consume, costs = self.market_variables(market, technologies)
output_data = xr.Dataset(
dict(
supply=supply,
consumption=consume,
costs=costs,
)
)
# < output to mca
self.outputs(output_data, self.capacity, technologies)
if len(supply.region.dims) == 0:
output_data = xr.Dataset(
dict(
supply=supply,
consumption=consume,
costs=costs,
)
)
output_data = output_data.sum("asset")
output_data = output_data.expand_dims(region=[output_data.region.values])
else:
output_data = xr.Dataset(
dict(
supply=group_assets(supply),
consumption=group_assets(consume),
costs=costs,
)
)
# > to mca timeslices
result = output_data.copy(deep=True)
if "dst_region" in result:
exclude = ["dst_region", "commodity", "year", "timeslice"]
prices = market.prices.expand_dims(dst_region=market.prices.region.values)
sup, prices = xr.broadcast(result.supply, prices)
sup = sup.fillna(0.0)
con, prices = xr.broadcast(result.consumption, prices)
con = con.fillna(0.0)
supply = result.supply.sum("region").rename(dst_region="region")
consumption = con.sum("dst_region")
assert len(supply.region) == len(consumption.region)
# Need to reindex costs to avoid nans for non-producing regions
costs0, prices = xr.broadcast(result.costs, prices, exclude=exclude)
# Fulfil nans with price values
costs0 = costs0.reindex_like(prices).fillna(prices)
costs0 = costs0.where(costs0 > 0, prices)
# Find where sup >0 (exporter)
# Importers have nans and average over exporting price
costs = ((costs0 * sup) / sup.sum("dst_region")).fillna(
costs0.mean("region")
)
# Take average over dst regions
costs = costs.where(costs > 0, prices).mean("dst_region")
result = xr.Dataset(
dict(supply=supply, consumption=consumption, costs=costs)
)
result = self.convert_market_timeslice(result, mca_market.timeslice)
result["comm_usage"] = technologies.comm_usage.sel(commodity=result.commodity)
result.set_coords("comm_usage")
# < to mca timeslices
return result
[docs]
def market_variables(self, market: xr.Dataset, technologies: xr.Dataset) -> Any:
"""Computes resulting market: production, consumption, and costs."""
from muse.commodities import is_pollutant
from muse.quantities import (
annual_levelized_cost_of_energy,
consumption,
supply_cost,
)
from muse.timeslices import QuantityType, convert_timeslice
from muse.utilities import broadcast_techs
years = market.year.values
capacity = self.capacity.interp(year=years, **self.interpolation)
supply = self.supply_prod(
market=market, capacity=capacity, technologies=technologies
)
if "timeslice" in market.prices.dims and "timeslice" not in supply.dims:
supply = convert_timeslice(supply, market.timeslice, QuantityType.EXTENSIVE)
consume = consumption(technologies, supply, market.prices)
technodata = cast(xr.Dataset, broadcast_techs(technologies, supply))
costs = supply_cost(
supply.where(~is_pollutant(supply.comm_usage), 0),
annual_levelized_cost_of_energy(
market.prices.sel(region=supply.region), technodata
),
asset_dim="asset",
)
return supply, consume, costs
@property
def capacity(self) -> xr.DataArray:
"""Aggregates capacity across agents.
The capacities are aggregated leaving only two
dimensions: asset (technology, installation date,
region), year.
"""
from muse.utilities import filter_input, reduce_assets
traded = [
u.assets.capacity
for u in self.agents
if "dst_region" in u.assets.capacity.dims
]
nontraded = [
u.assets.capacity
for u in self.agents
if "dst_region" not in u.assets.capacity.dims
]
if not traded:
full_list = [
list(nontraded[i].year.values)
for i in range(len(nontraded))
if "year" in nontraded[i].dims
]
flat_list = [item for sublist in full_list for item in sublist]
years = sorted(list(set(flat_list)))
nontraded = [
filter_input(u.assets.capacity, year=years)
for u in self.agents
if "dst_region" not in u.assets.capacity.dims
]
return reduce_assets(nontraded)
if not nontraded:
full_list = [
list(traded[i].year.values)
for i in range(len(traded))
if "year" in traded[i].dims
]
flat_list = [item for sublist in full_list for item in sublist]
years = sorted(list(set(flat_list)))
traded = [
filter_input(u.assets.capacity, year=years)
for u in self.agents
if "dst_region" in u.assets.capacity.dims
]
return reduce_assets(traded)
traded_results = reduce_assets(traded)
nontraded_results = reduce_assets(nontraded)
return reduce_assets(
[
traded_results,
nontraded_results
* (nontraded_results.region == traded_results.dst_region),
]
)
@property
def agents(self) -> Iterator[AbstractAgent]:
"""Iterator over all agents in the sector."""
for subsector in self.subsectors:
yield from subsector.agents
[docs]
@staticmethod
def convert_market_timeslice(
market: xr.Dataset,
timeslice: pd.MultiIndex,
intensive: Union[Text, Tuple[Text]] = "prices",
) -> xr.Dataset:
"""Converts market from one to another timeslice."""
from muse.timeslices import QuantityType, convert_timeslice
if isinstance(intensive, Text):
intensive = (intensive,)
timesliced = {d for d in market.data_vars if "timeslice" in market[d].dims}
intensives = convert_timeslice(
market[list(timesliced.intersection(intensive))],
timeslice,
QuantityType.INTENSIVE,
)
extensives = convert_timeslice(
market[list(timesliced.difference(intensives.data_vars))],
timeslice,
QuantityType.EXTENSIVE,
)
others = market[list(set(market.data_vars).difference(timesliced))]
return xr.merge([intensives, extensives, others])