
由于大多数Spark计算的内存特性,Spark程序可能会受到群集中任何资源的瓶颈:CPU,网络带宽或内存。大多数情况下,如果数据适合内存,瓶颈就是网络带宽,但有时候还需要进行一些调整,例如以序列化形式存储RDD、以减少内存使用。下面将给大家举几个Spark性能优化的例子。
数据序列化
Java序列化:java.io.Serializable 可以通过控制序列化的性能java.io.Externalizable。Java序列化是灵活的,但通常很慢,并导致许多类的大型序列化格式。
Kryo序列化:Kryo序列化比Java序列化速度高达10倍,更快、更紧凑,但不支持所有Serializable类型。
配置方式:conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
如果要使用Kryo注册自己的自定义类,请使用registerKryoClasses方法。
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
内存调整
默认情况下,Java对象访问速度很快,但与其字段中的“原始”数据相比,可以轻松占用2-5倍的空间。这是由于以下几个原因:
Spark中的内存使用大致属于以下两种类别之一:执行和存储。执行内存是指用于在随机,连接,排序和聚合中进行计算的内存,而存储内存是指用于在群集中缓存和传播内部数据的内存。在Spark中,执行和存储共享一个统一的区域(M)。当没有使用执行内存时,存储可以获取所有可用内存,反之亦然。如有必要,执行可以驱逐存储,但仅限于总存储内存使用量低于某个阈值(R)。换句话说,R描述了M从不驱逐缓存块的子区域。由于实施的复杂性,存储可能不会驱逐执行。
该设计确保了几种理想的特性。首先,不使用缓存的应用程序可以使用整个空间执行,从而避免不必要的磁盘溢出。其次,使用缓存的应用程序可以保留最小存储空间(R),其中数据块不受驱逐。最后,这种方法为各种工作负载提供了合理的开箱即用性能,而无需用户内部划分内存的专业知识。
虽然有两种相关配置,但典型用户不需要调整它们,因为默认值适用于大多数工作负载:
spark.memory.fraction应该设置值,以便在JVM的旧版或“终身”代中舒适地适应这个堆空间量。
确认内存消耗
调整数据集所需内存消耗量的最佳方法是创建RDD,将其放入缓存中,然后查看Web UI中的“存储”页面。该页面将告诉你RDD占用多少内存。
为了估计特定对象的内存消耗,使用SizeEstimator的estimate方法。这对于尝试使用不同的数据布局来调整内存使用情况以及确定广播变量在每个执行程序堆上占用的空间量非常有用。
调整数据结构
减少内存消耗的第一种方法是避免增加开销的Java功能,例如基于指针的数据结构和包装器对象。做这件事有很多种方法:
并行程度
Spark会根据其大小自动设置要在每个文件上运行的“map”任务的数量(尽管可以通过可选参数来控制它SparkContext.textFile等),并且对于分布式“reduce”操作,例如groupByKey和reduceByKey,它使用最大的父级RDD的分区数量。设置config属性spark.default.parallelism以更改默认值。通常,建议群集中每个CPU核心有2-3个任务。
减少任务的内存使用情况
有时候,你会得到一个OutOfMemoryError,不是因为你的RDD不适合内存,而是因为你的一个任务的工作集,例如其中一个reduce任务groupByKey,太大了。这里最简单的解决方法是 增加并行度,以便每个任务的输入集更小。Spark可以有效地支持短至200毫秒的任务,因为它在许多任务中重用了一个执行程序JVM,并且它具有较低的任务启动成本,因此可以安全地将并行度提高到超过群集中的核心数。
广播大变量
如果任务使用其中的驱动程序中的任何大对象(例如静态查找表),请考虑将其转换为广播变量。Spark会在主服务器上打印每个任务的序列化大小,因此可以查看它以确定任务是否过大; 一般来说,大于约20 KB的任务可能值得优化。
| 留言与评论(共有 0 条评论) |