Conditional Queue

Episode #498 by David Kimura

Summary

In this episode, we look at processing background jobs in a way that conditionally routes work to different queues. This can be very powerful in situations where you need to process a job in different ways or send requests to different APIs.
Tags: rails, background processing, solid queue · Runtime: 18:23
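
The core mechanism (shown in full in the source summary below) is Active Job's block form of queue_as, which is evaluated each time a job is enqueued. A minimal sketch, assuming a capacity check named gpu_queue_full?:

# Sketch only - the episode's full version is in the source summary below
class ExpensiveBackgroundJob < ApplicationJob
  queue_as { gpu_queue_full? ? :gpuexternal : :gpu }
end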

Chapters

  • Introduction (0:00)
  • Configuring the database and Solid Queue (3:48)
  • Starting with the view and controller (4:54)
  • Moving logic to the background job (6:41)
  • Creating different queues (7:37)
  • Getting rid of magic numbers (9:34)
  • Switching between queues (10:21)
  • Testing out the queue switching (11:09)
  • Organizing logic (12:42)
  • Demo (14:58)
  • Final thoughts (17:03)

Resources

Source code - https://github.com/driftingruby/498-conditional-queue


Summary

# Terminal
rails g job expensive_background_job
mkdir app/models/gpus
touch app/models/gpus/processor.rb
touch app/models/gpus/external_processor.rb
bin/dev | grep -E "✅|🏁"
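# Assumed setup, not shown above: if the app was not generated with Solid Queue
# already installed (Rails 8 includes it by default), the installer generates
# config/queue.yml and db/queue_schema.rb; db:prepare then creates the queue
# database configured in config/database.yml
bin/rails solid_queue:install
bin/rails db:prepare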

# config/database.yml
default: &default
  adapter: sqlite3
  pool: <%= ENV.fetch("RAILS_MAX_THREADS") { 50 } %>
  timeout: 5000

development:
  primary:
    <<: *default
    database: storage/development.sqlite3
  queue:
    <<: *default
    database: storage/development_queue.sqlite3
    migrations_paths: db/queue_migrate

# config/puma.rb
# Run the Solid Queue supervisor inside of Puma for single-server deployments
plugin :solid_queue

# config/environments/development.rb
config.active_job.queue_adapter = :solid_queue
config.solid_queue.connects_to = { database: { writing: :queue } }
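# (For reference: a new Rails 8 app ships these same two lines in
#  config/environments/production.rb; here they enable Solid Queue in development.)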

# app/views/welcome/index.html.erb
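<%# Prefetch is disabled so Turbo's hover prefetch does not hit the controller and enqueue jobs before a real click. %>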
<%= link_to "Queue 1 Job", root_path(jobs: 1), "data-turbo-prefetch": false %>
<%= link_to "Queue 10 Jobs", root_path(jobs: 10), "data-turbo-prefetch": false %>

# app/controllers/welcome_controller.rb
class WelcomeController < ApplicationController
  def index
    if params[:jobs]
      params[:jobs].to_i.times { ExpensiveBackgroundJob.perform_later(rand) }
    end
  end
end

# app/jobs/expensive_background_job.rb
class ExpensiveBackgroundJob < ApplicationJob
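  # The queue_as block runs when the job is enqueued, so each enqueue checks
  # current gpu load and routes the job accordingly.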
  queue_as do
    gpu_queue_full? ? :gpuexternal : :gpu
  end

  def perform(some_record)
    if self.queue_name == "gpu"
      Gpus::Processor.execute(some_record)
    elsif self.queue_name == "gpuexternal"
      Gpus::ExternalProcessor.execute(some_record)
    end
  end
end

# app/jobs/application_job.rb
class ApplicationJob < ActiveJob::Base
  # Automatically retry jobs that encountered a deadlock
  # retry_on ActiveRecord::Deadlocked

  # Most jobs are safe to ignore if the underlying records are no longer available
  # discard_on ActiveJob::DeserializationError

  def gpu_queue_full?
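    # A job has a ClaimedExecution record only while a worker is executing it,
    # so this counts gpu jobs that are currently in flight.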
    active_jobs = SolidQueue::ClaimedExecution
      .joins(:job)
      .where(solid_queue_jobs: { queue_name: "gpu" })
      .count

    active_jobs >= GPU_QUEUE_SIZE
  end
end
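
# Terminal (illustrative, not from the episode: one way to confirm which
# queues jobs were routed to)
bin/rails runner 'puts SolidQueue::Job.group(:queue_name).count'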

# config/queue.yml
default: &default
  dispatchers:
    - polling_interval: 1
      batch_size: 500
  workers:
    - queues: ["default"]
      threads: 3
      processes: <%= ENV.fetch("JOB_CONCURRENCY", 1) %>
      polling_interval: 0.1
    - queues: ["gpu"]
      threads: <%= GPU_QUEUE_SIZE %>
      processes: <%= ENV.fetch("JOB_CONCURRENCY", 1) %>
      polling_interval: 0.1
    - queues: ["gpuexternal"]
      threads: 15
      processes: <%= ENV.fetch("JOB_CONCURRENCY", 1) %>
      polling_interval: 0.1

# config/initializers/solid_queue_variables.rb
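# Defined in an initializer so it is loaded before the Solid Queue supervisor
# reads config/queue.yml (its ERB references this constant), and so the job's
# gpu_queue_full? check uses the same number as the gpu worker's thread count.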
GPU_QUEUE_SIZE = 3

# app/models/gpus/base.rb
class Gpus::Base
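  # Shared parent for the local and external GPU processors; common logic can live here.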
end

# app/models/gpus/processor.rb
class Gpus::Processor < Gpus::Base
  def self.execute(some_record)
    Rails.logger.info "βœ… gpu #{some_record}"
    sleep 5
    Rails.logger.info "🏁 gpu #{some_record}"
  end
end

# app/models/gpus/external_processor.rb
class Gpus::ExternalProcessor < Gpus::Base
  def self.execute(some_record)
    Rails.logger.info "βœ… gpuexternal #{some_record}"
    sleep 5
    Rails.logger.info "🏁 gpuexternal #{some_record}"
  end
end