Stock Prediction
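A PyTorch Lightning script that trains an attention model over both the time and stock axes of daily OHLCV data (read from data/raw/all_stocks_5yr.csv) to predict each stock's next-day closing price.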
import csv
import math
import os.path
from calendar import monthrange, isleap
from datetime import datetime, date
from typing import List

import lightning as L
import torch
import torch.nn as nn
import torch.nn.functional as F
from lightning.pytorch.utilities.types import STEP_OUTPUT
from torch.utils import data as td

class ParallelLinear(nn.Module):
    """A bank of independent linear layers, one per parallel slot (here: one per stock)."""

    def __init__(self, in_features, out_features, parallelism):
        super(ParallelLinear, self).__init__()
        self.parallelism = parallelism
        # One [out_features, in_features] weight matrix per parallel slot.
        self.weights = nn.Parameter(torch.Tensor(parallelism, out_features, in_features))
        self.biases = nn.Parameter(torch.Tensor(1, parallelism, 1, out_features))
        # Initialize parameters
        nn.init.kaiming_uniform_(self.weights, a=math.sqrt(5))
        nn.init.zeros_(self.biases)

    def forward(self, x):
        # x: [batch, slot, time, in_features] -> [batch, slot, time, out_features];
        # the einsum contracts the feature dim against that slot's own weights.
        output = torch.einsum('bsti,soi->bsto', x, self.weights) + self.biases
        return output
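
# Shape check for ParallelLinear (hypothetical sizes, doctest-style):
# >>> layer = ParallelLinear(in_features=5, out_features=8, parallelism=3)
# >>> layer(torch.randn(2, 3, 4, 5)).shape  # [batch, stocks, time, features]
# torch.Size([2, 3, 4, 8])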

class StockEncoder(nn.Module):
    """Transformer-style encoder block that attends over both time and stocks."""

    def __init__(self,
                 num_stocks,
                 num_features,
                 pre_norm: bool = True):
        super(StockEncoder, self).__init__()
        # Per-stock Q/K/V projections.
        self.query_proj = ParallelLinear(num_features, num_features, num_stocks)
        self.key_proj = ParallelLinear(num_features, num_features, num_stocks)
        self.value_proj = ParallelLinear(num_features, num_features, num_stocks)
        # Per-stock position-wise feed-forward network.
        self.mlp = nn.Sequential(
            ParallelLinear(num_features, num_features * 2, num_stocks),
            nn.ReLU(),
            ParallelLinear(num_features * 2, num_features, num_stocks)
        )
        self.norm1 = nn.LayerNorm(num_features)
        self.norm2 = nn.LayerNorm(num_features)
        self.pre_norm = pre_norm
    def forward(self, x):
        # x: [batch_size, num_stocks, time, num_features]
        if self.pre_norm:
            x = self.norm1(x)
        query = self.query_proj(x)
        key = self.key_proj(x)
        value = self.value_proj(x)
        # Scaled dot-product attention along the time axis (within each stock).
        scaling = query.size(-1) ** 0.5
        scores_time = torch.matmul(query, key.transpose(-1, -2)) / scaling
        scores_time = F.softmax(scores_time, dim=-1)
        output_time = torch.matmul(scores_time, value)
        # Scaled dot-product attention along the stock axis (at each time step):
        # swap the stock and time dims so stocks attend to each other.
        scores_stocks = torch.matmul(query.transpose(1, 2), key.permute(0, 2, 3, 1)) / scaling
        scores_stocks = F.softmax(scores_stocks, dim=-1)
        output_stocks = torch.matmul(scores_stocks, value.transpose(1, 2))
        # Sum the two attention contexts, then residual + norm + MLP.
        output = output_time + output_stocks.transpose(1, 2)
        output = self.norm2(x + output)
        outputs = self.mlp(output)
        outputs = output + outputs
        if not self.pre_norm:
            outputs = self.norm1(outputs)
        return outputs
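
# Aside: the time-axis branch above matches F.scaled_dot_product_attention(
# query, key, value) (available in torch >= 2.0); the stock-axis branch is the
# same call applied after transposing the stock and time dims of query/key/value.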

class Model(nn.Module):
    """Stacks StockEncoder blocks and predicts one value per stock from the window."""

    def __init__(self,
                 num_stocks: int,
                 num_days: int,
                 num_features: int,
                 num_layers: int,
                 max_price: float):
        super().__init__()
        self.feature_scaling = ParallelLinear(in_features=num_features, out_features=num_features,
                                              parallelism=num_stocks)
        self.encoders = nn.ModuleList([
            StockEncoder(num_stocks=num_stocks, num_features=num_features, pre_norm=True)
            for _ in range(num_layers)
        ])
        self.norm = nn.LayerNorm(num_features)
        # Flatten the [num_days, num_features] window of each stock into one scalar.
        self.output = nn.Linear(in_features=num_days * num_features, out_features=1)
        self.scaling = num_features ** 0.5
        self.max_price = max_price

    def forward(self, x):
        out = x
        out = self.feature_scaling(out)
        out = out * self.scaling
        for encoder in self.encoders:
            out = encoder(out)
        out = self.norm(out)
        out = self.output(out.view(out.shape[0], out.shape[1], -1)).squeeze(dim=-1)
        return out

    def lightning(self) -> 'ModelLightning':
        return ModelLightning(self)
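
# Shape sketch (hypothetical sizes): an input of shape
# [batch, num_stocks, num_days, num_features] flows through the encoders
# unchanged in shape and is reduced by the output head to [batch, num_stocks],
# one normalized next-day close prediction per stock row.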

class ModelLightning(L.LightningModule):
    def __init__(self, model: Model):
        super().__init__()
        self.model = model

    def _step(self, batch, train: bool = True):
        features, targets = batch
        # Drop the prediction for row 0, which holds the calendar (date)
        # pseudo-stock and has no price target.
        predictions = self.model(features)[:, 1:]
        loss = nn.functional.mse_loss(predictions, targets)
        # L1 error rescaled back to original price units for readability.
        l1_loss = nn.functional.l1_loss(predictions * self.model.max_price, targets * self.model.max_price)
        stage = 'train' if train else 'val'
        self.log(f'{stage}_loss', loss, prog_bar=True)
        self.log(f'{stage}_l1_loss', l1_loss, prog_bar=True)
        return {
            'loss': loss,
            'predictions': predictions
        }

    def training_step(self, batch) -> STEP_OUTPUT:
        return self._step(batch, train=True)

    def validation_step(self, batch) -> STEP_OUTPUT:
        return self._step(batch, train=False)

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
        return optimizer

class Dataset(td.Dataset):
    def __init__(self,
                 stocks: List[str],
                 dates: List[date],
                 features: torch.Tensor,
                 targets: torch.Tensor,
                 num_days: int,
                 max_price: float):
        self.stocks = stocks
        self.dates = dates
        self.features = features
        self.targets = targets
        self.num_days = num_days
        self.max_price = max_price
        # Number of complete sliding windows of length num_days.
        self.length = len(self.dates) - num_days + 1

    def with_num_days(self, num_days: int) -> 'Dataset':
        return Dataset(
            stocks=self.stocks,
            dates=self.dates,
            features=self.features,
            targets=self.targets,
            num_days=num_days,
            max_price=self.max_price
        )

    def save(self, path: str):
        torch.save(self, path)
    @classmethod
    def load(cls, path: str) -> 'Dataset':
        # The file stores a pickled Dataset object (not just tensors), so
        # weights_only=False is needed on torch >= 2.6, where True became the default.
        return torch.load(path, weights_only=False)
    @classmethod
    def make(cls, path: str, num_days: int):
        _dates = set()
        _stock_features = {}
        max_price = 0.0
        max_volume = 0.0
        with open(path, 'r') as fd:
            for row in csv.DictReader(fd):
                _date = datetime.strptime(row['date'], '%Y-%m-%d').date()
                _dates.add(_date)
                _name = row['Name']
                # Missing cells fall back to 0.0.
                features = [
                    float(row['volume'].strip()) if row['volume'].strip() != '' else 0.0,
                    float(row['open'].strip()) if row['open'].strip() != '' else 0.0,
                    float(row['high'].strip()) if row['high'].strip() != '' else 0.0,
                    float(row['low'].strip()) if row['low'].strip() != '' else 0.0,
                    float(row['close'].strip()) if row['close'].strip() != '' else 0.0
                ]
                max_price = max(max_price, max(features[1:]))
                max_volume = max(max_volume, features[0])
                features_by_date = _stock_features.get(_name, {})
                features_by_date[_date] = features
                _stock_features[_name] = features_by_date
        stocks = list(_stock_features.keys())
        dates = list(_dates)
        dates.sort()
        _empty_features = [0.0, 0.0, 0.0, 0.0, 0.0]
        features = []
        targets = []
        for stock in stocks:
            stock_features = []
            target_features = []
            for i in range(len(dates) - 1):
                _date = dates[i]
                _features = _stock_features.get(stock, {}).get(_date, _empty_features)
                stock_features.append(_features)
                # The target is the next trading day's closing price.
                _next_date = dates[i + 1]
                _next_features = _stock_features.get(stock, {}).get(_next_date, _empty_features)
                target_features.append(_next_features[-1])
            features.append(stock_features)
            targets.append(target_features)
        # Calendar features for each date, normalized to roughly [0, 1].
        _date_features = []
        for _date in dates[:-1]:
            _date_features.append([
                _date.weekday() / 6,  # day of week
                _date.day / monthrange(_date.year, _date.month)[1],  # day of month
                _date.timetuple().tm_yday / (365 + isleap(_date.year)),  # day of year
                _date.month / 12,  # month of year
                (_date.year - 2000) / 2000
            ])
        # Normalize volume and prices to [0, 1], then prepend the calendar
        # features as an extra pseudo-stock row.
        features = torch.tensor(features) / torch.tensor([max_volume, max_price, max_price, max_price, max_price])
        features = torch.cat([torch.tensor(_date_features).unsqueeze(0), features])
        targets = torch.tensor(targets) / max_price
        return cls(
            stocks=stocks,
            dates=dates[:-1],
            features=features,
            targets=targets,
            num_days=num_days,
            max_price=max_price
        )
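
    # Expected CSV columns, as read above: date (YYYY-MM-DD), open, high,
    # low, close, volume, and Name (the ticker symbol).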
    @classmethod
    def make_or_load(cls,
                     csv_path: str,
                     num_days: int,
                     save_path: str,
                     save_if_not_exists: bool = True) -> 'Dataset':
        if os.path.exists(save_path):
            # Re-apply num_days in case the cached dataset was built with a
            # different window length.
            ret = cls.load(save_path).with_num_days(num_days)
        else:
            ret = cls.make(path=csv_path, num_days=num_days)
            if save_if_not_exists:
                ret.save(save_path)
        return ret
    def __len__(self):
        return self.length

    def __getitem__(self, idx):
        if idx >= self.length:
            raise IndexError(idx)
        # A num_days window of features for every row, plus the normalized
        # close of the day after the window for every real stock.
        features = self.features[:, idx:idx + self.num_days, :]
        targets = self.targets[:, idx + self.num_days - 1]
        return features, targets
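
    # Example: with num_days=10, item 0 returns features covering dates[0..9]
    # and, as targets, each stock's close on dates[10] scaled by 1 / max_price.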
    def split(self, factor: float):
        # Chronological split at index len1 (computed from the window count):
        # earlier dates go to ds1, later dates to ds2.
        len1 = int(len(self) * factor)
        ds1 = Dataset(
            stocks=self.stocks,
            dates=self.dates[:len1],
            features=self.features[:, :len1, :],
            targets=self.targets[:, :len1],
            num_days=self.num_days,
            max_price=self.max_price,
        )
        ds2 = Dataset(
            stocks=self.stocks,
            dates=self.dates[len1:],
            features=self.features[:, len1:, :],
            targets=self.targets[:, len1:],
            num_days=self.num_days,
            max_price=self.max_price
        )
        return ds1, ds2

def main():
    batch_size = 4
    num_layers = 2
    num_days = 10
    ds = Dataset.make_or_load(
        csv_path='data/raw/all_stocks_5yr.csv',
        num_days=num_days,
        save_path='/tmp/t1.ds'
    )
    print(len(ds))
    ds_train, ds_val = ds.split(0.8)
    dl_train = td.DataLoader(ds_train, batch_size=batch_size, shuffle=True)
    dl_val = td.DataLoader(ds_val, batch_size=batch_size)
    model = Model(
        # +1 for the calendar pseudo-stock row prepended in Dataset.make.
        num_stocks=len(ds.stocks) + 1,
        num_days=num_days,
        num_features=ds.features.shape[-1],
        num_layers=num_layers,
        max_price=ds.max_price
    )
    trainer = L.Trainer(
        max_epochs=5,
        val_check_interval=1.0,
        num_sanity_val_steps=1,
        check_val_every_n_epoch=1
    )
    trainer.fit(
        model=model.lightning(),
        train_dataloaders=dl_train,
        val_dataloaders=dl_val,
    )


if __name__ == "__main__":
    main()