Created
August 16, 2022 16:59
-
-
Save janpipek/329d42bd2f12fd49549a01a8591b0cd6 to your computer and use it in GitHub Desktop.
Normalize series
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numbers | |
from typing import Literal, Optional, Union | |
import numpy as np | |
import pandas as pd | |
def normalize_series(
    series: pd.Series,
    *,
    new_sum: Union[numbers.Real, Literal["count"]] = "count",
    weights: Optional[pd.Series] = None,
) -> pd.Series:
    """Normalize a series to a new target sum.

    Args:
        series: The series to normalize.
        weights: Optional weights for the sum. Only their relative size
            matters: they are rescaled to a mean of 1.0 over the non-na
            items of `series` before being applied.
        new_sum:
            "count" (default) => scale to a (weighted) average of 1.0
            number => the total (weighted) sum will be equal to this
            (1.0 to get fractions)

    Returns:
        A new series with non-na values rescaled so that their (potentially
        weighted) sum is equal to `new_sum`. NA items stay NA.

    Raises:
        ValueError: If the index of `weights` differs from the index of
            `series`, if a weight is missing for a non-na item, if the
            applicable weights have a zero or non-finite sum, if `new_sum`
            is not finite, or if the current (weighted) sum of the series
            is zero or not finite.

    Example:
        >>> normalize_series(pd.Series([1, 2, 3])).tolist()
        [0.5, 1.0, 1.5]
        >>> normalize_series(pd.Series([1, np.nan, 2]), new_sum=1.0).tolist()
        [0.3333333333333333, nan, 0.6666666666666666]
        >>> normalize_series(pd.Series([4, 2, 1]), weights=pd.Series([1, 0, 96])).tolist()
        [3.88, 1.94, 0.97]

    Note that it is possible to normalize empty or all-NaN series only
    if the new_sum is not explicitly specified ("count"); an unchanged
    copy is returned in that case.
    """
    if weights is not None:
        if not weights.index.equals(series.index):
            raise ValueError(
                f"Different indices for the series: {series.index} "
                f"and the weights: {weights.index}"
            )
        valid = series.notna()
        if weights[valid].isna().any():
            raise ValueError(
                "Weights must be defined for all non-na items of the series."
            )

        # Only use weights where applicable
        weights = weights.where(valid, np.nan)
        n_weights = weights.count()
        total_weight = weights.sum()

        # Degenerate weights cannot be rescaled to mean 1.0. Report them
        # directly instead of letting NaN/inf leak into the sum checks below,
        # which would blame the series. The empty / all-NaN case (no
        # applicable weights) is intentionally left to the code below.
        if n_weights > 0 and (total_weight == 0 or not np.isfinite(total_weight)):
            raise ValueError(
                "Weights must have a finite, non-zero sum over the non-na "
                f"items of the series: {total_weight}"
            )

        weights = weights * n_weights / total_weight  # => mean=1.0
        current_sum = (series * weights).sum()
    else:
        current_sum = series.sum()

    if new_sum == "count":
        new_sum = series.count()
        # Trivial with all NaNs or empty series
        if new_sum == 0.0:
            return series.copy()

    if not np.isfinite(new_sum):
        raise ValueError(f"The target weight must be finite: {new_sum}")
    if current_sum == 0.0:
        raise ValueError(f"Cannot normalize a series with zero sum: {series}")
    if not np.isfinite(current_sum):
        raise ValueError(f"Cannot normalize a series with infinite sum: {series}")

    return series * new_sum / current_sum
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np
import pandas as pd
import pytest
from pandas.testing import assert_index_equal, assert_series_equal
class TestNormalizeSeries:
    """Tests for `normalize_series`."""

    @pytest.mark.parametrize(
        "values,expected",
        [
            pytest.param([], [], id="empty"),
            pytest.param([1, 2, 3], [0.5, 1.0, 1.5], id="integers"),
            pytest.param([1], [1.0], id="single"),
            pytest.param([2, 0.5, np.nan], [1.6, 0.4, np.nan], id="with-nan"),
            pytest.param([np.nan], [np.nan], id="all-nan"),
        ],
    )
    def test_for_count(self, values, expected):
        """The default target ("count") scales the non-na mean to 1.0."""
        values = pd.Series(values)
        result = normalize_series(values)
        assert_series_equal(result, pd.Series(expected))

    @pytest.mark.parametrize(
        "values",
        [
            [0.0],
            [0.0, np.nan],
            [-1.0, 1.0],
        ],
    )
    def test_with_zero_mean(self, values):
        """A series summing to zero cannot be rescaled."""
        # Build the input outside the `raises` context so that only the call
        # under test can satisfy the expectation.
        values = pd.Series(values)
        with pytest.raises(ValueError, match="Cannot normalize a series with zero sum"):
            normalize_series(values)

    @pytest.mark.parametrize(
        "values",
        [
            [np.inf],
            [0.0, np.inf],
            [np.inf, -np.inf],
        ],
    )
    def test_with_infinite_sum(self, values):
        """A series whose sum is infinite or NaN cannot be rescaled."""
        values = pd.Series(values)
        with pytest.raises(
            ValueError, match="Cannot normalize a series with infinite sum"
        ):
            normalize_series(values)

    @pytest.mark.parametrize(
        "values,new_sum,expected",
        [
            [[1, 2, 5], 1.0, [0.125, 0.25, 0.625]],
            [[1], 1.33, [1.33]],
            [[2, 0.5, np.nan], 4, [3.2, 0.8, np.nan]],
        ],
    )
    def test_with_valid_target_weight(self, values, new_sum, expected):
        """An explicit finite `new_sum` becomes the sum of the result."""
        values = pd.Series(values)
        result = normalize_series(values, new_sum=new_sum)
        assert_series_equal(result, pd.Series(expected))

    @pytest.mark.parametrize("new_sum", [np.inf, np.nan])
    def test_with_invalid_target_weight(self, new_sum):
        """A non-finite `new_sum` is rejected."""
        with pytest.raises(ValueError, match="The target weight must be finite"):
            normalize_series(pd.Series([1, 2]), new_sum=new_sum)

    def test_keeps_index(self):
        """The result carries the index of the input series."""
        x = pd.Series({"a": 1, "b": 2})
        result = normalize_series(x)
        expected_index = pd.Index(["a", "b"])
        assert_index_equal(result.index, expected_index)

    @pytest.mark.parametrize(
        "weights,expected",
        [
            pytest.param([1, 1], [0.5, 1.5], id="identity"),
            pytest.param([2, 2], [0.5, 1.5], id="identity-scaled"),
            pytest.param([0.01, 1], [0.3355, 1.006], id="one-very-small"),
            pytest.param([0, 10], [0.3333, 1], id="one-ignored"),
        ],
    )
    def test_with_weights(self, weights, expected):
        """Weights change the scaling factor; their overall scale does not."""
        series = pd.Series([1, 3])
        weights = pd.Series(weights)
        expected = pd.Series(expected)
        result = normalize_series(series, weights=weights)
        # Expected values are rounded, hence the loose relative tolerance.
        assert_series_equal(result, expected, rtol=1e-3)

    @pytest.mark.parametrize(
        "weights_index",
        [
            pytest.param([0, 1], id="short"),
            pytest.param([0, 1, 2, 3], id="long"),
            pytest.param([1, 0, 3], id="different"),
        ],
    )
    def test_weights_with_an_invalid_index(self, weights_index):
        """Weights must share the exact index of the series."""
        series = pd.Series([1, 2, 3])
        weights = pd.Series(1, index=weights_index)
        with pytest.raises(ValueError, match="Different indices"):
            normalize_series(series, weights=weights)
# TODO: Add some hypothesis tests to verify the weighted sum invariant |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment