Source code for muse.quantities

"""Collection of functions to compute model quantities.

This module is meant to collect functions computing quantities of interest to the model,
e.g. lcoe, maximum production for a given capacity, etc, especially where these
functions are used in different areas of the model.
"""

from typing import Callable, Optional, Sequence, Text, Tuple, Union, cast

import numpy as np
import xarray as xr


[docs] def supply( capacity: xr.DataArray, demand: xr.DataArray, technologies: Union[xr.Dataset, xr.DataArray], interpolation: Text = "linear", production_method: Optional[Callable] = None, ) -> xr.DataArray: """Production and emission for a given capacity servicing a given demand. Supply includes two components, end-uses outputs and environmental pollutants. The former consists of the demand that the current capacity is capable of servicing. Where there is excess capacity, then service is assigned to each asset a share of the maximum production (e.g. utilization across similar assets is the same in percentage). Then, environmental pollutants are computing as a function of commodity outputs. Arguments: capacity: number/quantity of assets that can service the demand demand: amount of each end-use required. The supply of each process will not exceed its share of the demand. technologies: factors bindings the capacity of an asset with its production of commodities and environmental pollutants. interpolation: Interpolation type production_method: Production for a given capacity Return: A data array where the commodity dimension only contains actual outputs (i.e. no input commodities). """ from muse.commodities import CommodityUsage, check_usage, is_pollutant if production_method is None: production_method = maximum_production maxprod = production_method(technologies, capacity) size = np.array(maxprod.region).size # in presence of trade demand needs to map maxprod dst_region if ( "region" in demand.dims and "region" in maxprod.coords and "dst_region" not in maxprod.dims and size == 1 ): demand = demand.sel(region=maxprod.region) prodsum = set(demand.dims).difference(maxprod.dims) demsum = set(maxprod.dims).difference(demand.dims) expanded_demand = (demand * maxprod / maxprod.sum(demsum)).fillna(0) elif ( "region" in demand.dims and "region" in maxprod.coords and "dst_region" not in maxprod.dims and size > 1 ): prodsum = set(demand.dims).difference(maxprod.dims) demsum = set(maxprod.dims).difference(demand.dims) expanded_demand = (demand * maxprod / maxprod.sum(demsum)).fillna(0) elif ( "region" in demand.dims and "region" in maxprod.coords and "dst_region" in maxprod.dims ): demand = demand.rename(region="dst_region") prodsum = {"timeslice"} demsum = {"asset"} expanded_demand = (demand * maxprod / maxprod.sum(demsum)).fillna(0) else: prodsum = set(demand.dims).difference(maxprod.dims) demsum = set(maxprod.dims).difference(demand.dims) expanded_demand = (demand * maxprod / maxprod.sum(demsum)).fillna(0) expanded_maxprod = (maxprod * demand / demand.sum(prodsum)).fillna(0) expanded_demand = expanded_demand.reindex_like(maxprod) result = expanded_demand.where( expanded_demand <= expanded_maxprod, expanded_maxprod ) # add production of environmental pollutants env = is_pollutant(technologies.comm_usage) result[{"commodity": env}] = emission(result, technologies.fixed_outputs).transpose( *result.dims ) result[ {"commodity": ~check_usage(technologies.comm_usage, CommodityUsage.PRODUCT)} ] = 0 return result
[docs] def emission(production: xr.DataArray, fixed_outputs: xr.DataArray): """Computes emission from current products. Emissions are computed as `sum(product) * fixed_outputs`. Arguments: production: Produced goods. Only those with non-environmental products are used when computing emissions. fixed_outputs: factor relating total production to emissions. For convenience, this can also be a `technologies` dataset containing `fixed_output`. Return: A data array containing emissions (and only emissions). """ from muse.commodities import is_enduse, is_pollutant from muse.utilities import broadcast_techs # just in case we are passed a technologies dataset, like in other functions fouts = broadcast_techs( getattr(fixed_outputs, "fixed_outputs", fixed_outputs), production ) envs = is_pollutant(fouts.comm_usage) enduses = is_enduse(fouts.comm_usage) return production.sel(commodity=enduses).sum("commodity") * fouts.sel( commodity=envs )
[docs] def gross_margin( technologies: xr.Dataset, capacity: xr.DataArray, prices: xr.Dataset ) -> xr.DataArray: """The percentage of revenue after direct expenses have been subtracted. .. _reference: https://www.investopedia.com/terms/g/grossmargin.asp We first calculate the revenues, which depend on prices We then deduct the direct expenses - energy commodities INPUTS are related to fuel costs - environmental commodities OUTPUTS are related to environmental costs - variable costs is given as technodata inputs - non-environmental commodities OUTPUTS are related to revenues. """ from muse.commodities import is_enduse, is_pollutant from muse.timeslices import QuantityType, convert_timeslice from muse.utilities import broadcast_techs tech = broadcast_techs( # type: ignore cast( xr.Dataset, technologies[ [ "technical_life", "interest_rate", "var_par", "var_exp", "fixed_outputs", "fixed_inputs", ] ], ), capacity, ) var_par = tech.var_par var_exp = tech.var_exp fixed_outputs = tech.fixed_outputs fixed_inputs = tech.fixed_inputs # We separate the case where we have one or more regions caparegions = np.array(capacity.region.values).reshape(-1) if len(caparegions) > 1: prices.sel(region=capacity.region) else: prices = prices.where(prices.region == capacity.region, drop=True) prices = prices.interp(year=capacity.year.values) # Filters for pollutants and output commodities environmentals = is_pollutant(technologies.comm_usage) enduses = is_enduse(technologies.comm_usage) # Variable costs depend on factors such as labour variable_costs = convert_timeslice( var_par * ((fixed_outputs.sel(commodity=enduses)).sum("commodity")) ** var_exp, prices.timeslice, QuantityType.EXTENSIVE, ) # The individual prices are selected # costs due to consumables, direct inputs consumption_costs = (prices * fixed_inputs).sum("commodity") # costs due to pollutants production_costs = prices * fixed_outputs environmental_costs = (production_costs.sel(commodity=environmentals)).sum( ("commodity") ) # revenues due to product sales revenues = (production_costs.sel(commodity=enduses)).sum("commodity") # Gross margin is the net between revenues and all costs result = revenues - environmental_costs - variable_costs - consumption_costs # Gross margin is defined as a ratio on revenues and as a percentage result *= 100 / revenues return result
[docs] def decommissioning_demand( technologies: xr.Dataset, capacity: xr.DataArray, year: Optional[Sequence[int]] = None, ) -> xr.DataArray: r"""Computes demand from process decommissioning. If `year` is not given, it defaults to all years in capacity. If there are more than two years, then decommissioning is with respect to first (or minimum) year. Let :math:`M_t^r(y)` be the retrofit demand, :math:`^{(s)}\mathcal{D}_t^r(y)` be the decommissioning demand at the level of the sector, and :math:`A^r_{t, \iota}(y)` be the assets owned by the agent. Then, the decommissioning demand for agent :math:`i` is : .. math:: \mathcal{D}^{r, i}_{t, c}(y) = \sum_\iota \alpha_{t, \iota}^r \beta_{t, \iota, c}^r \left(A^{i, r}_{t, \iota}(y) - A^{i, r}_{t, \iota, c}(y + 1) \right) given the utilization factor :math:`\alpha_{t, \iota}` and the fixed output factor :math:`\beta_{t, \iota, c}`. Furthermore, decommissioning demand is non-zero only for end-use commodities. ncsearch-nohlsearch).. SeeAlso: :ref:`indices`, :ref:`quantities`, :py:func:`~muse.quantities.maximum_production` :py:func:`~muse.commodities.is_enduse` """ if year is None: year = capacity.year.values year = sorted(year) capacity = capacity.interp(year=year, kwargs={"fill_value": 0.0}) baseyear = min(year) dyears = [u for u in year if u != baseyear] return maximum_production( technologies, capacity.sel(year=baseyear) - capacity.sel(year=dyears) ).clip(min=0)
[docs] def consumption( technologies: xr.Dataset, production: xr.DataArray, prices: Optional[xr.DataArray] = None, **kwargs, ) -> xr.DataArray: """Commodity consumption when fulfilling the whole production. Currently, the consumption is implemented for commodity_max == +infinity. If prices are not given, then flexible consumption is *not* considered. """ from muse.commodities import is_enduse, is_fuel from muse.timeslices import QuantityType, convert_timeslice from muse.utilities import filter_with_template params = filter_with_template( technologies[["fixed_inputs", "flexible_inputs"]], production, **kwargs ) # sum over end-use products, if the dimension exists in the input comm_usage = technologies.comm_usage.sel(commodity=production.commodity) production = production.sel(commodity=is_enduse(comm_usage)).sum("commodity") if prices is not None and "timeslice" in prices.dims: production = convert_timeslice( # type: ignore production, prices, QuantityType.EXTENSIVE ) params_fuels = is_fuel(params.comm_usage) consumption = production * params.fixed_inputs.where(params_fuels, 0) if prices is None: return consumption if not (params.flexible_inputs.sel(commodity=params_fuels) > 0).any(): return consumption prices = filter_with_template(prices, production, installed_as_year=False, **kwargs) # technology with flexible inputs flexs = params.flexible_inputs.where(params_fuels, 0) # cheapest fuel for each flexible technology assert prices is not None flexprice = [i for i in flexs.commodity.values if i in prices.commodity.values] assert all(flexprice) priceflex = prices.loc[dict(commodity=flexs.commodity)] minprices = flexs.commodity[ priceflex.where(flexs > 0, priceflex.max() + 1).argmin("commodity") ] # add consumption from cheapest fuel assert all(flexs.commodity.values == consumption.commodity.values) flex = flexs.where(minprices == flexs.commodity, 0) flex = flex / (flex > 0).sum("commodity").clip(min=1) return consumption + flex * production
[docs] def annual_levelized_cost_of_energy( prices: xr.DataArray, technologies: xr.Dataset, interpolation: Text = "linear", fill_value: Union[int, Text] = "extrapolate", **filters, ) -> xr.DataArray: """Undiscounted levelized cost of energy (LCOE) of technologies on each given year. It mostly follows the `simplified LCOE`_ given by NREL. In the argument description, we use the following: * [h]: hour * [y]: year * [$]: unit of currency * [E]: unit of energy * [1]: dimensionless Arguments: prices: [$/(Eh)] the price of all commodities, including consumables and fuels. This dataarray contains at least timeslice and commodity dimensions. technologies: Describe the technologies, with at least the following parameters: * cap_par: [$/E] overnight capital cost * interest_rate: [1] * fix_par: [$/(Eh)] fixed costs of operation and maintenance costs * var_par: [$/(Eh)] variable costs of operation and maintenance costs * fixed_inputs: [1] == [(Eh)/(Eh)] ratio indicating the amount of commodity consumed per units of energy created. * fixed_outputs: [1] == [(Eh)/(Eh)] ration indicating the amount of environmental pollutants produced per units of energy created. interpolation: interpolation method. fill_value: Fill value for values outside the extrapolation range. **filters: Anything by which prices can be filtered. Return: The lifetime LCOE in [$/(Eh)] for each technology at each timeslice. .. _simplified LCOE: https://www.nrel.gov/analysis/tech-lcoe-documentation.html """ from muse.commodities import is_pollutant from muse.timeslices import QuantityType, convert_timeslice techs = technologies[ [ "technical_life", "interest_rate", "cap_par", "var_par", "fix_par", "fixed_inputs", "flexible_inputs", "fixed_outputs", "utilization_factor", ] ] if "year" in techs.dims: techs = techs.interp( year=prices.year, method=interpolation, kwargs={"fill_value": fill_value} ) if filters is not None: prices = prices.sel({k: v for k, v in filters.items() if k in prices.dims}) techs = techs.sel({k: v for k, v in filters.items() if k in techs.dims}) assert {"timeslice", "commodity"}.issubset(prices.dims) life = techs.technical_life.astype(int) annualized_capital_costs = ( convert_timeslice( techs.cap_par * techs.interest_rate / (1 - (1 + techs.interest_rate) ** (-life)), prices.timeslice, QuantityType.EXTENSIVE, ) / techs.utilization_factor ) o_and_e_costs = ( convert_timeslice( (techs.fix_par + techs.var_par), prices.timeslice, QuantityType.EXTENSIVE, ) / techs.utilization_factor ) fuel_costs = (techs.fixed_inputs * prices).sum("commodity") fuel_costs += (techs.flexible_inputs * prices).sum("commodity") if "region" in techs.dims: env_costs = ( (techs.fixed_outputs * prices) .sel(region=techs.region) .sel(commodity=is_pollutant(techs.comm_usage)) .sum("commodity") ) else: env_costs = ( (techs.fixed_outputs * prices) .sel(commodity=is_pollutant(techs.comm_usage)) .sum("commodity") ) return annualized_capital_costs + o_and_e_costs + env_costs + fuel_costs
[docs] def maximum_production(technologies: xr.Dataset, capacity: xr.DataArray, **filters): r"""Production for a given capacity. Given a capacity :math:`\mathcal{A}_{t, \iota}^r`, the utilization factor :math:`\alpha^r_{t, \iota}` and the the fixed outputs of each technology :math:`\beta^r_{t, \iota, c}`, then the result production is: .. math:: P_{t, \iota}^r = \alpha^r_{t, \iota}\beta^r_{t, \iota, c}\mathcal{A}_{t, \iota}^r The dimensions above are only indicative. The function should work with many different input values, e.g. with capacities expanded over time-slices :math:`t` or agents :math:`i`. Arguments: capacity: Capacity of each technology of interest. In practice, the capacity can refer to asset capacity, the max capacity, or the capacity-in-use. technologies: xr.Dataset describing the features of the technologies of interests. It should contain `fixed_outputs` and `utilization_factor`. It's shape is matched to `capacity` using `muse.utilities.broadcast_techs`. filters: keyword arguments are used to filter down the capacity and technologies. Filters not relevant to the quantities of interest, i.e. filters that are not a dimension of `capacity` or `technologies`, are silently ignored. Return: `capacity * fixed_outputs * utilization_factor`, whittled down according to the filters and the set of technologies in `capacity`. """ from muse.commodities import is_enduse from muse.utilities import broadcast_techs, filter_input capa = filter_input( capacity, **{k: v for k, v in filters.items() if k in capacity.dims} ) btechs = broadcast_techs( # type: ignore cast(xr.Dataset, technologies[["fixed_outputs", "utilization_factor"]]), capa ) ftechs = filter_input( btechs, **{k: v for k, v in filters.items() if k in btechs.dims} ) result = capa * ftechs.fixed_outputs * ftechs.utilization_factor return result.where(is_enduse(result.comm_usage), 0)
[docs] def demand_matched_production( demand: xr.DataArray, prices: xr.DataArray, capacity: xr.DataArray, technologies: xr.Dataset, **filters, ) -> xr.DataArray: """Production matching the input demand. Arguments: demand: demand to match. prices: price from which to compute the annual levelized cost of energy. capacity: capacity from which to obtain the maximum production constraints. technologies: technologies we are looking at **filters: keyword arguments with which to filter the input datasets and data arrays., e.g. region, or year. """ from muse.demand_matching import demand_matching from muse.timeslices import QuantityType, convert_timeslice from muse.utilities import broadcast_techs technodata = cast(xr.Dataset, broadcast_techs(technologies, capacity)) cost = annual_levelized_cost_of_energy(prices, technodata, **filters) max_production = maximum_production(technodata, capacity, **filters) assert ("timeslice" in demand.dims) == ("timeslice" in cost.dims) if "timeslice" in demand.dims and "timeslice" not in max_production.dims: max_production = convert_timeslice( max_production, demand.timeslice, QuantityType.EXTENSIVE ) return demand_matching(demand, cost, max_production)
[docs] def capacity_in_use( production: xr.DataArray, technologies: xr.Dataset, max_dim: Optional[Union[Text, Tuple[Text]]] = "commodity", **filters, ): """Capacity-in-use for each asset, given production. Conceptually, this operation is the inverse of `production`. Arguments: production: Production from each technology of interest. technologies: xr.Dataset describing the features of the technologies of interests. It should contain `fixed_outputs` and `utilization_factor`. It's shape is matched to `capacity` using `muse.utilities.broadcast_techs`. max_dim: reduces the given dimensions using `max`. Defaults to "commodity". If None, then no reduction is performed. filters: keyword arguments are used to filter down the capacity and technologies. Filters not relevant to the quantities of interest, i.e. filters that are not a dimension of `capacity` or `technologies`, are silently ignored. Return: Capacity-in-use for each technology, whittled down by the filters. """ from muse.commodities import is_enduse from muse.utilities import broadcast_techs, filter_input prod = filter_input( production, **{k: v for k, v in filters.items() if k in production.dims} ) techs = technologies[["fixed_outputs", "utilization_factor"]] assert isinstance(techs, xr.Dataset) btechs = broadcast_techs(techs, prod) ftechs = filter_input( btechs, **{k: v for k, v in filters.items() if k in technologies.dims} ) factor = 1 / (ftechs.fixed_outputs * ftechs.utilization_factor) capa_in_use = (prod * factor).where(~np.isinf(factor), 0) capa_in_use = capa_in_use.where( is_enduse(technologies.comm_usage.sel(commodity=capa_in_use.commodity)), 0 ) if max_dim: capa_in_use = capa_in_use.max(max_dim) return capa_in_use
[docs] def supply_cost( production: xr.DataArray, lcoe: xr.DataArray, asset_dim: Optional[Text] = "asset" ) -> xr.DataArray: """Supply cost given production and the levelized cost of energy. In practice, the supply cost is the weighted average LCOE over assets (`asset_dim`), where the weights are the production. Arguments: production: Amount of goods produced. In practice, production can be obtained from the capacity for each asset via the method `muse.quantities.production`. lcoe: Levelized cost of energy for each good produced. In practice, it can be obtained from market prices via `muse.quantities.annual_levelized_cost_of_energy` or `muse.quantities.lifetime_levelized_cost_of_energy`. asset_dim: Name of the dimension(s) holding assets, processes or technologies. """ data = xr.Dataset(dict(production=production, prices=production * lcoe)) if asset_dim is not None: if "region" not in data.coords or len(data.region.dims) == 0: data = data.sum(asset_dim) else: data = data.groupby("region").sum(asset_dim) total = data.production.where(np.abs(data.production) > 1e-15, np.infty).sum( "timeslice" ) return data.prices / total
[docs] def costed_production( demand: xr.Dataset, costs: xr.DataArray, capacity: xr.DataArray, technologies: xr.Dataset, with_minimum_service: bool = True, ) -> xr.DataArray: """Computes production from ranked assets. The assets are ranked according to their cost. The asset with least cost are allowed to service the demand first, up to the maximum production. By default, the minimum service is applied first. """ from muse.quantities import maximum_production from muse.timeslices import QuantityType, convert_timeslice from muse.utilities import broadcast_techs technodata = cast(xr.Dataset, broadcast_techs(technologies, capacity)) if len(capacity.region.dims) == 0: def group_assets(x: xr.DataArray) -> xr.DataArray: return x.sum("asset") else: def group_assets(x: xr.DataArray) -> xr.DataArray: return xr.Dataset(dict(x=x)).groupby("region").sum("asset").x ranking = costs.rank("asset") maxprod = convert_timeslice( maximum_production(technodata, capacity), demand.timeslice, QuantityType.EXTENSIVE, ) commodity = (maxprod > 0).any([i for i in maxprod.dims if i != "commodity"]) commodity = commodity.drop_vars( [u for u in commodity.coords if u not in commodity.dims] ) demand = demand.sel(commodity=commodity).copy() constraints = ( xr.Dataset(dict(maxprod=maxprod, ranking=ranking, has_output=maxprod > 0)) .set_coords("ranking") .set_coords("has_output") .sel(commodity=commodity) ) if not with_minimum_service: production = xr.zeros_like(constraints.maxprod) else: production = ( getattr(technodata, "minimum_service_factor", 0) * constraints.maxprod ) demand = np.maximum(demand - group_assets(production), 0) for rank in sorted(set(constraints.ranking.values.flatten())): condition = (constraints.ranking == rank) & constraints.has_output current_maxprod = constraints.maxprod.where(condition, 0) fullprod = group_assets(current_maxprod) if (fullprod <= demand + 1e-10).all(): current_demand = fullprod current_prod = current_maxprod else: if "region" in demand.dims: demand_prod = demand.sel(region=production.region) else: demand_prod = demand demand_prod = ( current_maxprod / current_maxprod.sum("asset") * demand_prod ).where(condition, 0) current_prod = np.minimum(demand_prod, current_maxprod) current_demand = group_assets(current_prod) demand -= np.minimum(current_demand, demand) production = production + current_prod result = xr.zeros_like(maxprod) result[dict(commodity=commodity)] = result[dict(commodity=commodity)] + production return result