每天分享最新软件开发,Devops,敏捷,测试以及项目管理最新,最热门的文章,每天花3分钟学习何乐而不为,希望大家点赞,评论,加关注,你的支持是我最大的动力。下方抖音有我介绍自动化测试,以及google cloud 相关视频课程,欢迎观看。
当我登录到我的银行账户,希望我所有的账户交易的报告说,六个月或者一年,web应用程序表示,收到了我的请求,让我检查后的PDF报告。 一段时间后,我将能够下载报告。 这是一个异步处理的例子 。
在本文中,我描述了 Rails 中一个简单的异步处理用例的实现。我有一个名为“ mahrasa”的示例应用程序,它是 Mahboob Rails 示例应用程序的缩写,我已经将代码集成到其中。
用户上传一个CSV文件的应用程序。 她得到一个消息说文件收到,正在处理。 显示一个链接,用户可以查看状态。 在后端,文件是异步处理和后处理状态页上的状态更新。
Rails 有许多支持异步处理的 gems,其中一些是 工作延迟、 Resque、 sidekiq 和延迟。
Delayed 是一个多线程、 SQL 驱动的 ActiveJob 后端,Betterment 使用它每天处理数百万个后台作业。
它支持 postgres、 mysql 和 sqlite,其设计目标是:
可靠,具有协同事务作业队列和保证,至少执行一次
可伸缩,具有优化的拾取查询和并发作业执行
弹性,内置的重试机制,指数后退,失败的工作保存
可维护,具有健壮的仪器、持续监视和基于优先级的警报
为什么延迟?
延迟的 gem 是两个延迟工作和延迟工作活动记录的目标分支,将它们组合成一个单独的库。它是为 Betterment 中那些有操作需求的应用程序设计的,包括从 Betterment 的代码库中提取的许多特性,例如:
通过并发 Ruby 执行多线程作业
一个高度优化的,基于 SKIP LOCKED 的拾取查询(对 postgres)
通过一个新的监测过程内置仪表和连续监测
命名的优先级范围,缺省值为: Interactive、 : user _ visible、 : final 和: report
基于优先级的作业年龄、运行时和尝试的警报阈值
一个实验性的自动缩放度量,用于水平自动缩放器(我们使用 Kubernetes)
使用延迟特定行为扩展 ActiveJob 的自定义适配器
让我在这里补充一个免责声明,我还没有验证所有的索赔,所以这篇文章不是一个背书,你应该与延迟在您的申请。
延迟安装的步骤非常简单:
将以下内容添加到你的 Gemfile:
gem 'delayed'$ rails generate delayed:migration rails db:migrateRuby
config.active_job.queue_adapter = :delayed
插入数据 PostgreSQL psql快得多。 然而,mahrasa使用 数据库SQLite3 。 相当于psql SQLite3本身。 在编码SQLite3的工作之前,我决定检查其他数据插入方法,时间和他们理解他们的相对性能。 的选项是:
对于每个选项,我写一个应用程序的工作,下面的细节:
这个工作是选项1的实现。 它读取输入CSV文件在一个循环中,每一行,它在数据库中调用插入一个行 创建 环球数码创意的方法模型。
这个工作是选项2的实现。 csvsql需要一个标题行和列名称。 我的CSV文件没有标题行。 因此,这项工作首先创建一个temp.csv文件第一行有列名,然后将整个输入CSV文件。 然后运行该工具csvsql复制文件到数据库中。 你可以在Python安装csvsql工具包 csvkit 。
壳牌
$ pip install csvkit这个工作是选择3的实现。 它与ActiveRecord-import批量插入数据的调用 输入 环球数码创意方法的模型类。
这个工作是实施选项4。 它执行一个系统中运行SQLite3传递给它一个shell脚本作为输入。 创建一个shell脚本 temp_table 和导入CSV文件输入数据。 然后插入的数据 temp_table 成 global_daily_cumulative 。 这个路由的数据通过 temp_table 负责在初选中自动生成id列,SQLite3不处理。
运行和测试这些工作的过程如下:
Ruby
AsyncOperation.where(id: job.arguments.first[:id]).update(:status => "processed")Ruby
$ rake delayed:workRuby
$ rails c brbr> ImportGdcJob2.perform_later 下面的截图显示的输出 ImportGdcJob2 分别在终端1和终端2。
自 ImportGdcJob 是一个逐行插入到数据库中,我知道这将是非常缓慢,所以我跑它只有1000行。 预期的订单的执行时间是:
工作 | 时间(秒) |
ImportGdcJob (只有1000行) | 144.53 |
ImportGdcJob2 | 58.43 |
ImportGdcJob3 | 6.02 |
ImportGdcJob4 | 4.91 |
因为它是最快的,第四个选项是更好的选择。 控制器的工作被称为作为一个异步操作,如以下代码所示:
Ruby
def importbr # copy uploaded file app/jobs directorybr FileUtils.cp(File.new(params[:csvfile].tempfile),br "#{Rails.root}/app/jobs/global_daily_cumulative.csv")brbr # insert async_operations rowbr @filename = params[:csvfile].original_filenamebr @ao = AsyncOperation.new(:op_type => 'Import CSV',br :filename => @filename,br :status => :enqueued)brbr # enqueue the jobbr if @ao.savebr ImportGdcJob4.perform_later(id: @ao.id)br endbr br render :ackbrend视图显示一个链接来检查数据插入的状态工作。
$ git clone https://github.com/mh-github/mahrasa.git -b delayed1$ cd mahrasa$ rvm install 3.1.2$ rvm use 3.1.2Ruby
$ bundle install壳牌
$ bin/rails db:migrate RAILS_ENV=developmentSQL
CREATE TABLE "global_daily_cumulative" ( "id" INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, "date" TEXT, "place" TEXT, "confirmed" INTEGER, "deaths" INTEGER, "recovered" INTEGER);壳牌
$ rails s$ cd mahrasa$ rake delayed:work视图,当该工作队列:
视图处理后的工作:
你可以检查数据库中的CSV文件行数是一样的表中的记录计数。
sqlite> select count(*) from global_daily_cumulative;158987“延迟”这个词有一个不幸的负面含义。当我第一次听到“延迟作业”这个词的时候,我认为这些作业是缓慢而低效的作业,它们受到低效代码的影响,必须在服务器/数据库级进行调优,甚至需要检查代码。后来,我意识到它们实际上是什么。这些只是异步执行的对象,“延迟”这个词被用作形容词,因为它们使用的库名为“ delayed_job”
使用可用的 gem 并将它们计时到示例工作负载。可能发生的情况是,它们之间的速度差异并不那么关键。对于真正的大容量处理,您可能必须首先使用 RabbitMQ,然后使用 ApacheKafka。
| 留言与评论(共有 0 条评论) “” |