Skip to content

Instantly share code, notes, and snippets.

@paulrougieux
Last active July 2, 2025 14:52
Show Gist options
  • Save paulrougieux/1eec273b8edefbad5acd20ca64cf0270 to your computer and use it in GitHub Desktop.
Save paulrougieux/1eec273b8edefbad5acd20ca64cf0270 to your computer and use it in GitHub Desktop.
Aiphoria test python script
"""Load sample data for the aiphoria tool for assessing and visualizing dynamic wood material flows, carbon stocks, and stock changes
- aiphoria repository https://github.com/EuropeanForestInstitute/aiphoria/
- The following script is based on the getting started documentation at
https://github.com/EuropeanForestInstitute/aiphoria/tree/main/docs/content/getting_started
- **Current status**: Fails at the last reported error, because the flow_solver object is empty.
There were several errors when running documentation scripts, see the comments below:
- TypeError: cannot unpack non-iterable NoneType object
- returned by is_checker_ok, checker_messages = data_checker.check_for_errors()
- KeyError: 'scenario_name'
- returned by network_visualizer.build(scenarios[0].scenario_data)
- NameError: name 'msc' is not defined
- AttributeError: 'NoneType' object has no attribute 'get_unique_processes'
- returned by unique_processes = flow_solver.get_unique_processes()
"""
import os
import sys
import pandas as pd
aiphoria_path = "/home/paul/rp/zz_forks/aiphoria"
os.chdir(aiphoria_path)
# Add aiphoria and ODUM to the python path
sys.path.insert(0, aiphoria_path)
sys.path.insert(0, "/home/paul/rp/zz_forks/ODYM/src")
from core.datachecker import DataChecker
from core.dataprovider import DataProvider
from core.datavisualizer import DataVisualizer
from core.flowsolver import FlowSolver
from core.network_graph import NetworkGraph
from core.parameters import ParameterName
import odym.classes as msc
#############
# Load data #
#############
filename = "data/example_data.xlsx"
print("Loading data from file {}...".format(filename))
dataprovider = DataProvider(filename)
model_params = dataprovider.get_model_params()
print("Using following parameters for running the model:")
for param_name, param_value in model_params.items():
print("\t{:42}= {}".format(param_name, param_value))
rel_path_to_output = model_params[ParameterName.OutputPath]
abs_output_path = os.path.abspath(os.path.join(os.getcwd(), rel_path_to_output))
model_params[ParameterName.OutputPath] = abs_output_path
print("Checking errors in data...")
data_checker = DataChecker(dataprovider)
scenarios = data_checker.build_scenarios()
# is_checker_ok, checker_messages = data_checker.check_for_errors()
# TypeError: cannot unpack non-iterable NoneType object
############################
# Create a network graph # #
############################
# scenarios[0] is always the baseline scenario and is guaranteed to exist
if model_params[ParameterName.CreateNetworkGraphs]:
network_visualizer = NetworkGraph()
# network_visualizer.build(scenarios[0].scenario_data)
# KeyError: 'scenario_name'
# Because the default params is an empty dictionnary
# but the build method expects this scenario_name value
network_visualizer.build(scenarios[0].scenario_data, options=dict(scenario_name="test"))
network_visualizer.show()
###########################################
# Set up a dynamic Material Flow Analysis #
###########################################
for scenario_index, scenario in enumerate(scenarios):
print("Building ODYM MFA for scenario '{}' ({}/{})...".format(scenario.name, scenario_index + 1, len(scenarios)))
# Track solid wood equivalent and carbon.
# Dictionary of classifications enters the index table defined for the system.
# The index table lists all aspects needed and assigns a classification and index letter to each aspect.
# More info on ODYM model classifications and index table see: https://github.com/IndEcol/ODYM
scenario_data = scenario.scenario_data
years = scenario_data.years
model_time_start = scenario_data.start_year
model_time_end = scenario_data.end_year
model_elements = ['Solid wood equivalent', 'Carbon']
model_years = years
model_classifications = {
'Time': msc.Classification(Name='Time', Dimension='Time', ID=1, Items=model_years),
# NameError: name 'msc' is not defined
'Cohort': msc.Classification(Name='Age-cohort', Dimension='Time', ID=2, Items=model_years),
'Element': msc.Classification(Name='Elements', Dimension='Element', ID=3, Items=model_elements),
}
index_table = pd.DataFrame({'Aspect': ['Time', 'Age-cohort', 'Element'], # 'Time' and 'Element' must be present!
'Description': ['Model aspect "time"', 'Model aspect "age-cohort"', 'Model aspect "Element"'],
'Dimension': ['Time', 'Time', 'Element'], # 'Time' and 'Element' are also dimensions
'Classification': [model_classifications[Aspect] for Aspect in ['Time', 'Cohort', 'Element']],
'IndexLetter': ['t', 'c', 'e' ]}) # Unique one letter (upper or lower case) indices to be used later for calculations.
index_table.set_index('Aspect', inplace=True) # Default indexing of IndexTable, other indices are produced on the fly
index_table
# ODYM initialization: the dynamic stock models are developed to track how
# stocks change over time based on their inflows and lifetime parameters and
# each result is stored in a dictionary.
flow_solver = scenario.flow_solver
mfa_system = msc.MFAsystem(Name='Wood product system', Geogr_Scope='Europe', Unit='Mm3',
ProcessList=[], FlowDict={}, StockDict={}, ParameterDict={},
Time_Start=model_time_start, Time_End=model_time_end, IndexTable=index_table,
Elements=index_table.loc['Element'].Classification.Items)
# Get inflow values to stock
year_index_to_year = dict(enumerate(model_years))
unique_processes = flow_solver.get_unique_processes()
# AttributeError: 'NoneType' object has no attribute 'get_unique_processes'
unique_flows = flow_solver.get_unique_flows()
print("Building ODYM processes...")
odym_processes = []
process_id_to_index = {}
for process_id, process in unique_processes.items():
process_index = len(odym_processes)
process_id_to_index[process_id] = process_index
new_process = msc.Process(ID=process_index, Name=process.name)
odym_processes.append(new_process)
print("Building ODYM flows...")
odym_flows = {}
for flow_id, flow in unique_flows.items():
source_process_index = process_id_to_index[flow.source_process_id]
target_process_index = process_id_to_index[flow.target_process_id]
new_flow = msc.Flow(ID=flow.id, P_Start=source_process_index, P_End=target_process_index, Indices='t,e', Values=None)
odym_flows[flow.id] = new_flow
print("Building ODYM stocks...")
odym_stocks = {}
for stock in flow_solver.get_all_stocks():
process_index = process_id_to_index[stock.id]
new_stock = msc.Stock(ID=stock.id, Name=stock.name, P_Res=process_index, Indices='t,e', Type=0, Values=None)
odym_stocks[stock.id] = new_stock
mfa_system.ProcessList = odym_processes
mfa_system.FlowDict = odym_flows
mfa_system.StockDict = odym_stocks
mfa_system.Initialize_FlowValues()
mfa_system.Initialize_StockValues()
mfa_system.Consistency_Check()
# Update ODYM flow values from flow values DataFrame
for flow_id, flow in mfa_system.FlowDict.items():
for year_index, value in enumerate(flow.Values):
# Skip to next year if FlowSolver does not have data for this year
# This is possible because ODYM flow and stock values are already initialized to 0.0
flow_has_data_for_year = flow_solver.has_flow(year=year_index_to_year[year_index], flow_id=flow_id)
if not flow_has_data_for_year:
continue
# NOTE: Virtual flows use default value defined in Flow for carbon content (now 1.0).
solved_flow = flow_solver.get_flow(year=year_index_to_year[year_index], flow_id=flow_id)
flow.Values[year_index, 0] = solved_flow.evaluated_value
flow.Values[year_index, 1] = solved_flow.evaluated_value_carbon
# Process stocks (fill with data)
for stock_id, stock in odym_stocks.items():
# Calculate cohorts for "Solid wood equivalent"
dsm_swe = flow_solver.get_dynamic_stocks_swe()[stock_id]
swe_stock_by_cohort = dsm_swe.compute_s_c_inflow_driven()
swe_outflow_by_cohort = dsm_swe.compute_o_c_from_s_c()
swe_stock_total = dsm_swe.compute_stock_total()
swe_stock_change = dsm_swe.compute_stock_change()
stock.Values[:, 0] = swe_stock_change
# Calculate cohorts for "Carbon"
dsm_carbon = flow_solver.get_dynamic_stocks_carbon()[stock_id]
carbon_stock_by_cohort = dsm_carbon.compute_s_c_inflow_driven()
carbon_outflow_by_cohort = dsm_carbon.compute_o_c_from_s_c()
carbon_stock_total = dsm_carbon.compute_stock_total()
carbon_stock_change = dsm_carbon.compute_stock_change()
stock.Values[:, 1] = carbon_stock_change
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment