Press "Enter" to skip to content

Flink ML API,为实时机器学习设计的算法接口与迭代引擎

本站内容均来自兴趣收集,如不慎侵害的您的相关权益,请留言告知,我们将尽快删除.谢谢.

摘 要 : 本文整理自阿里巴巴高级技术专家林东、阿里巴巴技术专家高赟(云骞)在 Flink Forward Asia 2021 核心技术专场的演讲。主要内容包括:

 

 

面向实时机器学习的 API

 

流批一体的迭代引擎

 

Flink ML 生态建设

 

 

Tips: 点击 「阅读原 文」 查看原文视频 & 演讲PDF~

 

一、面向实时机器学习的 API

 

 

Flink ML API 指的是提供给用户使用算法的接口。通过把所有算法打包为统一的 API 提供给用户,让所有使用者的体验保持一致,也能降低学习和理解算法的成本,此外算法之间也可以更好地交互和兼容。

 

举个例子,在 Flink ML API 中提供一些基础的功能类,通过使用这些功能类可以把不同算子连接组合成一个高级的算子,可以大大提高了算法的开发效率。同时,通过使用统一的 Table API,让所有的数据都以 Table 格式进行传输,可以使得不同公司开发的算法能够互相兼容,降低不同公司重复开发的算子的成本,提升算法合作的效率。

 

 

之前版本的 Flink ML API 还是存在不少痛点。

 

首先是表达能力方面。之前的 API 的输入只支持单个 Table 的形式,无法表达一些常见的算法逻辑。比如有些训练算法的输入表达是一张图,把数据通过不同的 Table 传进来,这种情况下单个 Table 输入的接口就不适用了。再比如有些数据预处理的逻辑需要将多个输入得到的数据进行融合,用单个 Table 输入的 API 也不适合。因此我们计划把算法接口扩展为支持多输入多输出。

 

其次是实时训练方面。之前的 API 无法原生支持实时机器学习场景。在实时机器学习中,我们希望训练算法可以实时产生模型数据,并将模型数据以流的方式实时传输到多个前端服务器中。但是现有的接口只有一次性的训练和一次性的推理 API,无法表达这种逻辑。

 

最后是易用性方面。之前采用 toJson() 和 fromJson() 来导出和加载模型数据,并且允许用户储存这些数据。但是有些模型的数据量高达几个 G,在这种情况下将模型数据以 string 的方式进行储存,效率会非常低,甚至可能无法实现。当然,存在一些 hacky 方法,可以把模型数据储存到一个远程终端,再把相关的 url 通过 toJson() 方法传导出来。但是这种情况下会存在易用性的问题,算法使用者需要自己去解析 URL,并从远程获取这些数据。

 

受限于以上几个方面的因素,我们决定对 Flink ML API 进行扩展。

 

 

在经过大量讨论以及思考之后,我们对新的 API 赋予了以下特性,解决了上面所有问题。

 

第一,在 API 上增加了获得模型数据的接口,比如在 model 上增加了 getModelData() 和 setModelData() API,能够帮助实现实时机器学习场景;

 

第二,对算子的接口做了扩展,让算子可以支持多输入多输出,还可以将不同算子以 有向无环图的方式进行整合,打包成更高级的算子;

 

第三,新的 API 是基于 datastream 实现的,可以支持流批一体的特性,能够同时实现基于有限流和无限流的在线训练;

 

第四,对算法的参数存取 API 做了改进,新的算法参数的存取 API 更容易使用;

 

第五,对模型的存取 API 做了改进,新的模型存取 API 采用了 save() 和 load() API,模型数据非常大的情况下用户也无须考虑这方面的复杂度,只需要调用 save() 和 load() 就可以实现相关的功能;

 

第六,提供了无模型语义的抽象类。

 

 

上图是最新的 API 构架图。最上层有一个 WithParams interface,它提供了存取参数的 API。我们对这个接口做了改进,用户不再需要表达 isOptional 之类的 field。这个接口之下是一个 stage 接口,它包含了所有算法模块,并提供了存取模块的 API,即 save() 和 load()。save() 负责把模型数据和参数储存下来,load() 负责把模型数据和参数读取出来,还原原先的 stage 实例。用户不用考虑参数存取的复杂度。

 

Stage 下分为两块,一块是表达训练逻辑的 Estimator,另一块是表达推理逻辑的 AlgoOperator 和 Model 类。Estimator 的核心 API 是 fit()。与之前不同的是现在支持多个 Table 的输入,可以用来表达需要多个 Table 输入逻辑,比如特征的拼接。Estimator::fit() 输出的是一个 Model。Model 属于 AlgoOperator。AlgoOperator 表达的是计算逻辑,支持多个 Table 作为输入和输出,每个 Table 都是一个数据源,可以用来表达通用的计算逻辑。

 

AlgoOperator 之下是 Transformer,可以表达对数据做转换的逻辑。它与 AlgoOperator 具有相同的 API 格式,但是它们的抽象概念却有所不同。Transformer 是一个具有模型语义的数据转换逻辑。在计算中,比如数据预处理,存在一些更通用的将不同数据进行拼接转换的操作,例如对数据进行过滤,在通用的概念下可能并不适用于 Transformer。因此我们特意增加了 AlgoOperator 类,方便用户的理解和使用。

 

Transformer 之下是 model 类。我们增加了 setModelData() 和 getModelData() API。这两个 API 是为实时机器学习专门设计的,可以让用户把模型数据实时导出到多个远程终端做在线的推理。

 

 

上图是一个比较简化但经典的实时机器学习场景。

 

这里的数据来源主要有两个,静态数据来自于 HDFS,动态数据来自于 Kafka。由 AlgoOperator 读取来自以上两个数据源的数据,将它们拼接之后形成一个 Table 输入到 Estimator 逻辑。Estimator 读取刚才拼接得到的数据并产生一个 Model,然后可以通过 getModelData() 拿到代表模型数据的 Table。再通过 sink() API 将这些数据传输到 Kafka topic。最后在多个前端服务器上面运行程序,这些程序可以直接创建一个 Model 实例,从 Kafka 中读出模型数据形成一个 Table,再通过 setModelData() 把这些数据传递给 Model,使用得到的 Model 做在线推理。

 

 

在支持在线训练和在线推理之后,我们进一步提供了一些基础组件,方便用户通过简单的算子构建更复杂算子,这个组件便是 FLIP-175 提供的 GraphBuilder。

 

假设用户的输入也是与上文一致的两个数据源,最终输出到一个数据源。用户的核心计算逻辑可以分为两块,第一块是数据预处理,比如特征拼接,把两个数据源的数据读进来之后做整合,以 Table 的形式输出到 Estimator,执行第二块的训练逻辑。我们希望先执行训练算子,得到一个 Model。然后将预处理算子和 Model 连接,表达在线推理逻辑。

 

用户需要做的只是通过 GraphBuilder API 将上述步骤连接进行,不需要专门为在线推理逻辑再写一遍连接逻辑。GraphBuilder 会自动从前面一个图生成,并与后面图中的算子形成一一对应的关系。AlgoOperator 在训练图中的形式是直接转换为推理图中的算子,而 Estimator 在训练图中得到的 Model 会成为推理图中对应的节点,通过将这些节点相连,便得到了最后的 ModelA,最终用作在线推理。

 

二、流批一体的迭代引擎

 

 

Flink 是一个基于 Dag 描述执行逻辑的流批一体的处理引擎,但是在许多场景下,尤其是机器学习\图计算类型的应用中,用户还需要数据迭代处理的能力。例如,一些算法的离线训练、在线训练以及模型部署后根据结果动态调整模型参数的场景,都需要数据迭代处理。

 

由于实际的场景同时会涵盖离线和在线处理的案例,因此需要在迭代这一层能够同时支持离线和在线处理。

 

 

前文提到的三个场景的处理逻辑既存在区别,也存在共性。

 

对于离线训练,以逻辑回归为例,在迭代中可以使用一个节点来缓存整个模型,这个节点会将最新的模型发送给训练节点,而训练节点会在迭代开始前预先读取整个数据集,随后在每次收到最新的模型后,从数据集中选择一个mini-batch数据对模型进行更新,并把结果发送回模型缓存节点。

 

对于在线计算,由于训练数据是从外部源源不断到达的,无法预先读取所有训练数据,一般的做法是动态读取一个 mini-batch 的数据,计算完模型的更新后将其发送给模型缓存的节点,等模型的缓存节点进一步发送更新后的模型以后,再读取下一个 mini-batch 数据,这也就要求训练节点必须采用优先级读的方式对数据进行读取,从而最终实现逐个处理 mini-batch 的能力。

 

在这两种场景下的训练都存在同步和异步方式,具体取决于模型缓存节点是否要收集到所有更新后再开始下一轮训练。此外还存在一些模型,在预测的时候会对参数进行动态的更新,处理完每一条数据之后都要立刻评估是否会进行参数更新,如果需要就再发起更新。

 

 

这几种场景下的计算逻辑存在一定的共性,首先都需要在作业图中引入迭代的结构来支持数据的循环处理,并且在数据循环处理之后需要进行是否终止迭代的判断。另一方面,计算过程中还需要在每一轮数据接收完成后,接收到相应的通知,触发特定的计算,比如离线训练中,接触完整个模型后就要开始下一轮的计算。

 

这里其实存在两个选择,一个是在迭代这一层,直接提供将数据集划分为多个 mini-batch,并且对每个 mini-batch 赋予迭代的能力。它在逻辑上可以接受所有类型的迭代,但是如果想要同时支持逐个 mini-batch 处理与多个 mini-batch 并行处理的逻辑,就必须引入一套新的基于 mini-batch 的流处理接口,并且在实践层支持这两种处理逻辑。

 

另外在线训练和离线训练的 mini-batch 产生的方式也不一样,离线 mini-batch 是通过本地预先读取所有数据,然后在每一轮处理中进行选取来实现。而在线 mini-batch 通过从外部数据中读取特定数据量的数据来实现的。因此如果想要兼容这两种情况,会进一步增加接口和实现的复杂度。

 

最后如果要兼容单个 per-record 的处理,还必须引入无限大的 mini-batch,或将每条记录看作一个单独的 mini-batch,前者会进一步增加接口的复杂度,而后者需要每一记录对算子进行一次通知,会增加运行时的开销。

 

考虑到上述情况,我们在迭代中只提供了整个数据集级别的通知,而将划分 mini-batch 的功能放在了迭代之上。在离线训练中,用户可以通过从整个数据集中选取一部分数据来高效实现 mini-batch 选择的功能。而在在线计算中,用户可以通过 Flink 自带的运行集输入算子,实现逐个 mini-batch 处理的功能。

 

 

基于上述思路,整个迭代的设计如上图所示,主要由 4 部分组成。初始模型这类有回边的输入、数据集这类无回边的只读输入、迭代回边的位置以及最终输出。其中回边对应的数据集与有回边的输入是一一对应的,从回边返回的数据与初始数据进行 union 之后,实现数据的迭代处理。

 

迭代内部为用户提供了数据集处理完成的通知功能,即进度追踪的能力。用户基于这一能力可以实现处理完成数据集的某一轮之后执行特定操作。比如在离线训练的时候,用户可以在某个算子收到模型的更新数据之后,计算模型的下一轮更新。

 

此外对于没有回边的输入数据,允许用户指定每一轮是否进行重放。对算子也提供了每轮新建一个算子以及通过一个算子的实例处理所有轮次数据的能力。通过这种方式,用户无须重建算子实例就能实现在轮次之间缓存数据的能力。用户也可以通过回放输入数据并重建算子来复用迭代外的算子,比如 Reduce、Join 这种常用算子,输入数据进行重放并且算子会在某一轮进行重建,这种情况下用户可以直接复用这些迭代外的算子。

 

同时,我们提供了两种终止判断逻辑,一种是当整个迭代中已经没有数据在处理的时候,会自然终止迭代。另外一种是在有限流的情况下,也允许用户指定特定数据集,如果这一数据集在某轮没有数据产生,用户可以提前终止整个迭代。

 

 

DataStream<double[]> initParameters = …
DataStream<Tuple2<double[], Double>> dataset = …


DataStreamList resultStreams =
    Iterations.iterate(
          DataStreamList.of(initParameters),
          ReplayableDataStreamList.notReplay(dataset),
          IterationConfig.newBuilder().setOperatorRoundMode(ALL_ROUND).build();
          (variableStreams, dataStreams) -> {
                    DataStream<double[]> modelUpdate = variableStreams.get(0);
                    DataStream<Tuple2<double[], Double>> dataset = dataStreams.get(0);
 
                  
                    DataStream<double[]> newModelUpdate = …
                    DataStream<double[]> modelOutput = …
                    return new IterationBodyResult(
       DataStreamList.of(newModelUpdate), 
       DataStreamList.of(modelOutput)
                });
         
DataStream<double[]> finalModel = resultStreams.get("final_model");

 

上图是使用迭代 API 来构建迭代的例子。用户需要指定有回边和无回边的输入列表、算子是否需要每轮重建以及迭代体的计算逻辑的等。对于迭代体,用户需要返回回边对应的数据集以及迭代的最终输出。

 

 

public static class ModelCacheFunction extends ProcessFunction<double[], double[]>
        implements IterationListener<double[]> { 
         
        private final double[] parameters = new double[N_DIM];
 
        public void processElement(double[] update, Context ctx, Collector<O> output) {
            // Suppose we have a util to add the second array to the first.
            ArrayUtils.addWith(parameters, update);
        }
 
        void onEpochWatermarkIncremented(int epochWatermark, Context context, Collector<T> collector) {
            if (epochWatermark < N_EPOCH * N_BATCH_PER_EPOCH) {
                collector.collect(parameters);
            }
        }
 
        public void onIterationEnd(int[] round, Context context) {
            context.output(FINAL_MODEL_OUTPUT_TAG, parameters);
        }
    }

 

对于迭代内的算子,如果它实现了 IterationListener 接口,就会在所有数据集处理完某一轮之后,通知迭代的算子。如果整个迭代都处理终止则会通过 onIterationTerminated 对算子进行进一步通知,用户可以在这两个回调中实现需要的计算逻辑。

 

 

在迭代的实现中,对于用户通过代码来创建的迭代处理结构,会增加一部分迭代内部的节点,并对用户所有的处理节点进行 wrap 操作,从而达到管理算子生命周期并对数据类型进行转换的目的。最后, 迭代基于 Colocation 与本地内存实现了回边,这样在调度器看来整个作业逻辑仍然是一个 DAG,从而可以直接复用现在的调度逻辑。

 

 

在迭代中插入的专用算子主要包括 input、output、head 与 tail 算子,其中 input 和 output 负责数据类型的转换,外部数据进入迭代内时会为每一条记录增加一个迭代头,里面记录了该 record 处理的轮次,每个算子的 wrap 会将迭代头去掉后交给用户原始的算子处理。

 

head 和 tail 算子负责实现回边及计算某一轮迭代是否已经全部处理完成。head 算子从 input 读取完整个输入,并在最后插入一条特殊的 EpochWatermark 事件,来标记第零轮迭代的终止。由于 head 算子可能会存在多个并发,所以必须等 head 算子的所有并发都读取完输入后,才能发送第 0 轮终止的事件。

 

head 算子通过 Operator Coordinator 来同步所有并发。Operator Coordinator 是一个位于 JobManager 中的全局组件,它可以与所有 head task 进行双向通信,所有 head 算子并发都收到每一轮处理完成的通知后,就会发送全局广播给所有 head task,告诉他们这一轮的处理已经全部结束。head 发送 EpochWaterMark 之后,所有迭代中的算子都会与这一消息进行对齐。算子从所有输入中都读取到特定轮次的 EpochWatermark 之后,就会认为这一轮处理完成,并调用这一轮结束的回调。当 tail 节点收到某一轮数据或 EpochWatermark 之后,会将轮次加 1,然后通过回边再次发送给 head,从而实现数据循环处理。

 

 

最后我们也支持了有迭代情况下的 checkpoint 功能。由于 Flink 默认的 checkpoint 机制无法支持带环的计算图,因此我们对 Flink 的 checkpoint 机制进行了扩展,实现了带环的 Chandy-Lamport 算法,会同时缓存来自回边的消息。另外 head 算子在对齐的时候,除了要读取正常输入的 barrier 之外,也会等待来自 Operator Coordinator 的特殊的 barrier。每一轮全局结束的消息也是来自 Operator Coordinator 广播,可以保证每个 checkpoint 中所有迭代内的算子都处在同一轮,从而简化算子后续进行并发修改的操作。

 

另外还有一个开发中的优化,Operator Coordinator 会将收到的 barrier 延迟到下一个全局对齐消息之后,再发送通知,从而使得整个迭代内的算子的 state 都是恰好处于读取完某一轮数据之后。许多同步算法都是先将缓存收到的数据存储在算子中,直到读取完一轮所有数据之后再进行统一处理。这一优化可以保证在进行 snapshot 操作的时候,正好清空所有缓存,从而使整个 checkpoint 中缓存的数据量最小。

 

 

以上是关于 Flink 流批一体迭代引擎的介绍,这些引擎可以同时在在线和离线场景中使用,并且支持 exactly-once 的容错。未来我们将进一步支持 batch 的执行模式,并提供更多的上层开发工具。

 

三、Flink ML 生态建设

 

 

最近我们已经将 Flink ML 相关代码从 Flink 核心代码库中移入一个单独的 Flink ML 代码库。这样做的首先是为了方便 Flink ML 的快速迭代,其次也希望通过这个手段减少 Flink 核心代码库的复杂度,避免 Flink 核心代码库过于臃肿。

 

另外,我们在 Github 上建立了一个中立的组织 Flink-extended,能够为所有 Flink 社区的开发者提供平台来贡献一些他们希望开源的项目。方便大家分享不带有特定公司的名字的代码,使不同公司的开发人员可以把代码贡献出来,方便 Flink 社区来使用和共建。我们希望借此促进 Flink 生态的繁荣发展。

 

目前中立项目中已经有一些比较重要的项目,比如 Deep Learning on Flink,它是由阿里大数据团队主要开发的一个开源项目,核心作用是可以把 Tensorflow 打包成 Java 算子在 Flink 中运行,方便将 Flink 的预处理程序与 Tensorflow 深度学习的训练算法相结合,形成端到端的训练以及推理。

 

最近我们已经在 Flink ML 中新增了若干常见算法,之后还会继续提供更多开箱可用的算法。

 

 

上图是我们目前正在进行中的重要工作,其中最核心的工作是将现有的阿里巴巴开源的 Alink 代码库进行改造,使其中的算法能够适配新设计的 Flink ML API,并将改造后的算法贡献到 Apache 项目,方便 Flink 用户得到更多开箱可用的算法。

 

此外,我们还与 360 一起合作共建 Clink 项目,核心目标是在离线计算中用 Java 去运行某些算子,得到训练结果。另一方面,这些算子需要能够以非常低的延迟做在线推理。然而低延迟在线推理很难用 Java 实现,通常需要用 C++ 来实现。为了使开发者只写一遍算法就能同时应用于 Java 和 C++ 环境,Clink 提供了一些打包的基础类的功能,方便算法开发者写好 C++ 算子之后,能够使用 JNI 打包成 Java 算子,并在 Flink 中使用这些算子。

 

最后,我们计划在 Flink ML 中开发对于 Python 的支持,其中包括允许算法使用者通过写 Python 程序将 Flink ML 中的 Java 算子进行连接和组合使用,希望能提高机器学习开发者的效率和使用体验。

 

以上工作基本都已经进入开源项目,其中算法 API 的设计在 FLIP-173 中 ,迭代引擎的设计主要在 FLIP-176 中 ,FLIP-174 和 FLIP-175 分别提供了算法参数的 API 以及 GraphBuilder 的 API。Clink 和 Deep Learning on Flink 等项目也已经在 Flink-extended 的组织上,欢迎大家使用。

 

Be First to Comment

发表评论

您的电子邮箱地址不会被公开。