Source code for muse.sectors.preset_sector

"""Sector with preset behaviours."""

from __future__ import annotations

from typing import Any, Text

from xarray import DataArray, Dataset

from muse.sectors.register import AbstractSector, register_sector


[docs] @register_sector(name=("preset", "presets")) class PresetSector(AbstractSector): # type: ignore """Sector with outcomes fixed from the start."""
[docs] @classmethod def factory(cls, name: Text, settings: Any) -> PresetSector: """Constructs a PresetSectors from input data.""" from typing import Sequence from xarray import DataArray, zeros_like from muse.commodities import CommodityUsage from muse.readers import ( read_attribute_table, read_csv_outputs, read_macro_drivers, read_regression_parameters, read_timeslice_shares, read_timeslices, ) from muse.regressions import endogenous_demand from muse.timeslices import QuantityType, convert_timeslice sector_conf = getattr(settings.sectors, name) presets = Dataset() presets["timeslice"] = read_timeslices( getattr(sector_conf, "timeslice_levels", None) ).timeslice if getattr(sector_conf, "consumption_path", None) is not None: consumption = read_csv_outputs(sector_conf.consumption_path) consumption.coords["timeslice"] = presets.timeslice presets["consumption"] = consumption elif getattr(sector_conf, "demand_path", None) is not None: presets["consumption"] = read_attribute_table(sector_conf.demand_path) elif ( getattr(sector_conf, "macrodrivers_path", None) is not None and getattr(sector_conf, "regression_path", None) is not None ): macro_drivers = read_macro_drivers( getattr(sector_conf, "macrodrivers_path", None) ) regression_parameters = read_regression_parameters( getattr(sector_conf, "regression_path", None) ) forecast = getattr(sector_conf, "forecast", 0) if isinstance(forecast, Sequence): forecast = DataArray( forecast, coords={"forecast": forecast}, dims="forecast" ) consumption = endogenous_demand( drivers=macro_drivers, regression_parameters=regression_parameters, forecast=forecast, ) if hasattr(sector_conf, "filters"): consumption = consumption.sel(sector_conf.filters._asdict()) if "sector" in consumption.dims: consumption = consumption.sum("sector") if getattr(sector_conf, "timeslice_shares_path", None) is not None: timeslice = presets["timeslice"] assert isinstance(timeslice, DataArray) shares = read_timeslice_shares( sector_conf.timeslice_shares_path, timeslice=timeslice ) assert consumption.commodity.isin(shares.commodity).all() assert consumption.region.isin(shares.region).all() consumption = consumption * shares.sel( region=consumption.region, commodity=consumption.commodity ) presets["consumption"] = consumption if getattr(sector_conf, "supply_path", None) is not None: supply = read_csv_outputs(sector_conf.supply_path) supply.coords["timeslice"] = presets.timeslice presets["supply"] = supply if getattr(sector_conf, "costs_path", None) is not None: presets["costs"] = read_attribute_table(sector_conf.costs_path) elif ( getattr(sector_conf, "lcoe_path", None) is not None and "supply" in presets ): costs = ( read_csv_outputs( sector_conf.lcoe_path, indices=("RegionName", "ProcessName"), columns="timeslices", ) * presets["supply"] ) presets["costs"] = costs if len(presets.data_vars) == 0: raise IOError("None of supply, consumption, costs given") # add missing data as zeros: we only need one of conumption, costs, supply components = {"supply", "consumption", "costs"} for component in components: others = components.intersection(presets.data_vars).difference({component}) if component not in presets and len(others) > 0: presets[component] = zeros_like(presets[others.pop()]) # add timeslice, if missing for component in {"supply", "consumption"}: if "timeslice" not in presets[component].dims: presets[component] = convert_timeslice( presets[component], presets.timeslice, QuantityType.EXTENSIVE ) comm_usage = (presets.costs > 0).any(set(presets.costs.dims) - {"commodity"}) presets["comm_usage"] = ( "commodity", [CommodityUsage.PRODUCT if u else CommodityUsage.OTHER for u in comm_usage], ) presets = presets.set_coords("comm_usage") if "process" in presets.dims: presets = presets.sum("process") interpolation_mode = getattr(sector_conf, "interpolation_mode", "linear") return cls(presets, interpolation_mode=interpolation_mode, name=name)
def __init__( self, presets: Dataset, interpolation_mode: Text = "linear", name: Text = "preset", ): super().__init__() self.presets: Dataset = presets """Market across time and space.""" self.interpolation_mode: Text = interpolation_mode """Interpolation method""" self.name = name """Name by which to identify a sector"""
[docs] def next(self, mca_market: Dataset) -> Dataset: """Advance sector by one time period.""" from muse.timeslices import QuantityType, convert_timeslice presets = self.presets.sel(region=mca_market.region) supply = self._interpolate(presets.supply, mca_market.year) consumption = self._interpolate(presets.consumption, mca_market.year) costs = self._interpolate(presets.costs, mca_market.year) result = convert_timeslice( Dataset({"supply": supply, "consumption": consumption}), mca_market.timeslice, QuantityType.EXTENSIVE, ) result["costs"] = convert_timeslice( costs, mca_market.timeslice, QuantityType.INTENSIVE ) assert isinstance(result, Dataset) return result
def _interpolate(self, data: DataArray, years: DataArray) -> DataArray: """Chooses interpolation depending on whether forecast is available.""" if "forecast" in data.dims: baseyear = int(years.min()) forecasted = (years - baseyear).values result = ( data.interp( year=baseyear, method=self.interpolation_mode, kwargs={"fill_value": "extrapolate"}, ) .interp( forecast=forecasted, method=self.interpolation_mode, kwargs={"fill_value": "extrapolate"}, ) .drop_vars(("year", "forecast")) ) result["year"] = "forecast", years.values return result.set_index(forecast="year").rename(forecast="year") return data.interp(year=years, method=self.interpolation_mode).ffill("year")