Tencent无量 多线程push&pull实现

实现的初衷

对于参数服务器来说,影响速度性能的主要有两方面的因素:

  1. 第一是worker的计算能力,所以可以利用Eigen库对worker上的计算过程进行优化、减少计算的时间;
  2. 其次就是网络通信,worker和server都需要对消息进行收发,并且经常要等到对方的消息才能进行后续操作。这方面我能想到的有两点可以优化的方向。

    2.1 第一就是尽量减少网络带宽,以梯度g的push和pull为例进行说明。首先,我们可以牺牲梯度的精度用来减少网络带宽,之前g的精度是64位的double,如果可以用32位的float进行代替,就显著减少了网络通信量,这只是很初级的优化,百度的PaddlePaddle有了进阶的优化–梯度量化。另外,也可以进行User-defined Filters——支持用户自定义过滤器来过滤掉那些比较小的被push的entry。需要说明的是,这只是我知道的两种方法,由于此篇的重点不在此,就不详细展开了。

    2.2 第二就是缩短对于通信消息的处理时间,对于无量系统来说,之前是单线程对消息进行处理,以server为例进行说明,PSServer拥有一个Customer对象负责消息的收发,通过std::bind对PSServer::process函数进行绑定,消息的处理流程都是在process里面进行的,对于Customer的实现机制我还不太了解,但是通过process的函数参数可以看到每次只能处理一条消息,具体流程是收到消息,对消息进行拆解,然后调用相关函数对消息的请求进行处理,最后发回反馈信息,对于server来说,每一时刻只能处理一条消息,这样即使此时还收到了其他消息,这些消息的处理也需要被阻塞,进而消息的发送者也需要等待,这就增加了网路通信的时间成本。

如何优化

通过2.2的瓶颈分析,我们发现server端“每次只能处理单条消息”大大增加了网络通信的延时,另外目前server主要的功能就是参数的存储,也就是说对Memory的利用比较充分,但是由于server没有什么复杂的计算,所以对CPU的利用率很低。自然,我们可以想到的是在server端通过多线程对消息进行处理,这样既可以减少网络的通信时间,也可以增加CPU的利用率,何乐而不为呢?

第一个想法:是对于push和pull各开一个线程池,分别有自己的消息队列。process收到消息后,先判断是push还是pull,meta.push\==1就塞进push的消息队列,meta.push\==0就塞进pull的消息队列(注意这里还需要仔细区分clock或者predict等操作,我还没仔细看,就不详细展开了,后续代码上也需要做进一步处理)。线程池中的线程直接从各自的消息队列中拿到消息,然后独自进行处理。这样,server端就可以实现同时对多个消息进行处理。进一步思考,我们发现push消息和push消息同时进行处理,pull消息和pull消息同时进行处理,push消息和pull消息同时进行处理。这三种并行机制有没有什么问题呢,对于pull来说,是从server读消息,所以同时pull没有问题;但是push是写参数,如果同时写到同一个参数,或者同时读写一个参数,由于对server_table没有加锁,就会出现逻辑问题(什么问题??)(也可能core掉)。另外,由于push和pull是独立进行处理,所以在pull一批参数的时候,worker可能获得不同迭代次数的参数,记得这样会导致收敛变慢或者不收敛的问题(这个具体记不清楚了,需要后面查一下资料),但好像单线程处理也会出现这样的问题啊?或者说这个问题是问题吗?

第二个想法:第一个想法的数据一致性问题比较严重,因此做一下改进。在多个server存在的情况下,我们是将一个大的模型表分布式存储在各个server中,每个server独自负责表的一部分——也就说按照server为单位对模型表进行划分,依此类推,我们更加细化——按照线程为单位对模型表进行划分。在“server独自负责表的一部分”基础上,一个server中的每个线程独自负责“一部分”的一部分。定性分析一下如此实现和之前的不同:

  • 对于worker端来说,之前每次发送请求的时候是需要计算每个数据应发送到哪个server就行,即按照server为单位对数据进行拆分(partition)和打包发送(send),现在在WorkerImp中请求的数据key是按照线程为单位进行划分,即首先按照server为单位进行划分,然后再按照线程为单位进行划分,因此需要对partition函数进行重写,即增加了计算量和处理流程。
  • WorkerImp原来针对每个server发送一次请求——数据包比较大,现在WorkerImp针对每个线程发送一次请求,即把原来的大数据包拆分成了小数据包,这样每次的数据包比较小,但是发送消息的频率显著增加了,倍数略等于server端开的线程数目。把大包拆解成小包,是能加快通信还是延缓通信,这个应该到生产环境中进行评测的。
  • 考虑极限情况,频繁进行push和pull操作的话,消息队列的长度将会非常大。

实现细节

worker端:

  • WorkerImp增加两个函数multi_pull和multi_push,和之前pull、push的不同在于分线程对key进行处理,分别发送消息。这里有一个小坑:在worker端msg.meta.threadindex要被赋值,这样server端才能根据threadindex将不同的消息分发给不同的线程,但是仅仅在这里设置是无效的,在van.cc里面,函数Van::PackMeta还需要进行pb.set_threadid(meta.threadindex)设置,对应Van::UnpackMeta还需要进行meta->threadindex = pb.threadid()设置,因此需要对meta.proto也增加threadindex字段。
  • 目前只是在multi_pull和multi_push里面对msg.meta.threadindex进行了赋值,对于clock操作没有实现multi_clock功能,因此在server端都是threadindex=0的线程执行clock操作,之所以没有实现主要是考虑到clock操作本身就很复杂,并且调用的频率应该不是很频繁,因此直观上感觉指定threadindex=0的线程进行处理也没有什么问题。
  • 新增文件kv_worker_table_multi_thread.h,和kv_worker_table.h对比,其实就是重新实现了partition函数,其中函数参数out的类型发生了改变,比原来的out内部多了一层vector,用来存储不同的线程应该接收到的消息。目前只实现了hash_partition,没有实现range_partition。
  • 在lr_minibatch.cc里面需要在void StartServer()和void RunWorker()分别设置
   '''GlobalContext::Get()->set_server_thread_num(server_thread_num);'''

因为是两个进程嘛,在全局参数的设置和读取其实都是独立的,需要各自去读配置文件。

ps端:

  • 增加了solve_thread.h,它内部维护一个消息队列,可以进行push、pull、clock、predict操作。
  • ps_server.cc改动比较大,首先构造函数里初始化线程池,注册receive函数,receive函数可以针对不同的请求调用相关的函数。process函数和之前的功能不同了,现在只是将消息insert到不同的线程的消息队列中。
  • (之前有一个问题没有发现,就是clock比较特殊。在server端处理worker端的clock请求的时候,所有的其他线程都不可以再进行其他原子请求的处理了(push、pull),因为)——这个仔细想了一下,应该是不用加锁,后续再想想实验一下。

其他说明:

  • 注意和实现之前的优化算法不同,对于lr_minibatch.cc等这些有main函数的文件来说,我们只需要更改worker注册的模型表(因为需要更改消息分发的机制),比如之前用的是KVWorkerTable,现在用的是KVWorkerTableMultiThread,对于server端来说没有任何变化(优化算法仅需要更改server端的注册模型表)。

性能评测

评测主要在传统sgd和adam上进行,这两个算法在server端的逻辑有不同。

先看server端单线程的情况:对于sgd来说,单纯看pull和push的耗时,是相同的,都是0.001秒;而对于Adam来说,push操作具有比较繁琐的计算过程,因此push的操作是0.01秒,pull的耗时不变——还是0.001秒。我打印了server端的消息队列长度,对于sgd来说,队列的长度一般是0、1、2,即基本没有阻塞,server端的push和pull操作的for循环很快,不是server和worker交互时的瓶颈;但对于adam来说,push操作很耗时,因此server端复杂的计算过程成为了交互时的瓶颈,消息队列的长度是180、190这么长。因此,adam一个minibatch时间都很长(3.5s左右),一个epoch耗时208min。sgd一个minibatch时间约为0.5s(0.3s——0.6s之间),一个epoch耗时34min。

现在在server端多线程push&&pull(线程池大小为40,因为server申请了40个CPU),对于adam来说,每个线程的消息队列长度基本就是0了,峰值的时候会到达18、19(极少情况会出现,但是没有想明白为什么会出现峰值情况)。每个线程的消息队列长度基本就是0,是因为每个push和pull操作相对于单线程时需要处理的量减少了很多(40倍),因此for循环的时间也急剧减少(pull的耗时为0,push的耗时一般也为0,有时会为0.001),这样就会和单线程sgd的情况一致了,server端的计算过程不会是两者交互的瓶颈,因此一个minibatch的时间基本和单线程sgd的时间差不多——0.6左右(0.6-0.8区间,除掉WorkerImp::process一些无用日志后,基本也就是0.5了),一个epoch耗时41min(除掉WorkerImp::process一些无用日志后,耗时缩减为33min),因此提升了6倍的速度。对于sgd来说,一个epoch耗时为32min,性能提升不多的主要原因是因为server端的for循环基本不是两者交互的瓶颈(从消息队列长度一般是0、1、2就可以看出来),有一点点提升是因为单线程有时消息队列还是被阻塞的(长度为1、2的时候)。

总结

之前其实比较纠结,相对于单线程来说,多线程处理到底好在哪里。现在知道了,通过多线程其实减少了server端for循环的耗时,只要for循环是server-worker交互的瓶颈,那么通过多线程可以将for循环的长度减少了,极大消除了瓶颈问题。