Source code for muse.outputs.mca

"""Output quantities.

Functions that compute MCA quantities for post-simulation analysis should all follow the
same signature:

.. code-block:: python

    @register_output_quantity
    def quantity(
        sectors: List[AbstractSector],
        market: xr.Dataset,
        year: int,
        **kwargs
    ) -> Union[pd.DataFrame, xr.DataArray]:
        pass

The function should never modify it's arguments. It can return either a pandas dataframe
or an xarray xr.DataArray.
"""

from collections.abc import Mapping, MutableMapping
from operator import attrgetter
from pathlib import Path
from typing import Any, Callable, Union

import numpy as np
import pandas as pd
import xarray as xr
from mypy_extensions import KwArg

from muse.outputs.sector import market_quantity
from muse.registration import registrator
from muse.sectors import AbstractSector
from muse.timeslices import broadcast_timeslice, distribute_timeslice
from muse.utilities import multiindex_to_coords

OUTPUT_QUANTITY_SIGNATURE = Callable[
    [xr.Dataset, list[AbstractSector], KwArg(Any)], Union[xr.DataArray, pd.DataFrame]
]
"""Signature of functions computing quantities for later analysis."""

OUTPUT_QUANTITIES: MutableMapping[str, OUTPUT_QUANTITY_SIGNATURE] = {}
"""Quantity for post-simulation analysis."""

OUTPUTS_PARAMETERS = Union[str, Mapping]
"""Acceptable Datastructures for outputs parameters"""


[docs] @registrator(registry=OUTPUT_QUANTITIES) def register_output_quantity( function: OUTPUT_QUANTITY_SIGNATURE | None = None, ) -> Callable: """Registers a function to compute an output quantity.""" from functools import wraps assert function is not None @wraps(function) def decorated(*args, **kwargs): result: xr.DataArray = function(*args, **kwargs) result.name = function.__name__ return result return decorated
[docs] def factory( *parameters: OUTPUTS_PARAMETERS, ) -> Callable[[xr.Dataset, list[AbstractSector]], list[Path]]: """Creates outputs functions for post-mortem analysis. Each parameter is a dictionary containing the following: - quantity (mandatory): name of the quantity to output. Mandatory. - sink (optional): name of the storage procedure, e.g. the file format or database format. When it cannot be guessed from `filename`, it defaults to "csv". - filename (optional): path to a directory or a file where to store the quantity. In the latter case, if sink is not given, it will be determined from the file extension. The filename can incorporate markers. By default, it is "{default_output_dir}/{sector}{year}{quantity}{suffix}". - any other parameter relevant to the sink, e.g. `pandas.to_csv` keyword arguments. For simplicity, it is also possible to give lone strings as input. They default to `{'quantity': string}` (and the sink will default to "csv"). """ from muse.outputs.sector import _factory return _factory(OUTPUT_QUANTITIES, *parameters, sector_name="MCA")
[docs] @register_output_quantity def consumption( market: xr.Dataset, sectors: list[AbstractSector], year: int, **kwargs ) -> pd.DataFrame: """Current consumption.""" result = market_quantity(market.consumption, **kwargs).to_dataframe().reset_index() return result[result.consumption != 0]
[docs] @register_output_quantity def supply( market: xr.Dataset, sectors: list[AbstractSector], year: int, **kwargs ) -> pd.DataFrame: """Current supply.""" result = market_quantity(market.supply, **kwargs).to_dataframe().reset_index() return result[result.supply != 0]
[docs] @register_output_quantity def prices( market: xr.Dataset, sectors: list[AbstractSector], year: int, **kwargs, ) -> pd.DataFrame: """Current MCA market prices.""" return market_quantity(market.prices, **kwargs).to_dataframe().reset_index()
[docs] @register_output_quantity def capacity( market: xr.Dataset, sectors: list[AbstractSector], year: int, **kwargs ) -> pd.DataFrame: """Current capacity across all sectors.""" return _aggregate_sectors(sectors, op=sector_capacity)
[docs] def sector_capacity(sector: AbstractSector) -> pd.DataFrame: """Sector capacity with agent annotations.""" capa_sector: list[xr.DataArray] = [] agents = sorted(getattr(sector, "agents", []), key=attrgetter("name")) for agent in agents: capa_agent = agent.assets.capacity capa_agent["agent"] = agent.name capa_agent["type"] = agent.category capa_agent["sector"] = getattr(sector, "name", "unnamed") if len(capa_agent) > 0 and len(capa_agent.technology.values) > 0: if "dst_region" not in capa_agent.coords: capa_agent["dst_region"] = agent.region a = capa_agent.to_dataframe() b = ( a.groupby( [ "technology", "dst_region", "region", "agent", "sector", "type", "year", "installed", ] ) .sum() # ("asset") .fillna(0) ) c = b.reset_index() capa_sector.append(c) if len(capa_sector) == 0: return pd.DataFrame() capacity = pd.concat([u for u in capa_sector]) capacity = capacity[capacity.capacity != 0] return capacity
def _aggregate_sectors( sectors: list[AbstractSector], *args, op: Callable ) -> pd.DataFrame: """Aggregate outputs from all sectors.""" alldata = [op(sector, *args) for sector in sectors] if len(alldata) == 0: return pd.DataFrame() return pd.concat(alldata, sort=True)
[docs] @register_output_quantity(name=["fuel_costs"]) def metric_fuel_costs( market: xr.Dataset, sectors: list[AbstractSector], year: int, **kwargs ) -> pd.DataFrame: """Current fuel costs across all sectors.""" return _aggregate_sectors(sectors, market, year, op=sector_fuel_costs)
[docs] def sector_fuel_costs( sector: AbstractSector, market: xr.Dataset, year: int, **kwargs ) -> pd.DataFrame: """Sector fuel costs with agent annotations.""" from muse.commodities import is_fuel from muse.dispatch import ( share_based_production, ) # TODO: should not have to recalculate supply from muse.quantities import consumption data_sector: list[xr.DataArray] = [] technologies = getattr(sector, "technologies", []) agents = sorted(getattr(sector, "agents", []), key=attrgetter("name")) agent_market = market.copy(deep=True) if len(technologies) > 0: for a in agents: agent_market["consumption"] = (market.consumption * a.quantity).sel( year=year ) commodity = is_fuel(technologies.comm_usage) capacity = a.filter_input( a.assets.capacity, year=year, ).fillna(0.0) production = share_based_production( agent_market.consumption, capacity, technologies, ) prices = a.filter_input(market.prices, year=year) fcons = consumption( technologies=technologies, production=production, prices=prices ) data_agent = (fcons * prices).sel(commodity=commodity) data_agent["agent"] = a.name data_agent["category"] = a.category data_agent["sector"] = getattr(sector, "name", "unnamed") data_agent["year"] = year data_agent = multiindex_to_coords(data_agent, "timeslice").to_dataframe( "fuel_consumption_costs" ) if not data_agent.empty: data_sector.append(data_agent) if len(data_sector) > 0: output = pd.concat(data_sector, sort=True).reset_index() else: output = pd.DataFrame() return output
[docs] @register_output_quantity(name=["capital_costs"]) def metric_capital_costs( market: xr.Dataset, sectors: list[AbstractSector], year: int, **kwargs ) -> pd.DataFrame: """Current capital costs across all sectors.""" return _aggregate_sectors(sectors, market, year, op=sector_capital_costs)
[docs] def sector_capital_costs( sector: AbstractSector, market: xr.Dataset, year: int, **kwargs ) -> pd.DataFrame: """Sector capital costs with agent annotations.""" data_sector: list[xr.DataArray] = [] technologies = getattr(sector, "technologies", []) agents = sorted(getattr(sector, "agents", []), key=attrgetter("name")) if len(technologies) > 0: for a in agents: capacity = a.filter_input(a.assets.capacity, year=year).fillna(0.0) data = a.filter_input( technologies[["cap_par", "cap_exp"]], year=year, technology=capacity.technology, ) data_agent = distribute_timeslice(data.cap_par * (capacity**data.cap_exp)) data_agent["agent"] = a.name data_agent["category"] = a.category data_agent["sector"] = getattr(sector, "name", "unnamed") data_agent["year"] = year data_agent = multiindex_to_coords(data_agent, "timeslice").to_dataframe( "capital_costs" ) if not data_agent.empty: data_sector.append(data_agent) if len(data_sector) > 0: output = pd.concat(data_sector, sort=True).reset_index() else: output = pd.DataFrame() return output
[docs] @register_output_quantity(name=["emission_costs"]) def metric_emission_costs( market: xr.Dataset, sectors: list[AbstractSector], year: int, **kwargs ) -> pd.DataFrame: """Current emission costs across all sectors.""" return _aggregate_sectors(sectors, market, year, op=sector_emission_costs)
[docs] def sector_emission_costs( sector: AbstractSector, market: xr.Dataset, year: int, **kwargs ) -> pd.DataFrame: """Sector emission costs with agent annotations.""" from muse.commodities import is_enduse, is_pollutant from muse.dispatch import ( share_based_production, ) # TODO: should not have to recalculate supply data_sector: list[xr.DataArray] = [] technologies = getattr(sector, "technologies", []) agents = sorted(getattr(sector, "agents", []), key=attrgetter("name")) agent_market = market.copy(deep=True) if len(technologies) > 0: for a in agents: agent_market["consumption"] = (market.consumption * a.quantity).sel( year=year ) capacity = a.filter_input(a.assets.capacity, year=year).fillna(0.0) allemissions = a.filter_input( technologies.fixed_outputs, commodity=is_pollutant(technologies.comm_usage), technology=capacity.technology, year=year, ) envs = is_pollutant(technologies.comm_usage) enduses = is_enduse(technologies.comm_usage) i = (np.where(envs))[0][0] red_envs = envs[i].commodity.values prices = a.filter_input(market.prices, year=year, commodity=red_envs) production = share_based_production( agent_market.consumption, capacity, technologies, ) total = production.sel(commodity=enduses).sum("commodity") data_agent = total * (allemissions * prices).sum("commodity") data_agent["agent"] = a.name data_agent["category"] = a.category data_agent["sector"] = getattr(sector, "name", "unnamed") data_agent["year"] = year data_agent = multiindex_to_coords(data_agent, "timeslice").to_dataframe( "emission_costs" ) if not data_agent.empty: data_sector.append(data_agent) if len(data_sector) > 0: output = pd.concat(data_sector, sort=True).reset_index() else: output = pd.DataFrame() return output
[docs] @register_output_quantity(name=["LCOE"]) def metric_lcoe( market: xr.Dataset, sectors: list[AbstractSector], year: int, **kwargs ) -> pd.DataFrame: """Current lifetime levelised cost across all sectors.""" return _aggregate_sectors(sectors, market, year, op=sector_lcoe)
[docs] def sector_lcoe( sector: AbstractSector, market: xr.Dataset, year: int, **kwargs ) -> pd.DataFrame: """Levelized cost of energy () of technologies over their lifetime.""" from muse.commodities import is_enduse from muse.costs import levelized_cost_of_energy as LCOE from muse.quantities import capacity_to_service_demand, consumption market = market.copy(deep=True) # Filtering of the inputs data_sector: list[xr.DataArray] = [] technologies = getattr(sector, "technologies", []) agents = sorted(getattr(sector, "agents", []), key=attrgetter("name")) retro = [a for a in agents if a.category == "retrofit"] new = [a for a in agents if a.category == "newcapa"] agents = retro if len(retro) > 0 else new if len(technologies) > 0: for agent in agents: agent_market = market.sel(year=agent.year).copy(deep=True) agent_market["consumption"] = agent_market.consumption * agent.quantity # Filter commodities based on end-use status enduse_mask = is_enduse(technologies.comm_usage) commodities = agent_market.commodity.values included_commodities = commodities[ np.isin(commodities, enduse_mask.commodity[enduse_mask]) ] excluded_commodities = commodities[ ~np.isin(commodities, enduse_mask.commodity[enduse_mask]) ] agent_market.loc[dict(commodity=excluded_commodities)] = 0 agent_market["prices"] = agent.filter_input( market["prices"], year=agent.year ) techs = agent.filter_input( technologies, year=agent.year, ) prices = agent_market["prices"].sel(commodity=techs.commodity) demand = agent_market.consumption.sel(commodity=included_commodities) capacity = agent.filter_input(capacity_to_service_demand(demand, techs)) production = ( broadcast_timeslice(capacity) * distribute_timeslice(techs.fixed_outputs) * broadcast_timeslice(techs.utilization_factor) ) consump = consumption( technologies=techs, prices=prices, production=production ) result = LCOE( technologies=techs, prices=prices, capacity=capacity, production=production, consumption=consump, method="lifetime", ) data_agent = result data_agent["agent"] = agent.name data_agent["category"] = agent.category data_agent["sector"] = getattr(sector, "name", "unnamed") data_agent["year"] = agent.year data_agent = data_agent.fillna(0) data_agent = multiindex_to_coords(data_agent, "timeslice").to_dataframe( "LCOE" ) if not data_agent.empty: data_sector.append(data_agent) if len(data_sector) > 0: output = pd.concat(data_sector, sort=True).reset_index() else: output = pd.DataFrame() return output
[docs] @register_output_quantity(name=["EAC"]) def metric_eac( market: xr.Dataset, sectors: list[AbstractSector], year: int, **kwargs ) -> pd.DataFrame: """Current emission costs across all sectors.""" return _aggregate_sectors(sectors, market, year, op=sector_eac)
[docs] def sector_eac( sector: AbstractSector, market: xr.Dataset, year: int, **kwargs ) -> pd.DataFrame: """Net Present Value of technologies over their lifetime.""" from muse.commodities import is_enduse from muse.costs import equivalent_annual_cost as EAC from muse.quantities import capacity_to_service_demand, consumption market = market.copy(deep=True) # Filtering of the inputs data_sector: list[xr.DataArray] = [] technologies = getattr(sector, "technologies", []) agents = sorted(getattr(sector, "agents", []), key=attrgetter("name")) retro = [a for a in agents if a.category == "retrofit"] new = [a for a in agents if a.category == "newcapa"] agents = retro if len(retro) > 0 else new if len(technologies) > 0: for agent in agents: agent_market = market.sel(year=agent.year).copy(deep=True) agent_market["consumption"] = agent_market.consumption * agent.quantity # Filter commodities based on end-use status enduse_mask = is_enduse(technologies.comm_usage) commodities = agent_market.commodity.values included_commodities = commodities[ np.isin(commodities, enduse_mask.commodity[enduse_mask]) ] excluded_commodities = commodities[ ~np.isin(commodities, enduse_mask.commodity[enduse_mask]) ] agent_market.loc[dict(commodity=excluded_commodities)] = 0 agent_market["prices"] = agent.filter_input( market["prices"], year=agent.year ) techs = agent.filter_input( technologies, year=agent.year, ) prices = agent_market["prices"].sel(commodity=techs.commodity) demand = agent_market.consumption.sel(commodity=included_commodities) capacity = agent.filter_input(capacity_to_service_demand(demand, techs)) production = ( broadcast_timeslice(capacity) * distribute_timeslice(techs.fixed_outputs) * broadcast_timeslice(techs.utilization_factor) ) consump = consumption( technologies=techs, prices=prices, production=production ) result = EAC( technologies=techs, prices=prices, capacity=capacity, production=production, consumption=consump, ) data_agent = result data_agent["agent"] = agent.name data_agent["category"] = agent.category data_agent["sector"] = getattr(sector, "name", "unnamed") data_agent["year"] = agent.year data_agent = multiindex_to_coords(data_agent, "timeslice").to_dataframe( "capital_costs" ) if not data_agent.empty: data_sector.append(data_agent) if len(data_sector) > 0: output = pd.concat(data_sector, sort=True).reset_index() else: output = pd.DataFrame() return output