Last active
July 12, 2025 16:14
-
-
Save Nikolaj-K/fa496c9f770c3094f01a776464275136 to your computer and use it in GitHub Desktop.
Computing the autocorrelation, its spectrum and the power spectrum for a set of simple Markov chains
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Code described in the video | |
https://youtu.be/yyHd7BGGVp8 | |
Note: A day after recording, I refactored the script to use a more continuous | |
random variable (no jump at full circle). I find this gives a nicer to read plot. | |
Secondly, I've also added a tiny `class State`, abstracting away the state indices | |
and conversions using string.ascii_uppercase.index. | |
""" | |
import random | |
import numpy as np | |
import matplotlib.pyplot as plt | |
import string | |
class Config: | |
NUM_LAGS = 25 # Just for the range of the autocorrelation plot | |
WALK_LEN = 60 # Our auto-correlation function is an _empirical_ one, so this number is relevant for its statistics/means | |
NUM_SIMS = 300 # Number of independent simulations to average over | |
DET_AND_SIMPLE_X_DEBUG = False | |
# We may see higher harmonics at multiples of the core frquency | |
# (e.g. at f=2/5 when the period is 5, i.e. with expected frequency f=1/5) | |
# When DET_AND_SIMPLE_X_DEBUG==True, this is particularly visible | |
class State: | |
def __init__(self, n: int) -> None: | |
self.name = string.ascii_uppercase[n] | |
def idx(self) -> int: | |
return string.ascii_uppercase.index(self.name) | |
class MarkovChain: | |
ACTIONS_DEBUG = {-2: 2*"⬅", -1: "⬅", 0: "|", 1: "➡", 2: 2*"➡"} # Mapping deltas to symbols for logging | |
# The chain is hardcoded as a circle with num_state() vertices and always | |
# only one of two possible next vertices (three if USE_THIRD_ACTION==True) | |
# So we're not working with a general transition matrix here. (But we could just as well, it's just more code...) | |
NUM_FORWARD_STATES = 3 | |
P_FORWARD = 1 if Config.DET_AND_SIMPLE_X_DEBUG else .85 # Accordingly, 1-P_FORWARD gives change of going backwards | |
DOUBLE_STATES = True # If True, the random variable is coded such that the second half is a smooth falloff, i.e. more continous. This should mirror smooth dependency on a pendulum process (no hard jumps) | |
USE_THIRD_ACTION = False # Third action is a double-jump forward. | |
# Note: I find it doesn't affect the autocorr simulation much | |
IDX_INIT_STATE = 0 | |
BURN_IN = 300 # A few hundret should suffice to make samples independent of IDX_INIT_STATE. | |
# There's theory when stationarity kicks in, that could be used. | |
def __init__(self): | |
self.current = State(self.IDX_INIT_STATE) | |
if self.P_FORWARD == 1: | |
print("! Warning: Chain set to run deterministically") | |
self.randomize_state() | |
def num_states(self): | |
return self.NUM_FORWARD_STATES * (1 + int(MarkovChain.DOUBLE_STATES)) | |
def randomize_state(self): | |
for _ in range(self.BURN_IN): | |
self.__next_state(log=False) | |
def __next_state(self, log=False): | |
r: float = random.random() | |
if r < self.P_FORWARD: | |
delta = +1 # forward | |
else: | |
delta = 0 # freeze | |
if self.USE_THIRD_ACTION: | |
P_THIRD_ACTION = (1 - self.P_FORWARD) / 2 | |
if r < self.P_FORWARD + (1 - self.P_FORWARD) / 2: | |
delta = -1 # backward | |
assert delta in MarkovChain.ACTIONS_DEBUG.keys() | |
if log: | |
arrow: str = MarkovChain.ACTIONS_DEBUG[delta] | |
print(self.current.name + arrow, end="") | |
next_idx_state: int = (self.current.idx() + delta) % self.num_states() | |
self.current = State(next_idx_state) | |
return self.current | |
def walk(self): | |
return [self.current] + [self.__next_state(log=True) for _ in range(Config.WALK_LEN-1)] | |
def sample_and_print_stationary_distribution(): | |
NUM_WALKS = 10_000 | |
d = {State(idx_state).name: 0 for idx_state in range(MarkovChain().num_states())} | |
for _ in range(NUM_WALKS): | |
mc = MarkovChain() | |
mc.randomize_state() | |
d[State(mc.current.idx()).name] += 1 | |
print("\nStationary distribution:") | |
for state_name, count in d.items(): | |
perc: float = 100 * count / NUM_WALKS | |
print(f"state_name {state_name}: {perc:.2f} %") | |
def is_start_X(state: State): | |
n = state.idx() | |
return 1 if n==0 else 0 | |
def offset_sqrt_X(state: State): # Some auxiliary function I made up on the spot | |
n = state.idx() | |
excess: int = n - MarkovChain.NUM_FORWARD_STATES | |
if excess > 0: | |
assert MarkovChain.DOUBLE_STATES | |
n -= 2 * excess # Invert value such that return value depends on state.idx() more smoothly | |
return np.sqrt(n / MarkovChain.NUM_FORWARD_STATES) | |
random_variable_X = is_start_X if Config.DET_AND_SIMPLE_X_DEBUG else offset_sqrt_X | |
def sample_autocovariances(walk): | |
assert Config.NUM_LAGS < Config.WALK_LEN | |
xs = np.array(list(map(random_variable_X, walk))) | |
mean = np.mean(xs) | |
# Recentering, i.e. subtracting the mean, is a customary. | |
# As a result, that autocov will be non-zero also where xs(t)*xs(t-T)==0 | |
# And, via +/- cancelations, autocov can move towards zero eventually | |
xs_m = np.array(xs) - mean | |
if Config.DET_AND_SIMPLE_X_DEBUG: | |
xs_m = xs # Don't correct, to obtain a simpler plot | |
autocovs = [] | |
for lag in range(Config.NUM_LAGS): | |
xs_m_offset = xs_m[lag:] | |
xs_m_truncated = xs_m[:(Config.WALK_LEN-lag)] # Same lenght as 'xs_m_offset' | |
es = xs_m_truncated * xs_m_offset # Energy-like quantities | |
autocovs.append(np.mean(es)) # Power-like quantities | |
assert autocovs[0] == np.var(xs) or Config.DET_AND_SIMPLE_X_DEBUG | |
return autocovs, mean | |
def sample_autocorrelation(walk): | |
autocovs, mean = sample_autocovariances(walk) | |
var = autocovs[0] | |
acf = np.array(autocovs) / var # Customary normalization of max entry (=autocovs[0]) to 1.0 (=> Pearson correlation coefficient) | |
return acf, autocovs, mean, var | |
def avg_acf_form_runs(): | |
# Note: This function is really only 7 lines long, but I add a lot of | |
# logs/checks and debug structures for that | |
all_autocovs_debug = [] | |
all_means_debug = [] # For print log | |
all_vars_debug = [] # For print log | |
# Compute 'avg_acf' | |
all_acfs = [] | |
for idx_sim in range(Config.NUM_SIMS): | |
print(f"\nRun #{idx_sim}: ", end="") | |
walk = MarkovChain().walk() | |
if idx_sim == 0: | |
first_walk = walk | |
acf, autocovs, mean, var = sample_autocorrelation(walk) | |
all_acfs.append(acf) | |
all_autocovs_debug.append(autocovs) | |
all_means_debug.append(mean) | |
all_vars_debug.append(var) | |
avg_acf = np.mean(all_acfs, axis=0) | |
def print_vars_error() -> None: | |
# Note: Our autocovariances are truncated with NUM_LAGS values, so there's empirical errors. | |
# This check could be improved. | |
# * With the badding, there's something to note/improve regarding spectral leakage here) | |
# * Mirror the (symmetric) autocov to negative lags | |
# Compute var_fequency_domain via 'power_spectrum' | |
# with pad to full length to approximate full autocovariance support | |
avg_autocov = np.mean(all_autocovs_debug, axis=0) | |
n: int = len(avg_autocov) | |
assert n == Config.NUM_LAGS | |
padded_autocov = np.zeros(n) | |
padded_autocov[:n] = avg_autocov | |
power_spectrum = np.abs(np.fft.fft(padded_autocov)) ** 2 / n # Note: These are biased downward due to NUM_LAGS truncation | |
var_fequency_domain: float = np.sum(power_spectrum) | |
avg_var_debug: float = np.mean(all_vars_debug) | |
print() | |
print(f"Sum of power spectrum: {var_fequency_domain:.5f}") | |
print(f"Average mean = {np.mean(all_means_debug):.2f}") | |
print(f"Average variance = {avg_var_debug:.5f}") | |
vars_ratio: float = var_fequency_domain / avg_var_debug | |
err_debug = abs(1 - vars_ratio) | |
print(f"{100 * err_debug:.2f}% error in Parseval identity") # Strongly depends on Config | |
print_vars_error() # Sanity check approximation errors using power spectrum and Plancherel | |
return avg_acf, first_walk | |
class Plotter: | |
@staticmethod | |
def plot_acf(avg_acf, first_walk): | |
num_states = MarkovChain().num_states() | |
FIRST_SAMPLE_PLOT = 0 | |
ACF_PLOT = 1 | |
RING_PLOT = 2 | |
POW_PLOT = 3 | |
T_PLOT = 4 # New: Spectrum vs Period plot | |
NONE_PLOT = 5 | |
lags = list(range(Config.NUM_LAGS)) | |
fig, axs = plt.subplots(2, 3, figsize=(14, 6)) # Adjust layout for 5 plots | |
axs = axs.flatten() | |
axs[NONE_PLOT].axis('off') | |
# Transition Diagram | |
ax = axs[RING_PLOT] | |
ax.set_aspect('equal') | |
ax.axis('off') | |
angles = np.linspace(0, 2 * np.pi, num_states, endpoint=False) | |
coords = [(np.cos(a), np.sin(a)) for a in angles] | |
for state, (x, y) in enumerate(coords): | |
ax.plot(x, y, 'o', color='black', markersize=12) | |
ax.text(x * 1.15, y * 1.15, string.ascii_uppercase[state], ha='center', va='center', fontsize=10) | |
for i in range(num_states): | |
fwd = (i + 1) % num_states | |
back = (i - 1) % num_states | |
def draw_curved_arrow(ax, start_idx, end_idx, alpha_val, color, rad): | |
src = coords[start_idx] | |
dst = coords[end_idx] | |
ax.annotate( | |
'', xy=dst, xytext=src, | |
arrowprops=dict( | |
arrowstyle='-|>', color=color, alpha=alpha_val, lw=4, | |
shrinkA=15, shrinkB=15, | |
connectionstyle=f"arc3,rad={rad}" | |
), | |
) | |
draw_curved_arrow(ax, i, fwd, MarkovChain.P_FORWARD, 'blue', rad=0.3) | |
draw_curved_arrow(ax, i, back, 1 - MarkovChain.P_FORWARD, 'red', rad=-0.3) | |
ax.set_title(f'circling Markov chain\nusing P_FORWARD = {MarkovChain.P_FORWARD}\n', fontsize=10) | |
# ACF Plot | |
ax = axs[ACF_PLOT] | |
ax.bar(lags, avg_acf, color='green', edgecolor='black') | |
ax.axhline(0, color='gray', linewidth=0.8) | |
for x in range(0, Config.NUM_LAGS, num_states): | |
ax.axvline(x, color='pink', linestyle='--', linewidth=0.8) | |
ax.set_xlabel('Lag', fontsize=12) | |
ax.set_ylabel('Autocorrelation', fontsize=12) | |
ax.set_title(f'Average ACF ({Config.NUM_SIMS} sims)', fontsize=12) | |
ax.grid(True, linestyle='--', alpha=0.6) | |
# First Sampled X Plot | |
first_sample = [random_variable_X(state) for state in first_walk] | |
ax = axs[FIRST_SAMPLE_PLOT] | |
ax.plot(first_sample, color='darkgreen') | |
for x in range(0, Config.WALK_LEN, num_states): | |
ax.axvline(x, color='pink', linestyle='--', linewidth=0.8) | |
ax.set_xlabel('Time step', fontsize=12) | |
ax.set_ylabel(f'X(state)\nstarting at state={first_walk[0].name}', fontsize=12) | |
ax.set_title('First simulator Sample of X', fontsize=12) | |
for idx in range(num_states): | |
x = random_variable_X(State(idx)) | |
ax.axhline(x, color='orange', linestyle='--', linewidth=0.4) | |
ax.set_ylim(0, max(first_sample) + 0.5) | |
# Spectrum (|F(acf)| vs frequency) | |
_freqs = np.fft.fftfreq(len(avg_acf)) | |
pos_freqs = _freqs[:len(avg_acf) // 2] | |
F_avg_acf = np.fft.fft(avg_acf) | |
real_F_avg_acf = F_avg_acf.real[:len(avg_acf) // 2] | |
imag_F_avg_acf = F_avg_acf.imag[:len(avg_acf) // 2] | |
abs_F_avg_acf = np.abs(F_avg_acf)[:len(avg_acf) // 2] | |
ax = axs[POW_PLOT] | |
ax.plot(pos_freqs, abs_F_avg_acf, color='purple', label='|F(acf)|') | |
ax.plot(pos_freqs, real_F_avg_acf, color='gray', label='Real(F(acf))', alpha=0.2) | |
ax.plot(pos_freqs, imag_F_avg_acf, color='orange', label='Imag(F(acf))', alpha=0.2) | |
ax.set_xlabel('Frequency', fontsize=12) | |
ax.set_ylabel('Amplitude', fontsize=12) | |
ax.grid(True, linestyle='--', alpha=0.6) | |
ax.set_title('Spectrum vs Frequency', fontsize=12) | |
ax.legend() | |
for i in range(2, num_states + 1): | |
f = 1 / i | |
ax.axvline(x=f, color='gray', linestyle='--', linewidth=1) | |
ax.text(f + 0.02, max(abs_F_avg_acf) * 0.8, f'1/{i}', color='black', fontsize=7) | |
# New: Spectrum (|F(acf)| vs Period T = 1/f) | |
ax = axs[T_PLOT] | |
nonzero_freqs = pos_freqs[1:] | |
periods = 1 / nonzero_freqs | |
abs_F_nonzero = abs_F_avg_acf[1:] | |
ax.plot(periods, abs_F_nonzero, color='blue', label='|F(acf)| vs T=1/f') | |
ax.set_xscale('log') | |
ax.set_xlabel('Period T (1/f)', fontsize=12) | |
ax.set_ylabel('Amplitude', fontsize=12) | |
ax.set_title('Spectrum vs Period', fontsize=12) | |
ax.grid(True, which='both', linestyle='--', alpha=0.6) | |
ax.legend() | |
for i in range(2, num_states + 1): | |
T = i | |
ax.axvline(x=T, color='gray', linestyle='--', linewidth=1) | |
ax.text(T * 1.05, max(abs_F_nonzero) * 0.8, f'T={i}', color='black', fontsize=7) | |
plt.tight_layout() | |
plt.show() | |
if __name__ == "__main__": | |
sample_and_print_stationary_distribution() | |
avg_acf, first_walk = avg_acf_form_runs() | |
Plotter.plot_acf(avg_acf, first_walk) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment