# HMM和WFST代码示例

## 代码和安装

./configure
make
sudo make install

git clone https://github.com/placebokkk/pyfst.git

pip install cython
python setup.py install

## HMM代码

### HMM回顾

Rabiner的论文提到了HMM有3个问题：似然评估(给定模型参数计算观察序列的概率)、解码(寻找最可能的隐状态序列)和学习(根据观察序列估计模型参数)。

HMM有如下的参数定义：状态数N、观察符号数M、观察序列长度T、初始状态分布pi、状态跳转概率矩阵a和发射概率矩阵b。

### 示例模型

# Example model: N hidden states, M observation symbols, T time steps.
N = 3
M = 4
T = 5

# Random observation sequence of length T with values in [0, M).
O = np.random.randint(0, M, T)
print(O)
# e.g. [2 2 0 2 1]

# Random emission matrix b (N x M); normalize each row into a distribution.
b = np.random.random((N, M))
b /= np.array(b.sum(axis=1, keepdims=True))
print(b)
# e.g.
# [[0.48229459 0.27721979 0.12239203 0.11809359]
#  [0.11822906 0.14038183 0.42902164 0.31236747]
#  [0.34614273 0.10475221 0.38320261 0.16590244]]

print(b.sum(axis=1))
# [1. 1. 1.]

# Per-frame emission probabilities: ob[t, i] = b[i, O[t]], shape (T, N).
ob = np.zeros((T, N))
for t in range(T):
    for i in range(N):
        ob[t, i] = b[i, O[t]]
print(ob)
# e.g.
# [[0.12239203 0.42902164 0.38320261]
#  [0.12239203 0.42902164 0.38320261]
#  [0.48229459 0.11822906 0.34614273]
#  [0.12239203 0.42902164 0.38320261]
#  [0.27721979 0.14038183 0.10475221]]

# Equivalent vectorized form of the loop above.
ob = b.T[O]

# Random transition matrix a (N x N); normalize each row.
a = np.random.random((N, N))
a /= np.array([a.sum(axis=-1)]).T
print(a)
print(a.sum(axis=1))
# e.g.
# [[0.39884173 0.3486846  0.25247367]
#  [0.08072832 0.59021607 0.32905562]
#  [0.25993654 0.25873506 0.4813284 ]]
# [1. 1. 1.]

# Random initial state distribution pi.
pi = np.random.random(N)
pi /= pi.sum()
print(pi)
# e.g. [0.23738012 0.47646268 0.28615719]

# Bundle everything so it can be splatted into the algorithms below.
hmm = (O, pi, a, b, N, M, T)

# Persist the model so later sections reuse the same random draw.
import pickle
with open('../data/hmm.pkl', 'wb') as f:
    pickle.dump(hmm, f, pickle.HIGHEST_PROTOCOL)

### 前向算法

def forward(O, pi, a, b, N, M, T):
    """Forward algorithm (naive triple loop).

    Computes fwd[t, i] = P(O[0..t], state_t = i | pi, a, b).

    Args:
        O:  observation index sequence, length T.
        pi: initial state distribution, shape (N,).
        a:  transition matrix, shape (N, N).
        b:  emission matrix, shape (N, M).
        N, M, T: state count, symbol count, sequence length.

    Returns:
        (T, N) array of forward probabilities.
    """
    fwd = np.zeros((T, N))
    # Initialization: alpha_0(i) = pi_i * b_i(O_0)
    for i in range(N):
        fwd[0, i] = pi[i] * b[i, O[0]]
    # Recursion: alpha_{t+1}(j) = (sum_i alpha_t(i) * a_ij) * b_j(O_{t+1})
    for t in range(T - 1):
        for j in range(N):
            s = 0
            for i in range(N):
                s += fwd[t, i] * a[i, j]
            fwd[t + 1, j] = s * b[j, O[t + 1]]
    return fwd
print(forward(*hmm))

def forward(O, pi, a, b, N, M, T):
    """Vectorized forward algorithm (same result as the loop version)."""
    fwd = np.zeros((T, N))
    # Initialization: alpha_0 = pi * b(:, O_0), elementwise.
    fwd[0] = pi * b[:, O[0]]
    # Recursion: one matrix-vector product per frame.
    for t in range(T - 1):
        fwd[t + 1] = np.dot(fwd[t], a) * b[:, O[t + 1]]
    return fwd

def full_prob(fwd):
    """Total probability of the observations: sum of the last forward row."""
    return fwd[-1].sum()
print(full_prob(forward(*hmm)))

### Viterbi算法

np.argmax(ob,axis=1)
# array([0, 0, 2, 0, 2])

def viterbi(O, pi, a, b, N, M, T):
    """Viterbi decoding (naive triple loop).

    Finds the single most likely hidden-state sequence for observations O.
    Parameters as in forward(); returns an int array Q of length T with the
    best state index per frame.
    """
    d = np.zeros((T, N))
    # np.int was removed from NumPy (1.24+); the builtin int is equivalent.
    ph = np.zeros((T, N), dtype=int)
    # Initialization
    for i in range(N):
        d[0, i] = pi[i] * b[i, O[0]]
        ph[0, i] = 0
    # Recursion: keep only the best-scoring incoming path per state.
    for t in range(1, T):
        for j in range(N):
            m = np.zeros(N)
            for i in range(N):
                m[i] = d[t - 1, i] * a[i, j]
            ph[t, j] = m.argmax()
            d[t, j] = m.max() * b[j, O[t]]
    # Termination: best final state and its path probability.
    m = np.zeros(N)
    for i in range(N):
        m[i] = d[T - 1, i]
    Pv = m.max()
    # Back-tracking through the stored argmax pointers.
    Q = np.zeros(T, dtype=int)
    Q[T - 1] = m.argmax()
    for t in reversed(range(T - 1)):
        Q[t] = ph[t + 1, Q[t + 1]]
    return Q
print(viterbi(*hmm))
#　[0 1 2 1 2]

Viterbi算法和前向算法很像，只不过前向算法计算的是到当前时刻t某个状态的所有路径的概率累加和；而Viterbi算法只保留概率最大的一条路径(并记录回溯指针)。

def viterbi(O, pi, a, b, N, M, T):
    """Vectorized Viterbi decoding (same result as the loop version)."""
    d = np.zeros((T, N))
    # np.int was removed from NumPy (1.24+); the builtin int is equivalent.
    ph = np.zeros((T, N), dtype=int)
    # Initialization
    d[0] = pi * b[:, O[0]]
    ph[0] = 0
    # Recursion: m[j, i] = d[t-1, i] * a[i, j], best predecessor per row.
    for t in range(1, T):
        m = d[t - 1] * a.T
        ph[t] = m.argmax(axis=1)
        d[t] = m[np.arange(N), ph[t]] * b[:, O[t]]
    # Termination and back-tracking.
    Q = np.zeros(T, dtype=int)
    Q[T - 1] = np.argmax(d[T - 1])
    Pv = d[T - 1, Q[T - 1]]
    for t in reversed(range(T - 1)):
        Q[t] = ph[t + 1, Q[t + 1]]
    return Q

### 前向后向算法

def backward(O, pi, a, b, N, M, T):
    """Backward algorithm (naive triple loop).

    Computes bk[t, i] = P(O[t+1..T-1] | state_t = i). Parameters as in
    forward(); returns a (T, N) array.
    """
    bk = np.zeros((T, N))
    # Initialization: beta_{T-1}(i) = 1
    for i in range(N):
        bk[T - 1, i] = 1
    # Recursion, backwards in time.
    for t in reversed(range(T - 1)):
        for i in range(N):
            s = 0
            for j in range(N):
                s += a[i, j] * b[j, O[t + 1]] * bk[t + 1, j]
            bk[t, i] = s
    return bk
print(backward(*hmm))

numpy的实现：

def backward(O, pi, a, b, N, M, T):
    """Vectorized backward algorithm (same result as the loop version)."""
    bk = np.zeros((T, N))
    # Initialization: beta_{T-1}(i) = 1
    bk[T - 1] = 1
    # Recursion: one matrix-vector product per frame, backwards in time.
    for t in reversed(range(T - 1)):
        bk[t] = np.dot(bk[t + 1] * b[:, O[t + 1]], a.T)
    return bk

def gamma(fwd, bk, fp):
    """State-occupation probabilities gamma[t, i] = fwd[t, i]*bk[t, i]/fp.

    Original relied on global T and N; derive them from fwd.shape instead so
    the function is self-contained.
    """
    T, N = fwd.shape
    gm = np.zeros((T, N))
    for t in range(T):
        for i in range(N):
            gm[t, i] = (fwd[t, i] * bk[t, i]) / fp
    return gm

def gamma(fwd, bk, fp):
    """Vectorized state-occupation probabilities: elementwise fwd*bk/fp."""
    return (fwd * bk) / fp
print(gamma(forward(*hmm),backward(*hmm),full_prob(forward(*hmm))))

# Visualize forward, backward and combined (gamma) probabilities side by side.
# NOTE(review): P is presumably matplotlib.pyplot -- confirm the import at the
# top of the original notebook.
fig,ax=P.subplots(1,3,figsize=(15,3),sharey=True)

ax[0].pcolormesh(forward(*hmm).T,cmap=P.cm.gray)
ax[0].set_yticks(np.arange(0,N)+.5)
ax[0].set_yticklabels(np.arange(0,N))
ax[0].set_title('Forward')
ax[1].pcolormesh(backward(*hmm).T,cmap=P.cm.gray)
ax[1].set_title('Backward')
g=gamma(forward(*hmm),backward(*hmm),full_prob(forward(*hmm)))
ax[2].pcolormesh(g.T,cmap=P.cm.gray)
ax[2].set_title('Full')

# Compare the per-frame argmax state sequences suggested by each quantity.
print('Best observation sequence: {}'.format(np.argmax(ob,axis=1)))
print('Best forward prob. sequence: {}'.format(np.argmax(forward(*hmm),axis=1)))
print('Best backward prob. sequence: {}'.format(np.argmax(backward(*hmm),axis=1)))
g=gamma(forward(*hmm),backward(*hmm),full_prob(forward(*hmm)))
print('Best full prob. sequence: {}'.format(np.argmax(g,axis=1)))
print('Best Viterbi sequence: {}'.format(viterbi(*hmm)))
# 输出：
Best observation sequence: [0 0 2 0 2]
Best forward prob. sequence: [1 1 2 1 2]
Best backward prob. sequence: [0 1 0 1 0]
Best full prob. sequence: [1 1 2 1 2]
Best Viterbi sequence: [0 1 2 1 2]

### Baum-Welch算法

def xi(fwd, bk, fp, O, pi, a, b, N, M, T):
    """State-transition probabilities (naive quadruple loop).

    xi[t, i, j] = P(state_t = i, state_{t+1} = j | O), computed as
    fwd[t, i] * a[i, j] * b[j, O[t+1]] * bk[t+1, j] / fp.

    Returns a (T-1, N, N) array.
    """
    ret = np.zeros((T - 1, N, N))
    for t in range(T - 1):
        for i in range(N):
            for j in range(N):
                ret[t, i, j] = (fwd[t, i] * a[i, j] * b[j, O[t + 1]]
                                * bk[t + 1, j]) / fp
    return ret

Numpy版本：

def xi(fwd, bk, fp, O, pi, a, b, N, M, T):
    """Vectorized xi: broadcast the four factors to shape (T-1, N, N).

    Original was split across two lines without a continuation, which is a
    syntax error; the expression is joined here.
    """
    return (fwd[:-1].reshape((T - 1, N, 1))
            * a.reshape((1, N, N))
            * b[:, O[1:]].T.reshape((T - 1, 1, N))
            * bk[1:].reshape((T - 1, 1, N)) / fp)

def exp_pi(gamma):
    """Re-estimate the initial distribution: occupation probs at t = 0."""
    return gamma[0]

def exp_a(gamma, xi, N):
    """Re-estimate the transition matrix (naive double loop).

    a_ij = sum_t xi[t, i, j] / sum_t gamma[t, i], summed over t < T-1.
    """
    e_a = np.zeros((N, N))
    for i in range(N):
        for j in range(N):
            e_a[i, j] = np.sum(xi[:, i, j]) / np.sum(gamma[:-1, i])
    return e_a
# Forward probabilities
fw = forward(*hmm)
# Backward probabilities
bk = backward(*hmm)
# Total probability of the observation sequence
fp = full_prob(fw)
# State-occupation probabilities (gamma)
g = gamma(fw, bk, fp)
# State-transition probabilities (epsilon/xi)
x = xi(fw, bk, fp, *hmm)
# Re-estimated transition matrix
ea = exp_a(g, x, N)

Numpy的版本：

def exp_a(gamma, xi, N):
    """Vectorized re-estimation of the transition matrix."""
    return xi[:].sum(axis=0) / gamma[:-1].sum(axis=0).reshape(N, 1)

def exp_b(gamma, O, N, M):
    """Re-estimate the emission matrix.

    b_jk = (occupation mass of state j on frames where O == k) /
           (total occupation mass of state j).
    """
    e_b = np.zeros((N, M))
    for j in range(N):
        for k in range(M):
            e_b[j, k] = np.sum(gamma[O == k, j]) / np.sum(gamma[:, j])
    return e_b
# Forward probabilities
fw = forward(*hmm)
# Backward probabilities
bk = backward(*hmm)
# Total probability of the observation sequence
fp = full_prob(fw)
# State-occupation probabilities (gamma)
g = gamma(fw, bk, fp)
# Re-estimate the emission matrix and print it
print(exp_b(g, O, N, M))

Baum-Welch算法是一种EM算法，首先我们随机初始化一组参数，然后就可以用前向和后向算法计算前向后向概率，然后根据它们计算状态占用概率$\gamma_t(i)$和状态跳转概率$\epsilon_t(i,j)$，接下来就可以用这两个概率重新估计pi,$a_{ij},b_{jk}$。每次迭代都可以增加训练数据上的似然概率，我们可以不断的迭代直到收敛(到局部最优解)。

print('Initial probability: {}'.format(full_prob(forward(*hmm))))
hmm_new = hmm
# 15 EM (Baum-Welch) iterations
for i in range(15):
    # E-step
    # Forward probabilities
    fw = forward(*hmm_new)
    # Backward probabilities
    bk = backward(*hmm_new)
    # Total probability of the observation sequence
    fp = full_prob(fw)
    # State-occupation probabilities (gamma)
    g = gamma(fw, bk, fp)
    # State-transition probabilities (epsilon/xi)
    x = xi(fw, bk, fp, *hmm_new)

    # M-step
    # Re-estimate the initial distribution pi
    pi_new = exp_pi(g)
    # Re-estimate the transition matrix a
    a_new = exp_a(g, x, N)
    # Re-estimate the emission matrix b
    b_new = exp_b(g, O, N, M)

    # Parameter change between iterations, used to monitor convergence.
    err = np.concatenate(((pi_new - hmm_new[1]).ravel(),
                          (a_new - hmm_new[2]).ravel(),
                          (b_new - hmm_new[3]).ravel()))

    hmm_new = (O, pi_new, a_new, b_new, N, M, T)
    # Original had this string literal broken across two lines (a syntax
    # error from the extraction); rejoined onto one logical statement.
    print('Update #{} probability: {} -- mean error: {}'.format(
        i + 1, full_prob(forward(*hmm_new)), np.mean(err**2)))

g g əʊ əʊ əʊ h əʊ əʊ m
g əʊ əʊ əʊ əʊ h əʊ əʊ m
...

b b ae ae ae ae ae ae ae t

## WFST代码

WFST定义了一种关系，给定一个输入字符串，它可以输出多个(或者零个)字符串以及对应的weight。

### 例子

# Build an empty transducer; mark its start state (state 0) as final so it
# accepts the empty string.
f=fst.Transducer()
f[0].final=True

# Linear-chain FST that accepts exactly the sequence a a c b, sharing f's
# input symbol table.
i=fst.linear_chain(['a','a','c','b'],syms=f.isyms)

### Composition

WFST最有用的操作就是composition。这个操作的输入是两个WFST，第一个WFST的输入字母表是A，输出字母表是B；第二个的输入字母表是B输出字母表是C，则Composition之后的WFST的输入字母表是A而输出字母表是C。我们可以使用重载的>>运算符实现Composition操作，比如i>>f就是把i和f进行Composition。

# Compose i with f (the >> operator is overloaded composition), then keep
# only the output side of the result.
o=i>>f
o.project_output()

# Fresh transducer accepting the empty string (start state 0 is final).
f=fst.Transducer()
f[0].final=True
# Three linear chains sharing one symbol table.
i1=fst.linear_chain(['a','b','c','d','e'],syms=f.isyms)
i2=fst.linear_chain(['a','b','x','y','z'],syms=f.isyms)
i3=fst.linear_chain(['g','h','c','d','e'],syms=f.isyms)
# Union of the three chains, then remove epsilon transitions introduced by
# the union construction.
i=i1.union(i2)
i=i.union(i3)
i.remove_epsilon()

# Determinize (returns a new FST) and minimize in place.
i=i.determinize()
i.minimize()

### HMM使用WFST的例子

# Build an acceptor for the observation sequence whose weights encode the
# emission probabilities.
def obs_mat(O,b,N,T):
f=fst.StdAcceptor()

# NOTE(review): the arc-adding statements inside these loops appear to be
# missing from this extraction (presumably something like
# f.add_arc(t, t+1, str(j), -log(b[j, O[t]])) per state j and frame t) --
# restore them from the original notebook before running.
for t in range(T):
for j in range(N):

# State T, reached after consuming all T observations, is the only final state.
f[T].final=True
return f
obs_fsa=obs_mat(O,b,N,T)

# Build an acceptor encoding the initial (pi) and transition (a) probabilities.
def trans_mat(pi,a,N):
f=fst.StdAcceptor()

# NOTE(review): the arc-adding statements are missing from this extraction.
# The first loop presumably adds arcs from the start state weighted by
# -log(pi[i]); the nested loop adds arcs weighted by -log(a[i, j]) --
# confirm against the original notebook.
for i in range(N):

for i in range(N):
for j in range(N):

# Every state (offset by 1 for the start state) may end the sequence.
for i in range(N):
f[i+1].final=True

return f
tr_fsa=trans_mat(pi,a,N)

# Compose the observation acceptor with the transition acceptor.
out_fsa=obs_fsa>>tr_fsa

# Original line fused two statements ("...shortest_path()shp_l=[]"), which
# is a syntax error from the extraction; split back into two statements.
# The shortest (lowest-cost) path through the composed FST corresponds to
# the Viterbi state sequence.
sh_p = out_fsa.shortest_path()
shp_l = []
for p in sh_p.paths():
    for s in p:
        # Map each output label id back to its symbol (a state index string).
        shp_l.append(sh_p.osyms.find(s.olabel))
print(shp_l)
# 输出：
['0', '1', '2', '1', '2']
# 这和Viterbi算法的结果完全一样：
print(viterbi(*hmm))
[0 1 2 1 2]