来也科技2015年成立以来,深耕于AI行业,先后推出智能助理“助理来也”、对话机器人平台、智能文档理解平台等人工智能类产品,服务超过两百家海内外五百强客户,在OCR、NLP领域积累了大量的数据、代码和模型。
为了满足各行各业的需求,我们通常需要在新的数据集上优化模型效果。这个工作是重复性较强的,都是基于已有的模型,去执行数据获取、标注、处理、finetune、评测效果、发布模型的流程。我们希望通过机器学习平台将上述工作尽可能自动化,更敏捷的交付项目;进一步释放算法工程师的人力,投入到新的创新性工作中。在当前大数据、大模型的背景下,深度学习基础设施平台的架构需要能够满足大规模训练所需的算力、存储和网络,具备强可扩展性的云原生架构是不二之选。本系列文章我们重点关注机器学习平台的技术难点及其在Kubernetes云原生底座之上的解决方案。覆盖编排、调度、存储、通信、推理等方方面面。本文是系列的第一篇:编排调度篇。
01分布式训练编排调度痛点在分布式训练编排和调度方面,存在以下技术问题需要解决:
02层次结构基于Kubernetes体系结构,我们可以通过不同层级的组件,分别解决上述问题。
我们可以看到上图的组件数量比较多,使用分层的体系架构的优点在于,基于Kubernetes框架抽象出的每个组件各司其职,缺点是学习曲线比较陡峭。为了让读者对各个组件的协作关系先有一个大致的了解,本节会做一个简单介绍,在后续章节里还会再进一步介绍每个组件的作用和原理。
在图中这些组件由上至下,依次是:
03Kubernetes基本原理
Kubernetes是云计算时代的操作系统,也是新一代基础设施的接入层,它提供了极强的可扩展性,鼓励用户以插件的形式介入服务生命周期的每一个阶段。我们的机器学习平台将运行在这样的基础设施平台上。因此,我们必须先对Kubernetes架构,编排、调度的原理和设计模式有一定了解。对Kubernetes比较熟悉的读者可以跳过这一章节。
3.1 架构
master上部署了控制平面的组件:
以下组件会在每个node节点上运行:
3.2 编排原理
上文说道controller manager主要负责编排工作。编排是k8s的当家本领,也集合了k8s最精粹的设计理念,理解controller的工作原理对理解整个系统非常重要。对于编排实现原理,两个设计模式不得不提:3.2.1 声明式API
在向k8s部署服务时,我们一般会定义一个yaml文件,描述我们所期望的服务的最终状态,k8s接收到请求后自动执行一些操作,让系统达到期望状态。与声明式API相对的是命令式API,比如通过CURD直接操作集群内资源。声明式API屏蔽了服务部署的细节,把复杂留给自己,将易用性留给使用者。以下文为例,定义了一个3个副本的nginx服务,部署服务时,只需要将这个yaml应用到k8s。
3.2.2 控制器模式
在机器人设计和自动化领域,控制器模式是非常常见的设计模式。控制器会不断检测期望状态(spec)和实际状态(status)的一致性,并控制系统达到期望状态。下图展示了k8s中的具体实现,以上文的nginx-deployment为例,informer组件会通过List&Watch监听到集群内出现了一个新的deployment,然后通过队列通知控制循环(也叫control loop、reconcile loop)处理这个请求。control loop接受到请求后执行具体的diff操作,发现集群内现在pod数为0,而期望启动3个。最后根据deployment的定义,向集群内创建3个1.14.2版本的nginx pod。而这些pod也是以yaml的方式创建的。
k8s原生的controller manager只能控制原生对象,比如deployment、statefulset、service、configmap等。但k8s也提供了很好的可扩展性,让用户也可以自己定义资源类型和控制器。现在大家一般会用CoreOS开源的operator框架进行自定义。operator框架主要做两件事:
3.3 调度原理其实,不只是controller运用了控制器模式,scheduler和kubelet也是以控制器模式工作的。上节说到,当资源被controller处理后会被拆分为pod。这些新的pod被应用到k8s集群时,实际只是向etcd里保存了一个key value对。真正的调度工作是scheduler和kubelet做的。3.3.1 schedulerscheduler的工作职责是为pod找到最适合的node,然后将nodeName更新到pod的yaml上。在scheduler的control loop中,scheduler会监听etcd的变更,拿到未被调度的pod,为pod选择node。选择node时会执行预选和优选两个策略:
k8s脱胎于google的Brog,一开始主要管理在线服务,原生的scheduler无法满足批处理任务的调度。深度学习训练场景的调度器应该支持批量调度、公平调度等高级调度策略。这要求我们自定义调度器。为了满足自定义scheduler的需求,k8s先后提出了scheduler extender、multiple scheduler、scheduling framework 3种方案,最F新的scheduling framework已经在v1.19达到stable状态。读者如果对3种方案的优缺点和演进历程感兴趣,可以移步阿里云ACK团队发表的文章:进击的 Kubernetes 调度系统(一):Kubernetes scheduling framework3.3.2 kubelet3.1我们提到每个node都会有一个kubelet。这个kubelet的control loop会监听包含当前节点nodeName字段,但还未被调度的pod。kubelet会再次判断当前节点的资源是否满足调度需求,如果满足,则调用容器运行时的接口创建容器,并指定资源申请相关的参数。原生的k8s只提供CPU和内存资源,对于GPU、SR-IOV这类extended resource,设备厂商一般会提供device plugin,以deamonset的形式启动在每个node上,向kubelet上报设备的类型、数量甚至拓扑结构,在容器启动阶段负责设备的虚拟化和绑定。
至此,我们花了比较大的篇幅介绍kubernetes的架构和编排调度原理,特别指出了声明式API和控制器模式的重要性。理解这些内容对理解云原生机器学习平台技术非常重要,想深入学习Kubernetes的读者,可以购买张磊老师的《深入剖析Kubernetes》一书,或学习极客时间的同名课程,相信一定会有所收获。
04流程编排机器学习训练通常是一个任务流,其中每个组件(component)分别承担不同职责。以来也通用文字识别模型为例,下图展示了数据下载、数据处理、训练、测试、寻找最优模型、上传模型一系列组件,这些组件之间的关系可以被表示为一个DAG(有向无环图)。
流程引擎核心要解决的问题是控制流程中每个组件的执行顺序,负责组件之间的数据传递。流程引擎作为发起模型训练的入口,选型是十分重要的。Kubeflow、Airflow、Mlflow都提供了流程编排的能力。Kubeflow作为k8s云原生的开源项目,拥有更开放的生态,提供了更多定制化的可能性。因此,本节会主要关注Kubeflow。Kubeflow Pipeline底层其实是基于流程引擎Argo做了一层封装,所以我们会先介绍一下Argo的工作原理,再介绍Kubeflow Pipeline。
4.1 ArgoArgo本身是用于CI/CD的云原生流程引擎,是CNCF的孵化项目,后来被Kubeflow引入模型训练和数据处理场景。如3.2.2中所描述的operator的工作原理,Argo其实就是一个用来做资源编排的operator,发起一个Argo任务也是向集群里应用一个yaml。下面定义了一个工作流,可以看到yaml里定义了2个component,一个用来生成字符串,一个接受字符串打印到控制台。通过这个简单的yaml,我们可以大致理解流程里每个组件是怎么定义的,他们之间是怎么通信的。
apiVersion: argoproj.io/v1alpha1kind: Workflowmetadata: generateName: output-parameter-spec: entrypoint: output-parameter templates: # 定义2个component的关系 - name: output-parameter steps: - - name: generate-parameter template: whalesay - - name: consume-parameter template: print-message arguments: parameters: - name: message # 使用第一个step的输出作为当前step的输入 value: "{{steps.generate-parameter.outputs.parameters.hello-param}}" # 定义第一个component,用于生成字符串 - name: whalesay container: image: docker/whalesay:latest command: [sh, -c] # 打印字符串并写入文件 args: ["sleep 1; echo -n hello world > /tmp/hello_world.txt"] outputs: parameters: # 将文件内作为当前步骤的参数输出 - name: hello-param valueFrom: default: "Foobar" # Default value to use if retrieving valueFrom fails. If not provided workflow will fail instead path: /tmp/hello_world.txt # 定义第二个component,用于打印字符串 - name: print-message inputs: parameters: - name: message container: image: docker/whalesay:latest # 接受一个参数并在控制台打印参数 command: [cowsay] args: ["{{inputs.parameters.message}}"]
在实际运行的时候,一个component就是一个pod。我们知道在kubernetes分布式环境下,不同的pod会被调度到不同的node上。在模型训练这个大数据量的场景里,跨服务器的数据通信的效率十分重要。因此我们有必要了解一点Argo数据传递的原理。这里指的数据传递主要有两种情况:传递参数和传递文件。4.1.1 传递参数当pod之间需要传递参数时,operator会从前一个pod中查询到信息,在第二个pod启动时将参数注入到yaml的annotation字段里。由于yaml是用k8s etcd作为存储,因此这种传输方式对数据大小有限制,每个参数要满足etcd max-request-bytes的限制,默认是1.5M,而etcd最大支持存储8G的数据。这就意味着我们不可能通过这种方式传太大的数据,只能用来传输一些超参数。
4.1.2 传递文件当pod之间需要传输文件时,operator在启动component时不仅会启动用户容器,还会启动一个用来复制数据的sidecar容器。这个sidecar会在用户容器执行结束时从其中拷贝结果文件,通过s3协议上传到跟argo一起部署的一个minio里。在下一个component启动时,sidecar再从minio把文件下载下来。这个方案的问题在于,在模型训练场景里,每个component的output非常大,以OCR训练来说,一个数据集有超kw数据,如果在每个component里都要上传下载,是很浪费时间的做法。
由于argo原生的两种数据传递方式对模型训练场景并不是很友好,目前也没有太好的解决方案,我们现在的做法是通过挂载一个ReadWriteMany的分布式文件存储系统来实现的。文件存储是机器学习平台另一个重要话题,机器学习workload要求文件存储支持较大的存储量、优秀的性能和小文件随机读写能力,近些年云原生社区也涌现了一些数据编排方案,我们将在未来再发一篇博客详细讨论。感兴趣的读者可以关注《来也技术团队》公众号。对于Argo数据传输相关更多实现原理,可以移步华为云的技术博客:KubeFlow-Pipeline及Argo实现原理速析(KubeFlow-Pipeline及Argo实现原理速析-云社区-华为云)。
4.2 Kubeflow Pipeline(kfp)既然Argo提供了流程编排能力,kfp又做了什么呢?我认为kfp主要在3个方面提供了重要价值。
那么现在还有一个问题没有讨论清楚,基于kfp,怎么能够用到分布式训练的controller和scheduler。实际上,kfp提供了3种定义component的方式:
使用第三种方式,我们可以直接把自定义的CRD作为pipeline的一个component,自定义的controller和scheduler只需要监听特定的资源执行control loop即可。
05分布式机器学习理论
在了解如何启动分布式训练任务之前,我们需要对分布式机器学习理论有基本了解。本章会分3个方面简单介绍分布式机器学习理论:
5.1 数据与模型划分模块
5.1.1 数据并行(Data Parallelism)将大规模数据集划分到不同节点并行训练的方法,称之为“数据并行”。当模型参数单机能放得下时,通常采用数据并行,OCR、NLP模型训练一般用数据并行的方案。数据划分主要有2种操作的角度:一是对训练样本进行划分,二是对特征维度进行划分。我们主要讨论前者。对训练样本划分有2种常见做法:
目前数据并行技术已经达到可用阶段,Tensorflow、PyTorch、Horovod都提供了数据并行API。5.1.2 模型并行(Model Parallelism)将大规模模型切分成若干子模型(包含一部分模型参数),将每个子模型放在一个工作节点上的方法,称之为“模型并行”。搜推⼴、大规模预训练模型训练会用到模型并行方案。划分模型主要有横向划分和纵向划分两种方法:
一般来说,每层神经元数目很多但层数很少,可以考虑横向划分;反之如果每层神经元数目很多而层数较少,可以考虑纵向划分。
5.2 通信模块通信分为两种方式,一是共享内存(share memory)、二是消息传递(message passing)。共享内存主要用在单机多卡场景,单机多卡不可避免会遇到内存墙的问题。多机多卡通信需要用message passing。所以这节我们主要讨论后者。通信对于分布式训练至关重要,主要关注3个挑战:“怎样同步”、“何时同步”、“如何最小化通信的代价”,这3个挑战分别对应3个小节:“拓扑结构”、“步调”和“频率”。5.2.1 拓扑结构5.2.1.1 中心化中心化的拓扑结构是指所有节点分为server和worker两种角色。worker用来做计算,server负责协调各个worker。
中心化最经典的编程范式是google的MapReduce。MapReduce将程序抽象成Map和Reduce两个操作,Map负责数据分发和并行处理,Reduce负责数据同步和规约。
MapReduce很适合大数据处理,但用来做并行计算并不高效。这是因为Reduce流程是同步的,训练速度会受系统中最慢节点的影响,5.2.2小节会介绍同步和异步两种通信步调。Parameter Server(PS)是另一种常见的中心化异步编程模型。参数服务器中存储了模型的所有参数,worker可以去参数服务器中获取参数、更新参数。
Tensorflow最早发布分布式训练功能时就使用了PS架构。PS架构的主要问题在于多个worker共享同一套server,集群带宽不足时,server会产生网络拥塞,导致worker一直在数据同步环节阻塞,而GPU处于空转状态。5.2.1.2 去中心化跟中心化相对的是p2p的无共享拓扑结构,没有中心节点协调各个worker之间的通信,worker之间自行建立链接同步参数。拓扑结构的实现方式多种多样,星形、树形、蝶形、all-to-all理论上都可以,而拓扑结构影响了数据传输的数量和次数。每个节点之间一般是使用MPI的AllReduce接口实现通信,在gpu训练场景,一般使用Nvidia的NCCL通信库,它实现了部分AllReduce接口。现在常用的拓扑结构是环形结构(Ring All-Reduce)。2017年百度首先实现Ring All-Reduce算法,如今,Ring All-Reduce已经成为主流的单机多卡和多机多卡训练方案。
与PS架构的server负责所有通信工作不同,Ring All-Reduce将每个节点的参数切分成不同分片,在参数同步时,每轮将一个分片同步到下一个节点。这种做法能够充分利用到集群内的带宽资源,避免PS架构的网络阻塞问题。理论上Ring All-Reduce的传输耗时只受模型规模和带宽影响,不受节点数量影响,因此可以达到线性加速比。
5.2.2 步调通信还要考虑步调。同步通信是指一个节点完成本轮迭代后,等待其他节点都完成再统一更新参数。异步通信是指完成本轮迭代后,不等其他节点,直接更新自己的参数。同步通信的问题在于整体速度受掉队者(straggler)的影响,导致计算资源浪费严重。这个问题一般叫做straggler problem。因此,同步通信也不适合用在异构计算场景。
异步通信虽然能够提升效率,但速度较慢的节点依然会影响整体效率,当速度较慢的节点将一个陈旧的局部模型同步到其他节点时,会导致收敛性变差。这要求模型训练采用更小的学习率,进而影响收敛速度。为了解决这一问题,学界在结合同步、异步通信方法上有进一步研究成果,本文不再详述。
5.2.3 频率直观上看,通信频率越高,工作节点之间协调的越好,模型训练的效果应该越有保障。但通信频率越高,随之而来的通信代价就越高,在这一方向有人进行了一些研究,寻找计算代价和通信代价的最优匹配,从而实现最高的学习效率。除了降低通信频率外,还可以使用过滤、量化、压缩等方式提高通信效率,本文不再详述。
5.3 数据与模型聚合模块不同节点产生各自的模型或梯度之后,聚合模块负责将多个节点的参数聚合,更新模型。
本章主要提供了一个理论视角,用来看待分布式训练算法,实际上不同的分布式训练算法分别采用了不同的划分、聚合算法,选择了不同的通信拓扑结构、步调和频率。在云原生背景下,从工程角度来看,了解分布式算法的重点在于理解通信的拓扑结构,因为通信的拓扑结构决定了部署服务的拓扑结构。如果读者希望分布式机器学习有更直观的了解,推荐学习王树森博士的DeepLearning课程(https://github.com/wangshusen/DeepLearning)Parallel Computing一节。如果希望深入了解理论,推荐阅读微软亚洲研究院《分布式机器学习:算法、理论与实践》一书。
06分布式训练框架及其operator出于篇幅考虑,这节不会介绍分布式训练框架具体实现,而是会先从使用者的角度切入,了解当前常用的三种分布式训练框架Tensorflow、Pytorch、Horovod如何启动分布式训练任务,再介绍一下在云原生社区提供的分布式训练operator。
6.1 分布式训练任务Setup6.1.1 TensorflowTensorflow1.0使用parameter server进行分布式训练,下面这段代码可以直观了解到如何启动一个分布式训练任务。这段代码接受命令行传入的参数,参数列表指定了当前运行服务是parameter server还是worker,根据不同角色执行不同的初始化逻辑。
def main(_): ps_hosts = FLAGS.ps_hosts.split(",") worker_hosts = FLAGS.worker_hosts.split(",") # Create a cluster from the parameter server and worker hosts. cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts}) # Create and start a server for the local task. server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index) if FLAGS.job_name == "ps": server.join() elif FLAGS.job_name == "worker": is_chief = (FLAGS.task_index == 0) # Assigns ops to the local worker by default. with tf.device(tf.train.replica_device_setter( worker_device="/job:worker/task:%d" % FLAGS.task_index, cluster=cluster)): # Compute
下面这段脚本展示了如何在4台节点上启动2个ps节点和2个worker节点。
# On ps0.example.com:$ python trainer.py \ --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \ --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \ --job_name=ps --task_index=0# On ps1.example.com:$ python trainer.py \ --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \ --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \ --job_name=ps --task_index=1# On worker0.example.com:$ python trainer.py \ --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \ --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \ --job_name=worker --task_index=0# On worker1.example.com:$ python trainer.py \ --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \ --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \ --job_name=worker --task_index=1
import osimport sysimport tempfileimport torchimport torch.distributed as distimport torch.nn as nnimport torch.optim as optimimport torch.multiprocessing as mpfrom torch.nn.parallel import DistributedDataParallel as DDPdef setup(rank, world_size): os.environ['MASTER_ADDR'] = 'localhost' os.environ['MASTER_PORT'] = '12355' # initialize the process group dist.init_process_group("gloo", rank=rank, world_size=world_size)def cleanup(): dist.destroy_process_group()
6.1.3 HorovodHorovod是uber开源的、兼容主流模型训练框架的分布式训练框架,也是目前最流行的第三方分布式训练框架。使用Horovod对代码有一定侵入性,下方的示例展示了Horovod在Tensorflow上的一个example。通过horovodrun命令启动训练任务时,需要通过命令行参数传入各节点的地址。
import tensorflow as tfimport horovod.tensorflow.keras as hvd# Horovod: initialize Horovod.# 初始化 Horovod,启动相关线程和MPI线程hvd.init() # Horovod: pin GPU to be used to process local rank (one GPU per process)# 依据 local rank 为不同的进程分配不同的GPUgpus = tf.config.experimental.list_physical_devices('GPU')for gpu in gpus: tf.config.experimental.set_memory_growth(gpu, True)if gpus: tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')(mnist_images, mnist_labels), _ = \ tf.keras.datasets.mnist.load_data(path='mnist-%d.npz' % hvd.rank())# 切分数据 dataset = tf.data.Dataset.from_tensor_slices( (tf.cast(mnist_images[..., tf.newaxis] / 255.0, tf.float32), tf.cast(mnist_labels, tf.int64)))dataset = dataset.repeat().shuffle(10000).batch(128)mnist_model = tf.keras.Sequential([ tf.keras.layers.Conv2D(32, [3, 3], activation='relu'), ...... tf.keras.layers.Dense(10, activation='softmax')])# Horovod: adjust learning rate based on number of GPUs.# 根据Worker的数量增加学习率的大小scaled_lr = 0.001 * hvd.size()opt = tf.optimizers.Adam(scaled_lr)# Horovod: add Horovod DistributedOptimizer.# 把常规TensorFlow Optimizer通过Horovod包装起来,进而使用 ring-allreduce 来得到平均梯度opt = hvd.DistributedOptimizer( opt, backward_passes_per_step=1, average_aggregated_gradients=True)# Horovod: Specify `experimental_run_tf_function=False` to ensure TensorFlow# uses hvd.DistributedOptimizer() to compute gradients.mnist_model.compile(loss=tf.losses.SparseCategoricalCrossentropy(), optimizer=opt, metrics=['accuracy'], experimental_run_tf_function=False)callbacks = [ # 广播初始化,将模型的参数从第一个设备传向其他设备,以保证初始化模型参数的一致性 hvd.callbacks.BroadcastGlobalVariablesCallback(0), hvd.callbacks.MetricAverageCallback(), hvd.callbacks.LearningRateWarmupCallback(initial_lr=scaled_lr, warmup_epochs=3, verbose=1),]# Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them.# 只有设备0需要保存模型参数作为checkpointif hvd.rank() == 0: callbacks.append(tf.keras.callbacks.ModelCheckpoint('./checkpoint-{epoch}.h5'))# Horovod: write logs on worker 0.verbose = 1 if hvd.rank() == 0 else 0# Train the model.# Horovod: adjust number of steps based on number of GPUs.mnist_model.fit(dataset, steps_per_epoch=500 // hvd.size(), callbacks=callbacks, epochs=24, verbose=verbose)
# To run on 4 machines with 4 GPUs each:$ horovodrun -np 16 -H server1:4,server2:4,server3:4,server4:4 python train.py
6.2 Operators从6.1示例中可以看到,setup一个分布式训练任务时,所有训练框架都要求使用者通过环境变量或命令行参数传入节点地址和标识符。这些信息会用来互相发现,建立拓扑关系,从而在各个节点内实现参数的传递和同步。但是这种操作带来诸多不便,每次启动都要传很多参数,不同节点标识符也不一样,使用MPI通信的训练框架还需要配置节点间访问秘钥,如果同时用很多机器训练模型,启动过程会非常繁琐,而这个启动流程完全是可以被自动化的。在3.2章节我们介绍了k8s的编排原理,重点说明如何在k8s自定义一个CRD,自己开发operator控制CRD的编排。kubeflow正是利用了k8s这一可扩展的特性,开发了一系列training operator,支持将不同框架实现的分布式训练任务的编排自动化。这节我们以基于tensorflow的tfoperator为例,介绍training operator的实现原理。tfoperator定义了一个名叫TFJob的CRD,先看一个使用TFJob启动的训练任务的示例。下面的yaml定义了2个ps节点、2个worker节点,将yaml apply到k8s里,operator会自动启动训练任务。
apiVersion: kubeflow.org/v1kind: TFJobmetadata: generateName: tfjob namespace: your-user-namespacespec: tfReplicaSpecs: PS: replicas: 2 restartPolicy: OnFailure template: metadata: annotations: sidecar.istio.io/inject: "false" spec: containers: - name: tensorflow image: gcr.io/your-project/your-image command: - python - -m - trainer.task - --batch_size=32 - --training_steps=1000 Worker: replicas: 2 restartPolicy: OnFailure template: metadata: annotations: sidecar.istio.io/inject: "false" spec: containers: - name: tensorflow image: gcr.io/your-project/your-image command: - python - -m - trainer.task - --batch_size=32 - --training_steps=1000
tfoperator实现的具体原理是:当监听到集群中出现了TFJob类型的资源时,operator会解析tfReplicaSpecs并创建ps, worker, chief类型的pod。由于tensorflow启动分布式任务时需要设置TF_CONFIG的环境变量,因此在启动pod之前,operator也会生成TF_CONFIG并注入到环境变量中。
// TF_CONFIG{ "cluster":{ "ps": [ "tfjob-ps-0.your-user-namespace.svc:2222", "tfjob-ps-1.your-user-namespace.svc:2222" ], "worker":[ "tfjob-worker-0.your-user-namespace.svc:2222", "tfjob-worker-1.your-user-namespace.svc:2222" ] }, "task":{ "type":"ps", "index":0 }}
在上文的TF_CONFIG示例中,可以看到每个ps和worker的pod都可以由一串dns地址表示,各个pod之间可以通过dns地址互相访问。使用dns地址而不是pod ip的好处是:如果运行过程中pod发生重启,dns地址不会变,其他pod也就不用感知到这次重启,更新本地的TF_CONFIG了。在k8s中,这种为每个pod绑定不变dns地址的需求可以通过创建headless service实现。总而言之,tfoperator利用了控制器模式,监听TFJob对象当前状态和期望状态的差异,保证集群内启动了足够数量的ps、worker pod,并为每个pod创建一个headless service用于互相访问。
07批处理任务调度7.1 模型训练场景下的调度问题Training operators解决了训练任务编排的问题,但也带来新的问题。一个训练任务(job)包含了多个pod,但k8s的默认scheduler是以pod为粒度调度的,不是以job维度调度。这种简单的调度策略在深度学习模型训练场景存在一些缺陷:
7.2 VolcanoVolcano(以前叫kube-batch)是华为开源的、k8s生态中高性能计算场景最常用的调度器,能够解决上述调度问题。7.2.1 在training operator中使用Volcanokubeflow training operator支持将workload指定给volcano调度器调度。如官网(https://www.kubeflow.org/docs/components/training/job-scheduling/)所描述:在k8s中安装volcano调度器,并为training operator设置一个命令行参数即可。在集群中发起新的训练任务时会自动被volcano调度。我们从实现的视角理解一下volcano调度器是如何被指定的。首先,就像我们在6.2所描述的那样,kubeflow training operator会监听TFJob类型的CRD并创建pod。当设置了volcano调度器时,training operator还做了其他事情:
而volcano则负责给pod找到一个最合适的节点,修改pod的nodeName字段。
7.2.2 Volcano实现原理Volcano scheduler的工作流程如下图所示:
回到7.1提出的问题,volcano通过提供各种plugin能够解决这些困境:
volcano的强可扩展性也提供给我们很大的自由度,去定制符合业务要求的调度策略。
08总结
本文从分布式训练痛点出发,提出了云原生场景解决这些问题的层次架构,针对流程编排、operator和scheduler展开具体描述,介绍了常见的开源方案和实现原理。理解kubernetes原理和分布式机器学习理论对理解mlops系统至关重要,因此也在文中穿插对两者的介绍。希望能够对读者有所帮助。由于篇幅原因,本文没有涉及弹性计算和拓扑感知相关技术,我们会在公众号上继续发布云原生机器学习平台系列文章,也欢迎大家关注《来也技术团队》公众号。
关于来也科技
来也科技RPA+AI现已开放免费使用社区版,要想体验智能自动化平台产品,您可点击链接试用!
更多关于LAIYE 智能自动化等问题,欢迎登陆
了解!更有免费专属一对一教练为您提供解答疑惑!也可私信来也科技咨询了解!
来也科技作为中国乃至全球的智能自动化领军品牌,为客户提供变革性的智能自动化解决方案,提升组织生产力和办公效率,释放员工潜力,助力政企实现智能时代的人机协同。
来也科技的产品是一套智能自动化平台,包含机器人流程自动化(RPA)、智能文档处理(IDP)、对话式AI(Conversational AI)等。基于这一平台,能够根据客户需要,构造各种不同类型的软件机器人,实现业务流程的自动化,全面提升业务效率。
目前,来也科技帮助电力,银行,保险、通信、零售等多行业的企业客户,以及数字政府、公共医疗、高校职教在内的公共事业领域,实现了各种业务场景的深度突破与打通,构建起了端到端的自动化解决方案,已服务超过 200 家 500 强企业,200 余个省市政府及上千家中小企业,2021年《财富》世界500强榜单前十名企业中,7 家在使用来也科技的智能自动化产品。
留言与评论(共有 0 条评论) “” |