select date, count(distinct student_id),count(distinct course), sum(score)from student_coursegroup by dateorder by
在odps上和order by相似的功能在还有sort by, distribute by,cluster by。 后面的语法在普通的关系型数据库都不存在。算是mapreduce特有的功能。这里先解释下每个语句的含义:
order by —— order by会对输入做全局排序,因此只有一个Reducer(多个Reducer无法保证全局有序),然而只有一个Reducer,会导致当输入规模较大时,消耗较长的计算时间。
sort by —— sort by不是全局排序,其在数据进入reducer前完成排序,因此,如果用sort by进行排序,并且设置mapred.reduce.tasks>1,则sort by只会保证每个reducer的输出有序,并不保证全局有序。sort by不同于order by,它不受Hive.mapred.mode属性的影响,sort by的数据只能保证在同一个reduce中的数据可以按指定字段排序。使用sort by你可以指定执行的reduce个数(通过set mapred.reduce.tasks=n来指定),对输出的数据再执行归并排序,即可得到全部结果。
distribute by —— distribute by是控制在map端如何拆分数据给reduce端的。hive会根据distribute by后面列,对应reduce的个数进行分发,默认是采用hash算法。sort by为每个reduce产生一个排序文件。在有些情况下,你需要控制某个特定行应该到哪个reducer,这通常是为了进行后续的聚集操作。distribute by刚好可以做这件事。因此,distribute by经常和sort by配合使用。
cluster by —— cluster by除了具有distribute by的功能外还兼具sort by的功能。但是排序只能是倒叙排序,不能指定排序规则为ASC或者DESC。
MapReduce的几个阶段
input
split
map
shuffle
reduce
output 这每个阶段都会出现各种问题,我们依次从前到后来讲怎么处理各个阶段出现的问题。
Input & split
根据MaxCompute的功能,input可以是本地文件,也可以是数据库的表。可以通过InputFormat借口来定义。但是这个Format和后面的split阶段息息相关。因为split只切割比block小的文件,对于小文件则不作处理。所以当存在大量的小文件(特指大小达不到block大小的文件),会生成大量的split块,同时也会启动大量map任务。
可能出现的问题
分区裁剪中出现问题 > 解决方法是让odps在生成任务之前就能确定好读区到分区的范围
输入存在大量小文件,导致map instance数量超标 > 解决办法是读取时候设定块大小,可以使用setSplitSize来控制读取文件总大小 > 解决方案二是提前就把这些小文件给合并了
输入文件大小分布非常不均匀,导致split的块大小分布不均匀,从而导致map端倾斜 > 可以使用setSplitSize来控制读取文件总大小
输入的文件不能被切割,导致split块大小不均匀
暂时没有找到解法
相比于hadoop,odps系统在小文件处理方面的功能已经比较完善,主要体现在以下两个方面:
(1) 默认情况下,当Job完成之后,如果满足一定的条件,系统会自动分配一个FuxiTask(调度任务)进行小文件合并,即我们经常看到的MergeTask;
map
map阶段的输入是上面Input&split阶段来保障的,一个分片一个map任务。所以当分片处理的不合理,map阶段就会出现问题。而map端经过shuffle和combianer(可选)后,会把数据交给reduce端。
从input&split 到map可能出现的问题
输入存在大量小文件,导致map instance数量超标 > 同上
因为ODPS的SQL或者其他任务会解析成一个Task DAG。所以从最初输入到最终输出会有很多的中间计算。而这些中间计算之间也是对应着一个个的map reduce。如果当上一个map/reduce任务产生的输入可能形成一个种长尾分布,导致下一个mapreduce输入出现长尾。也就是map端任务倾斜。
shuffle
这个阶段是mapreduce的核心,设计到sort,group和数据分发。
可能出现的问题
数据量特别大,可以使用combinar来进行mapper端的聚合。odps的参数是
reduce
知道mapreduce计算模型的人都知道,map阶段输入是非结构化的,并不需要实现规定好输入的内容,输出则是一块块分区好的pair。而到reduce则有要求,那就是同样key的map处理的pair需要发送到同样的reduce中。这样就会出现某key数据量很大,某key数据量很小的时候对应的reduce处理的数据量大小也是不均匀的。一旦出现这种情,任务执行的结束时间必然会受到最长任务的拖累。,v>,v>
能产生reduce数据分布不均匀的操作,最长出现的有两分类:
- join
这里推荐本书《[url=]mapreduce设计模式[/url]》,其中的连接模式篇章把各种join的描述。在这里大概说下join的类型:
reduce端连接
map端连接(在odps中使用mapjoin即可),这个操作的前提是存在一个小表能放入到mapreduce中的环形内存中。而且大表必须作为“主表”(比如left join的话就必须是左表,而right join就必须是右表)。
所以到底为什么会产生倾斜呢?map端连接肯定是不会产生数据倾斜的,那么倾斜的必然是reduce连接。当一张表出现数据热点。这样就会出现热点reduce的运行远远大于其它的长尾,导致数据不均衡。
大概总结下就是:
- 如果存在小表,且如果左外连接时候小表是右表(或者是右外连接,小表必须是左表),可以使用mapjoin。
- 如果都是大表且有热点,这样会出现倾斜,这时候需要剔除热点数据单独处理。
- 如果都是大表没有热点,这样不会出现倾斜,这样还需要怎么优化?——这里首选想办法减小数据集合,如果不能在查看是否出现某些热门的数据,如果有,则对数据进行分桶。
count(distinct) 对于distinct的实现,单键的时候会被直接附到group by的字段后,同时作为map输出的key值来处理。这样转化成了group by处理,一般是没有问题的。但是麻烦的是多键值count(distinct),这个没有办法直接把所有的distinct的字段附到group by后面了。因为这样无法利用shuffle阶段的排序,到了reduce阶段需要做很多遍的去重操作。所有一般对于multi distinct都是采用给distinct 字段做编号,然后复制数据。比如输入数据是这样:
可以看到distinct会导致数据翻倍膨胀,而这些膨胀的数据后会通过网络传输到reduce,必然会造成很大的浪费。所以要治理,方法一是首先把distinct转成group by放在子查询中,然后外层再套一层查询进行分组count。