Source code for muse.timeslices

"""Timeslice utility functions."""

__all__ = [
    "broadcast_timeslice",
    "compress_timeslice",
    "distribute_timeslice",
    "drop_timeslice",
    "expand_timeslice",
    "get_level",
    "read_timeslices",
    "setup_module",
    "sort_timeslices",
    "timeslice_max",
]

from collections.abc import Mapping, Sequence

import numpy as np
import pandas as pd
from xarray import DataArray

from muse.utilities import broadcast_regions, broadcast_years

TIMESLICE: DataArray = None  # type: ignore


def read_timeslices(
    settings: Mapping | str,
    level_names: Sequence[str] = ("month", "day", "hour"),
) -> DataArray:
    from functools import reduce
    from logging import getLogger

    from toml import loads

    # Read timeslice settings
    if isinstance(settings, str):
        settings = loads(settings)
    settings = dict(**settings.get("timeslices", settings))

    # Legacy: warn user about deprecation of "aggregates" feature (#550)
    if "aggregates" in settings:
        msg = (
            "Timeslice aggregation has been deprecated since v1.3.0. Please see the "
            "release notes for that version for more information."
        )
        getLogger(__name__).warning(msg)
        settings.pop("aggregates")

    # Extract level names
    if "level_names" in settings:
        level_names = settings.pop("level_names")

    # Extract timeslice levels and lengths
    ts = list(settings.values())
    levels: list[tuple] = [(level,) for level in settings]
    while all(isinstance(v, Mapping) for v in ts):
        levels = [(*previous, b) for previous, a in zip(levels, ts) for b in a]
        ts = reduce(list.__add__, (list(u.values()) for u in ts), [])

    # Prepare multiindex
    nln = min(len(levels[0]), len(level_names))
    level_names = (
        list(level_names[:nln]) + [str(i) for i in range(len(levels[0]))][nln:]
    )
    indices = pd.MultiIndex.from_tuples(levels, names=level_names)

    # Make sure names from different levels don't overlap
    if any(
        reduce(set.union, indices.levels[:i], set()).intersection(indices.levels[i])
        for i in range(1, indices.nlevels)
    ):
        raise ValueError("Names from different levels should not overlap.")

    # Create DataArray
    return DataArray(ts, coords={"timeslice": indices}, dims="timeslice")


