from typing import Callable, MutableMapping, Sequence, Text
import numpy as np
import xarray as xr
from scipy.optimize import curve_fit
from muse.mca import FindEquilibriumResults
from muse.registration import registrator
from muse.sectors import AbstractSector
CARBON_BUDGET_METHODS_SIGNATURE = Callable[
[xr.Dataset, list, Callable, xr.DataArray, xr.DataArray], float
]
"""carbon budget fitters signature."""
CARBON_BUDGET_FITTERS_SIGNATURE = Callable[[np.ndarray, np.ndarray, int], float]
"""carbon budget fitters signature."""
CARBON_BUDGET_METHODS: MutableMapping[Text, CARBON_BUDGET_METHODS_SIGNATURE] = {}
"""Dictionary of carbon budget methods checks."""
CARBON_BUDGET_FITTERS: MutableMapping[Text, CARBON_BUDGET_FITTERS_SIGNATURE] = {}
"""Dictionary of carbon budget fitters."""
[docs]
@registrator(registry=CARBON_BUDGET_FITTERS, loglevel="debug")
def register_carbon_budget_fitter(function: CARBON_BUDGET_FITTERS_SIGNATURE = None):
"""Decorator to register a carbon budget function."""
return function
[docs]
@registrator(registry=CARBON_BUDGET_METHODS, loglevel="debug")
def register_carbon_budget_method(function: CARBON_BUDGET_METHODS_SIGNATURE = None):
"""Decorator to register a carbon budget function."""
return function
[docs]
def update_carbon_budget(
carbon_budget: Sequence[float],
emissions: float,
year_idx: int,
over: bool = True,
under: bool = True,
) -> float:
"""Adjust the carbon budget in the far future if emissions too high or low.
This feature can allow to simulate overshoot shifting.
Arguments:
carbon_budget: budget for future year,
emissions: emission for future year,
year_idx: index of year for estimation,
over: if True, allows overshoot,
under: if True, allows undershoot.
Returns:
An adjusted threshold for the future year
"""
delta = emissions - carbon_budget[year_idx + 1]
new_future_budget = carbon_budget[year_idx + 2]
if over:
new_future_budget -= max(delta, 0)
if under:
new_future_budget -= min(delta, 0)
return new_future_budget
[docs]
@register_carbon_budget_method
def fitting(
market: xr.Dataset,
sectors: list,
equilibrium: Callable[
[xr.Dataset, Sequence[AbstractSector]], FindEquilibriumResults
],
carbon_budget: xr.DataArray,
carbon_price: xr.DataArray,
commodities: list,
sample_size: int = 4,
refine_price: bool = True,
price_too_high_threshold: float = 10,
fitter: Text = "slinear",
) -> float:
"""Used to solve the carbon market.
Given the emission of a period, adjusts carbon price to meet the budget. A carbon
market is meant as a pool of emissions for all the modelled regions; therefore, the
carbon price applies to all modelled regions. The method solves an equation applying
a fitting of the emission-carbon price relation.
Arguments:
market: Market, with the prices, supply, and consumption,
sectors: list of market sectors,
equilibrium: Method for searching market equilibrium,
carbon_budget: limit on emissions,
carbon_price: current carbon price
commodities: list of commodities to limit (ie. emissions),
sample_size: sample size for fitting,
refine_price: if True, performs checks on estimated carbon price,
price_too_high_threshold: threshold on carbon price,
fitter: method to fit emissions with carbon price.
Returns:
new_price: adjusted carbon price to meet budget
"""
future = market.year[-1]
threshold = carbon_budget.sel(year=future).values
emissions = market.supply.sel(year=future, commodity=commodities).sum().values
price = market.prices.sel(year=future, commodity=commodities).mean().values
# We create a sample of prices at which we want to calculate emissions
sample_prices = create_sample(price, emissions, threshold, sample_size)
sample_emissions = np.zeros_like(sample_prices)
sample_emissions[0] = emissions
# For each sample price, we calculate the new emissions
new_market = None
for i, new_price in enumerate(sample_prices[1:]):
# Reset market and sectors
new_market = market.copy(deep=True)
# Assign new carbon price
new_market.prices.loc[{"year": future, "commodity": commodities}] = new_price
new_market = equilibrium(new_market, sectors).market
sample_emissions[i + 1] = new_market.supply.sel(
year=future, commodity=commodities
).sum(["region", "timeslice", "commodity"])
# Based on these results, we finally adjust the carbon price
new_price = CARBON_BUDGET_FITTERS[fitter](
sample_prices,
sample_emissions,
threshold, # type: ignore
)
if refine_price and new_market is not None:
new_price = refine_new_price(
new_market,
carbon_price,
carbon_budget,
sample_prices,
new_price,
commodities,
price_too_high_threshold,
)
return new_price
[docs]
def refine_new_price(
market: xr.Dataset,
historic_price: xr.DataArray,
carbon_budget: xr.DataArray,
sample: np.ndarray,
price: float,
commodities: list,
price_too_high_threshold: float,
) -> float:
"""Refine the value of the carbon price.
Ensure it is not too high or low compared to heuristic values.
Arguments:
market: Market, with prices, supply, and consumption,
historic_price: DataArray with the historic carbon prices,
carbon_budget: DataArray with the carbon budget,
sample: Sample carbon price points,
price: Current carbon price, to be refined,
commodities: List of carbon-related commodities,
price_too_high_threshold: Threshold to decide what is a price too high.
Returns:
The new carbon price
"""
future = market.year[-1]
emissions = (
market.supply.sel(year=future, commodity=commodities)
.sum(["region", "timeslice", "commodity"])
.values
)
carbon_price = historic_price.sel(year=historic_price.year < future).values
if (carbon_price[-2:] > 0).all():
relative_price_increase = np.diff(carbon_price) / carbon_price[-1]
average = np.mean(relative_price_increase)
else:
average = 0.2
if price > price_too_high_threshold: # * max(min(carbon_price), 0.1):
price = min(price_too_high_threshold, max(sample) * (1 + average))
elif price <= 0:
threshold = carbon_budget.sel(year=future).values
exponent = (emissions - threshold) / threshold
magnitude = max(1 - np.exp(exponent), -0.1)
price = min(sample) * (1 + magnitude)
return price
def linear_fun(x, a, b):
return a + b * x
def exponential_fun(x, a, b, c):
return a * np.exp(-b * x) + c
[docs]
def create_sample(carbon_price, current_emissions, budget, size=4):
"""Calculates a sample of carbon prices to estimate the adjusted carbon price.
For each of these prices, the equilibrium loop will be run, obtaining a new value
for the emissions. Out of those price-emissions pairs, the final carbon price will
be estimated.
Arguments:
carbon_price: Current carbon price,
current_emissions: Current emissions,
budget: Carbon budget,
size: Number of points in the sample.
Returns:
An array with the sample prices.
"""
exponent = (current_emissions - budget) / budget
magnitude = max(1 - np.exp(-exponent), -0.1)
sample = carbon_price * (1 + np.linspace(0, 1, size) * magnitude)
return np.abs(sample)
[docs]
@register_carbon_budget_fitter
def linear(prices: np.ndarray, emissions: np.ndarray, budget: int) -> float:
"""Fits the prices-emissions pairs to a linear function.
Once that is done, an optimal carbon price is estimated
Arguments:
prices: An array with the sample carbon prices,
emissions: An array with the corresponding emissions,
budget: The carbon budget for the time period.
Returns:
The optimal carbon price.
"""
guess, weights = linear_guess_and_weights(prices, emissions, budget)
sol, _ = curve_fit(
linear_fun, emissions, prices, guess, sigma=weights, absolute_sigma=True
)
new_price = linear_fun(budget, *sol)
return new_price
[docs]
def linear_guess_and_weights(
prices: np.ndarray, emissions: np.ndarray, budget: int
) -> tuple:
"""Estimates initial values for the linear fitting algorithm and the weights.
The points closest to the budget are used to estimate the initial guess. They also
have the highest weight.
Arguments:
prices: An array with the sample carbon prices,
emissions: An array with the corresponding emissions,
budget: The carbon budget for the time period.
Returns:
The initial guess and weights
"""
weights = np.abs(emissions - budget)
idx = np.argsort(weights)
p = prices[idx][:2]
e = emissions[idx][:2]
den = e[1] - e[0]
num = p[1] - p[0]
if den != 0:
b = num / den
a = p[0] - b * e[0]
else:
b = 0
a = p[0]
return (a, b), weights
[docs]
@register_carbon_budget_fitter
def exponential(prices: np.ndarray, emissions: np.ndarray, budget: int) -> float:
"""Fits the prices-emissions pairs to an exponential function.
Once that is done, an optimal carbon price is estimated
Arguments:
prices: An array with the sample carbon prices,
emissions: An array with the corresponding emissions,
budget: The carbon budget for the time period.
Returns:
The optimal carbon price.
"""
guess, weights = exp_guess_and_weights(prices, emissions, budget)
sol, _ = curve_fit(
exponential_fun, emissions, prices, guess, sigma=weights, absolute_sigma=True
)
new_price = exponential_fun(budget, *sol)
return new_price
[docs]
def exp_guess_and_weights(
prices: np.ndarray, emissions: np.ndarray, budget: int
) -> tuple:
"""Estimates initial values for the exponential fitting algorithm and the weights.
The points closest to the budget are used to estimate the initial guess. They also
have the highest weight.
Arguments:
prices: An array with the sample carbon prices,
emissions: An array with the corresponding emissions,
budget: The carbon budget for the time period.
Returns:
The initial guess and weights
"""
weights = np.abs(emissions - budget)
idx = np.argsort(weights)
p = prices[idx][:2]
e = emissions[idx][:2]
b = 1 / e[0]
e = np.exp(-b * e)
den = e[1] - e[0]
num = p[1] - p[0]
if den != 0:
a = num / den
c = p[0] - a * e[0]
else:
a = 0
c = p[0]
return (a, b, c), weights
[docs]
@register_carbon_budget_method
def bisection(
market: xr.Dataset,
sectors: list,
equilibrium: Callable[
[xr.Dataset, Sequence[AbstractSector], int], FindEquilibriumResults
],
carbon_budget: xr.DataArray,
carbon_price: xr.DataArray,
commodities: list,
sample_size: int = 2,
refine_price: bool = True,
price_too_high_threshold: float = 10,
fitter: Text = "slinear",
) -> float:
"""Applies bisection algorithm to escalate carbon price and meet the budget.
A carbon market is meant as a pool of emissions for all
the modelled regions; therefore, the carbon price applies to all modelled regions.
Bisection applies an iterative estimations of the emissions
varying the carbon price until convergence or stop criteria
are met.
Builds on 'register_carbon_budget_method'.
Arguments:
market: Market, with the prices, supply, consumption and demand,
sectors: List of sectors,
equilibrium: Method for searching market equilibrium,
carbon_budget: DataArray with the carbon budget,
carbon_price: DataArray with the carbon price,
commodities: List of carbon-related commodities,
sample_size: Number of iterations for bisection,
refine_price: Boolean to decide on whether carbon price should be refined,
price_too_high_threshold: Threshold to decide what is a price too high.
fitter: Interpolation method.
Returns:
Value of global carbon price
"""
# We estimate carbon price, and emission threshold in forecast year
current = carbon_price.year.min() + sectors[-1].forecast
future = market.year[-1]
threshold = carbon_budget.sel(year=future).values
price = market.prices.sel(year=future, commodity=commodities).mean().values
# We create a 2-price sample being lower and upper bound of bisection
time_exp = max(0, (int(future - current)))
small = round((1 + 0.01) ** time_exp, 4)
large = round((1 + 0.02) ** time_exp, 4)
sample_prices = price * np.linspace(small, large, 2, endpoint=True)
# Out of the sample_prices, we apply bisection on max and min
# carbon prices estimated
low0 = round(min(sample_prices), 7)
up0 = round(max(sample_prices), 7)
# We create a 2-price sample being lower and upper bound of bisection
lb = bisect_loop(market, sectors, equilibrium, commodities, up0)
ub = bisect_loop(market, sectors, equilibrium, commodities, low0)
# Start bisection loop over a number of iterations
# sample_size is in this method used to determine
# the number of iterations
niter = sample_size
for n in range(niter):
if refine_price:
# We apply a cap if carbon price beyond threshold
if max(sample_prices) > price_too_high_threshold:
price_too_high_threshold = round(
price_too_high_threshold * (1 + 0.1) ** time_exp, 7
)
up0 = round(min(max(sample_prices), price_too_high_threshold), 7)
low0 = round(
min(min(sample_prices), price_too_high_threshold, up0 * 0.9), 7
)
lb = bisect_loop(market, sectors, equilibrium, commodities, up0)
ub = bisect_loop(market, sectors, equilibrium, commodities, low0)
if low0 == up0:
new_price = low0
break
if lb == threshold:
new_price = up0
break
elif ub == threshold:
new_price = low0
break
else:
low, up = min_max_bisect(
low0,
lb,
up0,
ub,
market,
sectors,
equilibrium,
commodities,
threshold, # type: ignore
)
# Exit loop if low and upper bound on emissions are close
if abs(low - up) <= 0.001:
new_price = round((low + up) / 2.0, 7)
break
# Exit loop if low and upper bound on emissions are close to threshold
elif abs(ub - threshold) <= abs(0.1 * threshold):
new_price = low
break
elif abs(lb - threshold) <= abs(0.1 * threshold):
new_price = up
break
# Call bisect_loop function and update up and lb prices
if low != low0:
low0 = low
ub = bisect_loop(market, sectors, equilibrium, commodities, low)
if up != up0:
up0 = up
lb = bisect_loop(market, sectors, equilibrium, commodities, up)
new_price = round((low + up) / 2.0, 7)
# We apply a minimum value greater than zero with a negative carbon price
if new_price <= 0:
new_price = 1e-2
return new_price
[docs]
def min_max_bisect(
low: float,
lb: float,
up: float,
ub: float,
market: xr.Dataset,
sectors: list,
equilibrium: Callable[
[xr.Dataset, Sequence[AbstractSector], int], FindEquilibriumResults
],
commodities: list,
threshold: float,
):
"""Refines bisection algorithm to escalate carbon price and meet the budget.
As emissions can be a discontinuous function of the carbon price, this method is
used to improve the solution search when discontinuities are met, improving the
bounds search.
Arguments:
low: Value of carbon price at lower bound,
lb: Value of emissions at lower bound,
up: Value of carbon price at upper bound,
ub: Value of emissions at upper bound,
market: Market, with the prices, supply, consumption and demand,
sectors: List of sectors,
equilibrium: Method for searching market equilibrium,
commodities: List of carbon-related commodities,
sample_size: Number of iterations for bisection,
refine_price: Boolean to decide on whether carbon price should be refined,
threshold: Threshold to decide what is a price too high.
Returns:
Value of lower and upper global carbon price
"""
denominator = threshold if threshold != 0.0 else 1e-3
if lb < threshold and ub < threshold:
# ub too small -> decrease low
# negative exponent (-(threshold - ub)) < 1
pow = -(threshold - lb) / abs(denominator)
pow = pow if pow < 1 else 1
pow = pow if pow > -1 else -1
up = low if ub > lb else up
low = low * np.exp(pow)
low = low if low > 0.0 else 1e-3
if ub > threshold and lb > threshold:
# lb too big -> increase up
# positive exponent (-(threshold - lb)) < 1
pow = -2 * (threshold - lb) / abs(denominator)
pow = pow if pow < 1 else 1
pow = pow if pow > -2 else -2
low = up if lb < ub else low
up = up * np.exp(pow)
up = up if up > 0.0 else 1e-3
if ub > threshold and lb < threshold:
midpoint = round((low + up) / 2.0, 7)
m = bisect_loop(market, sectors, equilibrium, commodities, midpoint)
if m < threshold:
# midpoint is smaller than lb
# lb is a function of up price
up = midpoint
else:
# midpoint is smaller than ub
# ub is a function of low price
low = midpoint
if ub < threshold and lb > threshold:
"Inverted bounds"
low1 = up
up = low
low = low1
if m < threshold:
# midpoint is smaller than lb
# lb is a function of up price
up = midpoint
else:
# midpoint is smaller than ub
# ub is a function of low price
low = midpoint
return low, up
[docs]
def bisect_loop(
market: xr.Dataset,
sectors: list,
equilibrium: Callable[
[xr.Dataset, Sequence[AbstractSector], int], FindEquilibriumResults
],
commodities: list,
new_price: float,
) -> float:
"""Calls market equilibrium iteration in bisection.
This updates emissions during iterations.
Arguments:
market: Market, with the prices, supply, consumption and demand,
sectors: List of sectors,
equilibrium: Method for searching market equilibrium,
commodities: List of carbon-related commodities,
new_price: New carbon price from bisection,
Returns:
Emissions estimated at the new carbon price.
"""
future = market.year[-1]
new_market = market.copy(deep=True)
# Assign new carbon price
new_market.prices.loc[{"year": future, "commodity": commodities}] = new_price
new_market = equilibrium(new_market, sectors, 1).market
new_emissions = (
new_market.supply.sel(year=future, commodity=commodities)
.sum(["region", "timeslice", "commodity"])
.round(decimals=3)
).values
return new_emissions