Created
May 10, 2025 23:10
-
-
Save jongan69/ad70465c323cacee3a095c33f5dd4cfe to your computer and use it in GitHub Desktop.
A script that computes portfolio metrics for a Fidelity brokerage account. It uses the FIDELITY_USERNAME, FIDELITY_PASSWORD, and FIDELITY_ACCOUNT_NUMBER environment variables to download OFX data and then parses it.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import codecs
import datetime
import os
import re
import sys
from decimal import Decimal

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import yfinance as yf
from dotenv import load_dotenv
from ofxparse import OfxParser
from ofxtools.Client import OFXClient, InvStmtRq
from ofxtools.utils import UTC
# Load environment variables (python-dotenv) before any of them are read.
load_dotenv()

# Credentials and endpoint settings for the OFX request.
FIDELITY_URL = "https://ofx.fidelity.com/ftgw/OFX/clients/download"
FIDELITY_USERID = os.getenv("FIDELITY_USERNAME")
FIDELITY_PASSWORD = os.getenv("FIDELITY_PASSWORD")
FIDELITY_ORG = "fidelity.com"
FIDELITY_FID = "7776"
FIDELITY_BROKERID = "fidelity.com"
OFX_VERSION = 220

# Account whose investment statement is requested.
ACCOUNT_ID = os.getenv("FIDELITY_ACCOUNT_NUMBER")

# Fail fast with a readable message: os.getenv returns None for an unset
# variable, and passing None on would only surface later as an obscure
# error from the request itself.
_missing_settings = [
    env_name
    for env_name, env_value in (
        ("FIDELITY_USERNAME", FIDELITY_USERID),
        ("FIDELITY_PASSWORD", FIDELITY_PASSWORD),
        ("FIDELITY_ACCOUNT_NUMBER", ACCOUNT_ID),
    )
    if not env_value
]
if _missing_settings:
    sys.exit(
        "Missing required environment variable(s): "
        + ", ".join(_missing_settings)
    )

# Set up the client.
# NOTE(review): appid/appver/useragent presumably make the request look like
# a desktop finance client the server accepts -- confirm before changing.
client = OFXClient(
    url=FIDELITY_URL,
    userid=FIDELITY_USERID,
    org=FIDELITY_ORG,
    fid=FIDELITY_FID,
    brokerid=FIDELITY_BROKERID,
    version=OFX_VERSION,
    appid="QWIN",
    appver="2700",
    useragent="InetClntApp/3.0",
)

# Date range: the 30 days ending now, taken from a single clock reading so
# both ends of the window are consistent.
dtend = datetime.datetime.now(tz=UTC)
dtstart = dtend - datetime.timedelta(days=30)

# Build the investment statement request.
stmt = InvStmtRq(acctid=ACCOUNT_ID, dtstart=dtstart, dtend=dtend)

# Download the statement.
response = client.request_statements(FIDELITY_PASSWORD, stmt)

# Save the raw OFX bytes; the later steps of the script re-read this file.
ofx_data = response.read()
with open("fidelity_statement.ofx", "wb") as f:
    f.write(ofx_data)
# Build CUSIP to Ticker mapping from the OFX file.
# Each <SECINFO>...</SECINFO> element is scanned for its UNIQUEID, TICKER and
# SECNAME values; the result maps UNIQUEID -> {"ticker": ..., "name": ...}.
# The patterns are compiled once because they are reused for every element.
_SECINFO_RE = re.compile(r"<SECINFO>.*?</SECINFO>", re.DOTALL)
_UNIQUEID_RE = re.compile(r"<UNIQUEID>([^<]+)")
_TICKER_RE = re.compile(r"<TICKER>([^<]+)")
_SECNAME_RE = re.compile(r"<SECNAME>([^<]+)")

cusip_to_info = {}
# Decode explicitly instead of relying on the platform's default encoding.
# errors="replace" keeps the scan going if a stray byte appears (for example
# inside a security name); the tags matched below are plain ASCII.
# NOTE(review): assumes the downloaded statement is UTF-8 -- confirm.
with open("fidelity_statement.ofx", "r", encoding="utf-8", errors="replace") as f:
    ofx_text = f.read()

for secinfo in _SECINFO_RE.findall(ofx_text):
    cusip_match = _UNIQUEID_RE.search(secinfo)
    ticker_match = _TICKER_RE.search(secinfo)
    name_match = _SECNAME_RE.search(secinfo)
    # An entry is only usable when both the identifier and the ticker exist;
    # the name is optional and falls back to "N/A".
    if cusip_match and ticker_match:
        cusip = cusip_match.group(1).strip()
        ticker = ticker_match.group(1).strip()
        name = name_match.group(1).strip() if name_match else "N/A"
        cusip_to_info[cusip] = {"ticker": ticker, "name": name}
# Parse the saved OFX file and keep a handle on its statement.
with codecs.open("fidelity_statement.ofx") as fileobj:
    ofx = OfxParser.parse(fileobj)
account = ofx.account
statement = account.statement

# Portfolio Positions: one line per holding, then the summed market value.
print("\nPortfolio Positions:")
total_value = Decimal("0.0")
for holding in getattr(statement, "positions", []):
    security = holding.security
    info = cusip_to_info.get(security, {})
    # Fall back to the raw identifier when no ticker is known for it.
    if isinstance(security, str):
        symbol = info.get("ticker", security)
    else:
        symbol = "N/A"
    name = info.get("name", "N/A")
    units, market_value = holding.units, holding.market_value
    total_value = total_value + market_value
    print(f"Symbol: {symbol}, Name: {name}, Units: {units}, Market Value: ${market_value:,.2f}")

print(f"\nTotal Portfolio Value: ${total_value:,.2f}")
# --- Pie Chart of Portfolio Holdings ---
# Gather (label, size) pairs for every position with a positive market value.
holdings = []
for position in getattr(statement, "positions", []):
    security = position.security
    info = cusip_to_info.get(security, {})
    if isinstance(security, str):
        symbol = info.get("ticker", security)
    else:
        symbol = "N/A"
    market_value = position.market_value
    if market_value > 0:
        holdings.append((symbol, float(market_value)))

# Optionally, only show the top N holdings and group the rest as "Other".
N = 10
if len(holdings) > N:
    # Largest first; ties keep their statement order (the sort is stable).
    ranked = sorted(holdings, key=lambda pair: pair[1], reverse=True)
    leaders, remainder = ranked[:N], ranked[N:]
    holdings = leaders + [("Other", sum(size for _, size in remainder))]

pie_labels = [label for label, _ in holdings]
pie_sizes = [size for _, size in holdings]

# Plot without blocking so the rest of the script keeps running.
plt.figure(figsize=(8, 8))
plt.pie(pie_sizes, labels=pie_labels, autopct='%1.1f%%', startangle=140)
plt.title("Portfolio Holdings Breakdown")
plt.tight_layout()
plt.show(block=False)
# Recent Transactions: one summary line per transaction in the statement.
print("\nRecent Transactions:")
for txn in getattr(statement, "transactions", []):
    txn_type = txn.type
    # Prefer the trade date, then a generic date, then a placeholder.
    trade_date = getattr(txn, "tradeDate", getattr(txn, "date", "N/A"))
    security = getattr(txn, "security", None)
    info = cusip_to_info.get(security, {})
    # Show the ticker when known, otherwise the raw identifier; transactions
    # without a string security identifier are labelled "N/A".
    symbol = info.get("ticker", security) if isinstance(security, str) else "N/A"
    units = getattr(txn, "units", "N/A")
    total = getattr(txn, "total", getattr(txn, "amount", "N/A"))
    print(f"{trade_date}: {txn_type} {units} of {symbol} for ${total}")
# --- Portfolio Metrics and Performance vs SPY ---


def is_valid_ticker(ticker):
    """Return True when yfinance yields a non-empty one-day history for *ticker*.

    Any exception raised by the lookup is treated as "not valid" so that a
    single bad symbol cannot abort the whole run.
    """
    try:
        history = yf.Ticker(ticker).history(period='1d')
    except Exception:
        return False
    return not history.empty


# 1. Get the list of tickers and units (as of latest statement)
positions = []
for position in getattr(statement, "positions", []):
    security = position.security
    info = cusip_to_info.get(security, {})
    ticker = info.get("ticker", "").strip()
    if ticker:  # Only add if ticker is not empty after stripping
        positions.append({
            # NOTE(review): "BRKB" is rewritten to "BRK-B", presumably the
            # spelling yfinance expects -- confirm.
            "ticker": "BRK-B" if ticker == "BRKB" else ticker,
            "units": float(position.units),
        })

if positions:
    # 2. Unique tickers in statement order; a symbol that appears in several
    #    positions is looked up and downloaded only once.
    tickers = list(dict.fromkeys(p["ticker"] for p in positions))
    print(f"Tickers to download: {tickers}")

    # Filter tickers for validity
    valid_tickers = [t for t in tickers if t and is_valid_ticker(t)]
    print(f"Valid tickers to download: {valid_tickers}")
    if not valid_tickers:
        print("No valid tickers found after filtering. Exiting.")
        sys.exit(1)

    # One-year window ending today, derived from a single clock reading.
    today = datetime.datetime.now()
    start_date = (today - datetime.timedelta(days=365)).strftime("%Y-%m-%d")
    end_date = today.strftime("%Y-%m-%d")

    # 3. Download portfolio tickers one by one
    price_data_dict = {}
    failed_tickers = []
    for t in valid_tickers:
        # Best effort per ticker: any failure is recorded and the loop moves on.
        try:
            df = yf.download(t, start=start_date, end=end_date, auto_adjust=True)
            print(f"\nTicker: {t}")
            print(f"Type of df: {type(df)}")
            print(f"df.head():\n{df.head()}\n")
            if 'Close' not in df.columns:
                print("No 'Close' column in df.")
                failed_tickers.append(t)
                continue
            close = df['Close']
            print(f"Type of df['Close']: {type(close)}")
            print(f"df['Close'].head():\n{close.head()}\n")
            # 'Close' may come back as a Series or as a DataFrame with one
            # column per ticker; both shapes are reduced to a Series here.
            if isinstance(close, pd.DataFrame) and t in close.columns:
                close_series = close[t]
                extracted_label = f"Extracted Series for {t}:"
            elif isinstance(close, pd.Series):
                close_series = close
                extracted_label = f"Added Series for {t}:"
            else:
                print(f"Warning: Could not extract Series for {t}, skipping.")
                failed_tickers.append(t)
                continue
            # A series without a single price would wipe out every date once
            # incomplete rows are dropped below, so treat it as a failure.
            if close_series.dropna().empty:
                print(f"Warning: No price data returned for {t}, skipping.")
                failed_tickers.append(t)
                continue
            price_data_dict[t] = close_series
            print(extracted_label)
            print(close_series.head())
        except Exception as e:
            print(f"Error downloading {t}: {e}")
            failed_tickers.append(t)

    # Debug print to show what is in price_data_dict
    print(f"price_data_dict contents: {[(k, type(v)) for k, v in price_data_dict.items()]}")
    if not price_data_dict:
        print("No valid tickers with price data found. Exiting.")
        print("Check that your holdings are symbols yfinance supports (e.g., US stocks and ETFs).")
        sys.exit(1)

    # Combine into a single DataFrame
    price_data = pd.DataFrame(price_data_dict)
    print(f"Successfully downloaded data for: {list(price_data.columns)}")
    print(f"Failed tickers: {failed_tickers}")

    # 4. Download SPY separately and reduce it to a Series the same way.
    spy_df = yf.download("SPY", start=start_date, end=end_date, auto_adjust=True)
    if "Adj Close" in spy_df.columns:
        spy_data = spy_df["Adj Close"]
    elif "Close" in spy_df.columns:
        spy_data = spy_df["Close"]
    else:
        raise ValueError("SPY data missing 'Adj Close' and 'Close' columns")
    if isinstance(spy_data, pd.DataFrame):
        if "SPY" not in spy_data.columns:
            raise ValueError("SPY price data does not contain a 'SPY' column")
        spy_data = spy_data["SPY"]

    # 5. Calculate daily portfolio value (static holdings).
    #    A missing price must not count as a zero holding: that would create
    #    artificial jumps in the value series and therefore fake returns.
    #    Gaps are forward-filled with the last known price, and dates on which
    #    some ticker has no price yet are dropped.
    units_by_ticker = {}
    for p in positions:
        units_by_ticker[p["ticker"]] = units_by_ticker.get(p["ticker"], 0.0) + p["units"]
    held_units = pd.Series({t: units_by_ticker[t] for t in price_data.columns})
    complete_prices = price_data.sort_index().ffill().dropna(how="any")
    if len(complete_prices) < 2:
        print("Not enough overlapping price history to compute returns. Exiting.")
        sys.exit(1)
    portfolio_series = complete_prices.mul(held_units, axis=1).sum(axis=1)

    # Compare against SPY over the same period the portfolio series covers.
    spy_data = spy_data.sort_index().loc[portfolio_series.index[0]:]

    # 6. Calculate daily returns
    portfolio_returns = portfolio_series.pct_change().dropna()
    spy_returns = spy_data.pct_change().dropna()

    # 7. Calculate Sharpe ratio (assume risk-free rate = 0), annualised with
    #    252 periods per year. Zero or undefined volatility gives NaN rather
    #    than a division error or an infinite ratio.
    daily_mean = portfolio_returns.mean()
    daily_std = portfolio_returns.std()
    if daily_std > 0:
        sharpe_ratio = (daily_mean / daily_std) * np.sqrt(252)
    else:
        sharpe_ratio = float("nan")
    print(f"\nSharpe Ratio (Portfolio): {sharpe_ratio:.2f}")

    # 8. Calculate annualized return and volatility
    annualized_return = daily_mean * 252
    annualized_volatility = daily_std * np.sqrt(252)
    print(f"Annualized Return: {annualized_return:.2%}")
    print(f"Annualized Volatility: {annualized_volatility:.2%}")

    # 9. Calculate max drawdown on the value series itself, so the starting
    #    value counts as a possible peak (a return-based cumulative product
    #    starts one day late and misses a decline from the very first day).
    drawdown = portfolio_series / portfolio_series.cummax() - 1
    max_drawdown = drawdown.min()
    print(f"Max Drawdown: {max_drawdown:.2%}")

    # 10. Plot cumulative returns
    portfolio_cum = (1 + portfolio_returns).cumprod()
    spy_cum = (1 + spy_returns).cumprod()
    plt.figure(figsize=(12, 6))
    plt.plot(portfolio_cum, label="Portfolio")
    plt.plot(spy_cum, label="SPY")
    plt.title("Portfolio vs SPY Performance (Past Year, Static Holdings)")
    plt.xlabel("Date")
    plt.ylabel("Cumulative Return")
    plt.legend()
    plt.tight_layout()
    plt.grid()
    plt.savefig("portfolio_vs_spy.png")
    plt.show(block=False)

    print("\nPortfolio Performance Metrics:")
    print(f"Annualized Return: {annualized_return:.2%}")
    print(f"Annualized Volatility: {annualized_volatility:.2%}")
    # Calculate and print Total Return %
    total_return = (portfolio_cum.iloc[-1] - 1) * 100
    print(f"Total Return: {total_return:.2f}%")

# Show any open figures and keep the process alive until the user confirms.
plt.show(block=False)
input('Press Enter to exit...')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment