Created
May 10, 2025 23:24
-
-
Save jongan69/506c509bbaadf01ba0b1ccc276173352 to your computer and use it in GitHub Desktop.
Script for computing portfolio metrics on an Alpaca portfolio
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import logging | |
from datetime import datetime, timedelta | |
import requests | |
from dotenv import load_dotenv | |
import yfinance as yf | |
from collections import Counter, defaultdict | |
import re | |
from alpaca.trading.client import TradingClient | |
from alpaca.trading.models import Position | |
import matplotlib.pyplot as plt | |
import numpy as np | |
import pandas as pd | |
# Send log records to stdout, timestamped and tagged with their level.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger(__name__)

# Load variables from a .env file (if present) before reading the credentials.
load_dotenv()
ALPACA_API_KEY = os.environ.get("ALPACA_API_KEY")
ALPACA_API_SECRET = os.environ.get("ALPACA_API_SECRET")
# Forwarded as `paper=` when the Alpaca TradingClient is constructed.
IS_DEVELOPMENT = False

# Abort immediately when either credential is missing or empty.
if not (ALPACA_API_KEY and ALPACA_API_SECRET):
    logger.error("Missing Alpaca API credentials.")
    sys.exit(1)
def fetch_account_activities(activity_types=None, max_pages=1000, start_date=None, end_date=None):
    """
    Fetch account activities from Alpaca, optionally restricted to a date range.

    Args:
        activity_types: Optional iterable of activity type codes (e.g.
            ``['FILL', 'DIV']``); sent to the API as a comma-separated list.
        max_pages: Upper bound on the number of pages (HTTP requests) fetched.
        start_date: Optional lower bound, sent as the ``after`` parameter.
        end_date: Optional upper bound, sent as the ``until`` parameter.
            When ``start_date`` and ``end_date`` are both given and equal, the
            single-day ``date`` parameter is sent instead of ``after``/``until``.

    Returns:
        A list with the activity objects of every fetched page, in the order
        the API returned them (``direction=desc`` is requested). On an HTTP or
        network error the error is logged and whatever was collected so far
        is returned.
    """
    url = 'https://api.alpaca.markets/v2/account/activities'
    headers = {
        'accept': 'application/json',
        'APCA-API-KEY-ID': ALPACA_API_KEY,
        'APCA-API-SECRET-KEY': ALPACA_API_SECRET
    }
    page_size = 100
    params = {
        'direction': 'desc',
        'page_size': page_size
    }
    if activity_types:
        params['activity_types'] = ','.join(activity_types)
    if start_date and end_date and start_date == end_date:
        params['date'] = str(start_date)
    else:
        if start_date:
            params['after'] = str(start_date)
        if end_date:
            params['until'] = str(end_date)

    activities = []
    next_page_token = None
    for _ in range(max_pages):
        if next_page_token:
            params['page_token'] = next_page_token
        else:
            params.pop('page_token', None)

        # A timeout keeps a stalled connection from hanging the script forever.
        try:
            response = requests.get(url, headers=headers, params=params, timeout=30)
        except requests.RequestException as exc:
            logger.error(f"Error fetching activities: {exc}")
            break
        if response.status_code != 200:
            logger.error(f"Error fetching activities: {response.status_code}, {response.text}")
            break

        page = response.json()
        if not page:
            break
        activities.extend(page)

        previous_token = next_page_token
        # Honour a Next-Page-Token header if the server sends one; otherwise
        # fall back to the documented scheme for this endpoint, where
        # page_token is the id of the last activity on the current page.
        next_page_token = response.headers.get('Next-Page-Token')
        if not next_page_token and isinstance(page, list) and len(page) >= page_size:
            last_item = page[-1]
            if isinstance(last_item, dict):
                next_page_token = last_item.get('id')
        # A short page means we reached the end; an unchanged token means the
        # server is not advancing, so stop instead of re-fetching the same page.
        if not next_page_token or next_page_token == previous_token:
            break
    return activities
def fetch_all_account_activities_by_year(activity_types=None, first_year=2018):
    """
    Fetch all account activities one calendar year at a time.

    Looping over years avoids missing old activities (esp. dividends).

    Args:
        activity_types: Optional iterable of activity type codes, passed
            through to ``fetch_account_activities``.
        first_year: First calendar year to query (inclusive).

    Returns:
        A list of activities for every year from ``first_year`` through the
        current UTC year. Activities carrying an ``id`` that was already seen
        in an earlier window are dropped.
    """
    # Local import: the module header only imports datetime and timedelta.
    from datetime import timezone

    today = datetime.now(timezone.utc).date()
    one_day = timedelta(days=1)
    all_activities = []
    seen_ids = set()
    for year in range(first_year, today.year + 1):
        start = datetime(year, 1, 1).date()
        end = min(datetime(year, 12, 31).date(), today)
        logger.info(f"Fetching activities for {year} ({start} to {end})...")
        # The upper bound is sent as a bare date. If the API reads that as
        # midnight at the start of the day, the last day of the window would
        # be excluded, so ask for one extra day and de-duplicate by id below.
        acts = fetch_account_activities(
            activity_types=activity_types,
            start_date=start,
            end_date=end + one_day,
        )
        fresh = []
        for act in acts:
            act_id = act.get('id') if isinstance(act, dict) else None
            if act_id is not None:
                if act_id in seen_ids:
                    continue
                seen_ids.add(act_id)
            fresh.append(act)
        logger.info(f" Retrieved {len(fresh)} activities for {year}.")
        all_activities.extend(fresh)
    return all_activities
def get_market_performance(start_date, end_date, symbol="SPY"):
    """
    Fetch market performance (e.g., SPY) between start_date and end_date using yfinance.

    Args:
        start_date: First day of the period (passed to yfinance as ``start``).
        end_date: Last day of the period, treated as inclusive. ``None`` and
            numeric values are passed to yfinance unchanged.
        symbol: Ticker symbol to measure.

    Returns:
        A tuple ``(pct_change, start_price, end_price)`` built from the first
        and last ``Close`` values of the period, or ``None`` when yfinance
        returns no rows.
    """
    # yfinance treats `end` as exclusive; request one extra day so the close
    # of end_date itself is included (and a single-day range is not empty).
    if end_date is None or isinstance(end_date, (int, float)):
        query_end = end_date
    else:
        query_end = (pd.Timestamp(end_date) + pd.Timedelta(days=1)).date()

    ticker = yf.Ticker(symbol)
    hist = ticker.history(start=start_date, end=query_end)
    if hist.empty:
        logger.warning(f"No market data for {symbol} between {start_date} and {end_date}")
        return None
    start_price = hist['Close'].iloc[0]
    end_price = hist['Close'].iloc[-1]
    pct_change = (end_price - start_price) / start_price * 100
    return pct_change, start_price, end_price
def analyze_trading_history(activities):
    """
    Inspect FILL activities and return the date range they span.

    Logs the keys found on FILL activities and whether any of them carries a
    ``profit_loss`` field (diagnostics only; no P/L is computed here).

    Args:
        activities: Iterable of activity dicts.

    Returns:
        ``(start_date, end_date)``: the earliest and latest ``transaction_time``
        dates among FILL activities, or ``(None, None)`` when there are no
        fills or none of them has a parseable timestamp.
    """
    trades = [a for a in activities if a.get('activity_type') == 'FILL']
    if not trades:
        logger.info("No trades found.")
        return None, None

    logger.info(f"Sample trade keys: {list(trades[0].keys())}")
    logger.debug(f"Sample trade: {trades[0]}")
    # Log all unique keys across all trades
    all_keys = set()
    for t in trades:
        all_keys.update(t.keys())
    logger.info(f"All unique keys in FILL activities: {sorted(all_keys)}")
    # Log if profit_loss or other useful fields are present in any trade
    profit_loss_present = any('profit_loss' in t for t in trades)
    logger.info(f"Any trade has 'profit_loss' field: {profit_loss_present}")
    logger.info(f"First 3 trades with 'profit_loss': {[t for t in trades if 'profit_loss' in t][:3]}")
    # Log a few sample trades for manual inspection
    for i, t in enumerate(trades[:3]):
        logger.debug(f"Trade {i+1}: {t}")

    # Collect calendar dates rather than datetimes so that a mix of
    # timezone-aware and naive timestamps cannot break min()/max().
    trade_dates = []
    skipped = 0
    for t in trades:
        tx_time = t.get('transaction_time')
        if not tx_time:
            continue
        try:
            parsed = datetime.fromisoformat(tx_time.replace('Z', '+00:00'))
        except (AttributeError, TypeError, ValueError):
            # Best effort: an unparseable timestamp only drops this trade from
            # the date range, but it is reported instead of silently ignored.
            skipped += 1
            logger.debug(f"Unparseable transaction_time: {tx_time!r}")
            continue
        trade_dates.append(parsed.date())
    if skipped:
        logger.warning(f"Skipped {skipped} trade(s) with unparseable transaction_time.")

    if not trade_dates:
        return None, None
    return min(trade_dates), max(trade_dates)
def _is_option_symbol(symbol):
    """Very basic option detector: True when the symbol contains 6+ consecutive digits."""
    return bool(re.search(r'[0-9]{6,}', symbol or ''))


def _print_symbol_table(title, symbol_dict, avg_price_dict=None, net_position_dict=None, filter_options=None):
    """Print one quantity / average price / net position table, optionally only options or only non-options."""
    print(f"\n{title}")
    print(f"{'Symbol':<20}{'Qty':>10}{'Avg Price':>15}{'Net Pos':>12}")
    print("-"*60)
    for sym, qty in symbol_dict.items():
        if filter_options is not None and _is_option_symbol(sym) != filter_options:
            continue
        avg_price = avg_price_dict.get(sym, 0.0) / qty if avg_price_dict and qty else 0.0
        # Round the net position like the other numeric columns so float
        # noise (e.g. 0.30000000000000004) does not break the table layout.
        net_pos = f"{net_position_dict.get(sym, 0.0):.2f}" if net_position_dict else ''
        print(f"{str(sym):<20}{qty:>10.2f}{avg_price:>15.2f}{net_pos:>12}")


def summarize_account_activities(activities):
    """
    Summarize all account activities and print portfolio metrics.

    Prints, in order: a count per activity type; bought and sold tables
    (stocks and options separately) built from FILL activities; dividends per
    symbol from DIV activities; and deposits / withdrawals / net cash flow.

    Args:
        activities: List of activity dicts. The keys read are ``activity_type``,
            ``symbol``, ``qty``, ``price``, ``side`` and ``net_amount``.
            Missing or ``None`` numeric fields count as 0.

    Returns:
        None. All output goes to stdout.
    """
    type_counts = Counter(a.get('activity_type') for a in activities)
    print("\nActivity type counts:")
    for t, c in type_counts.items():
        print(f" {t}: {c}")

    # 'TRANSFER' is what this script originally matched. CSD / CSW are the
    # cash deposit / withdrawal codes listed in Alpaca's activity-type docs
    # (TODO confirm against real account data).
    cash_flow_types = {'TRANSFER', 'CSD', 'CSW'}

    shares_bought = defaultdict(float)
    shares_sold = defaultdict(float)
    buy_amount = defaultdict(float)
    sell_amount = defaultdict(float)
    dividends = defaultdict(float)
    deposits = 0.0
    withdrawals = 0.0

    for a in activities:
        kind = a.get('activity_type')
        if kind == 'FILL':
            symbol = a.get('symbol')
            qty = float(a.get('qty') or 0)
            price = float(a.get('price') or 0)
            side = a.get('side')
            if side == 'buy':
                shares_bought[symbol] += qty
                buy_amount[symbol] += qty * price
            elif side == 'sell':
                shares_sold[symbol] += qty
                sell_amount[symbol] += qty * price
        elif kind == 'DIV':
            dividends[a.get('symbol')] += float(a.get('net_amount') or 0)
        elif kind in cash_flow_types:
            amount = float(a.get('net_amount') or 0)
            if amount > 0:
                deposits += amount
            elif amount < 0:
                withdrawals += amount

    # Net position per symbol. Use .get() rather than indexing: indexing a
    # defaultdict inserts the missing key, which would add phantom
    # zero-quantity rows to the bought/sold tables printed below.
    net_position = {
        sym: shares_bought.get(sym, 0.0) - shares_sold.get(sym, 0.0)
        for sym in set(shares_bought) | set(shares_sold)
    }

    # Print stocks and options separately
    _print_symbol_table("Stocks bought per symbol:", shares_bought, buy_amount, net_position, filter_options=False)
    _print_symbol_table("Options bought per symbol:", shares_bought, buy_amount, net_position, filter_options=True)
    _print_symbol_table("Stocks sold per symbol:", shares_sold, sell_amount, net_position, filter_options=False)
    _print_symbol_table("Options sold per symbol:", shares_sold, sell_amount, net_position, filter_options=True)

    # Dividends received per symbol
    total_dividends = sum(dividends.values())
    print("\nDividends received per symbol:")
    for sym, amt in dividends.items():
        print(f" {sym}: ${amt:.2f}")
    print(f"Total dividends: ${total_dividends:.2f}")

    # Net cash flow (deposits/withdrawals); withdrawals is a negative total.
    print(f"\nDeposits: ${deposits:.2f}")
    print(f"Withdrawals: ${withdrawals:.2f}")
    print(f"Net cash flow from TRANSFER activities: ${deposits + withdrawals:.2f}")
def print_current_portfolio_metrics():
    """
    Print a snapshot of the account and its open positions via the Alpaca SDK.

    Output covers account-level balances, aggregate unrealized P/L (absolute
    and as a percentage of total cost basis), and one line per open position
    with its share of the summed market value.
    """
    client = TradingClient(ALPACA_API_KEY, ALPACA_API_SECRET, paper=IS_DEVELOPMENT)
    account = client.get_account()
    positions = client.get_all_positions()

    print("\n--- Current Portfolio Metrics ---")
    account_lines = (
        ("Account value", account.portfolio_value),
        ("Cash", account.cash),
        ("Buying power", account.buying_power),
        ("Long market value", account.long_market_value),
        ("Short market value", account.short_market_value),
    )
    for label, raw_value in account_lines:
        print(f"{label}: ${float(raw_value):,.2f}")

    total_unrealized_pl = sum([float(held.unrealized_pl) for held in positions])
    total_cost_basis = sum([float(held.cost_basis) for held in positions])
    # A zero cost basis (e.g. no positions) reports 0% instead of dividing by zero.
    if total_cost_basis:
        unrealized_pl_pc = total_unrealized_pl / total_cost_basis * 100
    else:
        unrealized_pl_pc = 0.0
    print(f"Total Unrealized P/L: ${total_unrealized_pl:,.2f}")
    print(f"Total Unrealized P/L %: {unrealized_pl_pc:.2f}%")
    print(f"Number of open positions: {len(positions)}")

    if len(positions) == 0:
        print("No open positions.")
        return

    print("\nOpen Positions:")
    total_market_value = sum([float(held.market_value) for held in positions])
    for held in positions:
        if total_market_value:
            alloc = float(held.market_value) / total_market_value * 100
        else:
            alloc = 0.0
        print(f" {held.symbol}: {held.qty} @ ${held.avg_entry_price} (Current: ${held.current_price}) | Unrealized P/L: ${held.unrealized_pl} ({float(held.unrealized_plpc)*100:.2f}%) | Alloc: {alloc:.2f}%")
def fetch_portfolio_history(timeframe='1D', period='1A'):
    """
    Fetch portfolio value history from the Alpaca REST API.

    Args:
        timeframe: Resolution of the series, e.g. ``'1D'`` for daily.
        period: Length of the window, e.g. ``'1M'``, ``'3M'`` or ``'1A'``
            (see Alpaca's portfolio-history docs for the accepted units).

    Returns:
        A pandas DataFrame with columns ``'timestamp'`` (parsed from epoch
        seconds) and ``'equity'``, with rows containing missing values
        dropped. Returns ``None`` (after logging an error) on a network
        error, a non-200 response, or a response without timestamp/equity data.
    """
    url = 'https://api.alpaca.markets/v2/account/portfolio/history'
    headers = {
        'accept': 'application/json',
        'APCA-API-KEY-ID': ALPACA_API_KEY,
        'APCA-API-SECRET-KEY': ALPACA_API_SECRET
    }
    params = {
        'timeframe': timeframe,
        'period': period,
        'extended_hours': 'false'
    }
    # A timeout keeps a stalled connection from hanging the script forever.
    try:
        response = requests.get(url, headers=headers, params=params, timeout=30)
    except requests.RequestException as exc:
        logger.error(f"Error fetching portfolio history: {exc}")
        return None
    if response.status_code != 200:
        logger.error(f"Error fetching portfolio history: {response.status_code}, {response.text}")
        return None

    data = response.json()
    if not data or 'equity' not in data or not data['equity']:
        logger.error("No equity data in portfolio history response.")
        return None
    if 'timestamp' not in data:
        # Without timestamps the frame cannot be built; fail like the other
        # "bad response" cases rather than raising KeyError.
        logger.error("No timestamp data in portfolio history response.")
        return None

    df = pd.DataFrame({
        'timestamp': pd.to_datetime(data['timestamp'], unit='s'),
        'equity': data['equity']
    })
    return df.dropna()
def calculate_sharpe_ratio(equity_series, risk_free_rate=0.03, periods_per_year=252):
    """
    Calculate the annualized Sharpe ratio from a series of portfolio values.

    Args:
        equity_series: pandas Series of portfolio values, one per period.
        risk_free_rate: Annual risk-free rate as a fraction (0.03 = 3%).
        periods_per_year: Number of periods per year used for annualization
            (252 trading days for daily data).

    Returns:
        The annualized Sharpe ratio as a float, or 0.0 when it is undefined:
        fewer than two usable returns, or (numerically) zero volatility.
    """
    # A move away from a zero value yields +/-inf; treat it as missing.
    returns = equity_series.pct_change().replace([np.inf, -np.inf], np.nan).dropna()
    # The sample standard deviation needs at least two observations;
    # with fewer it is NaN, which would otherwise propagate to the result.
    if returns.shape[0] < 2:
        return 0.0
    excess_returns = returns - (risk_free_rate / periods_per_year)
    volatility = excess_returns.std()
    if not np.isfinite(volatility) or np.isclose(volatility, 0.0):
        return 0.0
    return float(np.sqrt(periods_per_year) * excess_returns.mean() / volatility)
def plot_portfolio_vs_benchmarks(portfolio_df, start, end, benchmarks=None):
    """
    Plot normalized portfolio and multiple benchmarks performance over time.

    Every series is divided by its own first value so all lines start at 1.0.

    Args:
        portfolio_df: DataFrame with ``'timestamp'`` and ``'equity'`` columns.
        start: Start of the benchmark window (passed to yfinance as ``start``).
        end: End of the benchmark window (passed to yfinance as ``end``).
        benchmarks: Optional iterable of ticker symbols to plot alongside the
            portfolio. Defaults to SPY, QQQ, DIA, IWM, VTI, AGG and GLD.
    """
    if benchmarks is None:
        benchmarks = ['SPY', 'QQQ', 'DIA', 'IWM', 'VTI', 'AGG', 'GLD']

    if portfolio_df is None or portfolio_df.empty:
        print("Warning: No portfolio data to plot.")
        return

    # Normalizing by a zero first value would turn the whole line into
    # inf/NaN, so start at the first row with positive equity.
    funded = (portfolio_df['equity'] > 0).to_numpy()
    if not funded.any():
        print("Warning: Portfolio equity is never positive; nothing to plot.")
        return
    first_funded = int(funded.argmax())
    if first_funded > 0:
        portfolio_df = portfolio_df.iloc[first_funded:]
        # Keep the benchmarks on the same baseline date as the portfolio.
        start = pd.Timestamp(portfolio_df['timestamp'].iloc[0]).date()

    plt.figure(figsize=(12, 6))
    # Normalize portfolio to 1.0 at the start
    portfolio_norm = portfolio_df['equity'] / portfolio_df['equity'].iloc[0]
    plt.plot(portfolio_df['timestamp'], portfolio_norm, label='Portfolio', linewidth=2)

    for symbol in benchmarks:
        hist = yf.Ticker(symbol).history(start=start, end=end)
        if hist.empty:
            print(f"Warning: No data for benchmark {symbol} in the given period.")
            continue
        norm = hist['Close'] / hist['Close'].iloc[0]
        plt.plot(hist.index, norm, label=symbol)

    plt.title('Portfolio vs. Benchmarks Performance')
    plt.xlabel('Date')
    plt.ylabel('Normalized Value (Start=1.0)')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()
def print_portfolio_metrics(portfolio_df, risk_free_rate=0.03):
    """
    Print key portfolio metrics: Sharpe Ratio, Annualized Return, Total Return,
    Annualized Volatility, Max Drawdown.

    Args:
        portfolio_df: DataFrame with an ``'equity'`` column of portfolio
            values, one row per trading day.
        risk_free_rate: Annual risk-free rate as a fraction, forwarded to
            ``calculate_sharpe_ratio``.

    Prints "Not enough data for metrics." and returns when fewer than two
    usable equity values are available.
    """
    equity = portfolio_df['equity']
    # Start at the first row with positive equity. A zero starting value
    # (e.g. rows from before the account held any money) would make the
    # return and drawdown ratios divide by zero.
    funded = (equity > 0).to_numpy()
    if funded.any():
        equity = equity.iloc[int(funded.argmax()):]
    else:
        equity = equity.iloc[0:0]

    returns = equity.pct_change().dropna()
    n_days = returns.shape[0]
    if n_days == 0:
        print("Not enough data for metrics.")
        return

    # Annualized Return (CAGR), assuming 252 trading days per year
    start_value = equity.iloc[0]
    end_value = equity.iloc[-1]
    cagr = (end_value / start_value) ** (252 / n_days) - 1
    # Total Return Percentage
    total_return_pct = (end_value - start_value) / start_value * 100
    # Annualized Volatility
    ann_vol = returns.std() * np.sqrt(252)
    # Max Drawdown: largest drop relative to the running peak (a value <= 0)
    running_max = equity.cummax()
    drawdown = (equity - running_max) / running_max
    max_drawdown = drawdown.min()
    # Sharpe Ratio
    sharpe = calculate_sharpe_ratio(equity, risk_free_rate)

    print("\n--- Portfolio Metrics ---")
    print(f"Sharpe Ratio: {sharpe:.2f}")
    print(f"Annualized Return: {cagr*100:.2f}%")
    print(f"Total Return: {total_return_pct:.2f}%")
    print(f"Annualized Volatility: {ann_vol*100:.2f}%")
    print(f"Max Drawdown: {max_drawdown*100:.2f}%")
def main():
    """
    Run the full report.

    Steps: fetch all account activities year by year and summarize them,
    print current account/position metrics, print and plot portfolio-history
    metrics against benchmarks, and finally compare the span of the trading
    history with SPY's performance over the same dates.
    """
    logger.info("Fetching trading history...")
    # Use the new batch fetcher to get all activities by year
    activities = fetch_all_account_activities_by_year()
    logger.info(f"Fetched {len(activities)} activities (all years).")
    summarize_account_activities(activities)
    print_current_portfolio_metrics()

    # Fetch and analyze portfolio history
    portfolio_df = fetch_portfolio_history(timeframe='1D', period='1A')
    # An empty frame (every row dropped as missing) has no first/last
    # timestamp to read, so treat it like a failed fetch.
    if portfolio_df is not None and not portfolio_df.empty:
        print_portfolio_metrics(portfolio_df)
        # Fetch benchmarks for the same period; one day is added to the end
        # so the last portfolio date falls inside the requested window.
        start = portfolio_df['timestamp'].iloc[0].date()
        end = portfolio_df['timestamp'].iloc[-1].date() + pd.Timedelta(days=1)
        plot_portfolio_vs_benchmarks(portfolio_df, start, end)
    else:
        print("Could not fetch portfolio history for metrics/graph.")

    start_date, end_date = analyze_trading_history(activities)
    if not (start_date and end_date):
        print("No valid trade dates found for market comparison.")
        return

    logger.info(f"Comparing to market (SPY) from {start_date} to {end_date}...")
    market_result = get_market_performance(start_date, end_date)
    if market_result:
        pct_change, start_price, end_price = market_result
        print(f"SPY performance from {start_date} to {end_date}: {pct_change:.2f}% (from ${start_price:.2f} to ${end_price:.2f})")
    else:
        print("Could not fetch SPY market performance for the given period.")


# Run the report only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment