Skip to content

Instantly share code, notes, and snippets.

@daviddwlee84
Last active April 10, 2025 06:39
Show Gist options
  • Save daviddwlee84/40040ef057df37e34f6f1a5a30a80833 to your computer and use it in GitHub Desktop.
Save daviddwlee84/40040ef057df37e34f6f1a5a30a80833 to your computer and use it in GitHub Desktop.
A Python real-time dashboard with TradingView Lightweight Chart example (by vibe coding)
import asyncio
import random
import time
from contextlib import asynccontextmanager
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
# Store connected WebSocket clients globally
connected_clients = []
# Task references to properly manage background tasks
background_tasks = set()
ENABLE_HISTORICAL_TICK_DATA = True
# Maximum number of historical ticks to store
HISTORY_BUFFER_SIZE = 100
# List to store the latest tick data
tick_history = []
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup: Create and store the background task
task = asyncio.create_task(stock_data_producer())
background_tasks.add(task)
# This prevents the task from being garbage collected
task.add_done_callback(background_tasks.discard)
print("Background stock data producer started")
yield # This is where the application runs
# Shutdown: Cancel all background tasks
for task in background_tasks:
task.cancel()
# Wait for all tasks to complete their cancellation
if background_tasks:
print("Shutting down background tasks...")
await asyncio.gather(*background_tasks, return_exceptions=True)
print("All background tasks shut down")
# Create FastAPI app with lifespan management
app = FastAPI(lifespan=lifespan)
# HTML page with TradingView chart implementation (same as before)
html = """
<!DOCTYPE html>
<html>
<head>
<title>Real-Time Stock Chart</title>
<!-- TradingView Lightweight Charts script from CDN -->
<script src="https://unpkg.com/[email protected]/dist/lightweight-charts.standalone.production.js"></script>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
h2 { color: #333; }
#chart { border: 1px solid #ddd; border-radius: 4px; }
</style>
</head>
<body>
<h2>Real-Time Stock Price Dashboard</h2>
<div id="chart" style="width: 600px; height: 300px;"></div>
<script>
// Create the chart with proper configuration
const chart = LightweightCharts.createChart(document.getElementById('chart'), {
width: 600,
height: 300,
layout: {
backgroundColor: '#ffffff',
textColor: '#333',
},
grid: {
vertLines: { color: '#f0f0f0' },
horzLines: { color: '#f0f0f0' },
},
crosshair: {
mode: LightweightCharts.CrosshairMode.Normal,
},
rightPriceScale: {
borderColor: '#d1d4dc',
visible: true
},
timeScale: {
borderColor: '#d1d4dc',
timeVisible: true,
secondsVisible: true
},
});
// Add a line series to the chart
const lineSeries = chart.addLineSeries({
color: '#2962FF',
lineWidth: 2,
});
// Data array to store chart points
let chartData = [];
// Format timestamp for TradingView chart
function formatTime(timestamp) {
const date = new Date(timestamp * 1000); // Convert to milliseconds
return {
year: date.getFullYear(),
month: date.getMonth() + 1,
day: date.getDate(),
hour: date.getHours(),
minute: date.getMinutes(),
second: date.getSeconds()
};
}
// Connect to the WebSocket endpoint
const ws = new WebSocket(`ws://${window.location.host}/ws`);
ws.onopen = () => {
console.log("Connected to the WebSocket server");
};
ws.onmessage = (event) => {
try {
// Parse incoming data
const rawData = JSON.parse(event.data);
// Format data point for TradingView chart
const dataPoint = {
time: rawData.time,
value: rawData.value
};
// Add new point to our data array
chartData.push(dataPoint);
// Only keep the last 100 points to prevent performance issues
if (chartData.length > 100) {
chartData = chartData.slice(-100);
}
// Update the chart with the new data
lineSeries.setData(chartData);
// Ensure the latest data is visible
chart.timeScale().fitContent();
console.log("Received new data point:", dataPoint);
} catch (error) {
console.error("Error processing data:", error);
}
};
ws.onclose = () => {
console.log("WebSocket connection closed");
};
ws.onerror = (error) => {
console.error("WebSocket error:", error);
};
</script>
</body>
</html>
"""
@app.get("/")
async def get_dashboard():
"""
Serves the HTML page with the TradingView chart.
"""
return HTMLResponse(html)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""
WebSocket endpoint that accepts clients and keeps the connection open.
"""
await websocket.accept()
if ENABLE_HISTORICAL_TICK_DATA:
# Send the historical tick data first
for data_point in tick_history:
await websocket.send_json(data_point)
connected_clients.append(websocket)
print(f"Client connected. Total clients: {len(connected_clients)}")
try:
# Keep the connection open, waiting for client disconnect
while True:
# Wait for any messages (optional, helps detect disconnects)
data = await websocket.receive_text()
print(f"Received message: {data}")
except WebSocketDisconnect:
print("Client disconnected")
if websocket in connected_clients:
connected_clients.remove(websocket)
print(f"Client removed. Remaining clients: {len(connected_clients)}")
except Exception as e:
print(f"WebSocket error: {e}")
if websocket in connected_clients:
connected_clients.remove(websocket)
print(
f"Client removed due to error. Remaining clients: {len(connected_clients)}"
)
async def stock_data_producer():
"""
Background task that simulates real-time stock price updates and broadcasts data to all connected clients.
"""
base_price = 100
last_updated = 0
try:
while True:
current_time = time.time()
# Only generate data if there are clients connected and at least 1 second has passed
if connected_clients and (current_time - last_updated) >= 1:
# Simulate stock price updates using random fluctuations
simulated_price = base_price + random.uniform(-1, 1) * 2
data_point = {
"time": int(current_time), # Current timestamp in seconds
"value": round(simulated_price, 2),
}
# Append new tick to history and truncate if necessary
tick_history.append(data_point)
if len(tick_history) > HISTORY_BUFFER_SIZE:
tick_history[:] = tick_history[-HISTORY_BUFFER_SIZE:]
# Debug output
print(f"Broadcasting to {len(connected_clients)} clients: {data_point}")
# Broadcast the new data to every connected client
clients_to_remove = []
for client in connected_clients:
try:
await client.send_json(data_point)
except Exception as e:
print(f"Error sending to client: {e}")
# Mark client for removal
clients_to_remove.append(client)
# Remove failed clients (doing it here to avoid modifying the list during iteration)
for client in clients_to_remove:
if client in connected_clients:
connected_clients.remove(client)
# Update base price and last update time
base_price = simulated_price
last_updated = current_time
# Short sleep to prevent CPU hogging
await asyncio.sleep(
0.1
) # Check more frequently but only send updates every second
except asyncio.CancelledError:
print("Stock data producer task was cancelled")
# Clean task shutdown
raise
except Exception as e:
print(f"Unexpected error in stock data producer: {e}")
# In a real application, you might want to implement retry logic here
if __name__ == "__main__":
# Run the FastAPI app using uvicorn with improved logging
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Test TradingView Chart</title>
<!-- Load a specific version of the TradingView Lightweight Charts library -->
<script src="https://unpkg.com/[email protected]/dist/lightweight-charts.standalone.production.js"></script>
</head>
<body>
<h2>TradingView Chart Library Test</h2>
<div id="chart" style="width:600px; height:300px; border: 1px solid #ccc;"></div>
<script>
// Verify that LightweightCharts is loaded
console.log("LightweightCharts:", LightweightCharts);
// Create the chart using the container element
const container = document.getElementById("chart");
const chart = LightweightCharts.createChart(container, {
width: 600,
height: 300,
});
console.log("Chart object:", chart);
// Check if addLineSeries is available
if (typeof chart.addLineSeries === 'function') {
const lineSeries = chart.addLineSeries();
console.log("Line series created successfully:", lineSeries);
// Add some static data to display a simple line
lineSeries.setData([
{ time: '2023-04-10', value: 100 },
{ time: '2023-04-11', value: 105 },
{ time: '2023-04-12', value: 102 },
]);
} else {
console.error("chart.addLineSeries is not a function. Check that you are loading the correct version of the library.");
}
</script>
</body>
</html>
import asyncio
import websockets
async def receive_messages(websocket):
"""
Continuously receives messages from the websocket and prints them.
"""
try:
while True:
msg = await websocket.recv()
print("Received message:", msg)
except websockets.ConnectionClosed:
print("WebSocket connection closed.")
async def send_keepalive(websocket):
"""
Optionally sends a periodic message ('ping') to ensure that the connection remains active.
"""
while True:
await asyncio.sleep(5)
try:
await websocket.send("ping")
print("Sent ping to the server.")
except websockets.ConnectionClosed:
break
async def test_websocket():
uri = "ws://127.0.0.1:8000/ws" # Adjust if needed; for secure connections, use wss://
print(f"Connecting to {uri} ...")
async with websockets.connect(uri) as websocket:
print("Connected to WebSocket endpoint.")
# Create two concurrent tasks: one for receiving messages and one for sending keepalive messages.
receive_task = asyncio.create_task(receive_messages(websocket))
keepalive_task = asyncio.create_task(send_keepalive(websocket))
# Wait until either task finishes (the connection closes, for example)
done, pending = await asyncio.wait(
[receive_task, keepalive_task],
return_when=asyncio.FIRST_COMPLETED
)
for task in pending:
task.cancel()
if __name__ == "__main__":
asyncio.run(test_websocket())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment