# Source code for muse.readers.csv

"""Ensemble of functions to read MUSE data."""

from __future__ import annotations

__all__ = [
    "read_attribute_table",
    "read_csv_agent_parameters",
    "read_global_commodities",
    "read_initial_assets",
    "read_initial_market",
    "read_io_technodata",
    "read_macro_drivers",
    "read_presets",
    "read_regression_parameters",
    "read_technodictionary",
    "read_technologies",
    "read_timeslice_shares",
]

from collections.abc import Sequence
from pathlib import Path
from typing import cast

import numpy as np
import pandas as pd
import xarray as xr

from muse.defaults import DEFAULT_SECTORS_DIRECTORY
from muse.errors import UnitsConflictInCommodities


def to_numeric(x):
    """Attempt a numeric conversion, falling back to the input itself.

    Args:
        x: Scalar or array-like value handed to :func:`pandas.to_numeric`.

    Returns:
        The numeric version of ``x`` when pandas can parse it; otherwise ``x``
        is returned untouched.
    """
    try:
        converted = pd.to_numeric(x)
    except ValueError:
        # Not parseable as numbers: keep whatever the caller gave us.
        return x
    return converted


def find_sectors_file(
    filename: str | Path,
    sector: str | None = None,
    sectors_directory: str | Path = DEFAULT_SECTORS_DIRECTORY,
) -> Path:
    """Locate a sector input file in the standard candidate directories.

    When ``sector`` is given, ``sectors_directory / sector.title()`` is tried
    first and ``sectors_directory`` second. Without a sector, only
    ``sectors_directory`` is searched.

    Args:
        filename: Name (or relative path) of the file to look for.
        sector: Optional sector name selecting a sub-directory to try first.
        sectors_directory: Root directory holding the sector files.

    Returns:
        The first candidate path that is an existing file.

    Raises:
        OSError: If the file exists in none of the candidate directories.
    """
    filename = Path(filename)
    root = Path(sectors_directory)

    if sector is None:
        candidates: Sequence[Path] = (root,)
    else:
        candidates = (root / sector.title(), root)

    for folder in candidates:
        candidate = folder / filename
        if candidate.is_file():
            return candidate

    if sector is None:
        raise OSError(f"Could not find file {filename}.")
    raise OSError(f"Could not find sector {sector.title()} file {filename}.")


def read_technodictionary(filename: str | Path) -> xr.Dataset:
    """Reads and formats technodata into a dataset.

    There are three axes: technologies, regions, and year. The year axis is
    dropped when the file only contains a single year.

    Args:
        filename: Path to the technodata csv file.

    Returns:
        A dataset with one variable per (snake-cased) technodata column. When a
        ``type`` column exists, a ``tech_type`` variable is added as well. Units
        found in the ``Unit`` row are stored in each variable's ``units``
        attribute.

    Raises:
        AssertionError: If the same year appears more than once on the year
            axis.
    """
    from logging import getLogger

    from muse.readers import camel_to_snake

    logger = getLogger(__name__)

    csv = pd.read_csv(filename, float_precision="high", low_memory=False)
    csv.drop(csv.filter(regex="Unname"), axis=1, inplace=True)

    # Check for deprecated Fuel and EndUse columns (#715)
    columns_lower = {col.lower() for col in csv.columns}
    if "fuel" in columns_lower:
        msg = (
            f"The 'Fuel' column in {filename} has been deprecated. "
            "This information is now determined from CommIn files. "
            "Please remove this column from your Technodata files."
        )
        logger.warning(msg)
    if "enduse" in columns_lower:
        msg = (
            f"The 'EndUse' column in {filename} has been deprecated. "
            "This information is now determined from CommOut files. "
            "Please remove this column from your Technodata files."
        )
        logger.warning(msg)
    # The headers are lowercased *before* the camel-to-snake renaming, so the
    # "ScalingSize" header shows up here as "scalingsize". The snake-case
    # spelling is still accepted for files that already use it.
    if columns_lower & {"scalingsize", "scaling_size"}:
        msg = (
            f"The 'ScalingSize' column in {filename} has been deprecated. "
            "Please remove this column from your Technodata files."
        )
        logger.warning(msg)

    csv = csv.rename(columns=camel_to_snake)
    data = csv[csv.process_name != "Unit"]
    ts = pd.MultiIndex.from_arrays(
        [data.process_name, data.region_name, [int(u) for u in data.time]],
        names=("technology", "region", "year"),
    )
    data.index = ts
    data.columns.name = "technodata"
    data.index.name = "technology"
    data = data.drop(["process_name", "region_name", "time"], axis=1)
    data = data.apply(to_numeric, axis=0)

    result = xr.Dataset.from_dataframe(data.sort_index())
    if "type" in result.variables:
        result["tech_type"] = result.type.isel(region=0, year=0)
        result["tech_type"].values = [
            camel_to_snake(name) for name in result["tech_type"].values
        ]

    units = csv[csv.process_name == "Unit"].drop(
        ["process_name", "region_name", "time"], axis=1
    )
    for variable, value in units.items():
        # Files without a "Unit" row yield empty columns: there is nothing to
        # attach, and indexing ``value.values[0]`` would fail.
        if value.size == 0:
            continue
        if all(u not in {"-", "Retro", "New"} for u in value.values):
            result[variable].attrs["units"] = value.values[0]

    # Sanity checks
    if "year" in result.dims:
        assert len(set(result.year.data)) == result.year.data.size
        result = result.sortby("year")

    if "year" in result.dims and len(result.year) == 1:
        result = result.isel(year=0, drop=True)
    return result
def read_technodata_timeslices(filename: str | Path) -> xr.Dataset:
    """Read timeslice-level technodata into a dataset.

    Columns are snake-cased, the ``Unit`` row is discarded, and every column
    other than ``utilization_factor``, ``minimum_service_factor`` and
    ``obj_sort`` becomes an index level. Index levels other than technology,
    region and year are stacked into a single ``timeslice`` dimension.

    Args:
        filename: Path to the technodata timeslices csv file.

    Returns:
        The dataset returned by ``sort_timeslices``, holding whichever of
        ``utilization_factor`` and ``minimum_service_factor`` the file provides.
    """
    from muse.readers import camel_to_snake
    from muse.timeslices import sort_timeslices

    value_columns = ["utilization_factor", "minimum_service_factor"]
    core_dims = ("technology", "region", "year")

    raw = pd.read_csv(filename, float_precision="high", low_memory=False)
    raw = raw.rename(columns=camel_to_snake).rename(
        columns={"process_name": "technology", "region_name": "region", "time": "year"}
    )

    rows = raw[raw.technology != "Unit"].apply(to_numeric)
    # Everything that is not a value column identifies the row.
    index_frame = rows.drop(columns=[*value_columns, "obj_sort"], errors="ignore")
    rows.index = pd.MultiIndex.from_frame(index_frame)
    rows.columns.name = "technodata_timeslice"
    rows.index.name = "technology"
    rows = rows.filter(value_columns)

    dataset = xr.Dataset.from_dataframe(rows)
    timeslice_levels = [name for name in dataset.coords if name not in core_dims]
    stacked = dataset.stack(timeslice=timeslice_levels)
    return sort_timeslices(stacked)
def read_io_technodata(filename: str | Path) -> xr.Dataset:
    """Read a process inputs or outputs file.

    The result has four axes: (technology, region, year, commodity).

    Args:
        filename: Path to the commodity inputs or outputs csv file.

    Returns:
        A dataset with ``fixed`` and ``flexible`` arrays (missing flexible
        values are set to zero) and a ``commodity_units`` array taken from the
        first ``Unit`` row.
    """
    from muse.readers import camel_to_snake

    raw = pd.read_csv(filename, float_precision="high", low_memory=False)

    # Unspecified Level values default to "fixed"
    if "Level" not in raw.columns:
        # Particularly relevant to outputs files where the Level column is omitted
        # by default, as only "fixed" outputs are allowed.
        raw["Level"] = "fixed"
    else:
        raw["Level"] = raw["Level"].fillna("fixed")

    rows = raw[raw.ProcessName != "Unit"]
    regions = np.array(rows.RegionName, dtype=str)
    processes = rows.ProcessName
    years = [int(value) for value in rows.Time]
    rows = rows.drop(["ProcessName", "RegionName", "Time"], axis=1)
    rows.index = pd.MultiIndex.from_arrays(
        [processes, regions, years], names=("technology", "region", "year")
    )
    rows.columns.name = "commodity"
    rows.index.name = "technology"
    rows = rows.rename(columns=camel_to_snake)
    rows = rows.apply(to_numeric, axis=0)

    def level_subset(level: str) -> xr.Dataset:
        """Dataset built from the rows of the given level, minus the level column."""
        selected = rows[rows.level == level]
        return xr.Dataset.from_dataframe(selected).drop_vars("level")

    fixed_set = level_subset("fixed")
    flexible_set = level_subset("flexible")

    commodity = xr.DataArray(
        list(fixed_set.data_vars), dims="commodity", name="commodity"
    )
    result = xr.Dataset(
        data_vars={
            "fixed": xr.concat(fixed_set.data_vars.values(), dim=commodity),
            "flexible": xr.concat(flexible_set.data_vars.values(), dim=commodity),
        }
    )
    result["flexible"] = result.flexible.fillna(0)

    # add units for flexible and fixed
    unit_rows = raw[raw.ProcessName == "Unit"].drop(
        ["ProcessName", "RegionName", "Time", "Level"], axis=1
    )
    unit_rows.index.name = "units"
    unit_rows.columns.name = "commodity"
    result["commodity_units"] = xr.DataArray(unit_rows).isel(units=0, drop=True)
    return result
def read_initial_assets(filename: str | Path) -> xr.DataArray:
    """Read initial capacity data and index it by asset.

    Files with a ``Time`` column are parsed with ``read_trade`` (skipping row
    1); all other files go through ``read_initial_capacity``. The
    ``technology`` dimension is then renamed to ``asset``, with ``technology``
    and ``installed`` attached as coordinates along it.

    Args:
        filename: Path to the initial capacity csv file.

    Returns:
        The capacity array, with ``installed`` set to the earliest year for
        every asset and integer ``year`` values.
    """
    raw = pd.read_csv(filename, float_precision="high", low_memory=False)
    if "Time" not in raw.columns:
        capacity = read_initial_capacity(raw)
    else:
        capacity = cast(
            xr.DataArray, read_trade(filename, skiprows=[1], columns_are_source=True)
        )

    tech_coord = capacity.technology
    capacity = capacity.drop_vars("technology").rename(technology="asset")
    capacity["technology"] = "asset", tech_coord.values

    first_year = int(capacity.year.min())
    capacity["installed"] = ("asset", [first_year] * len(capacity.technology))
    capacity["year"] = capacity.year.astype(int)
    return capacity
def read_initial_capacity(data: str | Path | pd.DataFrame) -> xr.DataArray:
    """Convert a wide capacity table (one column per year) into a data array.

    Args:
        data: Either an already loaded table or the path of a csv file to read.

    Returns:
        Array indexed by region, technology and integer year, holding the melted
        values. A column labelled ``"2100.1"`` is discarded before years are
        cast to integers.
    """
    if isinstance(data, pd.DataFrame):
        table = data
    else:
        table = pd.read_csv(data, float_precision="high", low_memory=False)

    if "Unit" in table.columns:
        table = table.drop(columns="Unit")

    renamed = table.rename(
        columns={"ProcessName": "technology", "RegionName": "region"}
    )
    long_form = renamed.melt(id_vars=["technology", "region"], var_name="year")
    long_form = long_form.set_index(["region", "technology", "year"])

    capacity = xr.DataArray.from_series(long_form["value"])
    # NOTE(review): "2100.1" looks like the label pandas gives a duplicated
    # "2100" column - confirm against the input files.
    capacity = capacity.sel(year=capacity.year != "2100.1")
    capacity["year"] = capacity.year.astype(int)
    return capacity
def read_technologies(
    technodata_path_or_sector: str | Path | None = None,
    technodata_timeslices_path: str | Path | None = None,
    comm_out_path: str | Path | None = None,
    comm_in_path: str | Path | None = None,
    commodities: str | Path | xr.Dataset | None = None,
    sectors_directory: str | Path = DEFAULT_SECTORS_DIRECTORY,
) -> xr.Dataset:
    """Reads data characterising technologies from files.

    Arguments:
        technodata_path_or_sector: If `comm_out_path` and `comm_in_path` are not
            given, then this argument refers to the name of the sector. The three
            paths are then determined using standard locations and name.
            Specifically, technodata looks for a "technodataSECTORNAME.csv" file in
            the standard location for that sector. However, if `comm_out_path` and
            `comm_in_path` are given, then this should be the path to the
            technodata file.
        technodata_timeslices_path: This argument refers to the
            TechnodataTimeslices file which specifies the utilization factor per
            timeslice for the specified technology.
        comm_out_path: If given, then refers to the path of the file specifying
            output commodities. If not given, then defaults to
            "commOUTtechnodataSECTORNAME.csv" in the relevant sector directory.
        comm_in_path: If given, then refers to the path of the file specifying
            input commodities. If not given, then defaults to
            "commINtechnodataSECTORNAME.csv" in the relevant sector directory.
        commodities: Optional. If commodities is given, it should point to a
            global commodities file, or a dataset akin to reading such a file
            with `read_global_commodities`. In either case, the information
            pertaining to commodities will be added to the technologies dataset.
        sectors_directory: Optional. If `technodata_path_or_sector` is a string
            indicating the name of the sector, then this is a path to a directory
            where standard input files are contained.

    Returns:
        A dataset with all the characteristics of the technologies.

    Raises:
        ValueError: If no sector name is given while the commodity paths are
            omitted, if the technodata contains missing values, or if the
            outputs file declares flexible outputs.
        UnitsConflictInCommodities: If technodata, outputs and inputs cannot be
            merged.
        OSError: If some commodities are missing from the global commodities.
    """
    from logging import getLogger

    from muse.commodities import CommodityUsage

    if (not comm_out_path) and (not comm_in_path):
        sector = technodata_path_or_sector
        if sector is None:
            # Fail with a clear message rather than on ``None.title()`` below.
            raise ValueError(
                "A sector name is required when `comm_out_path` and "
                "`comm_in_path` are not given."
            )
        assert isinstance(sector, str)
        sector_title = sector.title()
        tpath = find_sectors_file(
            f"technodata{sector_title}.csv", sector, sectors_directory
        )
        opath = find_sectors_file(
            f"commOUTtechnodata{sector_title}.csv", sector, sectors_directory
        )
        ipath = find_sectors_file(
            f"commINtechnodata{sector_title}.csv", sector, sectors_directory
        )
    else:
        assert isinstance(technodata_path_or_sector, (str, Path))
        assert comm_out_path is not None
        assert comm_in_path is not None
        tpath = Path(technodata_path_or_sector)
        opath = Path(comm_out_path)
        ipath = Path(comm_in_path)

    msg = (
        f"Reading technology information from: - technodata: {tpath} "
        f"- outputs: {opath} - inputs: {ipath} "
    )
    if technodata_timeslices_path and isinstance(
        technodata_timeslices_path, (str, Path)
    ):
        ttpath = Path(technodata_timeslices_path)
        msg += f"- technodata_timeslices: {ttpath} "
    else:
        ttpath = None

    if isinstance(commodities, (str, Path)):
        msg += f"- global commodities file: {commodities}"

    logger = getLogger(__name__)
    logger.info(msg)

    result = read_technodictionary(tpath)
    if any(result[u].isnull().any() for u in result.data_vars):
        raise ValueError(f"Inconsistent data in {tpath} (e.g. inconsistent years)")

    outs = read_io_technodata(opath).rename(
        flexible="flexible_outputs", fixed="fixed_outputs"
    )
    if not (outs["flexible_outputs"] == 0).all():
        raise ValueError(
            f"'flexible' outputs are not permitted in {opath}. "
            "All outputs must be 'fixed'"
        )
    outs = outs.drop_vars("flexible_outputs")

    ins = read_io_technodata(ipath).rename(
        flexible="flexible_inputs", fixed="fixed_inputs"
    )

    # Align inputs/outputs with the technodata years, but only when every
    # numeric dimension has more than one point to interpolate between.
    if "year" in result.dims and len(result.year) > 1:
        if all(len(outs[d]) > 1 for d in outs.dims if outs[d].dtype.kind in "uifc"):
            outs = outs.interp(year=result.year)
        if all(len(ins[d]) > 1 for d in ins.dims if ins[d].dtype.kind in "uifc"):
            ins = ins.interp(year=result.year)

    try:
        result = result.merge(outs).merge(ins)
    except xr.MergeError as err:
        # ``xr.MergeError`` is the public name of the exception; chaining keeps
        # the underlying merge conflict visible in the traceback.
        raise UnitsConflictInCommodities from err

    if ttpath is not None:
        # Timeslice-level utilization factors replace the technodata ones.
        technodata_timeslice = read_technodata_timeslices(ttpath)
        result = result.drop_vars("utilization_factor")
        result = result.merge(technodata_timeslice)

    # try and add info about commodities
    if isinstance(commodities, (str, Path)):
        try:
            commodities = read_global_commodities(commodities)
        except OSError:
            logger.warning("Could not load global commodities file.")
            commodities = None

    if isinstance(commodities, xr.Dataset):
        if result.commodity.isin(commodities.commodity).all():
            result = result.merge(commodities.sel(commodity=result.commodity))
        else:
            raise OSError(
                "Commodities not found in global commodities file: check spelling."
            )

    result["comm_usage"] = (
        "commodity",
        CommodityUsage.from_technologies(result).values,
    )
    result = result.set_coords("comm_usage")
    if "comm_type" in result.data_vars or "comm_type" in result.coords:
        result = result.drop_vars("comm_type")

    check_utilization_and_minimum_service_factors(
        result.to_dataframe(), [tpath, ttpath]
    )
    return result
def read_global_commodities(path: str | Path) -> xr.Dataset:
    """Read the global commodities table into a dataset indexed by commodity.

    Args:
        path: The csv file itself, or a directory containing
            "MuseGlobalCommodities.csv".

    Returns:
        A dataset whose ``commodity`` index holds the snake-cased commodity
        names, with the known columns renamed (e.g. ``CommodityType`` becomes
        ``comm_type``).

    Raises:
        OSError: If the resolved path is not an existing file.
    """
    from logging import getLogger

    from muse.readers import camel_to_snake

    path = Path(path)
    if path.is_dir():
        path = path / "MuseGlobalCommodities.csv"
    if not path.is_file():
        raise OSError(f"File {path} does not exist.")

    getLogger(__name__).info(f"Reading global commodities from {path}.")

    column_names = {
        "CommodityType": "comm_type",
        "Commodity": "comm_name",
        "CommodityEmissionFactor_CO2": "emmission_factor",
        "HeatRate": "heat_rate",
        "Unit": "unit",
    }

    table = pd.read_csv(path, float_precision="high", low_memory=False)
    table.index = list(map(camel_to_snake, table.CommodityName))
    table.CommodityType = list(map(camel_to_snake, table.CommodityType))
    table = table.drop("CommodityName", axis=1).rename(columns=column_names)
    table.index.name = "commodity"
    return xr.Dataset(table)
def read_timeslice_shares(
    path: str | Path = DEFAULT_SECTORS_DIRECTORY,
    sector: str | None = None,
) -> xr.DataArray:
    """Read timeslice share information into a data array.

    When ``sector`` is not given it is inferred from ``path``: the directory
    name if ``path`` is a directory, otherwise the part of the file name
    matching "TimesliceShare(.*).csv" (falling back to the parent directory
    name). The file "TimesliceShare{sector}.csv" is then located with
    ``find_sectors_file``.

    Args:
        path: Directory holding the shares file, or the shares file itself.
        sector: Optional sector name.

    Returns:
        The ``shares`` array, with ``region`` and ``timeslice`` (taken from the
        ``RegionName`` and ``SN`` columns) and ``commodity`` dimensions.
    """
    from logging import getLogger
    from re import match

    path = Path(path)
    if sector is None and path.is_dir():
        sector = path.name
    elif sector is None:
        path, filename = path.parent, path.name
        found = match(r"TimesliceShare(.*)\.csv", filename)
        sector = found.group(1) if found is not None else path.name

    share_path = find_sectors_file(f"TimesliceShare{sector}.csv", sector, path)
    getLogger(__name__).info(f"Reading timeslice shares from {share_path}")

    table = pd.read_csv(share_path, float_precision="high", low_memory=False)
    table.index = pd.MultiIndex.from_arrays(
        (table.RegionName, table.SN), names=("region", "timeslice")
    )
    # The combined index needs a name so that it can be unstacked below.
    table.index.name = "rt"
    table = table.drop(["RegionName", "SN"], axis=1)
    table.columns.name = "commodity"

    dataset = xr.DataArray(table).unstack("rt").to_dataset(name="shares")
    return dataset.shares
def read_csv_agent_parameters(filename) -> list:
    """Reads standard MUSE agent-declaration csv-files.

    Returns a list of dictionaries, where each dictionary can be used to
    instantiate an agent in :py:func:`muse.agents.factories.factory`.

    Args:
        filename: Path to the agents csv file. A string that neither ends in
            ".csv" nor names an existing file is looked up as
            "BuildingAgent{filename}.csv" with ``find_sectors_file``.

    Raises:
        ValueError: If the Objective, ObjData and Objsort columns differ in
            number, or contain entries of the wrong type.
    """
    from logging import getLogger

    from muse.readers import camel_to_snake

    agent_types = {
        "new": "newcapa",
        "newcapa": "newcapa",
        "retrofit": "retrofit",
        "retro": "retrofit",
        "agent": "agent",
        "default": "agent",
    }
    optional_fields = (
        ("Quantity", "quantity"),
        ("MaturityThreshold", "maturity_threshold"),
        ("SpendLimit", "spend_limit"),
    )

    if isinstance(filename, str):
        as_path = Path(filename)
        if as_path.suffix != ".csv" and not as_path.is_file():
            filename = find_sectors_file(f"BuildingAgent{filename}.csv", filename)

    table = pd.read_csv(filename, float_precision="high", low_memory=False)
    if "AgentNumber" in table.columns:
        table = table.drop(["AgentNumber"], axis=1)

    agents = []
    # We remove rows with missing information, and next over the rest
    for _, row in table.iterrows():
        objectives = row[[label.startswith("Objective") for label in row.index]]
        floats = row[[label.startswith("ObjData") for label in row.index]]
        sorting = row[[label.startswith("Objsort") for label in row.index]]

        if not (len(objectives) == len(floats) and len(objectives) == len(sorting)):
            raise ValueError(
                f"Agent Objective, ObjData, and Objsort columns are inconsistent in {filename}"  # noqa: E501
            )

        objectives = objectives.dropna().to_list()
        if not all(isinstance(entry, str) for entry in objectives):
            raise ValueError(f"Agent Objective requires a string entry in {filename}")

        sort_flags = sorting.dropna().to_list()
        if not all(isinstance(entry, bool) for entry in sort_flags):
            raise ValueError(f"Agent Objsort requires a boolean entry in {filename}")

        floats = floats.dropna().to_list()
        if not all(isinstance(entry, (int, float)) for entry in floats):
            raise ValueError(f"Agent ObjData requires a float entry in {filename}")

        # NOTE(review): the pairing uses the unfiltered ``sorting`` series, not
        # the NaN-free ``sort_flags`` list - confirm this is intended.
        decision_params = [
            triple
            for triple in zip(objectives, sorting, floats)
            if isinstance(triple[0], str)
        ]

        agent_type = agent_types[getattr(row, "Type", "agent").lower()]

        # Add warning about retrofit agents
        if agent_type == "retrofit":
            msg = (
                "Retrofit agents will be deprecated in a future release. "
                "Please modify your model to use only agents of the 'New' type."
            )
            getLogger(__name__).warning(msg)

        agent = {
            "name": row.Name,
            "region": row.RegionName,
            "objectives": [triple[0] for triple in decision_params],
            "search_rules": row.SearchRule,
            "decision": {"name": row.DecisionMethod, "parameters": decision_params},
            "agent_type": agent_type,
        }
        for column, key in optional_fields:
            if hasattr(row, column):
                agent[key] = getattr(row, column)
        agent["share"] = camel_to_snake(row.AgentShare)

        # NOTE(review): ``agent["decision"]`` is a dict, so this comparison with
        # a string is never true and the branch never runs - confirm whether the
        # decision *name* was meant to be compared instead.
        if agent_type == "retrofit" and agent["decision"] == "lexo":
            agent["decision"] = "retro_lexo"
        agents.append(agent)
    return agents
def read_macro_drivers(path: str | Path) -> xr.Dataset:
    """Read a standard MUSE csv file of macro drivers.

    Args:
        path: Path to the macro drivers csv file.

    Returns:
        A dataset with ``gdp`` (rows whose ``Variable`` is "GDP|PPP") and
        ``population`` (rows whose ``Variable`` is "Population"), indexed by
        string ``region`` and integer ``year``.
    """
    from logging import getLogger

    path = Path(path)
    getLogger(__name__).info(f"Reading macro drivers from {path}")

    table = pd.read_csv(path, float_precision="high", low_memory=False)
    table.index = table.RegionName
    table.index.name = "region"
    table.columns.name = "year"
    table = table.drop(["Unit", "RegionName"], axis=1)

    def rows_for(variable: str) -> pd.DataFrame:
        """Rows of the given variable, without the ``Variable`` column."""
        return table[table.Variable == variable].drop("Variable", axis=1)

    population = rows_for("Population")
    gdp = rows_for("GDP|PPP")

    drivers = xr.Dataset({"gdp": gdp, "population": population})
    drivers["year"] = "year", drivers.year.values.astype(int)
    drivers["region"] = "region", drivers.region.values.astype(str)
    return drivers
def read_initial_market(
    projections: xr.DataArray | Path | str,
    base_year_import: str | Path | xr.DataArray | None = None,
    base_year_export: str | Path | xr.DataArray | None = None,
) -> xr.Dataset:
    """Read projections, import and export csv files.

    Args:
        projections: Price projections, as an array or as the path of a file
            read with ``read_attribute_table``. Always required.
        base_year_import: Optional imports (array or path). Zero when omitted.
        base_year_export: Optional exports (array or path). Zero when omitted.

    Returns:
        A dataset holding ``prices`` (expanded over ``TIMESLICE``),
        ``exports``, ``imports`` and ``static_trade`` (imports minus exports).
    """
    from logging import getLogger

    from muse.timeslices import TIMESLICE, distribute_timeslice

    logger = getLogger(__name__)
    path_types = (str, Path)

    # Projections must always be present
    if isinstance(projections, path_types):
        logger.info(f"Reading projections from {projections}")
        projections = read_attribute_table(projections)

    # Base year export is optional. If it is not there, it's set to zero
    if base_year_export is None:
        logger.info("Base year export not provided. Set to zero.")
        base_year_export = xr.zeros_like(projections)
    elif isinstance(base_year_export, path_types):
        logger.info(f"Reading base year export from {base_year_export}")
        base_year_export = read_attribute_table(base_year_export)

    # Base year import is optional. If it is not there, it's set to zero
    if base_year_import is None:
        logger.info("Base year import not provided. Set to zero.")
        base_year_import = xr.zeros_like(projections)
    elif isinstance(base_year_import, path_types):
        logger.info(f"Reading base year import from {base_year_import}")
        base_year_import = read_attribute_table(base_year_import)

    base_year_export = distribute_timeslice(base_year_export, level=None)
    base_year_import = distribute_timeslice(base_year_import, level=None)
    base_year_export.name = "exports"
    base_year_import.name = "imports"

    static_trade = base_year_import - base_year_export
    static_trade.name = "static_trade"

    members = (projections, base_year_export, base_year_import, static_trade)
    market = xr.Dataset({member.name: member for member in members})
    market = market.rename(
        commodity_price="prices", units_commodity_price="units_prices"
    )

    prices = market["prices"].expand_dims({"timeslice": TIMESLICE})
    market["prices"] = prices.drop_vars("timeslice")
    return market
def read_attribute_table(path: str | Path) -> xr.DataArray:
    """Read a standard MUSE csv file for price projections.

    The first data row holds the units. The remaining rows are indexed by
    region and year, with one column per (snake-cased) commodity.

    Args:
        path: Path to the csv file.

    Returns:
        A float array named after the snake-cased first value of the
        ``Attribute`` column, with missing entries set to zero and the units
        attached as the ``units_<attribute>`` coordinate.

    Raises:
        OSError: If ``path`` is not an existing file.
    """
    from logging import getLogger

    from muse.readers import camel_to_snake

    path = Path(path)
    if not path.is_file():
        raise OSError(f"{path} does not exist.")

    getLogger(__name__).info(f"Reading prices from {path}")

    table = pd.read_csv(path, float_precision="high", low_memory=False)

    # Row 0 carries the units rather than data.
    units = table.loc[0].drop(["RegionName", "Attribute", "Time"])
    table = table.drop(0)

    table.columns.name = "commodity"
    table = table.rename(
        columns={"RegionName": "region", "Attribute": "attribute", "Time": "year"}
    )
    regions = table.region
    years = table.year.astype(int)
    table = table.drop(["region", "year"], axis=1)
    table.index = pd.MultiIndex.from_arrays(
        [regions, years], names=["region", "year"]
    )

    attribute = camel_to_snake(table.attribute.unique()[0])
    table = table.drop(["attribute"], axis=1)
    table = table.rename(columns={name: camel_to_snake(name) for name in table.columns})

    values = xr.DataArray(table, name=attribute).astype(float)
    values = values.unstack("dim_0").fillna(0)
    values.coords["units_" + attribute] = ("commodity", units)
    return values
def read_regression_parameters(path: str | Path) -> xr.Dataset:
    """Read regression parameters from a standard MUSE csv file.

    Args:
        path: Path to the csv file.

    Returns:
        A dataset with one array per distinct value of the ``coeff`` column,
        indexed by (lower-cased) sector, region and commodity, plus a
        ``function_type`` array giving the function type of each sector.

    Raises:
        OSError: If ``path`` is not an existing file.
    """
    from logging import getLogger

    from muse.readers import camel_to_snake

    path = Path(path)
    if not path.is_file():
        raise OSError(f"{path} does not exist or is not a file.")

    getLogger(__name__).info(f"Reading regression parameters from {path}.")

    table = pd.read_csv(path, float_precision="high", low_memory=False)

    # Normalize column names
    table.columns.name = "commodity"
    table = table.rename(
        columns={
            "RegionName": "region",
            "SectorName": "sector",
            "FunctionType": "function_type",
        }
    )

    # Create a multiindex based on two of the columns
    sectors = table.sector.apply(lambda name: name.lower())
    regions = table.region
    function_types = table.function_type
    table = table.drop(["sector", "region", "function_type"], axis=1)
    table.index = pd.MultiIndex.from_arrays(
        [sectors, regions], names=["sector", "region"]
    )
    table = table.rename(columns={name: camel_to_snake(name) for name in table.columns})

    # One array per kind of coefficient
    per_coefficient = {}
    for kind in table.coeff.unique():
        rows = table[table.coeff == kind].drop("coeff", axis=1)
        per_coefficient[kind] = xr.DataArray(rows)
    coeffs = xr.Dataset(per_coefficient)

    # Unstack the multi-index into separate dimensions
    coeffs = coeffs.unstack("dim_0").fillna(0)

    # We pair each sector with its function type
    unique_pairs = set(zip(sectors, function_types))
    unzipped = list(zip(*unique_pairs))
    coeffs["function_type"] = xr.DataArray(
        list(unzipped[1]),
        dims=["sector"],
        coords={"sector": list(unzipped[0])},
    )
    return coeffs
def read_presets(
    paths: str | Path | Sequence[str | Path],
    columns: str = "commodity",
    indices: Sequence[str] = ("RegionName", "Timeslice"),
    drop: Sequence[str] = ("Unnamed: 0",),
) -> xr.Dataset:
    """Read consumption or supply files for preset sectors.

    Args:
        paths: A glob pattern (string or ``Path``) expanding to the files to
            read, or an explicit sequence of file paths. Every file name must
            contain the four-digit year it applies to.
        columns: Name given to the column axis of each file.
        indices: Columns identifying a row; they must exist in every file.
        drop: Columns discarded when present.

    Returns:
        The data of all files combined along a sorted ``year`` dimension, with
        missing values set to zero and the index columns turned into dimensions
        (their names lower-cased, without "Name").

    Raises:
        OSError: If no file is found, if a file name contains no year, or if
            the same year occurs in two files.
    """
    from glob import glob
    from logging import getLogger
    from re import match

    from muse.readers import camel_to_snake

    if isinstance(paths, (str, Path)):
        # A single entry is a glob pattern; a ``Path`` is handled like a string
        # rather than being iterated.
        allfiles = [Path(p) for p in glob(str(paths))]
    else:
        allfiles = [Path(p) for p in paths]
    if not allfiles:
        raise OSError(f"No files found with paths {paths}")

    datas = {}
    for path in allfiles:
        data = pd.read_csv(path, low_memory=False)
        assert all(u in data.columns for u in indices)

        # Legacy: drop ProcessName column and sum data (PR #448)
        if "ProcessName" in data.columns:
            data = (
                data.drop(columns=["ProcessName"])
                .groupby(list(indices))
                .sum()
                .reset_index()
            )
            msg = (
                f"The ProcessName column (in file {path}) is deprecated. "
                "Data has been summed across processes, and this column has been "
                "dropped."
            )
            getLogger(__name__).warning(msg)

        data = data.drop(columns=[k for k in drop if k in data.columns])
        data.index = pd.MultiIndex.from_arrays([data[u] for u in indices])
        data.index.name = "asset"
        data.columns.name = columns
        data = data.drop(columns=list(indices))

        reyear = match(r"\S*.(\d{4})\S*\.csv", path.name)
        if reyear is None:
            raise OSError(f"Unexpected filename {path.name}")
        year = int(reyear.group(1))
        if year in datas:
            raise OSError(f"Year {year} was found twice")
        datas[year] = xr.DataArray(data)

    result = (
        xr.Dataset(datas)
        .to_array(dim="year")
        .sortby("year")
        .fillna(0)
        .unstack("asset")
        .rename({k: k.replace("Name", "").lower() for k in indices})
    )

    if "commodity" in result.coords:
        result.coords["commodity"] = [
            camel_to_snake(u) for u in result.commodity.values
        ]
    return result
def read_trade(
    data: pd.DataFrame | str | Path,
    columns_are_source: bool = True,
    parameters: str | None = None,
    skiprows: Sequence[int] | None = None,
    name: str | None = None,
    drop: str | Sequence[str] | None = None,
) -> xr.DataArray | xr.Dataset:
    """Read CSV table with source and destination regions.

    Args:
        data: An already loaded table, or the path of a csv file to read.
        columns_are_source: If true, the region columns are source regions and
            ``RegionName`` is the destination region; otherwise the reverse.
        parameters: Name of the column distinguishing several parameters.
            Defaults to "Parameter" when such a column exists.
        skiprows: Rows to skip when reading from a file.
        name: Name given to the array returned when there is no parameters
            column.
        drop: Column(s) to discard when present.

    Returns:
        A data array when there is no parameters column, otherwise a dataset
        with one variable per (snake-cased) parameter. In both cases the
        ``src_region`` dimension is renamed to ``region``.
    """
    from muse.readers import camel_to_snake

    if not isinstance(data, pd.DataFrame):
        data = pd.read_csv(data, skiprows=skiprows)

    if parameters is None and "Parameter" in data.columns:
        parameters = "Parameter"

    if columns_are_source:
        col_region = "src_region"
        row_region = "dst_region"
    else:
        row_region = "src_region"
        col_region = "dst_region"

    data = data.apply(to_numeric, axis=0)

    if isinstance(drop, str):
        drop = [drop]
    if drop:
        drop = list(set(drop).intersection(data.columns))
    if drop:
        data = data.drop(columns=drop)

    data = data.rename(
        columns=dict(
            Time="year",
            ProcessName="technology",
            RegionName=row_region,
            Commodity="commodity",
        )
    )

    # Follow the column order of the table so that the order of the resulting
    # dimensions is the same on every run, instead of depending on set ordering.
    index_names = {"commodity", "year", "src_region", "dst_region", "technology"}
    indices = [column for column in data.columns if column in index_names]
    id_vars = [
        column
        for column in data.columns
        if column in index_names or column == parameters
    ]
    data = data.melt(id_vars=id_vars, var_name=col_region)

    if parameters is None:
        result: xr.DataArray | xr.Dataset = xr.DataArray.from_series(
            data.set_index([*indices, col_region])["value"]
        ).rename(name)
    else:
        result = xr.Dataset.from_dataframe(
            data.pivot_table(
                values="value", columns=parameters, index=[*indices, col_region]
            ).rename(columns=camel_to_snake)
        )

    return result.rename(src_region="region")


def check_utilization_and_minimum_service_factors(
    data: pd.DataFrame, filename: str | list[str]
) -> None:
    """Validate utilization factors and minimum service factors.

    Args:
        data: Table with a ``utilization_factor`` column and, optionally, a
            ``minimum_service_factor`` column, grouped by technology, region
            and year.
        filename: File name(s) quoted in the error messages; ``None`` entries
            are ignored.

    Raises:
        ValueError: If the utilization factor column is missing, if a
            technology has utilization factors summing to zero, or if any
            factor is out of range or below its minimum service factor.
    """
    filename = [filename] if isinstance(filename, (str, Path)) else filename
    filename = [name for name in filename if name is not None]

    if "utilization_factor" not in data.columns:
        raise ValueError(
            "A technology needs to have a utilization factor defined for every "
            f"timeslice. Please check files: {filename}."
        )

    _check_utilization_not_all_zero(data, filename)
    _check_utilization_in_range(data, filename)

    if "minimum_service_factor" in data.columns:
        _check_minimum_service_factors_in_range(data, filename)
        _check_utilization_not_below_minimum(data, filename)


def _check_utilization_not_all_zero(data, filename):
    """Raise if the utilization factors of some technology/region/year sum to 0."""
    # Only the utilization factor is summed: the other columns are irrelevant
    # here and may not be summable at all.
    utilization_sum = data.groupby(["technology", "region", "year"])[
        "utilization_factor"
    ].sum()

    if (utilization_sum == 0).any():
        raise ValueError(
            "A technology can not have a utilization factor of 0 for every "
            f"timeslice. Please check files: {filename}."
        )


def _check_utilization_in_range(data, filename):
    """Raise unless every utilization factor lies within [0, 1]."""
    utilization = data["utilization_factor"]
    if not np.all((0 <= utilization) & (utilization <= 1)):
        raise ValueError(
            "Utilization factor values must all be between 0 and 1 inclusive. "
            f"Please check files: {filename}."
        )


def _check_utilization_not_below_minimum(data, filename):
    """Raise if a utilization factor is below its minimum service factor."""
    if (data["utilization_factor"] < data["minimum_service_factor"]).any():
        raise ValueError(
            "Utilization factors must all be greater than or equal to their "
            f"corresponding minimum service factors. Please check {filename}."
        )


def _check_minimum_service_factors_in_range(data, filename):
    """Raise unless every minimum service factor lies within [0, 1]."""
    min_service_factor = data["minimum_service_factor"]
    if not np.all((0 <= min_service_factor) & (min_service_factor <= 1)):
        raise ValueError(
            "Minimum service factor values must all be between 0 and 1 inclusive. "
            f"Please check files: {filename}."
        )