"""Ensemble of functions to read MUSE data."""
__all__ = ["read_settings"]
import importlib.util as implib
from collections import namedtuple
from copy import deepcopy
from logging import getLogger
from pathlib import Path
from typing import (
IO,
Any,
Dict,
List,
Mapping,
MutableMapping,
Optional,
Sequence,
Text,
Tuple,
Union,
)
import numpy as np
import pandas as pd
import xarray as xr
from muse.decorators import SETTINGS_CHECKS, register_settings_check
from muse.defaults import DATA_DIRECTORY, DEFAULT_SECTORS_DIRECTORY
DEFAULT_SETTINGS_PATH = DATA_DIRECTORY / "default_settings.toml"
"""Default settings path."""
class InputError(Exception):
    """Root of the exception hierarchy for TOML input errors."""
class MissingSettings(InputError):
    """Raised when a required input setting is missing."""
class IncorrectSettings(InputError):
    """Raised when an input setting exists but is incorrect."""
def convert(dictionary):
    """Converts a dictionary (with nested ones) to a namedtuple.

    Nested mappings become nested namedtuples. Unlike a naive in-place
    conversion, the input dictionary is left untouched, so callers can keep
    using it as a plain mapping afterwards.

    Arguments:
        dictionary: mapping whose keys are valid Python identifiers.

    Returns:
        A ``MUSEOptions`` namedtuple mirroring the (possibly nested) mapping.
    """
    # Build a fresh dict rather than mutating the caller's; `Mapping` also
    # covers dict-like objects that are not `dict` subclasses.
    converted = {
        key: convert(value) if isinstance(value, Mapping) else value
        for key, value in dictionary.items()
    }
    return namedtuple("MUSEOptions", converted.keys())(**converted)
def undo_damage(nt):
    """Recursively turn a nested namedtuple back into plain dictionaries.

    Anything without an ``_asdict`` method is returned unchanged.
    """
    if not hasattr(nt, "_asdict"):
        return nt
    return {field: undo_damage(entry) for field, entry in nt._asdict().items()}
class FormatDict(dict):
    """Mapping for partial string formatting.

    Unknown replacement fields are re-emitted verbatim (including any format
    spec) instead of raising ``KeyError``, so downstream code can format the
    remaining fields later.
    """

    class FormatPlaceholder:
        """Stand-in value that renders an unknown field back as ``{key:spec}``."""

        def __init__(self, key):
            self.key = key

        def __format__(self, spec):
            inner = f"{self.key}:{spec}" if spec else self.key
            return "{" + inner + "}"

    def __missing__(self, key):
        return FormatDict.FormatPlaceholder(key)
def format_path(
    filepath: Text,
    replacements: Optional[Mapping] = None,
    path: Optional[Union[Text, Path]] = None,
    cwd: Optional[Union[Text, Path]] = None,
    muse_sectors: Optional[Text] = None,
):
    """Replaces known patterns in a path.

    Unknown patterns are left alone. This allows downstream object factories to format
    the paths according to their own specifications.
    """
    from string import Formatter

    # Built-in placeholders; any user-supplied replacements override them.
    mapping = FormatDict()
    mapping["cwd"] = Path("" if cwd is None else cwd).absolute()
    mapping["muse_sectors"] = Path(
        DEFAULT_SECTORS_DIRECTORY if muse_sectors is None else muse_sectors
    ).absolute()
    mapping["path"] = Path("" if path is None else path).absolute()
    if replacements is not None:
        mapping.update(replacements)
    # FormatDict leaves unknown fields untouched during vformat.
    formatted = Formatter().vformat(str(filepath), (), mapping)
    return str(Path(formatted).absolute())
def format_paths(
    settings: Mapping,
    replacements: Optional[Mapping] = None,
    path: Optional[Union[Text, Path]] = None,
    cwd: Optional[Union[Text, Path]] = None,
    muse_sectors: Optional[Text] = None,
    suffixes: Sequence[Text] = (".csv", ".nc", ".xls", ".xlsx", ".py", ".toml"),
):
    """Format paths passed to settings.

    A setting is recognized as a path if its name ends in `_path`, `_file`, or `_dir`,
    or if the associated value is a text object with one of the `suffixes`
    (`.csv`, `.nc`, ...), as well as settings called `path`.

    Paths are first formatted using the input replacement keywords. These replacements
    will include "cwd" and "sectors" by default. For simplicity, any item called `path`
    is considered first in any dictionary, and then used within that dictionary and
    nested dictionaries.

    Examples:
        Starting from a simple example, we see `data_path` has been modified to point to
        the current working directory:

        >>> from pathlib import Path
        >>> from muse.readers.toml import format_paths
        >>> a = format_paths({"a_path": "{cwd}/a/b/c"})
        >>> str(Path().absolute() / "a" / "b" / "c") == a["a_path"]
        True

        Or it can be modified to point to the default locations for sectorial data:

        >>> from muse.defaults import DEFAULT_SECTORS_DIRECTORY
        >>> a = format_paths({"a_path": "{muse_sectors}/a/b/c"})
        >>> str(DEFAULT_SECTORS_DIRECTORY.absolute() / "a" / "b" / "c") == a["a_path"]
        True

        Similarly, if not given, `path` defaults to the current working directory:

        >>> a = format_paths({"a_path": "{path}/a/b/c"})
        >>> str(Path().absolute() / "a" / "b" / "c") == a["a_path"]
        True

        However, it can be made to point to anything of interest:

        >>> a = format_paths({"path": "{cwd}/a/b", "a_path": "{path}/c"})
        >>> str(Path().absolute() / "a" / "b" / "c") == a["a_path"]
        True

        Any property ending in `_path`, `_dir`, `_file`, or with a value that can be
        interpreted as a path with suffix `.csv`, `.nc`, `.xls`, `.xlsx`, `.py` or
        `.toml` is considered a path and transformed:

        >>> a = format_paths({"path": "{cwd}/a/b", "a_dir": "{path}/c"})
        >>> str(Path().absolute() / "a" / "b" / "c") == a["a_dir"]
        True
        >>> a = format_paths({"path": "{cwd}/a/b", "a_file": "{path}/c"})
        >>> str(Path().absolute() / "a" / "b" / "c") == a["a_file"]
        True
        >>> a = format_paths({"path": "{cwd}/a/b", "a": "{path}/c.csv"})
        >>> str(Path().absolute() / "a" / "b" / "c.csv") == a["a"]
        True
        >>> a = format_paths({"path": "{cwd}/a/b", "a": "{path}/c.toml"})
        >>> str(Path().absolute() / "a" / "b" / "c.toml") == a["a"]
        True

        Finally, paths in nested directories are also processed:

        >>> a = format_paths(
        ...     {
        ...         "path": "{cwd}/a/b",
        ...         "nested": { "a_path": "{path}/c" }
        ...     }
        ... )
        >>> str(Path().absolute() / "a" / "b" / "c") == a["nested"]["a_path"]
        True

        Note that `path` points to the latest one:

        >>> a = format_paths(
        ...     {
        ...         "path": "{cwd}/a/b",
        ...         "a_path": "{path}/c",
        ...         "nested": {
        ...             "path": "{cwd}/toot/suite",
        ...             "b_path": "{path}/c"
        ...         }
        ...     }
        ... )
        >>> str(Path().absolute() / "a" / "b" / "c") == a["a_path"]
        True
        >>> str(Path().absolute() / "toot" / "suite" / "c") == a["nested"]["b_path"]
        True
    """
    import re
    from pathlib import Path

    # Built-in placeholders; user-supplied replacements override them.
    patterns = {
        **{
            "cwd": Path("" if cwd is None else cwd).absolute(),
            "muse_sectors": Path(
                DEFAULT_SECTORS_DIRECTORY if muse_sectors is None else muse_sectors
            ).absolute(),
            "path": Path("" if path is None else path).absolute(),
        },
        **({} if replacements is None else replacements),
    }

    def format(path: Text) -> Text:
        # "optional"/"required" are sentinel values, not paths: leave them alone.
        if path.lower() in ("optional", "required"):
            return path
        # NOTE(review): `patterns` is unpacked as *keyword arguments*, so its
        # "cwd"/"muse_sectors"/"path" keys land on format_path's parameters of
        # the same names; extra replacement keys would raise TypeError here —
        # confirm callers only ever pass those three keys.
        return format_path(path, **patterns)  # type: ignore

    # Key names that mark a setting as a path regardless of its value.
    path_names = (
        re.compile(r"_path$"),
        re.compile("_dir$"),
        re.compile("_file$"),
        re.compile("filename"),
    )

    def is_a_path(key, value):
        # Either the key matches one of the patterns above, or the value is a
        # string whose suffix looks like a known file type.
        return any(re.search(x, key) is not None for x in path_names) or (
            isinstance(value, Text) and Path(value).suffix in suffixes
        )

    # The parameter `path` is deliberately shadowed: a "path" entry in the
    # settings redefines it for this section and all nested sections.
    path = format(settings.get("path", str(patterns["path"])))
    patterns["path"] = path  # type: ignore
    result = dict(**settings)
    if "path" in settings:
        result["path"] = path
    for key, value in result.items():
        if is_a_path(key, value):
            result[key] = format(value)
        elif isinstance(value, Mapping):
            # Recurse into sub-sections, handing down the current replacements.
            result[key] = format_paths(value, patterns, path)
        elif isinstance(value, List):
            # Lists may mix sub-sections, path-like strings, and plain values.
            result[key] = [
                format_paths(item, patterns, path)
                if isinstance(item, Mapping)
                else format_path(item, patterns, path)
                if is_a_path("", item)
                else item
                for item in result[key]
            ]
    return result
def read_split_toml(
    tomlfile: Union[Text, Path, IO[Text], Mapping],
    path: Optional[Union[Text, Path]] = None,
) -> MutableMapping:
    """Reads and consolidate TOML files.

    Our TOML accepts as input sections that are farmed off to other files:

        [some_section]
        include_path = "path/to/included.toml"

        [another_section]
        option_a = "a"

    The section `some_section` should contain only one item, `include_path`, giving the
    path to the toml file to include. This file is then spliced into the original toml.
    It **must** repeat the section that it splices. Hence if `included.toml` looks like:

        [some_section]
        some_option = "b"

        [some_section.inner_section]
        other_option = "c"

    Then the spliced toml would look like:

        [some_section]
        some_option = "b"

        [some_section.inner_section]
        other_option = "c"

        [another_section]
        option_a = "a"

    `included.toml` must contain a single section (possibly with inner options).
    Anything else will result in an error.

    Arguments:
        tomlfile: path to the toml file. Can be any input to `toml.load`.
        path: Root path when formatting path options. See `format_paths`.

    Returns:
        A mutable mapping with all `include_path` sections spliced in.

    Raises:
        MissingSettings: if an included file does not repeat the section name.
        IncorrectSettings: if a section mixes `include_path` with other options,
            or an included file contains more than one section.
    """
    from toml import load

    def splice_section(settings: Mapping):
        # Work on a shallow copy so the caller's mapping is never modified.
        settings = dict(**settings)
        for key, section in settings.items():
            if not isinstance(section, Mapping):
                continue
            if "include_path" in section and len(section) > 1:
                raise IncorrectSettings(
                    "Sections with an `include_path` option "
                    "should contain only that option."
                )
            elif "include_path" in section:
                # Recursively load the included file; it must contain exactly
                # the section it replaces and nothing else.
                inner = read_split_toml(section["include_path"], path=path)
                if key not in inner:
                    raise MissingSettings(
                        f"Could not find section {key} in {section['include_path']}"
                    )
                if len(inner) != 1:
                    # Fixed message: the two fragments previously concatenated
                    # into "includedfile" (missing space).
                    raise IncorrectSettings(
                        "More than one section found in included "
                        f"file {section['include_path']}"
                    )
                settings[key] = inner[key]
            else:
                settings[key] = splice_section(section)
        return settings

    toml = tomlfile if isinstance(tomlfile, Mapping) else load(tomlfile)
    # Resolve path-like options first, so `include_path` entries are absolute
    # before they are loaded.
    settings = format_paths(toml, path=path)  # type: ignore
    return splice_section(settings)
def read_settings(
    settings_file: Union[Text, Path, IO[Text], Mapping],
    path: Optional[Union[Text, Path]] = None,
) -> Any:
    """Loads the input settings for any MUSE simulation.

    Loads a MUSE settings file. This must be a TOML formatted file. Missing settings are
    loaded from the DEFAULT_SETTINGS. Custom Python modules, if present, are loaded
    and checks are run to validate the settings and ensure that they are compatible with
    a MUSE simulation.

    Arguments:
        settings_file: A string or a Path to the settings file, an open stream,
            or an already-parsed mapping.
        path: A string or path to the settings folder

    Returns:
        A nested namedtuple with the settings (see `convert`)
    """
    from io import IOBase

    getLogger(__name__).info("Reading MUSE settings")

    # The user data. `typing.IO` cannot reliably be used with `isinstance`
    # (file objects are not subclasses of it), so real streams are detected via
    # `io.IOBase`; only path-like inputs have a parent directory to default to.
    if path is None and not isinstance(settings_file, (Mapping, IOBase)):
        path = Path(settings_file).parent
    elif path is None:
        path = Path()
    user_settings = read_split_toml(settings_file, path=path)

    # User defined default settings
    default_path = Path(user_settings.get("default_settings", DEFAULT_SETTINGS_PATH))
    if not default_path.is_absolute():
        default_path = path / default_path
    default_settings = read_split_toml(default_path, path=path)

    # Check that there is at least 1 sector.
    msg = "ERROR - There must be at least 1 sector."
    assert len(user_settings["sectors"]) >= 1, msg

    # timeslice information cannot be merged. Accept only information from one.
    if "timeslices" in user_settings:
        default_settings.pop("timeslices", None)

    # We update the default information with the user provided data
    settings = add_known_parameters(default_settings, user_settings)
    settings = add_unknown_parameters(settings, user_settings)

    # Finally, we run some checks to make sure all makes sense and files exist.
    validate_settings(settings)
    return convert(settings)
def read_ts_multiindex(
    settings: Optional[Union[Mapping, Text]] = None,
    timeslice: Optional[xr.DataArray] = None,
    transforms: Optional[Dict[Tuple, np.ndarray]] = None,
) -> pd.MultiIndex:
    '''Read multiindex for a timeslice from TOML.

    Example:
        The timeslices are read from ``timeslice_levels``. The levels (keyword) and
        slice (list of values) correspond to the level, slices and slice aggregates
        defined in the ``timeslices`` section.

        >>> toml = """
        ...     ["timeslices"]
        ...     winter.weekday.day = 5
        ...     winter.weekday.night = 5
        ...     winter.weekend.day = 2
        ...     winter.weekend.night = 2
        ...     winter.weekend.dusk = 1
        ...     summer.weekday.day = 5
        ...     summer.weekday.night = 5
        ...     summer.weekend.day = 2
        ...     summer.weekend.night = 2
        ...     summer.weekend.dusk = 1
        ...     level_names = ["semester", "week", "day"]
        ...     aggregates.allday = ["day", "night"]
        ...
        ...     [timeslice_levels]
        ...     day = ["dusk", "allday"]
        ... """
        >>> from muse.timeslices import (
        ...     reference_timeslice, aggregate_transforms
        ... )
        >>> from muse.readers.toml import read_ts_multiindex
        >>> ref = reference_timeslice(toml)
        >>> transforms = aggregate_transforms(toml, ref)
        >>> read_ts_multiindex(toml, ref, transforms)
        MultiIndex([('summer', 'weekday', 'allday'),
                    ('summer', 'weekend', 'dusk'),
                    ('summer', 'weekend', 'allday'),
                    ('winter', 'weekday', 'allday'),
                    ('winter', 'weekend', 'dusk'),
                    ('winter', 'weekend', 'allday')],
                   names=['semester', 'week', 'day'])

        It is an error to refer to a level or a slice that does not exist:

        >>> read_ts_multiindex(dict(days=["dusk", "allday"]), ref, transforms)
        Traceback (most recent call last):
        ...
        muse.readers.toml.IncorrectSettings: Unexpected level name(s): ...
        >>> read_ts_multiindex(dict(day=["usk", "allday"]), ref, transforms)
        Traceback (most recent call last):
        ...
        muse.readers.toml.IncorrectSettings: Unexpected slice(s): ...
    '''
    from itertools import product
    from toml import loads
    from muse.timeslices import TIMESLICE, TRANSFORMS

    # Fall back to the module-wide timeslices/transforms installed by
    # muse.timeslices.setup_module when not given explicitly.
    indices = (TIMESLICE if timeslice is None else timeslice).get_index("timeslice")
    if transforms is None:
        transforms = TRANSFORMS
    if isinstance(settings, Text):
        settings = loads(settings)
    elif settings is None:
        # No user selection: keep the finest timeslice index unchanged.
        return indices
    elif not isinstance(settings, Mapping):
        # presumably a namedtuple produced by `convert` — TODO confirm
        settings = undo_damage(settings)
    settings = settings.get("timeslice_levels", settings)
    assert isinstance(settings, Mapping)
    # Every user-given level name must exist in the reference index.
    if not set(settings).issubset(indices.names):
        msg = "Unexpected level name(s): " + ", ".join(
            set(settings).difference(indices.names)
        )
        raise IncorrectSettings(msg)
    # Missing levels default to the full set of slices at that level.
    levels = [
        settings.get(name, level) for name, level in zip(indices.names, indices.levels)
    ]
    # A single slice may be given as a bare string rather than a list.
    levels = [[level] if isinstance(level, Text) else level for level in levels]
    for i, level in enumerate(levels):
        # Valid slice names at level i are those appearing in the transforms.
        known = [index[i] for index in transforms if len(index) > i]
        unexpected = set(level).difference(known)
        if unexpected:
            raise IncorrectSettings("Unexpected slice(s): " + ", ".join(unexpected))
    # Keep only combinations that correspond to a known transform.
    return pd.MultiIndex.from_tuples(
        [index for index in product(*levels) if index in transforms],
        names=indices.names,
    )
def read_timeslices(
    settings: Optional[Union[Text, Mapping]] = None,
    timeslice: Optional[xr.DataArray] = None,
    transforms: Optional[Dict[Tuple, np.ndarray]] = None,
) -> xr.Dataset:
    '''Reads timeslice levels and create resulting timeslice coordinate.

    Args:
        settings: TOML dictionary. It should contain a ``timeslice_levels`` section.
            Otherwise, the timeslices will default to the global (finest) timeslices.
        timeslice: Finest timeslices. Defaults to the global in
            :py:mod:`~muse.timeslices`. If using the default, then this function
            should be called *after* the timeslice module has been setup with a call to
            :py:func:`~muse.timeslice.setup_module`.
        transforms: Transforms from desired timeslices to the finest timeslice. Defaults
            to the global in :py:mod:`~muse.timeslices`. If using the default,
            then this function should be called *after* the timeslice module has been
            setup with a call to :py:func:`~muse.timeslice.setup_module`.

    Returns:
        A xr.Dataset with the timeslice coordinates.

    Example:
        >>> toml = """
        ...     ["timeslices"]
        ...     winter.weekday.day = 5
        ...     winter.weekday.night = 5
        ...     winter.weekend.day = 2
        ...     winter.weekend.night = 2
        ...     winter.weekend.dusk = 1
        ...     summer.weekday.day = 5
        ...     summer.weekday.night = 5
        ...     summer.weekend.day = 2
        ...     summer.weekend.night = 2
        ...     summer.weekend.dusk = 1
        ...     level_names = ["semester", "week", "day"]
        ...     aggregates.allday = ["day", "night"]
        ...
        ...     [timeslice_levels]
        ...     day = ["dusk", "allday"]
        ... """
        >>> from muse.timeslices import (
        ...     reference_timeslice, aggregate_transforms
        ... )
        >>> from muse.readers.toml import read_timeslices
        >>> ref = reference_timeslice(toml)
        >>> transforms = aggregate_transforms(toml, ref)
        >>> read_timeslices(toml, ref, transforms)
        <xarray.Dataset>
        Dimensions:          (timeslice: 6)
        Coordinates:
          * timeslice        (timeslice) MultiIndex
          - semester         (timeslice) object 'summer' 'summer' ... 'winter'
          - week             (timeslice) object 'weekday' 'weekend' ... 'weekend'
          - day              (timeslice) object 'allday' 'dusk' ... 'dusk' 'allday'
            represent_hours  (timeslice) ... 10 1 4 10 1 4
        Data variables:
            *empty*
    '''
    from muse.timeslices import TIMESLICE, timeslice_projector

    if timeslice is None:
        timeslice = TIMESLICE
    if settings is None:
        # No levels requested: the coordinate is simply the finest timeslices.
        return xr.Dataset({"represent_hours": timeslice}).set_coords("represent_hours")
    indices = read_ts_multiindex(settings, timeslice=timeslice, transforms=transforms)
    # Project a vector of ones from the requested timeslices onto the finest
    # timeslices...
    units = xr.DataArray(
        np.ones(len(indices)), coords={"timeslice": indices}, dims="timeslice"
    )
    proj = timeslice_projector(units, finest=timeslice, transforms=transforms)
    # ...then weight by the hours of each finest timeslice, so the sum below
    # yields the hours represented by each (possibly aggregated) timeslice.
    proj *= xr.DataArray(
        timeslice.values,
        coords={"finest_timeslice": proj.finest_timeslice},
        dims="finest_timeslice",
    )
    return xr.Dataset({"represent_hours": proj.sum("finest_timeslice")}).set_coords(
        "represent_hours"
    )
def add_known_parameters(dd, u, parent=None):
    """Function for updating the settings dictionary recursively.

    Those variables that take default values are logged.
    """
    defaults_used = []
    missing = []
    merged = deepcopy(dd)
    for key in dd:
        if key in u:
            # Known parameters with user-defined values
            given = u[key]
            if isinstance(given, Mapping):
                label = key if parent is None else "{}.{}".format(parent, key)
                merged[key] = add_known_parameters(merged.get(key, {}), given, label)
            else:
                merged[key] = given
        elif isinstance(merged[key], Text) and merged[key].lower() == "required":
            # Required parameters the user failed to provide
            missing.append(key)
        elif isinstance(merged[key], Text) and merged[key].lower() == "optional":
            # Optional parameters without a default: drop them entirely
            merged.pop(key)
        else:
            # Defaults kept as-is; remember them for the log message
            defaults_used.append(
                key if parent is None else "{}.{}".format(parent, key)
            )
    if len(missing) > 0:
        raise MissingSettings(
            "ERROR - Required parameters missing in input file: {}.".format(missing)
        )
    if len(defaults_used) > 0:
        getLogger(__name__).info(
            " Default input values used: " + ", ".join(defaults_used)
        )
    return merged
def add_unknown_parameters(dd, u):
    """Function for adding new parameters not known in the defaults file."""
    merged = deepcopy(dd)
    for key, value in u.items():
        merged[key] = (
            add_unknown_parameters(merged.get(key, {}), value)
            if isinstance(value, Mapping)
            else value
        )
    return merged
def validate_settings(settings: Dict) -> None:
    """Run the checks on the settings file."""
    getLogger(__name__).info(" Validating input settings...")
    # Plugins must load first so user-registered checks are in SETTINGS_CHECKS.
    check_plugins(settings)
    for checker in SETTINGS_CHECKS.values():
        checker(settings)
def check_plugins(settings: Dict) -> None:
    """Checks that the user custom defined python files exist.

    Checks that the user custom defined python files exist. If flagged to use, they are
    also loaded.

    While this is a settings check, it is run separately to ensure that custom defined
    settings checks are all loaded before validating the settings.
    """
    plugins = settings.get("plugins", [])
    # The option may appear directly or nested under a [plugins] table.
    if isinstance(plugins, (Dict, Mapping)):
        plugins = plugins.get("plugins")
    if isinstance(plugins, (Path, Text)):
        plugins = [plugins]
    if not plugins:
        return
    for plugin in plugins:
        location = Path(format_path(plugin))
        if not location.exists():
            msg = f"ERROR plugin does not exist: {location}"
            getLogger(__name__).critical(msg)
            raise IncorrectSettings(msg)
        # Importing the module registers anything inside that is decorated
        spec = implib.spec_from_file_location(location.stem, location)
        module = implib.module_from_spec(spec)
        spec.loader.exec_module(module)  # type: ignore
        getLogger(__name__).info(f"Loaded plugin {location.stem} from {location}")
@register_settings_check(vary_name=False)
def check_log_level(settings: Dict) -> None:
    """Validate and normalize the ``log_level`` setting to upper case."""
    valid_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
    level = settings["log_level"].upper()
    msg = "ERROR - Valid log levels are {}.".format(", ".join(valid_levels))
    assert level in valid_levels, msg
    settings["log_level"] = level
@register_settings_check(vary_name=False)
def check_interpolation_mode(settings: Dict) -> None:
    """Just updates the interpolation mode to a bool.

    There's no check, actually.
    """
    mode = settings["interpolation_mode"].lower()
    settings["interpolation_mode"] = mode
    msg = 'ERROR - Valid interpolation modes are "off", "linear" and "cubic"'
    assert mode in ("off", "false", "linear", "active", "cubic"), msg
    # Normalize synonyms: no interpolation means taking the nearest value.
    if mode in ("off", "false"):
        settings["interpolation_mode"] = "nearest"
    elif mode in ("linear", "active"):
        settings["interpolation_mode"] = "linear"
@register_settings_check(vary_name=False)
def check_budget_parameters(settings: Dict) -> None:
    """Check the parameters that are required if carbon_budget > 0.

    An empty budget is replaced by an empty DataArray; otherwise the budget
    must line up with ``time_framework`` and becomes a DataArray over ``year``.
    """
    length = len(settings["carbon_budget_control"]["budget"])
    if length > 0:
        msg = "ERROR - budget_check must have the same length that time_framework"
        if isinstance(settings["time_framework"], list):
            assert length == len(settings["time_framework"]), msg
            coords = settings["time_framework"]
        else:
            # NOTE(review): presumably `time_framework` is no longer a list
            # because check_foresight already converted it to an array with one
            # extra (foresight) year appended, hence the off-by-one and the
            # dropped last element — confirm the ordering of settings checks.
            assert length + 1 == len(settings["time_framework"]), msg
            coords = settings["time_framework"][:-1]
        # If Ok, we transform the list into an xr.DataArray
        settings["carbon_budget_control"]["budget"] = xr.DataArray(
            np.array(settings["carbon_budget_control"]["budget"]),
            dims="year",
            coords={"year": coords},
        )
    else:
        settings["carbon_budget_control"]["budget"] = xr.DataArray([])
@register_settings_check(vary_name=False)
def check_foresight(settings: Dict) -> None:
    """Check that foresight is a multiple of the smaller time_framework difference.

    If so, we update the time framework adding the foresight year to the list and
    transforming it into an array
    """
    years = settings["time_framework"]
    smallest_step = np.diff(years).min()
    msg = "ERROR - foresight is not a multiple of the smaller time_framework difference"
    assert settings["foresight"] % smallest_step == 0, msg
    years.sort()
    # Extend the horizon with one extra year, "foresight" years past the last.
    years.append(years[-1] + settings["foresight"])
    settings["time_framework"] = np.array(years, dtype=int)
@register_settings_check(vary_name=False)
def check_iteration_control(settings: Dict) -> None:
    """Checks the variables related to the control of the iterations.

    This includes whether equilibrium must be reached, the maximum number of iterations
    or the tolerance to consider convergence.
    """
    # Anything other than "off"/False means equilibrium should be reached.
    settings["equilibrium"] = str(settings["equilibrium"]).lower() not in (
        "false",
        "off",
    )
    msg = "ERROR - The number of iterations must be a positive number."
    assert settings["maximum_iterations"] > 0, msg
    settings["maximum_iterations"] = int(settings["maximum_iterations"])
    msg = "ERROR - The convergence tolerance must be a positive number."
    assert settings["tolerance"] > 0, msg
@register_settings_check(vary_name=False)
def check_time_slices(settings: Dict) -> None:
    """Check the time slices.

    If there is no error, they are transformed into a xr.DataArray
    """
    from muse.timeslices import setup_module

    # Installs the module-wide TIMESLICE/TRANSFORMS used by read_timeslices.
    setup_module(settings)
    # Levels may live under the [mca] section or at the top level.
    settings["timeslices"] = read_timeslices(
        settings.get("mca", settings).get("timeslice_levels", None)
    ).timeslice
@register_settings_check(vary_name=False)
def check_global_data_files(settings: Dict) -> None:
    """Checks that the global user files exist."""
    user_data = settings["global_input_files"]
    # Resolve the base directory, relative paths hanging off the root setting.
    base = Path(user_data["path"])
    if not base.is_absolute():
        base = settings["root"] / base
    msg = "ERROR Directory of global user files does not exist: {}.".format(base)
    assert base.exists(), msg
    # Update the path to the base directory
    user_data["path"] = base
    for key in [k for k in user_data if k != "path"]:
        if user_data[key] == "":
            # Empty entries mean "no such file": drop them.
            user_data.pop(key)
            continue
        candidate = Path(user_data[key])
        if not candidate.is_absolute():
            candidate = base / candidate
        assert candidate.exists(), f"{key.title()} file does not exist ({candidate})"
        # The path is updated so it can be readily used
        user_data[key] = candidate
@register_settings_check(vary_name=False)
def check_sectors_files(settings: Dict) -> None:
    """Checks that the sector files exist.

    Normalizes each sector's ``priority`` to an integer (symbolic names map to
    numbers, lower runs first) and stores the execution order in
    ``sectors["list"]``.
    """
    sectors = settings["sectors"]
    # Symbolic priorities map onto numeric ones; lower numbers run first.
    priorities = {
        "preset": 0,
        "presets": 0,
        "demand": 10,
        "conversion": 20,
        "supply": 30,
        "last": 100,
    }
    if "list" in sectors:
        # An explicit list restricts the simulation to the named sectors.
        sectors = {k: sectors[k] for k in sectors["list"]}
    for name, sector in sectors.items():
        # Finally the priority of the sectors is used to set the order of execution
        sector["priority"] = sector.get("priority", priorities["last"])
        sector["priority"] = int(
            priorities.get(str(sector["priority"]).lower().strip(), sector["priority"])
        )
    # BUGFIX: sort the *local* (filtered) sectors dict. The original sorted
    # `settings["sectors"].keys()`, which may still contain the "list" entry —
    # whose value is a list, so `["priority"]` raised a TypeError.
    sectors["list"] = sorted(sectors.keys(), key=lambda x: sectors[x]["priority"])
    settings["sectors"] = sectors
def read_technodata(
    settings: Any,
    sector_name: Optional[Text] = None,
    time_framework: Optional[Sequence[int]] = None,
    commodities: Optional[Union[Text, Path]] = None,
    regions: Optional[Sequence[Text]] = None,
    **kwargs,
) -> xr.Dataset:
    """Helper function to create technodata for a given sector.

    Arguments:
        settings: settings namedtuple (or the sector's own settings when
            ``sector_name`` is None).
        sector_name: name of the sector within ``settings.sectors``.
        time_framework: simulation years; defaults to ``settings.time_framework``.
        commodities: path to the global commodities file.
        regions: regions to select; defaults to ``settings.regions``.
        **kwargs: forwarded to xarray's ``interp`` over the years.

    Returns:
        An ``xr.Dataset`` with the sector's technologies, restricted to active
        commodities and interpolated over the simulation years.

    Raises:
        MissingSettings: if a required technodata input is absent.
        IncorrectSettings: if inputs are duplicated or files do not exist.
    """
    from muse.readers.csv import read_technologies, read_trade

    if time_framework is None:
        time_framework = getattr(settings, "time_framework", [2010, 2050])
    if commodities is None:
        commodities = settings.global_input_files.global_commodities
    if regions is None:
        regions = settings.regions
    if sector_name is not None:
        settings = getattr(settings.sectors, sector_name)
    technodata_timeslices = getattr(settings, "technodata_timeslices", None)

    # normalizes case where technodata is not in own subsection
    if not hasattr(settings, "technodata") and sector_name is not None:
        raise MissingSettings(f"Missing technodata section in {sector_name}")
    elif not hasattr(settings, "technodata"):
        raise MissingSettings("Missing technodata section")
    technosettings = undo_damage(settings.technodata)

    if isinstance(technosettings, Text):
        # A bare path: assemble the implicit technodata section.
        technosettings = dict(
            technodata=technosettings,
            technodata_timeslices=technodata_timeslices,
            commodities_in=settings.commodities_in,
            commodities_out=settings.commodities_out,
        )
    else:
        for comm in ("in", "out"):
            name = f"commodities_{comm}"
            # BUGFIX: look up the full option name (`commodities_in`/`_out`)
            # on both the settings and the technodata section; the original
            # tested the bare "in"/"out", so these branches never fired.
            if hasattr(settings, name) and name in technosettings:
                raise IncorrectSettings(f"{name} specified twice")
            elif hasattr(settings, name):
                technosettings[name] = getattr(settings, name)

    for name in ("technodata", "commodities_in", "commodities_out"):
        if name not in technosettings:
            raise MissingSettings(f"Missing required technodata input {name}")
        filename = technosettings[name]
        # BUGFIX: report the offending filename instead of "(unknown)".
        if not Path(filename).exists():
            raise IncorrectSettings(f"File {filename} does not exist.")
        if not Path(filename).is_file():
            raise IncorrectSettings(f"File {filename} is not a file.")

    technologies = read_technologies(
        technodata_path_or_sector=technosettings.pop("technodata"),
        technodata_timeslices_path=technosettings.pop("technodata_timeslices", None),
        comm_out_path=technosettings.pop("commodities_out"),
        comm_in_path=technosettings.pop("commodities_in"),
        commodities=commodities,
    ).sel(region=regions)

    # Keep only commodities actually consumed or produced by some technology.
    ins = (technologies.fixed_inputs > 0).any(("year", "region", "technology"))
    outs = (technologies.fixed_outputs > 0).any(("year", "region", "technology"))
    techcomms = technologies.commodity[ins | outs]
    technologies = technologies.sel(commodity=techcomms)

    for name, value in technosettings.items():
        # BUGFIX: test the *value*, not the key (keys are always Text): string
        # or path values point to trade files to read, anything else merges
        # as-is.
        if isinstance(value, (Text, Path)):
            data = read_trade(value, drop="Unit")
            if "region" in data.dims:
                data = data.sel(region=regions)
            if "dst_region" in data.dims:
                data = data.sel(dst_region=regions)
                if data.dst_region.size == 1:
                    data = data.squeeze("dst_region", drop=True)
        else:
            data = value
        if isinstance(data, xr.Dataset):
            technologies = technologies.merge(data)
        else:
            technologies[name] = data

    # make sure technologies includes the requisite years
    maxyear = getattr(settings, "forecast", 5) + max(time_framework)
    if technologies.year.max() < maxyear:
        msg = "Forward-filling technodata to fit simulation timeframe"
        getLogger(__name__).info(msg)
        years = [*technologies.year.data.tolist(), maxyear]
        technologies = technologies.sel(year=years, method="ffill")
        technologies["year"] = "year", years
    minyear = min(time_framework)
    if technologies.year.min() > minyear:
        msg = "Back-filling technodata to fit simulation timeframe"
        getLogger(__name__).info(msg)
        years = [minyear, *technologies.year.data.tolist()]
        technologies = technologies.sel(year=years, method="bfill")
        technologies["year"] = "year", years
    year = sorted(set(time_framework).union(technologies.year.data.tolist()))
    technologies = technologies.interp(year=year, **kwargs)
    return technologies