I am running distributed training with TensorFlow and want to filter the gradients so as to reduce communication time. Here is my code:
import time
import tensorflow as tf
import numpy as np
from tensorflow.examples.tutorials.mnist import input_data  # data loading is not the focus here, so it is imported directly
FLAGS = tf.app.flags.FLAGS
tf.app.flags.DEFINE_integer('thread_steps', 0, 'Steps run before sync gradients.')
tf.app.flags.DEFINE_string('data_dir', '/tmp/mnist-data', 'Directory for storing mnist data')
tf.app.flags.DEFINE_string("job_name", "worker", "ps or worker")
tf.app.flags.DEFINE_integer("task_id", 0, "Task ID of the worker/ps running the train")
tf.app.flags.DEFINE_string("ps_hosts", "localhost:2222", "ps机")
tf.app.flags.DEFINE_string("worker_hosts", "localhost:2223,localhost:2224", "worker机,用逗号隔开")
MODEL_DIR = "./distribute_model_ckpt/"
BATCH_SIZE = 32
THREAD_STEPS = FLAGS.thread_steps
def main(self):
    # ========== STEP1: load the data ========== #
    mnist = input_data.read_data_sets(FLAGS.data_dir, one_hot=True)  # load the data
    # ========== STEP2: declare the cluster ========== #
    # Build the ClusterSpec and declare the server.
    ps_hosts = FLAGS.ps_hosts.split(",")
    worker_hosts = FLAGS.worker_hosts.split(",")
    cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})  # cluster membership
    server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_id)  # declare the server
    n_workers = len(worker_hosts)  # number of worker hosts
    # ========== STEP3: what the ps hosts do ========== #
    # A ps host does not run the training loop; it only manages the variables.
    # server.join() blocks on this statement forever.
    if FLAGS.job_name == "ps":
        with tf.device("/cpu:0"):
            server.join()
    # ========== STEP4: what the worker hosts do ========== #
    # Everything below defines the operations a worker host performs.
    is_chief = (FLAGS.task_id == 0)  # the worker with task_id=0 acts as the chief
    # replica_device_setter assigns a device to every op: it automatically places all
    # variables on the parameter servers and keeps the computation on the current worker.
    device_setter = tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_id,
        cluster=cluster)
    # The computation this worker has to perform.
    with tf.device(device_setter):
        # input data
        x = tf.placeholder(name="x-input", shape=[None, 28*28], dtype=tf.float32)  # each input sample has 28*28 pixels
        # x_shape = x.get_shape().as_list()
        # length = x_shape[1]
        # x_reshaped = tf.reshape(x, [-1, length])
        y_ = tf.placeholder(name="y-input", shape=[None, 10], dtype=tf.float32)  # MNIST has 10 classes
        # first layer (hidden layer)
        with tf.variable_scope("layer1"):
            weight1 = tf.get_variable(name="weight1", shape=[28*28, 10], initializer=tf.glorot_normal_initializer())
            biases1 = tf.get_variable(name="biases1", shape=[10], initializer=tf.glorot_normal_initializer())
            layer1 = tf.nn.relu(tf.matmul(x, weight1) + biases1, name="layer1")
        # second layer (output layer)
        with tf.variable_scope("layer2"):
            weight2 = tf.get_variable(name="weight2", shape=[10, 10], initializer=tf.glorot_normal_initializer())
            biases2 = tf.get_variable(name="biases2", shape=[10], initializer=tf.glorot_normal_initializer())
            y = tf.add(tf.matmul(layer1, weight2), biases2, name="y")
            pred = tf.argmax(y, axis=1, name="pred")
        global_step = tf.contrib.framework.get_or_create_global_step()  # global_step must be declared explicitly, otherwise an error is raised
        # loss and optimization
        cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y, labels=tf.argmax(y_, axis=1))
        loss = tf.reduce_mean(cross_entropy)
        with tf.name_scope('train'):
            optimizer = tf.train.GradientDescentOptimizer(0.01)
            with tf.name_scope('gradient'):
                gradient_all = optimizer.compute_gradients(loss, weight2)
                gradients_node = tf.gradients(loss, weight2)
                grads_holder = [(tf.placeholder(tf.float32, shape=g.get_shape()), v)
                                for (g, v) in gradient_all]
        # Synchronous updates are implemented with tf.train.SyncReplicasOptimizer
        opt = tf.train.SyncReplicasOptimizer(
            tf.train.GradientDescentOptimizer(0.01),
            replicas_to_aggregate=n_workers,
            total_num_replicas=n_workers
        )
        sync_replicas_hook = opt.make_session_run_hook(is_chief)
        train_op = opt.apply_gradients(grads_holder, global_step=global_step)
        if is_chief:
            train_op = tf.no_op()
    hooks = [sync_replicas_hook, tf.train.StopAtStepHook(last_step=10000)]  # include the synchronous-update hook
    config = tf.ConfigProto(
        allow_soft_placement=True,   # if True, ops are placed on an available GPU or CPU when the requested device is unavailable
        log_device_placement=False,  # if True, log which device each operation is placed on
    )
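    # For reference (a sketch, not part of my script): the usual SyncReplicasOptimizer wiring computes
    # and applies gradients through the same wrapped optimizer, e.g.
    #     grads_and_vars = opt.compute_gradients(loss)
    #     train_op = opt.apply_gradients(grads_and_vars, global_step=global_step)
    # whereas here I apply externally filtered gradients by feeding the grads_holder placeholders instead.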
    # ========== STEP5: open the session ========== #
    # For distributed training, do not open the session with tf.Session();
    # use tf.train.MonitoredTrainingSession() instead.
    # For details see: https://www.cnblogs.com/estragon/p/10034511.html
    with tf.train.MonitoredTrainingSession(
            master=server.target,
            is_chief=is_chief,
            # checkpoint_dir=MODEL_DIR,
            hooks=hooks,
            # save_checkpoint_secs=10,
            config=config) as sess:
        print("session started!")
        start_time = time.time()
        step = 0
        while not sess.should_stop():
            # for step in range(THREAD_STEPS):
            xs, ys = mnist.train.next_batch(BATCH_SIZE)  # batch_size=32
            # compute the raw gradients
            grads = sess.run(gradients_node, feed_dict={x: xs, y_: ys})
            grads = np.array(grads)
            grads = grads.reshape((10, 10))
            print(grads)
            grad_abs = np.abs(grads)
            variance = np.var(grad_abs, axis=1)
            print('variance:', variance)
            # keep the rows whose gradients have the largest variance
            topk_var = tf.constant(variance)
            k = 1
            output1 = tf.nn.top_k(topk_var, k)
            with tf.Session() as sess1:
                print(sess1.run(output1))
                a = output1.indices[-1]
                # print(sess1.run(a))  # a is the index of the top-k rows with the largest variance
                idx = a.eval()  # use a new name here instead of x, so the x-input placeholder is not overwritten
            a = int(idx)
            g_a = grads[a, :]
            # print('g_a=', g_a)
            # print('\n')
            # from the row with the largest variance, take the few largest gradient values to set the gradient threshold
            g_a_abs = np.abs(g_a)
            k = 3
            output2 = tf.nn.top_k(g_a_abs, k)
            with tf.Session() as sess2:
                print(sess2.run(output2))
                b = output2.indices[-1]
                # print(sess2.run(b))  # b is the index of the k-th largest gradient in that row
                idx = b.eval()
            b = int(idx)
            threshold = g_a_abs[b]
            grad_end = np.where(grad_abs < threshold, 0, grads)
            grad_end = [grad_end]
            grad_var = {}
            for i in range(len(grads_holder)):
                k = grads_holder[i][0]
                if k is not None:
                    # grad_var[k] = np.var([tf.reshape(g, [-1]) for g in grads])
                    grad_var[k] = [g[i][0] for g in grad_end]
            _, loss_value, global_step_value = sess.run([train_op, loss, global_step], feed_dict=grad_var)
            if step > 0 and step % 100 == 0:
                duration = time.time() - start_time
                sec_per_batch = duration / global_step_value
                print("After %d training steps (%d global steps), loss on training batch is %g (%.3f sec/batch)" % (step, global_step_value, loss_value, sec_per_batch))
                print('Training elapsed time: %f s' % duration)
            step += 1
if __name__ == "__main__":
    tf.app.run()
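For clarity, the per-step filtering I am trying to implement boils down to the NumPy-only sketch below (my own restatement; the helper name filter_gradients is just for illustration, and it assumes grads has shape (10, 10) as above). It selects the highest-variance row, takes that row's k-th largest absolute value as a threshold, and zeroes everything below it, without building new tf.nn.top_k ops or extra tf.Session objects inside the training loop:

import numpy as np

def filter_gradients(grads, k_rows=1, k_vals=3):
    # Absolute gradients and per-row variance, mirroring grad_abs / variance above.
    grad_abs = np.abs(grads)
    variance = np.var(grad_abs, axis=1)
    # Index of the row with the largest variance (k_rows=1 reduces to argmax).
    row = np.argsort(variance)[-k_rows:][-1]
    # The k_vals-th largest absolute value in that row becomes the threshold.
    threshold = np.sort(np.abs(grads[row, :]))[-k_vals]
    # Zero out every entry whose absolute value is below the threshold.
    return np.where(grad_abs < threshold, 0, grads)

Calling grad_end = [filter_gradients(grads)] should give the same array I currently build through output1/output2 (up to tie-breaking), which is the value I want to feed into the grads_holder placeholders.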
Here is my error message: