Skip to content

Instantly share code, notes, and snippets.

@testpageAN
Created April 21, 2025 10:12
Show Gist options
  • Save testpageAN/6c97207912e653e7c1b1791391a3c7bd to your computer and use it in GitHub Desktop.
Save testpageAN/6c97207912e653e7c1b1791391a3c7bd to your computer and use it in GitHub Desktop.
test_gist
# backtest_equity_plot.py
import pandas as pd
import joblib
import numpy as np
import matplotlib.pyplot as plt
from feature_engineering import create_features_v2
# === Settings ===
DATA_PATH = "datasets/BTCUSDT_15m.csv"
MODEL_PATH = "ensemble_model.pkl"
TP_PCT = 0.02
SL_PCT = 0.01
HOLD_PERIOD = 5


def _first_barrier_return(entry_price, future_prices, direction):
    """Return +TP_PCT or -SL_PCT for whichever barrier is touched first, else 0.0.

    ``direction`` is +1 for a long and -1 for a short. Prices are walked in
    chronological order so a stop-loss that is hit before the take-profit is
    counted as a loss (the previous max/min check always preferred the TP).
    """
    for price in future_prices:
        move = direction * (price - entry_price) / entry_price
        if move >= TP_PCT:
            return TP_PCT
        if move <= -SL_PCT:
            return -SL_PCT
    return 0.0


# === Load data
# The CSV written by data_utils uses "timestamp" as its index column.
df = pd.read_csv(DATA_PATH, parse_dates=["timestamp"], index_col="timestamp")
X, y, full_df = create_features_v2(df)
model = joblib.load(MODEL_PATH)  # NOTE(review): joblib.load unpickles; only load trusted files

# Never feed the label to the model, even if the feature frame still carries it.
features = X.drop(columns=["target"], errors="ignore")
preds = model.predict(features)

# X is a row-filtered subset of full_df (warm-up, neutral-target and trailing
# rows are dropped), so predictions must be attached by index. Bars without a
# prediction get NaN and are skipped below; prices stay contiguous.
full_df = full_df.copy()
full_df["prediction"] = pd.Series(preds, index=X.index)

# === Backtest trades
close = full_df["Close"].to_numpy()
signals = full_df["prediction"].to_numpy()
returns = np.zeros(len(full_df))
for i in range(len(full_df) - HOLD_PERIOD):
    signal = signals[i]
    if signal == 1:  # LONG
        direction = 1
    elif signal == 0:  # SHORT
        direction = -1
    else:  # no prediction for this bar
        continue
    returns[i] = _first_barrier_return(close[i], close[i + 1:i + HOLD_PERIOD + 1], direction)
full_df["return_pct"] = returns

# === Filtering: keep only bars where TP or SL was reached
trades = full_df[full_df["return_pct"] != 0.0].copy()

# === Printing
print("=== BACKTEST RESULTS ===")
print(f"Trades : {len(trades)}")
if trades.empty:
    # Without trades there is no equity curve to summarise or plot.
    print("No trade reached TP or SL; nothing to report.")
else:
    # NOTE(review): trades opened on consecutive bars overlap in time, so this
    # sequential compounding is an approximation, not a portfolio simulation.
    trades["equity_curve"] = (1 + trades["return_pct"]).cumprod()

    # === Statistics
    total_return = trades["equity_curve"].iloc[-1] - 1
    max_drawdown = (trades["equity_curve"] / trades["equity_curve"].cummax() - 1).min()
    ret_std = trades["return_pct"].std()
    # std is NaN for a single trade and 0 when all outcomes are equal.
    sharpe = trades["return_pct"].mean() / ret_std if ret_std > 0 else float("nan")
    wins = trades.loc[trades["return_pct"] > 0, "return_pct"]
    losses = trades.loc[trades["return_pct"] < 0, "return_pct"]
    win_rate = (trades["return_pct"] > 0).mean()
    loss_rate = (trades["return_pct"] < 0).mean()
    # Average win divided by average loss (payoff ratio).
    expectancy = wins.mean() / abs(losses.mean()) if len(wins) and len(losses) else float("nan")

    print(f"Total Return : {total_return:.2%}")
    print(f"Max Drawdown : {max_drawdown:.2%}")
    print(f"Sharpe Ratio : {sharpe:.2f}")
    print(f"Win Rate : {win_rate:.2%}")
    print(f"Loss Rate : {loss_rate:.2%}")
    print(f"Expectancy : {expectancy:.2f}")

    # === Plot
    plt.figure(figsize=(10, 5))
    plt.plot(trades.index, trades["equity_curve"], label="Equity Curve", linewidth=2)
    plt.title("📈 Equity Curve από ML Predictions με SL/TP")
    plt.xlabel("Ημερομηνία")
    plt.ylabel("Cumulative Return")
    plt.grid(True)
    plt.legend()
    plt.tight_layout()
    plt.show()
# backtest_sl_tp.py
import pandas as pd
import joblib
import numpy as np
from feature_engineering import create_features_v2
# === Settings ===
DATA_PATH = "datasets/BTCUSDT_15m.csv"
MODEL_PATH = "ensemble_model.pkl"
TP_PCT = 0.02  # 2% take profit
SL_PCT = 0.01  # 1% stop loss
HOLD_PERIOD = 5  # bars


def _first_barrier_return(entry_price, future_prices, direction):
    """Return +TP_PCT or -SL_PCT for whichever barrier is touched first, else 0.0.

    ``direction`` is +1 for a long and -1 for a short. Walking the prices in
    order avoids crediting a take-profit that only happened after the stop.
    """
    for price in future_prices:
        move = direction * (price - entry_price) / entry_price
        if move >= TP_PCT:
            return TP_PCT
        if move <= -SL_PCT:
            return -SL_PCT
    return 0.0


# === Load data & features
# The CSV written by data_utils uses "timestamp" as its index column.
df = pd.read_csv(DATA_PATH, parse_dates=["timestamp"], index_col="timestamp")
X, y, full_df = create_features_v2(df)
model = joblib.load(MODEL_PATH)  # NOTE(review): joblib.load unpickles; only load trusted files

# Never feed the label to the model, even if the feature frame still carries it.
preds = model.predict(X.drop(columns=["target"], errors="ignore"))

# X is a row-filtered subset of full_df, so predictions are attached by index;
# bars without a prediction get NaN and are skipped in the loop.
full_df = full_df.copy()
full_df["prediction"] = pd.Series(preds, index=X.index)

# === SL/TP strategy
close = full_df["Close"].to_numpy()
signals = full_df["prediction"].to_numpy()
returns = np.zeros(len(full_df))
for i in range(len(full_df) - HOLD_PERIOD):
    signal = signals[i]
    if signal == 1:  # LONG
        direction = 1
    elif signal == 0:  # SHORT
        direction = -1
    else:  # no prediction for this bar
        continue
    returns[i] = _first_barrier_return(close[i], close[i + 1:i + HOLD_PERIOD + 1], direction)
full_df["return"] = returns
# result: 1 = take-profit hit, -1 = stop-loss hit, 0 = neither within the window
full_df["result"] = np.sign(returns).astype(int)

# === Drop bars without an outcome
trades = full_df[full_df["result"] != 0]

# === Metrics
total_return = trades["return"].sum()
avg_return = trades["return"].mean()
win_rate = (trades["result"] == 1).mean()
loss_rate = (trades["result"] == -1).mean()
ret_std = trades["return"].std()
# std is NaN for fewer than two trades and 0 when all outcomes are equal.
sharpe_ratio = trades["return"].mean() / ret_std if ret_std > 0 else float("nan")
wins = trades.loc[trades["result"] == 1, "return"]
losses = trades.loc[trades["result"] == -1, "return"]
expectancy = wins.mean() / abs(losses.mean()) if len(wins) and len(losses) else float("nan")

print("=== BACKTEST RESULTS ===")
print(f"Total Trades : {len(trades)}")
print(f"Total Return : {total_return:.4f}")
print(f"Average Return : {avg_return:.4f}")
print(f"Win Rate : {win_rate:.2%}")
print(f"Loss Rate : {loss_rate:.2%}")
print(f"Expectancy (Win/Loss Ratio): {expectancy:.2f}")
print(f"Sharpe Ratio : {sharpe_ratio:.2f}")
# config.py
# === Binance Settings ===
SYMBOL = "BTCUSDT" # [ml_trader_old.py, data_utils.py, run_scheduler.py]
# The crypto pair symbol used for trading and for data download (e.g. BTC/USDT)
INTERVAL = "15m" # [ml_trader_old.py, data_utils.py, run_scheduler.py]
# The time span of each candlestick (e.g. 1m, 15m, 1h); note this is a string, not a number of minutes
LEVERAGE = 5 # [ml_trader_old.py]
# Leverage to apply to positions on Binance Futures
# === Trading Thresholds ===
CONF_UPPER = 0.55 # [ml_trader_old.py, retrain_model.py]
# Minimum probability (confidence) required to execute a trade (Long/Short)
CONF_LOWER = 0.45 # [ml_trader_old.py]
# Threshold below which confidence is considered insufficient — no order
# === Trade Management ===
POSITION_SIZE_PCT = 0.05 # [ml_trader_old.py]
# Fraction of the available capital used for each trade
STOP_LOSS_PCT = 0.02 # [ml_trader_old.py]
# Stop Loss fraction — if price moves 2% against us, the position is closed
TAKE_PROFIT_PCT = 0.04 # [ml_trader_old.py]
# Take Profit fraction — if price moves 4% in our favour, the position is closed
# === Risk Management ===
MAX_CONSECUTIVE_LOSSES = 3 # [ml_trader_old.py]
# After 3 consecutive losses, trading is frozen
MAX_DRAWDOWN_PCT = 0.10 # [ml_trader_old.py]
# If capital falls more than 10% from its peak, trading is frozen
AUTO_RESUME = True # [ml_trader_old.py]
# If True, trading resumes automatically after a risk-triggered pause
RESUME_HOURS = 6 # [ml_trader_old.py]
# Waiting time (in hours) before auto resume kicks in after a pause
# === File Paths ===
MODEL_PATH = "ensemble_model.pkl" # [ml_trader_old.py, optimize_threshold.py, retrain_model.py]
# File holding the saved, trained ML model
DATA_PATH = "datasets/BTCUSDT_15m.csv" # [ml_trader_old.py, retrain_model.py, data_utils.py]
# CSV file with the historical candlesticks used by the trader
RISK_PATH = "risk_state.json" # [ml_trader_old.py]
# JSON file storing the current Risk Management state
TRADES_LOG_PATH = "logs/trades.csv" # [ml_trader_old.py]
# Log of all trades (entry/exit, prices, confidence, result)
LOG_FILE = "logs/ml_trader.log" # [ml_trader_old.py, ml_trading_loop.py, run_scheduler.py]
# Log file recording the trader's operation, also used for debugging
# === Report Settings ===
DAILY_REPORT_PATH = "logs/trading_report.txt" # [trading_report.py]
# Daily trade report (results, PnL, number of wins, etc.)
import os
import time
import pandas as pd
from datetime import datetime, timedelta
from binance.client import Client
from config import DATA_PATH
# ✅ Credentials come from .env or the process environment.
# os.getenv returns None when a variable is unset, so a missing key is only
# noticed once the client makes an authenticated call.
# NOTE(review): unlike the other modules in this project, this client is
# created without testnet=True — confirm that is intended.
api_key = os.getenv("api_key")
api_secret = os.getenv("secret_key")
client = Client(api_key, api_secret)
def initialize_historical_data(symbol="BTCUSDT", interval="15m", days=120, path="datasets/BTCUSDT_15m.csv"):
    """Download the last ``days`` days of klines and store them as a CSV.

    Klines are fetched from the module-level ``client`` in pages of up to
    1000 rows, reduced to the OHLCV columns, indexed by the candle open time
    (column ``timestamp``) and written to ``path``.

    Args:
        symbol: Trading pair passed to ``client.get_klines``.
        interval: Candle interval; must be a key of the interval table below.
        days: How many days back from now to download.
        path: Destination CSV file; its parent directory is created if needed.

    Returns:
        The DataFrame that was written to ``path``.

    Raises:
        KeyError: If ``interval`` is not a supported interval string.
        Exception: The last API error, after several consecutive failures.
    """
    interval_map = {
        "1m": 60 * 1000,
        "3m": 3 * 60 * 1000,
        "5m": 5 * 60 * 1000,
        "15m": 15 * 60 * 1000,
        "30m": 30 * 60 * 1000,
        "1h": 60 * 60 * 1000,
        "2h": 2 * 60 * 60 * 1000,
        "4h": 4 * 60 * 60 * 1000,
        "6h": 6 * 60 * 60 * 1000,
        "8h": 8 * 60 * 60 * 1000,
        "12h": 12 * 60 * 60 * 1000,
        "1d": 24 * 60 * 60 * 1000,
    }
    interval_ms = interval_map[interval]
    # time.time() is the epoch time regardless of the host timezone. The former
    # datetime.utcnow().timestamp() treated the naive UTC value as local time
    # and was therefore off by the UTC offset on non-UTC machines.
    end_time = int(time.time() * 1000)
    start_time = end_time - days * 24 * 60 * 60 * 1000
    all_data = []
    max_consecutive_failures = 5
    failures = 0
    while start_time < end_time:
        try:
            klines = client.get_klines(symbol=symbol, interval=interval, startTime=start_time, endTime=end_time, limit=1000)
        except Exception as e:
            failures += 1
            print(f"Σφάλμα στο κατέβασμα δεδομένων: {e}")
            if failures >= max_consecutive_failures:
                # A persistent error would otherwise retry forever.
                raise
            time.sleep(1)
            continue
        failures = 0
        if not klines:
            break
        all_data.extend(klines)
        # Continue with the candle after the last one received.
        start_time = klines[-1][0] + interval_ms
        time.sleep(0.1)

    df = pd.DataFrame(all_data, columns=[
        'timestamp', 'Open', 'High', 'Low', 'Close', 'Volume',
        'Close_time', 'Quote_asset_volume', 'Number_of_trades',
        'Taker_buy_base_volume', 'Taker_buy_quote_volume', 'Ignore'
    ])
    # Keep only the columns needed for trading.
    df = df[['timestamp', 'Open', 'High', 'Low', 'Close', 'Volume']]
    df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
    df.set_index("timestamp", inplace=True)
    df = df.astype({
        "Open": float,
        "High": float,
        "Low": float,
        "Close": float,
        "Volume": float
    })
    parent_dir = os.path.dirname(path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    df.to_csv(path)
    print(f"Αποθηκεύτηκε στο: {path} ({len(df)} γραμμές)")
    return df
def update_historical_data(symbol="BTCUSDT", interval="15m", path="datasets/BTCUSDT_15m.csv"):
    """Append the klines published since the last stored candle to the CSV.

    When ``path`` does not exist (or holds no rows) the file is created via
    ``initialize_historical_data`` with five days of history.

    Args:
        symbol: Trading pair passed to ``client.get_klines``.
        interval: Candle interval; must be a key of the interval table below.
        path: CSV file indexed by a ``timestamp`` column.

    Returns:
        The DataFrame now stored at ``path`` (previously ``None`` was returned
        on the update path; callers that ignore the result are unaffected).

    Raises:
        KeyError: If ``interval`` is not a supported interval string.
        Exception: The last API error, after several consecutive failures.
    """
    if not os.path.exists(path):
        return initialize_historical_data(symbol=symbol, interval=interval, days=5, path=path)
    ohlcv = ["Open", "High", "Low", "Close", "Volume"]
    df = pd.read_csv(path, parse_dates=["timestamp"], index_col="timestamp")
    if df.empty:
        # A header-only file has no last timestamp to resume from.
        return initialize_historical_data(symbol=symbol, interval=interval, days=5, path=path)
    # Same schema as initialize_historical_data; this also discards the extra
    # kline columns that earlier updates appended to the file.
    df = df[ohlcv]
    last_timestamp = df.index[-1]
    # Start AT the last stored candle rather than after it: that row may have
    # been saved while the candle was still forming, and the keep='last'
    # de-duplication below replaces it with the final values.
    # (A naive pandas Timestamp's .timestamp() is interpreted as UTC.)
    start_time = int(last_timestamp.timestamp() * 1000)
    # Epoch time independent of the host timezone (utcnow().timestamp() is not).
    end_time = int(time.time() * 1000)
    new_data = []
    interval_map = {
        "1m": 60 * 1000,
        "3m": 3 * 60 * 1000,
        "5m": 5 * 60 * 1000,
        "15m": 15 * 60 * 1000,
        "30m": 30 * 60 * 1000,
        "1h": 60 * 60 * 1000,
        "2h": 2 * 60 * 60 * 1000,
        "4h": 4 * 60 * 60 * 1000,
        "6h": 6 * 60 * 60 * 1000,
        "8h": 8 * 60 * 60 * 1000,
        "12h": 12 * 60 * 60 * 1000,
        "1d": 24 * 60 * 60 * 1000,
    }
    interval_ms = interval_map[interval]
    max_consecutive_failures = 5
    failures = 0
    while start_time < end_time:
        try:
            klines = client.get_klines(symbol=symbol, interval=interval, startTime=start_time, endTime=end_time, limit=1000)
        except Exception as e:
            failures += 1
            print(f"Σφάλμα στην ενημέρωση δεδομένων: {e}")
            if failures >= max_consecutive_failures:
                # A persistent error would otherwise retry forever.
                raise
            time.sleep(1)
            continue
        failures = 0
        if not klines:
            break
        new_data.extend(klines)
        start_time = klines[-1][0] + interval_ms
        time.sleep(0.1)

    if not new_data:
        print("Δεν υπάρχουν νέα δεδομένα.")
        return df

    new_df = pd.DataFrame(new_data, columns=[
        "timestamp", "Open", "High", "Low", "Close", "Volume",
        "Close_time", "Quote_asset_volume", "Number_of_trades",
        "Taker_buy_base", "Taker_buy_quote", "Ignore"
    ])
    new_df["timestamp"] = pd.to_datetime(new_df["timestamp"], unit="ms")
    new_df.set_index("timestamp", inplace=True)
    # Keep only OHLCV so the appended rows match the stored columns.
    new_df = new_df[ohlcv].astype(float)
    updated_df = pd.concat([df, new_df])
    updated_df = updated_df[~updated_df.index.duplicated(keep='last')].sort_index()
    updated_df.to_csv(path)
    print(f"Ενημερώθηκε το αρχείο: {path} ({len(updated_df)} γραμμές)")
    return updated_df
# ensemble_utils.py
import os
import time
import pandas as pd
import numpy as np
import joblib
import logging
from datetime import datetime, timezone, timedelta
from binance.client import Client
from binance.enums import *
from feature_engineering import create_features
from telegram_utils import send_telegram_message
from sklearn.ensemble import VotingClassifier
from xgboost import XGBClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
# Ensure directories exist
# NOTE: these two paths are relative to the current working directory, whereas
# the log directory below is resolved relative to this file.
os.makedirs("datasets", exist_ok=True)
os.makedirs("logs", exist_ok=True)
# --------------- 🔨️ Logging Setup ---------------
# Log to <directory of this file>/logs/ml_trader.log at INFO level.
BASE_DIR = os.path.dirname(__file__)
log_dir = os.path.join(BASE_DIR, "logs")
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, "ml_trader.log")
logging.basicConfig(
filename=log_file,
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
# --------------- 🔐 Binance Setup ---------------
# Credentials are read from the environment (None when unset); the client is
# created with testnet=True.
api_key = os.getenv("api_key")
api_secret = os.getenv("secret_key")
client = Client(api_key=api_key, api_secret=api_secret, tld="com", testnet=True)
# --------------- ⚙️ Feature Engineering ---------------
def create_multiple_feature_sets(df):
    """Build three feature sets from ``df`` that differ only in the RSI period.

    ``create_features`` is called with its default arguments, then with
    ``rsi_period=10`` and ``rsi_period=20``; the first element of each result
    is returned, in that order, as a 3-tuple.
    """
    variants = ({}, {"rsi_period": 10}, {"rsi_period": 20})
    return tuple(create_features(df, **overrides)[0] for overrides in variants)
# --------------- 🤖 Model Training ---------------
def train_ensemble_model(X1, X2, X3, y):
    """Train a soft-voting ensemble (XGBoost, logistic regression, random forest).

    Args:
        X1: Feature matrix every ensemble member is trained on.
        X2: Unused; kept so existing callers keep working.
        X3: Unused; kept so existing callers keep working.
        y: Target labels aligned with ``X1``.

    Returns:
        The fitted ``VotingClassifier``.

    Note:
        ``VotingClassifier.fit`` fits clones of the estimators it is given, so
        the members of the returned ensemble are always trained on ``X1``.
        The earlier separate fits on ``X1``/``X2``/``X3`` were discarded by
        that refit and only cost training time, so they were removed. Training
        each member on its own feature set would need a different design.
    """
    members = [
        ('xgb', XGBClassifier(n_estimators=50, random_state=0, use_label_encoder=False, eval_metric='logloss')),
        ('lr', LogisticRegression(max_iter=1000)),
        ('rf', RandomForestClassifier(n_estimators=100, random_state=0)),
    ]
    ensemble = VotingClassifier(estimators=members, voting='soft')
    ensemble.fit(X1, y)
    return ensemble
# --------------- 🧠 Prediction with Confidence ---------------
def predict_with_confidence(model, X):
    """Return ``(prediction, confidence)`` from the model's mean class probabilities.

    The rows returned by ``model.predict_proba(X)`` are averaged per class
    (axis 0); the prediction is the index of the largest mean probability and
    the confidence is that largest mean.
    """
    class_means = np.mean(model.predict_proba(X), axis=0)
    winner = np.argmax(class_means)
    return winner, np.max(class_means)
# --------------- 📤 Save & Load Ensemble ---------------
def save_model(model, path):
    """Serialise ``model`` to ``path`` with ``joblib.dump``."""
    joblib.dump(value=model, filename=path)
def load_model(path):
    """Load and return the object stored at ``path`` via ``joblib.load``.

    NOTE(review): joblib.load unpickles the file, so ``path`` must point to a
    trusted file.
    """
    restored = joblib.load(filename=path)
    return restored
# feature_engineering.py
import pandas as pd
import numpy as np
from typing import Tuple
import ta
def compute_rsi(series: pd.Series, period: int = 14) -> pd.Series:
    """Compute an RSI from simple rolling means of gains and losses.

    Positive differences count as gains and negative ones (sign-flipped) as
    losses; both are averaged over ``period`` rows and combined as
    ``100 - 100 / (1 + gain / loss)``. The first ``period - 1`` values are NaN.
    """
    change = series.diff()
    ups = change.where(change > 0, 0)
    downs = -change.where(change < 0, 0)
    mean_up = ups.rolling(window=period).mean()
    mean_down = downs.rolling(window=period).mean()
    relative_strength = mean_up / mean_down
    return 100 - (100 / (1 + relative_strength))
def compute_macd(series: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9) -> pd.Series:
    """Return the MACD histogram of ``series``.

    The MACD line is the difference between the ``fast`` and ``slow``
    exponential moving averages (``adjust=False``); the signal line is the
    ``signal``-span EMA of the MACD line. Only the histogram
    (MACD line minus signal line) is returned, as a Series — the previous
    ``pd.DataFrame`` return annotation was incorrect.
    """
    ema_fast = series.ewm(span=fast, adjust=False).mean()
    ema_slow = series.ewm(span=slow, adjust=False).mean()
    macd_line = ema_fast - ema_slow
    signal_line = macd_line.ewm(span=signal, adjust=False).mean()
    return macd_line - signal_line
def compute_atr(df: pd.DataFrame, period: int = 14) -> pd.Series:
    """Return the simple rolling mean of the true range over ``period`` rows.

    The true range of a row is the largest of High - Low, |High - previous
    Close| and |Low - previous Close|, read from the ``High``, ``Low`` and
    ``Close`` columns.
    """
    previous_close = df["Close"].shift()
    candidates = pd.concat(
        [
            df["High"] - df["Low"],
            np.abs(df["High"] - previous_close),
            np.abs(df["Low"] - previous_close),
        ],
        axis=1,
    )
    true_range = candidates.max(axis=1)
    return true_range.rolling(window=period).mean()
def compute_vwap(df: pd.DataFrame) -> pd.Series:
    """Return the cumulative volume-weighted average of ``Close``.

    Both sums run from the first row of ``df``, so each value depends on all
    rows before it.
    """
    traded_value = df["Close"] * df["Volume"]
    return traded_value.cumsum() / df["Volume"].cumsum()
def compute_obv(df: pd.DataFrame) -> pd.Series:
    """Return the on-balance volume of ``df`` (columns ``Close`` and ``Volume``).

    Starting from 0, the row's volume is added when Close rose versus the
    previous row, subtracted when it fell, and ignored when it is unchanged.

    Vectorised replacement for the former per-row Python loop; it yields the
    same values and additionally returns an empty Series for an empty frame
    (the loop version raised on a length mismatch). The result is float-valued.
    """
    # +1 / -1 / 0 per row; the first row has no predecessor and counts as 0.
    direction = np.sign(df["Close"].diff()).fillna(0)
    # Unchanged rows contribute exactly 0 even if their volume is missing.
    signed_volume = (direction * df["Volume"]).where(direction != 0, 0.0)
    # skipna=False keeps the loop's behaviour of propagating a missing volume.
    return signed_volume.cumsum(skipna=False)
def create_features_for_latest(df):
    """Return a one-row DataFrame with the features of the latest complete bar.

    The OHLCV columns of ``df`` are copied and extended with return, log
    return, RSI(14), MACD histogram, ATR(14), VWAP, OBV, one-bar lags and a
    10-bar rolling std of returns. Rows containing any NaN are dropped and the
    last remaining row is returned (column order is part of the contract for
    the model that consumes it).
    """
    feats = df.loc[:, ["Open", "High", "Low", "Close", "Volume"]].copy()
    close = feats["Close"]

    # Returns
    feats["return"] = close.pct_change()
    feats["log_ret"] = np.log(close / close.shift(1))

    # Indicators
    feats["rsi"] = compute_rsi(close, period=14)
    feats["macd_hist"] = compute_macd(close, fast=12, slow=26, signal=9)
    feats["atr"] = compute_atr(feats, period=14)
    feats["vwap"] = compute_vwap(feats)
    feats["obv"] = compute_obv(feats)

    # One-bar lags
    for lagged_name, source in (
        ("close_lag1", "Close"),
        ("volume_lag1", "Volume"),
        ("rsi_lag1", "rsi"),
        ("macd_hist_lag1", "macd_hist"),
    ):
        feats[lagged_name] = feats[source].shift(1)

    # Local volatility
    feats["rolling_std_10"] = feats["return"].rolling(window=10).std()

    complete_rows = feats.dropna()
    return complete_rows.iloc[[-1]]
def create_features_v2(df, threshold: float = 0.003, horizon: int = 5):
    """Build the training feature matrix and target from an OHLCV frame.

    Args:
        df: DataFrame with ``Open``, ``High``, ``Low``, ``Close``, ``Volume``.
        threshold: Minimum absolute future return for a row to get a label.
        horizon: Number of bars ahead used for the future return (default 5,
            the previously hard-coded value).

    Returns:
        ``(X, y, full)`` where ``X`` holds the feature columns of the rows
        with a label and complete indicators, ``y`` is the matching target
        (1 = future return above ``threshold``, 0 = below ``-threshold``) and
        ``full`` is the unfiltered frame including ``future_return`` and
        ``target``.

    Note:
        ``X`` no longer contains the ``target`` column. It used to, which
        leaked the label into the features and made the columns differ from
        ``create_features_for_latest``. A model fitted on the old output must
        be retrained.
    """
    df = df[["Open", "High", "Low", "Close", "Volume"]].copy()
    # 1. Basic returns
    df["return"] = df["Close"].pct_change()
    df["log_ret"] = np.log(df["Close"] / df["Close"].shift(1))
    # 2. Future return, ``horizon`` bars ahead
    df["future_return"] = (df["Close"].shift(-horizon) - df["Close"]) / df["Close"]
    # 3. Label only moves beyond +/- threshold; everything else stays NaN
    df["target"] = np.where(df["future_return"] > threshold, 1,
                            np.where(df["future_return"] < -threshold, 0, np.nan))
    # 4-8. Indicators
    df["rsi"] = compute_rsi(df["Close"], period=14)
    df["macd_hist"] = compute_macd(df["Close"], fast=12, slow=26, signal=9)
    df["atr"] = compute_atr(df, period=14)
    df["vwap"] = compute_vwap(df)
    df["obv"] = compute_obv(df)
    # 9. Lagged features
    df["close_lag1"] = df["Close"].shift(1)
    df["volume_lag1"] = df["Volume"].shift(1)
    df["rsi_lag1"] = df["rsi"].shift(1)
    df["macd_hist_lag1"] = df["macd_hist"].shift(1)
    # 10. Rolling standard deviation (local volatility)
    df["rolling_std_10"] = df["return"].rolling(window=10).std()

    required = ["target", "rsi", "macd_hist", "atr", "vwap", "obv",
                "close_lag1", "volume_lag1", "rsi_lag1", "macd_hist_lag1",
                "rolling_std_10"]
    # Filter once and derive both X and y from the same rows.
    complete = df.dropna(subset=required)
    X = complete.drop(columns=["future_return", "target"])
    y = complete["target"]
    return X, y, df
def feature_engineering_v3(df):
    """Return a copy of ``df`` extended with indicator, lag and time features.

    Uses the ``ta`` library for RSI, MACD, ATR, OBV, ADX, Williams %R, CMF,
    ROC and SMAs. ``hour`` and ``day_of_week`` are read from ``df.index``.
    Rows containing any NaN are dropped before returning.
    """
    out = df.copy()
    close, high, low, volume = out["Close"], out["High"], out["Low"], out["Volume"]

    # === Standard Features ===
    out["return"] = close.pct_change()
    out["log_ret"] = np.log(close / close.shift(1))

    # === Technical Indicators ===
    out["rsi"] = ta.momentum.RSIIndicator(close=close, window=14).rsi()
    out["macd_hist"] = ta.trend.MACD(close=close).macd_diff()
    out["atr"] = ta.volatility.AverageTrueRange(high=high, low=low, close=close).average_true_range()
    # Cumulative VWAP on the typical price (H + L + C) / 3
    out["vwap"] = (volume * (high + low + close) / 3).cumsum() / volume.cumsum()
    out["obv"] = ta.volume.OnBalanceVolumeIndicator(close=close, volume=volume).on_balance_volume()

    # === Advanced Technical Indicators ===
    out["adx"] = ta.trend.ADXIndicator(high=high, low=low, close=close).adx()
    out["williams_r"] = ta.momentum.WilliamsRIndicator(high=high, low=low, close=close).williams_r()
    out["cmf"] = ta.volume.ChaikinMoneyFlowIndicator(high=high, low=low, close=close, volume=volume).chaikin_money_flow()
    out["roc"] = ta.momentum.ROCIndicator(close=close).roc()
    out["sma_fast"] = ta.trend.SMAIndicator(close=close, window=5).sma_indicator()
    out["sma_slow"] = ta.trend.SMAIndicator(close=close, window=20).sma_indicator()
    out["sma_ratio"] = out["sma_fast"] / out["sma_slow"]

    # === Lag Features ===
    out["close_lag1"] = close.shift(1)
    out["volume_lag1"] = volume.shift(1)
    out["rsi_lag1"] = out["rsi"].shift(1)
    out["macd_hist_lag1"] = out["macd_hist"].shift(1)

    # === Volatility Proxy ===
    out["rolling_std_10"] = out["log_ret"].rolling(window=10).std()

    # === Time-Based Features ===
    out["hour"] = out.index.hour
    out["day_of_week"] = out.index.dayofweek

    return out.dropna()
import pandas as pd
import numpy as np
import ta
def feature_engineering_v3(df):
    """Build features plus a ``target`` column, printing progress diagnostics.

    Only the OHLCV columns of ``df`` are used. After each indicator the number
    of non-NaN values is printed. The target is 1 when the 5-bar-ahead return
    exceeds 0.3%, 0 when it is below -0.3%, NaN otherwise. ``future_return``
    is removed before the final ``dropna`` so it cannot leak into the result.
    ``hour`` and ``day_of_week`` are read from the index.
    """
    print("✅ Ξεκίνησε η feature_engineering_v3")
    print("📊 Αρχικό μέγεθος:", len(df))
    # Drop any irrelevant Binance CSV columns
    data = df[["Open", "High", "Low", "Close", "Volume"]].copy()
    close, high, low, volume = data["Close"], data["High"], data["Low"], data["Volume"]

    # === Return & Log Return ===
    data["return"] = close.pct_change()
    data["log_ret"] = np.log(close / close.shift(1))

    # === Technical indicators ===
    data["rsi"] = ta.momentum.RSIIndicator(close=close, window=14).rsi()
    print("📌 Μετά RSI:", data["rsi"].notna().sum())
    data["macd_hist"] = ta.trend.MACD(close=close).macd_diff()
    print("📌 Μετά MACD:", data["macd_hist"].notna().sum())
    data["atr"] = ta.volatility.AverageTrueRange(high=high, low=low, close=close).average_true_range()
    print("📌 Μετά ATR:", data["atr"].notna().sum())
    data["vwap"] = (volume * (high + low + close) / 3).cumsum() / volume.cumsum()
    print("📌 Μετά VWAP:", data["vwap"].notna().sum())
    data["obv"] = ta.volume.OnBalanceVolumeIndicator(close=close, volume=volume).on_balance_volume()
    print("📌 Μετά OBV:", data["obv"].notna().sum())

    # === Advanced indicators ===
    data["adx"] = ta.trend.ADXIndicator(high=high, low=low, close=close).adx()
    print("📌 Μετά ADX:", data["adx"].notna().sum())
    data["williams_r"] = ta.momentum.WilliamsRIndicator(high=high, low=low, close=close).williams_r()
    print("📌 Μετά williams_r:", data["williams_r"].notna().sum())
    data["cmf"] = ta.volume.ChaikinMoneyFlowIndicator(high=high, low=low, close=close, volume=volume).chaikin_money_flow()
    print("📌 Μετά cmf:", data["cmf"].notna().sum())
    data["roc"] = ta.momentum.ROCIndicator(close=close).roc()
    print("📌 Μετά roc:", data["roc"].notna().sum())
    data["sma_fast"] = ta.trend.SMAIndicator(close=close, window=5).sma_indicator()
    data["sma_slow"] = ta.trend.SMAIndicator(close=close, window=20).sma_indicator()
    data["sma_ratio"] = data["sma_fast"] / data["sma_slow"]
    print("📌 Μετά SMA ratio:", data["sma_ratio"].notna().sum())

    # === Lag Features ===
    data["close_lag1"] = close.shift(1)
    data["volume_lag1"] = volume.shift(1)
    data["rsi_lag1"] = data["rsi"].shift(1)
    data["macd_hist_lag1"] = data["macd_hist"].shift(1)

    # === Volatility Proxy ===
    data["rolling_std_10"] = data["log_ret"].rolling(window=10).std()

    # === Time features ===
    data["hour"] = data.index.hour
    data["day_of_week"] = data.index.dayofweek

    # === Target ===
    data["future_return"] = (close.shift(-5) - close) / close
    data["target"] = np.where(data["future_return"] > 0.003, 1,
                              np.where(data["future_return"] < -0.003, 0, np.nan))
    print("📌 Μετά target:", data["target"].notna().sum())

    # === Missing-value check on rows that have a valid target ===
    labelled = data[data["target"].notna()]
    print("📏 Γραμμές με έγκυρο target:", len(labelled))
    gaps = labelled.isna().sum()
    print("🔍 Απουσίες ανά στήλη (σε target-valid δείγματα):")
    print(gaps[gaps > 0].sort_values(ascending=False))

    # Remove the look-ahead column (label leakage) before the final dropna
    cleaned = data.drop(columns=["future_return"]).dropna()
    print("📏 Dataset μέγεθος μετά το dropna:", len(cleaned))
    return cleaned
# ml_trader_old.py (με Risk Management Layer + Exit Logic)
import os
import time
import json
import csv
import pandas as pd
import numpy as np
import joblib
import logging
from pathlib import Path
from datetime import datetime, timezone, timedelta
from binance.client import Client
from binance.enums import *
from feature_engineering import create_features_for_latest
from telegram_utils import send_telegram_message
from data_utils import update_historical_data
from config import SYMBOL, INTERVAL, LEVERAGE, POSITION_SIZE_PCT, STOP_LOSS_PCT, TAKE_PROFIT_PCT, DATA_PATH, CONF_UPPER, CONF_LOWER
# === Logging Setup ===
os.makedirs("logs", exist_ok=True)
log_file = os.path.join("logs", "ml_trader.log")
logging.basicConfig(
    filename=log_file,
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)
# === Binance Setup ===
# Credentials come from the environment; the client targets the testnet.
api_key = os.getenv("api_key")
api_secret = os.getenv("secret_key")
client = Client(api_key=api_key, api_secret=api_secret, tld="com", testnet=True)
# === Settings ===
symbol = SYMBOL
interval = INTERVAL
leverage = LEVERAGE
model_path = "ensemble_model.pkl"
risk_path = "risk_state.json"
# Local view of the open position: 1 = long, -1 = short, 0 = flat.
position = 0
# POSITION_SIZE_PCT, STOP_LOSS_PCT, TAKE_PROFIT_PCT, DATA_PATH, CONF_UPPER and
# CONF_LOWER are imported from config above. The local re-definitions that
# used to follow here shadowed those imports (with identical values), so
# editing config.py silently had no effect; they were removed.
# === Risk Control ===
def load_risk_state():
    """Return the risk state stored in the JSON file at ``risk_path``.

    If the file does not exist yet, it is first created with the default
    state (not paused, equity curve starting at 100.0, limits of 3
    consecutive losses and 10% drawdown, auto resume after 6 hours).
    """
    state_file_missing = not os.path.exists(risk_path)
    if state_file_missing:
        defaults = dict(
            paused=False,
            equity_curve=[100.0],
            consecutive_losses=0,
            max_consecutive_losses=3,
            max_drawdown_pct=0.1,
            auto_resume=True,
            resume_hours=6,
            pause_timestamp=None,
        )
        save_risk_state(defaults)
    with open(risk_path, "r") as handle:
        persisted = json.load(handle)
    return persisted
def save_risk_state(state):
    """Write ``state`` as indented JSON to ``risk_path``.

    The data is written to a temporary sibling file and then moved over the
    target with ``os.replace``, so a crash mid-write cannot leave a truncated
    state file that would break every later ``load_risk_state`` call.
    """
    tmp_path = f"{risk_path}.tmp"
    with open(tmp_path, "w") as f:
        json.dump(state, f, indent=4)
    os.replace(tmp_path, risk_path)
# === Trade Functions ===
def get_balance():
    """Return the USDT balance reported by the futures account, as a float.

    Raises ``StopIteration`` when the balance list has no USDT entry.
    """
    balances = client.futures_account_balance()
    usdt_entry = next(filter(lambda entry: entry['asset'] == 'USDT', balances))
    return float(usdt_entry['balance'])
def calculate_quantity(entry_price):
    """Return the order quantity for one trade, rounded to 3 decimals.

    ``POSITION_SIZE_PCT`` of the current USDT balance is converted to a
    quantity at ``entry_price``.
    """
    notional = get_balance() * POSITION_SIZE_PCT
    return round(notional / entry_price, 3)
def log_trade(date, signal, entry, sl, tp, exit_price, result, confidence):
    """Append one trade record to ``logs/trades.csv``.

    A header row is written first when the file does not exist yet.
    """
    header = ["Date", "Signal", "Entry", "SL", "TP", "Exit Price", "Result", "Confidence"]
    record = [date, signal, entry, sl, tp, exit_price, result, confidence]
    target = Path("logs/trades.csv")
    # Decide before opening: append mode creates the file.
    rows = [record] if target.exists() else [header, record]
    with open(target, "a", newline="") as f:
        csv.writer(f).writerows(rows)
def log_daily_equity():
    """Append today's UTC date and the latest equity value to ``logs/daily_report.csv``.

    The equity is the last entry of ``equity_curve`` in the stored risk state.
    A header row is written first when the file does not exist yet.
    """
    state = load_risk_state()
    # Timezone-aware "now", consistent with the rest of this module
    # (datetime.utcnow() is naive and deprecated); the formatted date is the same.
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    equity = state["equity_curve"][-1]
    report_path = Path("logs/daily_report.csv")
    is_new = not report_path.exists()
    with open(report_path, "a", newline="") as f:
        writer = csv.writer(f)
        if is_new:
            writer.writerow(["Date", "Equity"])
        writer.writerow([today, equity])
# def close_position(entry_price):
# global position
# side = SIDE_SELL if position == 1 else SIDE_BUY
# client.futures_cancel_all_open_orders(symbol=symbol)
# quantity = calculate_quantity(entry_price)
# client.futures_create_order(
# symbol=symbol,
# side=side,
# type=ORDER_TYPE_MARKET,
# quantity=quantity
# )
# msg = f"🔄 Κλείσιμο θέσης {('LONG' if position == 1 else 'SHORT')} στην τιμή {entry_price:.2f}"
# send_telegram_message(msg)
# logging.info(msg)
# position = 0
def close_position(entry_price, position_side, confidence):
    """Close the open futures position for ``symbol`` with a market order.

    Pending orders (the SL/TP placed at entry) are cancelled first. The size
    and direction of the closing order are taken from the position reported
    by the exchange instead of being recomputed from the current balance:
    the recomputed quantity could differ from what was actually opened and
    leave a remainder or open a position in the opposite direction. If the
    exchange reports no open position (e.g. SL/TP already fired), no order is
    sent. The module-level ``position`` flag is reset to 0 in every case.

    Args:
        entry_price: Entry price of the position; used for the logged result.
        position_side: "LONG" or "SHORT"; used for the logged result.
        confidence: Model confidence; accepted for caller compatibility
            (trade logging is currently disabled in this module).
    """
    global position
    client.futures_cancel_all_open_orders(symbol=symbol)

    # Net position as reported by the exchange (positive = long, negative = short).
    open_amount = sum(
        float(entry["positionAmt"])
        for entry in client.futures_position_information(symbol=symbol)
    )
    quantity = round(abs(open_amount), 3)

    # Current mark price serves as the exit price for reporting.
    ticker = client.futures_mark_price(symbol=symbol)
    exit_price = float(ticker["markPrice"])

    if position_side == "LONG":
        result = (exit_price - entry_price) / entry_price
    elif position_side == "SHORT":
        result = (entry_price - exit_price) / entry_price
    else:
        result = 0

    if quantity > 0:
        client.futures_create_order(
            symbol=symbol,
            side=SIDE_SELL if open_amount > 0 else SIDE_BUY,
            type=ORDER_TYPE_MARKET,
            quantity=quantity,
            # Guarantees the order can only reduce the position, never flip it.
            reduceOnly="true"
        )
        msg = f"🔄 Κλείσιμο θέσης {('LONG' if open_amount > 0 else 'SHORT')} στην τιμή {exit_price:.2f}"
        send_telegram_message(msg)
        logging.info(msg)
        logging.info(f"Closed {position_side} | entry {entry_price:.2f} | exit {exit_price:.2f} | result {result:.4f}")
    else:
        logging.warning("close_position: no open position on the exchange; nothing to close.")
    position = 0
def place_order(position_side, entry_price, confidence):
    """Open a position with a market order and attach stop-loss / take-profit orders.

    Args:
        position_side: "LONG" buys; any other value sells (short).
        entry_price: Reference price used for sizing and for the SL/TP levels.
        confidence: Model confidence; accepted for caller compatibility.

    Raises:
        ValueError: If the computed quantity is not positive.
        Exception: Any error from the exchange. If placing the protective
            orders fails after the entry was filled, the new position is
            closed again (best effort) before the error is re-raised, so no
            unprotected position is left behind.
    """
    quantity = calculate_quantity(entry_price)
    if quantity <= 0:
        raise ValueError(f"Computed order quantity is not positive: {quantity}")

    is_long = position_side == "LONG"
    entry_side = SIDE_BUY if is_long else SIDE_SELL
    exit_side = SIDE_SELL if is_long else SIDE_BUY
    sl_price = entry_price * (1 - STOP_LOSS_PCT) if is_long else entry_price * (1 + STOP_LOSS_PCT)
    tp_price = entry_price * (1 + TAKE_PROFIT_PCT) if is_long else entry_price * (1 - TAKE_PROFIT_PCT)

    client.futures_cancel_all_open_orders(symbol=symbol)
    client.futures_create_order(
        symbol=symbol,
        side=entry_side,
        type=ORDER_TYPE_MARKET,
        quantity=quantity
    )
    try:
        client.futures_create_order(
            symbol=symbol,
            side=exit_side,
            type=FUTURE_ORDER_TYPE_STOP_MARKET,
            stopPrice=round(sl_price, 2),
            closePosition=True,
            timeInForce=TIME_IN_FORCE_GTC
        )
        client.futures_create_order(
            symbol=symbol,
            side=exit_side,
            type=FUTURE_ORDER_TYPE_TAKE_PROFIT_MARKET,
            stopPrice=round(tp_price, 2),
            closePosition=True,
            timeInForce=TIME_IN_FORCE_GTC
        )
    except Exception:
        logging.exception("Protective SL/TP order failed; closing the just-opened position.")
        # Undo the entry; without this the caller never records the position
        # and a later run could open a second one on top of it.
        client.futures_cancel_all_open_orders(symbol=symbol)
        client.futures_create_order(
            symbol=symbol,
            side=exit_side,
            type=ORDER_TYPE_MARKET,
            quantity=quantity,
            reduceOnly="true"
        )
        raise

    msg = (
        f"📈 Νέα Θέση: {position_side}\n"
        f"🔹 Τιμή Εισόδου: {entry_price:.2f}\n"
        f"🔻 Stop Loss: {sl_price:.2f}\n"
        f"🎯 Take Profit: {tp_price:.2f}\n"
        f"⚖️ Ποσότητα: {quantity}"
    )
    send_telegram_message(msg)
    logging.info(f"{position_side} position opened at {entry_price:.2f} | SL: {sl_price:.2f}, TP: {tp_price:.2f}, Qty: {quantity}")
def update_risk_state(success):
    """Record one trade outcome in the persisted risk state.

    The last equity value is scaled by ``1 + TAKE_PROFIT_PCT`` on success or
    ``1 - STOP_LOSS_PCT`` otherwise and appended (rounded to 4 decimals) to
    the equity curve. Trading is paused when the consecutive-loss count or
    the drawdown from the curve's peak reaches the limits stored in the
    state. The state is saved and the daily equity log is appended.
    """
    state = load_risk_state()
    curve = state["equity_curve"]

    growth = (1 + TAKE_PROFIT_PCT) if success else (1 - STOP_LOSS_PCT)
    equity = curve[-1] * growth
    curve.append(round(equity, 4))

    if success:
        state["consecutive_losses"] = 0
    else:
        state["consecutive_losses"] = state["consecutive_losses"] + 1

    peak = max(curve)
    drawdown = (equity - peak) / peak
    if state["consecutive_losses"] >= state["max_consecutive_losses"] or drawdown <= -state["max_drawdown_pct"]:
        state["paused"] = True
        state["pause_timestamp"] = datetime.now(timezone.utc).isoformat()
        send_telegram_message("🛑 Trading Paused - Risk Threshold Exceeded.")
        logging.warning("Trading paused due to risk limits.")

    save_risk_state(state)
    log_daily_equity()
def handle_auto_resume(state):
    """Lift a risk pause once the configured waiting time has elapsed.

    ``state`` is returned unchanged unless it is paused, has ``auto_resume``
    enabled and carries a ``pause_timestamp``. When ``resume_hours`` (default
    6) have passed since that timestamp, the pause flag, the loss counter and
    the timestamp are reset in place and a notification is sent.
    """
    paused_since = state.get("pause_timestamp")
    if not (state.get("paused") and state.get("auto_resume", False) and paused_since):
        return state

    resume_at = datetime.fromisoformat(paused_since) + timedelta(hours=state.get("resume_hours", 6))
    if datetime.now(timezone.utc) < resume_at:
        return state

    state["paused"] = False
    state["consecutive_losses"] = 0
    state["pause_timestamp"] = None
    send_telegram_message("⚡ Auto Resume ενεργοποιήθηκε - Trading συνεχίζεται.")
    logging.info("Auto resume triggered.")
    return state
# === Trader Logic ===
# Price at which the currently open position was entered (None while flat).
_open_entry_price = None


def run_trader():
    """Run one trading cycle: refresh data, predict, then exit or enter a position.

    Steps: apply auto-resume and stop if trading is paused; update the
    candle CSV; build features for the latest bar; classify it as long (1),
    short (0) or no-trade (-1) using ``CONF_UPPER``; close an open position
    whose direction the model no longer supports, otherwise open a new one.

    The risk state is updated only when a position is closed, using the
    actual outcome (exit price versus the recorded entry price). Previously
    ``update_risk_state(success=True)`` was called on every entry and exit,
    so losses were never counted and the pause limits could never trigger.

    NOTE(review): ``position`` and the entry price live in process memory
    only; a position closed on the exchange by its SL/TP order is not
    detected here.
    """
    global position, _open_entry_price
    risk_state = handle_auto_resume(load_risk_state())
    save_risk_state(risk_state)
    if risk_state.get("paused"):
        print("⛔ Trading paused due to risk limits.")
        send_telegram_message("⛔ Δεν εκτελείται trade - Σύστημα σε PAUSE λόγω ρίσκου.")
        return

    update_historical_data(symbol, interval, DATA_PATH)
    df = pd.read_csv(DATA_PATH, parse_dates=["timestamp"], index_col="timestamp")
    last_row = df.iloc[-1]
    last_index = df.index[-1]
    print(f"[TIMESTAMP CHECK] Τελευταίο timestamp στο CSV: {last_index} | Close: {last_row['Close']} | Volume: {last_row['Volume']}")
    logging.info(f"[TIMESTAMP CHECK] Τελευταίο timestamp στο CSV: {last_index} | Close: {last_row['Close']} | Volume: {last_row['Volume']}")

    model = joblib.load(model_path)  # NOTE(review): unpickles; model file must be trusted
    latest = create_features_for_latest(df)
    latest_index = latest.index[-1]
    logging.info(f"[DEBUG] Τελευταίο timestamp features: {latest_index}")
    logging.info(f"[DEBUG] Features:")
    for key, val in latest.iloc[0].items():
        logging.info(f" {key}: {round(val, 6)}")

    proba = model.predict_proba(latest)[0]
    confidence = round(max(proba), 4)
    if proba[1] > CONF_UPPER:
        pred = 1
    elif proba[0] > CONF_UPPER:
        pred = 0
    else:
        pred = -1
    price = df["Close"].iloc[-1]
    print(f"[{datetime.now(timezone.utc)}] ML Prediction: {pred} | Prob: {confidence} | Price: {price} | Pos: {position}")
    logging.info(f"ML Prediction: {pred} | Confidence: {confidence} | Price: {price:.2f} | Position: {position}")

    # === Exit Logic ===
    if (position == 1 and pred != 1) or (position == -1 and pred != 0):
        side = "LONG" if position == 1 else "SHORT"
        entry = _open_entry_price if _open_entry_price is not None else price
        close_position(entry_price=entry, position_side=side, confidence=confidence)
        # A long wins when price rose since entry, a short when it fell.
        won = price > entry if side == "LONG" else price < entry
        _open_entry_price = None
        update_risk_state(success=bool(won))
        return

    # === Entry Logic ===
    # No risk-state update here: the outcome is unknown until the exit.
    if pred == 1 and position != 1:
        print("🟢 LONG")
        place_order("LONG", price, confidence)
        position = 1
        _open_entry_price = price
    elif pred == 0 and position != -1:
        print("🔴 SHORT")
        place_order("SHORT", price, confidence)
        position = -1
        _open_entry_price = price
    elif pred == -1:
        msg = f"⚠️ Καμία εντολή - Χαμηλή εμπιστοσύνη (Confidence: {confidence})"
        print(msg)
        send_telegram_message(msg)
        logging.info("No trade due to low confidence.")
    else:
        print("⏸️ Καμία αλλαγή")
        send_telegram_message(f"ℹ️ Καμία αλλαγή θέσης | Τιμή: {price:.2f} | Confidence: {confidence}")
        logging.info("No position change.")


if __name__ == "__main__":
    run_trader()
# ml_trading_loop.py
import time
from datetime import datetime, timedelta
from ml_trader import run_trader
from telegram_utils import send_telegram_message
from config import INTERVAL
INTERVAL_MINUTES = INTERVAL
def wait_until_next_interval(interval_minutes):
    """Sleep until the next wall-clock boundary that is a multiple of the interval.

    Boundaries are taken on the Unix-epoch grid (``time.time()`` floored to a
    multiple of the interval), so the computation works for any positive
    interval -- including ones longer than an hour or that do not divide 60 --
    and is unaffected by local DST jumps. For intervals that divide an hour
    (e.g. 15 minutes) in time zones whose UTC offset is a multiple of the
    interval, the boundaries coincide with rounding the local clock up to the
    next multiple of the interval.

    :param interval_minutes: Interval length in minutes; must be positive.
    :raises ValueError: If ``interval_minutes`` is zero or negative.
    """
    if interval_minutes <= 0:
        raise ValueError(
            f"interval_minutes must be positive, got {interval_minutes!r}"
        )

    interval_seconds = interval_minutes * 60
    now_ts = time.time()
    # Floor to the boundary we are currently in, then step one interval ahead.
    # The result is always strictly in the future, even when this is called
    # exactly on a boundary.
    next_ts = (now_ts // interval_seconds + 1) * interval_seconds
    sleep_time = next_ts - now_ts
    # Shown in local time, as a naive datetime, purely for the log line.
    next_run = datetime.fromtimestamp(next_ts)
    print(f"[WAIT] Sleeping for {int(sleep_time)} seconds until next interval: {next_run}")
    time.sleep(sleep_time)
def _run_cycle():
    """Run one trader pass, then block until the next interval boundary."""
    print(f"\n[INFO] Εκτέλεση trader: {datetime.now()}")
    # NOTE(review): presumably this pause gives the data feed time to publish
    # the candle that has just closed -- confirm before changing it.
    time.sleep(10)
    run_trader()
    print(f"[INFO] Trader ολοκλήρωσε. Αναμονή μέχρι το επόμενο interval...\n")
    wait_until_next_interval(INTERVAL_MINUTES)


# Entry point: announce start-up on Telegram, then repeat the cycle forever.
# Any exception raised by a cycle is reported (console + Telegram) and the
# loop resumes at the next interval boundary instead of terminating.
if __name__ == "__main__":
    send_telegram_message("🚀 ML Trading loop ξεκίνησε!")
    while True:
        try:
            _run_cycle()
        except Exception as exc:
            error_text = f"❌ Σφάλμα κατά την εκτέλεση trader: {str(exc)}"
            print(error_text)
            send_telegram_message(error_text)
            wait_until_next_interval(INTERVAL_MINUTES)
import pandas as pd
import numpy as np
import joblib
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split
from feature_engineering import create_features_v2
# Threshold sweep: score the saved ensemble on the chronological hold-out set
# for every decision threshold in [0.40, 0.60] and recommend the best by F1.

# Load the dataset.
csv_path = "datasets/BTCUSDT_15m.csv"
df = pd.read_csv(csv_path, parse_dates=["timestamp"], index_col="timestamp")

# Feature engineering.
X_all, y_all, _ = create_features_v2(df, threshold=0.003)

# Chronological train/test split (no shuffling); only the test part is scored.
X_train, X_test, y_train, y_test = train_test_split(
    X_all, y_all, test_size=0.2, shuffle=False
)

# Load the ensemble model and take its probability for class 1 (long).
# NOTE(review): retrain_model.py fits the ensemble on data transformed by
# preprocessor.pkl, while this script passes untransformed features -- confirm
# that this is intended.
model = joblib.load("ensemble_model.pkl")
probs = model.predict_proba(X_test)[:, 1]


def _score_at(cutoff):
    """Return the rounded metrics obtained when predicting 1 for probs >= cutoff."""
    labels = (probs >= cutoff).astype(int)
    return {
        "threshold": round(cutoff, 2),
        "accuracy": round(accuracy_score(y_test, labels), 4),
        "precision": round(precision_score(y_test, labels, zero_division=0), 4),
        "recall": round(recall_score(y_test, labels, zero_division=0), 4),
        "f1_score": round(f1_score(y_test, labels, zero_division=0), 4),
    }


# One result row per candidate threshold (step 0.01).
results = [_score_at(cutoff) for cutoff in np.arange(0.40, 0.61, 0.01)]

# Show the full table.
results_df = pd.DataFrame(results)
print("\n📈 Threshold Optimization Results:")
print(results_df.to_string(index=False))

# Best threshold by F1-score (first row wins on ties, as idxmax does).
best_index = results_df["f1_score"].idxmax()
best_row = results_df.loc[best_index]
print(f"\n✅ Recommended Threshold: {best_row['threshold']} (F1-score: {best_row['f1_score']})")
import joblib
import pandas as pd
import matplotlib
matplotlib.use("TkAgg")
import matplotlib.pyplot as plt
# Feature-importance report: average the feature_importances_ of every
# ensemble member that exposes them, print the top 15 and plot them.

# === 1. Load the fitted preprocessor and the ensemble model ===
preprocessor = joblib.load("preprocessor.pkl")
model = joblib.load("ensemble_model.pkl")

# === 2. Recover the feature names ===
# The third element of the first fitted transformer entry is its column list.
feature_names = preprocessor.transformers_[0][2]

# === 3. Collect importances from each estimator that provides them ===
importances = {
    name: estimator.feature_importances_
    for name, estimator in model.named_estimators_.items()
    if hasattr(estimator, "feature_importances_")
}
df_importance = pd.DataFrame(importances, index=feature_names)
df_importance["mean_importance"] = df_importance.mean(axis=1)
df_importance = df_importance.sort_values("mean_importance", ascending=False)

# === 4. Print the top 15 ===
top_features = df_importance.head(15)
print("📊 Top 15 Feature Importances (Mean across models):")
for feature, importance in top_features["mean_importance"].items():
    print(f"{feature:<20} → {importance:.4f}")

# === 5. Horizontal bar chart of the top 15 ===
plt.figure(figsize=(10, 6))
top_features["mean_importance"].plot(kind="barh")
plt.title("Top 15 Feature Importances (Mean across models)")
plt.xlabel("Mean Importance")
# Flip the y axis so the most important feature is drawn at the top.
plt.gca().invert_yaxis()
plt.tight_layout()
plt.show()
# retrain_model.py (Τελική έκδοση με feature_engineering_v3)
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.utils import shuffle
from imblearn.over_sampling import SMOTE
from sklearn.model_selection import RandomizedSearchCV
from sklearn.ensemble import VotingClassifier
import joblib
from feature_engineering_v3 import feature_engineering_v3
# Retraining script: load the candle CSV, build features and target, fit a
# soft-voting ensemble (XGBoost + RandomForest + LogisticRegression) with a
# small randomized search over the XGBoost parameters, report hold-out
# metrics, then persist the model and the fitted preprocessor.
print("🔄 Φόρτωση δεδομένων...")
df = pd.read_csv("datasets/BTCUSDT_15m.csv", parse_dates=["timestamp"], index_col="timestamp")
print("📊 Συνολικό μέγεθος CSV:", len(df))
print("📅 Range ημερομηνιών:", df.index.min(), "→", df.index.max())

# === Feature engineering v3 (the returned frame carries the "target" column) ===
df = feature_engineering_v3(df)
print("📏 Dataset μέγεθος μετά το dropna:", len(df))
if len(df) < 30:
    print("❌ Πολύ λίγα δεδομένα. Ακύρωση εκπαίδευσης.")
    # Exit with a non-zero status: no model was produced, and callers that
    # launch this script with subprocess (check=True / returncode) must be
    # able to detect that. The bare exit() used before reported success.
    raise SystemExit(1)

X = df.drop(columns=["target"], errors="ignore")
y = df["target"]

# Turn +/-inf into NaN so the imputer below can fill them.
X = X.replace([np.inf, -np.inf], np.nan)

# Chronological 70/30 split (no shuffling).
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, shuffle=False
)

# === Preprocessing pipeline: mean imputation + standard scaling on every column ===
numeric_features = X.columns.tolist()
preprocessor = ColumnTransformer(
    transformers=[
        ("num", Pipeline([
            ("imputer", SimpleImputer(strategy="mean")),
            ("scaler", StandardScaler())
        ]), numeric_features)
    ]
)
# Fit on the training part only, then apply the same transform to the test part.
X_train = preprocessor.fit_transform(X_train)
X_test = preprocessor.transform(X_test)

# Diagnostic only: correlation of each raw (untransformed) feature with the
# target, computed over the full dataset.
print("🔍 Correlation του κάθε feature με το target:")
for col in numeric_features:
    corr = X[col].corr(y)
    print(f"{col:25} ↔ corr: {corr:.4f}")

# === SMOTE (only when every class has at least 6 samples) ===
# NOTE(review): resampling happens before the cross-validated search below,
# so synthetic samples can end up in validation folds -- consider moving
# SMOTE inside an imblearn Pipeline.
print("🔍 Έλεγχος ισορροπίας κλάσεων πριν το SMOTE:")
print(y_train.value_counts())
if y_train.value_counts().min() >= 6:
    smote = SMOTE(sampling_strategy="auto", random_state=42)
    X_train, y_train = smote.fit_resample(X_train, y_train)
    print("✅ Εφαρμόστηκε SMOTE.")
else:
    print("⚠️ Προειδοποίηση: Δεν εφαρμόστηκε SMOTE (μία από τις κλάσεις έχει λιγότερα από 6 δείγματα)")

# === Three base models with a fixed random_state ===
xgb = XGBClassifier(eval_metric="logloss", random_state=42)
rfc = RandomForestClassifier(n_estimators=100, random_state=42)
logit = LogisticRegression(max_iter=1000, random_state=42)

# === Hyperparameter search (only the XGBoost member is tuned) ===
param_grid = {
    "xgb__n_estimators": [50, 100, 150],
    "xgb__max_depth": [3, 5, 7],
}
ensemble = VotingClassifier(
    estimators=[("xgb", xgb), ("rfc", rfc), ("logit", logit)],
    voting="soft"
)
search = RandomizedSearchCV(
    estimator=ensemble,
    param_distributions=param_grid,
    n_iter=8,
    cv=3,
    verbose=1,
    random_state=42,
    n_jobs=-1
)
search.fit(X_train, y_train)

# === Hold-out evaluation ===
print("📊 Classification Report:")
y_pred = search.predict(X_test)
print(classification_report(y_test, y_pred))
print("\n🧩 Confusion Matrix:")
print(confusion_matrix(y_test, y_pred))

# === Persist the model and the preprocessor ===
# The ensemble was fit on preprocessed arrays, so consumers must apply
# preprocessor.pkl to their features before calling the model.
joblib.dump(search.best_estimator_, "ensemble_model.pkl")
joblib.dump(preprocessor, "preprocessor.pkl")
print("✅ Μοντέλο και preprocessor αποθηκεύτηκαν.")
# retrain_model.py (Βελτιωμένος με reproducibility + SMOTE fallback)
#
# import pandas as pd
# import numpy as np
# from sklearn.model_selection import train_test_split
# from sklearn.impute import SimpleImputer
# from sklearn.preprocessing import StandardScaler
# from sklearn.linear_model import LogisticRegression
# from sklearn.ensemble import RandomForestClassifier
# from xgboost import XGBClassifier
# from sklearn.pipeline import Pipeline
# from sklearn.compose import ColumnTransformer
# from sklearn.metrics import classification_report, confusion_matrix
# from sklearn.utils import shuffle
# from imblearn.over_sampling import SMOTE
# from sklearn.model_selection import RandomizedSearchCV
# from sklearn.ensemble import VotingClassifier
# import joblib
# from feature_engineering import feature_engineering_v3
#
# print("🔄 Φόρτωση δεδομένων...")
# df = pd.read_csv("datasets/BTCUSDT_15m.csv", parse_dates=["timestamp"], index_col="timestamp")
# df = feature_engineering_v3(df)
#
# # === Δημιουργία target ===
# df["future_return"] = (df["Close"].shift(-5) - df["Close"]) / df["Close"]
# df["target"] = np.where(df["future_return"] > 0.003, 1,
# np.where(df["future_return"] < -0.003, 0, np.nan))
# df.dropna(subset=["target"], inplace=True)
#
# X = df.drop(columns=["target", "future_return"], errors="ignore")
# y = df["target"]
#
# # Αντικαθιστούμε NaN/Inf
# X = X.replace([np.inf, -np.inf], np.nan)
#
# # === Train/Test Split ===
# X_train, X_test, y_train, y_test = train_test_split(
# X, y, test_size=0.3, shuffle=False
# )
#
# # === Pipeline Preprocessing ===
# numeric_features = X.columns.tolist()
#
# preprocessor = ColumnTransformer(
# transformers=[
# ("num", Pipeline([
# ("imputer", SimpleImputer(strategy="mean")),
# ("scaler", StandardScaler())
# ]), numeric_features)
# ]
# )
#
# X_train = preprocessor.fit_transform(X_train)
# X_test = preprocessor.transform(X_test)
#
# # === Έλεγχος κλάσεων ===
# print("🔍 Έλεγχος ισορροπίας κλάσεων πριν το SMOTE:")
# class_counts = pd.Series(y_train).value_counts()
# print(class_counts)
#
# if all(class_counts >= 6):
# smote = SMOTE(sampling_strategy="auto", random_state=42)
# X_train, y_train = smote.fit_resample(X_train, y_train)
# print("✅ Εφαρμόστηκε SMOTE")
# else:
# print("⚠️ Προειδοποίηση: Δεν εφαρμόστηκε SMOTE (μία από τις κλάσεις έχει λιγότερα από 6 δείγματα)")
#
# # === Τρία μοντέλα ===
# xgb = XGBClassifier(eval_metric="logloss", random_state=42)
# rfc = RandomForestClassifier(n_estimators=100, random_state=42)
# logit = LogisticRegression(max_iter=1000, random_state=42)
#
# # === Hyperparameter Tuning ===
# param_grid = {
# "xgb__n_estimators": [50, 100, 150],
# "xgb__max_depth": [3, 5, 7],
# }
#
# ensemble = VotingClassifier(
# estimators=[("xgb", xgb), ("rfc", rfc), ("logit", logit)],
# voting="soft"
# )
#
# search = RandomizedSearchCV(
# estimator=ensemble,
# param_distributions=param_grid,
# n_iter=8,
# cv=3,
# verbose=1,
# random_state=42,
# n_jobs=-1
# )
#
# search.fit(X_train, y_train)
#
# print("📊 Classification Report:")
# y_pred = search.predict(X_test)
# print(classification_report(y_test, y_pred))
# print("\n🧩 Confusion Matrix:")
# print(confusion_matrix(y_test, y_pred))
#
# # === Αποθήκευση ===
# joblib.dump(search.best_estimator_, "ensemble_model.pkl")
# joblib.dump(preprocessor, "preprocessor.pkl")
# print("✅ Μοντέλο και preprocessor αποθηκεύτηκαν.")
# retrain_model.py (Βελτιωμένος με feature pruning)
# import pandas as pd
# import numpy as np
# from sklearn.model_selection import train_test_split
# from sklearn.impute import SimpleImputer
# from sklearn.preprocessing import StandardScaler
# from sklearn.linear_model import LogisticRegression
# from sklearn.ensemble import RandomForestClassifier
# from xgboost import XGBClassifier
# from sklearn.pipeline import Pipeline
# from sklearn.compose import ColumnTransformer
# from sklearn.metrics import classification_report, confusion_matrix
# from sklearn.utils import shuffle
# from imblearn.over_sampling import SMOTE
# from sklearn.model_selection import RandomizedSearchCV
# from sklearn.ensemble import VotingClassifier
# import joblib
# from feature_engineering import create_features_v3
#
# print("🔄 Φόρτωση δεδομένων...")
# df = pd.read_csv("datasets/BTCUSDT_15m.csv", parse_dates=["timestamp"], index_col="timestamp")
# df = create_features_v3(df, threshold=0.003)
# df = df.dropna()
#
# X = df.drop(columns=["target", "future_return"], errors="ignore")
# y = df["target"]
#
# # === Feature Pruning ===
# top_features = [
# "vwap", "obv", "Low", "rolling_std_10", "atr",
# "High", "close_lag1", "macd_hist_lag1", "rsi",
# "macd_hist", "rsi_lag1", "Close", "Volume",
# "volume_lag1", "Open"
# ]
# X = X[top_features]
#
# # Αντικαθιστούμε τυχόν NaN/Inf
# X = X.replace([np.inf, -np.inf], np.nan)
#
# # Διαχωρισμός set (70-30)
# X_train, X_test, y_train, y_test = train_test_split(
# X, y, test_size=0.3, shuffle=False
# )
#
# # === Pipeline Preprocessing ===
# numeric_features = X.columns.tolist()
#
# preprocessor = ColumnTransformer(
# transformers=[
# ("num", Pipeline([
# ("imputer", SimpleImputer(strategy="mean")),
# ("scaler", StandardScaler())
# ]), numeric_features)
# ]
# )
#
# X_train = preprocessor.fit_transform(X_train)
# X_test = preprocessor.transform(X_test)
#
# # === SMOTE ===
# smote = SMOTE(sampling_strategy="auto", random_state=42)
# X_train, y_train = smote.fit_resample(X_train, y_train)
#
# # === Τρία μοντέλα με fixed random_state ===
# xgb = XGBClassifier(use_label_encoder=False, eval_metric="logloss", random_state=42)
# rfc = RandomForestClassifier(n_estimators=100, random_state=42)
# logit = LogisticRegression(max_iter=1000, random_state=42)
#
# # === Hyperparameter Tuning ===
# param_grid = {
# "xgb__n_estimators": [50, 100, 150],
# "xgb__max_depth": [3, 5, 7],
# }
#
# ensemble = VotingClassifier(
# estimators=[("xgb", xgb), ("rfc", rfc), ("logit", logit)],
# voting="soft"
# )
#
# search = RandomizedSearchCV(
# estimator=ensemble,
# param_distributions=param_grid,
# n_iter=8,
# cv=3,
# verbose=1,
# random_state=42,
# n_jobs=-1
# )
#
# search.fit(X_train, y_train)
#
# print("📊 Classification Report:")
# y_pred = search.predict(X_test)
# print(classification_report(y_test, y_pred))
# print("\n🧩 Confusion Matrix:")
# print(confusion_matrix(y_test, y_pred))
#
# # === Αποθήκευση μοντέλου και preprocessing ===
# joblib.dump(search.best_estimator_, "ensemble_model.pkl")
# joblib.dump(preprocessor, "preprocessor.pkl")
# print("✅ Μοντέλο και preprocessor αποθηκεύτηκαν.")
# retrain_model.py (Βελτιωμένος με reproducibility + preprocessing pipeline)
# import pandas as pd
# import numpy as np
# from sklearn.model_selection import train_test_split
# from sklearn.impute import SimpleImputer
# from sklearn.preprocessing import StandardScaler
# from sklearn.linear_model import LogisticRegression
# from sklearn.ensemble import RandomForestClassifier
# from xgboost import XGBClassifier
# from sklearn.pipeline import Pipeline
# from sklearn.compose import ColumnTransformer
# from sklearn.metrics import classification_report, confusion_matrix
# from sklearn.utils import shuffle
# from imblearn.over_sampling import SMOTE
# from sklearn.model_selection import RandomizedSearchCV
# from sklearn.ensemble import VotingClassifier
# import joblib
# from feature_engineering import create_features_v2
#
# print("🔄 Φόρτωση δεδομένων...")
# df = pd.read_csv("datasets/BTCUSDT_15m.csv", parse_dates=["timestamp"], index_col="timestamp")
# # df = create_features_v2(df, threshold=0.003).dropna()
# df, _, _ = create_features_v2(df, threshold=0.003)
# df = df.dropna()
#
# X = df.drop(columns=["target", "future_return"], errors="ignore")
# y = df["target"]
#
# # Αντικαθιστούμε τυχόν NaN/Inf
# X = X.replace([np.inf, -np.inf], np.nan)
#
# # Διαχωρισμός set (70-30)
# X_train, X_test, y_train, y_test = train_test_split(
# X, y, test_size=0.3, shuffle=False
# )
#
# # === Pipeline Preprocessing ===
# numeric_features = X.columns.tolist()
#
# preprocessor = ColumnTransformer(
# transformers=[
# ("num", Pipeline([
# ("imputer", SimpleImputer(strategy="mean")),
# ("scaler", StandardScaler())
# ]), numeric_features)
# ]
# )
#
# X_train = preprocessor.fit_transform(X_train)
# X_test = preprocessor.transform(X_test)
#
# # === SMOTE ===
# smote = SMOTE(sampling_strategy="auto", random_state=42)
# X_train, y_train = smote.fit_resample(X_train, y_train)
#
# # === Τρία μοντέλα με fixed random_state ===
# # xgb = XGBClassifier(use_label_encoder=False, eval_metric="logloss", random_state=42)
# xgb = XGBClassifier(eval_metric="logloss", random_state=42)
#
# rfc = RandomForestClassifier(n_estimators=100, random_state=42)
# logit = LogisticRegression(max_iter=1000, random_state=42)
#
# # === Hyperparameter Tuning ===
# param_grid = {
# "xgb__n_estimators": [50, 100, 150],
# "xgb__max_depth": [3, 5, 7],
# }
#
# ensemble = VotingClassifier(
# estimators=[("xgb", xgb), ("rfc", rfc), ("logit", logit)],
# voting="soft"
# )
#
# search = RandomizedSearchCV(
# estimator=ensemble,
# param_distributions=param_grid,
# n_iter=8,
# cv=3,
# verbose=1,
# random_state=42,
# n_jobs=-1
# )
#
# search.fit(X_train, y_train)
#
# print("📊 Classification Report:")
# y_pred = search.predict(X_test)
# print(classification_report(y_test, y_pred))
# print("\n🧩 Confusion Matrix:")
# print(confusion_matrix(y_test, y_pred))
#
# # === Αποθήκευση μοντέλου και preprocessing ===
# joblib.dump(search.best_estimator_, "ensemble_model.pkl")
# joblib.dump(preprocessor, "preprocessor.pkl")
# print("✅ Μοντέλο και preprocessor αποθηκεύτηκαν.")
# import pandas as pd
# import numpy as np
# import joblib
# from sklearn.model_selection import train_test_split, RandomizedSearchCV
# from sklearn.ensemble import RandomForestClassifier
# from sklearn.linear_model import LogisticRegression
# from xgboost import XGBClassifier
# from sklearn.metrics import classification_report, confusion_matrix
# from sklearn.impute import SimpleImputer
# from imblearn.over_sampling import SMOTE
# from collections import Counter
# import warnings
#
# from feature_engineering import create_features_v2
#
# warnings.filterwarnings("ignore", category=UserWarning)
#
# # Load data
# print("🔄 Φόρτωση δεδομένων...")
# df = pd.read_csv("datasets/BTCUSDT_15m.csv", parse_dates=["timestamp"], index_col="timestamp")
# X, y, df = create_features_v2(df, threshold=0.003)
# df["target"] = y
# df = df.dropna(subset=["target"])
#
# # Split features and target
# X = df.drop(columns=["target", "future_return"])
# y = df["target"]
#
# # Train/test split
# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)
#
# # Impute missing values
# imputer = SimpleImputer()
# X_train = pd.DataFrame(imputer.fit_transform(X_train), columns=X.columns)
# X_test = pd.DataFrame(imputer.transform(X_test), columns=X.columns)
#
# # SMOTE class balancing
# smote = SMOTE(sampling_strategy='auto', random_state=42)
# X_train, y_train = smote.fit_resample(X_train, y_train)
# print(f"[INFO] Εφαρμόστηκε SMOTE - Νέα κατανομή: {Counter(y_train)}")
# print(f"⚖️ Class balance: {dict(Counter(y_train))} | scale_pos_weight = {round(len(y_train[y_train==0])/len(y_train[y_train==1]), 2)}")
#
# # Inspect
# def inspect_data(X, y):
# print("\n🔎 Data Inspection:")
# print("Feature means:\n", X.mean().mean())
# print("\nTarget distribution:\n", y.value_counts().to_frame('count'))
#
# inspect_data(X_train, y_train)
#
# # Model training
# print("🔍 Tuning XGBoost...")
# xgb = XGBClassifier(n_estimators=100, max_depth=3, eval_metric="logloss", use_label_encoder=False)
#
# print("🔍 Tuning Random Forest...")
# rf_base = RandomForestClassifier()
# param_dist_rf = {
# "n_estimators": [50, 100, 150],
# "max_depth": [3, 5, 7]
# }
# rf_search = RandomizedSearchCV(rf_base, param_distributions=param_dist_rf, n_iter=4, cv=3, scoring="f1", random_state=42)
# rf_search.fit(X_train, y_train)
# rf = rf_search.best_estimator_
#
# print("🔍 Tuning Logistic Regression...")
# lr = LogisticRegression(max_iter=500)
# lr.fit(X_train, y_train)
#
# # Ensemble με soft voting
# from sklearn.ensemble import VotingClassifier
# ensemble = VotingClassifier(estimators=[
# ("xgb", xgb),
# ("rf", rf),
# ("lr", lr)
# ], voting="soft")
#
# print("🚀 Εκκίνηση εκπαίδευσης ensemble...")
# ensemble.fit(X_train, y_train)
#
# # Evaluation
# y_pred = ensemble.predict(X_test)
# print("\n📊 Classification Report:")
# print(classification_report(y_test, y_pred))
#
# print("\n🧩 Confusion Matrix:")
# print(confusion_matrix(y_test, y_pred))
#
# # Save model
# joblib.dump(ensemble, "ensemble_model.pkl")
# print("\n✅ Το ensemble μοντέλο αποθηκεύτηκε στο: ensemble_model.pkl")
{
"paused": false,
"auto_resume": true,
"pause_timestamp": null,
"resume_hours": 6,
"consecutive_losses": 0,
"max_consecutive_losses": 3,
"equity_curve": [
1.0,
1.04,
1.0816,
1.1249,
1.1699,
1.2167,
1.2654,
1.316
],
"max_drawdown_pct": 0.1
}
# run_scheduler.py
import schedule
import time
import subprocess
from datetime import datetime
def job():
    """Run ``retrain_model.py`` in a child process and report how it ended."""
    import sys  # local import: this script's import block does not provide sys

    print(f"[{datetime.now()}] 🔁 Εκκίνηση retrain_model.py...")
    # Use the interpreter that runs this scheduler, so the child process sees
    # the same environment instead of whichever "python" is first on PATH.
    result = subprocess.run([sys.executable, "retrain_model.py"])
    if result.returncode == 0:
        print(f"[{datetime.now()}] ✅ Ολοκληρώθηκε retrain_model.py\n")
    else:
        # Do not claim success when the retrain script failed.
        print(f"[{datetime.now()}] ❌ retrain_model.py failed (exit code {result.returncode})\n")
# Register the retrain job for every Monday at 07:00.
weekly_slot = schedule.every().monday.at("07:00")
weekly_slot.do(job)
print("📅 Scheduler ξεκίνησε. Περιμένει επόμενη εκτέλεση...\n")

# Poll once a minute: due jobs only run when run_pending() is called.
while True:
    schedule.run_pending()
    time.sleep(60)
# scheduled_retrain.py
import schedule
import time
import subprocess
from datetime import datetime
# === Συνάρτηση retrain ===
def retrain():
    """Run ``retrain_model.py`` in a child process and echo its captured output.

    The child's stdout is always printed; stderr is printed when non-empty
    (it may contain warnings as well as errors). The status line reflects the
    child's exit code.
    """
    import sys  # local import: this script's import block does not provide sys

    print(f"[{datetime.now()}] 🔁 Ξεκινά retrain του μοντέλου...")
    # Use the interpreter that runs this scheduler, so the child process sees
    # the same environment instead of whichever "python" is first on PATH.
    result = subprocess.run(
        [sys.executable, "retrain_model.py"], capture_output=True, text=True
    )
    if result.returncode == 0:
        print(f"[{datetime.now()}] ✅ Retrain Ολοκληρώθηκε")
    else:
        # Do not claim success when the retrain script failed.
        print(f"[{datetime.now()}] ❌ Retrain failed (exit code {result.returncode})")
    print("📄 Έξοδος retrain_model.py:\n")
    print(result.stdout)
    if result.stderr:
        print("⚠️ Σφάλμα:\n")
        print(result.stderr)
# === Schedule definition ===
# Once a day at 06:00.
# NOTE(review): the original comment says "06:00 UTC"; nothing here sets a
# time zone, so confirm which clock the schedule library uses.
daily_slot = schedule.every().day.at("06:00")
daily_slot.do(retrain)
# Alternative: every 3 hours
# schedule.every(3).hours.do(retrain)

# === Start the scheduler ===
print("🕒 Ο scheduler ξεκίνησε... Πατήστε Ctrl+C για έξοδο.")
# Poll once a minute: due jobs only run when run_pending() is called.
while True:
    schedule.run_pending()
    time.sleep(60)
# start_all.py
import subprocess
import os
import sys
from datetime import datetime
from telegram_utils import send_telegram_message
from data_utils import initialize_historical_data
# Launcher: make sure the dataset and a trained model exist, then start the
# trading loop and the retrain scheduler as child processes and wait on them.
print("🚀 Εκκίνηση όλων των modules...")
send_telegram_message("🚀 ML System: Εκκίνηση trader & scheduler!")

# === Python interpreter ===
# Every child process is started with the interpreter running this launcher,
# so they all share the same environment.
PYTHON = sys.executable

# === Prepare data and model ===
csv_path = "datasets/BTCUSDT_15m.csv"
model_path = "ensemble_model.pkl"
if not os.path.exists(csv_path):
    initialize_historical_data(symbol="BTCUSDT", interval="15m", days=120, path=csv_path)
if not os.path.exists(model_path):
    print("🧠 Δεν βρέθηκε μοντέλο. Ξεκινάμε training...")
    # check=True aborts the launcher if the training script exits non-zero.
    subprocess.run([PYTHON, "retrain_model.py"], check=True)
    # The script can also finish without writing a model (e.g. too little
    # data), so verify the artefact before announcing success.
    if not os.path.exists(model_path):
        raise SystemExit(f"❌ Training finished but {model_path} was not created.")
    print("✅ Το μοντέλο αποθηκεύτηκε.")

# Script paths are resolved against the current working directory, so the
# launcher has to be started from the project folder.
TRADER_SCRIPT = os.path.join(os.getcwd(), "ml_trading_loop.py")
SCHEDULER_SCRIPT = os.path.join(os.getcwd(), "run_scheduler.py")

# === Start trader process ===
trader_proc = subprocess.Popen([PYTHON, TRADER_SCRIPT])
# Log the real start time (the message previously carried a pasted, fixed timestamp).
print(f"[{datetime.now()}] ✅ Ξεκίνησε το ml_trading_loop.py")

# === Start scheduler process ===
scheduler_proc = subprocess.Popen([PYTHON, SCHEDULER_SCRIPT])
print(f"[{datetime.now()}] ✅ Ξεκίνησε το run_scheduler.py")

# === Keep the main process alive ===
# Block on the children; on Ctrl+C terminate both and notify via Telegram.
try:
    trader_proc.wait()
    scheduler_proc.wait()
except KeyboardInterrupt:
    print("\n🛑 Διακοπή από χρήστη. Τερματισμός subprocesses...")
    trader_proc.terminate()
    scheduler_proc.terminate()
    send_telegram_message("🛑 ML System: Διακόπηκε από τον χρήστη.")
# telegram_utils.py
import os
import requests
def send_telegram_message(message, bot_token=None, chat_id=None, timeout=10):
    """
    Send a message through a Telegram bot.

    Credentials that are not passed explicitly are read from the
    ``TELEGRAM_TOKEN`` and ``TELEGRAM_CHAT_ID`` environment variables.
    The function is best-effort: it never raises, it reports problems on
    stdout and signals the outcome through its return value.

    :param message: Text to send. It is submitted with ``parse_mode="HTML"``,
        so the Telegram API interprets HTML markup in it.
    :param bot_token: Telegram bot token (optional).
    :param chat_id: Recipient chat id (optional).
    :param timeout: Seconds to wait for the Telegram API before giving up.
    :return: ``True`` if the API answered with HTTP 200, ``False`` otherwise
        (missing credentials, non-200 response, or any exception).
    """
    bot_token = bot_token or os.getenv("TELEGRAM_TOKEN")
    chat_id = chat_id or os.getenv("TELEGRAM_CHAT_ID")
    if not bot_token or not chat_id:
        print("⚠️ Δεν έχουν οριστεί bot_token ή chat_id.")
        return False

    url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
    data = {
        "chat_id": chat_id,
        "text": message,
        "parse_mode": "HTML"
    }
    try:
        # Without a timeout a stalled connection would block the caller
        # (e.g. the trading loop) indefinitely.
        response = requests.post(url, data=data, timeout=timeout)
    except Exception as e:
        # Deliberately broad: a notification failure must never crash the caller.
        print(f"❌ Εξαίρεση κατά την αποστολή Telegram: {e}")
        return False

    if response.status_code == 200:
        print("✅ Μήνυμα εστάλη επιτυχώς μέσω Telegram.")
        return True
    print(f"❌ Σφάλμα Telegram: {response.text}")
    return False
# trading_report.py
import pandas as pd
import os
from datetime import datetime
LOG_PATH = "logs/trades.csv"


def generate_report():
    """Print and save a summary of today's (UTC) trades from ``LOG_PATH``.

    The CSV is read with its ``Date`` column parsed as datetimes; the
    ``Result`` and ``Entry`` columns are used for the statistics. The summary
    is printed and written to ``logs/report_<date>.txt``.

    Returns ``None`` in every case. Nothing is written when the log file is
    missing, empty, or contains no trade dated today.
    """
    from datetime import timezone  # local import: only `datetime` is imported at file level

    if not os.path.exists(LOG_PATH):
        print("❌ Δεν βρέθηκε αρχείο trades.")
        return
    df = pd.read_csv(LOG_PATH, parse_dates=["Date"])
    if df.empty:
        print("⚠️ Το αρχείο trades είναι κενό.")
        return

    # Timezone-aware replacement for the deprecated datetime.utcnow().
    today = datetime.now(timezone.utc).date()
    # .copy() so that adding a column below works on an independent frame
    # rather than on a filtered view of df.
    df_today = df[df["Date"].dt.date == today].copy()
    if df_today.empty:
        print(f"📅 Δεν υπάρχουν trades για την ημέρα: {today}")
        return

    # PnL in USD.
    # NOTE(review): this treats Result as a fractional return and Entry as the
    # amount it applies to -- confirm against the code that writes the log.
    df_today["PnL_$"] = df_today["Result"] * df_today["Entry"]

    results = df_today["Result"]
    wins = df_today[results > 0]
    losses = df_today[results < 0]
    win_rate = len(wins) / len(df_today)

    # Reported as "Expectancy": average win divided by average loss size.
    # Needs at least one win and one loss; otherwise it is shown as N/A
    # instead of "nan".
    if len(wins) > 0 and len(losses) > 0:
        expectancy = wins["Result"].mean() / abs(losses["Result"].mean())
    else:
        expectancy = None

    # Mean/std of the per-trade results. std() is NaN for a single trade and
    # 0 for identical results; both cases are shown as N/A.
    spread = results.std()
    sharpe = results.mean() / spread if pd.notna(spread) and spread != 0 else None

    total_return = results.sum()
    total_pnl = df_today["PnL_$"].sum()

    # Build the report once and reuse it for the console and the file.
    lines = [
        f"=== Trading Summary: {today} ===",
        f"Total Trades : {len(df_today)}",
        f"Wins : {len(wins)}",
        f"Losses : {len(losses)}",
        f"Win Rate : {win_rate:.2%}",
        f"Total Return : {total_return:.2%}",
        f"Total PnL ($): {total_pnl:.2f}",
        f"Expectancy : {expectancy:.2f}" if expectancy is not None else "Expectancy : N/A",
        f"Sharpe Ratio : {sharpe:.2f}" if sharpe is not None else "Sharpe Ratio : N/A",
    ]
    print("\n".join(lines))

    # Save to file.
    report_path = f"logs/report_{today}.txt"
    with open(report_path, "w", encoding="utf-8") as f:
        f.write("\n".join(lines) + "\n")
    print(f"📄 Report αποθηκεύτηκε: {report_path}")


if __name__ == "__main__":
    generate_report()
# tune_models.py
import pandas as pd
import numpy as np
from collections import Counter
from xgboost import XGBClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split, RandomizedSearchCV
from sklearn.metrics import classification_report
from feature_engineering import create_features_v2
import warnings
warnings.filterwarnings("ignore")
# Hyperparameter tuning script: run an independent randomized search for
# XGBoost, Random Forest and Logistic Regression on the chronological
# training split, then report each tuned model on the hold-out split.

# === 1. Load data
# NOTE(review): this script reads the index column as "Date" while
# retrain_model.py reads "timestamp" from the same CSV -- confirm which
# header the file really has.
df = pd.read_csv("datasets/BTCUSDT_15m.csv", parse_dates=["Date"], index_col="Date")
X, y, _ = create_features_v2(df)

# === 2. Chronological split (no shuffling)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, shuffle=False)

# === 3. Class balance for XGBoost (negatives / positives)
counter = Counter(y_train)
if counter[0] == 0 or counter[1] == 0:
    # Previously this surfaced as a ZeroDivisionError (no positives) or as a
    # meaningless weight of 0 (no negatives).
    raise SystemExit(f"Training split must contain both classes, got {dict(counter)}")
scale_pos_weight = counter[0] / counter[1]

# === 4. Parameter spaces
param_dist_xgb = {
    "n_estimators": [50, 100, 150],
    "max_depth": [3, 4, 5],
    "learning_rate": [0.01, 0.05, 0.1],
    "subsample": [0.6, 0.8, 1.0],
    "colsample_bytree": [0.6, 0.8, 1.0],
    "scale_pos_weight": [scale_pos_weight]
}
param_dist_rf = {
    "n_estimators": [50, 100, 150],
    "max_depth": [3, 4, 5, None],
    "min_samples_split": [2, 4, 6],
    "class_weight": ["balanced"]
}
# Only 4 combinations exist here, fewer than the n_iter=10 requested below.
param_dist_lr = {
    "C": [0.01, 0.1, 1, 10],
    "penalty": ["l2"],
    "solver": ["lbfgs"],
    "class_weight": ["balanced"]
}

# === 5. Models
# use_label_encoder is deprecated in XGBoost and is no longer passed; this
# matches how retrain_model.py constructs its XGBClassifier.
xgb_model = XGBClassifier(random_state=42, verbosity=0)
rf_model = RandomForestClassifier(random_state=42)
lr_model = LogisticRegression(max_iter=1000)


def _tune(estimator, param_distributions):
    """Fit and return a randomized search (F1, 3-fold CV) on the training split."""
    search = RandomizedSearchCV(
        estimator,
        param_distributions=param_distributions,
        n_iter=10,
        cv=3,
        scoring="f1",
        verbose=1,
        n_jobs=-1,
        # Fixed seed so repeated runs sample the same candidates.
        random_state=42,
    )
    search.fit(X_train, y_train)
    return search


# === 6. Randomized search
print("🔍 Tuning XGBoost...")
xgb_search = _tune(xgb_model, param_dist_xgb)
print("\n🔍 Tuning Random Forest...")
rf_search = _tune(rf_model, param_dist_rf)
print("\n🔍 Tuning Logistic Regression...")
lr_search = _tune(lr_model, param_dist_lr)

tuned = [
    ("XGBoost", xgb_search),
    ("Random Forest", rf_search),
    ("Logistic Regression", lr_search),
]

# === 7. Results
for label, search in tuned:
    print(f"\n✅ Best Params - {label}:")
    print(search.best_params_)

# === 8. Evaluation on the hold-out split
for label, search in tuned:
    print(f"\n📊 Evaluation on Test Set ({label}):")
    print(classification_report(y_test, search.predict(X_test)))
# walk_forward_backtest.py
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from xgboost import XGBClassifier
from sklearn.metrics import accuracy_score
from feature_engineering import create_features_v2
# Walk-forward backtest: repeatedly train an XGBoost classifier on a rolling
# window, trade its predictions on the following window with fixed
# take-profit / stop-loss barriers, and plot the per-window returns.

# === Settings ===
DATA_PATH = "datasets/BTCUSDT_15m.csv"
TP_PCT = 0.02        # take-profit barrier, as a fraction of the entry price
SL_PCT = 0.01        # stop-loss barrier, as a fraction of the entry price
HOLD_PERIOD = 5      # maximum number of bars a trade is followed
train_days = 30
test_days = 7
step_days = 7
BARS_PER_DAY = 96    # 15-minute bars per day


def _simulate_trade(signal, entry_price, future_prices):
    """Return the realised return of one trade, or 0.0 when no barrier is hit.

    ``signal`` 1 opens a long, 0 opens a short; any other value opens nothing.
    The following closes are scanned in chronological order and the trade is
    closed at the first one that reaches the take-profit or the stop-loss.
    Checking take-profit over the whole horizon first (as before) counted
    trades as winners even when the stop had been hit earlier.
    """
    if signal == 1:
        direction = 1.0
    elif signal == 0:
        direction = -1.0
    else:
        return 0.0
    for price in future_prices:
        ret = direction * (price - entry_price) / entry_price
        if ret >= TP_PCT:
            return TP_PCT
        if ret <= -SL_PCT:
            return -SL_PCT
    return 0.0


# === Load data & features
df = pd.read_csv(DATA_PATH, parse_dates=["Date"], index_col="Date")
X, y, full_df = create_features_v2(df)
full_df["target"] = y
full_df = full_df.dropna()

# Columns fed to the model: everything except the label.
# NOTE(review): other scripts in this project drop a "future_return" column
# next to "target"; if create_features_v2 leaves it in full_df it is
# look-ahead information, so it is excluded here when present.
feature_cols = [c for c in full_df.columns if c not in ("target", "future_return")]

# === Walk-forward loop
results = []
start_idx = 0
window_size = (train_days + test_days) * BARS_PER_DAY
# "<=" so that a final window that fits exactly is still evaluated.
while start_idx + window_size <= len(full_df):
    train_end = start_idx + train_days * BARS_PER_DAY
    test_end = train_end + test_days * BARS_PER_DAY
    train_df = full_df.iloc[start_idx:train_end]
    test_df = full_df.iloc[train_end:test_end].copy()

    X_train = train_df[feature_cols]
    y_train = train_df["target"]
    X_test = test_df[feature_cols]
    y_test = test_df["target"]

    # use_label_encoder is deprecated in XGBoost and is no longer passed.
    model = XGBClassifier(eval_metric="logloss", verbosity=0)
    model.fit(X_train, y_train)
    preds = model.predict(X_test)

    # Simulate one trade per bar; the last HOLD_PERIOD bars have no complete
    # look-ahead window and are left at 0.0 (no trade).
    closes = test_df["Close"].to_numpy()
    returns = np.zeros(len(test_df))
    for i in range(len(test_df) - HOLD_PERIOD):
        returns[i] = _simulate_trade(
            preds[i], closes[i], closes[i + 1:i + HOLD_PERIOD + 1]
        )
    test_df["prediction"] = preds
    test_df["return_pct"] = returns

    # Only bars where a barrier was hit count as trades.
    trades = test_df[test_df["return_pct"] != 0.0].copy()
    if len(trades) > 0:
        trades["equity_curve"] = (1 + trades["return_pct"]).cumprod()
        equity_return = trades["equity_curve"].iloc[-1] - 1
    else:
        equity_return = 0.0

    results.append({
        "start_date": test_df.index[0],
        "end_date": test_df.index[-1],
        "accuracy": accuracy_score(y_test, preds),
        "total_return": equity_return,
        "n_trades": len(trades)
    })
    start_idx += step_days * BARS_PER_DAY

# === Convert to DataFrame
results_df = pd.DataFrame(results)

# === Show results
print("=== WALK-FORWARD VALIDATION ===")
print(results_df)

# === Plot returns (skipped when the data was too short for a single window)
if not results_df.empty:
    plt.figure(figsize=(10, 5))
    plt.plot(results_df["end_date"], results_df["total_return"].cumsum(), marker="o", label="Cumulative Return")
    plt.title("📈 Walk-Forward: Cumulative Return per Window")
    plt.xlabel("End Date")
    plt.ylabel("Cumulative Return")
    plt.grid(True)
    plt.legend()
    plt.tight_layout()
    plt.show()
import pandas as pd
import numpy as np
from xgboost import XGBClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from feature_engineering import create_features_v2
# --- Settings ---------------------------------------------------------------
# TRAIN_DAYS / TEST_DAYS / STEP_DAYS are fed to pd.Timedelta(days=...) by the
# walk-forward loop below; THRESHOLD is forwarded to create_features_v2.
CSV_PATH = "datasets/BTCUSDT_15m.csv"
TRAIN_DAYS = 60
TEST_DAYS = 7
STEP_DAYS = 7
THRESHOLD = 0.003

# --- Data -------------------------------------------------------------------
# The "timestamp" column is parsed as datetimes and becomes the index.
df = pd.read_csv(
    CSV_PATH,
    index_col="timestamp",
    parse_dates=["timestamp"],
)
X_full, y_full, df_with_features = create_features_v2(df, threshold=THRESHOLD)

# Keep features and label in a single frame so that one date-based selection
# yields both X and y for a window.
df_all = X_full.assign(target=y_full)

# --- Walk-forward state -----------------------------------------------------
# `results` collects one metrics dict per window; `start_date` is advanced by
# the loop, `end_date` is the last timestamp available.
results = []
start_date, end_date = df_all.index.min(), df_all.index.max()
# Walk-forward validation: train on TRAIN_DAYS of data, evaluate on the
# following TEST_DAYS, then slide the whole window forward by STEP_DAYS until
# the test window would run past the last available timestamp.
while True:
    train_start = start_date
    train_end = train_start + pd.Timedelta(days=TRAIN_DAYS)
    test_start = train_end
    test_end = test_start + pd.Timedelta(days=TEST_DAYS)
    if test_end > end_date:
        break

    # Advance the cursor up front so every path below (including the
    # empty-window `continue`) moves on to the next window.
    start_date += pd.Timedelta(days=STEP_DAYS)

    # Half-open [start, end) windows. A label slice such as
    # df_all.loc[train_start:train_end] includes BOTH endpoints, which would
    # put the row stamped exactly at train_end == test_start into the training
    # set and the test set at the same time.
    idx = df_all.index
    df_train = df_all[(idx >= train_start) & (idx < train_end)].dropna()
    df_test = df_all[(idx >= test_start) & (idx < test_end)].dropna()
    if df_train.empty or df_test.empty:
        continue

    X_train = df_train.drop("target", axis=1)
    y_train = df_train["target"]
    X_test = df_test.drop("target", axis=1)
    y_test = df_test["target"]

    # A fresh model per window: nothing is carried over between windows.
    model = XGBClassifier(n_estimators=100, max_depth=3, use_label_encoder=False, eval_metric="logloss")
    model.fit(X_train, y_train)
    preds = model.predict(X_test)

    acc = accuracy_score(y_test, preds)
    prec = precision_score(y_test, preds, zero_division=0)
    rec = recall_score(y_test, preds, zero_division=0)
    f1 = f1_score(y_test, preds, zero_division=0)

    # Pin the label set so the matrix is always 2x2. Without `labels`, a
    # window whose targets and predictions contain a single class yields a
    # 1x1 matrix and indexing cm[1, 1] raises IndexError.
    tn, fp, fn, tp = confusion_matrix(y_test, preds, labels=[0, 1]).ravel()

    results.append({
        "train_start": train_start.date(),
        "train_end": train_end.date(),
        "test_start": test_start.date(),
        "test_end": test_end.date(),
        "accuracy": round(acc, 4),
        "precision": round(prec, 4),
        "recall": round(rec, 4),
        "f1_score": round(f1, 4),
        "tp": int(tp), "fp": int(fp),
        "tn": int(tn), "fn": int(fn),
    })
    print(f"✔️ {test_start.date()} - {test_end.date()} | F1: {f1:.4f}")
# Write the collected per-window metrics to disk, one CSV row per window and
# without the positional index column.
results_df = pd.DataFrame(data=results)
results_df.to_csv(
    path_or_buf="walk_forward_results.csv",
    index=False,
)
print("\n✅ Walk-forward validation complete. Results saved to walk_forward_results.csv")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment