Last active
July 15, 2023 15:13
-
-
Save JorgeMartinezG/a576743eb76f804962f4555e7b28deef to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
from difflib import SequenceMatcher | |
from qgis import processing | |
from qgis.PyQt.QtCore import QVariant | |
from qgis.processing import alg | |
from qgis.core import * | |
# Names of the Processing parameters declared on the algorithm. The same
# strings are used to read the values back from the `parameters` mapping.
MASTER_LAYER = "MASTER_LAYER"
FILE_NAME = "FILE_NAME"
MASTER_COLUMN_NAME = "MASTER_COLUMN_NAME"
FILE_COLUMN_NAME = "FILE_COLUMN_NAME"
THRESHOLD = "THRESHOLD"
# Misspelled name kept as an alias so existing references keep working.
THRESOLD = THRESHOLD
OUTPUT = "OUTPUT"
def match_score(word_1, word_2):
    """Return the similarity ratio between two values.

    The ratio is computed with ``difflib.SequenceMatcher`` and ranges from
    0.0 (nothing in common) to 1.0 (identical sequences).

    Values that ``SequenceMatcher`` cannot compare (``None``, NaN, numbers,
    ...) score 0.0 instead of raising, so a single empty cell or NULL
    attribute does not abort a whole matching run.
    """
    try:
        return SequenceMatcher(None, word_1, word_2).ratio()
    except TypeError:
        # Raised when an argument is not a sequence of hashable items.
        return 0.0
def _closest_match(value, candidates):
    """Return ``(score, row, record)`` for the candidate most similar to *value*.

    *candidates* is a non-empty list of ``(row, key, record)`` tuples. On
    equal scores the earliest candidate wins.
    """
    best = None
    for row, key, record in candidates:
        score = match_score(value, key)
        # Strict comparison keeps the first of several equally good rows.
        if best is None or score > best[0]:
            best = (score, row, record)
    return best


@alg(
    name="qlrt",
    label="Quantum Location resolution tool",
    group="analysis",
    group_label="General",
)
@alg.input(
    type=alg.SOURCE,
    name=MASTER_LAYER,
    label="Master layer",
    default="/Users/jorgemartinez/data/wfp/risk_analysis/ECU/ecu_bnd_adm2_ge/ecu_bnd_adm2_ge.shp",
)
@alg.input(
    type=alg.FILE,
    name=FILE_NAME,
    label="File name",
    default="/Users/jorgemartinez/Desktop/gsap_errors.xlsx",
)
@alg.input(
    type=alg.STRING,
    name=MASTER_COLUMN_NAME,
    label="Master column name",
    default="adm1_name",
)
@alg.input(
    type=alg.STRING,
    name=FILE_COLUMN_NAME,
    label="File column name",
    default="adm1_name",
)
@alg.input(type=alg.DISTANCE, name=THRESOLD, label="Threshold", default=0.8)
@alg.input(type=alg.SINK, name=OUTPUT, label="Output layer")
def location_resolution_tool(instance, parameters, context, feedback, inputs):
    """
    Quantum Location resolution tool

    Matches every feature of the master layer against the rows of an Excel
    file by comparing the text of one column on each side. The remaining
    Excel columns are appended to the output layer; they are filled from the
    most similar row when its similarity score reaches the threshold and
    left empty otherwise. Every match that is not exact is reported in the
    log together with its spreadsheet row and score.
    """
    # NOTE(review): the @alg decorator appears to expose the docstring above
    # as the algorithm help text, so it is kept user-facing - confirm.
    master_layer = instance.parameterAsSource(parameters, MASTER_LAYER, context)
    if master_layer is None:
        raise QgsProcessingException(
            instance.invalidSourceError(parameters, MASTER_LAYER)
        )

    master_column_name = instance.parameterAsString(
        parameters, MASTER_COLUMN_NAME, context
    )
    file_name = instance.parameterAsFile(parameters, FILE_NAME, context)
    file_column_name = instance.parameterAsString(
        parameters, FILE_COLUMN_NAME, context
    )
    threshold = instance.parameterAsDouble(parameters, THRESOLD, context)

    output_fields = master_layer.fields()
    if output_fields.lookupField(master_column_name) < 0:
        raise QgsProcessingException(
            f"Column '{master_column_name}' was not found in the master layer"
        )

    file_df = pd.read_excel(file_name)
    if file_column_name not in file_df.columns:
        raise QgsProcessingException(
            f"Column '{file_column_name}' was not found in '{file_name}'"
        )

    # Rows that can take part in the matching, as (row, key, record).
    # "row" is the spreadsheet row number: +1 for the header line and +1
    # because spreadsheet rows are counted from one. Rows whose key is not
    # text (for example an empty cell read as NaN) cannot be scored.
    candidates = [
        (index + 2, record[file_column_name], record)
        for index, record in enumerate(file_df.to_dict(orient="records"))
        if isinstance(record[file_column_name], str)
    ]
    if not candidates:
        raise QgsProcessingException(
            f"'{file_name}' has no text values in column '{file_column_name}'"
        )

    # Adding additional columns from excel file.
    # TODO: Check field type.
    # NOTE(review): values are written as read by pandas (NaN, numbers,
    # timestamps) into text fields - confirm the provider accepts them.
    columns_to_append = []
    for column in file_df.columns:
        if column == file_column_name:
            continue
        # append() rejects a field whose name already exists; such a column
        # must not contribute a value or the attributes would be misaligned.
        if output_fields.append(QgsField(str(column), QVariant.String)):
            columns_to_append.append(column)
        else:
            feedback.reportError(
                f"Column '{column}' already exists in the master layer and was skipped"
            )

    (sink, dest_id) = instance.parameterAsSink(
        parameters,
        OUTPUT,
        context,
        output_fields,
        master_layer.wkbType(),
        master_layer.sourceCrs(),
    )
    if sink is None:
        raise QgsProcessingException(instance.invalidSinkError(parameters, OUTPUT))

    empty_values = [None] * len(columns_to_append)
    # Best match per distinct master value, so repeated names are scored once.
    cache = {}
    for feature in master_layer.getFeatures(QgsFeatureRequest()):
        if feedback.isCanceled():
            break

        master_value = feature.attribute(master_column_name)

        out_feat = QgsFeature()
        out_feat.setFields(output_fields)
        out_feat.setGeometry(feature.geometry())

        if not isinstance(master_value, str):
            # NULL or non-text attribute: keep the feature, leave the
            # appended columns empty and report it.
            out_feat.setAttributes([*feature.attributes(), *empty_values])
            sink.addFeature(out_feat, QgsFeatureSink.FastInsert)
            feedback.reportError(
                f"STATUS=SKIPPED MASTER_VALUE='{master_value}' REASON='not a text value'"
            )
            continue

        if master_value not in cache:
            cache[master_value] = _closest_match(master_value, candidates)
        closest_score, closest_row, closest_record = cache[master_value]
        closest_value = closest_record[file_column_name]

        # Get missing value from closest match.
        if closest_score >= threshold:
            missing_values = [closest_record[c] for c in columns_to_append]
        else:
            missing_values = empty_values
        out_feat.setAttributes([*feature.attributes(), *missing_values])
        sink.addFeature(out_feat, QgsFeatureSink.FastInsert)

        # Exact matches need no report.
        if closest_score == 1.0:
            continue

        # Report conflict.
        message = f"STATUS=CONFLICT MASTER_VALUE='{master_value}' ROW={closest_row} SCORE={closest_score:.2f} CLOSEST_VALUE='{closest_value}'"
        if closest_score < threshold:
            feedback.reportError(message)
        else:
            feedback.pushConsoleInfo(message)

    return {OUTPUT: dest_id}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment