Transcribing with Artificial Intelligence

Episode #395 by David Kimura

Summary

In this episode, we look at creating an audio transcription service which allows files uploaded through Active Storage to be transcribed with Artificial Intelligence. However, there are a lot of considerations around the approach from both performance and thread-safety perspectives.
rails artificial intelligence turbo background processing 36:25

Chapters

  • Introduction (0:00)
  • Setting up the basic Rails app (4:21)
  • Creating a background job (5:53)
  • Setting up Sidekiq (9:52)
  • Creating the Transcriber service (Ruby) (12:07)
  • Quick overview of the flow (17:46)
  • Installing Python and dependencies (19:15)
  • Creating the Transcriber service (Python) (21:21)
  • Demo (29:04)
  • Real time updates with Turbo (29:46)
  • Demo where no model cache exists (33:39)
  • Final thoughts (35:16)

Resources

Source - https://github.com/driftingruby/395-transcribing-with-artificial-intelligence

This episode is sponsored by Honeybadger

Update: Check out https://www.driftingruby.com/episodes/text-to-image-with-machine-learning for a much better approach to making the Python script thread-safe and more stable.
Download Source Code

Summary

# Terminal
bin/rails g scaffold projects name transcription:text status:integer
bin/rails active_storage:install
bin/rails g job ProcessTranscription
bundle add sidekiq

asdf plugin add python
asdf install python 3.10.9
brew install ffmpeg
pip install torch openai-whisper

# db/migrate/20230402012647_create_projects.rb
t.integer :status, default: 0

# app/models/project.rb
# A transcription project: a name, one uploaded file, the transcription text
# and a status describing where the project is in the transcription flow.
class Project < ApplicationRecord
  # The uploaded file (Active Storage attachment).
  has_one_attached :file

  # Integer-backed status column. Positional `enum :status, {...}` form is used
  # because the keyword form (`enum status: {...}`) is deprecated in Rails 7.2
  # and removed in Rails 8.0; the positional form is available from Rails 7.0.
  enum :status, {
    pending: 0,
    processing: 1,
    failed: 2,
    completed: 3
  }

  # turbo-rails macro; presumably streams create/update/destroy changes to
  # subscribers of this record's stream — confirm against turbo-rails docs.
  broadcasts
end

# app/views/projects/_form.html.erb
<div class="mb-3">
  <%= form.label :file, class: 'form-label' %>
  <%= form.file_field :file, class: 'form-control' %>
</div>

# app/controllers/projects_controller.rb
# Builds a project from the permitted params. When it cannot be saved the
# "new" form is re-rendered with a 422 status; otherwise a transcription job
# is enqueued for the new record and the user is redirected to it.
def create
  @project = Project.new(project_params)
  return render(:new, status: :unprocessable_entity) unless @project.save

  ProcessTranscriptionJob.perform_later(@project.id)
  redirect_to @project, notice: "Project was successfully created."
end
# Applies the permitted params to @project. When the update is rejected the
# "edit" form is re-rendered with a 422 status; otherwise the project is put
# back into the pending state, a transcription job is enqueued and the user
# is redirected to the project.
def update
  return render(:edit, status: :unprocessable_entity) unless @project.update(project_params)

  @project.pending!
  ProcessTranscriptionJob.perform_later(@project.id)
  redirect_to @project, notice: "Project was successfully updated."
end

# Strong parameters: requires the :project key and allows only the name,
# transcription and file attributes through.
def project_params
  permitted_attributes = %i[name transcription file]
  params.require(:project).permit(*permitted_attributes)
end

# app/jobs/process_transcription_job.rb
# Transcribes the file attached to a Project and stores the resulting text.
#
# Runs on the :transcriber queue. The attachment is copied to tmp/<project_id>,
# handed to TRANSCRIBER, and the project status is moved through
# processing -> completed, or to failed with another attempt scheduled.
class ProcessTranscriptionJob < ApplicationJob
  queue_as :transcriber

  # @param project_id [Integer] id of the Project to transcribe
  def perform(project_id)
    project = Project.find_by(id: project_id)
    return unless project
    return unless project.file.attached?
    # Only pick up work that is waiting or that failed earlier; a project that
    # is already processing or completed is left alone.
    return unless project.pending? || project.failed?

    project.processing!
    audio_path = Rails.root.join("tmp", project_id.to_s)
    File.binwrite(audio_path, project.file.download)

    transcription = TRANSCRIBER.transcribe_audio(audio_path)
    if transcription && project.update(transcription: transcription)
      project.completed!
    else
      fail_and_reschedule(project)
    end
  rescue Transcriber::NotAvailable
    fail_and_reschedule(project)
  rescue StandardError
    # Any other error (download, disk, pipe, ...) would otherwise leave the
    # record in "processing", and every later run would bail out on the status
    # guard above. Mark it failed so it can be picked up again, then re-raise
    # so the error is still reported to the queue adapter.
    project.failed! if project&.processing?
    raise
  ensure
    # audio_path is only set once this run got far enough to write the file,
    # so an early return never deletes a file belonging to another run.
    FileUtils.rm_rf(audio_path) if audio_path
  end

  private

  # Marks the project as failed and enqueues another attempt in 10 seconds.
  def fail_and_reschedule(project)
    project.failed!
    ProcessTranscriptionJob.set(wait: 10.seconds).perform_later(project.id)
  end
end

# Procfile.dev
web: unset PORT && bin/rails server
worker: sidekiq -C config/sidekiq_default.yml
transcriber: WHISPER=true sidekiq -C config/sidekiq_transcriber.yml
js: yarn build --watch
css: yarn build:css --watch

# config/sidekiq_default.yml
:concurrency: 4
:queues:
  - default

# config/sidekiq_transcriber.yml
:concurrency: 1
:queues:
  - transcriber

# config/application.rb
config.active_job.queue_adapter = :sidekiq

# config/initializers/transcriber.rb
require "open3"

# Wrapper around a long-running Python worker (lib/main.py).
#
# A file path is written to the worker's stdin, one per line; the worker
# answers on stdout with the transcription text followed by a line containing
# only END_MARKER. The worker is only started when ENV["WHISPER"] is "true".
#
# NOTE(review): @stderr is never read; if the worker writes a lot to stderr
# the pipe could presumably fill up and block it — confirm.
class Transcriber
  # Raised when this process has no worker (WHISPER is not "true") or when the
  # worker could not be brought back after MAX_RESTARTS restarts.
  class NotAvailable < StandardError; end

  # Line the worker prints after each transcription.
  END_MARKER = "___TRANSCRIPTION_END___"
  # Upper bound on worker restarts within one transcribe_audio call.
  MAX_RESTARTS = 3

  def initialize
    return unless ENV["WHISPER"] == 'true'

    start_worker
  end

  # Sends one file to the worker and returns its transcription.
  #
  # @param audio_file [String, Pathname] path of the file to transcribe
  # @return [String] transcription text with surrounding whitespace removed
  # @raise [Transcriber::NotAvailable] when no worker was started for this
  #   process, or the worker keeps dying after MAX_RESTARTS restarts
  def transcribe_audio(audio_file)
    raise Transcriber::NotAvailable unless @stdin

    restarts = 0
    begin
      @stdin.puts(audio_file)
      read_transcription
    rescue Errno::EPIPE, EOFError
      # Broken pipe on write or EOF on read both mean the worker is gone.
      # Restart it and resend the file, but give up after a few attempts
      # instead of looping forever.
      restarts += 1
      raise Transcriber::NotAvailable, "transcriber worker keeps exiting" if restarts > MAX_RESTARTS

      start_worker
      retry
    end
  end

  private

  # Starts (or restarts) the Python worker and stores its pipes.
  def start_worker
    close_pipes
    # Argument-list form: the script path is passed as a single argument even
    # when the application path contains spaces or shell metacharacters.
    @stdin, @stdout, @stderr, @wait_thr =
      Open3.popen3("python", "-u", Rails.root.join("lib", "main.py").to_s)
  end

  # Collects stdout lines up to END_MARKER. Raises EOFError when the stream
  # ends first, so a dead worker is not mistaken for a finished transcription.
  def read_transcription
    output = +""
    loop do
      line = @stdout.gets
      raise EOFError, "transcriber worker closed its output" if line.nil?
      break if line.strip == END_MARKER

      output << line
    end
    output.strip
  end

  # Best-effort close of the previous worker's pipes so restarts do not leak
  # file descriptors.
  def close_pipes
    [@stdin, @stdout, @stderr].each do |io|
      begin
        io.close if io && !io.closed?
      rescue IOError, SystemCallError
        nil
      end
    end
  end
end

TRANSCRIBER = Transcriber.new

# lib/main.py
import sys
import torch
import whisper
import warnings
from torch.multiprocessing import Process, Queue

# Suppress all Python warnings raised in this process.
warnings.filterwarnings("ignore")
# Use CUDA when torch reports it as available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Load the "small.en" Whisper model at import time and move it to the device.
# NOTE(review): the "spawn" start method chosen in the __main__ block makes the
# child process import this module again, so this load presumably runs in both
# the parent and the child — confirm.
model = whisper.load_model("small.en").to(device)

def transcribe_audio(audio_file, output_queue):
  """Transcribe one file and publish the recognised text.

  Runs the module-level `model` on `audio_file` and puts the "text" entry of
  its result onto `output_queue`.
  """
  text = model.transcribe(audio_file)["text"]
  output_queue.put(text)

def start_model_server(input_queue, output_queue):
  """Serve transcription requests until a None sentinel arrives.

  Each item taken from `input_queue` is passed to `transcribe_audio`, which
  publishes its result on `output_queue`. Receiving None ends the loop.
  """
  # iter(callable, sentinel) keeps calling input_queue.get() and stops as soon
  # as it returns None.
  for audio_file in iter(input_queue.get, None):
    transcribe_audio(audio_file, output_queue)

# Entry point: relays newline-delimited file paths from stdin to the model
# process and prints each transcription, followed by an end marker, on stdout.
if __name__ == "__main__":
  # Only choose "spawn" when no start method has been set yet.
  if torch.multiprocessing.get_start_method(allow_none=True) is None:
    torch.multiprocessing.set_start_method("spawn")

  # One queue per direction between this process and the model process.
  input_queue = Queue()
  output_queue = Queue()
  model_process = Process(target=start_model_server, args=(input_queue, output_queue))
  model_process.start()

  while True:
    input_file = sys.stdin.readline().strip()
    # readline() returns "" at EOF, so a closed stdin, a blank line and the
    # literal "exit" all end the loop; None tells the model process to stop.
    if not input_file or input_file == "exit":
      input_queue.put(None)
      break
    input_queue.put(input_file)
    # Blocks until the model process publishes a result.
    # NOTE(review): if transcription raises in the model process nothing is put
    # on output_queue, so this get() would presumably wait forever — confirm.
    print(output_queue.get(), flush=True)
    # Marker line that lets the reader know the transcription is complete.
    print("___TRANSCRIPTION_END___", flush=True)

# app/views/projects/_project.html.erb
<%# Project partial: a three-segment status bar followed by the name, transcription and status. %>
<%# Subscribes the page to this project's Turbo Stream. %>
<%= turbo_stream_from project %>

<div id="<%= dom_id project %>" class="scaffold_record">

  <div class="container my-5">
    <div class="progress" style="height: 40px; border-radius: 20px; overflow: hidden;">
      <%# "Pending" segment (25%): green for pending, processing and completed; grey otherwise. %>
      <div class="progress-bar <%= ["pending", "processing", "completed"].include?(project.status) ? "bg-success" : "bg-secondary" %>" role="progressbar" style="width: 25%">
        <span class="d-flex justify-content-center align-items-center h-100 text-white fw-bold">Pending</span>
      </div>
      <%# "Processing" segment (50%): green for processing and completed; grey otherwise. %>
      <div class="progress-bar <%= ["processing", "completed"].include?(project.status) ? "bg-success" : "bg-secondary" %>" role="progressbar" style="width: 50%">
        <span class="d-flex justify-content-center align-items-center h-100 text-white fw-bold">Processing</span>
      </div>
      <%# Last segment (25%): a red "Failed" when the project failed, otherwise "Completed", green only once completed. %>
      <% if project.failed? %>
        <div class="progress-bar bg-danger" role="progressbar" style="width: 25%">
          <span class="d-flex justify-content-center align-items-center h-100 text-white fw-bold">Failed</span>
        </div>
      <% else %>
        <div class="progress-bar <%= ["completed"].include?(project.status) ? "bg-success" : "bg-secondary" %>" role="progressbar" style="width: 25%">
          <span class="d-flex justify-content-center align-items-center h-100 text-white fw-bold">Completed</span>
        </div>
      <% end %>
    </div>
  </div>

  <p>
    <strong>Name:</strong>
    <%= project.name %>
  </p>

  <p>
    <strong>Transcription:</strong>
    <%= project.transcription %>
  </p>

  <p>
    <strong>Status:</strong>
    <%= project.status %>
  </p>

</div>