当前位置:首页 > 软件应用 > 正文

==Presto实现原理和美团的使用实践

摘要: ==Presto实现原理和美团的使用实践最佳答案53678位专家为你答疑解惑==Presto实现原理和美团的使用实践-Prest...

==Presto实现原理和美团的使用实践

最佳答案 53678位专家为你答疑解惑

==Presto实现原理和美团的使用实践 -

Presto实现原理和美团的使用实践 -http://tech.meituan.com/presto.html

Facebook的数据仓库存储在少量大型Hadoop/HDFS集群。Hive是Facebook在几年前专为Hadoop打造的一款数据仓库工具。在以前,Facebook的科学家和分析师一直依靠Hive来做数据分析。但Hive使用MapReduce作为底层计算框架,是专为批处理设计的。但随着数据越来越多,使用Hive进行一个简单的数据查询可能要花费几分到几小时,显然不能满足交互式查询的需求。Facebook也调研了其他比Hive更快的工具,但它们要么在功能有所限制要么就太简单,以至于无法操作Facebook庞大的数据仓库。2012年开始试用的一些外部项目都不合适,他们决定自己开发,这就是Presto。2012年秋季开始开发,目前该项目已经在超过 1000名Facebook雇员中使用,运行超过30000个查询,每日数据在1PB级别。Facebook称Presto的性能比Hive要好上10倍多。2013年Facebook正式宣布开源Presto。本文首先介绍Presto从用户提交SQL到执行的这一个过程,然后尝试对Presto实现实时查询的原理进行分析和总结,最后介绍Presto在美团的使用情况。Presto架构

presto架构图

Presto查询引擎是一个Master-Slave的架构,由一个Coordinator节点,一个Discovery Server节点,多个Worker节点组成,Discovery Server通常内嵌于Coordinator节点中。Coordinator负责解析SQL语句,生成执行计划,分发执行任务给Worker节点执行。Worker节点负责实际执行查询任务。Worker节点启动后向Discovery Server服务注册,Coordinator从Discovery Server获得可以正常工作的Worker节点。如果配置了Hive Connector,需要配置一个Hive MetaStore服务为Presto提供Hive元信息,Worker节点与HDFS交互读取数据。Presto执行查询过程简介既然Presto是一个交互式的查询引擎,我们最关心的就是Presto实现低延时查询的原理,我认为主要是下面几个关键点,当然还有一些传统的SQL优化原理,这里不介绍了。完全基于内存的并行计算流水线本地化计算动态编译执行计划小心使用内存和数据结构类BlinkDB的近似查询GC控制

为了介绍上述几个要点,这里先介绍一下Presto执行查询的过程提交查询用户使用Presto Cli提交一个查询语句后,Cli使用HTTP协议与Coordinator通信,Coordinator收到查询请求后调用SqlParser解析SQL语句得到Statement对象,并将Statement封装成一个QueryStarter对象放入线程池中等待执行。

提交查询

SQL编译过程Presto与Hive一样,使用Antlr编写SQL语法,语法规则定义在Statement.g和StatementBuilder.g两个文件中。如下图中所示从SQL编译为最终的物理执行计划大概分为5部,最终生成在每个Worker节点上运行的LocalExecutionPlan,这里不详细介绍SQL解析为逻辑执行计划的过程,通过一个SQL语句来理解查询计划生成之后的计算过程。

SQL解析过程 样例SQL:select c1.rank, count(*) from dim.city c1 join dim.city c2 on c1.id=c2.id where c1.id > 10 group by c1.rank limit 10; 逻辑执行计划

上面的SQL语句生成的逻辑执行计划Plan如上图所示。那么Presto是如何对上面的逻辑执行计划进行拆分以较高的并行度去执行完这个计划呢,我们来看看物理执行计划。物理执行计划逻辑执行计划图中的虚线就是Presto对逻辑执行计划的切分点,逻辑计划Plan生成的SubPlan分为四个部分,每一个SubPlan都会提交到一个或者多个Worker节点上执行。SubPlan有几个重要的属性planDistribution、outputPartitioning、partitionBy属性。PlanDistribution表示一个查询Stage的分发方式,逻辑执行计划图中的4个SubPlan共有3种不同的PlanDistribution方式:Source表示这个SubPlan是数据源,Source类型的任务会按照数据源大小确定分配多少个节点进行执行;Fixed表示这个SubPlan会分配固定的节点数进行执行(Config配置中的query.initial-hash-partitions参数配置,默认是8);None表示这个SubPlan只分配到一个节点进行执行。在下面的执行计划中,SubPlan1和SubPlan0 PlanDistribution=Source,这两个SubPlan都是提供数据源的节点,SubPlan1所有节点的读取数据都会发向SubPlan0的每一个节点;SubPlan2分配8个节点执行最终的聚合操作;SubPlan3只负责输出最后计算完成的数据。OutputPartitioning属性只有两个值HASH和NONE,表示这个SubPlan的输出是否按照partitionBy的key值对数据进行Shuffle。在下面的执行计划中只有SubPlan0的OutputPartitioning=HASH,所以SubPlan2接收到的数据是按照rank字段Partition后的数据。

物理执行计划

完全基于内存的并行计算查询的并行执行流程Presto SQL的执行流程如下图所示Cli通过HTTP协议提交SQL查询之后,查询请求封装成一个SqlQueryExecution对象交给Coordinator的SqlQueryManager#queryExecutor线程池去执行每个SqlQueryExecution线程(图中Q-X线程)启动后对查询请求的SQL进行语法解析和优化并最终生成多个Stage的SqlStageExecution任务,每个SqlStageExecution任务仍然交给同样的线程池去执行每个SqlStageExecution线程(图中S-X线程)启动后每个Stage的任务按PlanDistribution属性构造一个或者多个RemoteTask通过HTTP协议分配给远端的Worker节点执行Worker节点接收到RemoteTask请求之后,启动一个SqlTaskExecution线程(图中T-X线程)将这个任务的每个Split包装成一个PrioritizedSplitRunner任务(图中SR-X)交给Worker节点的TaskExecutor#executor线程池去执行

查询执行流程

上面的执行计划实际执行效果如下图所示。Coordinator通过HTTP协议调用Worker节点的 /v1/task 接口将执行计划分配给所有Worker节点(图中蓝色箭头)SubPlan1的每个节点读取一个Split的数据并过滤后将数据分发给每个SubPlan0节点进行Join操作和Partial Aggr操作SubPlan1的每个节点计算完成后按GroupBy Key的Hash值将数据分发到不同的SubPlan2节点所有SubPlan2节点计算完成后将数据分发到SubPlan3节点SubPlan3节点计算完成后通知Coordinator结束查询,并将数据发送给Coordinator

执行计划计算流程

源数据的并行读取在上面的执行计划中SubPlan1和SubPlan0都是Source节点,其实它们读取HDFS文件数据的方式就是调用的HDFS InputSplit API,然后每个InputSplit分配一个Worker节点去执行,每个Worker节点分配的InputSplit数目上限是参数可配置的,Config中的query.max-pending-splits-per-node参数配置,默认是100。分布式的Hash聚合上面的执行计划在SubPlan0中会进行一次Partial的聚合计算,计算每个Worker节点读取的部分数据的部分聚合结果,然后SubPlan0的输出会按照group by字段的Hash值分配不同的计算节点,最后SubPlan3合并所有结果并输出流水线数据模型Presto中处理的最小数据单元是一个Page对象,Page对象的数据结构如下图所示。一个Page对象包含多个Block对象,每个Block对象是一个字节数组,存储一个字段的若干行。多个Block横切的一行是真实的一行数据。一个Page最大1MB,最多16*1024行数据。

数据模型

节点内部流水线计算下图是一个Worker节点内部的计算流程图,左侧是任务的执行流程图。Worker节点将最细粒度的任务封装成一个PrioritizedSplitRunner对象,放入pending split优先级队列中。每个Worker节点启动一定数目的线程进行计算,线程数task.shard.max-threads=availableProcessors() * 4,在config中配置。每个空闲的线程从队列中取出一个PrioritizedSplitRunner对象执行,如果执行完成一个周期,超过最大执行时间1秒钟,判断任务是否执行完成,如果完成,从allSplits队列中删除,如果没有,则放回pendingSplits队列中。每个任务的执行流程如下图右侧,依次遍历所有Operator,尝试从上一个Operator取一个Page对象,如果取得的Page不为空,交给下一个Operator执行。

节点内部流水线计算 节点间流水线计算下图是ExchangeOperator的执行流程图,ExchangeOperator为每一个Split启动一个HttpPageBufferClient对象,主动向上一个Stage的Worker节点拉数据,数据的最小单位也是一个Page对象,取到数据后放入Pages队列中 节点间流水线计算 本地化计算Presto在选择Source任务计算节点的时候,对于每一个Split,按下面的策略选择一些minCandidates优先选择与Split同一个Host的Worker节点如果节点不够优先选择与Split同一个Rack的Worker节点如果节点还不够随机选择其他Rack的节点

对于所有Candidate节点,选择assignedSplits最少的节点。动态编译执行计划Presto会将执行计划中的ScanFilterAndProjectOperator和FilterAndProjectOperator动态编译为Byte Code,并交给JIT去编译为native代码。Presto也使用了Google Guava提供的LoadingCache缓存生成的Byte Code。

动态编译执行计划 动态编译执行计划 上面的两段代码片段中,第一段为没有动态编译前的代码,第二段代码为动态编译生成的Byte Code反编译之后还原的优化代码,我们看到这里采用了循环展开的优化方法。循环展开最常用来降低循环开销,为具有多个功能单元的处理器提供指令级并行。也有利于指令流水线的调度。小心使用内存和数据结构使用Slice进行内存操作,Slice使用Unsafe#copyMemory实现了高效的内存拷贝,Slice仓库参考:https://github.com/airlift/sliceFacebook工程师在另一篇介绍ORCFile优化的文章中也提到使用Slice将ORCFile的写性能提高了20%~30%,参考:https://code.facebook.com/posts/229861827208629/scaling-the-facebook-data-warehouse-to-300-pb/类BlinkDB的近似查询为了加快avg、count distinct、percentile等聚合函数的查询速度,Presto团队与BlinkDB作者之一Sameer Agarwal合作引入了一些近似查询函数approx_avg、approx_distinct、approx_percentile。approx_distinct使用HyperLogLog Counting算法实现。GC控制Presto团队在使用hotspot java7时发现了一个JIT的BUG,当代码缓存快要达到上限时,JIT可能会停止工作,从而无法将使用频率高的代码动态编译为native代码。Presto团队使用了一个比较Hack的方法去解决这个问题,增加一个线程在代码缓存达到70%以上时进行显式GC,使得已经加载的Class从perm中移除,避免JIT无法正常工作的BUG。Presto TPCH benchmark测试介绍了上述这么多点,我们最关心的还是Presto性能测试,Presto中实现了TPCH的标准测试,下面的表格给出了Presto 0.60 TPCH的测试结果。直接运行presto-main/src/test/java/com/facebook/presto/benchmark/BenchmarkSuite.java。benchmarkName cpuNanos(MILLISECONDS) inputRows inputBytes inputRows/s inputBytes/s outputRows outputBytes outputRows/s outputBytes/s count_agg 2.055ms 1.5M 12.9MB 730M/s 6.12GB/s 1 9B 486/s 4.28KB/s double_sum_agg 14.792ms 1.5M 12.9MB 101M/s 870MB/s 1 9B 67/s 608B/s hash_agg 174.576ms 1.5M 21.5MB 8.59M/s 123MB/s 3 45B 17/s 257B/s predicate_filter 68.387ms 1.5M 12.9MB 21.9M/s 188MB/s 1.29M 11.1MB 18.8M/s 162MB/s raw_stream 1.899ms 1.5M 12.9MB 790M/s 6.62GB/s 1.5M 12.9MB 790M/s 6.62GB/s top100 58.735ms 1.5M 12.9MB 25.5M/s 219MB/s 100 900B 1.7K/s 15KB/s in_memory_orderby_1.5M 1909.524ms 1.5M 41.5MB 786K/s 21.7MB/s 1.5M 28.6MB 786K/s 15MB/s hash_build 588.471ms 1.5M 25.7MB 2.55M/s 43.8MB/s 1.5M 25.7MB 2.55M/s 43.8MB/s hash_join 2400.006ms 6M 103MB 2.5M/s 42.9MB/s 6M 206MB 2.5M/s 85.8MB/s hash_build_and_join 2996.489ms 7.5M 129MB 2.5M/s 43MB/s 6M 206MB 2M/s 68.8MB/s hand_tpch_query_1 3146.931ms 6M 361MB 1.91M/s 115MB/s 4 300B 1/s 95B/s hand_tpch_query_6 345.960ms 6M 240MB 17.3M/s 695MB/s 1 9B 2/s 26B/ssql_groupby_agg_with_arithmetic 1211.444ms 6M 137MB 4.95M/s 113MB/s 2 30B 1/s 24B/s sql_count_agg 3.635ms 1.5M 12.9MB 413M/s 3.46GB/s 1 9B 275/s 2.42KB/s sql_double_sum_agg 16.960ms 1.5M 12.9MB 88.4M/s 759MB/s 1 9B 58/s 530B/s sql_count_with_filter 81.641ms 1.5M 8.58MB 18.4M/s 105MB/s 1 9B 12/s 110B/s sql_groupby_agg 169.748ms 1.5M 21.5MB 8.84M/s 126MB/s 3 45B 17/s 265B/s sql_predicate_filter 46.540ms 1.5M 12.9MB 32.2M/s 277MB/s 1.29M 11.1MB 27.7M/s 238MB/s sql_raw_stream 3.374ms 1.5M 12.9MB 445M/s 3.73GB/s 1.5M 12.9MB 445M/s 3.73GB/s sql_top_100 60.663ms 1.5M 12.9MB 24.7M/s 212MB/s 100 900B 1.65K/s 14.5KB/s sql_hash_join 4421.159ms 7.5M 129MB 1.7M/s 29.1MB/s 6M 206MB 1.36M/s 46.6MB/s sql_join_with_predicate 1008.909ms 7.5M 116MB 7.43M/s 115MB/s 1 9B 0/s 8B/s sql_varbinary_max 224.510ms 6M 97.3MB 26.7M/s 433MB/s 1 21B 4/s 93B/s sql_distinct_multi 257.958ms 1.5M 32MB 5.81M/s 124MB/s 5 112B 19/s 434B/s sql_distinct_single 112.849ms 1.5M 12.9MB 13.3M/s 114MB/s 1 9B 8/s 79B/s sql_tpch_query_1 3168.782ms 6M 361MB 1.89M/s 114MB/s 4 336B 1/s 106B/s sql_tpch_query_6 286.281ms 6M 240MB 21M/s 840MB/s 1 9B 3/s 31B/s sql_like 3497.154ms 6M 232MB 1.72M/s 66.3MB/s 1.15M 9.84MB 328K/s 2.81MB/s sql_in 80.267ms 6M 51.5MB 74.8M/s 642MB/s 25 225B 311/s 2.74KB/s sql_semijoin_in 1945.074ms 7.5M 64.4MB 3.86M/s 33.1MB/s 3M 25.8MB 1.54M/s 13.2MB/s sql_regexp_like 2233.004ms 1.5M 76.6MB 672K/s 34.3MB/s 1 9B 0/s 4B/s sql_approx_percentile_long 587.748ms 1.5M 12.9MB 2.55M/s 21.9MB/s 1 9B 1/s 15B/s sql_between_long 53.433ms 1.5M 12.9MB 28.1M/s 241MB/s 1 9B 18/s 168B/ssampled_sql_groupby_agg_with_arithmetic 1369.485ms 6M 189MB 4.38M/s 138MB/s 2 30B 1/s 21B/s sampled_sql_count_agg 11.367ms 1.5M 12.9MB 132M/s 1.11GB/s 1 9B 87/s 791B/ssampled_sql_join_with_predicate 1338.238ms 7.5M 180MB 5.61M/s 135MB/s 1 9B 0/s 6B/s sampled_sql_double_sum_agg 24.638ms 1.5M 25.7MB 60.9M/s 1.02GB/s 1 9B 40/s 365B/s stat_long_variance 26.390ms 1.5M 12.9MB 56.8M/s 488MB/s 1 9B 37/s 341B/s stat_long_variance_pop 26.583ms 1.5M 12.9MB 56.4M/s 484MB/s 1 9B 37/s 338B/s stat_double_variance 26.601ms 1.5M 12.9MB 56.4M/s 484MB/s 1 9B 37/s 338B/s stat_double_variance_pop 26.371ms 1.5M 12.9MB 56.9M/s 488MB/s 1 9B 37/s 341B/s stat_long_stddev 26.266ms 1.5M 12.9MB 57.1M/s 490MB/s 1 9B 38/s 342B/s stat_long_stddev_pop 26.350ms 1.5M 12.9MB 56.9M/s 489MB/s 1 9B 37/s 341B/s stat_double_stddev 26.316ms 1.5M 12.9MB 57M/s 489MB/s 1 9B 38/s 342B/s stat_double_stddev_pop 26.360ms 1.5M 12.9MB 56.9M/s 488MB/s 1 9B 37/s 341B/s sql_approx_count_distinct_long 35.763ms 1.5M 12.9MB 41.9M/s 360MB/s 1 9B 27/s 251B/ssql_approx_count_distinct_double 37.198ms 1.5M 12.9MB 40.3M/s 346MB/s 1 9B 26/s 241B/s

美团如何使用Presto选择presto的原因2013年我们也用过一段时间的impala,当时impala不支持线上1.x的hadoop社区版,所以搭了一个CDH的小集群,每天将大集群的热点数据导入小集群。但是hadoop集群年前完成升级2.2之后,当时的impala还不支持2.2 hadoop版本。而Presto刚好开始支持2.x hadoop社区版,并且Presto在Facebook 300PB大数据量的环境下可以成功的得到大量使用,我们相信它在美团也可以很好的支撑我们实时分析的需求,于是决定先上线测试使用一段时间。部署和使用形式考虑到两个原因:1、由于Hadoop集群主要是夜间完成昨天的计算任务,白天除了日志写入外,集群的计算负载较低。2、Presto Worker节点与DataNode节点布置在一台机器上可以本地计算。因此我们将Presto部署到了所有的DataNode机器上,并且夜间停止Presto服务,避免占用集群资源,夜间基本也不会有用户查询数据。Presto二次开发和BUG修复年后才正式上线Presto查询引擎,0.60版本,使用的时间不长,但是也遇到了一些问题:美团的Hadoop使用的是2.2版本,并且开启了Security模式,但是Presto不支持Kerberos认证,我们修改了Presto代码,增加了Kerberos认证的功能。Presto还不支持SQL的隐式类型转换,而Hive支持,很多自助查询的用户习惯了Hive,导致使用Presto时都会出现表达式中左右变量类型不匹配的问题,我们增加了隐式类型转换的功能,大大减小了用户SQL出错的概率。Presto不支持查询lzo压缩的数据,需要修改hadoop-lzo的代码。解决了一个having子句中有distinct字段时查询失败的BUG,并反馈了Presto团队 https://github.com/facebook/presto/pull/1104

所有代码的修改可以参考我们在github上的仓库 https://github.com/MTDATA/presto/commits/mt-0.60实际使用效果这里给出一个公司内部开放给分析师、PM、工程师进行自助查询的查询中心的一个测试报告。这里选取了平时的5000个Hive查询,通过Presto查询的对比见下面的表格。自助查询sql数

hive

presto

presto/hive

1424154427s27708s0.179424582489

参考Presto官方文档 http://prestodb.io/

Facebook Presto团队介绍Presto的文章https://www.facebook.com/notes/facebook-engineering/presto-interacting-with-petabytes-of-data-at-facebook/10151786197628920

SlideShare两个分享Presto 的PPThttp://www.slideshare.net/zhusx/presto-overview?from_search=1http://www.slideshare.net/frsyuki/hadoop-source-code-reading-15-in-japan-presto

新东方实时数仓实践

背景介绍

在传统数据仓库方面,通常以 T+1 离线批量计算为主,按照数仓建模方式,把要处理的业务按照主题域划分,构建各种数据模型,来满足公司经营分析,财务分析等各种公司管理层的数据需求。

然而,随着在线教育快速发展市场竞争非常激烈,T+1 的方式在某些需求上很难对业务产生实际的价值,很可能因为数据延迟导致业务动作滞后,管理要求跟进不及时,最终导致客户流失,影响公司业务发展。

目前我们遇到的主要痛点如下:

续费业务场景:在线教育上课主要分为 4 个时段(春季,暑假,秋季,寒假)。当每一个时段上课要结束的时候,就会有一个续费周期,每个学科每个班级续费率的高低直接影响公司是否盈利的问题。所以实时的观测每个学科每个班级每个学科负责人每个教学负责人的续费率完成情况就显得尤为重要。直播行课场景:分析课中学员与老师互动行为,其中包含实时的连麦、发言、红包等行为数据,同时分析学员实时到课、完课、考试等数据对于管理学员和调整老师动作有重要的指导意义。销售场景:监控新线索的实时分配,以及后续销售外呼频次、外呼时长,统计销售线索覆盖量,外呼覆盖量等指标。通过分析销售对于学员的跟进与转化数据,对比个人和团队当日人次和金额达成目标,指导运营管理动作。算法线索分场景:每当进行广告投放的时候,针对每一个销售线索给出一个评分值,来评估这个线索可能转化的高低,利于销售人员更好的跟进,提高转化率。

实时数仓技术架构

01 实时数仓选型

在 2020 年以前公司实时数据部分,主要由小时级和分钟级的支持。小时级部分使用基于 Hive/Spark 的小时级任务方案,分钟级使用 Spark-Streaming 方案。

基于 Hive/Spark 小时级方案虽然能满足快速响应业务需求和变化的特点,但延迟性还是很高,并且大量的小时任务对集群计算资源有很大压力,很有可能导致这一批小时任务根本跑不完。分钟级 Spark-Streaming 方案,能够满足数据时效性需求,但采用纯代码方式来开发,无法满足快速变化的数据需求

基于此,我们开始调研业界方案,目前业界有主要有两种实时数仓方案,分别是实时数仓方案和准实时数仓方案。

02 实时数仓方案

实时数仓方案分别可以采用 Lambda 架构和 Kappa 架构。

Lambda 架构

如上图所示例,Lambda 架构存在离线和实时两条链路,实时部分以消息队列的方式实时增量消费,一般以 Flink 和 Kafka 的组合实现,维度表存在 MySQL 数据库或者 Hbase ;离线部分一般采用 T+1 周期调度分析历史存量数据,每天凌晨产出,更新覆盖前一天的结果数据,计算引擎通常会选择 Hive ,优点是数据准确度高,出错后容易修复数据;缺点是架构复杂,运维成本高。

Kappa 架构

Kappa 架构是由 LinkedIn 的 Jay Kreps 提出的(参考 Paper: https://www.oreilly.com/radar/questioning-the-lambda-architecture/ ),作为 Lambda 方案的一个简化版,它移除了离线生产链路,思路是通过在 Kafka 里保存全量历史数据,当需要历史计算的时候,就启动一个任务从头开始消费数据。优点是架构相对简化,数据来源单一,共用一套代码,开发效率高;缺点是必须要求消息队列中保存了存量数据,而且主要业务逻辑在计算层,比较消耗内存计算资源。

但由于之前流处理系统本身不成熟,对窗口计算、事件时间、乱序问题和 SQL 支持上的不成熟,导致大部分公司普遍采用 Lambda 架构方案。但自从 2020 年 2 月 11 日,Flink 发布了 1.10 以及随后的 1.11 版本,引入了 Blink Planner 和 Hive 集成极大地增强了对于 SQL 和流批一体支持,这为真正实现 Kappa 架构带来了一丝希望。

03 准实时数仓方案

准实时数仓方案

其核心思路是,采用 OLAP 引擎来解决聚合计算和明细数据查询问题,配合分钟级别调度(一般是 30 或者 15 分钟)能力来支持业务实时数据需求。该架构优点是:

一般 OLAP 引擎都对 SQL 的支持度很好,开发成本极低少量人员都可以支持复杂的业务需求,灵活应对业务变化。对于差钱的公司来说无疑非常节约成本的(财大气粗的公司除外);对于数据修复成本低,因为可以基于一个周期内的数据进行全量计算,所以修复数据只需要重跑任务即可;对于运维成本也较低,只需要监控任务运行成功失败即可;对于数据时效性要求较高的场景,配合 Flink 实时计算能力,在数据接入的时候进行部分聚合计算,之后再把结果写入 OLAP 引擎,不需要再配合调度计算,以此来到达秒级延迟。

该架构的缺点是:

将计算转移到了 OLAP 引擎并同时兼顾了计算和查询需求,对 OLAP 引擎性能有较高的要求;因为计算转移到了 OLAP 端,所以这种方案适用的数据体量规模有一定限制。

基于 Doris 的准实时方案

结合公司数据规模、业务灵活性、成本方面的考虑,我们选用的准实时的方案。那么接下来的就是确定采用什么 OLAP 引擎的问题了,目前业界开源的 OLAP 引擎主要有 Clickhouse、Durid、Doris、Impala+Kudu、Presto等。

这些引擎目前业界都有公司采用,比如:头条采用 Clickhouse(因为头条数据量巨大,并且头条专门有一个团队来优化和改进 Clickhouse 的内核,有钱就是好),快手采用 Durid 、Doris 美团/作业帮有采用,Impala+Kudu 网易是深度用户,Presto 主要用做 Adhoc 查询。

对此可以看到这些引擎都可以采用,主要问题是,是否对这些引擎有足够的掌握程度以及引擎本身学习成本。经过一番对比,结合之前本身对 Doris 有一定的了解,再横向对比了使用公司的规模,最后选择了 Doris 引擎。

Doris 的优点如下:

单表和多表 Join 查询性能都很强,可以同时较好支持宽表查询场景和复杂多表查询,灵活性高支持实时数据更新操作支持流式和批量数据导入兼容 MySQL 协议和标准 SQL支持 HA,在线升级扩容,运维成本低

总体架构

系统架构方案

上图是目前公司采用的架构方案,总体流程如下:

1. 数据接入部分,分为业务数据和日志数据。业务数据通过 Binlog 方式收集到 Kafka 后,再通过 Flink 写入到 Doris ODS 层中2. MDS 层,采用每 10 分钟、半小时、一小时进行增量或全量的方式更新,构建业务模型层3. ADS 层,构建大宽表层加速上层查询速度4. 对于一些临时的 Ad hoc 大查询需求,通过 Doris 的 Export 功能导出到 Hive 通过 Presto 提供查询,避免影响在线业务核心报表5. BI 查询直接通过 MySQL 协议访问 DB,配合查询层缓存来提供报表分析服务6. 每层 ETL 任务通过自研调度系统调度运行,报警监控一体化

在实际业务应用中也遇到了一些问题,主要有如下几个方面的问题。

Socket 文件描述符泄漏问题

因为进行查询和 ETL 都需要通过 JDBC 或 MySQL 客户端与 FE 的 9030 端口连接,在测试使用 0.10 版本时发现有的 Socket 的文件描述符没有办法正常关闭,每小时都会产生几十个 FD 泄露,最后达到上限无法创建新的 Socket 连接,只能通过重启的方式释放掉关闭异常的 FD 。

经过排查所有相关连接使用的代码,发现并没有可能产生连接不关闭的部分,咨询发现其他 Doris 使用者并没有出现过类似问题,经过反复和社区沟通最终确定是 MysqlNIOServer 的 Bug ,目前已经修复。

语法相关问题

union all null值问题在使用 with 语句生成的虚拟临时表时,如果有值为 null 的字段,这个字段在后续使用时的值会变成空字符串,不论是直接插入表还是通过 not null 进行过滤都无法得到正确结果。常量值 Join 时会关联出错误数据如果 with 查询中使用了 case when 等对字段进行常量值赋值的情况,如果 Join 关联时的关联条件使用到了这个字段,则有可能出现错误关联的情况,例如无法关联上的数据错误的关联上了。

lead 和 lag 函数导致的数据错位如图所示,在使用 lead 或者 lag 函数对数据处理时,会出现时间数据错位的问题。

以上三个在使用时发现的问题,经过与百度 Doris 团队开发者的反馈,现在这两个问题已经在新版本中完成修复。

副本不一致为了保证数据的高可用,避免因为某个 BE 磁盘损坏导致的数据丢失,同时可以提高本地计算的概率,通常会给表设置大于 1 的副本数,虽然 TabletChecker 和 TabletScheduler 会定期检查所有分片,并对不健康的分片进行修复,然而依然会出现某些副本不一致的情况。针对某些副本不一致的情况我们进行研究,发现在使用 Uniq 模型时,如果进行 ETL 插入的数据含有多条相同 Unique Key 且没有对这些数据进行排序时,不同副本中实际存入的数据可能会出现不一致的情况。后续对 SQL 进行改进,对 Unique Key 进行去重或者排序之后,这种情况的副本不一致就没有再次出现。

Json 解析时如果含有制表符则无法解析

在使用 getjsonobject 解析 Json 格式的字符串时,如果字符串中含有制表符,最后的解析会失败,这时必须对字符串进行字符替换,将制表符替换成空字符,然后再解析 Json 。

使用总结

在资源有限的情况,实时数仓还不能完全代替离线数仓。实时数仓对资源要求较高,成本换时间。离线数仓是时间换成本。也许在不久的将来随着算力的提高,新的技术的应用可以实现,但目前还没有。随着数据量的增加计算延迟也会增加,两者呈线性关系,这就需要在业务需求和成本上做一个折中。使用 Doris 支撑了公司大部分实时数据需求,在保证开发成本和使用灵活性方面非常友好。未来随着 Flink SQL 方面越来越成熟可以把计算任务压力进一步转移到 Flink 上,结合Doris 的 OLAP 能力,可以提供更低延迟数据需求。

发表评论