[docs] def setup_module(settings: str | Mapping): """Sets up module singletons.""" global TIMESLICE TIMESLICE = read_timeslices(settings)
[docs] def broadcast_timeslice( data: DataArray, ts: DataArray | None = None, level: str | None = None ) -> DataArray: """Convert a non-timesliced array to a timesliced array by broadcasting. If data is already timesliced in the appropriate scheme, it will be returned unchanged. Args: data: Array to broadcast. ts: Dataarray with timeslice weights. If None, defaults to the global timeslice. level: Level to broadcast to. If None, use the finest level of ts. """ from xarray import Coordinates if ts is None: ts = TIMESLICE if level is not None: ts = compress_timeslice(ts, ts=ts, level=level, operation="sum") # If data already has timeslices, check that it matches the reference timeslice. if "timeslice" in data.dims: if data.timeslice.reset_coords(drop=True).equals( ts.timeslice.reset_coords(drop=True) ): return data raise ValueError( "Data is already timesliced, but does not match the reference." ) mindex_coords = Coordinates.from_pandas_multiindex(ts.timeslice, "timeslice") broadcasted = data.expand_dims(timeslice=ts["timeslice"]).assign_coords( mindex_coords ) return broadcasted
[docs] def distribute_timeslice( data: DataArray, ts: DataArray | None = None, level=None ) -> DataArray: """Convert a non-timesliced array to a timesliced array by distribution. Takes non-timesliced data and distributes it over the timeslice dimension according to the timeslice weights in `ts`. The sum of the output over all timeslices will be equal to the input. If data is already timesliced in the appropriate scheme, it will be returned unchanged. Args: data: Array to distribute. ts: Dataarray with timeslice weights. If None, defaults to the global timeslice. level: Level to distribute to. If None, use the finest level of ts. """ if ts is None: ts = TIMESLICE if level is not None: ts = compress_timeslice(ts, ts=ts, level=level, operation="sum") # If data already has timeslices, check that it matches the reference timeslice. if "timeslice" in data.dims: if data.timeslice.reset_coords(drop=True).equals( ts.timeslice.reset_coords(drop=True) ): return data raise ValueError( "Data is already timesliced, but does not match the reference." ) broadcasted = broadcast_timeslice(data, ts=ts) timeslice_sum = ts.sum("timeslice").clip(1e-6) # prevents zero division timeslice_fractions = ts / broadcast_timeslice(timeslice_sum, ts=ts) if "region" in data.dims: timeslice_fractions = broadcast_regions(timeslice_fractions, data) if "year" in data.dims: timeslice_fractions = broadcast_years(timeslice_fractions, data) return broadcasted * timeslice_fractions
[docs] def compress_timeslice( data: DataArray, ts: DataArray | None = None, level: str | None = None, operation: str = "sum", ) -> DataArray: """Convert a fully timesliced array to a coarser level. The operation can be either 'sum', or 'mean': - sum: sum values at each compressed timeslice level - mean: take a weighted average of values at each compressed timeslice level, according to the timeslice weights in ts Args: data: Timesliced array to compress. Must have the same timeslicing as ts. ts: Dataarray with timeslice weights. If None, defaults to the global timeslice. level: Level to compress to. If None, don't compress. operation: Operation to perform ("sum" or "mean"). Defaults to "sum". """ if ts is None: ts = TIMESLICE # Raise error if data is not timesliced appropriately if "timeslice" not in data.dims: raise ValueError("Data must have a 'timeslice' dimension.") if not data.timeslice.reset_coords(drop=True).equals( ts.timeslice.reset_coords(drop=True) ): raise ValueError("Data has incompatible timeslicing with reference.") # If level is not specified, don't compress if level is None: return data # level must be a valid timeslice level x_levels = data.timeslice.to_index().names if level not in x_levels: raise ValueError(f"Unknown level: {level}. Must be one of {x_levels}.") # Return data unchanged if already at the desired level if get_level(data) == level: return data # Prepare mask idx = x_levels.index(level) kept_levels, compressed_levels = x_levels[: idx + 1], x_levels[idx + 1 :] mask = ts.unstack(dim="timeslice") if operation == "sum": mask = mask.where(np.isnan(mask), 1) elif operation == "mean": mask = mask / mask.sum(compressed_levels) else: raise ValueError(f"Unknown operation: {operation}. Must be 'sum' or 'mean'.") # Perform the operation result = ( (data.unstack(dim="timeslice") * mask) .sum(compressed_levels) .stack(timeslice=kept_levels) ) return sort_timeslices(result, ts)
[docs] def expand_timeslice( data: DataArray, ts: DataArray | None = None, operation: str = "distribute" ) -> DataArray: """Convert a timesliced array to a finer level. The operation can be either 'distribute', or 'broadcast' - distribute: distribute values over the new timeslice level(s) according to timeslice weights in ``ts``, such that the sum of the output over all timeslices is equal to the sum of the input - broadcast: broadcast values across over the new timeslice level(s) Args: data: Timesliced array to expand. ts: Dataarray with timeslice weights. If None, defaults to the global timeslice. operation: Operation to perform ("distribute" or "broadcast"). Defaults to "distribute". """ if ts is None: ts = TIMESLICE # Raise error if data is not timesliced if "timeslice" not in data.dims: raise ValueError("Data must have a 'timeslice' dimension.") # Get level names ts_levels = ts.timeslice.to_index().names x_levels = data.timeslice.to_index().names # Raise error if x_levels is not a subset of ts_levels if not set(x_levels).issubset(ts_levels): raise ValueError( "Data has incompatible timeslicing with reference. " f"Timeslice levels of data ({x_levels}) must be a subset of ts " f"({ts_levels})." ) # Return data unchanged if already at the desired level finest_level = get_level(ts) current_level = get_level(data) if current_level == finest_level: return data # Prepare mask mask = ts.unstack(dim="timeslice") if operation == "broadcast": mask = mask.where(np.isnan(mask), 1) elif operation == "distribute": mask = mask / mask.sum(ts_levels[ts_levels.index(current_level) + 1 :]) else: raise ValueError( f"Unknown operation: {operation}. Must be 'distribute' or 'broadcast'." ) # Perform the operation result = ( (data.unstack(dim="timeslice") * mask) .stack(timeslice=ts_levels) .dropna("timeslice") ) return sort_timeslices(result, ts)
[docs] def drop_timeslice(data: DataArray) -> DataArray: """Drop the timeslice variable from a DataArray. If the array doesn't contain the timeslice variable, return the input unchanged. """ if "timeslice" not in data.dims: return data return data.drop_vars(data.timeslice.indexes)
[docs] def get_level(data: DataArray) -> str: """Get the timeslice level of a DataArray.""" if "timeslice" not in data.dims: raise ValueError("Data does not have a 'timeslice' dimension.") return data.timeslice.to_index().names[-1]
[docs] def sort_timeslices(data: DataArray, ts: DataArray | None = None) -> DataArray: """Sorts the timeslices of a DataArray according to a reference timeslice. This will only sort timeslices to match the reference if the data is at the same timeslice level as the reference. Otherwise, it will sort timeslices in alphabetical order. Args: data: Timesliced DataArray to sort. ts: Dataarray with reference timeslices in the appropriate order """ if ts is None: ts = TIMESLICE # If data is at the same timeslice level as ts, sort timeslices according to ts if get_level(data) == get_level(ts): return data.sel(timeslice=ts.timeslice) # Otherwise, sort timeslices in alphabetical order return data.sortby("timeslice")
[docs] def timeslice_max(data: DataArray, ts: DataArray | None = None) -> DataArray: """Find the max value over the timeslice dimension, normalized for timeslice length. This first annualizes the value in each timeslice by dividing by the fraction of the year that the timeslice occupies, then takes the maximum value Args: data: Timesliced DataArray to find the max of. ts: Dataarray with relative timeslice lengths. If None, defaults to the global timeslice. """ if ts is None: ts = TIMESLICE timeslice_level = get_level(data) timeslice_sum = ts.sum("timeslice").clip(1e-6) # prevents zero division timeslice_fractions = compress_timeslice( ts, ts=ts, level=timeslice_level, operation="sum" ) / broadcast_timeslice(timeslice_sum, ts=ts, level=timeslice_level) return (data / timeslice_fractions).max("timeslice")