1. SQL Optimizer解析

1. 大数据体系介绍与SQL处理流程

image-20250226115658151

消息队列用来解耦存储和计算

分析引擎都支持SQL,如Spark,MapReduce逐渐开始支持

最终目标是通过SQL处理所有大数据

image-20250226120018246

Parser:词法分析和语法分析已经很成熟了,有很多API,得到AST

image-20250226120053021

Analyzer:检查合法性,把AST变成Logical Plan

逻辑:只是逻辑,没有指定一定要用哪一种算法

  • 三张表连接,一些过滤条件和选择,group by是聚合,top n用堆排序(limit 10)
  • left-deep tree:

image-20250226121421163

查询优化:找到正确和执行代价最小的计划,对效率的帮助很大,有些是NP的。单机可能优化比较简单,大数据:多机、多张表,执行性能相差很大,需要优化

image-20250226121651256

拆分执行计划(F#1->F#7),发送给节点,充分利用并行机制。子树拆分也是一个优化问题

image-20250226121943574

总结:

image-20250226122024639

2. 常见查询优化

image-20250226122208612

RBO

运算符可以拆分(结合律),表连接时顺序可以变(交换律)

image-20250226122317661

优化网络:project选出两列出来

image-20250226122551981

优化规则1列裁剪:用不到的列去掉,可以减少IO和内存占用,只使用需要的列

image-20250226122732798

优化规则2微词下推:filter是谓词,尽早地过滤数据,减少传输和计算开销。下推是有条件的,这里的join不影响

image-20250226122927449

优化规则3传递闭包:没有filter创造filter

image-20250226123033527

优化规则4 Runtime Filter Builder构建哈希表:min-max:右边的数据的最大值和最小值;in-list:0-100的101个数,和100万的1个数,用集合排列;bloom filter:固定大小,不随集合大小改变

  • 传递给左边,减少左边的扫描量

image-20250226123852279

缺点是不保证得到最优的执行计划

image-20250226124102247

CBO

用模型来估算执行代价->拆解成所有算子,得到所有可能得执行计划

公式比较多,每个系统不一样

image-20250226124149754

选择率:对于过滤条件,会返回多少比例得数据

基数:算子需要处理得行数

image-20250226124500748

三种方式在实际中都会用到,用于统计信息便于推导代价最小的计划

image-20250226124615611

fs是filter selectivity 选择率

image-20250226125232467

有些列是有关联性的,比如车型和制造商,不能按照独立性相乘,需要特殊处理此类统计信息

image-20250226125433881

得到统计信息后,如何找到代价最小的执行计划?枚举:贪心或动态规划,全局最优->局部最优

image-20250226125547741

计算子问题的代价:如三表连接先用两表连接(两种连接方式),选最好的

image-20250226125718651image-20250226125812528

image-20250226125849454

大表和小表连接开销会很大:

image-20250226125941327

CBO可以减少数据量(这个例子是spark的)

image-20250226130101517

有一些query在CBO下有很好的提升

image-20250226130140889

image-20250226130202524

大数据下CBO重要

image-20250226130223416

3. 社区开源实践

Calcite是Java,Orca是C++

Clickhouse只有简单的RBO

image-20250226130352034

作为通用的SQL查询引擎

核心是Query Optimizer

  • Metadata是统计信息
  • Pluggable Rules是插件,可以不同系统定义不同的规则

image-20250226130504128

RBO:遍历树,等价替换成新的规则

把Logical Filter拆成两个Filter

image-20250226130644351

CBO:求每个子问题的最优,用memo记录,初始每个算子放在一个group里

image-20250226160548268

从上往下遍历,才能得到完整的执行计划

image-20250226160814220

最优解是递归退出来的,best1->best3…

image-20250226160855029

剪枝:避免遍历所有空间,如果已经超过上限了就不看了

image-20250226160937856

Top-down:注意是全局最优,所以第二行选了cost=80

image-20250226161108310

image-20250226161823187

4. 前沿趋势

视频是22年录的

image-20250226161911379

趋势:

数据仓库:预先定义表和模型,对原始日志进行管理,存入:对业务迭代不太好,因为固定结构,不包含原始日志。数据湖就比较灵活,希望两者结合。联邦查询。

image-20250226161956448

用AI进行SQL查询优化。建索引、视图根据经验。

在数据库里嵌入AI算法,让用户学习成本更少

image-20250226162224857

优化器必不可少,很重要

image-20250226162428313

image-20250226162451687

2. 流/批/OLAP 一体的Flink引擎介绍

Flink:实时分析

不仅有结构化数据,还有音频、视频等非结构化数据。

image-20250226164634053

image-20250226164713342

随着数据规模扩大,Google发布了三个论文:HDFS/MD/离线计算。

Spark:MD的数据罗盘,Spark内存迭代;MD有两个算子,api比较固化,Spark支持sql

Flink:流式数据处理实时数据,支持SQL

image-20250226164842810

image-20250226164910660

批式数据处理的是静态数据,一批到了才处理

流式计算:水龙头,对实时数据,动态无边界,持续不断地运行

image-20250226165016127

image-20250226165044756

image-20250226165113449

能否做到精确一次计算

状态:Flink之前,中间结果会存在外部数据库,现在不用依赖外部系统,用引擎完成处理需求

SQL很重要,对业务维护成本低

image-20250226165345770

可以基于无边界(Kafka消息队列,流数据)和有边界数据集(批数据)进行处理,既可以处理流也可以处理批数据

  • Dataflow思想

image-20250226165540086

可以部署在k8s等

可以支持图计算

2. Flink整体架构

有些场景SQL写不了,用DataStream的Java API,Python是实现一些ML解决方案的基础

Task之间会有一些数据交换,使用Shuffle Service

状态数据

SDK是Software Development Kit(软件开发工具包),可以同时使用不同的SDK

image-20250226170251459

JM:恢复task重新调度

TM:TM之间有数据交换的需求

Client端会将用户代码抽象为逻辑的执行图,JM将逻辑执行图转化为物理执行图,调度到不同的Worker节点执行

image-20250226170458984

Dispatcher:

image-20250226170718110

从Kafka消息队列中读流数据

timeWindow是10s一次,“sink”(接收器)指的是数据处理流水线的末端,负责将处理后的数据输出到外部系统 。

image-20250226171137223

image-20250226171227807

配置情况:sink 算子的并发配置为 1,意味着只有 1 个任务处理数据输出;其余算子(Source、map、keyBy/window/apply)并发为 2,表示这些算子分别有 2 个并行的子任务来处理数据。不同子任务间通过 Stream Partition 传输数据。而 Sink 算子只有一个子任务,它会接收来自上游两个并行子任务处理后的数据进行最终输出 。 这种配置体现了数据处理过程中不同阶段的并行程度差异。

下图是从逻辑图(精简视图)到并发视图的过程

image-20250226171246818

source和map可以chain在一起,可以在一个线程,读到数据就马上map,不存在线程的切换、数据序列化和反序列化,chain在一起后提升效率(减少了数据在TM缓冲区的交换,减少延迟提高吞吐率)

什么算子可以chain在一起是有一些判断条件的

image-20250226171856667

TM申请了一些资源,一个TM包括几个slot是用户自己定义的,一个slot是一个独立的线程,slot之间的cpu和内存没有完全隔离,有一定隔离

image-20250226172326366

上面就是完整的一段代码从精简试图到TM中slot这样具体结构的过程

image-20250226172447453

流批一体

image-20250226172600188

1是流处理,2是批处理

image-20250226172637828

数据源:app的行为数据、活动数据等

实时Flink:统计一些数据,放到服务层

离线:查到一些用户的数据,提供服务

image-20250226190412597

image-20250226190519776

image-20250226190531392

如何实现流批一体:

image-20250226190634103

批式数据是特殊的流式数据,因为是有界的

image-20250226190826874

如何支持:对于流式数据,按照时间段切成有边界

image-20250226191052153

SQL:支持有边界和无边界的输入

DataStream:Java/Scala

调度层:流调度、批调度

image-20250226191118075

Scheduler:DAG指有向无环图,可以用来表示这些任务之间的依赖关系和执行顺序,其中节点代表任务,边代表任务之间的依赖方向,且不存在循环依赖,这样可以保证任务按照正确的顺序执行。

image-20250226191308480

EAGER:流场景,一个作业需要全部的资源

LAZY:批场景,先处理上游,处理完后资源会释放掉

image-20250226191555795

哈希Shuffle

需要12个任务的资源全部拿到后,才能开始工作

image-20250226224718085

因为A1数据是有可能结束的(和流不同),所以可以一个个工作,复用资源

image-20250226224817543

上文是1.12版本之前,下文是最新的调度机制:

image-20250226224949992

BLOCKING:A1的数据会写入磁盘,不是实时传给B1,B1再从磁盘拿,实时性不强,用于批处理

pipeline region:如果有BLOCKING,则认为是2个pipeline region

PIPELINED:用于流处理,数据没有中间被存储下来,性能高

image-20250226230223963

Shuffle

用于连接上下游数据交互的过程是Shuffle

A->B,B->C,C->D,只要是涉及上下游衔接就叫Shuffle

image-20250226230424077

image-20250226230533191

流作业的Shuffle和Task是绑定的,因为Shuffle存在内存钟,而批处理的数据存在磁盘里面了

机器挂机了后批处理重跑的代价高,所以批处理数据有时会复制几份

image-20250226230859356

Flink提出了满足两个模式的Shuffle架构

image-20250226231055864

image-20250226231113573

image-20250226231201291

image-20250226231548760

抖音红包:如红包雨的数据可以决定下一场红包雨的时间

image-20250226231603848

image-20250226231631599

OLAP需要高并发

image-20250226231648742

image-20250226232052906

OLAP是特殊的批处理

image-20250226232124568

image-20250226232151570

Flink支持OLAP

image-20250226232229279

Flink可以从很多数据源读取数据

image-20250226232254910

挑战:面向s级,还要高并发,频繁申请内存和磁盘如何处理

QPS:并发调度和执行能力

image-20250226232334110

Gateway:和前文Flink Client有点像,给SQL语法解析优化,然后生成DAG图

Session Cluster:

  1. Per-Job模式的集群,一个集群只运行一个作业,作业执行完毕则集群销毁。
  2. Session-Cluster模式的集群,一个集群中运行多个作业

image-20250226232409848

OLAP场景的难点

一些字节内部的设想与提升方向:

image-20250226233918817

image-20250226234008580

image-20250226234119998

image-20250226234155026

总结:

image-20250226234252551

使用案例

image-20250226234404672

image-20250226234420426

image-20250226234452117

希望把流批变成一体,而不是现在分开

image-20250226234544252

image-20250226234611821

  • OLTP(联机事务处理,Online Transaction Processing):主要用于处理日常的、高并发的事务性操作,像电商系统中的下单、支付,银行系统中的转账等。图中 app data 对 OLTP 进行 Update 操作,就是典型的联机事务处理场景,注重数据的实时性、一致性和事务完整性。
  • Hive:是基于 Hadoop 的数据仓库工具。可以将结构化的数据文件映射为一张数据库表,并提供类 SQL 的查询功能(HiveQL),用于处理大规模的离线数据。图中 OLTP 通过 binlog(数据库二进制日志)将数据传输到 Hive,之后经过 ETL(Extract,Transform,Load,即数据抽取、转换和加载)处理,再存入 Clickhouse 进行查询分析,体现了 Hive 在离线数据处理流程中的作用。
  • HTAP(混合事务分析处理,Hybrid Transactional/Analytical Processing):融合了 OLTP 和 OLAP(联机分析处理)的功能,既能处理事务性操作,又能进行实时分析。图中走 HTAP 之后的链路,相比原来链路减少了中间环节,Flink 可以直接基于 HTAP 提供数据查询与分析能力,且延迟达到毫秒级,大大提升了数据处理和分析的效率 。

以前在线和离线是完全分开的,Hive是离线的数据仓库,小时/天级别的延时

现在查询、分析都能在HTAP中,秒级别,很快

image-20250226234758840

image-20250226234902450

3. Exactly Once语义在Flink中的实现

1. 数据流和动态表

image-20250227153740006

端到端的exactly once是什么

image-20250227154201135

流是无限的,不能被完整访问

传统批处理的处理时间一定是可以终止的,流查询是永远不会终止的

image-20250227154414106

表和流可以动态进行转化

image-20250227154451972

查询的语义跟批处理完全一样(相当于某一刻时间静止),只是一直在进行连续查询,不会终止

image-20250227154646182

image-20250227154929512

第一个sql统计用户在网站查询的所有次数,第二个统计用户在每个小时段内的点击次数

image-20250227155501111

Mary点击1次,后面又有,下游要更新,如何?发retract进行回撤,再发更新后的消息Mary,2

image-20250227155624995

SQL连续查询时会建立clicks的数据表,作为状态保存中间计算结果,当新的数据到来时就可以更新

查询结果是动态变化的

image-20250227155945420

消费:从消息队列或流式数据源中读取并处理数据的过程。消费者(Consumer)是负责执行这一过程的实体,它们从生产者(Producer)发送的数据中提取信息,并根据需要进行处理、分析或存储。

At-most-once:出现故障时啥也不做,下游数据可能丢失,可以保证所有时间都处理数据流,时延最低,对数据准确性不高就可以用

At-least-once:虽然存在数据重复处理的风险,但它确保了数据不会丢失。这种语义适用于可以容忍少量数据重复,但不能接受数据丢失的场景

Exactly-once:提供了最高的数据处理准确性和一致性,但实现起来相对复杂,通常需要结合状态管理和一致的检查点机制。

image-20250227160418770

2. Exactly-Once 和 Checkpoint

设置状态保存的时间点作为故障恢复点进行备份,如果有故障回退到此刻

  • sum_even和sum_odd分别负责计算偶数和奇数的总和
  • Recovery 1: Restart application 如果直接重启应用而不使用检查点,那么sum_odd任务会丢失之前的状态信息。在这种情况下,当新的数据到来时,sum_odd将从零开始重新计算,这会导致数据不一致。
  • Recovery 2: Reset application state from Checkpoint 利用检查点进行恢复。JobManager从最近的检查点加载状态信息,并重置所有任务的状态。这样,sum_odd任务可以恢复到故障前的状态,继续处理数据。此时,sum_odd的值仍然是9,而不是从零开始。
  • Recovery 3: Continue processing 恢复完成后,系统继续处理新的数据。例如,当新的数字8到来时,sum_even将其累加到当前值上,得到12;而sum_odd则保持不变,因为8是偶数。最终,当另一个奇数到来时,sum_odd将继续累加。

image-20250227162241041

时间点不能选择任意的时间点,要等到所有处理逻辑消费完成

这个算法要等所有逻辑消费完成,效率差,所以提出Chandy-Lamport算法

image-20250227162609685

Chandy-Lamport算法

两个source,并行独立处理各自的数据,统计偶数累加和和奇数累加和

image-20250227163943244

JM会往下游发送Checkpoint Barrier,将数据流分割成多个部分,这些部分代表了不同检查点周期的数据。

当一个算子(Operator)从其输入流接收到barrier时,它不会立即继续处理后续的数据,而是等待从所有输入流接收到对应于同一检查点的barrier。这意味着该算子会暂停处理位于barrier之后的数据,直到所有的输入流都达到了相同的检查点边界。

image-20250227165204164

一旦所有输入流的barrier都已经到达,算子会触发自己的状态快照操作,将当前的状态保存下来。这个状态包括所有中间计算结果,如聚合值或窗口内容等。完成状态快照后,算子会向下游发送barrier,允许下游算子也执行相同的过程。这确保了整个作业图中的所有算子都能同步地创建一致的状态快照。

image-20250227165256795

在所有必要的状态快照完成后,算子将继续处理那些被暂时搁置的数据记录,即位于barrier之后的数据。

image-20250227165405657

barrier下发后就可以数据处理了,barrier和数据处理是解耦的

image-20250227165557365

所有算子的状态都保存好了

image-20250227165912844

异步将快照保存到远端

用Checkpoint保证故障发生时有exactly once,保证数据只被消费一次

image-20250227170015612

3. 端到端 Exactly-Once 实现

sink是往下游发数据的,恢复的时间这段时间sink会重复下发,这是不对的,如银行重复存储

image-20250227170334553

两阶段提交协议:分布式协议

事务性(原子性):要不全部执行操作,要不全部不执行

image-20250227171110760

预提交阶段:没有真正提交

image-20250227171217810

提交阶段:

image-20250227171313416

协作者:管理和统筹所有协议

参与者:参与决策

Kafka:2维数组,有序,无限长,从头读数据,新数据存入尾

image-20250227171446598

第一步:协作者告诉参与者要准备提交了,类似checkpoint barrier,状态制作的开始

image-20250227171652141

第二步:告诉剩下的参与者要准备提交

image-20250227171721627

提交结束后,向JM说提交完成了,JM再发说所有节点提交成功了

image-20250227171758226

有一个算子失败,就会回滚

一旦有延时和数据慢,会堵塞,要避免

image-20250227172014422

image-20250227172630560

简单实现的Exactly-Once:

image-20250227172732534

问题:实际上是At-least-once

image-20250227172915797

用Flink解决以上问题,实现严格的端到端Exactly-Once

image-20250227173056828image-20250227173241683

4. 流计算中的Window计算

1. 流式计算的概述

SQL是主导,Flink在字节也用的非常多

企业数仓围绕Hadoop,每天凌晨准备好数据,以天为单位,流式计算是以秒为级别

资源模型:指使用的计算资源是什么,批跑完任务就结束了,所以是定时的,流的计算资源是长期使用,没有释放和申请,一直在使用

image-20250227175800336

如何更实时?从天变成小时,技术上没问题,申请/释放的周期调度需要资源,数仓需要分层,数据产生到计算完成有时在1小时做不到

image-20250227180041389

image-20250227180114942

时间窗口:数据进来就直接计算,时间结束就下发数据

image-20250227180241467

引入Watermark:表示数据处理到哪里了,表示窗口结束

image-20250227181006448

image-20250227181111396

2. Watermark 基本机制概述

对比机器时间:现在是10:30,但真实事件时间可能是昨天

image-20250227181321389

从原始时间-5s/-20s作为Watermark,表示事件时间

image-20250227181350613

33会传递W(33)给29

window1取上游传递数值的最小值,也就是14

image-20250227181430554

Flink应用有数据输入,没有数据输出:很有可能是Watermark不正常

image-20250227181536932

上图一个source消费两个partition,多个partition可能加重乱序

新版本取最小值,解决问题

image-20250227181704346

白天有数据,晚上没数据,让断流了怎么办:设置超时时间,忽略这个subtask

image-20250227181835940

迟到数据:晚于watermark

image-20250227182009236

image-20250227182118604

3. Window 基本功能和高级优化

窗口的目的:将无限流数据划分为有限块、支持基于时间或数量的聚合操作

累计窗口:如0-1点累计是多少

image-20250227182411493

SQL大部分的算子是自己优化过的,并不是直接用window

image-20250227182443583

图里有14个窗口,3有4个,12有5个

image-20250227215027682

image-20250227215134694前面两个窗口是根据时间划分的,会话窗口不是,并不确定数据所处的窗口,划分是动态过程,可能与其他数据合并

image-20250227215326185

迟到数据:只有事件时间才会迟到,来了就算的模式不算迟到

image-20250227215642052

image-20250227215832419

增量:每条数据来了就计算,保存的值少一些,如只保留中间结果sum

全量计算:需要大量的缓存,积累一定数据后再一起计算

image-20250227221817451

窗口结束才触发,窗口时间可以设置

EMIT允许中间结果可以输出多次,防止窗口比较大,实时计算慢

FIRE跟案例场景一致,相当于保留中间结果

image-20250227221930349

image-20250227223859964

高级优化

解决中间结果偏多和访问频繁的问题

image-20250227223938107

mini-batch:将多条数据打包成一个批次(Mini-batch)进行处理,减少状态操作的频率。

image-20250227224203268

倾斜问题通常发生在数据分布不均匀的情况下,导致某些任务需要处理大量数据,而其他任务则相对空闲,从而影响整体处理效率。

局部聚合会在本地对数据进行初步的聚合操作,减少后续传输的数据量。

  • 由于已经进行了局部聚合,每个Global Agg节点接收到的数据量大大减少,避免了单个节点过载的问题。

image-20250227224356513

每个COUNT(DISTINCT item_id) FILTER (WHERE flag IN (…))都需要独立地计算唯一值,这会导致大量的重复计算和资源浪费。

distinct(去除查询结果中的重复记录)有时会优化为group by,而flink有窗口,不能随便优化

image-20250227224513054

TODO:这里看不懂image-20250227224653124

同时属于1和2,开两个窗口,让两个窗口都计算,数据参与2个,这样计算量会大,能否划分成更小的粒度?

image-20250227225035240

image-20250227225132071

image-20250227225209715

4. 流计算相关案例分析

日活曲线:每个分钟都有点,看从早到晚DAU(Daily Active Users,即日活跃用户数量)的变化过程

image-20250228195007037

思路:滚动窗口,每隔一段时间输出

这个SQL的问题:不能进行全局的聚合

image-20250228195147921

改进:变成2阶段,每个桶进行distinct uid

第一部分聚合后数据量已经小了

image-20250228195231393

image-20250228195403919

大数据任务特指离线计算,尽可能快的在任务结束后拿到资源开销

image-20250228195628620

image-20250228195713197

image-20250228195835634

5. Spark原理与实践

1. 大数据处理引擎 Spark 介绍

云原生方向

image-20250228201342007

存储要选择合适的格式,可以存到HDFS也可以存到Kafka

image-20250228201943605

image-20250228202018528

Spark的几个feature:仪表盘、机器学习

image-20250228202108594

image-20250228202136841

image-20250228202217188

image-20250228202241009

多语言支持,很多的资源调度

image-20250228202327938

image-20250228202346313

RDD:弹性数据集,对任何函数的计算可以看作一个算子

DataFrame:类似传统的二维表格

image-20250228202421308

CM:监控Worker节点,负责支援调度

Worker:从节点,可称为Node Manager

任务:只有一个Driver

Executor:实际执行任务

image-20250228202556297

–master表示委托给谁进行资源管理

image-20250228202715296

bin目录有很多提交命令,后面可以跟许多参数image-20250228202727473

image-20250228202822105

线上时,大部分用Spark scala来提交

image-20250228202847034

image-20250228202901554

image-20250228202954855

2. SparkCore 重要机制解析

image-20250228203132524

RDD:创建时要指定分区;每个RDD有计算函数;每个RDD依赖别人,像pipeline一样

image-20250228203236090

创建RDD:

image-20250228203340910

image-20250228203355630

窄依赖:父RDD至多对应一个子RDD

image-20250228203520196

image-20250228203608716

遇到宽依赖,就划分stage,不然加入stage

image-20250228203727809

image-20250228203759942

内存机制

栈(Stack):栈是一种后进先出(LIFO, Last In First Out)的数据结构。栈上的内存分配和释放速度通常比堆上快得多。这是因为栈的操作仅仅是移动栈顶指针来分配或释放内存,这个过程非常迅速且高效。此外,栈上的数据存储位置是在进程的调用栈中,这使得对栈内数据的访问也很快,因为这些数据通常是连续存储的,能够更好地利用CPU缓存。每当一个函数被调用时,一个新的“栈帧”(也叫活动记录)就会被创建并压入调用栈中。
堆(Heap):堆则是一个更复杂的自由存储区,允许动态分配内存。与栈不同,堆上的内存分配(如通过malloc()或new操作)涉及到搜索可用空间并将该块标记为已使用的过程,这通常比简单地调整栈指针要复杂和耗时。同样,释放堆上的内存也可能需要额外的操作来维护堆的结构。因此,堆的分配和释放速度较栈慢。

当一个程序执行时,操作系统会为该程序分配一定的内存空间,这部分内存包括了代码区、数据区、堆区以及栈区等。

这些是堆内存,off-heap也有,比heap更好管理

image-20250228203846236

存储和执行内存是可以互相借用的

一个Executor可以允许多个Task

image-20250228205211578

Shuffle

Shuffle是指数据重新分配的过程,这个过程通常发生在并行计算任务之间,目的是将数据按照某种规则重新分组或排序,以便后续的任务可以基于这些重组后的数据进行处理。Shuffle是实现许多常见操作(如groupByKey、reduceByKey、join等)的关键步骤。

image-20250228205241755

image-20250228205248878

image-20250228205337298

3. SparkSQL 原理解析

优化逻辑计划->选择最佳的物理执行计划:基于规则的优化等

image-20250228205450079

Catalyst:生成可以执行的代码

image-20250228205608143

image-20250228205642436

RBO:经验式、启发式,谓词下推image-20250228205738770

CBO:依赖数据库对象的统计

image-20250228205848000

自适应查询:执行中检测哪些可以进行优化,边执行边优化并有反馈

  • 动态合并Shuffle分区
  • 动态切换join策略
  • 处理状态数据倾斜的问题

image-20250228210032816

partition要适中,刚开始shuffle较大,然后合并相邻的(下图从5->3)

实现时开启参数即可

image-20250228210058331

image-20250228210220515

image-20250228211813688

image-20250228211919341

image-20250228212036945

常规代码是遍历语法树,每个节点都要进行类型匹配,会额外创建很多对象:没必要,只是一个表达式

image-20250228212302721

每个filter只用关注自己的逻辑image-20250228212322600

image-20250228212433921

4. Spark 在业界的挑战与实践

image-20250228212527860

image-20250228212613620

近些年的优化关注CPUimage-20250228212650778

image-20250228212741051

image-20250228212754641

image-20250228212804049

image-20250228212827340

6. 大数据Shuffle原理与实践

1. Shuffle 概述

从Oracle单机引擎到现在的分布式MapReduce

把数据拆分成小份partition,做并发处理

  • 对不同颜色的数据分别处理
  • Shuffle:移动数据

image-20250228212926664

image-20250228214733621

image-20250228213038555

image-20250228213047273

不光有网络连接,还有网络请求

可能要以颜色进行排序

序列化消耗大量CPU

image-20250228213117696

image-20250228213158568

2. Shuffle 算子

repartition:改变分区

join:本身不在一起的数据放在一起

image-20250228213241259

image-20250228213800653

需要数据移动就使用shuffle

image-20250228213840300

image-20250228213904374

image-20250228213914912

numberPartitions:一共有多少分区,get:给一个k返回所在分区

image-20250228213928871

image-20250228213959819

3. Shuffle 过程

tradeoff问题

image-20250228214042252

写满了就把file写入磁盘,同时打开和生成的文件都很多image-20250228214106068

image-20250228214138813

不是给每个partition一个buffer,现在是一个task一个buffer,包含所有数据

image-20250228214204311

image-20250228214237430

image-20250228214249605

image-20250228214341128

image-20250228214354668

最终生成两个文件。没排序,只适合数据较少时

OS中的zero copy?

image-20250228214402762

unsafe:没有垃圾回收的开销

image-20250228214458227

image-20250228214526189

有排序和combine,和mapreduce有点类似

image-20250228214549493

image-20250228214641991

image-20250228222759637

image-20250228222816443

image-20250228222830694

image-20250228222901250

image-20250228222917770

image-20250228222934278

做wordcount,右侧shuffle的数据量多,左边提前算了hello的count,算子是有优先级的。如果能聚合尽量做聚合

image-20250228223000283

image-20250228223051795

倾斜:某个reduce数据非常多,其他很少,因为总时间取决于最大的作业

image-20250228223104839

image-20250228223200757

84万文件,5千task,数据量非常大

image-20250228223259439

优化后只有5000个partition,shuffle数据量变少(因为单个task处理的变多了),有combine用conbine

减缓随机读的要求

image-20250228223412900

4. Push Shuffle

大量随机读的问题:用push聚合在一起,写在远程buffer

image-20250228230604240

image-20250228230657119

image-20250228230717237

image-20250228230745295

image-20250228230800600

新的问题:数据丢失的成本变多了

自己进行数据备份

image-20250228230831689

image-20250228230915517

image-20250228230929291

image-20250228230957434

image-20250228231011793

image-20250228231032385

7. Presto架构原理与优化浅析

1. 大数据与 OLAP 的演进

不等价于,如何提取想要的数据

image-20250228231155520

image-20250228231258351

OLAP:多维分析

image-20250228231338410

可以以时间为单位、分析某一类产品image-20250228231443236

image-20250228231518104

image-20250228231548422

image-20250228231635252

image-20250228231641396

2. Presto 基本原理与概念

黄色:数据源

绿色:Presto的服务

蓝色:用户 根据需求最后会得到结果

image-20250228231727723

多数据源引擎,每个connector是适配多数据源的接口,可以联邦查询

image-20250228231805656

stage之间用shuffle来区分

pipeline根据算子拆分成不同的涵义

image-20250228231845202

image-20250228231909557

stage内部可以rehash,类似进程和线程的关系

image-20250228231957420

image-20250228232040113

image-20250228232114994

浅绿色负责桥接

image-20250228232132686

image-20250228232154376

SHUTDOWN:不是worker本身关闭了就关闭,属于想要关闭,本现在在跑的task尽量处理完,处理不完到达超时时间了也会结束。image-20250228232216664

image-20250228232235343

考虑到task下所有的pipeline,最后乘以worker数(因为一个worker调度很多task)

image-20250228232317925

3. Presto 重要机制

image-20250228232603263

image-20250228232616313

image-20250228232643156

image-20250228232655346

image-20250228232714038

image-20250228232739440

默认同时调度,符合流式数据处理的特点image-20250228232748991

image-20250228232817762

image-20250228232846292

image-20250228232857526

image-20250228232909791

image-20250228232928548

image-20250228232940574

image-20250228232956396

image-20250228233025309

image-20250228233039317

image-20250228233049147

image-20250228233104292

image-20250228233128867

image-20250228233156167

4. Presto 性能优化实战

image-20250228233224935

image-20250228233241941

image-20250228233255542

通过url可以直接查询task的状态

image-20250228233309656

image-20250228233334712

image-20250228233351963

image-20250228233419156

image-20250228233433534

image-20250228233441837

把配置文件copy,会产生很多rehash

image-20250228233509317

image-20250228233608788

image-20250228233716814

image-20250228233728499

image-20250228233858490

image-20250228233924305

image-20250228233748564

image-20250228233759250

image-20250228233811561