Source code for muse.mca

from __future__ import annotations

from pathlib import Path
from typing import (
    Any,
    Callable,
    List,
    Mapping,
    NamedTuple,
    Optional,
    Sequence,
    Text,
    Union,
    cast,
)

from xarray import Dataset, zeros_like

from muse.outputs.cache import OutputCache
from muse.readers import read_initial_market
from muse.sectors import SECTORS_REGISTERED, AbstractSector


[docs] class MCA(object): """Market Clearing Algorithm. The market clearing algorithm is the main object implementing the MUSE model. It is responsible for orchestrating how the sectors are run, how they interface with one another, with the general market and the carbon market. """
[docs] @classmethod def factory(cls, settings: Union[Text, Path, Mapping, Any]) -> MCA: """Loads MCA from input settings and input files. Arguments: settings: namedtuple with the global MUSE input settings. Returns: The loaded MCA """ from logging import getLogger from muse.outputs.mca import factory as ofactory from muse.readers import read_settings from muse.readers.toml import convert if isinstance(settings, (Text, Path)): settings = read_settings(settings) # type: ignore elif isinstance(settings, Mapping): settings = convert(settings) settings = cast(Any, settings) # We create the initial market market = ( read_initial_market( settings.global_input_files.projections, base_year_export=getattr( settings.global_input_files, "base_year_export", None ), base_year_import=getattr( settings.global_input_files, "base_year_import", None ), timeslices=settings.timeslices, ).sel(region=settings.regions) ).interp(year=settings.time_framework, method=settings.interpolation_mode) market["supply"] = zeros_like(market.exports) market["consumption"] = zeros_like(market.exports) # We create the sectors sectors = [] for sector in settings.sectors.list: kind = getattr(settings.sectors, sector).type sectors.append(SECTORS_REGISTERED[kind](sector, settings)) getLogger(__name__).info(f"Created sector {sector}") outputs = ofactory(*getattr(settings, "outputs", [])) outputs_cache = OutputCache( *getattr(settings, "outputs_cache", []), sectors=sectors ) extras = { "foresight", "regions", "interest_rate", "log_level", "interpolation_mode", "timeslices", "root", "plugins", "outputs", "outputs_cache", } global_kw = { k: v for k, v in settings._asdict().items() if not hasattr(v, "_asdict") and k not in extras } if "equilibrium" in global_kw: global_kw["equilibrium"] = global_kw.pop("equilibrium") carbon_kw = { k: v._asdict() if hasattr(v, "_asdict") else v for k, v in settings.carbon_budget_control._asdict().items() } for key in {"budget", "commodities", "method"}: carbon_kw[f"carbon_{key}"] = carbon_kw[key] carbon_kw.pop(key) return cls( sectors=sectors, market=market, outputs=outputs, # type: ignore outputs_cache=outputs_cache, # type: ignore **global_kw, **carbon_kw, )
def __init__( self, sectors: List[AbstractSector], market: Dataset, outputs: Optional[Callable[[List[AbstractSector], Dataset], Any]] = None, outputs_cache: Optional[OutputCache] = None, time_framework: Sequence[int] = list(range(2010, 2100, 10)), equilibrium: bool = True, equilibrium_variable: Text = "demand", maximum_iterations: int = 3, tolerance: float = 0.1, tolerance_unmet_demand: float = -0.1, excluded_commodities: Optional[Sequence[Text]] = None, carbon_budget: Optional[Sequence] = None, carbon_price: Optional[Sequence] = None, carbon_commodities: Optional[Sequence[Text]] = None, debug: bool = False, control_undershoot: bool = True, control_overshoot: bool = True, carbon_method: Text = "fitting", method_options: Optional[Mapping] = None, ): """Market clearing algorithm class which rules the whole MUSE.""" from logging import getLogger from numpy import array from xarray import DataArray from muse.carbon_budget import CARBON_BUDGET_METHODS from muse.outputs.mca import factory as ofactory getLogger(__name__).info("MCA Initialisation") self.sectors: List[AbstractSector] = list(sectors) self.market = market # Simulation flow parameters self.time_framework = array(time_framework) self.equilibrium = equilibrium self.equilibrium_variable = equilibrium_variable self.maximum_iterations = maximum_iterations self.tolerance = tolerance self.tolerance_unmet_demand = tolerance_unmet_demand if excluded_commodities: self.excluded_commodities = excluded_commodities else: self.excluded_commodities = [] # Carbon budget parameters if isinstance(carbon_budget, DataArray) and "year" in carbon_budget.dims: self.carbon_budget: DataArray = ( carbon_budget.interp(year=time_framework).ffill("year").bfill("year") ) elif isinstance(carbon_budget, DataArray): self.carbon_budget = carbon_budget elif carbon_budget is not None and len(carbon_budget) > 0: assert len(carbon_budget) == len(time_framework) self.carbon_budget = DataArray( carbon_budget, dims="year", coords={"year": time_framework} ) else: self.carbon_budget = DataArray([], dims="year") self.carbon_price = ( carbon_price if carbon_price is not None else zeros_like(self.carbon_budget) ) self.carbon_commodities = ( carbon_commodities if carbon_commodities is not None else [] ) self.debug = debug self.control_undershoot = control_undershoot self.control_overshoot = control_overshoot self.carbon_method = CARBON_BUDGET_METHODS[carbon_method] self.method_options = method_options self.outputs = ofactory() if outputs is None else outputs self.outputs_cache = OutputCache() if outputs_cache is None else outputs_cache
[docs] def find_equilibrium( self, market: Dataset, sectors: Optional[List[AbstractSector]] = None, maxiter: Optional[int] = None, ) -> FindEquilibriumResults: """Specialised version of the find_equilibrium function. Arguments: market: Commodities market, with the prices, supply, consumption and demand. sectors: A list of the sectors participating in the simulation. maxiter: Maximum number of iterations. Returns: A tuple with the updated market (prices, supply, consumption and demand) and sector. """ maxiter = self.maximum_iterations if not maxiter else maxiter return find_equilibrium( market=market, sectors=self.sectors if sectors is None else sectors, maxiter=maxiter, tol=self.tolerance, equilibrium_variable=self.equilibrium_variable, tol_unmet_demand=self.tolerance_unmet_demand, excluded_commodities=self.excluded_commodities, equilibrium=self.equilibrium, )
[docs] def update_carbon_budget(self, market: Dataset, year_idx: int) -> float: """Specialised version of the update_carbon_budget function. Arguments: market: Commodities market, with the prices, supply, consumption and demand. year_idx: Index of the year of interest. Returns: An updated market with prices, supply, consumption and demand. """ from muse.carbon_budget import update_carbon_budget emission = ( market.supply.sel(year=market.year[-1], commodity=self.carbon_commodities) .sum() .values ) return update_carbon_budget( cast(Sequence[float], self.carbon_budget), emission, year_idx, self.control_overshoot, self.control_undershoot, )
[docs] def update_carbon_price(self, market) -> Optional[float]: """Calculates the updated carbon price, if required. If the emission calculated for the next time period is larger than the limit, then the carbon price needs to be updated to ensure that whatever the sectors do, the carbon budget limit is not exceeded. Arguments: market: Market, with the prices, supply, consumption and demand. Returns: The new carbon price or None """ from numpy import median future = market.year[-1] market, _ = single_year_iteration(market, self.sectors) threshold = self.carbon_budget.interp( year=future, kwargs=dict(fill_value=self.carbon_budget.isel(year=-1).values) ).values emissions = ( market.supply.sel(year=future, commodity=self.carbon_commodities) .sum(["region", "timeslice", "commodity"]) .values ) # Future emissions are OK, so we move on cp = median(market.prices.sel(commodity=self.carbon_commodities, year=future)) if emissions < threshold and not self.debug and cp == 0.0: return None new_carbon_price = self.carbon_method( # type: ignore market, self.sectors, self.find_equilibrium, self.carbon_budget, self.carbon_price, self.carbon_commodities, **self.method_options, ) return new_carbon_price
[docs] def run(self) -> None: """Initiates the calculation, starting with the loop over years. This method starts the main MUSE loop, going over the years of the simulation. Internally, it runs the carbon budget loop, which updates the carbon prices, if needed, and the equilibrium loop, which tries to reach an equilibrium between prices, demand and supply. Returns: None """ from logging import getLogger from numpy import where from xarray import DataArray from muse.utilities import future_propagation _, self.sectors, hist_years = self.calibrate_legacy_sectors() if len(hist_years) > 0: hist = where(self.time_framework <= hist_years[-1])[0] start = hist[-1] else: start = -1 nyear = len(self.time_framework) - 1 check_carbon_budget = len(self.carbon_budget) and len(self.carbon_commodities) shoots = self.control_undershoot or self.control_overshoot variables = ["supply", "consumption", "prices"] for year_idx in range(start + 1, nyear): years = self.time_framework[year_idx : year_idx + 2] getLogger(__name__).info(f"Running simulation year {years[0]}...") new_market = self.market[variables].sel(year=years) assert isinstance(new_market, Dataset) new_market.supply[:] = 0 new_market.consumption[:] = 0 # If we need to account for the carbon budget, we do it now. if check_carbon_budget: new_price = self.update_carbon_price(new_market) if new_price is not None: future_price = DataArray(new_price, coords=dict(year=years[1])) new_market.prices.loc[dict(commodity=self.carbon_commodities)] = ( future_propagation( new_market.prices.sel(commodity=self.carbon_commodities), future_price, ) ) self.carbon_price = future_propagation( self.carbon_price, future_price ) _, new_market, self.sectors = self.find_equilibrium(new_market) # If we need to account for the carbon budget, we might need to change # the budget for the future, too. if check_carbon_budget and shoots and year_idx < nyear - 2: self.carbon_budget[year_idx + 2] = self.update_carbon_budget( new_market, year_idx ) dims = {i: new_market[i] for i in new_market.dims} self.market.supply.loc[dims] = new_market.supply self.market.consumption.loc[dims] = new_market.consumption dims = {i: new_market[i] for i in new_market.prices.dims if i != "year"} self.market.prices.loc[dims] = future_propagation( self.market.prices.sel(dims), new_market.prices.sel(year=years[1]) ) self.outputs(self.market, self.sectors, year=self.time_framework[year_idx]) # type: ignore self.outputs_cache.consolidate_cache(year=self.time_framework[year_idx]) getLogger(__name__).info( f"Finish simulation year {years[0]} ({year_idx+1}/{nyear})!" )
[docs] def calibrate_legacy_sectors(self): """Run a calibration step in the legacy sectors. Run historical years. """ from copy import deepcopy from logging import getLogger from numpy import clip, where hist_years = [] if len([s for s in self.sectors if "LegacySector" in str(type(s))]) == 0: return None, self.sectors, hist_years sectors = [] idx = [] for i, s in enumerate(self.sectors): if "LegacySector" in str(type(s)): s.mode = "Calibration" sectors.append(s) idx.append(i) getLogger(__name__).info("Calibrating LegacySectors...") if 2015 in self.time_framework: hist_years = self.time_framework[where(self.time_framework <= 2015)] hist = len(hist_years) for year_idx in range(hist): # range(nyear): years = self.time_framework[year_idx : year_idx + 1] sectors = deepcopy(sectors) variables = ["supply", "consumption", "prices"] new_market = self.market[variables].sel(year=years).copy(deep=True) for sector in sectors: sector_market = sector.next( new_market[["supply", "consumption", "prices"]] # type:ignore ) sector_market = sector_market.sel(year=new_market.year) dims = {i: sector_market[i] for i in sector_market.consumption.dims} sector_market.consumption.loc[dims] = clip( sector_market.consumption.loc[dims] - sector_market.supply.loc[dims], 0.0, None, ) new_market.consumption.loc[dims] += sector_market.consumption dims = {i: sector_market[i] for i in sector_market.supply.dims} new_market.supply.loc[dims] += sector_market.supply for i, s in enumerate(sectors): s.mode = "Iteration" self.sectors[idx[i]] = s getLogger(__name__).info("Finish calibration of LegacySectors!") return None, self.sectors, hist_years
[docs] class SingleYearIterationResult(NamedTuple): """Result of iterating over sectors for a year. Convenience tuple naming naming the return values from of :py:func:`single_year_iteration`. """ market: Dataset sectors: List[AbstractSector]
[docs] def single_year_iteration( market: Dataset, sectors: List[AbstractSector] ) -> SingleYearIterationResult: """Runs one iteration of the sectors (runs each sector once). Arguments: market: An initial market with prices, supply, consumption. sectors: A list of the sectors participating in the simulation. Returns: A tuple with the new market and sectors. """ from copy import deepcopy from numpy import clip from muse.commodities import is_enduse sectors = deepcopy(sectors) market = market.copy(deep=True) if "updated_prices" not in market.data_vars: market["updated_prices"] = market.prices.copy() # eventually, the first market should be one that creates the initial demand for sector in sectors: sector_market = sector.next( market[["supply", "consumption", "prices"]] # type:ignore ) sector_market = sector_market.sel(year=market.year) dims = {i: sector_market[i] for i in sector_market.consumption.dims} sector_market.consumption.loc[dims] = clip( sector_market.consumption.loc[dims] - sector_market.supply.loc[dims], 0.0, None, ) market.consumption.loc[dims] += sector_market.consumption dims = {i: sector_market[i] for i in sector_market.supply.dims} market.supply.loc[dims] += sector_market.supply costs = sector_market.costs.sel(commodity=is_enduse(sector_market.comm_usage)) # do not write costs lower than 1e-4 # should correspond to rounding value if len(costs.commodity) > 0: costs = costs.where(costs > 1e-4, 0) dims = {i: costs[i] for i in costs.dims} costs = costs.where(costs > 0, market.prices.loc[dims]) market.updated_prices.loc[dims] = costs.transpose( *market.updated_prices.dims ) return SingleYearIterationResult(market, sectors)
[docs] class FindEquilibriumResults(NamedTuple): """Result of find equilibrium.""" converged: bool market: Dataset sectors: List[AbstractSector]
[docs] def find_equilibrium( market: Dataset, sectors: List[AbstractSector], maxiter: int = 3, tol: float = 0.1, equilibrium_variable: Text = "demand", tol_unmet_demand: float = -0.1, excluded_commodities: Optional[Sequence] = None, equilibrium: bool = True, ) -> FindEquilibriumResults: """Runs the equilibrium loop. If convergence is reached, then the function returns the new market. If the maximum number of iterations are reached, then a warning issued in the log and the function returns with the current status. Arguments: market: Commodities market, with the prices, supply, consumption and demand. sectors: A list of the sectors participating in the simulation. maxiter: Maximum number of iterations. tol: Tolerance for reaching equilibrium. equilibrium_variable: Variable to use to calculate the equilibrium condition. tol_unmet_demand: Tolerance for the unmet demand. excluded_commodities: Commodities to be excluded in check_demand_fulfillment equilibrium: if equilibrium should be reached. Useful to testing. Returns: A tuple with the updated market (prices, supply, consumption and demand), sectors, and convergence status. """ from logging import getLogger from numpy import ones from muse.utilities import future_propagation market = market.copy(deep=True) if excluded_commodities: included = ~market.commodity.isin(excluded_commodities) else: included = ones(len(market.commodity), dtype=bool) market["updated_prices"] = market.prices.copy() prior_market = market.copy(deep=True) converged = False equilibrium_sectors = sectors for i in range(maxiter): market["prices"] = market.updated_prices prior_market, market = market, prior_market market.consumption[:] = 0.0 market.supply[:] = 0.0 market, equilibrium_sectors = single_year_iteration(market, sectors) check_demand_fulfillment(market.sel(commodity=included), tol_unmet_demand) equilibrium_reached = check_equilibrium( market.sel(commodity=included), prior_market.sel(commodity=included), tol, equilibrium_variable, market.year[1], ) if equilibrium_reached: converged = True new_price = prior_market["prices"].sel(year=market.year[1]).copy() new_price.loc[dict(commodity=included)] = market.updated_prices.sel( commodity=included, year=market.year[1] ) market["prices"] = future_propagation( # type: ignore market["prices"], new_price ) break if equilibrium and not converged: new_price = prior_market["prices"].sel(year=market.year[1]).copy() new_price.loc[dict(commodity=included)] = ( # type: ignore 0.8 * new_price.loc[dict(commodity=included)] # type: ignore ) new_price.loc[dict(commodity=included)] += ( 0.2 * market.updated_prices.loc[ dict(year=market.year[1], commodity=included) ] ) market["prices"] = future_propagation( # type: ignore market["prices"], new_price ) if not equilibrium: equilibrium_reached = True converged = True break if maxiter == 1: equilibrium_reached = True converged = True break else: # We didn't reached convergence and the loop is over: the simulation failed! msg = ( f"CONVERGENCE ERROR: Maximum number of iterations ({maxiter}) reached " f"in year {int(market.year[0])}" ) new_price = prior_market["prices"].sel(year=market.year[1]).copy() new_price.loc[dict(commodity=included)] = ( 0.8 * new_price.loc[dict(commodity=included)] # type: ignore ) new_price.loc[dict(commodity=included)] += ( 0.2 * market.updated_prices.loc[dict(year=market.year[1], commodity=included)] ) market["prices"] = future_propagation( # type: ignore market["prices"], new_price ) getLogger(__name__).critical(msg) return FindEquilibriumResults( converged, market.drop_vars("updated_prices"), equilibrium_sectors )
[docs] def check_demand_fulfillment(market: Dataset, tol: float) -> bool: """Checks if the supply will fulfill all the demand in the future. If it does not, it logs a warning. Arguments: market: Commodities market, with the prices, supply, consumption and demand. tol: Tolerance for the unmet demand. Returns: True if the supply fulfils the demand; False otherwise """ from logging import getLogger future = market.year[-1] delta = (market.supply - market.consumption).sel(year=future) unmet = (delta < tol).any([u for u in delta.dims if u != "commodity"]) if unmet.any(): commodities = ", ".join(unmet.commodity.sel(commodity=unmet.values).values) getLogger(__name__).warning(f"Check growth constraints for {commodities}.") return False return True
[docs] def check_equilibrium( market: Dataset, int_market: Dataset, tolerance: float, equilibrium_variable: Text, year: Optional[int] = None, ) -> bool: """Checks if equilibrium has been reached. This function checks if the difference in either the demand or the prices between iterations if smaller than certain tolerance. If is, then it is assumed that the process has converged. Arguments: market: The market values in this iteration. int_market: The market values in the previous iteration. tolerance: Tolerance for reaching equilibrium. equilibrium_variable: Variable to use to calculate the equilibrium condition. year: year for which to check changes. Default to minimum year in market. Returns: True if converged, False otherwise. """ from numpy import abs if year is None: year = market.year.min() if equilibrium_variable == "demand": delta = ( market.consumption.sel(year=year) - market.supply.sel(year=year) - int_market.consumption.sel(year=year) + int_market.supply.sel(year=year) ) else: delta = market.prices.sel(year=year) - int_market.prices.sel(year=year) return bool((abs(delta) < tolerance).all())