Skip to content

Instantly share code, notes, and snippets.

@Nikolaj-K
Last active July 12, 2025 16:14
Show Gist options
  • Save Nikolaj-K/fa496c9f770c3094f01a776464275136 to your computer and use it in GitHub Desktop.
Save Nikolaj-K/fa496c9f770c3094f01a776464275136 to your computer and use it in GitHub Desktop.
Computing the autocorrelation, its spectrum and the power spectrum for a set of simple Markov chains
"""
Code described in the video
https://youtu.be/yyHd7BGGVp8
Note: A day after recording, I refactored the script to use a more continuous
random variable (no jump at full circle). I find this gives a nicer to read plot.
Secondly, I've also added a tiny `class State`, abstracting away the state indices
and conversions using string.ascii_uppercase.index.
"""
import random
import numpy as np
import matplotlib.pyplot as plt
import string
class Config:
    """Tunable knobs for the simulation and the plots."""

    # Range of lags shown in the autocorrelation plot.
    NUM_LAGS = 25
    # Length of each simulated walk; the autocorrelation is an _empirical_
    # one, so this number directly drives the quality of its statistics/means.
    WALK_LEN = 60
    # Number of independent simulations averaged over.
    NUM_SIMS = 300
    # Debug switch: deterministic chain + simple 0/1 observable.
    # Higher harmonics at multiples of the base frequency (e.g. f=2/5 for a
    # period-5 chain whose expected core frequency is f=1/5) become
    # particularly visible when this flag is True.
    DET_AND_SIMPLE_X_DEBUG = False
class State:
    """A chain state, identified by an uppercase letter ('A' is index 0)."""

    def __init__(self, n: int) -> None:
        # The n-th letter of the alphabet names this state.
        self.name = string.ascii_uppercase[n]

    def idx(self) -> int:
        """Recover the numeric index from the letter name."""
        # Equivalent to string.ascii_uppercase.index(self.name) since the
        # name is always a single uppercase ASCII letter.
        return ord(self.name) - ord("A")
class MarkovChain:
    """Random walk on a ring of num_states() vertices.

    The chain is hardcoded as a circle with always only one of two possible
    next vertices (three if USE_THIRD_ACTION == True), so we're not working
    with a general transition matrix here. (But we could just as well, it's
    just more code...)
    """
    # Mapping step deltas to arrow symbols for logging.
    ACTIONS_DEBUG = {-2: 2*"⬅", -1: "⬅", 0: "|", 1: "➡", 2: 2*"➡"}
    NUM_FORWARD_STATES = 3
    # Probability of stepping forward; accordingly, 1-P_FORWARD gives the
    # chance of NOT going forward. In debug mode the chain is deterministic.
    P_FORWARD = 1 if Config.DET_AND_SIMPLE_X_DEBUG else .85
    # If True, the random variable is coded such that the second half is a
    # smooth falloff, i.e. more continuous. This should mirror smooth
    # dependency on a pendulum process (no hard jumps).
    DOUBLE_STATES = True
    # Third action is a double-jump forward.
    # NOTE(review): the code below assigns delta = -1 (labeled "backward")
    # for the third action, not +2 — comment and code disagree; confirm intent.
    # Note: I find it doesn't affect the autocorr simulation much.
    USE_THIRD_ACTION = False
    IDX_INIT_STATE = 0
    # A few hundred should suffice to make samples independent of
    # IDX_INIT_STATE. There's theory when stationarity kicks in, that could
    # be used.
    BURN_IN = 300

    def __init__(self):
        # Start at the fixed initial state, then burn in to (approximate)
        # stationarity right away.
        self.current = State(self.IDX_INIT_STATE)
        if self.P_FORWARD == 1:
            print("! Warning: Chain set to run deterministically")
        self.randomize_state()

    def num_states(self) -> int:
        # Ring size doubles when DOUBLE_STATES is on.
        return self.NUM_FORWARD_STATES * (1 + int(MarkovChain.DOUBLE_STATES))

    def randomize_state(self):
        # Advance BURN_IN silent steps to decorrelate from the initial state.
        for _ in range(self.BURN_IN):
            self.__next_state(log=False)

    def __next_state(self, log=False):
        """Draw one transition, optionally log it, and return the new state."""
        r: float = random.random()
        if r < self.P_FORWARD:
            delta = +1  # forward
        else:
            delta = 0  # freeze
            if self.USE_THIRD_ACTION:
                # NOTE(review): P_THIRD_ACTION is computed but never used;
                # the condition below re-derives the same threshold inline.
                P_THIRD_ACTION = (1 - self.P_FORWARD) / 2
                if r < self.P_FORWARD + (1 - self.P_FORWARD) / 2:
                    delta = -1  # backward
        assert delta in MarkovChain.ACTIONS_DEBUG.keys()
        if log:
            arrow: str = MarkovChain.ACTIONS_DEBUG[delta]
            print(self.current.name + arrow, end="")
        # Wrap around the ring.
        next_idx_state: int = (self.current.idx() + delta) % self.num_states()
        self.current = State(next_idx_state)
        return self.current

    def walk(self):
        """Return a list of WALK_LEN states: the current one plus WALK_LEN-1 logged steps."""
        return [self.current] + [self.__next_state(log=True) for _ in range(Config.WALK_LEN-1)]
def sample_and_print_stationary_distribution():
    """Estimate the chain's stationary distribution empirically.

    Builds NUM_WALKS freshly burned-in chains, tallies which state each one
    lands in, and prints the per-state percentages.
    """
    NUM_WALKS = 10_000
    d = {State(idx_state).name: 0 for idx_state in range(MarkovChain().num_states())}
    for _ in range(NUM_WALKS):
        # Bugfix: MarkovChain.__init__ already calls randomize_state()
        # (BURN_IN steps), so the previous explicit second call doubled the
        # burn-in work without statistical benefit.
        mc = MarkovChain()
        # mc.current.name is already the key; no need to round-trip through
        # State(mc.current.idx()).
        d[mc.current.name] += 1
    print("\nStationary distribution:")
    for state_name, count in d.items():
        perc: float = 100 * count / NUM_WALKS
        print(f"state_name {state_name}: {perc:.2f} %")
def is_start_X(state: State):
    """Indicator observable: 1 exactly on state 'A' (index 0), else 0."""
    return int(state.idx() == 0)
def offset_sqrt_X(state: State):
    """Ad-hoc smooth observable (made up on the spot).

    Rises like sqrt over the first half of the ring and, when
    DOUBLE_STATES is on, mirrors back down over the second half so the
    value depends on state.idx() without a hard jump at full circle.
    """
    half = MarkovChain.NUM_FORWARD_STATES
    n = state.idx()
    if n > half:
        assert MarkovChain.DOUBLE_STATES
        n = 2 * half - n  # mirror the index: same as n - 2*(n - half)
    return np.sqrt(n / half)
# The observable X(state) fed into the autocorrelation: the simple 0/1
# indicator in debug mode, otherwise the smoother sqrt ramp defined above.
random_variable_X = is_start_X if Config.DET_AND_SIMPLE_X_DEBUG else offset_sqrt_X
def sample_autocovariances(walk):
    """Empirical autocovariances of X along one walk.

    Args:
        walk: list of State of length Config.WALK_LEN.

    Returns:
        (autocovs, mean) where autocovs[lag], for lag in 0..NUM_LAGS-1, is
        the mean over t of (x(t) - mean) * (x(t+lag) - mean), and mean is
        the empirical mean of x over the walk.
    """
    assert Config.NUM_LAGS < Config.WALK_LEN
    xs = np.array(list(map(random_variable_X, walk)))
    mean = np.mean(xs)
    # Recentering, i.e. subtracting the mean, is customary.
    # As a result, the autocov will be non-zero also where xs(t)*xs(t-T)==0,
    # and, via +/- cancellations, the autocov can move towards zero eventually.
    xs_m = xs - mean  # (the previous extra np.array(xs) copy was redundant)
    if Config.DET_AND_SIMPLE_X_DEBUG:
        xs_m = xs  # Don't recenter, to obtain a simpler plot
    autocovs = []
    for lag in range(Config.NUM_LAGS):
        xs_m_offset = xs_m[lag:]
        xs_m_truncated = xs_m[:(Config.WALK_LEN - lag)]  # Same length as 'xs_m_offset'
        es = xs_m_truncated * xs_m_offset  # Energy-like quantities
        autocovs.append(np.mean(es))  # Power-like quantities
    # Bugfix: exact '==' between two independently computed float reductions
    # is brittle; compare with a tolerance instead.
    assert np.isclose(autocovs[0], np.var(xs)) or Config.DET_AND_SIMPLE_X_DEBUG
    return autocovs, mean
def sample_autocorrelation(walk):
    """Normalize the empirical autocovariances into a Pearson-style ACF.

    Returns (acf, autocovs, mean, var); acf[0] == 1.0 by construction since
    the lag-0 autocovariance (the variance) is used as the normalizer.
    """
    autocovs, mean = sample_autocovariances(walk)
    var = autocovs[0]  # max entry; customary normalizer => correlation coefficient
    acf = np.asarray(autocovs) / var
    return acf, autocovs, mean, var
def avg_acf_form_runs():
    """Average the ACF over Config.NUM_SIMS independent walks.

    Returns:
        (avg_acf, first_walk): the lag-indexed mean ACF across runs, and the
        first simulated walk (kept for plotting).
    """
    # Note: This function is really only 7 lines long, but I add a lot of
    # logs/checks and debug structures for that
    all_autocovs_debug = []
    all_means_debug = []  # For print log
    all_vars_debug = []  # For print log
    # Compute 'avg_acf'
    all_acfs = []
    for idx_sim in range(Config.NUM_SIMS):
        print(f"\nRun #{idx_sim}: ", end="")
        walk = MarkovChain().walk()
        if idx_sim == 0:
            # NOTE(review): first_walk stays undefined if NUM_SIMS == 0,
            # making the return below raise NameError in that edge case.
            first_walk = walk
        acf, autocovs, mean, var = sample_autocorrelation(walk)
        all_acfs.append(acf)
        all_autocovs_debug.append(autocovs)
        all_means_debug.append(mean)
        all_vars_debug.append(var)
    avg_acf = np.mean(all_acfs, axis=0)

    def print_vars_error() -> None:
        """Print a Parseval-style sanity check: spectrum mass vs. variance."""
        # Note: Our autocovariances are truncated with NUM_LAGS values, so there's empirical errors.
        # This check could be improved:
        # * With the padding, there's something to note/improve regarding spectral leakage here
        # * Mirror the (symmetric) autocov to negative lags
        # Compute var_fequency_domain via 'power_spectrum'
        # with pad to full length to approximate full autocovariance support
        avg_autocov = np.mean(all_autocovs_debug, axis=0)
        n: int = len(avg_autocov)
        assert n == Config.NUM_LAGS
        # NOTE(review): this "padding" is a no-op — the buffer has length n and
        # is filled completely, so no zeros are appended. Padding to a longer
        # support (as intended above) would require np.zeros(m) with m > n.
        padded_autocov = np.zeros(n)
        padded_autocov[:n] = avg_autocov
        power_spectrum = np.abs(np.fft.fft(padded_autocov)) ** 2 / n  # Note: These are biased downward due to NUM_LAGS truncation
        var_fequency_domain: float = np.sum(power_spectrum)
        avg_var_debug: float = np.mean(all_vars_debug)
        print()
        print(f"Sum of power spectrum: {var_fequency_domain:.5f}")
        print(f"Average mean = {np.mean(all_means_debug):.2f}")
        print(f"Average variance = {avg_var_debug:.5f}")
        vars_ratio: float = var_fequency_domain / avg_var_debug
        err_debug = abs(1 - vars_ratio)
        print(f"{100 * err_debug:.2f}% error in Parseval identity")  # Strongly depends on Config

    print_vars_error()  # Sanity check approximation errors using power spectrum and Plancherel
    return avg_acf, first_walk
class Plotter:
    """Renders the 5-panel summary figure.

    Panels: first sampled X path, averaged ACF, the ring-shaped transition
    diagram, the spectrum vs. frequency, and the spectrum vs. period.
    """

    @staticmethod
    def plot_acf(avg_acf, first_walk):
        """Draw all panels for the averaged ACF and the first walk, then show."""
        num_states = MarkovChain().num_states()
        # Fixed subplot slots within the flattened 2x3 grid.
        FIRST_SAMPLE_PLOT = 0
        ACF_PLOT = 1
        RING_PLOT = 2
        POW_PLOT = 3
        T_PLOT = 4  # New: Spectrum vs Period plot
        NONE_PLOT = 5  # unused sixth slot, switched off below
        lags = list(range(Config.NUM_LAGS))
        fig, axs = plt.subplots(2, 3, figsize=(14, 6))  # Adjust layout for 5 plots
        axs = axs.flatten()
        axs[NONE_PLOT].axis('off')
        # Transition Diagram: states evenly spaced on the unit circle.
        ax = axs[RING_PLOT]
        ax.set_aspect('equal')
        ax.axis('off')
        angles = np.linspace(0, 2 * np.pi, num_states, endpoint=False)
        coords = [(np.cos(a), np.sin(a)) for a in angles]
        for state, (x, y) in enumerate(coords):
            ax.plot(x, y, 'o', color='black', markersize=12)
            ax.text(x * 1.15, y * 1.15, string.ascii_uppercase[state], ha='center', va='center', fontsize=10)
        for i in range(num_states):
            fwd = (i + 1) % num_states
            back = (i - 1) % num_states

            def draw_curved_arrow(ax, start_idx, end_idx, alpha_val, color, rad):
                # Curved arrow between two ring vertices; opacity (alpha_val)
                # encodes the transition probability.
                src = coords[start_idx]
                dst = coords[end_idx]
                ax.annotate(
                    '', xy=dst, xytext=src,
                    arrowprops=dict(
                        arrowstyle='-|>', color=color, alpha=alpha_val, lw=4,
                        shrinkA=15, shrinkB=15,
                        connectionstyle=f"arc3,rad={rad}"
                    ),
                )
            draw_curved_arrow(ax, i, fwd, MarkovChain.P_FORWARD, 'blue', rad=0.3)
            draw_curved_arrow(ax, i, back, 1 - MarkovChain.P_FORWARD, 'red', rad=-0.3)
        ax.set_title(f'circling Markov chain\nusing P_FORWARD = {MarkovChain.P_FORWARD}\n', fontsize=10)
        # ACF Plot: bar chart with pink guides at multiples of the ring size.
        ax = axs[ACF_PLOT]
        ax.bar(lags, avg_acf, color='green', edgecolor='black')
        ax.axhline(0, color='gray', linewidth=0.8)
        for x in range(0, Config.NUM_LAGS, num_states):
            ax.axvline(x, color='pink', linestyle='--', linewidth=0.8)
        ax.set_xlabel('Lag', fontsize=12)
        ax.set_ylabel('Autocorrelation', fontsize=12)
        ax.set_title(f'Average ACF ({Config.NUM_SIMS} sims)', fontsize=12)
        ax.grid(True, linestyle='--', alpha=0.6)
        # First Sampled X Plot: the observable along the first walk.
        first_sample = [random_variable_X(state) for state in first_walk]
        ax = axs[FIRST_SAMPLE_PLOT]
        ax.plot(first_sample, color='darkgreen')
        for x in range(0, Config.WALK_LEN, num_states):
            ax.axvline(x, color='pink', linestyle='--', linewidth=0.8)
        ax.set_xlabel('Time step', fontsize=12)
        ax.set_ylabel(f'X(state)\nstarting at state={first_walk[0].name}', fontsize=12)
        ax.set_title('First simulator Sample of X', fontsize=12)
        # Horizontal guides at every attainable value of X.
        for idx in range(num_states):
            x = random_variable_X(State(idx))
            ax.axhline(x, color='orange', linestyle='--', linewidth=0.4)
        ax.set_ylim(0, max(first_sample) + 0.5)
        # Spectrum (|F(acf)| vs frequency), keeping only non-negative freqs.
        _freqs = np.fft.fftfreq(len(avg_acf))
        pos_freqs = _freqs[:len(avg_acf) // 2]
        F_avg_acf = np.fft.fft(avg_acf)
        real_F_avg_acf = F_avg_acf.real[:len(avg_acf) // 2]
        imag_F_avg_acf = F_avg_acf.imag[:len(avg_acf) // 2]
        abs_F_avg_acf = np.abs(F_avg_acf)[:len(avg_acf) // 2]
        ax = axs[POW_PLOT]
        ax.plot(pos_freqs, abs_F_avg_acf, color='purple', label='|F(acf)|')
        ax.plot(pos_freqs, real_F_avg_acf, color='gray', label='Real(F(acf))', alpha=0.2)
        ax.plot(pos_freqs, imag_F_avg_acf, color='orange', label='Imag(F(acf))', alpha=0.2)
        ax.set_xlabel('Frequency', fontsize=12)
        ax.set_ylabel('Amplitude', fontsize=12)
        ax.grid(True, linestyle='--', alpha=0.6)
        ax.set_title('Spectrum vs Frequency', fontsize=12)
        ax.legend()
        # Mark candidate harmonics f = 1/i for small periods i.
        for i in range(2, num_states + 1):
            f = 1 / i
            ax.axvline(x=f, color='gray', linestyle='--', linewidth=1)
            ax.text(f + 0.02, max(abs_F_avg_acf) * 0.8, f'1/{i}', color='black', fontsize=7)
        # New: Spectrum (|F(acf)| vs Period T = 1/f), skipping the f=0 bin.
        ax = axs[T_PLOT]
        nonzero_freqs = pos_freqs[1:]
        periods = 1 / nonzero_freqs
        abs_F_nonzero = abs_F_avg_acf[1:]
        ax.plot(periods, abs_F_nonzero, color='blue', label='|F(acf)| vs T=1/f')
        ax.set_xscale('log')
        ax.set_xlabel('Period T (1/f)', fontsize=12)
        ax.set_ylabel('Amplitude', fontsize=12)
        ax.set_title('Spectrum vs Period', fontsize=12)
        ax.grid(True, which='both', linestyle='--', alpha=0.6)
        ax.legend()
        # Mark integer candidate periods.
        for i in range(2, num_states + 1):
            T = i
            ax.axvline(x=T, color='gray', linestyle='--', linewidth=1)
            ax.text(T * 1.05, max(abs_F_nonzero) * 0.8, f'T={i}', color='black', fontsize=7)
        plt.tight_layout()
        plt.show()
if __name__ == "__main__":
    # Full pipeline: stationarity sanity check, then ACF averaging, then plots.
    sample_and_print_stationary_distribution()
    avg_acf, first_walk = avg_acf_form_runs()
    Plotter.plot_acf(avg_acf, first_walk)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment