Skip to content

Instantly share code, notes, and snippets.

@cmutel
Created November 20, 2024 09:09
Show Gist options
  • Save cmutel/159a99be89139a381a243035eb8d4c05 to your computer and use it in GitHub Desktop.
Verify ecoinvent 3.11 cutoff scores against published values
from bw2io.extractors.excel import ExcelExtractor
from ecoinvent_interface import EcoinventRelease, Settings, ReleaseType
from openpyxl import load_workbook
from scipy import sparse
from time import time
from tqdm import tqdm
import bw2calc as bc
import bw2data as bd
import bw2io as bi
import itertools
import matrix_utils as mu
import numpy as np
# Work inside a dedicated Brightway project; import the ecoinvent release
# only on the first run (the database check makes re-runs cheap).
bd.projects.set_current("ecoinvent-3.11-cutoff")
if "ecoinvent-3.11-cutoff" not in bd.databases:
    bi.import_ecoinvent_release("3.11", "cutoff")
# Locate the official "Cumulative LCIA" spreadsheet shipped with the release.
# ecoinvent_interface downloads/caches the archive and returns its directory.
my_settings = Settings()
ei = EcoinventRelease(my_settings)
release_dir = ei.get_release(
    version="3.11",
    system_model="cutoff",
    release_type=ReleaseType.cumulative_lcia,
)
excel_fp = release_dir / "Cut-off Cumulative LCIA v3.11.xlsx"
# Pull the "LCIA" sheet as a list of rows. Rows 0-2 carry the method /
# category / indicator labels from column 6 onwards; each label triple is
# prefixed with "ecoinvent-3.11" to form the bw2data method identifier.
excel_data = ExcelExtractor.extract_sheet(wb=load_workbook(excel_fp), name="LCIA")
impact_categories = [
    ("ecoinvent-3.11", method, category, indicator)
    for method, category, indicator in zip(
        excel_data[0][6:],
        excel_data[1][6:],
        excel_data[2][6:],
    )
]
# Every published impact category must exist as an installed bw2data method.
for tpl in impact_categories:
    assert tpl in bd.methods
# Index the database by the (name, location, reference product) triple --
# the spreadsheet identifies each activity by these three columns (1-3).
database = bd.Database("ecoinvent-3.11-cutoff")
bw_datasets = {}
for ds in database:
    bw_datasets[(ds["name"], ds["location"], ds["reference product"])] = ds
# The triple must be unique, or spreadsheet rows could map to the wrong node.
assert len(bw_datasets) == len(database)
# One dataset per spreadsheet data row (data starts at row 4), in row order.
datasets = [bw_datasets[tuple(row[1:4])] for row in excel_data[4:]]
# Published totals: column 5 is a per-row multiplier (presumably the
# reference amount -- TODO confirm against the spreadsheet header) applied
# to the per-unit category values in columns 6+.
data_rows = excel_data[4:]
row_amounts = np.array([row[5] for row in data_rows], dtype=np.float64)
per_unit = np.array([row[6:] for row in data_rows])
scores = row_amounts.reshape((-1, 1)) * per_unit
assert scores.shape == (len(datasets), len(impact_categories))
# Drop the raw spreadsheet rows now that the arrays are built.
excel_data = None
# NOTE(review): functional_units is never used later in this script --
# looks like a leftover from a MultiLCA variant of the comparison.
functional_units = {str(x.id): {x.id: 1} for x in datasets}
# Build one LCA object to get the assembled matrices and row/column
# mappings; the demand passed here is a placeholder -- we solve for every
# dataset at once below.
lca = bc.LCA(
    demand={datasets[0]: 1},
)
lca.lci()

# Demand matrix B: one unit-demand column per dataset, columns in the same
# order as `datasets`, rows in technosphere-matrix order.
# NOTE(review): this is a dense (n, n) array -- several GB for a full
# ecoinvent release; acceptable for a one-off verification run.
n = len(datasets)
demand_matrix = np.zeros((n, n))
for column, dataset in enumerate(datasets):
    demand_matrix[lca.dicts.activity[dataset.id], column] = 1

# Solve A @ X = B for all columns with a single factorization and keep the
# supply matrix sparse for the characterization products below.
start = time()
solution = bc.spsolve(lca.technosphere_matrix, demand_matrix)
supply_matrix = sparse.csr_matrix(solution)
stop = time()
print(f"Calculating LCI took {stop - start} seconds")
# Elementary-flow inventories for every dataset (one column each); computed
# once, then characterized per impact category.
inventories = lca.biosphere_matrix @ supply_matrix
our_scores = np.zeros_like(scores)
for column, method in enumerate(impact_categories):
    lca.switch_method(method)
    characterized = lca.characterization_matrix @ inventories
    # Column sums collapse the flows into one score per dataset.
    our_scores[:, column] = np.asarray(characterized.sum(axis=0)).ravel()
normalizer = scores.copy()
normalizer[scores == 0] = 1
difference = np.abs((scores - our_scores) / normalizer)
num_elem = (difference > 0.001).sum()
flat_indices = np.argpartition(-1 * difference.ravel(), num_elem - 1)[:num_elem]
x, y = np.unravel_index(flat_indices, difference.shape)
scores[x, y]
our_scores[x, y]
difference[x, y]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment