Source code for muse.sectors.sector

from collections.abc import Iterator, Sequence
from typing import (
    Any,
    Callable,
    Self,
    cast,
)

import xarray as xr

from muse.agents import AbstractAgent
from muse.dispatch import PRODUCTION_SIGNATURE
from muse.readers.toml import read_technodata
from muse.sectors.abstract import AbstractSector
from muse.sectors.register import register_sector
from muse.sectors.subsector import Subsector
from muse.timeslices import compress_timeslice, expand_timeslice, get_level


[docs] @register_sector(name="default") class Sector(AbstractSector): # type: ignore """Base class for all sectors."""
[docs] @classmethod def factory(cls, name: str, settings: Any) -> Self: from muse.dispatch import factory as pfactory from muse.interactions import factory as interaction_factory from muse.outputs.sector import factory as ofactory # Read sector settings sector_settings = getattr(settings.sectors, name)._asdict() # Extract required settings subsectors = sector_settings.get("subsectors") if not subsectors: raise RuntimeError(f"Missing 'subsectors' section in sector {name}") if len(subsectors._asdict()) == 0: raise RuntimeError(f"Empty 'subsectors' section in sector {name}") interpolation_mode = sector_settings.get("interpolation", "linear") timeslice_level = sector_settings.get("timeslice_level", None) dispatch_production = sector_settings.get("dispatch_production", "share") outputs_config = sector_settings.get("outputs", []) interactions_config = sector_settings.get("interactions", None) # Read technologies technologies = read_technodata( settings, name, interpolation_mode=interpolation_mode, ) # Create subsectors subsectors = [ Subsector.factory( subsec_settings, technologies, regions=settings.regions, current_year=int(min(settings.time_framework)), name=subsec_name, timeslice_level=timeslice_level, ) for subsec_name, subsec_settings in subsectors._asdict().items() ] # Check that subsector commodities are disjoint sector_commodities = [c for s in subsectors for c in s.commodities] duplicates = [ c for c in set(sector_commodities) if sector_commodities.count(c) > 1 ] if duplicates: raise RuntimeError( f"Commodities {duplicates} are outputted by multiple subsectors." ) # Create outputs outputs = ofactory(*outputs_config, sector_name=name) # Create production method production = pfactory(dispatch_production) # Create interactions interactions = interaction_factory(interactions_config) # Create sector return cls( name, technologies, supply_prod=production, subsectors=subsectors, commodities=sector_commodities, outputs=outputs, interactions=interactions, timeslice_level=timeslice_level, )
def __init__( self, name: str, technologies: xr.Dataset, supply_prod: PRODUCTION_SIGNATURE, subsectors: Sequence[Subsector] = [], commodities: list[str] = [], interactions: Callable[[Sequence[AbstractAgent]], None] | None = None, outputs: Callable | None = None, timeslice_level: str | None = None, ): from muse.interactions import factory as interaction_factory from muse.outputs.sector import factory as ofactory from muse.timeslices import TIMESLICE """Name of the sector.""" self.name: str = name """Timeslice level for the sector (e.g. "month").""" self.timeslice_level = timeslice_level or get_level(TIMESLICE) """Subsectors controlled by this object.""" self.subsectors: Sequence[Subsector] = list(subsectors) """Parameters describing the sector's technologies.""" self.technologies: xr.Dataset = technologies if "timeslice" in self.technologies.dims: if not get_level(self.technologies) == self.timeslice_level: raise ValueError( f"Technodata for {self.name} sector does not match " "the specified timeslice level for that sector " f"({self.timeslice_level})" ) """Interactions between agents. Called right before computing new investments, this function should manage any interactions between agents, e.g. passing assets from *new* agents to *retro* agents, and maket make-up from *retro* to *new*. Defaults to doing nothing. The function takes the sequence of agents as input, and returns nothing. It is expected to modify the agents in-place. See Also -------- :py:mod:`muse.interactions` contains MUSE's base interactions """ self.interactions = interactions or interaction_factory() """A function for outputting data for post-mortem analysis.""" self.outputs: Callable = ( cast(Callable, ofactory()) if outputs is None else outputs ) """Computes production as used to return the supply to the MCA. It can be anything registered with :py:func:`@register_production<muse.dispatch.register_production>`. """ self.supply_prod = supply_prod """Full supply, consumption and costs data for the most recent year.""" self.output_data: xr.Dataset """Commodities that the sector is in charge of producing.""" self.commodities: list[str] = commodities
[docs] def next( self, mca_market: xr.Dataset, ) -> xr.Dataset: """Advance sector by one time period. Args: mca_market: Market with ``demand``, ``supply``, and ``prices``. Returns: A market containing the ``supply`` offered by the sector, it's attendant ``consumption`` of fuels and materials and the associated ``costs``. """ from logging import getLogger def group_assets(x: xr.DataArray) -> xr.DataArray: return xr.Dataset(dict(x=x)).groupby("region").sum("asset").x # Time period from the market object assert len(mca_market.year) == 2 current_year, investment_year = map(int, mca_market.year.values) getLogger(__name__).info( f"Running {self.name} for years {current_year} to {investment_year}" ) # Agent interactions self.interactions(list(self.agents)) # Convert market to sector timeslicing mca_market = self.convert_to_sector_timeslicing(mca_market) # Select appropriate data from the market market = mca_market.sel( commodity=self.technologies.commodity.values, region=self.technologies.region, ) # Select technology data from the investment year techs = self.technologies.sel(year=investment_year, drop=True) # Perform investments for subsector in self.subsectors: subsector.invest(technologies=techs, market=market) # Full output data supply, consume, costs = self.market_variables(market, self.technologies) self.output_data = xr.Dataset( dict( supply=supply, consumption=consume, costs=costs, ) ) # Output data for MCA (aggregated over assets) if len(supply.region.dims) == 0: output_data = self.output_data.sum("asset") output_data = output_data.expand_dims(region=[output_data.region.values]) else: output_data = xr.Dataset( dict( supply=group_assets(supply), consumption=group_assets(consume), costs=costs, ) ) result = output_data.copy(deep=True) if "dst_region" in result: exclude = ["dst_region", "commodity", "year", "timeslice"] prices = market.prices.expand_dims(dst_region=market.prices.region.values) sup, prices = xr.broadcast(result.supply, prices) sup = sup.fillna(0.0) con, prices = xr.broadcast(result.consumption, prices) con = con.fillna(0.0) supply = result.supply.sum("region").rename(dst_region="region") consumption = con.sum("dst_region") assert len(supply.region) == len(consumption.region) # Need to reindex costs to avoid nans for non-producing regions costs0, prices = xr.broadcast(result.costs, prices, exclude=exclude) # Fulfil nans with price values costs0 = costs0.reindex_like(prices).fillna(prices) costs0 = costs0.where(costs0 > 0, prices) # Find where sup >0 (exporter) # Importers have nans and average over exporting price costs = ((costs0 * sup) / sup.sum("dst_region")).fillna( costs0.mean("region") ) # Take average over dst regions costs = costs.where(costs > 0, prices).mean("dst_region") result = xr.Dataset( dict(supply=supply, consumption=consumption, costs=costs) ) # Convert result to global timeslicing scheme return self.convert_to_global_timeslicing(result)
[docs] def save_outputs(self, year: int) -> None: """Calls the outputs function with the current output data.""" self.outputs( self.output_data, self.capacity, year=year, commodities=self.commodities )
[docs] def market_variables(self, market: xr.Dataset, technologies: xr.Dataset) -> Any: """Computes resulting market: production, consumption, and costs.""" from muse.costs import levelized_cost_of_energy, supply_cost from muse.quantities import capacity_to_service_demand, consumption from muse.utilities import broadcast_over_assets, interpolate_capacity years = market.year.values capacity = interpolate_capacity(self.capacity, year=years) # Select technology data for each asset # Each asset uses the technology data from the year it was installed technodata = broadcast_over_assets( technologies, capacity, installed_as_year=True ) # Calculate supply/consumption/costs (one year at a time) supply_list = [] consume_list = [] costs_list = [] for year in capacity.year.values: supply_year = self.supply_prod( demand=market.consumption.sel(year=year), capacity=capacity.sel(year=year), technologies=technodata, timeslice_level=self.timeslice_level, prices=market.prices.sel(year=year), ) # Select relevant prices for each asset prices_for_assets = broadcast_over_assets( market.prices.sel(year=year), capacity, installed_as_year=False ) # Calculate consumption consume_year = consumption( technologies=technodata, production=supply_year, prices=prices_for_assets, timeslice_level=self.timeslice_level, ) # Calculate LCOE # We base LCOE only on the portion of capacity that is actually used (#728) utilized_capacity = capacity_to_service_demand( demand=supply_year, technologies=technodata, timeslice_level=self.timeslice_level, ) lcoe = levelized_cost_of_energy( prices=prices_for_assets, technologies=technodata, capacity=utilized_capacity, production=supply_year, consumption=consume_year, method="annual", ) # Calculate new commodity prices costs_year = supply_cost(supply_year, lcoe, asset_dim="asset") supply_list.append(supply_year.expand_dims(year=[year])) consume_list.append(consume_year.expand_dims(year=[year])) costs_list.append(costs_year.expand_dims(year=[year])) supply = xr.concat(supply_list, dim="year") consume = xr.concat(consume_list, dim="year") costs = xr.concat(costs_list, dim="year") return supply, consume, costs
[docs] def convert_to_sector_timeslicing(self, market: xr.Dataset) -> xr.Dataset: """Converts market data to sector timeslicing.""" supply = compress_timeslice( market["supply"], level=self.timeslice_level, operation="sum" ) consumption = compress_timeslice( market["consumption"], level=self.timeslice_level, operation="sum" ) prices = compress_timeslice( market["prices"], level=self.timeslice_level, operation="mean" ) return xr.Dataset(dict(supply=supply, consumption=consumption, prices=prices))
[docs] def convert_to_global_timeslicing(self, market: xr.Dataset) -> xr.Dataset: """Converts market data to global timeslicing.""" supply = expand_timeslice(market["supply"], operation="distribute") consumption = expand_timeslice(market["consumption"], operation="distribute") costs = expand_timeslice(market["costs"], operation="broadcast") return xr.Dataset(dict(supply=supply, consumption=consumption, costs=costs))
@property def capacity(self) -> xr.DataArray: """Aggregates capacity across agents. The capacities are aggregated leaving only two dimensions: asset (technology, installation date, region), year. """ from muse.utilities import interpolate_capacity, reduce_assets traded = [ u.assets.capacity for u in self.agents if "dst_region" in u.assets.capacity.dims ] nontraded = [ u.assets.capacity for u in self.agents if "dst_region" not in u.assets.capacity.dims ] # Only nontraded assets if not traded: full_list = [ list(nontraded[i].year.values) for i in range(len(nontraded)) if "year" in nontraded[i].dims ] flat_list = [item for sublist in full_list for item in sublist] years = sorted(list(set(flat_list))) capacity = reduce_assets( [ u.assets.capacity for u in self.agents if "dst_region" not in u.assets.capacity.dims ] ) return interpolate_capacity(capacity, year=years) # Only traded assets elif not nontraded: full_list = [ list(traded[i].year.values) for i in range(len(traded)) if "year" in traded[i].dims ] flat_list = [item for sublist in full_list for item in sublist] years = sorted(list(set(flat_list))) capacity = reduce_assets( [ u.assets.capacity for u in self.agents if "dst_region" in u.assets.capacity.dims ] ) return interpolate_capacity(capacity, year=years) # Both traded and nontraded assets else: traded_results = reduce_assets(traded) nontraded_results = reduce_assets(nontraded) capacity = reduce_assets( [ traded_results, nontraded_results * (nontraded_results.region == traded_results.dst_region), ] ) return interpolate_capacity(capacity, year=years) @property def agents(self) -> Iterator[AbstractAgent]: """Iterator over all agents in the sector.""" for subsector in self.subsectors: yield from subsector.agents