PySpark FFill Implementation
import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from pyspark.sql import Window as W
from pyspark.sql.window import WindowSpec

__all__ = ["forward_fill"]


def _window_all_previous_rows(partition, order) -> WindowSpec:
    """Select the window on which values are filled in a forward manner."""
    return W.partitionBy(partition).orderBy(order).rowsBetween(W.unboundedPreceding, 0)


def _window_all_following_rows(partition, order) -> WindowSpec:
    """Select the window on which values are filled in a backward manner."""
    return W.partitionBy(partition).orderBy(order).rowsBetween(0, W.unboundedFollowing)

def forward_fill(
    df: DataFrame, partition: str, order: str, variable: str, number_of_days: int = 7
) -> DataFrame:
    """
    Fill the null values of the `variable` column with the last non-null value
    observed within the previous `number_of_days` days (7 by default).
    Values are filled in a forward manner, equivalent to `ffill` in pandas or numpy.
    """
    # Raise an exception if the same order value appears more than once per partition
    if df.count() > df.dropDuplicates([partition, order]).count():
        raise ValueError("Data has duplicated order values for partition.")

    _preceding_window = _window_all_previous_rows(partition, order)
    # Spark's dayofweek ranges from 1 (Sunday) to 7 (Saturday)
    _is_weekend = F.col("day_of_week").isin(1, 7)
    _is_business_day = ~_is_weekend
    _date_with_value = F.when(F.col(variable).isNotNull(), F.col(order))

    # RULE: forward fill only with values observed within `number_of_days` days
    # prior to the considered row
    _last_value_within_window = F.when(
        F.col("history") <= number_of_days,
        F.last(variable, ignorenulls=True).over(_preceding_window),
    )

    return (
        df.withColumn("day_of_week", F.dayofweek(order))
        .withColumn("is_weekend", _is_weekend)
        .filter(_is_business_day)
        .withColumn(
            "last_date_with_non_null_value",
            F.last(_date_with_value, ignorenulls=True).over(_preceding_window),
        )
        .withColumn("history", F.datediff(order, "last_date_with_non_null_value"))
        .na.fill(value=0, subset=["history"])
        .withColumn(
            "ffill",
            F.coalesce(
                F.col(variable),
                _last_value_within_window,
            ),
        )
    )
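
# Illustrative sketch (not part of the original gist): with hypothetical columns
# ("name", "date", "price"), `forward_fill` is expected to behave roughly like:
#
#   name  date        price  ffill
#   AAPL  2021-12-13  10.0   10.0   <- non-null, kept as-is
#   AAPL  2021-12-14  null   10.0   <- filled from 2021-12-13 (1 day back <= 7)
#   AAPL  2021-12-15  null   10.0   <- still within the `number_of_days` window
#
# Rows whose last non-null value lies more than `number_of_days` days back keep
# a null `ffill`.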

def backward_fill(
    df: DataFrame, partition: str, order: str, variable: str, number_of_days: int = 7
) -> DataFrame:
    """
    Fill the null values of the `variable` column with the next non-null value
    observed within the following `number_of_days` days (7 by default).
    Values are filled in a backward manner.
    """
    # Raise an exception if the same order value appears more than once per partition
    if df.count() > df.dropDuplicates([partition, order]).count():
        raise ValueError("Data has duplicated order values for partition.")

    _following_window = _window_all_following_rows(partition, order)
    # Spark's dayofweek ranges from 1 (Sunday) to 7 (Saturday)
    _is_weekend = F.col("day_of_week").isin(1, 7)
    _is_business_day = ~_is_weekend
    _date_with_value = F.when(F.col(variable).isNotNull(), F.col(order))

    # RULE: backward fill only with values observed within `number_of_days` days
    # after the considered row
    _next_value_within_window = F.when(
        F.col("history") <= number_of_days,
        F.first(variable, ignorenulls=True).over(_following_window),
    )

    return (
        df.withColumn("day_of_week", F.dayofweek(order))
        .withColumn("is_weekend", _is_weekend)
        .filter(_is_business_day)
        .withColumn(
            "next_date_with_non_null_value",
            F.first(_date_with_value, ignorenulls=True).over(_following_window),
        )
        .withColumn("history", F.datediff("next_date_with_non_null_value", order))
        .na.fill(value=0, subset=["history"])
        .withColumn(
            "bfill",
            F.coalesce(
                F.col(variable),
                _next_value_within_window,
            ),
        )
    )
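
A minimal usage sketch (not part of the original gist), assuming a local SparkSession and a toy DataFrame; the column names `name`, `date`, and `price` and the sample values are hypothetical:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Toy data: a single partition ("AAPL") with gaps in "price"
df = spark.createDataFrame(
    [
        ("AAPL", "2021-12-13", 10.0),
        ("AAPL", "2021-12-14", None),
        ("AAPL", "2021-12-15", None),
        ("AAPL", "2021-12-16", 12.0),
    ],
    "name string, date string, price double",
)

# Forward fill: nulls take the last non-null price seen within the previous 7 days
forward_fill(df, partition="name", order="date", variable="price") \
    .select("name", "date", "price", "ffill").show()

# Backward fill: nulls take the next non-null price seen within the following 7 days
backward_fill(df, partition="name", order="date", variable="price") \
    .select("name", "date", "price", "bfill").show()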