1 Model Averaging (MA)

1.1 Algorithm Description and Implementation

Depending on the communication interval, the MA algorithm comes in two variants: the workers either communicate after every single local update step, or run several local update steps between communications (controlled by n_local_iterations in the implementation below):

from typing import Tuple
from sklearn.datasets import load_breast_cancer
import numpy as np
from pyspark.sql import SparkSession
from operator import add
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt

n_slices = 4  # number of partitions (parallel workers)
n_iterations = 1500  # number of communication rounds
eta = 0.1  # learning rate
mini_batch_fraction = 0.1  # fraction of samples in each mini-batch
n_local_iterations = 1  # number of local update steps per round

def logistic_f(x, w):
    return 1 / (np.exp(-x.dot(w)) + 1 + 1e-6)

def gradient(pt_w: Tuple):
    """ Compute the logistic regression gradient for a single data point.
    """
    idx, (point, w) = pt_w
    y = point[-1]    # point label
    x = point[:-1]   # point coordinates
    # For each point (x, y), compute its gradient term; these are averaged later
    return (idx, (w, -(y - logistic_f(x, w)) * x))

def update_local_w(iter):
    iter = list(iter)
    idx, (w, _) = iter[0]
    g_mean = np.mean(np.array([g for _, (_, g) in iter]), axis=0)
    return [(idx, w - eta * g_mean)]

def draw_acc_plot(accs, n_iterations):
    def ewma_smooth(accs, alpha=0.9):
        s_accs = np.zeros(n_iterations)
        for idx, acc in enumerate(accs):
            if idx == 0:
                s_accs[idx] = acc
            else:
                s_accs[idx] = alpha * s_accs[idx-1] + (1 - alpha) * acc
        return s_accs

    s_accs = ewma_smooth(accs, alpha=0.9)
    plt.plot(np.arange(1, n_iterations + 1), accs, color="C0", alpha=0.3)
    plt.plot(np.arange(1, n_iterations + 1), s_accs, color="C0")
    plt.title(label="Accuracy on test dataset")
    plt.xlabel("Round")
    plt.ylabel("Accuracy")
    plt.savefig("ma_acc_plot.png")

if __name__ == "__main__":
    X, y = load_breast_cancer(return_X_y=True)
    D = X.shape[1]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3, random_state=0, shuffle=True)
    n_train, n_test = X_train.shape[0], X_test.shape[0]

    spark = SparkSession\
        .builder\
        .appName("Model Average")\
        .getOrCreate()

    matrix = np.concatenate(
        [X_train, np.ones((n_train, 1)), y_train.reshape(-1, 1)], axis=1)
    points = spark.sparkContext.parallelize(matrix, n_slices).cache()
    points = points.mapPartitionsWithIndex(
        lambda idx, iter: [(idx, arr) for arr in iter])

    ws = spark.sparkContext.parallelize(
        2 * np.random.ranf(size=(n_slices, D + 1)) - 1, n_slices).cache()
    ws = ws.mapPartitionsWithIndex(lambda idx, iter: [(idx, next(iter))])
    w = 2 * np.random.ranf(size=D + 1) - 1
    print("Initial w: " + str(w))

    accs = []
    for t in range(n_iterations):
        print("On iteration %d" % (t + 1))
        # Broadcast the current global model and reset each worker's local model to it
        w_br = spark.sparkContext.broadcast(w)
        ws = ws.mapPartitions(lambda iter: [(next(iter)[0], w_br.value)])

        for local_t in range(n_local_iterations):
            ws = points.sample(False, mini_batch_fraction, 42 + t)\
                .join(ws, numPartitions=n_slices)\
                .map(gradient)\
                .mapPartitions(update_local_w)

        # Average the local models to obtain the new global model
        par_w_sum = ws.mapPartitions(lambda iter: [next(iter)[1]])\
            .treeAggregate(0.0, add, add)
        w = par_w_sum / n_slices

        y_pred = logistic_f(np.concatenate(
            [X_test, np.ones((n_test, 1))], axis=1), w)
        pred_label = np.where(y_pred < 0.5, 0, 1)
        acc = accuracy_score(y_test, pred_label)
        accs.append(acc)
        print("iteration: %d, accuracy: %f" % (t + 1, acc))

    print("Final w: %s " % w)
    print("Final acc: %f" % acc)
    spark.stop()

    draw_acc_plot(accs, n_iterations)
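Stripped of the Spark plumbing, one MA communication round is simple: every worker starts from the current global weights, runs a few local gradient steps on its own shard, and the driver averages the resulting local models. Below is a minimal single-machine NumPy sketch of that round (a toy least-squares loss and hypothetical helper names, not the Spark code above):

```python
import numpy as np

def ma_round(w_global, worker_data, eta=0.1, n_local=1):
    """One MA communication round: local SGD on each worker, then averaging."""
    local_ws = []
    for X, y in worker_data:              # each worker holds its own data shard
        w = w_global.copy()               # local training starts from the global model
        for _ in range(n_local):          # n_local update steps before communicating
            grad = X.T.dot(X.dot(w) - y) / len(y)   # least-squares gradient
            w -= eta * grad
        local_ws.append(w)
    return np.mean(local_ws, axis=0)      # model averaging on the driver

# Toy problem: 4 workers whose shards come from the same linear model
rng = np.random.default_rng(0)
w_true = np.array([1.0, -2.0])
shards = []
for _ in range(4):
    X = rng.normal(size=(50, 2))
    shards.append((X, X.dot(w_true)))

w = np.zeros(2)
for _ in range(200):
    w = ma_round(w, shards)
```

On this noiseless toy problem every shard shares the optimum, so repeated rounds drive the averaged model to the true weights.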

1.2 Convergence Behavior

Initial w: [-4.59895046e-01  4.81609930e-01 -2.98562178e-01  4.37876789e-02
-9.12956525e-01  6.72295704e-01  6.02029280e-01 -4.01078397e-01
9.08559315e-02 -1.07924749e-01  4.64202010e-01 -6.69343161e-01
-7.98638952e-01  2.56715359e-01 -4.08737254e-01 -6.20120002e-01
-8.59081121e-01  9.25086249e-01 -8.64084351e-01  6.18274961e-01
-3.05928664e-01 -6.96321445e-01 -3.70347891e-01  8.45658259e-01
-3.46329338e-01  9.75573025e-01 -2.37675425e-01  1.26656795e-01
-6.79589868e-01  9.48379550e-01 -2.04796940e-04]

Final w: [ 3.56746113e+01  5.23773783e+01  2.10458066e+02  1.13411434e+02
9.03684544e-01 -1.64116119e-01 -8.50221455e-01 -1.02747339e-01
1.01249316e+00  5.86541201e-01 -7.95885004e-01  4.08388583e+00
-3.72622203e+00 -1.22789161e+02  7.15869286e-01 -9.59608820e-01
5.85750752e-01  9.52410298e-01 -4.74590169e-01  8.01080392e-01
3.72102291e+01  6.54164219e+01  2.11443556e+02 -1.34266020e+02
1.19299917e+00 -9.48551465e-01 -3.02582118e-01 -9.50371027e-01
1.30280295e+00  5.02509358e-01  5.34567937e+00]
Final acc: 0.923977

The accuracy (ACC) curve of the MA algorithm on the test set is shown below:

2 Improving Model Averaging: the BMUF Algorithm

2.1 Algorithm Description and Implementation

mu = 0.5    # block momentum
zeta = 0.5  # block learning rate

# weight update
delta_w = 2 * np.random.ranf(size=D + 1) - 1

for t in range(n_iterations):
    ...
    w_avg = par_w_sum / n_slices
    delta_w = mu * delta_w + zeta * (w_avg - w)
    w = w + delta_w
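On the same kind of toy problem, the BMUF update (blockwise model-update filtering) can be sketched end to end in plain NumPy. One deliberate deviation from the snippet above: the momentum buffer delta_w is zero-initialized here, which is the more conventional choice for a momentum term; local_sgd is a hypothetical helper standing in for the per-worker training of Section 1:

```python
import numpy as np

def local_sgd(w, X, y, eta=0.1, n_local=1):
    """Per-worker local training (toy least-squares loss), as in plain MA."""
    w = w.copy()
    for _ in range(n_local):
        w -= eta * X.T.dot(X.dot(w) - y) / len(y)
    return w

rng = np.random.default_rng(0)
w_true = np.array([1.0, -2.0])
shards = []
for _ in range(4):
    X = rng.normal(size=(50, 2))
    shards.append((X, X.dot(w_true)))

mu, zeta = 0.5, 0.5          # block momentum / block learning rate
w = np.zeros(2)
delta_w = np.zeros(2)        # momentum buffer (zero-initialized here)
for _ in range(200):
    w_avg = np.mean([local_sgd(w, X, y) for X, y in shards], axis=0)
    delta_w = mu * delta_w + zeta * (w_avg - w)  # filter the block-level update
    w = w + delta_w                              # apply the filtered update
```

Compared with plain MA, the only change is that the averaged model is not adopted directly; the difference w_avg - w is smoothed by a momentum term first.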

2.2 Convergence Behavior

The final weights and accuracy of the BMUF algorithm are as follows:

Final w: [ 3.55587119e+01  4.74476386e+01  2.05646360e+02  8.43207945e+01
-6.24992160e-01  4.09293120e-01 -2.03903959e-01 -7.48743358e-01
6.17507919e-01  1.37260407e-01  3.02077368e-01  2.29466375e+00
-4.29517662e+00 -1.25214901e+02 -3.93164203e-01 -6.65440018e-01
-9.50817683e-01  9.09075928e-01 -8.22611068e-01  6.22626019e-01
3.78429147e+01  5.77291936e+01  2.04653067e+02 -1.17393706e+02
-5.53476597e-03  2.76373535e-02 -1.98339171e+00 -3.34173754e-01
4.17088085e-03  1.15206427e+00  4.68333285e+00]
Final acc: 0.923977

The accuracy curve of the BMUF algorithm on the test set is shown below:

3 Elastic Averaging SGD (EASGD)

3.1 Algorithm Description and Implementation

$\underset{w_1, w_2, \cdots, w_K}{\min} \sum_{k=1}^K\left[\hat{l}_k(w_k) + \frac{\rho}{2}\lVert w_k - \overline{w}\rVert^2\right]$
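One plain gradient step on this objective with respect to a single $w_k$ yields the local update rule implemented in the code below; the code's constraint coefficient $\alpha$ then plays the role of the step size times the penalty strength (this identification of $\alpha$ is my reading of the formula):

$w_k \leftarrow w_k - \eta\,\nabla \hat{l}_k(w_k) - \eta\rho\,(w_k - \overline{w}), \qquad \alpha = \eta\rho$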

beta = 0.5   # moving-average coefficient of the center model
alpha = 0.1  # constraint (elastic) coefficient

def update_local_w(iter, w):
    iter = list(iter)
    idx, (local_w, _) = iter[0]
    g_mean = np.mean(np.array([g for _, (_, g) in iter]), axis=0)
    return [(idx, local_w - eta * g_mean - alpha * (local_w - w))]

...

if __name__ == "__main__":
    ...
    for t in range(n_iterations):
        print("On iteration %d" % (t + 1))
        w_br = spark.sparkContext.broadcast(w)

        ws = points.sample(False, mini_batch_fraction, 42 + t)\
            .join(ws, numPartitions=n_slices)\
            .map(gradient)\
            .mapPartitions(lambda iter: update_local_w(iter, w=w_br.value))

        par_w_sum = ws.mapPartitions(lambda iter: [next(iter)[1]])\
            .treeAggregate(0.0, add, add)
        w = (1 - beta) * w + beta * par_w_sum / n_slices
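As with MA and BMUF, the elastic-averaging dynamics are easy to see on a single machine: each worker's model is pulled toward the center model, while the center moves toward the workers' average. A minimal NumPy sketch with a toy least-squares loss (variable names are my own, not the article's):

```python
import numpy as np

rng = np.random.default_rng(0)
w_true = np.array([1.0, -2.0])
shards = []
for _ in range(4):
    X = rng.normal(size=(50, 2))
    shards.append((X, X.dot(w_true)))

eta, alpha, beta = 0.1, 0.1, 0.5
w = np.zeros(2)                          # center (elastic-averaged) model
locals_ = [np.zeros(2) for _ in shards]  # per-worker models persist across rounds
for _ in range(300):
    for k, (X, y) in enumerate(shards):
        g = X.T.dot(X.dot(locals_[k]) - y) / len(y)
        # gradient step plus an elastic pull toward the center
        locals_[k] = locals_[k] - eta * g - alpha * (locals_[k] - w)
    # the center is an exponential moving average of the workers' mean
    w = (1 - beta) * w + beta * np.mean(locals_, axis=0)
```

Unlike MA, the local models are never reset to the center between rounds; the elastic term alone keeps them from drifting apart.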

3.2 Convergence Behavior

The final weights and accuracy of the EASGD algorithm are as follows:

Final w: [ 4.75633325e+01  7.05407657e+01  2.79006876e+02  1.45465411e+02
4.54467492e-01 -2.10142380e-01 -6.30421903e-01  4.53977048e-01
1.01717057e-01 -2.14420411e-01 -2.94989128e-01  4.89426514e+00
-3.05999725e+00 -1.62456459e+02  1.27772367e-01 -4.68403685e-02
-8.63345165e-03  2.15800745e-01  5.77719463e-01 -1.83278567e-02
5.01647916e+01  8.80774672e+01  2.79145194e+02 -1.81621547e+02
2.14490664e-01 -8.83817758e-01 -1.43244912e+00 -5.96750910e-01
1.04627441e+00  4.37109225e-01  6.04818129e+00]
Final acc: 0.929825

The accuracy curve of the EASGD algorithm on the test set is shown below:
