使用AWK和R来解析25TB的DNA数据

大数据工作该怎么样做?用巨量的及其搭建Hadoop集群,利用Spark内存计算?Kafka?课本上都是这样教的。那么实际中的大数据是怎么处理的?是怎么样的一个流程?用什么软件架构的呢?本文虫虫带领大家一起学习下国外一位大学生物信息研究工作者在探索解决巨量(25TB)DNA基因数据的探索历程,期间有用到Spark,AWS S3对象存储,最后使用AWK,管道和R完成了整个数据的处理。整个文章给我们很多学习的素材,比如大数据、巨量数据分区、排序,分布式计算、云计算等等的实例。

概述

Nick Strayer是范德比尔特大学生物统计学系的博士研究生,他负责近负责为实验室建立一个处理大量原始DNA测序(SNP芯片)数据的工作流程。目标是能够快速获取给定基因位点(SNP)的数据做数据建模。使用R语言和AWK他以简介自然的方式清理和组数据,大大加快了查询速度,达到满意的数据处理效果。

数据

些数据由范德比尔特大学遗传学处理中心提供为25 TB的tsvs数据。数据有五部分构成,每部分包含大约240个4G大小的文件。文件中,每行包含一个样本的单个SNP的数据。随着SNP值的增加,读数的强度,不同等位基因的频率等等都有多个数字列。要从中获取大约30列有意义的片段数据。

目标

和任何数据项目一样,最重要的是考虑如何使用数据。主要做法是通过SNP基础在SNP上拟合模型和工作流程。即一次只需要一个SNP的数据。需要尽可能简单,快速和便宜地提取与250万SNP记录中的一个相关的所有记录。

失败尝试

第一次尝试

首先Nick通过一两个小时架设了一个Hive服务器来运行处理数据。由于数据存储在AWS S3对象存储上,他使用因此Athena的服务,该服务允许对S3数据运行Hive SQL查询。你可以避免设置/启动Hive群集,只需为搜索到的数据付费即可。

在将Athena指向实验数据做其格式后,运行了一些测试,比如:

select * from intensityData limit 10;

很快就能获得查询结果。但是当要实际使用这些数据时,查询要求获取SNP的所有数据,运行如下查询:

select * from intensityData where snp = 'rs123456';

该查询耗费了8分钟和4+T字节的数据,才查询到了结果。Athena的服务收费为每TB 5美元的。所以查询需要8分钟和20美元的账单。

如果要完成所有模型所需的查询,大概需要38年和5000w美刀的账单,显然该方法是不可取的。

经验教训:没有一种廉价的方法可以同时解析25tb的数据。

使用Parquet优化

第一次失败后,优化策略是尝试对数据格式进行转化,所有TSV转换为Parquet文件。 Parquet文件适用于处理较大的数据集,因为它以列式存储方数据。因此每列都存储在其自己的内存/磁盘部分中,与行式文本文件不同。这样查询只需读取感兴趣的列内容即可。此外,它还会按列保留每个文件的值范围记录,因此,如果要查找的值不在列范围内,Spark不会耗费时间扫描文件。

这样工作运行AWS的大数据组件AWS Glue,将TSV数据转换为Parquet,并将新的Parquet文件连接到Athena。转化大概花了大约五个小时。但是,当运行查询时,它花费了大约相同的时间和但是数据流量小一点。由于Spark尝试优化作业只是将单个TSV块解压缩并将其放置在自己的Parquet块中。因为每个块都足以包含多个样本的完整记录,所以每个文件都包含其中的所有SNP,Spark必须打开所有文件才能提取需要的内容。

有趣的默认(和推荐)Parquet压缩类型:snappy不可拆分。因此,每个执行程序仍然处于解压缩和加载整个3.5gig数据集的任务中。

经验教训:小心Parquet文件大小和类型。

解决问题

为了解决这个问题需要做的就是对SNP列而不是个体上的数据进行排序。这将允许一定数量的数据仅包含少量SNP,而Parquet的智能仅开放式值范围内功能可以做到一点。然而,对跨群集分布的数十亿行数据进行排序并非易事。

最后悲剧发生了,AWS排序的组件Glue运行了2天后奔溃了,所有结果没了,AWS还不打算为该单退款。

经验教训:排序很难,特别是分布式数据。

尝试数据分区

另一个想法是将数据划分为染色体。这样可以将数据减少为更易于管理的块。通过在Glue脚本中添加一行:partition_by = "chr"到Spark导出功能将数据放入这些存储桶中。

不幸的是,事情也不顺利。因为染色体的大小不同,其中的数据量会各不相同。Spark发送给其计算Jobs的任务不均衡,并且由于某些节点提前完成并处于空闲状态而运行缓慢。当查询单个SNP时,数据不平衡再次引起问题:在较大的染色体中(也就是实际想要获取数据的地方),成本仅提高了数10倍甚至更多。

经验教训:Spark中的分区需要平衡。

细化分区

和上面简单13分区方法对比,Nika又做了一件极端的决定,对每个SNP上进行分区,保证了每个分区的大小相等。然而该方法是有问题的,在使用了Glue并添加partition_by ='snp',工作开始并运行。一天后,通过查询数据并没有写入S3,所以kill掉了这项任务。实际上Glue的做法是将中间文件写入隐藏的S3位置,都有20亿数据量。在通过1天,并且1000美刀的账单后才意识到了错误。

经验教训:永远不要尝试分250万个分区。

分区+排序

既然细化大分区的方法也不行,那么只好采用分区+排序的方法结合了。首先通过染色体上进行分区,然后对每个分区进行排序。理论上,这能使每个查询更快,因为期望的SNP数据将仅存在于给定区域内的一些Parquet块的范围内。

然而事实证明,分区数据的排序也需要大量的工作,最终切换到EMR自建集群,使用8个强大的AWS实例虚拟机(C5.4xl),结合Sparklyr构建更灵活的工作流程:

Sparklyr片段按chr分区并在分区中排序w/in,使用snp bin加入原始数据

原始数据

raw_data

group_by(chr) %>%

arrange(Position) %>%

Spark_write_Parquet(

path = DUMP_LOC,

mode = 'overwrite',

partition_by = c('chr')

)

但是,工作并没有解决。尝试了所有的调优技巧:增加了分配给查询的每个执行器的内存;使用了高ram节点类型;广播变量。但它总是绕过一半,然后执行器会慢慢开始失败,直到所有查询最终都会停止。

经验教训:排序仍然很难,调整Spark也是无济于事。

更有创意的尝试

每个SNP都有一个位置值。这是一个整数,对应它染色体上有多少个碱基。这是一种很好的组织数据的方法。一个想法是按照每条染色体的区域构建分区。Aka (positions 1 - 2000, 2001 - 4000, 等等)。问题是SNP沿染色体分布不均匀,因此大小会有很大差异。

解决方案是按位置排名。已经加载的数据进行了查询,以获得唯一SNP,它们的位置和染色体的列表。然后在每个染色体内进行分类,并将SNP捆绑到给定大小的区间中。例如1000 SNPS,这样就得到从SNP -> bin-in-chromosome的映射。

snp_to_bin <- unique_snps %>%

group_by(chr) %>%

arrange(position) %>%

mutate(

rank = 1:n()

bin = floor(rank/snps_per_bin)

) %>%

ungroup()

经验教训:定制数据需要定制解决方案。

使用Spark

目标是将这个小的(250万行)数据帧读入Spark,将其与原始数据连接,然后在新添加的bin列上进行分区。

data_w_bin <- raw_data %>%

left_join(sdf_broadcast(snp_to_bin), by ='snp_name') %>%

group_by(chr_bin) %>%

arrange(Position) %>%

Spark_write_Parquet(

path = DUMP_LOC,

mode = 'overwrite',

partition_by = c('chr_bin')

)

注意使用了sdf_broadcast(),这让Spark知道它应该将这个数据帧发送到所有节点。当数据很小并且所有任务都需要时,会很有用。

然而任务也没有成功。与排序尝试一样,作业运行一段时间完成加入任务,然后随着分区启动,执行程序就开始崩溃。

经验教训:Spark加速很快,但分区仍然很昂贵

正确尝试

引入AWK

截止目前,所有的Spark故障都是由于数据在群集周围乱序。解决这种乱序,可以通过一些预处理帮助它,可以尝试将原始文本数据拆分到染色体列上,这样就可以为Spark提供一些"预分区"数据。

在解决如何按列值拆分问题时候,引入了AWK,可以通过在脚本中执行写入而不是将结果发送到stdout来按列的值拆分文本文件。下面一个一个bash脚本测试该项工作,下载了一个gzip压缩文件,然后使用gzip将其解压缩,将其穿给awk。

gzip -dc path/to/chunk/file.gz |

awk -F '\t' \

'{print $1",..."$30">"chunked/"$chr"_chr"$15".csv"}'

有效,工作解决了。

经验教训:不要睡在基础知识上。有人可能在80年代解决了你的问题。

parallel并行执行

数据分割的过程有点慢。通过htop监控系统过程中,发现高配置的ec2实例只使用了单核和200 多M的内存,所以并行化执行分割任务是必须的选择,这个工具也很成熟那就是gnu parallel,用来并行任务的最佳选择。使用新的GNU并行工作流运行拆分效率非常好,但是将S3对象下载到磁盘有点慢并且没有完全并行化而导致瓶颈,优化方法:

1.发现可以直接将S3下载步骤加入管道中,跳过中间磁盘存储。这样可以避免将原始数据写入磁盘,也可以在AWS上使用更小且更便宜的存储。

2.使用aws configure set default.s3.max_concurrent_requests 50将AWS CLI使用的线程数增加到某个更大的大数(默认值为10)。

aws configure set default.s3.max_concurrent_requests 50

3.切换到网络速度优化的ec2实例,就是名称中带有n的那些。然而,使用带'n'实例导致的计算能力损失超过了下载速度的提高。

4.将gzip更换为pigz,这是一个并行的gzip工具,可以执行一优化操作来并行化解压缩。

结合这些优化,任务变得很快。通过提高下载速度和避免写入磁盘,现在能够在几个小时内处理整个5 TB的数据。

通过gnu-parallel,可以解压缩和分割19gig csv,就像可以下载一样快。甚至无法让Spark运行它。

经验教训:gnu parallel是神奇的,每个人都应该使用它。

使用新转换后的数据

处理过后,在S3以解压缩和半组织格式存储了数据,现在继续运行Spark查询了。确实最终在Spark上完成事情,分区Parquet文件并不是非常小(200KB左右)但数据到了它该到的地方。

经验教训:Spark喜欢未压缩的数据,不喜欢组合分区。

测试本地Spark查询

经验教训:Spark对于简单的工作来说是一个很大的开销。

随着数据加载到合理的格式,可以测试速度。通过设置了一个R脚本来启动本地Spark服务器,然后从给定的Parquet位置加载Spark数据帧。尝试加载所有数据,但由于某种原因无法让Sparklyr识别分区。

花费了29.415秒。比以前好多了。此外,无法通过启用缓存来加快速度,因为当将bin的Spark数据帧缓存在内存中时,Spark总是崩溃,即使数据集有50多G。

使用AWK关联数组

Nick还是使用了AWK关联数组来执行联合SNP -> bin表和我的原始数据,而不使用Spark。

通过在AWK脚本中使用了BEGIN块。

join_data.awk

while(getline ...)命令从bin csv加载所有行,并将第一列(SNP名称)设置为bin关联数组的键,将第二个值(bin)设置为值。然后,在主文件的每一行上运行{block}中,每一行都被发送到一个输出文件,该文件具有基于其bin的唯一名称:..._bin_"bin[$1]"_....

变量batch_num和chunk_id的对应管道给出的数据,这些数据确保并行运行的每个线程写入其自己的唯一文件来避免数据写入中的竞争条件。

所有原始数据拆分后的染色体文件夹,所以现在可以用另一个bash脚本来一次处理染色体并将更多分区数据发送回S3。

此脚本有两个并行部分:

第一个读入包含所需染色体数据的每个文件,并将它们分成多个线程,将其文件吐入其代表性区域。为了防止多个线程写入相同bin文件的竞争条件,AWK被传递它用于写入唯一位置的文件的名称,例如, chr_10_bin_52_batch_2_aa.csv这导致磁盘上有大量的小文件(为此使用了1TB EBS卷)。

第二个并行管道通过并将每个bin的单独文件合并到带有cat的单个csv中,并将它们发送给导出...

经验教训:AWK中的关联数组非常强大。

使用R语言

上面这个bash脚本的这一部分:...cat chunked/*_bin_{}_*.csv | ./upload_as_rds.R....。此行将bin的所有连接文件管道传输到以下R脚本中.处理

通过传递readr :: read_csv变量文件("stdin"),它将通过管道传输到R脚本的数据加载到数据帧中,然后使用aws.s3将其作为.rds文件直接写入s3。

Rds有点像Parquet的初级版本,没有柱状存储的细节。

即使工作流程中的R速度非常慢,整个流程也非常快。用于读取和写入数据的R部分相当优化。在单个平均大小的染色体上测试后,使用C5n.4xl实例在大约两个小时内完成了这项工作。

经验教训:以从R脚本中访问stdin和stdout,从而在管道中使用它。

S3的限制

事实证明,S3将给定文件的路径视为可以被认为是哈希表或基于文档的数据库的简单密钥。将"桶"视为表,每个文件都是一个条目。因为速度和效率对S3为亚马逊赚钱很重要,所以这个关键文件路径系统是超级优化的并不奇怪。为了优化查询,我们不需要做大量的get请求,希望查询速度很快。最后发现制作大约20k的bin文件效果最好。

经验教训:由于智能路径实现,S3可以处理大量文件。

格式兼容

"为什么你会使用专有的文件格式?",主要是为了优化加载速度(使用gzipped csvs加载大约7倍的时间)以及与工作流程的兼容性。 R可以轻松加载Parquet(或Arrow)文件,而不会产生Spark的开销。

如果最终需要将数据转换为另一种格式,而仍然拥有原始的原始文本数据,并且可以再次运行管道。

经验教训:过早优化存储方法是浪费时间的根本原因。

流程完成

既然有单个染色体工作的工作流程,为了处理所有染色体的数据。需要启动多个ec2实例来转换的所有数据,但不想让超级不平衡的工作负载(就像Spark从不平衡的分区中受到的影响)。为了解决这个问题,使用R.编写一个作业优化脚本。

首先,查询S3以确定每个染色体在存储方面的大小。

结果:

然后编写了一个函数,它将获取这个总大小信息,,然后拆分成num_jobs组并报告每个作业数据大小的变量。

结果:

这样所有工作接本上就可以搞定,将之前的bash脚本包装成一个大的for循环:

for DESIRED_CHR in "16" "9" "7" "21" "MT"

do

# 处理单个蛋白质的脚本

done

最后添加一个关机命令。

sudo shutdown -h now

使用AWS CLI来启动一堆实例,通过user_data选项将它们的作业的bash脚本传递给它们。它们自动运行然后自动关闭,这样不会多耗费一分的成本(云计算时代需要做的)。

aws ec2 run-instances ...\

--tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=<<job_name>>}]" \

--user-data file://<<job_script_loc>>

经验教训:不要试图优化工作,让计算机去做。

提供API

最后一步是简化实验室成员尽可能多地使用数据的过程。他提供一个简单的查询API。如果将来决定从使用.rds切换到Parquet文件,希望坑自动适配。他构建并记录了一个非常简单的接口其中包含一些用于访问数据的函数,以函数get_snp为中心。此外,他还构建了一个pkgdown站点,因此实验室成员可以轻松查看示例/文档。

经验教训:为最终用户保持API简单,并提供灵活性。

智能缓存。

由于这些数据的主要工作流程之一是同时在一堆SNP上运行相同的模型/分析,在为SNP提取数据时,整个bin的数据将保留并附加到返回的对象。这意味着如果运行新查询,则可以(可能)使用旧查询结果来加速它。

在构建软件包时,运行了很多基准来比较不同方法之间的速度,通过它可以让我们了解直觉中的误解。比如,dplyr::filter比使用基于索引的过滤来获取行要快得多,但使用索引语法从过滤后的数据帧中获取单个列要快得多。

经验教训:如果的数据设置良好,缓存将很容易!

结果

速度大大提高。典型的用例是扫描基因组的功能重要区域(例如基因)。之前,无法做到这一点之前(因为它花费太多的钱)。现在由于bin结构和缓存,每个SNP查询平均需要不到十分之一秒,并且数据使用量甚至不足以进行舍入的S3成本。

总结教训

最终的解决方案是定制的,几乎肯定不是最佳解决方案,但是反复试验的产物。

此外,如果你处于雇佣某人作为数据科学家的位置,请考虑这样一个事实,即擅长这些工具需要经验,而且经验需要资金。Nick很幸运,有资金支付这笔费用,但许多确定的人永远不会有机会,因为他们没有资金进行尝试。"大数据"工具是通才。如果你有时间,几乎可以肯定能够使用智能数据清理,存储和检索技术为你的问题编写更快的解决方案。当然,最终会归结为成本效益分析。

解析   数据   AWK
发表评论
留言与评论(共有 0 条评论)
   
验证码:

相关文章

推荐文章

'); })();