Rails 异步处理

学习一种简单的异步处理用例的实现在Rails中利用一个示例应用程序中集成代码。

每天‬分享‬最新‬软件‬开发‬,Devops,敏捷‬,测试‬以及‬项目‬管理‬最新‬,最热门‬的‬文章‬,每天‬花‬3分钟‬学习‬何乐而不为‬,希望‬大家‬点赞‬,评论,加‬关注‬,你的‬支持‬是我‬最大‬的‬动力‬。下方抖音有我介绍自动化测试,以及google cloud 相关视频课程,欢迎观看。



当我登录到我的银行账户,希望我所有的账户交易的报告说,六个月或者一年,web应用程序表示,收到了我的请求,让我检查后的PDF报告。 一段时间后,我将能够下载报告。 这是一个异步处理的例子 。

在本文中,我描述了 Rails 中一个简单的异步处理用例的实现。我有一个名为“ mahrasa”的示例应用程序,它是 Mahboob Rails 示例应用程序的缩写,我已经将代码集成到其中。


用例

用户上传一个CSV文件的应用程序。 她得到一个消息说文件收到,正在处理。 显示一个链接,用户可以查看状态。 在后端,文件是异步处理和后处理状态页上的状态更新。

设计

Rails 有许多支持异步处理的 gems,其中一些是  工作延迟、 Resque、 sidekiq 和延迟。

Delayed 是一个多线程、 SQL 驱动的 ActiveJob 后端,Betterment 使用它每天处理数百万个后台作业。

它支持 postgres、 mysql 和 sqlite,其设计目标是:

可靠,具有协同事务作业队列和保证,至少执行一次

可伸缩,具有优化的拾取查询和并发作业执行

弹性,内置的重试机制,指数后退,失败的工作保存

可维护,具有健壮的仪器、持续监视和基于优先级的警报

为什么延迟?

延迟的 gem 是两个延迟工作和延迟工作活动记录的目标分支,将它们组合成一个单独的库。它是为 Betterment 中那些有操作需求的应用程序设计的,包括从 Betterment 的代码库中提取的许多特性,例如:

通过并发 Ruby 执行多线程作业

一个高度优化的,基于 SKIP LOCKED 的拾取查询(对 postgres)

通过一个新的监测过程内置仪表和连续监测

命名的优先级范围,缺省值为: Interactive、 : user _ visible、 : final 和: report

基于优先级的作业年龄、运行时和尝试的警报阈值

一个实验性的自动缩放度量,用于水平自动缩放器(我们使用 Kubernetes)

使用延迟特定行为扩展 ActiveJob 的自定义适配器


让我在这里补充一个免责声明,我还没有验证所有的索赔,所以这篇文章不是一个背书,你应该与延迟在您的申请。


延迟安装的步骤非常简单:

将以下内容添加到你的 Gemfile:

gem 'delayed'


  • 运行 包安装
  • 创建表 delayed_job
$ rails generate delayed:migration rails db:migrate


  • 添加以下行来配置/ application.rb:

Ruby

config.active_job.queue_adapter = :delayed 


插入数据 PostgreSQL psql快得多。 然而,mahrasa使用 数据库SQLite3 。 相当于psql SQLite3本身。 在编码SQLite3的工作之前,我决定检查其他数据插入方法,时间和他们理解他们的相对性能。 的选项是:

  1. 逐行插入数据。
  2. 使用csvsql复制文件到数据库中。
  3. 批量插入的行 ActiveRecord 进口。
  4. 使用SQLite3复制文件到数据库中。

对于每个选项,我写一个应用程序的工作,下面的细节:

ImportGdcJob

这个工作是选项1的实现。 它读取输入CSV文件在一个循环中,每一行,它在数据库中调用插入一个行 创建 环球数码创意的方法模型。

ImportGdcJob2

这个工作是选项2的实现。 csvsql需要一个标题行和列名称。 我的CSV文件没有标题行。 因此,这项工作首先创建一个temp.csv文件第一行有列名,然后将整个输入CSV文件。 然后运行该工具csvsql复制文件到数据库中。 你可以在Python安装csvsql工具包 csvkit

壳牌

$ pip install csvkit

ImportGdcJob3

这个工作是选择3的实现。 它与ActiveRecord-import批量插入数据的调用 输入 环球数码创意方法的模型类。

ImportGdcJob4

这个工作是实施选项4。 它执行一个系统中运行SQLite3传递给它一个shell脚本作为输入。 创建一个shell脚本 temp_table 和导入CSV文件输入数据。 然后插入的数据 temp_table global_daily_cumulative 。 这个路由的数据通过 temp_table 负责在初选中自动生成id列,SQLite3不处理。

运行和测试这些工作的过程如下:

  • 运行的步骤,创建表 如何运行 部分。
  • 在ImportGdcJob4。 rb,评论:

Ruby

AsyncOperation.where(id: job.arguments.first[:id]).update(:status => "processed")


  • 在一个终端,开始 工作:

Ruby

$ rake delayed:work


  • 在另一个终端,启动Rails控制台和调用工作的类名。

Ruby

$ rails c brbr> ImportGdcJob2.perform_later 


下面的截图显示的输出 ImportGdcJob2 分别在终端1和终端2。

ImportGdcJob 是一个逐行插入到数据库中,我知道这将是非常缓慢,所以我跑它只有1000行。 预期的订单的执行时间是:

工作

时间(秒)

ImportGdcJob

(只有1000行)

144.53

ImportGdcJob2

58.43

ImportGdcJob3

6.02

ImportGdcJob4

4.91

集成到Rails

因为它是最快的,第四个选项是更好的选择。 控制器的工作被称为作为一个异步操作,如以下代码所示:

Ruby

def importbr    # copy uploaded file app/jobs directorybr    FileUtils.cp(File.new(params[:csvfile].tempfile),br                 "#{Rails.root}/app/jobs/global_daily_cumulative.csv")brbr    # insert async_operations rowbr    @filename = params[:csvfile].original_filenamebr    @ao       = AsyncOperation.new(:op_type => 'Import CSV',br                                   :filename => @filename,br                                   :status => :enqueued)brbr    # enqueue the jobbr    if @ao.savebr        ImportGdcJob4.perform_later(id: @ao.id)br    endbr    br    render :ackbrend


视图显示一个链接来检查数据插入的状态工作。

如何运行

  • 我克隆存储库:
$ git clone https://github.com/mh-github/mahrasa.git -b delayed1


  • 进入项目文件夹:
$ cd mahrasa


  • 确保您安装了Ruby 3.1.2并使用它。
$ rvm install 3.1.2$ rvm use 3.1.2


  • 安装的gem:

Ruby

$ bundle install


  • 运行 数据库迁移 :

壳牌

$ bin/rails db:migrate RAILS_ENV=development


  • 创建表:执行以下命令从数据库SQLite3提示或像SQLite浏览器或IDE DBeaver 。

SQL

CREATE TABLE "global_daily_cumulative" ( "id" INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, "date" TEXT, "place" TEXT, "confirmed" INTEGER, "deaths" INTEGER, "recovered" INTEGER);


  • 取消注释行:如果你有注释行# 7 ImportGdc4。 rb测试工作,取消它。
  • 运行服务器:

壳牌

$ rails s


  • 在另一个终端,开始 rake命令。
$ cd mahrasa$ rake delayed:work


  • 在浏览器中访问该应用程序在http://localhost: 3000。
  • 点击链接“上传global_daily_cumulative.csv。”
  • 单击按钮“选择文件”。
  • 在文件浏览器,导航到文件夹中 mahrasa /测试 并选择文件 global_daily_cumulative.csv
  • 您将看到一条消息,文件收到了,给一个链接来检查工作的状态。 如果你点击链接进入状态页,知道工作的现状。

视图,当该工作队列:

视图处理后的工作:

你可以检查数据库中的CSV文件行数是一样的表中的记录计数。

sqlite> select count(*) from global_daily_cumulative;158987


最终的想法

“延迟”这个词有一个不幸的负面含义。当我第一次听到“延迟作业”这个词的时候,我认为这些作业是缓慢而低效的作业,它们受到低效代码的影响,必须在服务器/数据库级进行调优,甚至需要检查代码。后来,我意识到它们实际上是什么。这些只是异步执行的对象,“延迟”这个词被用作形容词,因为它们使用的库名为“ delayed_job

使用可用的 gem 并将它们计时到示例工作负载。可能发生的情况是,它们之间的速度差异并不那么关键。对于真正的大容量处理,您可能必须首先使用 RabbitMQ,然后使用 ApacheKafka。

发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章