Skip to content

Instantly share code, notes, and snippets.

@jongan69
Created May 10, 2025 23:10
Show Gist options
  • Save jongan69/ad70465c323cacee3a095c33f5dd4cfe to your computer and use it in GitHub Desktop.
Save jongan69/ad70465c323cacee3a095c33f5dd4cfe to your computer and use it in GitHub Desktop.
A Script for running portfolio metrics against a fidelity brokerage account, uses FIDELITY_USERNAME, FIDELITY_PASSWORD, and FIDELITY_ACCOUNT_NUMBER to retrieve ofx data and parses it
import os
import datetime
import codecs
import re
from decimal import Decimal
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import yfinance as yf
from ofxtools.Client import OFXClient, InvStmtRq
from ofxtools.utils import UTC
from ofxparse import OfxParser
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Set up your credentials and endpoint
FIDELITY_URL = "https://ofx.fidelity.com/ftgw/OFX/clients/download"
FIDELITY_USERID = os.getenv("FIDELITY_USERNAME")
FIDELITY_PASSWORD = os.getenv("FIDELITY_PASSWORD")
FIDELITY_ORG = "fidelity.com"
FIDELITY_FID = "7776"
FIDELITY_BROKERID = "fidelity.com"
OFX_VERSION = 220
# Set up the client
client = OFXClient(
url=FIDELITY_URL,
userid=FIDELITY_USERID,
org=FIDELITY_ORG,
fid=FIDELITY_FID,
brokerid=FIDELITY_BROKERID,
version=OFX_VERSION,
appid="QWIN",
appver="2700",
useragent="InetClntApp/3.0"
)
# Set up the date range
dtstart = datetime.datetime.now(tz=UTC) - datetime.timedelta(days=30)
dtend = datetime.datetime.now(tz=UTC)
# Replace with your actual account number
ACCOUNT_ID = os.getenv("FIDELITY_ACCOUNT_NUMBER")
# Build the investment statement request
stmt = InvStmtRq(acctid=ACCOUNT_ID, dtstart=dtstart, dtend=dtend)
# Download the statement
response = client.request_statements(FIDELITY_PASSWORD, stmt)
# Save the OFX data to a file
ofx_data = response.read()
with open("fidelity_statement.ofx", "wb") as f:
f.write(ofx_data)
# Build CUSIP to Ticker mapping from the OFX file
cusip_to_info = {}
with open("fidelity_statement.ofx", "r") as f:
ofx_text = f.read()
for secinfo in re.findall(r"<SECINFO>.*?</SECINFO>", ofx_text, re.DOTALL):
cusip_match = re.search(r"<UNIQUEID>([^<]+)", secinfo)
ticker_match = re.search(r"<TICKER>([^<]+)", secinfo)
name_match = re.search(r"<SECNAME>([^<]+)", secinfo)
if cusip_match and ticker_match:
cusip = cusip_match.group(1).strip()
ticker = ticker_match.group(1).strip()
name = name_match.group(1).strip() if name_match else "N/A"
cusip_to_info[cusip] = {"ticker": ticker, "name": name}
# Parse the OFX file
with codecs.open("fidelity_statement.ofx") as fileobj:
ofx = OfxParser.parse(fileobj)
account = ofx.account
statement = account.statement
# Portfolio Positions
print("\nPortfolio Positions:")
total_value = Decimal("0.0")
for position in getattr(statement, "positions", []):
security = position.security
info = cusip_to_info.get(security, {})
symbol = info.get("ticker", security) if isinstance(security, str) else "N/A"
name = info.get("name", "N/A")
units = position.units
market_value = position.market_value
total_value += market_value
print(f"Symbol: {symbol}, Name: {name}, Units: {units}, Market Value: ${market_value:,.2f}")
print(f"\nTotal Portfolio Value: ${total_value:,.2f}")
# --- Pie Chart of Portfolio Holdings ---
# Collect data
pie_labels = []
pie_sizes = []
for position in getattr(statement, "positions", []):
security = position.security
info = cusip_to_info.get(security, {})
symbol = info.get("ticker", security) if isinstance(security, str) else "N/A"
market_value = position.market_value
if market_value > 0:
pie_labels.append(symbol)
pie_sizes.append(float(market_value))
# Optionally, only show the top N holdings and group the rest as "Other"
N = 10
if len(pie_labels) > N:
# Sort by size
sorted_indices = sorted(range(len(pie_sizes)), key=lambda i: pie_sizes[i], reverse=True)
top_indices = sorted_indices[:N]
other_indices = sorted_indices[N:]
top_labels = [pie_labels[i] for i in top_indices]
top_sizes = [pie_sizes[i] for i in top_indices]
other_size = sum(pie_sizes[i] for i in other_indices)
top_labels.append("Other")
top_sizes.append(other_size)
pie_labels = top_labels
pie_sizes = top_sizes
# Plot
plt.figure(figsize=(8, 8))
plt.pie(pie_sizes, labels=pie_labels, autopct='%1.1f%%', startangle=140)
plt.title("Portfolio Holdings Breakdown")
plt.tight_layout()
plt.show(block=False)
# Recent Transactions
print("\nRecent Transactions:")
for txn in getattr(statement, "transactions", []):
txn_type = txn.type
trade_date = getattr(txn, "tradeDate", getattr(txn, "date", "N/A"))
security = getattr(txn, "security", None)
info = cusip_to_info.get(security, {})
symbol = info.get("ticker", security) if isinstance(security, str) else "N/A"
name = info.get("name", "N/A")
units = getattr(txn, "units", "N/A")
total = getattr(txn, "total", getattr(txn, "amount", "N/A"))
print(f"{trade_date}: {txn_type} {units} of {symbol} for ${total}")
# --- Portfolio Metrics and Performance vs SPY ---
# 1. Get the list of tickers and units (as of latest statement)
positions = []
for position in getattr(statement, "positions", []):
security = position.security
info = cusip_to_info.get(security, {})
ticker = info.get("ticker", "").strip()
if ticker: # Only add if ticker is not empty after stripping
positions.append({
"ticker": "BRK-B" if ticker == "BRKB" else ticker,
"units": float(position.units)
})
if positions:
# 2. Clean up tickers: just use the already filtered positions
tickers = [p["ticker"] for p in positions]
print(f"Tickers to download: {tickers}")
if not tickers:
print("No valid tickers found in portfolio positions. Exiting.")
exit(1)
def is_valid_ticker(ticker):
try:
test = yf.Ticker(ticker)
hist = test.history(period='1d')
return not hist.empty
except Exception:
return False
# Filter tickers for validity
valid_tickers = []
for t in tickers:
if t and is_valid_ticker(t):
valid_tickers.append(t)
print(f"Valid tickers to download: {valid_tickers}")
if not valid_tickers:
print("No valid tickers found after filtering. Exiting.")
exit(1)
# Define start_date and end_date before downloading
start_date = (datetime.datetime.now() - datetime.timedelta(days=365)).strftime("%Y-%m-%d")
end_date = datetime.datetime.now().strftime("%Y-%m-%d")
# 3. Download portfolio tickers one by one
price_data_dict = {}
failed_tickers = []
for t in valid_tickers:
try:
df = yf.download(t, start=start_date, end=end_date, auto_adjust=True)
print(f"\nTicker: {t}")
print(f"Type of df: {type(df)}")
print(f"df.head():\n{df.head()}\n")
if 'Close' in df.columns:
print(f"Type of df['Close']: {type(df['Close'])}")
print(f"df['Close'].head():\n{df['Close'].head()}\n")
# If df['Close'] is a DataFrame, extract the Series for the ticker
if isinstance(df['Close'], pd.DataFrame) and t in df['Close'].columns:
close_series = df['Close'][t]
price_data_dict[t] = close_series
print(f"Extracted Series for {t}:")
print(close_series.head())
elif isinstance(df['Close'], pd.Series):
price_data_dict[t] = df['Close']
print(f"Added Series for {t}:")
print(df['Close'].head())
else:
print(f"Warning: Could not extract Series for {t}, skipping.")
failed_tickers.append(t)
else:
print("No 'Close' column in df.")
failed_tickers.append(t)
except Exception as e:
print(f"Error downloading {t}: {e}")
failed_tickers.append(t)
# Debug print to show what is in price_data_dict
print(f"price_data_dict contents: {[(k, type(v)) for k, v in price_data_dict.items()]}")
if not price_data_dict:
print("No valid tickers with price data found. Exiting.")
print("If you want, I can help you filter your tickers for yfinance compatibility (e.g., US stocks and ETFs only). Let me know!")
exit(1)
# Combine into a single DataFrame
price_data = pd.DataFrame(price_data_dict)
print(f"Successfully downloaded data for: {list(price_data.columns)}")
print(f"Failed tickers: {failed_tickers}")
# 4. Download SPY separately
spy_df = yf.download("SPY", start=start_date, end=end_date, auto_adjust=True)
if "Adj Close" in spy_df.columns:
spy_data = spy_df["Adj Close"]
elif "Close" in spy_df.columns:
spy_data = spy_df["Close"]
else:
raise ValueError("SPY data missing 'Adj Close' and 'Close' columns")
# 5. Calculate daily portfolio value
portfolio_values = []
for date, row in price_data.iterrows():
value = 0
for p in positions:
t = "BRK-B" if p["ticker"] == "BRKB" else p["ticker"]
if t in row and not np.isnan(row[t]):
value += p["units"] * row[t]
portfolio_values.append(value)
portfolio_series = pd.Series(portfolio_values, index=price_data.index)
# 6. Calculate daily returns
portfolio_returns = portfolio_series.pct_change().dropna()
spy_returns = spy_data.pct_change().dropna()
# 7. Calculate Sharpe ratio (assume risk-free rate = 0)
sharpe_ratio = (portfolio_returns.mean() / portfolio_returns.std()) * np.sqrt(252)
print(f"\nSharpe Ratio (Portfolio): {sharpe_ratio:.2f}")
# 8. Calculate annualized return and volatility
annualized_return = portfolio_returns.mean() * 252
annualized_volatility = portfolio_returns.std() * np.sqrt(252)
print(f"Annualized Return: {annualized_return:.2%}")
print(f"Annualized Volatility: {annualized_volatility:.2%}")
# 9. Calculate max drawdown
cumulative_max = (1 + portfolio_returns).cumprod().cummax()
drawdown = (1 + portfolio_returns).cumprod() / cumulative_max - 1
max_drawdown = drawdown.min()
print(f"Max Drawdown: {max_drawdown:.2%}")
# 10. Plot cumulative returns
portfolio_cum = (1 + portfolio_returns).cumprod()
spy_cum = (1 + spy_returns).cumprod()
plt.figure(figsize=(12, 6))
plt.plot(portfolio_cum, label="Portfolio")
plt.plot(spy_cum, label="SPY")
plt.title("Portfolio vs SPY Performance (Past Year, Static Holdings)")
plt.xlabel("Date")
plt.ylabel("Cumulative Return")
plt.legend()
plt.tight_layout()
plt.grid()
plt.savefig("portfolio_vs_spy.png")
plt.show(block=False)
print("\nPortfolio Performance Metrics:")
print(f"Annualized Return: {annualized_return:.2%}")
print(f"Annualized Volatility: {annualized_volatility:.2%}")
# Calculate and print Total Return %
total_return = (portfolio_cum.iloc[-1] - 1) * 100
print(f"Total Return: {total_return:.2f}%")
plt.show(block=False)
input('Press Enter to exit...')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment