从网上找到自己的答案
Use the Right Search Engine
成为一个程序员也有一段时间了。回过头来想想看以前做的事情,只想说:非常low!
我记得东哥那时候推荐我用bing或者翻墙上谷歌。我当时可是一脸懵逼,感觉用习惯了,也觉得没什么不好的地方。
不说其他的,就说程序这块吧。当你运行代码碰到一个报错信息时候,直接用google搜的话很快就能找到解决方案。比如Python的话google返回的很有可能是stackoverflow这些链接。linux的是askubuntu的这些链接。
而百度返回的很有可能是Oschina或者Csdn等等。然而很多好的解答都是在stackoverflow那些网站。
English is Important
有些比较少罕见的问题,国内搜索引擎压根就找不到,国内压根就没有这方面答案。你必须从国外的网站获取解答。比如 Github issue 、Quora或者官方文档。
Spark repartition和coalesce的区别
Spark 的宽依赖和窄依赖
Spark 数据倾斜调优
关于Spark 的数据倾斜的问题
数据倾斜的现象
1、绝大多数task执行得都非常快,但个别task执行极慢。比如,总共有1000个task,997个task都在1分钟之内执行完了,但是剩余两三个task却要一两个小时。这种情况很常见。
2、原本能够正常执行的Spark作业,某天突然报出OOM(内存溢出)异常,观察异常栈,是我们写的业务代码造成的。这种情况比较少见。
数据倾斜的原理
数据倾斜的原理很简单:在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。比如大部分key对应10条数据,但是个别key却对应了100万条数据,那么大部分task可能就只会分配到10条数据,然后1秒钟就运行完了;但是个别task可能分配到了100万数据,要运行一两个小时。因此,整个Spark作业的运行进度是由运行时间最长的那个task决定的。
因此出现数据倾斜的时候,Spark作业看起来会运行得非常缓慢,甚至可能因为某个task处理的数据量过大导致内存溢出。
浅谈SPARK 的 广播变量
简单谈谈SPARK 广播变量的用法
首先我们看看官网的文档对Broadcast的描述:
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.
意思就是说广播变量让你可以在每个节点机器上缓存一个只读的变量,而减少了在每个任务中复制一份的繁琐。
- Spark还尝试使用高效地广播算法来分发变量,进而减少通信的开销。
- Spark的动作通过一系列的步骤执行,这些步骤由分布式的洗牌操作分开。Spark自动地广播每个步骤每个任务需要的通用数据。这些广播数据被序列化地缓存,在运行任务之前被反序列化出来。这意味着当我们需要在多个阶段的任务之间使用相同的数据,或者以反序列化形式缓存数据是十分重要的时候,显式地创建广播变量才有用。
创建的方法很简单
浅谈SPARK 的 combineByKey 算子
理解comebineByKey的原理
SPARK的combineByKey算子和aggregate类似。首先我们看看官网的文档:
combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None, partitionFunc=
) Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a “combined type” C. Note that V and C can be different – for example, one might group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).
Users provide three functions:
createCombiner, which turns a V into a C (e.g., creates a one-element list)
mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
mergeCombiners, to combine two C’s into a single one.
都是将类型为RDD[(K,V)]的数据处理为RDD[(K,C)]。这里的V和C可以是相同类型,也可以是不同类型。
combineByKey函数主要接受了三个函数作为参数,分别为createCombiner、mergeValue、mergeCombiners。
这三个函数足以说明它究竟做了什么。理解了这三个函数,就可以很好地理解combineByKey。
浅谈SPARK 的 aggregate 算子
理解aggregate的原理
刚开始我觉得SPARK的aggregate算子比较难理解的。首先我们看看官网的example
>>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
>>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
>>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
(10, 4)
>>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp)
(0, 0)
看了后,我就一脸懵逼了,两个lambda函数中的x,y各自代表什么东西呢?
我们先看看官网的aggregate 用法说明:
Aggregate the elements of each partition, and then the results for all the partitions, using a given combine functions and a neutral “zero value.”
The functions op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.
The first function (seqOp) can return a different result type, U, than the type of this RDD. Thus, we need one operation for merging a T into an U and one operation for merging two U
Python的yield 问题
Python yield示范
yield 关键词在python中不算常用。以前我只知道它的作用和return相似,就是返回一个值。并不知道它具体的用途和用法。
要理解yield,你必须知道generators(生成器)。而要知道生成器,需要了解iterables(可迭代对象)
生成器(Generators)就是迭代器的一种特殊形式,只不过它只能迭代一次。这是因为生成器不是将所有的数值存储在内存里,而是,立即生成数值。
和迭代器相似,我们可以通过使用next()来从generator中获取下一个值
顺便说说:集合数据类型,如list、tuple、dict、set、str等,这些可以用for循环的对象都是可迭代对象(Iterable)。
但list、dict、str虽然是Iterable,却不是迭代器(Iterator)。把list、dict、str等Iterable变成Iterator可以使用iter()函数。
迭代器对象可以被next()函数调用并不断返回下一个数据,直到没有数据时抛出StopIteration错误。
Python的for循环本质上就是通过不断调用next()函数实现的
我们看看下面例子: