在 Flink 算子中使用多线程如何保证不丢数据?

  • 时间:
  • 浏览:0
  • 来源:大发彩神8下载最新版—大发快三官网大发彩神

同类,Flink 任务消费的 Kafka 数据,当做 Checkpoint 时,Flink 任务消费到 offset 为 111500 的位置,但实际上 offset 111500 就让的一小要素数据肯能还在数据缓冲队列中尚未完全消费,肯能肯能没积攒够一定批次好多好多 数据缓处于 client 中,并未请求到第三方。当任务失败后,Flink 任务从 Checkpoint 处恢复,会从 offset 为 111500 的位置现在过后刚开始消费,此时 offset 111500 就让的一小要素缓处于内存缓冲队列中的数据不用再被消费,于是就再次出现了丢数据状况。

答:会丢数据。肯能上述案例中使用的批量 api 来消费数据,若果批量 api 是每积攒 150 条数据请求一次第三方接口,当做 Checkpoint 时肯能只积攒了 150 条数据,好多好多 做 Checkpoint 时内存中肯能还有数据未发送到内外部系统。就让数据缓冲队列中肯能还有缓存的数据,就让上述 Sink 在做 Checkpoint 一定会再次出现 Checkpoint 就让的数据未完全消费的状况。

说明:多多多线程 的方案,不仅限于请求第三方接口,对于非 CPU 密集型的任务不用能 使用该方案,在降低 CPU 数量的同去,单个 CPU 承担多个多多线程 的工作,从而提高 CPU 利用率。同类:请求 HBase 的任务或磁盘 IO 是瓶颈的任务,不能 降低任务的并行度,使得每个并行度内补救多个多多线程 。

上述请求示意图不能 看出 Flink 任务发出请求到响应这 150ms 期间,Flink Sink 算子好多好多 在 wait,并这样 实质性的工作。就让,CPU 使用率肯定很低,当前任务的瓶颈明显在网络 IO。最后结论是 Flink 任务申请了 1150 颗 CPU,意味着 yarn 或一些资源调度框架这样 资源了,就让这 1150 颗 CPU 的使用率未必高,这不能 不能优化通过提高 CPU 的使用率,从而少申请一些 CPU 呢?

Sink 算子代码如下所示,在 open 最好的办法中时需初始化多多线程 池、数据缓冲队列并创建开启消费者多多线程 ,在 invoke 最好的办法中只时需往 bufferQueue 的队尾加进数据即可。

答:不能保证 Exactly Once,Flink 要想端对端保证 Exactly Once,时需要求内外部组件支持事务,这里第三方接口明显不支持事务。

当做 Checkpoint 时 snapshotState 最好的办法中执行 clientBarrier.await(),等候所有的 client 多多线程 将缓冲区数据消费完。snapshotState 最好的办法执行过程中 invoke 最好的办法不用被执行,即:Checkpoint 过程中数据缓冲队列不用增加数据,好多好多 client 多多线程 好快了 就不能 将缓冲队列中的数据消费完。

首先不能 想到的是将同步请求改为异步请求,使得任务不用阻塞在网络请求一些环节,请求示意图如下所示。

就让轮循肯能随机策略会不足英文图片图片,若果 5 个 Client 中 Client3 多多线程 消费较慢,会意味着给 Client3 下发数据时被阻塞,从而使得一些正常消费的多多线程 Client1、2、4、5 也被下发不能数据。

笔者线上有有一一十个 Flink 任务消费 Kafka 数据,将数据转换后,在 Flink 的 Sink 算子内内外部调用第三方 api 将数据上报到第三方的数据分析平台。这里使用批量同步 api,即:每 150 条数据请求一次第三方接口,不能 通过批量 api 来提高请求速率。肯能调用的外网接口,好多好多 每次调用 api 比较耗时。若果批次大小为 150,且请求接口的平均响应时间为 150ms,使用同步 api,就让第一次请求响应就让才会发起第二次请求。请求示意图如下所示:

为了补救上述问题图片图片,不能 在 Sink 算子内申请有一一十个 数据缓冲队列,队列有先进先出(FIFO)的特性。Sink 算子接收到的数据直接插入到队列尾部,十个 Client 多多线程 不断地从队首取数据并消费,即:Sink 算子先接收的数据 Client 先消费,后接收的数据 Client 后消费。

这十个 多多线程 内分别使用同步批量请求的 Client,单个 Client 还是保持 150 条记录为有一一十个 批次,即 150 条记录请求一次第三方 api。请求第三方 api 耗时主要在于网络 IO(性能瓶颈在于网络请求延迟),就让肯能变成 5 个 Client 多多线程 ,每个 Client 的单次请求平均耗时还能保持在 150ms,除非网络请求肯能达到了速率上限或整个任务又遇到一些瓶颈。好多好多 ,多多多线程 模式下使用同步批量 api 不能将请求速率提升 5 倍。

通过异步请求的最好的办法,不能 优化网络瓶颈,若果 Flink Sink 算子的单个并行度平均 10ms 接收到 150 条数据,这样 使用异步 api 的最好的办法平均 1 秒不能 补救 11500 条数据,整个 Flink 任务的性能提高了 5 倍。对于每秒 10 万数据量的业务,这里仅时需申请 20 颗 CPU 资源即可。关于异步 api 的具体使用,不能 根据场景具体设计,这里不完全讨论。

MultiThreadConsumerSink 具体代码如下所示:

如何实现呢?时需借助 CyclicBarrier。CyclicBarrier 会让所有多多线程 都等候某个操作完成后才会继续下一步行动。在这里不能 使用 CyclicBarrier,让 Checkpoint 等候所有的 client 将数据缓冲区的数据完全消费完并对 client 执行过 flush 操作,言外之意,offset 111500 就让的数据时需完全消费完成才允许 Checkpoint 执行完成。好多好多 就不能 保证 Checkpoint 时不用有数据被缓处于内存,不能 保证数据源 offset 111500 就让的数据都消费完成。

对于一些不支持异步 api 的场景,肯能未必能使用上述优化方案,同样,为了提高 CPU 使用率,不能 在 Flink Sink 端使用多多多线程 的方案。如下图所示,不能 在 Flink Sink 端开启 5 个请求第三方服务器的 Client 多多线程 :Client1、Client2、Client3、Client4、Client5。

Sink 算子的单个并行度内现在有 5 个 Client 用于消费数据,但 Sink 算子的数据都来自于上游算子。如下图所示,有一一十个 简单的实现最好的办法是 Sink 算子接收到上游数据后通过轮循或随机的策略将数据下发给 5 个 Client 多多线程 。

这样 上述 Sink 能保证 At Lease Once 吗?言外之意,上述 Sink 会丢数据吗?

分析到这里,亲戚亲戚亲戚我们我们我们 设计的 Sink 终于不能 保证不丢失数据了。对 CyclicBarrier 不了解的同学请 Google 或百度查询。再次强调这里多多多线程 的方案,不仅限于请求第三方接口,对于非 CPU 密集型的任务都不能 使用该方案来提高 CPU 利用率,且该方案不仅限于 Sink 算子,各种算子都适用。本文主要希望帮助亲戚亲戚亲戚我们我们我们 理解 Flink 中使用多多多线程 的优化及在 Flink 算子中使用多多多线程 如何保证不丢数据。

十个 多多线程 共用同有一一十个 队列完美地补救了单个多多线程 消费慢的问题图片图片,当 Client3 多多线程 阻塞时,不影响一些多多线程 从队列中消费数据。这里使用队列还起到了削峰填谷的作用。

MultiThreadConsumerSink 实现了 CheckpointedFunction 接口,在 open 最好的办法中增加了 CyclicBarrier 的初始化,CyclicBarrier 预期容量设置为 client 多多线程 数加一,表示当 client 多多线程 数加有一一十个 多多线程 都执行了 await 操作时,所有的多多线程 的 await 最好的办法才会执行完成。这里为那先 要加一呢?肯能除了 client 多多线程 外, snapshotState 最好的办法中也时需执行过 await。

从数据缓冲队列中 poll 数据时,增加了 timeout 时间为 150ms。肯能从队列中拿到数据,则执行消费数据的逻辑,若拿不能数据说明数据缓冲队列中数据消费完了。此时时需判断是算是有等候的 CyclicBarrier,肯能有等候的 CyclicBarrier 说明此时正在执行 Checkpoint,好多好多 client 时需执行 flush 操作。flush 完成后,Client 多多线程 执行 barrier.await() 操作。当所有的 Client 多多线程 都执行到 await 时,所有的 barrier.await() 一定会被执行完。此时 Sink 算子的 snapshotState 最好的办法就会执行完。通过一些策略不能 保证 Checkpoint 时将数据缓冲区中的数据消费完,client 执行 flush 操作不能 保证 client 端不用缓存数据。

异步请求相比同步请求而言,优化点在于每次发出请求时,不时需等候请求响应后再发送下一次请求,好多好多 当下一批次的 150 条数据准备好就让,直接向第三方服务器发送请求。每次发送请求后,Flink Sink 算子的客户端时需注册监听器来等候响应,当响应失败时时需做重试肯能回滚策略。

原理明白了,具体代码如下所示,首先是消费数据的 Client 多多线程 代码,代码逻辑很简单,一个劲从 bufferQueue 中 poll 数据,取出数据后,执行相应的消费逻辑即可,在本案例中消费逻辑便是 Client 积攒批次并调用第三方 api。

MultiThreadConsumerClient 具体代码如下所示:

平均下来,每 150 ms 向第三方服务器发送 150 条数据,也好多好多 每个并行度 1 秒钟补救 11150 条数据。假设当前业务数据量为每秒 10 万条数据,这样 Flink Sink 算子的并行度时需设置为 1150 不能正常补救线上数据。从 Flink 资源分配来讲,1150 个并行度时需申请 1150 颗 CPU,就让当前 Flink 任务时需占用集群中 1150 颗 CPU 以及不少的内存资源。请问此时 Flink Sink 算子的 CPU 肯能内存压力大吗?

如何保证数据不丢失呢?很简单,不能 在 Checkpoint 时强制将数据缓冲区的数据完全消费完,并对 client 执行 flush 操作,保证 client 端不用缓存数据。

实现思路:Sink 算子不能 实现 CheckpointedFunction 接口,当做 Checkpoint 时,会调用 snapshotState 最好的办法,最好的办法内不能 触发 client 的 flush 操作。但 client 在 MultiThreadConsumerClient 对应的十个 多多线程 中,时需考虑多多线程 同步的问题图片图片,即:Sink 算子的 snapshotState 最好的办法中做有一一十个 操作,要使得十个 Client 多多线程 感知到当前正在执行 Checkpoint,此时应该把数据缓冲区的数据完全消费完,并对 client 执行过 flush 操作。

代码逻辑相对比较简单,请问上述 Sink 能保证 Exactly Once 吗?