Last active
April 13, 2019 10:15
-
-
Save yamaguchiyuto/d9dbd283c54807faa9ccab532ec0d56e to your computer and use it in GitHub Desktop.
Author Topic Model
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
import copy | |
import numpy as np | |
from scipy.sparse import lil_matrix | |
class ATM:
    """Author Topic Model fitted by Gibbs sampling.

    Every word token carries two latent assignments: a topic ``z`` and an
    author ``y`` (an index into the author list of the token's document).
    The sampler keeps two count tables:

    * ``nak[a, k]`` -- number of tokens assigned to author ``a`` and topic ``k``
    * ``nkv[k, v]`` -- number of tokens of word ``v`` assigned to topic ``k``

    Parameters
    ----------
    K : int
        Number of topics.
    alpha : float
        Additive smoothing applied to the author-topic counts.
    beta : float
        Additive smoothing applied to the topic-word counts.
    max_iter : int
        Number of sampling sweeps over the corpus. At least one sweep is
        always performed, even when ``max_iter <= 0``.
    verbose : int, optional
        When truthy, the number of remaining sweeps is printed before each
        sweep.
    """

    def __init__(self, K, alpha, beta, max_iter, verbose=0):
        self.K = K
        self.alpha = alpha
        self.beta = beta
        self.max_iter = max_iter
        self.verbose = verbose

    def fit(self, W, A, V, S):
        """Run the sampler on a corpus and return ``self``.

        Parameters
        ----------
        W : sequence of sequences of int
            ``W[d][i]`` is the word index of the i-th token of document ``d``.
        A : sequence of sequences of int
            ``A[d]`` lists the author ids of document ``d``.
        V : int
            Vocabulary size; word indices index columns of ``nkv``.
        S : int
            Number of distinct authors; author ids index rows of ``nak``.

        After fitting, ``Z``/``Y`` hold the final assignments, ``nak``/``nkv``
        the matching counts, and ``max_Z``/``max_Y``/``max_score`` a snapshot
        of the best-scoring sweep.
        """
        self._W = W
        self._A = A
        self._D = len(W)  # number of documents
        self._V = V  # number of vocabularies
        self._S = S  # number of distinct authors

        self.Z = self._init_Z()
        self.Y = self._init_Y()
        self.nak = self._init_nak()
        self.nkv = self._init_nkv()
        # Row totals are maintained incrementally so they need not be
        # recomputed for every token.
        nkv_sum = self.nkv.sum(axis=1)
        nak_sum = self.nak.sum(axis=1)

        self._max_score = -1
        self.max_Z = None
        self.max_Y = None

        remained_iter = self.max_iter
        while True:
            if self.verbose:
                print(remained_iter)

            # Visit documents, and tokens inside each document, in random
            # order. ``permutation`` also copes with empty documents.
            for d in np.random.permutation(self._D):
                words = self._W[d]
                authors = self._A[d]
                for i in np.random.permutation(len(words)):
                    v = words[i]  # word index
                    old_k = self.Z[d][i]  # topic
                    old_a = authors[self.Y[d][i]]  # author

                    # Take the current token out of the counts ...
                    self.nak[old_a, old_k] -= 1
                    self.nkv[old_k, v] -= 1
                    nkv_sum[old_k] -= 1
                    nak_sum[old_a] -= 1

                    # ... draw a fresh (topic, author) pair for it ...
                    new_k, new_j = self._sample_z_and_y(d, v, nkv_sum, nak_sum)
                    self.Z[d][i] = new_k
                    self.Y[d][i] = new_j
                    new_a = authors[new_j]

                    # ... and put it back under the new assignment.
                    self.nak[new_a, new_k] += 1
                    self.nkv[new_k, v] += 1
                    nkv_sum[new_k] += 1
                    nak_sum[new_a] += 1

            s = self.score(nkv_sum, nak_sum)
            if s > self._max_score:
                # Keep the private threshold and the public attribute in
                # sync; otherwise every sweep would count as "the best".
                self._max_score = s
                self.max_score = s
                # Copy each array: Z and Y keep being mutated in place, so a
                # shallow copy of the outer list would alias the live state.
                self.max_Z = [np.array(z, copy=True) for z in self.Z]
                self.max_Y = [np.array(y, copy=True) for y in self.Y]

            remained_iter -= 1
            if remained_iter <= 0:
                break
        return self

    def _init_Z(self):
        """Return one random topic in ``[0, K)`` per token, per document."""
        return [
            np.random.randint(low=0, high=self.K, size=len(doc))
            for doc in self._W
        ]

    def _init_Y(self):
        """Return one random author position per token, per document.

        Positions index into ``A[d]``, not into the global author ids.
        """
        return [
            np.random.randint(low=0, high=len(authors), size=len(doc))
            for doc, authors in zip(self._W, self._A)
        ]

    def _init_nak(self):
        """Count author-topic co-assignments from the current ``Z``/``Y``."""
        nak = np.zeros((self._S, self.K))
        for d in range(self._D):
            authors = self._A[d]
            for k, j in zip(self.Z[d], self.Y[d]):
                nak[authors[j], k] += 1
        return nak

    def _init_nkv(self):
        """Count topic-word co-assignments from the current ``Z``."""
        nkv = np.zeros((self.K, self._V))
        for d in range(self._D):
            for k, v in zip(self.Z[d], self._W[d]):
                nkv[k, v] += 1
        return nkv

    def _sample_z_and_y(self, d, v, nkv_sum, nak_sum):
        """Jointly draw a topic and an author position for one token.

        Parameters
        ----------
        d : int
            Document index; selects the candidate authors ``A[d]``.
        v : int
            Word index of the token.
        nkv_sum, nak_sum : array-like
            Row sums of ``nkv`` (per topic) and ``nak`` (per author).

        Returns
        -------
        (z, y) : tuple of int
            Topic in ``[0, K)`` and author position in ``[0, len(A[d]))``.
        """
        authors = np.asarray(self._A[d], dtype=int)
        nak_sum = np.asarray(nak_sum)

        # Smoothed word-given-topic term, one entry per topic.
        word_term = (self.nkv[:, v] + self.beta) / (nkv_sum + self.beta * self._V)
        # Smoothed topic-given-author term, one row per candidate author.
        topic_term = (self.nak[authors] + self.alpha) / (
            nak_sum[authors][:, None] + self.alpha * self.K
        )

        # Flattened row-major: entry j * K + k is (author position j, topic k).
        prob = (word_term * topic_term).ravel()
        prob = prob / prob.sum()
        zy = np.random.multinomial(n=1, pvals=prob).argmax()

        # Integer arithmetic: plain "/" yields a float index on Python 3.
        y, z = divmod(int(zy), self.K)
        return z, y

    def score(self, nkv_sum, nak_sum):
        """Return the sum over tokens of the smoothed assignment weight.

        For each token this adds the product of the smoothed word-given-topic
        ratio and the smoothed topic-given-author ratio under its current
        assignment. ``nkv_sum`` and ``nak_sum`` are the row sums of ``nkv``
        and ``nak``. Returns ``0`` when the corpus has no tokens.
        """
        word_denom = self.beta * self._V
        topic_denom = self.alpha * self.K
        s = 0
        for d in range(self._D):
            authors = self._A[d]
            for v, k, j in zip(self._W[d], self.Z[d], self.Y[d]):
                a = authors[j]
                word_term = (self.nkv[k, v] + self.beta) / (nkv_sum[k] + word_denom)
                topic_term = (self.nak[a, k] + self.alpha) / (nak_sum[a] + topic_denom)
                s += word_term * topic_term
        return s
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment