Last active
April 6, 2024 11:05
-
-
Save st1vms/08c135462dc2a3d07c89ed533fe6e6b1 to your computer and use it in GitHub Desktop.
Linear Regression in Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Linear Regression module""" | |
import numpy as np | |
class SimpleLinearRegressionModel:
    """Least-squares regression of one dependent variable on one input.

    Samples are stored rounded to ``float_precision`` decimal places. After
    :meth:`fit`, predictions follow ``y = beta_slope * x + alpha_value``.
    """

    def __init__(self, float_precision: int = 6) -> None:
        """Create an empty model.

        Args:
            float_precision: Number of decimal places used when rounding
                stored samples, the intercept and predictions. Must be >= 0.

        Raises:
            ValueError: If ``float_precision`` is negative.
        """
        # Zero is a valid precision (round to whole numbers), exactly as the
        # error message states; only negative values are rejected.
        if float_precision < 0:
            raise ValueError("precision argument must be positive or 0")
        self.float_precision = float_precision
        self.indep_vars = []
        self.dep_vars = []
        self.beta_slope = 0
        self.alpha_value = 0
        self.data_size = 0

    def _get_average(self, var_list: list[float]) -> float:
        """Return ``sum(var_list) / data_size`` rounded to ``float_precision``.

        Raises:
            ZeroDivisionError: If no data has been added yet.
        """
        return round(sum(var_list) / self.data_size, self.float_precision)

    def add_data(self, data: list[tuple[float, float]]) -> None:
        """Append ``(x, y)`` samples to the data already held by this model."""
        for x, y in data:
            self.indep_vars.append(round(x, self.float_precision))
            self.dep_vars.append(round(y, self.float_precision))
            self.data_size += 1

    def set_data(self, data: list[tuple[float, float]]) -> None:
        """Replace this model's data with the given ``(x, y)`` samples."""
        self.indep_vars = [round(row[0], self.float_precision) for row in data]
        self.dep_vars = [round(row[1], self.float_precision) for row in data]
        self.data_size = len(data)

    def fit(self) -> float:
        """Estimate ``beta_slope`` and ``alpha_value`` from the current data.

        Returns:
            The R^2 (coefficient of determination) of the fitted line, computed
            from this model's own (rounded) predictions. When every y value is
            identical R^2 is undefined; 1.0 is returned for an exact fit and
            0.0 otherwise.

        Raises:
            ValueError: If fewer than two samples are stored, or if all x
                values are identical (the slope is then undefined).
        """
        n = self.data_size
        if n < 2:
            raise ValueError("at least 2 data points are required to fit the model")

        xs, ys = self.indep_vars, self.dep_vars
        if min(xs) == max(xs):
            raise ValueError("independent variable values must not all be identical")

        # Use full-precision means: rounding them before the subtraction below
        # distorts the slope even for perfectly linear data.
        x_avg = sum(xs) / n
        y_avg = sum(ys) / n

        # Centered sums avoid the cancellation of sum(x*y) - n*x_avg*y_avg.
        sxy = sum((x - x_avg) * (y - y_avg) for x, y in zip(xs, ys))
        sxx = sum((x - x_avg) ** 2 for x in xs)

        self.beta_slope = sxy / sxx
        self.alpha_value = round(
            y_avg - (self.beta_slope * x_avg), self.float_precision
        )

        # Goodness of fit: 1 - (residual variance / total variance).
        y_variance = sum((y - y_avg) ** 2 for y in ys) / n
        e_variance = sum((y - self.predict(x)) ** 2 for x, y in zip(xs, ys)) / n
        if y_variance == 0:
            return 1.0 if e_variance == 0 else 0.0
        return 1 - (e_variance / y_variance)

    def predict(self, input_x: float) -> float:
        """Return the model's output for ``input_x``, rounded to ``float_precision``."""
        return round(
            (self.beta_slope * input_x) + self.alpha_value, self.float_precision
        )
class MultiLinearRegressionModel(SimpleLinearRegressionModel):
    """Least-squares regression of one dependent variable on several inputs.

    Each sample is ``(x_vars, y)`` where ``x_vars`` is a sequence of input
    values. After :meth:`fit`, predictions follow
    ``y = alpha_value + sum(beta_values[i] * x[i])``.
    """

    def __init__(self, float_precision: int = 6) -> None:
        """Create an empty model; see the parent class for ``float_precision``."""
        super().__init__(float_precision)
        self.beta_values = []

    def add_data(self, data: list[tuple[tuple[float], float]]) -> None:
        """Append ``(x_vars, y)`` samples to the data already held by this model."""
        for x_vars, y in data:
            # Round on insertion so add_data and set_data store identical values.
            self.indep_vars.append(
                [round(x, self.float_precision) for x in x_vars]
            )
            self.dep_vars.append(round(y, self.float_precision))
            self.data_size += 1

    def set_data(self, data: list[tuple[tuple[float], float]]) -> None:
        """Replace this model's data with the given ``(x_vars, y)`` samples."""
        self.indep_vars = [
            [round(x, self.float_precision) for x in row[0]] for row in data
        ]
        self.dep_vars = [round(row[1], self.float_precision) for row in data]
        self.data_size = len(data)

    def fit(self) -> float:
        """Estimate ``alpha_value`` and ``beta_values`` from the current data.

        Returns:
            The R^2 (coefficient of determination) of the unrounded fit. When
            every y value is identical R^2 is undefined; 1.0 is returned for a
            numerically exact fit and 0.0 otherwise.

        Raises:
            ValueError: If the model holds no data.
        """
        if self.data_size == 0:
            raise ValueError("cannot fit a model without data")

        x = np.array(self.indep_vars, dtype=float)
        y = np.array(self.dep_vars, dtype=float)

        # Prepend a column of ones so the first coefficient is the intercept.
        x_matrix = np.hstack((np.ones((x.shape[0], 1)), x))

        # Solve the least-squares problem directly rather than inverting
        # X^T X, which loses accuracy and fails on collinear inputs.
        coef_, *_ = np.linalg.lstsq(x_matrix, y, rcond=None)

        self.alpha_value = round(float(coef_[0]), self.float_precision)
        self.beta_values = [
            round(float(b), self.float_precision) for b in coef_[1:]
        ]

        residuals = y - x_matrix @ coef_
        ssr = float(np.sum(residuals**2))
        sst = float(np.sum((y - np.mean(y)) ** 2))
        if sst == 0:
            return 1.0 if np.allclose(residuals, 0.0) else 0.0
        return 1 - (ssr / sst)

    def predict(self, input_x: list[float]) -> float:
        """Return the model's output for the input values ``input_x``.

        Raises:
            ValueError: If ``input_x`` does not hold exactly one value per
                coefficient in ``beta_values``.
        """
        if len(input_x) != len(self.beta_values):
            raise ValueError(
                f"expected {len(self.beta_values)} input values, got {len(input_x)}"
            )
        # Start the sum at the intercept, then add one weighted term per input.
        total = sum(
            (beta * x for beta, x in zip(self.beta_values, input_x)),
            self.alpha_value,
        )
        return round(total, self.float_precision)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
7.1 0.68 4 41.53 | |
9.9 0.64 1 63.75 | |
3.6 0.58 1 16.38 | |
9.3 0.21 3 45.54 | |
2.3 0.89 5 15.52 | |
4.6 0.00 8 28.55 | |
0.2 0.37 5 5.65 | |
5.4 0.11 3 25.02 | |
8.2 0.87 4 52.49 | |
7.1 0.00 6 38.05 | |
4.7 0.76 0 30.76 | |
5.4 0.87 8 39.69 | |
1.7 0.52 1 17.59 | |
1.9 0.31 3 13.22 | |
9.2 0.19 5 50.98 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
0.41 1850 | |
0.46 2620 | |
0.44 2340 | |
0.47 2690 | |
0.42 2160 | |
0.39 1760 | |
0.41 2500 | |
0.44 2750 | |
0.43 2732 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Linear Regression main test cli utility""" | |
from sys import exit as sys_exit | |
from os import path as ospath | |
from os import getcwd | |
from lin_regr import SimpleLinearRegressionModel, MultiLinearRegressionModel | |
# Sample data files are looked up in the directory the script is launched from.
MULTI_INPUTS_FILE, SIMPLE_INPUTS_FILE = (
    ospath.join(getcwd(), file_name)
    for file_name in ("multi_inputs_test.txt", "simple_inputs_test.txt")
)
def read_input_data_simple(fpath: str) -> list[tuple[float, float]]:
    """Read ``(x, y)`` pairs from a whitespace-separated text file.

    Each non-blank line must hold exactly two numbers. Blank lines are
    skipped, and fields may be separated by any run of spaces or tabs.

    Args:
        fpath: Path of the data file.

    Returns:
        One ``(x, y)`` tuple of floats per data line, in file order.

    Raises:
        ValueError: If ``fpath`` is not an existing file, or a line does not
            hold exactly two values that parse as floats.
    """
    if not ospath.isfile(fpath):
        raise ValueError("Input file does not exist!")

    data = []
    with open(fpath, "r", encoding="utf-8", errors="ignore") as fp:
        # Iterate the file lazily instead of loading every line up front.
        for line in fp:
            fields = line.split()
            if not fields:
                continue
            x, y = fields
            data.append((float(x), float(y)))
    return data
def read_multi_input_data(fpath: str) -> list[tuple[list[float], float]]:
    """Read ``(x_vars, y)`` samples from a whitespace-separated text file.

    On each non-blank line the last number is the output ``y`` and every
    preceding number is an input. Blank lines are skipped, and fields may be
    separated by any run of spaces or tabs.

    Args:
        fpath: Path of the data file.

    Returns:
        One ``(inputs, output)`` tuple per data line, in file order.

    Raises:
        ValueError: If ``fpath`` is not an existing file, or a field does not
            parse as a float.
    """
    if not ospath.isfile(fpath):
        raise ValueError("Input file does not exist!")

    data = []
    with open(fpath, "r", encoding="utf-8", errors="ignore") as fp:
        # Iterate the file lazily instead of loading every line up front.
        for line in fp:
            nums = [float(field) for field in line.split()]
            if not nums:
                continue
            data.append((nums[:-1], nums[-1]))
    return data
def main() -> int:
    """Fit both regression models on the sample files and print the results.

    Reads SIMPLE_INPUTS_FILE and MULTI_INPUTS_FILE, fits one model on each,
    then prints the fitted weights and the prediction for the first sample.

    Returns:
        Always 0.
    """
    # --- single-input model ---
    print("\nTesting Simple Linear Regression model:")
    simple_samples = read_input_data_simple(SIMPLE_INPUTS_FILE)
    simple_model = SimpleLinearRegressionModel(float_precision=3)
    simple_model.add_data(simple_samples)
    simple_model.fit()

    first_x = simple_samples[0][0]
    print(simple_model.alpha_value, simple_model.beta_slope)
    print(simple_model.predict(first_x))

    # --- multi-input model ---
    print("\nTesting Multi Linear Regression model:")
    multi_samples = read_multi_input_data(MULTI_INPUTS_FILE)
    multi_model = MultiLinearRegressionModel(float_precision=6)
    multi_model.add_data(multi_samples)
    multi_model.fit()

    first_inputs = multi_samples[0][0]
    print(multi_model.alpha_value, multi_model.beta_values)
    print(multi_model.predict(first_inputs))

    return 0
if __name__ == "__main__":
    # Run the demo only when executed as a script, and pass main()'s return
    # value to the interpreter as the process exit status.
    exit_status = main()
    sys_exit(exit_status)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment