Skip to content

Instantly share code, notes, and snippets.

@jongan69
Created May 10, 2025 23:24
Show Gist options
  • Save jongan69/506c509bbaadf01ba0b1ccc276173352 to your computer and use it in GitHub Desktop.
Save jongan69/506c509bbaadf01ba0b1ccc276173352 to your computer and use it in GitHub Desktop.
Script for running portfolio metrics on alpaca portfolio
import os
import sys
import logging
from datetime import datetime, timedelta
import requests
from dotenv import load_dotenv
import yfinance as yf
from collections import Counter, defaultdict
import re
from alpaca.trading.client import TradingClient
from alpaca.trading.models import Position
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger(__name__)
load_dotenv()
ALPACA_API_KEY = os.getenv("ALPACA_API_KEY")
ALPACA_API_SECRET = os.getenv("ALPACA_API_SECRET")
IS_DEVELOPMENT = False
if not ALPACA_API_KEY or not ALPACA_API_SECRET:
logger.error("Missing Alpaca API credentials.")
sys.exit(1)
def fetch_account_activities(activity_types=None, max_pages=1000, start_date=None, end_date=None):
"""
Fetch account activities from Alpaca, optionally for a date range.
If both start_date and end_date are provided and are the same, use 'date'.
If a range, use 'after' and 'until'.
"""
url = 'https://api.alpaca.markets/v2/account/activities'
headers = {
'accept': 'application/json',
'APCA-API-KEY-ID': ALPACA_API_KEY,
'APCA-API-SECRET-KEY': ALPACA_API_SECRET
}
params = {
'direction': 'desc',
'page_size': 100
}
if activity_types:
params['activity_types'] = ','.join(activity_types)
if start_date and end_date and start_date == end_date:
params['date'] = str(start_date)
else:
if start_date:
params['after'] = str(start_date)
if end_date:
params['until'] = str(end_date)
activities = []
next_page_token = None
for _ in range(max_pages):
if next_page_token:
params['page_token'] = next_page_token
else:
params.pop('page_token', None)
response = requests.get(url, headers=headers, params=params)
if response.status_code != 200:
logger.error(f"Error fetching activities: {response.status_code}, {response.text}")
break
page = response.json()
if not page:
break
activities.extend(page)
next_page_token = response.headers.get('Next-Page-Token')
if not next_page_token:
break
return activities
def fetch_all_account_activities_by_year(activity_types=None, first_year=2018):
"""
Fetch all account activities by looping over years to avoid missing old activities (esp. dividends).
"""
today = datetime.utcnow().date()
all_activities = []
for year in range(first_year, today.year + 1):
start = datetime(year, 1, 1).date()
end = datetime(year, 12, 31).date()
if end > today:
end = today
logger.info(f"Fetching activities for {year} ({start} to {end})...")
acts = fetch_account_activities(activity_types=activity_types, start_date=start, end_date=end)
logger.info(f" Retrieved {len(acts)} activities for {year}.")
all_activities.extend(acts)
return all_activities
def get_market_performance(start_date, end_date, symbol="SPY"):
"""
Fetch market performance (e.g., SPY) between start_date and end_date using yfinance.
Returns the percent change over the period.
"""
ticker = yf.Ticker(symbol)
hist = ticker.history(start=start_date, end=end_date)
if hist.empty:
logger.warning(f"No market data for {symbol} between {start_date} and {end_date}")
return None
start_price = hist['Close'].iloc[0]
end_price = hist['Close'].iloc[-1]
pct_change = (end_price - start_price) / start_price * 100
return pct_change, start_price, end_price
def analyze_trading_history(activities):
"""
Analyze trading history: count trades, win/loss, average P/L, etc.
Add debug logs to inspect available data fields.
Also returns the earliest and latest trade dates for market comparison.
"""
trades = [a for a in activities if a.get('activity_type') == 'FILL']
if not trades:
logger.info("No trades found.")
return None, None
logger.info(f"Sample trade keys: {list(trades[0].keys())}")
logger.debug(f"Sample trade: {trades[0]}")
# Log all unique keys across all trades
all_keys = set()
for t in trades:
all_keys.update(t.keys())
logger.info(f"All unique keys in FILL activities: {sorted(all_keys)}")
# Log if profit_loss or other useful fields are present in any trade
profit_loss_present = any('profit_loss' in t for t in trades)
logger.info(f"Any trade has 'profit_loss' field: {profit_loss_present}")
logger.info(f"First 3 trades with 'profit_loss': {[t for t in trades if 'profit_loss' in t][:3]}")
# Log a few sample trades for manual inspection
for i, t in enumerate(trades[:3]):
logger.debug(f"Trade {i+1}: {t}")
# Only return date range for market comparison
trade_dates = []
for t in trades:
tx_time = t.get('transaction_time')
if tx_time:
try:
trade_dates.append(datetime.fromisoformat(tx_time.replace('Z', '+00:00')))
except Exception:
pass
if trade_dates:
start_date = min(trade_dates).date()
end_date = max(trade_dates).date()
return start_date, end_date
else:
return None, None
def summarize_account_activities(activities):
"""
Summarize all account activities and print portfolio metrics.
"""
activity_types = [a.get('activity_type') for a in activities]
type_counts = Counter(activity_types)
print("\nActivity type counts:")
for t, c in type_counts.items():
print(f" {t}: {c}")
# Helper to detect options symbols (very basic)
def is_option(symbol):
return bool(re.search(r'[0-9]{6,}', symbol or ''))
# Shares bought/sold and average price per symbol
shares_bought = defaultdict(float)
shares_sold = defaultdict(float)
buy_amount = defaultdict(float)
sell_amount = defaultdict(float)
for a in activities:
if a.get('activity_type') == 'FILL':
symbol = a.get('symbol')
qty = float(a.get('qty', 0))
price = float(a.get('price', 0))
side = a.get('side')
if side == 'buy':
shares_bought[symbol] += qty
buy_amount[symbol] += qty * price
elif side == 'sell':
shares_sold[symbol] += qty
sell_amount[symbol] += qty * price
def print_symbol_table(title, symbol_dict, avg_price_dict=None, net_position_dict=None, filter_options=None):
print(f"\n{title}")
print(f"{'Symbol':<20}{'Qty':>10}{'Avg Price':>15}{'Net Pos':>12}")
print("-"*60)
for sym, qty in symbol_dict.items():
if filter_options is not None and is_option(sym) != filter_options:
continue
avg_price = avg_price_dict[sym]/qty if avg_price_dict and qty else 0.0
net_pos = net_position_dict[sym] if net_position_dict else ''
print(f"{sym:<20}{qty:>10.2f}{avg_price:>15.2f}{net_pos:>12}")
# Net position per symbol
net_position = {sym: shares_bought[sym] - shares_sold[sym] for sym in set(list(shares_bought.keys()) + list(shares_sold.keys()))}
# Print stocks and options separately
print_symbol_table("Stocks bought per symbol:", shares_bought, buy_amount, net_position, filter_options=False)
print_symbol_table("Options bought per symbol:", shares_bought, buy_amount, net_position, filter_options=True)
print_symbol_table("Stocks sold per symbol:", shares_sold, sell_amount, net_position, filter_options=False)
print_symbol_table("Options sold per symbol:", shares_sold, sell_amount, net_position, filter_options=True)
# Dividends received per symbol
dividends = defaultdict(float)
for a in activities:
if a.get('activity_type') == 'DIV':
symbol = a.get('symbol')
amount = float(a.get('net_amount', 0))
dividends[symbol] += amount
total_dividends = sum(dividends.values())
print("\nDividends received per symbol:")
for sym, amt in dividends.items():
print(f" {sym}: ${amt:.2f}")
print(f"Total dividends: ${total_dividends:.2f}")
# Net cash flow (deposits/withdrawals)
deposits = 0.0
withdrawals = 0.0
for a in activities:
if a.get('activity_type') == 'TRANSFER':
amount = float(a.get('net_amount', 0))
if amount > 0:
deposits += amount
elif amount < 0:
withdrawals += amount
print(f"\nDeposits: ${deposits:.2f}")
print(f"Withdrawals: ${withdrawals:.2f}")
print(f"Net cash flow from TRANSFER activities: ${deposits + withdrawals:.2f}")
def print_current_portfolio_metrics():
"""
Print current portfolio metrics using Alpaca SDK.
"""
trading_client = TradingClient(ALPACA_API_KEY, ALPACA_API_SECRET, paper=IS_DEVELOPMENT)
account = trading_client.get_account()
positions = trading_client.get_all_positions()
print("\n--- Current Portfolio Metrics ---")
print(f"Account value: ${float(account.portfolio_value):,.2f}")
print(f"Cash: ${float(account.cash):,.2f}")
print(f"Buying power: ${float(account.buying_power):,.2f}")
print(f"Long market value: ${float(account.long_market_value):,.2f}")
print(f"Short market value: ${float(account.short_market_value):,.2f}")
total_unrealized_pl = sum(float(p.unrealized_pl) for p in positions)
total_cost_basis = sum(float(p.cost_basis) for p in positions)
unrealized_pl_pc = (total_unrealized_pl / total_cost_basis * 100) if total_cost_basis else 0.0
print(f"Total Unrealized P/L: ${total_unrealized_pl:,.2f}")
print(f"Total Unrealized P/L %: {unrealized_pl_pc:.2f}%")
print(f"Number of open positions: {len(positions)}")
if not positions:
print("No open positions.")
return
print("\nOpen Positions:")
total_market_value = sum(float(p.market_value) for p in positions)
for pos in positions:
alloc = (float(pos.market_value) / total_market_value * 100) if total_market_value else 0.0
print(f" {pos.symbol}: {pos.qty} @ ${pos.avg_entry_price} (Current: ${pos.current_price}) | Unrealized P/L: ${pos.unrealized_pl} ({float(pos.unrealized_plpc)*100:.2f}%) | Alloc: {alloc:.2f}%")
def fetch_portfolio_history(timeframe='1D', period='1A'):
"""
Fetch portfolio value history from Alpaca REST API.
Returns a pandas DataFrame with 'timestamp' and 'equity'.
"""
url = 'https://api.alpaca.markets/v2/account/portfolio/history'
headers = {
'accept': 'application/json',
'APCA-API-KEY-ID': ALPACA_API_KEY,
'APCA-API-SECRET-KEY': ALPACA_API_SECRET
}
params = {
'timeframe': timeframe, # '1D' for daily
'period': period, # e.g., '1y', '6M', '3M', 'max'
'extended_hours': 'false'
}
response = requests.get(url, headers=headers, params=params)
if response.status_code != 200:
logger.error(f"Error fetching portfolio history: {response.status_code}, {response.text}")
return None
data = response.json()
if not data or 'equity' not in data or not data['equity']:
logger.error("No equity data in portfolio history response.")
return None
df = pd.DataFrame({
'timestamp': pd.to_datetime(data['timestamp'], unit='s'),
'equity': data['equity']
})
df = df.dropna()
return df
def calculate_sharpe_ratio(equity_series, risk_free_rate=0.03):
"""
Calculate the annualized Sharpe ratio from a series of daily portfolio values.
"""
returns = equity_series.pct_change().dropna()
excess_returns = returns - (risk_free_rate / 252) # 252 trading days
sharpe = np.sqrt(252) * excess_returns.mean() / excess_returns.std() if excess_returns.std() else 0.0
return sharpe
def plot_portfolio_vs_benchmarks(portfolio_df, start, end):
"""
Plot normalized portfolio and multiple benchmarks performance over time.
"""
plt.figure(figsize=(12, 6))
# Normalize portfolio to 1.0 at the start
portfolio_norm = portfolio_df['equity'] / portfolio_df['equity'].iloc[0]
plt.plot(portfolio_df['timestamp'], portfolio_norm, label='Portfolio', linewidth=2)
# Benchmarks to plot
benchmarks = ['SPY', 'QQQ', 'DIA', 'IWM', 'VTI', 'AGG', 'GLD']
for symbol in benchmarks:
ticker = yf.Ticker(symbol)
hist = ticker.history(start=start, end=end)
if not hist.empty:
norm = hist['Close'] / hist['Close'].iloc[0]
plt.plot(hist.index, norm, label=symbol)
else:
print(f"Warning: No data for benchmark {symbol} in the given period.")
plt.title('Portfolio vs. Benchmarks Performance')
plt.xlabel('Date')
plt.ylabel('Normalized Value (Start=1.0)')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
def print_portfolio_metrics(portfolio_df, risk_free_rate=0.03):
"""
Print key portfolio metrics: Sharpe Ratio, Annualized Return, Annualized Volatility, Max Drawdown.
"""
equity = portfolio_df['equity']
returns = equity.pct_change().dropna()
n_days = returns.shape[0]
if n_days == 0:
print("Not enough data for metrics.")
return
# Annualized Return (CAGR)
start_value = equity.iloc[0]
end_value = equity.iloc[-1]
cagr = (end_value / start_value) ** (252 / n_days) - 1
# Total Return Percentage
total_return_pct = (end_value - start_value) / start_value * 100
# Annualized Volatility
ann_vol = returns.std() * np.sqrt(252)
# Max Drawdown
running_max = equity.cummax()
drawdown = (equity - running_max) / running_max
max_drawdown = drawdown.min()
# Sharpe Ratio
sharpe = calculate_sharpe_ratio(equity, risk_free_rate)
print("\n--- Portfolio Metrics ---")
print(f"Sharpe Ratio: {sharpe:.2f}")
print(f"Annualized Return: {cagr*100:.2f}%")
print(f"Total Return: {total_return_pct:.2f}%")
print(f"Annualized Volatility: {ann_vol*100:.2f}%")
print(f"Max Drawdown: {max_drawdown*100:.2f}%")
def main():
logger.info("Fetching trading history...")
# Use the new batch fetcher to get all activities by year
activities = fetch_all_account_activities_by_year()
logger.info(f"Fetched {len(activities)} activities (all years).")
summarize_account_activities(activities)
print_current_portfolio_metrics()
# Fetch and analyze portfolio history
portfolio_df = fetch_portfolio_history(timeframe='1D', period='1A')
if portfolio_df is not None:
print_portfolio_metrics(portfolio_df)
# Fetch benchmarks for the same period
start = portfolio_df['timestamp'].iloc[0].date()
end = portfolio_df['timestamp'].iloc[-1].date() + pd.Timedelta(days=1)
plot_portfolio_vs_benchmarks(portfolio_df, start, end)
else:
print("Could not fetch portfolio history for metrics/graph.")
start_date, end_date = analyze_trading_history(activities)
if start_date and end_date:
logger.info(f"Comparing to market (SPY) from {start_date} to {end_date}...")
market_result = get_market_performance(start_date, end_date)
if market_result:
pct_change, start_price, end_price = market_result
print(f"SPY performance from {start_date} to {end_date}: {pct_change:.2f}% (from ${start_price:.2f} to ${end_price:.2f})")
else:
print("Could not fetch SPY market performance for the given period.")
else:
print("No valid trade dates found for market comparison.")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment