"""Ensemble of functions to read MUSE data."""
# Public API of this module: the names exported by a star-import. Readers
# defined below but not listed here (e.g. ``read_trade``) are not exported by
# ``from <module> import *``.
__all__ = [
"read_technodictionary",
"read_io_technodata",
"read_initial_assets",
"read_technologies",
"read_csv_timeslices",
"read_global_commodities",
"read_timeslice_shares",
"read_csv_agent_parameters",
"read_macro_drivers",
"read_initial_market",
"read_attribute_table",
"read_regression_parameters",
"read_csv_outputs",
]
from pathlib import Path
from typing import Hashable, List, Optional, Sequence, Text, Union, cast
import numpy as np
import pandas as pd
import xarray as xr
from muse.defaults import DEFAULT_SECTORS_DIRECTORY
from muse.errors import UnitsConflictInCommodities
def find_sectors_file(
    filename: Union[Text, Path],
    sector: Optional[Text] = None,
    sectors_directory: Union[Text, Path] = DEFAULT_SECTORS_DIRECTORY,
) -> Path:
    """Locates a sector file in the standard sector directories.

    Arguments:
        filename: Name of the file to look for.
        sector: Optional sector name. When given, the title-cased sector
            sub-directory of `sectors_directory` is searched first.
        sectors_directory: Root directory searched (last, when a sector is
            given; alone otherwise).

    Returns:
        The path of the first candidate that is an existing file.

    Raises:
        IOError: If none of the candidate locations holds the file.
    """
    filename = Path(filename)
    root = Path(sectors_directory)
    if sector is None:
        candidates: Sequence[Path] = (root,)
    else:
        candidates = (root / sector.title(), root)

    found = next(
        (folder / filename for folder in candidates if (folder / filename).is_file()),
        None,
    )
    if found is not None:
        return found

    if sector is None:
        raise IOError("Could not find file %s." % filename)
    raise IOError("Could not find sector %s file %s." % (sector.title(), filename))
[docs]
def read_technodictionary(filename: Union[Text, Path]) -> xr.Dataset:
    """Reads and formats technodata into a dataset.

    There are three axes: technologies, regions, and year. The year axis is
    dropped when the file only describes a single year.

    Arguments:
        filename: Path to the technodata csv file. Rows whose process name is
            "Unit" are treated as unit declarations rather than data.

    Returns:
        A dataset with one variable per (snake-cased) csv column. Where a unit
        row is present, the unit of each column is stored in the ``units``
        attribute of the matching variable.
    """
    from re import sub

    from muse.readers import camel_to_snake

    def to_agent_share(name):
        return sub(r"agent(\d)", r"agent_share_\1", name)

    def to_numeric_or_keep(column):
        # Same outcome as the deprecated ``pd.to_numeric(errors="ignore")``:
        # columns that cannot be parsed are returned unchanged.
        try:
            return pd.to_numeric(column)
        except (ValueError, TypeError):
            return column

    csv = pd.read_csv(filename, float_precision="high", low_memory=False)
    csv = csv.drop(columns=list(csv.filter(regex="Unname").columns))
    csv = (
        csv.rename(columns=camel_to_snake)
        .rename(columns=to_agent_share)
        .rename(columns={"end_use": "enduse", "availabiliy year": "availability"})
    )

    is_unit_row = csv.process_name == "Unit"
    # Work on a copy so that setting the index does not write into a slice of
    # ``csv``.
    data = csv[~is_unit_row].copy()
    data.index = pd.MultiIndex.from_arrays(
        [data.process_name, data.region_name, [int(u) for u in data.time]],
        names=("technology", "region", "year"),
    )
    data.columns.name = "technodata"
    data.index.name = "technology"
    data = data.drop(["process_name", "region_name", "time"], axis=1)
    data = data.apply(to_numeric_or_keep, axis=0)

    result = xr.Dataset.from_dataframe(data.sort_index())

    # These columns are reduced to one value per technology (first region,
    # first year) and their labels are normalised with camel_to_snake.
    for source, target in (("fuel", "fuel"), ("type", "tech_type"), ("enduse", "enduse")):
        if source in result.variables:
            result[target] = result[source].isel(region=0, year=0)
            result[target].values = [
                camel_to_snake(name) for name in result[target].values
            ]

    units = csv[is_unit_row].drop(
        ["process_name", "region_name", "time", "level"], axis=1, errors="ignore"
    )
    # A file without a "Unit" row simply carries no unit information.
    if not units.empty:
        for variable, value in units.items():
            if all(u not in {"-", "Retro", "New"} for u in value.values):
                result[variable].attrs["units"] = value.values[0]

    # Sanity checks
    if "year" in result.dims:
        assert len(set(result.year.data)) == result.year.data.size
        result = result.sortby("year")

    if "year" in result.dims and len(result.year) == 1:
        result = result.isel(year=0, drop=True)
    return result
def read_technodata_timeslices(filename: Union[Text, Path]) -> xr.Dataset:
    """Reads per-timeslice technodata (utilization and minimum service factors).

    Arguments:
        filename: Path to the technodata timeslices csv file. Rows whose
            process name is "Unit" are ignored.

    Returns:
        A dataset holding the ``utilization_factor`` and
        ``minimum_service_factor`` columns that are present in the file. Every
        coordinate other than technology, region and year is stacked into a
        single ``timeslice`` dimension.

    Raises:
        ValueError: Propagated from `check_utilization_not_all_zero` when the
            utilization factors are missing or all zero for a technology.
    """
    from muse.readers import camel_to_snake

    def to_numeric_or_keep(column):
        # Same outcome as the deprecated ``pd.to_numeric(errors="ignore")``:
        # columns that cannot be parsed are returned unchanged.
        try:
            return pd.to_numeric(column)
        except (ValueError, TypeError):
            return column

    csv = pd.read_csv(filename, float_precision="high", low_memory=False)
    csv = csv.rename(columns=camel_to_snake)
    csv = csv.rename(
        columns={"process_name": "technology", "region_name": "region", "time": "year"}
    )

    data = csv[csv.technology != "Unit"]
    data = data.apply(to_numeric_or_keep)
    data = check_utilization_not_all_zero(data, filename)

    # Every column that is not a value column becomes a level of the index.
    data.index = pd.MultiIndex.from_frame(
        data.drop(
            columns=["utilization_factor", "minimum_service_factor", "obj_sort"],
            errors="ignore",
        )
    )
    data.columns.name = "technodata_timeslice"
    data.index.name = "technology"
    data = data.filter(["utilization_factor", "minimum_service_factor"])

    result = xr.Dataset.from_dataframe(data.sort_index())

    timeslice_levels = [
        item for item in result.coords if item not in ["technology", "region", "year"]
    ]
    return result.stack(timeslice=timeslice_levels)
[docs]
def read_io_technodata(filename: Union[Text, Path]) -> xr.Dataset:
    """Reads process inputs or outputs.

    There are four axes: (technology, region, year, commodity)

    Arguments:
        filename: Path to the csv file. Rows whose ``ProcessName`` is "Unit"
            declare the commodity units; the other rows are split on their
            ``Level`` column into "fixed" and "flexible" quantities.

    Returns:
        A dataset with the variables ``fixed``, ``flexible`` (missing values
        replaced by 0) and ``commodity_units``.
    """
    from muse.readers import camel_to_snake

    def to_numeric_or_keep(column):
        # Same outcome as the deprecated ``pd.to_numeric(errors="ignore")``:
        # columns that cannot be parsed are returned unchanged.
        try:
            return pd.to_numeric(column)
        except (ValueError, TypeError):
            return column

    csv = pd.read_csv(filename, float_precision="high", low_memory=False)
    is_unit_row = csv.ProcessName == "Unit"
    data = csv[~is_unit_row]

    region = np.array(data.RegionName, dtype=str)
    process = data.ProcessName
    year = [int(u) for u in data.Time]

    data = data.drop(["ProcessName", "RegionName", "Time"], axis=1)
    data.index = pd.MultiIndex.from_arrays(
        [process, region, year], names=("technology", "region", "year")
    )
    data.columns.name = "commodity"
    data.index.name = "technology"
    data = data.rename(columns=camel_to_snake)
    data = data.apply(to_numeric_or_keep, axis=0)

    fixed_set = xr.Dataset.from_dataframe(data[data.level == "fixed"]).drop_vars(
        "level"
    )
    flexible_set = xr.Dataset.from_dataframe(data[data.level == "flexible"]).drop_vars(
        "level"
    )
    commodity = xr.DataArray(
        list(fixed_set.data_vars.keys()), dims="commodity", name="commodity"
    )
    fixed = xr.concat(fixed_set.data_vars.values(), dim=commodity)
    flexible = xr.concat(flexible_set.data_vars.values(), dim=commodity)

    result = xr.Dataset(data_vars={"fixed": fixed, "flexible": flexible})
    result["flexible"] = result.flexible.fillna(0)

    # add units for flexible and fixed. The unit columns get the same
    # camel_to_snake renaming as the data columns so that their labels match
    # the commodity coordinate built above.
    units = (
        csv[is_unit_row]
        .drop(["ProcessName", "RegionName", "Time", "Level"], axis=1)
        .rename(columns=camel_to_snake)
    )
    units.index.name = "units"
    units.columns.name = "commodity"
    result["commodity_units"] = xr.DataArray(units).isel(units=0, drop=True)
    return result
[docs]
def read_initial_assets(filename: Union[Text, Path]) -> xr.DataArray:
    """Reads initial capacity data and re-indexes it by asset.

    Files with a "Time" column are parsed by `read_trade` (skipping the second
    line of the file); all others are parsed by `read_initial_capacity`.

    Arguments:
        filename: Path to the csv file describing the initial capacity.

    Returns:
        The capacity array with its "technology" dimension renamed to "asset",
        a "technology" coordinate along "asset", an "installed" coordinate set
        to the earliest year for every asset, and integer years.
    """
    table = pd.read_csv(filename, float_precision="high", low_memory=False)
    if "Time" not in table.columns:
        capacity = read_initial_capacity(table)
    else:
        capacity = cast(
            xr.DataArray, read_trade(filename, skiprows=[1], columns_are_source=True)
        )

    tech_names = capacity.technology
    capacity = capacity.drop_vars("technology").rename(technology="asset")
    capacity["technology"] = "asset", tech_names.values

    first_year = int(capacity.year.min())
    capacity["installed"] = ("asset", [first_year] * len(capacity.technology))
    capacity["year"] = capacity.year.astype(int)
    return capacity
def read_initial_capacity(data: Union[Text, Path, pd.DataFrame]) -> xr.DataArray:
    """Converts a wide table of capacities (one column per year) to an array.

    Arguments:
        data: Either an already loaded dataframe, or the path of a csv file
            to load. A "Unit" column, if present, is discarded.

    Returns:
        An array indexed by region, technology and (integer) year.
    """
    if isinstance(data, pd.DataFrame):
        table = data
    else:
        table = pd.read_csv(data, float_precision="high", low_memory=False)
    if "Unit" in table.columns:
        table = table.drop(columns="Unit")

    renamed = table.rename(
        columns=dict(ProcessName="technology", RegionName="region")
    )
    long_form = renamed.melt(id_vars=["technology", "region"], var_name="year")
    indexed = long_form.set_index(["region", "technology", "year"])

    capacity = xr.DataArray.from_series(indexed["value"])
    # inconsistent legacy data files.
    keep = capacity.year != "2100.1"
    capacity = capacity.sel(year=keep)
    capacity["year"] = capacity.year.astype(int)
    return capacity
[docs]
def read_technologies(
    technodata_path_or_sector: Optional[Union[Text, Path]] = None,
    technodata_timeslices_path: Optional[Union[Text, Path]] = None,
    comm_out_path: Optional[Union[Text, Path]] = None,
    comm_in_path: Optional[Union[Text, Path]] = None,
    commodities: Optional[Union[Text, Path, xr.Dataset]] = None,
    sectors_directory: Union[Text, Path] = DEFAULT_SECTORS_DIRECTORY,
) -> xr.Dataset:
    """Reads data characterising technologies from files.

    Arguments:
        technodata_path_or_sector: If `comm_out_path` and `comm_in_path` are not given,
            then this argument refers to the name of the sector. The three paths are
            then determined using standard locations and name. Specifically, technodata
            looks for a "technodataSECTORNAME.csv" file in the standard location for
            that sector. However, if `comm_out_path` and `comm_in_path` are given, then
            this should be the path to the technodata file.
        technodata_timeslices_path: This argument refers to the TechnodataTimeslices
            file which specifies the utilization factor per timeslice for the specified
            technology.
        comm_out_path: If given, then refers to the path of the file specifying output
            commodities. If not given, then defaults to
            "commOUTtechnodataSECTORNAME.csv" in the relevant sector directory.
        comm_in_path: If given, then refers to the path of the file specifying input
            commodities. If not given, then defaults to
            "commINtechnodataSECTORNAME.csv" in the relevant sector directory.
        commodities: Optional. If commodities is given, it should point to a global
            commodities file, or a dataset akin to reading such a file with
            `read_global_commodities`. In either case, the information pertaining to
            commodities will be added to the technologies dataset.
        sectors_directory: Optional. If `technodata_path_or_sector` is a string
            indicating the name of the sector, then this is a path to a directory
            where standard input files are contained.

    Returns:
        A dataset with all the characteristics of the technologies.

    Raises:
        ValueError: If neither a sector name nor explicit paths are given, or if
            the technodata contains missing values.
        UnitsConflictInCommodities: If the technodata, outputs and inputs cannot
            be merged.
        IOError: If a standard sector file cannot be found, or if some
            commodities are absent from the global commodities.
    """
    from logging import getLogger

    from muse.commodities import CommodityUsage

    if (not comm_out_path) and (not comm_in_path):
        sector = technodata_path_or_sector
        if sector is None:
            # Fail with an explicit message instead of an AttributeError on
            # ``None.title()`` further down.
            raise ValueError(
                "Either a sector name or the technodata, comm_out and comm_in "
                "paths must be given."
            )
        assert isinstance(sector, Text)
        tpath = find_sectors_file(
            f"technodata{sector.title()}.csv", sector, sectors_directory
        )
        opath = find_sectors_file(
            f"commOUTtechnodata{sector.title()}.csv", sector, sectors_directory
        )
        ipath = find_sectors_file(
            f"commINtechnodata{sector.title()}.csv", sector, sectors_directory
        )
    else:
        assert isinstance(technodata_path_or_sector, (Text, Path))
        assert comm_out_path is not None
        assert comm_in_path is not None
        tpath = Path(technodata_path_or_sector)
        opath = Path(comm_out_path)
        ipath = Path(comm_in_path)

    msg = f"""Reading technology information from:
- technodata: {tpath}
- outputs: {opath}
- inputs: {ipath}
"""
    if technodata_timeslices_path and isinstance(
        technodata_timeslices_path, (Text, Path)
    ):
        ttpath: Optional[Path] = Path(technodata_timeslices_path)
        msg += f"""- technodata_timeslices: {ttpath}
"""
    else:
        ttpath = None

    if isinstance(commodities, (Text, Path)):
        msg += f"""- global commodities file: {commodities}"""

    logger = getLogger(__name__)
    logger.info(msg)

    result = read_technodictionary(tpath)
    if any(result[u].isnull().any() for u in result.data_vars):
        raise ValueError(f"Inconsistent data in {tpath} (e.g. inconsistent years)")

    outs = read_io_technodata(opath).rename(
        flexible="flexible_outputs", fixed="fixed_outputs"
    )
    ins = read_io_technodata(ipath).rename(
        flexible="flexible_inputs", fixed="fixed_inputs"
    )

    # Interpolate inputs/outputs onto the technodata years, but only when all
    # of their numeric dimensions hold more than one point.
    if "year" in result.dims and len(result.year) > 1:
        if all(len(outs[d]) > 1 for d in outs.dims if outs[d].dtype.kind in "uifc"):
            outs = outs.interp(year=result.year)
        if all(len(ins[d]) > 1 for d in ins.dims if ins[d].dtype.kind in "uifc"):
            ins = ins.interp(year=result.year)

    try:
        result = result.merge(outs).merge(ins)
    except xr.core.merge.MergeError as error:
        raise UnitsConflictInCommodities from error

    if ttpath is not None:
        technodata_timeslice = read_technodata_timeslices(ttpath)
        # The per-timeslice values supersede the technodata column, which may
        # legitimately be absent from the technodata file.
        result = result.drop_vars("utilization_factor", errors="ignore")
        result = result.merge(technodata_timeslice)

    # try and add info about commodities
    if isinstance(commodities, (Text, Path)):
        try:
            commodities = read_global_commodities(commodities)
        except IOError:
            logger.warning("Could not load global commodities file.")
            commodities = None

    if isinstance(commodities, xr.Dataset):
        if result.commodity.isin(commodities.commodity).all():
            result = result.merge(commodities.sel(commodity=result.commodity))
        else:
            raise IOError(
                "Commodities not found in global commodities file: check spelling."
            )

    result["comm_usage"] = (
        "commodity",
        CommodityUsage.from_technologies(result).values,
    )
    result = result.set_coords("comm_usage")
    if "comm_type" in result.data_vars or "comm_type" in result.coords:
        result = result.drop_vars("comm_type")

    return result
[docs]
def read_csv_timeslices(path: Union[Text, Path], **kwargs) -> xr.DataArray:
    """Reads timeslice information from input.

    Arguments:
        path: Path to the csv file, which must have "Month", "Day", "Hour" and
            "RepresentHours" columns.
        **kwargs: Extra keyword arguments forwarded to `pandas.read_csv`.

    Returns:
        The "timeslice" coordinate, a (month, day, hour) multi-index with
        lower-cased, dash-separated labels, carrying the integer number of
        represented hours as the "represent_hours" coordinate.
    """
    from logging import getLogger
    from re import sub

    getLogger(__name__).info("Reading timeslices from %s" % path)
    table = pd.read_csv(path, float_precision="high", **kwargs)

    def normalise(label):
        # Insert a dash at CamelCase word boundaries, then lower-case.
        dashed = sub(r"((?<=[a-z])[A-Z]|(?<!\A)[A-Z](?=[a-z]))", r"-\1", label)
        return dashed.lower().strip()

    month_labels = [normalise(u) for u in table.Month.dropna()]
    day_labels = [normalise(u) for u in table.Day.dropna()]
    hour_labels = [normalise(u) for u in table.Hour.dropna()]
    index = pd.MultiIndex.from_arrays(
        (month_labels, day_labels, hour_labels), names=("month", "day", "hour")
    )

    represented = table.RepresentHours.dropna().astype(int)
    hours = xr.DataArray(
        represented,
        coords={"timeslice": index},
        dims="timeslice",
        name="represent_hours",
    )
    hours.coords["represent_hours"] = hours
    return hours.timeslice
[docs]
def read_global_commodities(path: Union[Text, Path]) -> xr.Dataset:
    """Reads commodities information from input.

    Arguments:
        path: Path to the global commodities csv file, or to a directory
            containing a "MuseGlobalCommodities.csv" file.

    Returns:
        A dataset indexed by "commodity", with one variable per remaining
        column of the file (renamed where a standard name is defined).

    Raises:
        IOError: If the resolved path is not an existing file.
    """
    from logging import getLogger

    from muse.readers import camel_to_snake

    column_names = {
        "CommodityType": "comm_type",
        "Commodity": "comm_name",
        "CommodityEmissionFactor_CO2": "emmission_factor",
        "HeatRate": "heat_rate",
        "Unit": "unit",
    }

    path = Path(path)
    if path.is_dir():
        path = path / "MuseGlobalCommodities.csv"
    if not path.is_file():
        raise IOError(f"File {path} does not exist.")

    getLogger(__name__).info(f"Reading global commodities from {path}.")

    table = pd.read_csv(path, float_precision="high", low_memory=False)
    table.index = [camel_to_snake(name) for name in table.CommodityName]
    table.CommodityType = [camel_to_snake(kind) for kind in table.CommodityType]
    table = table.drop("CommodityName", axis=1).rename(columns=column_names)
    table.index.name = "commodity"
    return xr.Dataset(table)
[docs]
def read_timeslice_shares(
    path: Union[Text, Path] = DEFAULT_SECTORS_DIRECTORY,
    sector: Optional[Text] = None,
    timeslice: Union[Text, Path, xr.DataArray] = "Timeslices{sector}.csv",
) -> xr.Dataset:
    """Reads sliceshare information.

    Additionally, this function will try and recover the timeslice multi-index from
    an input file "Timeslices{sector}.csv" in the same directory as the timeslice
    shares. Pass `None` if this behaviour is not required.

    Arguments:
        path: Directory holding the sector files, or path to a
            "TimesliceShare*.csv" file when `sector` is not given.
        sector: Name of the sector. When omitted, it is deduced from `path`.
        timeslice: Timeslice file name or path (where "{sector}" is replaced
            by the sector name), an already loaded timeslice array, or `None`.

    Returns:
        The ``shares`` variable of the assembled dataset.
    """
    from logging import getLogger
    from re import match

    path = Path(path)
    if sector is None and path.is_dir():
        sector = path.name
    elif sector is None:
        filename = path.name
        path = path.parent
        found = match(r"TimesliceShare(.*)\.csv", filename)
        sector = found.group(1) if found is not None else path.name

    # Resolve the timeslice argument step by step: template -> path -> array.
    if isinstance(timeslice, Text) and "{sector}" in timeslice:
        timeslice = timeslice.format(sector=sector)
    if isinstance(timeslice, (Text, Path)) and not Path(timeslice).is_file():
        timeslice = find_sectors_file(timeslice, sector, path)
    if isinstance(timeslice, (Text, Path)):
        timeslice = read_csv_timeslices(timeslice, low_memory=False)

    share_path = find_sectors_file("TimesliceShare%s.csv" % sector, sector, path)
    getLogger(__name__).info("Reading timeslice shares from %s" % share_path)

    table = pd.read_csv(share_path, float_precision="high", low_memory=False)
    table.index = pd.MultiIndex.from_arrays(
        (table.RegionName, table.SN), names=("region", "timeslice")
    )
    table.index.name = "rt"
    table = table.drop(["RegionName", "SN"], axis=1)
    table.columns.name = "commodity"

    shares = xr.DataArray(table).unstack("rt").to_dataset(name="shares")

    if timeslice is None:
        shares = shares.drop_vars("timeslice")
    elif isinstance(timeslice, xr.DataArray) and hasattr(timeslice, "timeslice"):
        shares["timeslice"] = timeslice.timeslice
        shares[cast(Hashable, timeslice.name)] = timeslice
    else:
        shares["timeslice"] = timeslice
    return shares.shares
[docs]
def read_csv_agent_parameters(filename) -> List:
    """Reads standard MUSE agent-declaration csv-files.

    Returns a list of dictionaries, where each dictionary can be used to instantiate an
    agent in :py:func:`muse.agents.factories.factory`.

    Arguments:
        filename: Path to the csv file. A string that neither ends in ".csv"
            nor names an existing file is taken to be a sector name, and the
            standard "BuildingAgent{sector}.csv" file is looked up instead.

    Returns:
        One dictionary per row of the file, with the keys "name", "region",
        "objectives", "search_rules", "decision", "agent_type" and "share",
        plus "quantity", "maturity_threshhold" and "spend_limit" when the
        matching columns exist.

    Raises:
        ValueError: If the Objective, ObjData and Objsort columns are
            inconsistent in number, or hold values of the wrong type.
        KeyError: If the "Type" column holds an unknown agent type.
    """
    from re import sub

    if (
        isinstance(filename, Text)
        and Path(filename).suffix != ".csv"
        and not Path(filename).is_file()
    ):
        filename = find_sectors_file(f"BuildingAgent{filename}.csv", filename)

    table = pd.read_csv(filename, float_precision="high", low_memory=False)
    if "AgentNumber" in table.columns:
        table = table.drop(["AgentNumber"], axis=1)

    agent_types = {
        "new": "newcapa",
        "newcapa": "newcapa",
        "retrofit": "retrofit",
        "retro": "retrofit",
        "agent": "agent",
        "default": "agent",
    }

    result = []
    for _, row in table.iterrows():
        objectives = row[[i.startswith("Objective") for i in row.index]]
        floats = row[[i.startswith("ObjData") for i in row.index]]
        sorting = row[[i.startswith("Objsort") for i in row.index]]

        if len(objectives) != len(floats) or len(objectives) != len(sorting):
            raise ValueError(
                f"Agent Objective, ObjData, and Objsort columns are inconsistent in {filename}"  # noqa: E501
            )

        # Unused objective slots are left empty in the file: drop them before
        # validating what remains.
        objectives = objectives.dropna().to_list()
        if not all(isinstance(u, str) for u in objectives):
            raise ValueError(f"Agent Objective requires a string entry in {filename}")
        sort = sorting.dropna().to_list()
        if not all(isinstance(u, bool) for u in sort):
            raise ValueError(f"Agent Objsort requires a boolean entry in {filename}")
        floats = floats.dropna().to_list()
        if not all(isinstance(u, (int, float)) for u in floats):
            raise ValueError(f"Agent ObjData requires a float entry in {filename}")

        # Pair each objective with its *validated* sort flag and data, so that
        # no missing value can leak into the decision parameters.
        decision_params = list(zip(objectives, sort, floats))

        agent_type = agent_types[getattr(row, "Type", "agent").lower()]
        agent = {
            "name": row.Name,
            "region": row.RegionName,
            "objectives": [u[0] for u in decision_params],
            "search_rules": row.SearchRule,
            "decision": {"name": row.DecisionMethod, "parameters": decision_params},
            "agent_type": agent_type,
        }
        if hasattr(row, "Quantity"):
            agent["quantity"] = row.Quantity
        if hasattr(row, "MaturityThreshold"):
            agent["maturity_threshhold"] = row.MaturityThreshold
        if hasattr(row, "SpendLimit"):
            agent["spend_limit"] = row.SpendLimit
        agent["share"] = sub(r"Agent(\d)", r"agent_share_\1", row.AgentShare)

        # Retrofit agents use the retrofit flavour of the lexical decision.
        # The decision is a dictionary, so its name is what must be checked
        # and replaced; the parameters are kept.
        if agent_type == "retrofit" and agent["decision"]["name"] == "lexo":
            agent["decision"]["name"] = "retro_lexo"
        result.append(agent)
    return result
[docs]
def read_macro_drivers(path: Union[Text, Path]) -> xr.Dataset:
    """Reads a standard MUSE csv file for macro drivers.

    Arguments:
        path: Path to the csv file. It must have "RegionName", "Unit" and
            "Variable" columns; the remaining columns are taken to be years.

    Returns:
        A dataset with the variables "gdp" (rows whose variable is "GDP|PPP")
        and "population" (rows whose variable is "Population"), with integer
        years and string regions.
    """
    from logging import getLogger

    path = Path(path)
    getLogger(__name__).info(f"Reading macro drivers from {path}")

    table = pd.read_csv(path, float_precision="high", low_memory=False)
    table.index = table.RegionName
    table.index.name = "region"
    table.columns.name = "year"
    table = table.drop(["Unit", "RegionName"], axis=1)

    def rows_for(variable):
        # Keep the rows of one macro driver and discard the selector column.
        return table[table.Variable == variable].drop("Variable", axis=1)

    population = rows_for("Population")
    gdp = rows_for("GDP|PPP")

    drivers = xr.Dataset({"gdp": gdp, "population": population})
    drivers["year"] = "year", drivers.year.values.astype(int)
    drivers["region"] = "region", drivers.region.values.astype(str)
    return drivers
[docs]
def read_initial_market(
    projections: Union[xr.DataArray, Path, Text],
    base_year_import: Optional[Union[Text, Path, xr.DataArray]] = None,
    base_year_export: Optional[Union[Text, Path, xr.DataArray]] = None,
    timeslices: Optional[xr.DataArray] = None,
) -> xr.Dataset:
    """Read projections, import and export csv files.

    Arguments:
        projections: Price projections, as an array or as the path of a file
            readable by `read_attribute_table`.
        base_year_import: Optional base year imports (array or path). Set to
            zero when not given.
        base_year_export: Optional base year exports (array or path). Set to
            zero when not given.
        timeslices: Optional timeslices the quantities are converted to.

    Returns:
        A dataset with the variables "prices", "exports", "imports" and
        "static_trade" (imports minus exports).
    """
    from logging import getLogger

    from muse.timeslices import QuantityType, convert_timeslice

    log = getLogger(__name__)

    # Projections must always be present
    if isinstance(projections, (Text, Path)):
        log.info(f"Reading projections from {projections}")
        projections = read_attribute_table(projections)
    if timeslices is not None:
        projections = convert_timeslice(projections, timeslices, QuantityType.INTENSIVE)

    # Base year export is optional. If it is not there, it's set to zero
    if base_year_export is None:
        log.info("Base year export not provided. Set to zero.")
        base_year_export = xr.zeros_like(projections)
    elif isinstance(base_year_export, (Text, Path)):
        log.info(f"Reading base year export from {base_year_export}")
        base_year_export = read_attribute_table(base_year_export)

    # Base year import is optional. If it is not there, it's set to zero
    if base_year_import is None:
        log.info("Base year import not provided. Set to zero.")
        base_year_import = xr.zeros_like(projections)
    elif isinstance(base_year_import, (Text, Path)):
        log.info(f"Reading base year import from {base_year_import}")
        base_year_import = read_attribute_table(base_year_import)

    if timeslices is not None:
        base_year_export = convert_timeslice(
            base_year_export, timeslices, QuantityType.EXTENSIVE
        )
        base_year_import = convert_timeslice(
            base_year_import, timeslices, QuantityType.EXTENSIVE
        )

    base_year_export.name = "exports"
    base_year_import.name = "imports"
    static_trade = base_year_import - base_year_export
    static_trade.name = "static_trade"

    members = (projections, base_year_export, base_year_import, static_trade)
    market = xr.Dataset({member.name: member for member in members})
    market = market.rename(
        commodity_price="prices", units_commodity_price="units_prices"
    )
    market["prices"] = market["prices"].expand_dims({"timeslice": timeslices})
    return market
[docs]
def read_attribute_table(path: Union[Text, Path]) -> xr.DataArray:
    """Read a standard MUSE csv file for price projections.

    Arguments:
        path: Path to the csv file. It must have "RegionName", "Attribute" and
            "Time" columns; the first data row holds the units of the other
            (commodity) columns.

    Returns:
        A float array over region, year and commodity, with missing values
        replaced by 0. It is named after the (snake-cased) first attribute
        found in the file and carries the units as the "units_<name>"
        coordinate.

    Raises:
        IOError: If `path` is not an existing file.
    """
    from logging import getLogger

    from muse.readers import camel_to_snake

    path = Path(path)
    if not path.is_file():
        raise IOError(f"{path} does not exist.")

    getLogger(__name__).info(f"Reading prices from {path}")

    raw = pd.read_csv(path, float_precision="high", low_memory=False)
    # The first row declares units rather than data.
    units = raw.loc[0].drop(["RegionName", "Attribute", "Time"])

    table = raw.drop(0)
    table.columns.name = "commodity"
    table = table.rename(
        columns={"RegionName": "region", "Attribute": "attribute", "Time": "year"}
    )

    regions = table.region
    years = table.year.astype(int)
    attribute = camel_to_snake(table.attribute.unique()[0])

    table = table.drop(["region", "year"], axis=1)
    table.index = pd.MultiIndex.from_arrays([regions, years], names=["region", "year"])
    table = table.drop(["attribute"], axis=1)
    table = table.rename(columns={c: camel_to_snake(c) for c in table.columns})

    values = xr.DataArray(table, name=attribute).astype(float)
    values = values.unstack("dim_0").fillna(0)
    values.coords["units_" + attribute] = ("commodity", units)
    return values
[docs]
def read_regression_parameters(path: Union[Text, Path]) -> xr.Dataset:
    """Reads the regression parameters from a standard MUSE csv file.

    Arguments:
        path: Path to the csv file. It must have "RegionName", "SectorName",
            "FunctionType" and "coeff" columns; the remaining columns are
            commodities.

    Returns:
        A dataset with one variable per coefficient found in the "coeff"
        column, over (lower-cased) sector, region and commodity, with missing
        values replaced by 0, plus a "function_type" variable per sector.

    Raises:
        IOError: If `path` is not an existing file.
    """
    from logging import getLogger

    from muse.readers import camel_to_snake

    path = Path(path)
    if not path.is_file():
        raise IOError(f"{path} does not exist or is not a file.")

    getLogger(__name__).info(f"Reading regression parameters from {path}.")
    table = pd.read_csv(path, float_precision="high", low_memory=False)

    # Normalize column names
    table.columns.name = "commodity"
    table = table.rename(
        columns={
            "RegionName": "region",
            "SectorName": "sector",
            "FunctionType": "function_type",
        }
    )

    # Two of these columns become the index; the third is paired with sectors
    sector = table.sector.apply(lambda name: name.lower())
    region = table.region
    function_type = table.function_type

    table = table.drop(["sector", "region", "function_type"], axis=1)
    table.index = pd.MultiIndex.from_arrays(
        [sector, region], names=["sector", "region"]
    )
    table = table.rename(columns={c: camel_to_snake(c) for c in table.columns})

    # One data array per kind of coefficient
    per_coefficient = {}
    for kind in table.coeff.unique():
        selected = table[table.coeff == kind].drop("coeff", axis=1)
        per_coefficient[kind] = xr.DataArray(selected)
    coeffs = xr.Dataset(per_coefficient)

    # Unstack the multi-index into separate dimensions
    coeffs = coeffs.unstack("dim_0").fillna(0)

    # We pair each sector with its function type
    unique_pairs = set(zip(sector, function_type))
    paired = list(zip(*unique_pairs))
    coeffs["function_type"] = xr.DataArray(
        list(paired[1]),
        dims=["sector"],
        coords={"sector": list(paired[0])},
    )
    return coeffs
[docs]
def read_csv_outputs(
    paths: Union[Text, Path, Sequence[Union[Text, Path]]],
    columns: Text = "commodity",
    indices: Sequence[Text] = ("RegionName", "ProcessName", "Timeslice"),
    drop: Sequence[Text] = ("Unnamed: 0",),
) -> xr.Dataset:
    """Read standard MUSE output files for consumption or supply.

    Arguments:
        paths: A glob pattern (string), a single path, or a sequence of paths.
            The name of each file must contain a four-digit year and end in
            ".csv".
        columns: Name given to the dimension spanned by the csv columns.
        indices: Columns used to index the rows. Those absent from a file are
            ignored for that file.
        drop: Columns to discard, where present.

    Returns:
        The content of all the files, concatenated along a sorted "year"
        dimension, with missing values replaced by 0 and with the index
        columns unstacked into dimensions (named after the column, minus
        "Name", in lower case).

    Raises:
        IOError: If no file is found, if a file name does not contain a year,
            or if the same year is found twice.
    """
    from glob import glob
    from re import match

    from muse.readers import camel_to_snake

    if isinstance(paths, Text):
        allfiles = [Path(p) for p in glob(paths)]
    elif isinstance(paths, Path):
        # A single path is a valid input; do not try to iterate over it.
        allfiles = [paths]
    else:
        allfiles = [Path(p) for p in cast(Sequence, paths)]

    if len(allfiles) == 0:
        raise IOError(f"No files found with paths {paths}")

    datas = {}
    for path in allfiles:
        data = pd.read_csv(path, low_memory=False)
        data = data.drop(columns=[k for k in drop if k in data.columns])

        # Only the index columns actually present are used, consistently for
        # both building the index and dropping the columns.
        levels = [u for u in indices if u in data.columns]
        data.index = pd.MultiIndex.from_arrays([data[u] for u in levels])
        data.index.name = "asset"
        data.columns.name = columns
        data = data.drop(columns=levels)

        reyear = match(r"\S*.(\d{4})\S*\.csv", path.name)
        if reyear is None:
            raise IOError(f"Unexpected filename {path.name}")
        year = int(reyear.group(1))
        if year in datas:
            raise IOError(f"Year {year} was found twice")
        datas[year] = xr.DataArray(data)

    result = (
        xr.Dataset(datas)
        .to_array(dim="year")
        .sortby("year")
        .fillna(0)
        .unstack("asset")
    )
    result = result.rename(
        {k: k.replace("Name", "").lower() for k in indices if k in result.dims}
    )

    if "commodity" in result.coords:
        result.coords["commodity"] = [
            camel_to_snake(u) for u in result.commodity.values
        ]
    return result
def read_trade(
    data: Union[pd.DataFrame, Text, Path],
    columns_are_source: bool = True,
    parameters: Optional[Text] = None,
    skiprows: Optional[Sequence[int]] = None,
    name: Optional[Text] = None,
    drop: Optional[Union[Text, Sequence[Text]]] = None,
) -> Union[xr.DataArray, xr.Dataset]:
    """Read CSV table with source and destination regions.

    Arguments:
        data: An already loaded dataframe, or the path of a csv file.
        columns_are_source: Whether the region columns are source regions
            (and the "RegionName" column the destination) or the opposite.
        parameters: Name of the column labelling the parameter each row
            refers to. Defaults to "Parameter" when such a column exists.
        skiprows: Forwarded to `pandas.read_csv` when `data` is a path.
        name: Name given to the output when it is a data array.
        drop: Column(s) to discard, where present.

    Returns:
        A data array when there is no parameters column, and otherwise a
        dataset with one variable per (snake-cased) parameter. In both cases
        the "src_region" dimension is renamed to "region".
    """
    from muse.readers import camel_to_snake

    def to_numeric_or_keep(column):
        # Same outcome as the deprecated ``pd.to_numeric(errors="ignore")``:
        # columns that cannot be parsed are returned unchanged.
        try:
            return pd.to_numeric(column)
        except (ValueError, TypeError):
            return column

    if not isinstance(data, pd.DataFrame):
        data = pd.read_csv(data, skiprows=skiprows)

    if parameters is None and "Parameter" in data.columns:
        parameters = "Parameter"

    if columns_are_source:
        col_region, row_region = "src_region", "dst_region"
    else:
        col_region, row_region = "dst_region", "src_region"

    data = data.apply(to_numeric_or_keep, axis=0)

    if isinstance(drop, Text):
        drop = [drop]
    if drop:
        unwanted = set(drop)
        present = [c for c in data.columns if c in unwanted]
        if present:
            data = data.drop(columns=present)

    data = data.rename(
        columns=dict(
            Time="year",
            ProcessName="technology",
            RegionName=row_region,
            Commodity="commodity",
        )
    )

    # Follow the column order of the table rather than set iteration order,
    # so that the order of the output dimensions is reproducible.
    index_names = {"commodity", "year", "src_region", "dst_region", "technology"}
    indices = [c for c in data.columns if c in index_names]
    id_vars = [c for c in data.columns if c in index_names or c == parameters]
    data = data.melt(id_vars=id_vars, var_name=col_region)

    if parameters is None:
        result: Union[xr.DataArray, xr.Dataset] = xr.DataArray.from_series(
            data.set_index([*indices, col_region])["value"]
        ).rename(name)
    else:
        result = xr.Dataset.from_dataframe(
            data.pivot_table(
                values="value", columns=parameters, index=[*indices, col_region]
            ).rename(columns=camel_to_snake)
        )

    return result.rename(src_region="region")
def read_finite_resources(path: Union[Text, Path]) -> xr.DataArray:
    """Reads finite resources from csv file.

    The CSV file is made up of columns "Region", "Year", as well
    as three timeslice columns ("Month", "Day", "Hour"). All three sets of columns are
    optional. The timeslice set should contain a full set of timeslices, if present.
    Other columns correspond to commodities.

    Arguments:
        path: Path to the csv file. Column names are matched in lower case.

    Returns:
        An array with a "commodity" dimension, indexed by whichever of year,
        region and timeslice are present in the file.
    """
    from muse.timeslices import TIMESLICE

    table = pd.read_csv(path)
    table.columns = [column.lower() for column in table.columns]

    ts_levels = TIMESLICE.get_index("timeslice").names
    if set(table.columns).issuperset(ts_levels):
        # Fold the timeslice level columns into a single "timeslice" column.
        ts_index = pd.MultiIndex.from_arrays(
            [table[level] for level in ts_levels], names=ts_levels
        )
        ts_column = pd.DataFrame(ts_index, columns=["timeslice"])
        table = pd.concat((table, ts_column), axis=1)
        table = table.drop(columns=ts_levels)

    indices = list({"year", "region", "timeslice"}.intersection(table.columns))
    table = table.set_index(indices)

    return xr.Dataset.from_dataframe(table).to_array(dim="commodity")
def check_utilization_not_all_zero(data, filename):
    """Checks that every technology can be used in at least one timeslice.

    Arguments:
        data: Dataframe with "technology", "region" and "year" columns, and
            one "utilization_factor" per row.
        filename: Name of the file the data comes from. It is only used in
            the error messages.

    Returns:
        The input `data`, unchanged.

    Raises:
        ValueError: If there is no "utilization_factor" column, or if the
            utilization factors of a (technology, region, year) group sum to
            zero.
    """
    if "utilization_factor" not in data.columns:
        raise ValueError(
            """A technology needs to have a utilization factor defined for every
timeslice. Please check file {}.""".format(filename)
        )

    # Only the utilization factor is aggregated: summing the other columns is
    # wasted work, and fails outright on columns of mixed types.
    utilization_sum = data.groupby(["technology", "region", "year"])[
        "utilization_factor"
    ].sum()

    if (utilization_sum == 0).any():
        raise ValueError(
            """A technology can not have a utilization factor of 0 for every
timeslice. Please check file {}.""".format(filename)
        )

    return data