Spark性能优化(5分钟了解)

由于大多数Spark计算的内存特性,Spark程序可能会受到群集中任何资源的瓶颈:CPU,网络带宽或内存。大多数情况下,如果数据适合内存,瓶颈就是网络带宽,但有时候还需要进行一些调整,例如以序列化形式存储RDD、以减少内存使用。下面将给大家举几个Spark性能优化的例子。

数据序列化

Java序列化:java.io.Serializable 可以通过控制序列化的性能java.io.Externalizable。Java序列化是灵活的,但通常很慢,并导致许多类的大型序列化格式。

Kryo序列化:Kryo序列化比Java序列化速度高达10倍,更快、更紧凑,但不支持所有Serializable类型。

配置方式:conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

如果要使用Kryo注册自己的自定义类,请使用registerKryoClasses方法。


val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)

内存调整

默认情况下,Java对象访问速度很快,但与其字段中的“原始”数据相比,可以轻松占用2-5倍的空间。这是由于以下几个原因:

  • 每个不同的Java对象都有一个“对象头”,大约16个字节,并包含诸如指向其类的指针之类的信息。对于其中包含非常少数据的对象(比如一个Int字段),这可能比数据大。
  • Java String在原始字符串数据上有大约40个字节的开销,并且由于UTF-16的内部使用而将每个字符存储为两个字节String编码。因此,10个字符的字符串可以轻松消耗60个字节。
  • 公共集合类,例如HashMap和LinkedList,使用链接数据结构,其中每个条目都有一个“包装”对象(例如Map.Entry)。此对象不仅具有标题,还具有指向列表中下一个对象的指针(通常为每个字节8个字节)。
  • 原始类型的集合通常将它们存储为“包装”对象,例如java.lang.Integer。

Spark中的内存使用大致属于以下两种类别之一:执行和存储。执行内存是指用于在随机,连接,排序和聚合中进行计算的内存,而存储内存是指用于在群集中缓存和传播内部数据的内存。在Spark中,执行和存储共享一个统一的区域(M)。当没有使用执行内存时,存储可以获取所有可用内存,反之亦然。如有必要,执行可以驱逐存储,但仅限于总存储内存使用量低于某个阈值(R)。换句话说,R描述了M从不驱逐缓存块的子区域。由于实施的复杂性,存储可能不会驱逐执行。

该设计确保了几种理想的特性。首先,不使用缓存的应用程序可以使用整个空间执行,从而避免不必要的磁盘溢出。其次,使用缓存的应用程序可以保留最小存储空间(R),其中数据块不受驱逐。最后,这种方法为各种工作负载提供了合理的开箱即用性能,而无需用户内部划分内存的专业知识。

虽然有两种相关配置,但典型用户不需要调整它们,因为默认值适用于大多数工作负载:

  • spark.memory.fraction表示大小M为(JVM堆空间 - 300MB)的一小部分(默认值为0.6)。剩下的空间(40%)保留用于用户数据结构,Spark中的内部元数据,以及在稀疏和异常大的记录的情况下防止OOM错误。
  • spark.memory.storageFraction表示大小R为M(默认为0.5)的一小部分。R是M缓存块不受执行驱逐的存储空间。

spark.memory.fraction应该设置值,以便在JVM的旧版或“终身”代中舒适地适应这个堆空间量。

确认内存消耗

调整数据集所需内存消耗量的最佳方法是创建RDD,将其放入缓存中,然后查看Web UI中的“存储”页面。该页面将告诉你RDD占用多少内存。

为了估计特定对象的内存消耗,使用SizeEstimator的estimate方法。这对于尝试使用不同的数据布局来调整内存使用情况以及确定广播变量在每个执行程序堆上占用的空间量非常有用。

调整数据结构

减少内存消耗的第一种方法是避免增加开销的Java功能,例如基于指针的数据结构和包装器对象。做这件事有很多种方法:

  1. 设计您的数据结构以优先选择对象数组和基本类型,而不是标准的Java或Scala集合类(例如HashMap)
  2. 尽可能避免使用包含大量小对象和指针的嵌套结构
  3. 考虑使用数字ID或枚举对象而不是键的字符串
  4. 如果RAM少于32 GB,设置JVM标志-XX:+UseCompressedOops以使指针为四个字节而不是八个字节

并行程度

Spark会根据其大小自动设置要在每个文件上运行的“map”任务的数量(尽管可以通过可选参数来控制它SparkContext.textFile等),并且对于分布式“reduce”操作,例如groupByKey和reduceByKey,它使用最大的父级RDD的分区数量。设置config属性spark.default.parallelism以更改默认值。通常,建议群集中每个CPU核心有2-3个任务。

减少任务的内存使用情况

有时候,你会得到一个OutOfMemoryError,不是因为你的RDD不适合内存,而是因为你的一个任务的工作集,例如其中一个reduce任务groupByKey,太大了。这里最简单的解决方法是 增加并行度,以便每个任务的输入集更小。Spark可以有效地支持短至200毫秒的任务,因为它在许多任务中重用了一个执行程序JVM,并且它具有较低的任务启动成本,因此可以安全地将并行度提高到超过群集中的核心数。

广播大变量

如果任务使用其中的驱动程序中的任何大对象(例如静态查找表),请考虑将其转换为广播变量。Spark会在主服务器上打印每个任务的序列化大小,因此可以查看它以确定任务是否过大; 一般来说,大于约20 KB的任务可能值得优化。

发表评论
留言与评论(共有 0 条评论)
   
验证码:

相关文章

推荐文章

'); })();