from collections.abc import MutableMapping, Sequence
from logging import getLogger
from typing import Callable
import numpy as np
import xarray as xr
from scipy.optimize import curve_fit
from muse.mca import FindEquilibriumResults
from muse.registration import registrator
CARBON_BUDGET_METHODS_SIGNATURE = Callable[
[xr.Dataset, Callable, xr.DataArray, list], float
]
"""carbon budget fitters signature."""
CARBON_BUDGET_FITTERS_SIGNATURE = Callable[[np.ndarray, np.ndarray, int], float]
"""carbon budget fitters signature."""
CARBON_BUDGET_METHODS: MutableMapping[str, CARBON_BUDGET_METHODS_SIGNATURE] = {}
"""Dictionary of carbon budget methods checks."""
CARBON_BUDGET_FITTERS: MutableMapping[str, CARBON_BUDGET_FITTERS_SIGNATURE] = {}
"""Dictionary of carbon budget fitters."""
[docs]
@registrator(registry=CARBON_BUDGET_FITTERS, loglevel="debug")
def register_carbon_budget_fitter(function: CARBON_BUDGET_FITTERS_SIGNATURE = None):
"""Decorator to register a carbon budget function."""
return function
[docs]
@registrator(registry=CARBON_BUDGET_METHODS, loglevel="debug")
def register_carbon_budget_method(function: CARBON_BUDGET_METHODS_SIGNATURE = None):
"""Decorator to register a carbon budget function."""
return function
[docs]
def update_carbon_budget(
carbon_budget: Sequence[float],
emissions: float,
year_idx: int,
over: bool = True,
under: bool = True,
) -> float:
"""Adjust the carbon budget in the far future if emissions too high or low.
This feature can allow to simulate overshoot shifting.
Arguments:
carbon_budget: budget for future year,
emissions: emission for future year,
year_idx: index of year for estimation,
over: if True, allows overshoot,
under: if True, allows undershoot.
Returns:
An adjusted threshold for the future year
"""
delta = emissions - carbon_budget[year_idx + 1]
new_future_budget = carbon_budget[year_idx + 2]
if over:
new_future_budget -= max(delta, 0)
if under:
new_future_budget -= min(delta, 0)
return new_future_budget
[docs]
@register_carbon_budget_method
def fitting(
market: xr.Dataset,
equilibrium: Callable[[xr.Dataset], FindEquilibriumResults],
carbon_budget: xr.DataArray,
commodities: list,
refine_price: bool = False,
price_too_high_threshold: float = 10,
sample_size: int = 5,
fitter: str = "linear",
resolution: int = 2,
) -> float:
"""Used to solve the carbon market.
Given the emission of a period, adjusts carbon price to meet the budget. A carbon
market is meant as a pool of emissions for all the modelled regions; therefore, the
carbon price applies to all modelled regions. The method solves an equation applying
a fitting of the emission-carbon price relation.
Arguments:
market: Market, with the prices, supply, and consumption
equilibrium: Method for searching market equilibrium
carbon_budget: limit on emissions
commodities: list of commodities to limit (ie. emissions)
refine_price: Boolean to decide on whether carbon price should be capped, with
the upper bound given by price_too_high_threshold
price_too_high_threshold: threshold on carbon price
sample_size: sample size for fitting
fitter: method to fit emissions with carbon price
resolution: Number of decimal places to solve the carbon price to
Returns:
Adjusted carbon price to meet budget
"""
# Calculate the carbon price and emissions threshold in the investment year
future = market.year[-1]
threshold = carbon_budget.sel(year=future).values.item()
price = market.prices.sel(year=future, commodity=commodities).mean().values.item()
# Solve market with current carbon price
emissions = solve_market(market, equilibrium, commodities, price)
# Create a sample of prices at which we want to calculate emissions
sample_prices = create_sample(price, emissions, threshold, sample_size)
sample_emissions = np.zeros_like(sample_prices)
sample_emissions[0] = emissions
# For each sample price, we calculate the new emissions
for i, new_price in enumerate(sample_prices[1:]):
sample_emissions[i + 1] = solve_market(
market, equilibrium, commodities, new_price
)
# Based on these results, we finally adjust the carbon price
new_price = CARBON_BUDGET_FITTERS[fitter](
sample_prices,
sample_emissions,
threshold, # type: ignore
)
# Cap price between 0.0 and price_too_high_threshold
if refine_price:
new_price = min(new_price, price_too_high_threshold)
new_price = max(new_price, 0.0)
return round(new_price, resolution)
def linear_fun(x, a, b):
return a + b * x
def exponential_fun(x, a, b, c):
return a * np.exp(-b * x) + c
[docs]
def create_sample(carbon_price, current_emissions, budget, size=4):
"""Calculates a sample of carbon prices to estimate the adjusted carbon price.
For each of these prices, the equilibrium loop will be run, obtaining a new value
for the emissions. Out of those price-emissions pairs, the final carbon price will
be estimated.
Arguments:
carbon_price: Current carbon price,
current_emissions: Current emissions,
budget: Carbon budget,
size: Number of points in the sample.
Returns:
An array with the sample prices.
"""
exponent = (current_emissions - budget) / budget
magnitude = max(1 - np.exp(-exponent), -0.1)
sample = carbon_price * (1 + np.linspace(0, 1, size) * magnitude)
return np.abs(sample)
[docs]
@register_carbon_budget_fitter
def linear(prices: np.ndarray, emissions: np.ndarray, budget: int) -> float:
"""Fits the prices-emissions pairs to a linear function.
Once that is done, an optimal carbon price is estimated
Arguments:
prices: An array with the sample carbon prices,
emissions: An array with the corresponding emissions,
budget: The carbon budget for the time period.
Returns:
The optimal carbon price.
"""
guess, weights = linear_guess_and_weights(prices, emissions, budget)
sol, _ = curve_fit(
linear_fun, emissions, prices, guess, sigma=weights, absolute_sigma=True
)
new_price = linear_fun(budget, *sol)
return new_price
[docs]
def linear_guess_and_weights(
prices: np.ndarray, emissions: np.ndarray, budget: int
) -> tuple:
"""Estimates initial values for the linear fitting algorithm and the weights.
The points closest to the budget are used to estimate the initial guess. They also
have the highest weight.
Arguments:
prices: An array with the sample carbon prices,
emissions: An array with the corresponding emissions,
budget: The carbon budget for the time period.
Returns:
The initial guess and weights
"""
weights = np.abs(emissions - budget)
idx = np.argsort(weights)
p = prices[idx][:2]
e = emissions[idx][:2]
den = e[1] - e[0]
num = p[1] - p[0]
if den != 0:
b = num / den
a = p[0] - b * e[0]
else:
b = 0
a = p[0]
return (a, b), weights
[docs]
@register_carbon_budget_fitter
def exponential(prices: np.ndarray, emissions: np.ndarray, budget: int) -> float:
"""Fits the prices-emissions pairs to an exponential function.
Once that is done, an optimal carbon price is estimated
Arguments:
prices: An array with the sample carbon prices,
emissions: An array with the corresponding emissions,
budget: The carbon budget for the time period.
Returns:
The optimal carbon price.
"""
guess, weights = exp_guess_and_weights(prices, emissions, budget)
sol, _ = curve_fit(
exponential_fun, emissions, prices, guess, sigma=weights, absolute_sigma=True
)
new_price = exponential_fun(budget, *sol)
return new_price
[docs]
def exp_guess_and_weights(
prices: np.ndarray, emissions: np.ndarray, budget: int
) -> tuple:
"""Estimates initial values for the exponential fitting algorithm and the weights.
The points closest to the budget are used to estimate the initial guess. They also
have the highest weight.
Arguments:
prices: An array with the sample carbon prices,
emissions: An array with the corresponding emissions,
budget: The carbon budget for the time period.
Returns:
The initial guess and weights
"""
weights = np.abs(emissions - budget)
idx = np.argsort(weights)
p = prices[idx][:2]
e = emissions[idx][:2]
b = 1 / e[0]
e = np.exp(-b * e)
den = e[1] - e[0]
num = p[1] - p[0]
if den != 0:
a = num / den
c = p[0] - a * e[0]
else:
a = 0
c = p[0]
return (a, b, c), weights
[docs]
@register_carbon_budget_method
def bisection(
market: xr.Dataset,
equilibrium: Callable[[xr.Dataset], FindEquilibriumResults],
carbon_budget: xr.DataArray,
commodities: list,
refine_price: bool = False,
price_too_high_threshold: float = 10,
max_iterations: int = 5,
tolerance: float = 0.1,
early_termination_count: int = 5,
resolution: int = 2,
price_penalty: float = 0.1,
) -> float:
"""Applies bisection algorithm to escalate carbon price and meet the budget.
A carbon market is meant as a pool of emissions for all
the modelled regions; therefore, the carbon price applies to all modelled regions.
Bisection applies an iterative estimations of the emissions
varying the carbon price until convergence or stop criteria
are met.
Builds on 'register_carbon_budget_method'.
Arguments:
market: Market, with the prices, supply, consumption and demand
equilibrium: Method for searching market equilibrium
carbon_budget: DataArray with the carbon budget
commodities: List of carbon-related commodities
refine_price: Boolean to decide on whether carbon price should be capped, with
the upper bound given by price_too_high_threshold
price_too_high_threshold: Upper limit for carbon price
max_iterations: Maximum number of iterations for bisection
tolerance: Maximum permitted deviation of emissions from the budget
early_termination_count: Will terminate the loop early if the last n solutions
are the same
resolution: Number of decimal places to solve the carbon price to
price_penalty: Penalty factor applied to carbon price when selecting optimal
solution when convergence isn't reached. Higher values favor lower prices
when emissions are similar.
Returns:
New value of global carbon price
"""
# Create cache for emissions at different price points
emissions_cache = EmissionsCache(market, equilibrium, commodities)
# Carbon price and emissions threshold in the investment year
future = market.year[-1]
target = carbon_budget.sel(year=future).values.item()
price = market.prices.sel(year=future, commodity=commodities).mean().values.item()
# Test if emissions are already below the budget without imposing a carbon price
if emissions_cache[0.0] < target:
message = (
f"Emissions for the year {int(future)} are already below the carbon budget "
"without imposing a carbon price. The carbon price has been set to zero."
)
getLogger(__name__).warning(message)
return 0.0
# Initial lower and upper bounds on carbon price for the bisection algorithm
lb_price = 0.0
epsilon = 10**-resolution # smallest nonzero price
ub_price = round(max(price, epsilon), resolution) # i.e. current price
# Bisection loop
for _ in range(max_iterations): # maximum number of iterations before terminating
# Cap prices between 0.0 and price_too_high_threshold
if refine_price:
ub_price = min(ub_price, price_too_high_threshold)
lb_price = max(lb_price, 0.0)
# Calculate/retrieve carbon emissions at new bounds
lb_price_emissions = emissions_cache[lb_price]
ub_price_emissions = emissions_cache[ub_price]
# Terminate early if many consecutive emissions are the same
if len(emissions_cache) >= early_termination_count + 2:
recent_values = list(emissions_cache.values())[-early_termination_count:]
if all(x == recent_values[0] for x in recent_values):
break
# Exit loop if lower or upper bound on emissions is close to threshold
if abs(lb_price_emissions - target) <= abs(tolerance * target):
return lb_price
if abs(ub_price_emissions - target) <= abs(tolerance * target):
return ub_price
# Convergence not yet reached -> calculate new bounds
lb_price, ub_price = adjust_bounds(
lb_price,
ub_price,
emissions_cache,
target,
resolution,
)
# If convergence isn't reached, select the price with lowest emissions
# We apply a small penalty to the carbon price to keep prices low if there are
# solutions with equal (or almost equal) emissions.
new_price = min(
emissions_cache.keys(),
key=lambda price: emissions_cache[price] + (price * price_penalty),
)
# Output the new price to the log
getLogger(__name__).info(
f"Final carbon price for {future.item()}: {new_price} "
f"(emissions = {emissions_cache[new_price]})"
)
# Raise warning message
if all(emissions_cache[k] > target for k in emissions_cache):
message = (
f"Carbon budget could not be met for the year {int(future)} "
f"(budget: {target}, emissions: {emissions_cache[new_price]}). "
"This may be because there are no processes available that can meet the "
"budget, or because emissions from capacity installed earlier in the time "
"horizon is preventing the budget from being met. "
"The CO2 price in this year should be interpreted with caution."
)
else:
message = (
f"Carbon budget could not be matched for the year {int(future)} to within "
"the specified tolerance. "
"This is sometimes unavoidable due to a discontinuous emissions landscape "
"which can make the budget unreachable, but can sometimes be "
"fixed by increasing max_iterations, early_termination_count or resolution."
)
getLogger(__name__).warning(message)
return new_price
[docs]
class EmissionsCache(dict):
"""Cache of emissions at different price points for bisection algorithm.
If a price is queried that is not in the cache, it calculates the emissions at that
price using solve_market and stores the result in the cache.
"""
def __init__(self, market, equilibrium, commodities):
super().__init__()
self.market = market
self.equilibrium = equilibrium
self.commodities = commodities
def __missing__(self, price):
value = solve_market(self.market, self.equilibrium, self.commodities, price)
self[price] = value
return value
[docs]
def adjust_bounds(
lb_price: float,
ub_price: float,
emissions_cache: dict[float, float],
target: float,
resolution: int = 2,
) -> tuple[float, float]:
"""Adjust the bounds of the carbon price for the bisection algorithm.
As emissions can be a discontinuous function of the carbon price, this method is
used to improve the solution search when discontinuities are met, improving the
bounds search.
Arguments:
lb_price: Value of carbon price at lower bound
ub_price: Value of carbon price at upper bound
emissions_cache: Dictionary of emissions at different price points
target: Carbon budget
resolution: Number of decimal places to solve the carbon price to
Returns:
New lower and upper bounds for the carbon price.
"""
lb_price_emissions = emissions_cache[lb_price]
ub_price_emissions = emissions_cache[ub_price]
if lb_price_emissions < target and ub_price_emissions < target:
# Emissions too low at both prices -> decrease prices
method = decrease_bounds
elif lb_price_emissions > target and ub_price_emissions > target:
# Emissions too high at both prices -> increase prices
method = increase_bounds
elif lb_price_emissions > target and ub_price_emissions < target:
# Threshold is between bounds -> perform bisection
method = bisect_bounds
elif lb_price_emissions < target and ub_price_emissions > target:
# Inverted bounds (i.e. increasing price leads to increasing emissions)
# Unlikely case, but included for completeness
method = bisect_bounds_inverted
else:
# lb_price_emissions or ub_price_emissions is equal to the target, so we can
# return the bounds unchanged
return lb_price, ub_price
return method(
lb_price,
ub_price,
emissions_cache,
target,
resolution,
)
[docs]
def decrease_bounds(
lb_price: float,
ub_price: float,
emissions_cache: dict[float, float],
target: float,
resolution: int = 2,
) -> tuple[float, float]:
"""Decreases the lb of the carbon price, and sets the ub to the previous lb."""
denominator = max(target, 1e-3)
lb_price_emissions = emissions_cache[lb_price]
exponent = (lb_price_emissions - target) / abs(denominator) # will be negative
exponent = max(exponent, -1) # cap exponent at -1
exponent = min(exponent, -0.1) # upper bound cap to force exploration
ub_price = lb_price
lb_price = round(lb_price * np.exp(exponent), resolution)
if lb_price == ub_price:
lb_price -= 10**-resolution
return lb_price, ub_price
[docs]
def increase_bounds(
lb_price: float,
ub_price: float,
emissions_cache: dict[float, float],
target: float,
resolution: int = 2,
) -> tuple[float, float]:
"""Increases the ub of the carbon price, and sets the lb to the previous ub."""
denominator = max(target, 1e-3)
ub_price_emissions = emissions_cache[ub_price]
exponent = (ub_price_emissions - target) / abs(denominator) # will be positive
exponent = min(exponent, 1) # cap exponent at 1
exponent = max(exponent, 0.1) # lower bound cap to force exploration
lb_price = ub_price
ub_price = round(ub_price * np.exp(exponent), resolution)
if ub_price == lb_price:
ub_price += 10**-resolution
return lb_price, ub_price
[docs]
def bisect_bounds(
lb_price: float,
ub_price: float,
emissions_cache: dict[float, float],
target: float,
resolution: int = 2,
) -> tuple[float, float]:
"""Bisects the bounds of the carbon price."""
midpoint = round((lb_price + ub_price) / 2.0, resolution)
midpoint_emissions = emissions_cache[midpoint]
if midpoint_emissions < target:
ub_price = midpoint
else:
lb_price = midpoint
return lb_price, ub_price
[docs]
def bisect_bounds_inverted(
lb_price: float,
ub_price: float,
emissions_cache: dict[float, float],
target: float,
resolution: int = 2,
) -> tuple[float, float]:
"""Bisects the bounds of the carbon price, in the case of inverted bounds."""
midpoint = round((lb_price + ub_price) / 2.0, resolution)
midpoint_emissions = emissions_cache[midpoint]
if midpoint_emissions > target:
ub_price = midpoint
else:
lb_price = midpoint
return lb_price, ub_price
[docs]
def solve_market(
market: xr.Dataset,
equilibrium: Callable[[xr.Dataset], FindEquilibriumResults],
commodities: list,
carbon_price: float,
) -> float:
"""Solves the market with a new carbon price and returns the emissions.
Arguments:
market: Market, with the prices, supply, consumption and demand
equilibrium: Method for searching market equilibrium
commodities: List of carbon-related commodities
carbon_price: New carbon price
Returns:
Emissions at the new carbon price.
"""
future = market.year[-1]
new_market = market.copy(deep=True)
# Assign new carbon price and solve market
new_market.prices.loc[{"year": future, "commodity": commodities}] = carbon_price
new_market = equilibrium(new_market).market
new_emissions = (
new_market.supply.sel(year=future, commodity=commodities)
.sum(["region", "timeslice", "commodity"])
.round(decimals=2)
).values.item()
# Log the emissions
getLogger(__name__).info(
f"Emissions in {future.item()} with carbon price {carbon_price}: "
f"{new_emissions}"
)
return new_emissions