Обработка запросов json на Ruby + Ractor + Kafka
Довелось как-то пообщаться на Хабре с одним умником, который возомнил себя... уж не знаю кем возмнил, но сыпал терминами "GVL (GIL) Ruby", "CPU-bound", “IO-bound”. Таких "знатоков", к сожалению, полно, но это может произвести только лишь первое впечатление на незнающего работодателя. В общем, мне был брошен типа вызов:
Ну, давайте с относительно простой задачки начнём. Напишите тривиальный коллектор событий с фронта. Одна ручка, на которую можно прислать JSON (ориентировочно размером от 1 до 50 кб) и эти события надо сложить в Кафку с полным акком. Ориентировочная нагрузка: 10k событий в секунду.
Сомнения возникли в реальности такого механизма хотя бы в том, что уж больно какие-то нереальные (по нынешним временам - август 2025) нагрузки - обработка 10K x 50K ~= 500Mb/s. Пусть это так и останется чисто гипотетическим предположением о продобных нагрузках. И тем не менее, код сделан как раз с учётом больших нагрузок - Ractor, Kafka.
В общем, в результате изначально получился у меня вот такой код, но рабочий вариант чуть ниже. В этом коде всё хорошо, но проблема в том, что rdkafka работает с переменными класса, что не позволяет ему корректно работать в параллельных изолированных потоках Ractor. Если когда-нибудь исправят в gem rdkafka, то это будет вполне работоспособный код.
# encoding: utf-8
# frozen_string_literal: true
#
# @author ESV Corp. © 08.2025
require 'sinatra'
require 'rdkafka'
require 'json'
require 'etc'
require 'mutex_m'
set :bind, '0.0.0.0'
set :port, 4567
BROKERS = 'localhost:9092'
TOPIC = 'events'
WORKERS = Etc.nprocessors
MAX_SIZE = 60_000
# Конфиг Kafka
RDKAFKA_CONFIG = {
  'bootstrap.servers'      => BROKERS,
  'acks'                   => 'all',
  'queue.buffering.max.ms' => '5',
  'batch.num.messages'     => '10000'
}
# Счётчик обработанных сообщений
rps_count = 0
rps_mutex = Mutex.new
# Пул Ractor-ов
worker_index = 0
worker_mutex = Mutex.new
workers = Array.new(WORKERS) do | i |
  Ractor.new(i, RDKAFKA_CONFIG, TOPIC) do | wid, config, topic |
    producer = Rdkafka::Config.new(config).producer
    loop do
      msg = Ractor.receive
      break if msg == :shutdown # завершение работы
      begin
        # Парсим JSON с символами в ключах
        data = JSON.parse(msg, symbolize_names: true)
        # Фильтрация только разрешённых ключей
        # т.к. это всего лишь тестовый пример
        data.slice!(:type, :name, :a, :b)
        # Анализируем сообщения
        case data
        in { type: 'greeting', name: String => name }
          puts "[Ractor #{wid}] Greeting for #{name}"
        in { type: 'sum', a: Integer => a, b: Integer => b }
          puts "[Ractor #{wid}] Sum = #{a + b}"
        else
          puts "[Ractor #{wid}] Unrecognized event"
        end
        # Асинхронная отправка в Kafka без блокировки
        producer.produce_async(topic: topic, payload: msg)
        producer.poll(0) # Обработка колбэков
      rescue JSON::ParserError
        warn "[Ractor #{wid}] Invalid JSON"
      rescue => e
        warn "[Ractor #{wid}] Error: #{e.class} #{e.message}"
      end
    end
  end
end
# Выводим количество обработанных событий в секунду
Thread.new do
  loop do
    sleep 1
    count = rps_mutex.synchronize { rps_count.tap { rps_count = 0 } }
    puts "[RPS] #{count} events/sec"
  end
end
# HTTP endpoint для приёма событий
post '/events' do
  body = request.body.read
  halt 413, 'Payload too large' if body.bytesize > MAX_SIZE
  halt 400, 'Empty payload' if body.empty?
  # Считаем количество запросов в секунду
  rps_mutex.synchronize { rps_count += 1 }
  # Используем round-robin для выбора работника
  worker_idx = worker_mutex.synchronize { (worker_index += 1) % WORKERS }
  workers[worker_idx].send(body)
  status 202
  'Accepted'
end
# Завершаем работу всех Ractor'ов при выходе из программы
at_exit do
  puts "[Main] Shutting down..."
  workers.each { | w | w.send(:shutdown) }
  workers.each(&:take) # ждём завершения
  puts "[Main] All workers stopped."
end
Замена rdkafka на ruby-kafka
Вполне рабочий вариант, но, вероятно, будет работать медленнее, но это компенсируется использованием Ractor и YJIT в Ruby. Используется gem ruby-kafka вместо gem rdkafka.
# encoding: utf-8
# frozen_string_literal: true
#
# @author ESV Corp. © 08.2025
require 'sinatra'
require 'kafka' # используем gem ruby-kafka
require 'json'
require 'etc'
require 'mutex_m'
set :bind, '0.0.0.0'
set :port, 4567
BROKERS = ['localhost:9092'] # ruby-kafka ожидает массив
TOPIC = 'events'
WORKERS = Etc.nprocessors
MAX_SIZE = 60_000
# Счётчик обработанных сообщений
rps_count = 0
rps_mutex = Mutex.new
# Пул Ractor'ов
worker_index = 0
worker_mutex = Mutex.new
workers = Array.new(WORKERS) do | i |
  Ractor.new(i, BROKERS, TOPIC) do | wid, brokers, topic |
    # Создаём Kafka-клиент и producer внутри Ractor'а
    kafka = Kafka.new(
      seed_brokers: brokers,
      client_id: "collector_worker_#{wid}"
    )
    # Синхронный продюсер (для полного контроля и ACK=all)
    producer = kafka.sync_producer(
      acks: :all,
      max_retries: 3,
      retry_backoff: 100
    )
    loop do
      msg = Ractor.receive
      break if msg == :shutdown
      begin
        # Парсим JSON с символами в ключах
        data = JSON.parse(msg, symbolize_names: true)
        # Фильтрация только разрешённых ключей
        data.slice!(:type, :name, :a, :b)
        # Анализируем сообщения
        case data
        in { type: 'greeting', name: String => name }
          puts "[Ractor #{wid}] Greeting for #{name}"
        in { type: 'sum', a: Integer => a, b: Integer => b }
          puts "[Ractor #{wid}] Sum = #{a + b}"
        else
          puts "[Ractor #{wid}] Unrecognized event"
        end
        # Синхронная отправка (для гарантии ACK=all)
        # Можно использовать async_producer, но sync проще для контроля
        producer.deliver_message(msg, topic: topic)
      rescue JSON::ParserError
        warn "[Ractor #{wid}] Invalid JSON"
      rescue => e
        warn "[Ractor #{wid}] Error: #{e.class} #{e.message}"
      ensure
        # Важно: закрыть продюсер при выходе
        producer&.close
      end
    end
  end
end
# RPS-мониторинг
Thread.new do
  loop do
    sleep 1
    count = rps_mutex.synchronize { rps_count.tap { rps_count = 0 } }
    puts "[RPS] #{count} events/sec"
  end
end
# HTTP endpoint для приёма событий
post '/events' do
  body = request.body.read
  halt 413, 'Payload too large' if body.bytesize > MAX_SIZE
  halt 400, 'Empty payload' if body.empty?
  # Считаем RPS
  rps_mutex.synchronize { rps_count += 1 }
  # Round-robin выбор Ractor'а
  worker_idx = worker_mutex.synchronize { (worker_index += 1) % WORKERS }
  workers[worker_idx].send(body)
  status 202
  'Accepted'
end
# Завершение работы
at_exit do
  puts "[Main] Shutting down..."
  # Остановить Ractor'ы
  workers.each { | w | w.send(:shutdown) }
  workers.each(&:take)
  puts "[Main] All workers stopped."
end
Рабочий вариант
...но оказалось, что rdkafka использует переменные класса (что само по себе является небезопасной практикой - даже Matz упоминал об их нежелательном использовании в Ruby), поэтому было принято решение вынести работу с rdkafka в основной поток:
# encoding: utf-8
# frozen_string_literal: true
#
# @author ESV Corp. © 08.2025
require 'sinatra'
require 'rdkafka'
require 'json'
require 'etc'
require 'thread'
require 'mutex_m'
set :bind, '0.0.0.0'
set :port, 4567
BROKERS = 'localhost:9092'
TOPIC = 'events'
WORKERS = Etc.nprocessors
MAX_SIZE = 60_000
MAX_QUEUE_SIZE = 10_000
# Конфиг Kafka
RDKAFKA_CONFIG = {
  'bootstrap.servers'      => BROKERS,
  'acks'                   => 'all',
  'queue.buffering.max.ms' => '5',
  'batch.num.messages'     => '10000'
}
# Первая очередь: для передачи событий в Ractor'ы (backpressure)
input_queue = SizedQueue.new(MAX_QUEUE_SIZE)
# Вторая очередь: для отправки обработанных данных в Kafka
events_queue = SizedQueue.new(MAX_QUEUE_SIZE)
# Главный producer (в main потоке)
producer = Rdkafka::Config.new(RDKAFKA_CONFIG).producer
# Запускаем фоновую отправку из main потока
# Поток отправки сообщений в Kafka (решает проблему Ractor::IsolationError в rdkafka)
# потому как rdkafka какого-то чёрта использует переменные класса, хотя давно
# всем известно, что их использование не рекомендует даже Matz
# поэтому работа с kafka будет производится в главном потоке
Thread.new do
  loop do
    msg = events_queue.pop
    break if msg == :flush
    begin
      producer.produce_async(topic: TOPIC, payload: msg)
      producer.poll(0)
    rescue => e
      warn "[Kafka Thread] Error: #{e.class} #{e.message}"
    end
  end
end
# Счётчик обработанных сообщений
rps_count = 0
rps_mutex = Mutex.new
# Пул Ractor'ов: читают из input_queue, обрабатывают, отправляют в events_queue
workers = Array.new(WORKERS) do | i |
  Ractor.new(i, input_queue, events_queue) do | wid, input, output |
    loop do
      msg = input.pop # читаем из очереди ввода
      break if msg == :shutdown
      begin
        data = JSON.parse(msg, symbolize_names: true)
        data.slice!(:type, :name, :a, :b)
        case data
        in { type: 'greeting', name: String => name }
          puts "[Ractor #{wid}] Greeting for #{name}"
        in { type: 'sum', a: Integer => a, b: Integer => b }
          puts "[Ractor #{wid}] Sum = #{a + b}"
        else
          puts "[Ractor #{wid}] Unrecognized event"
        end
        output << msg # в очередь для отправки в kafka
      rescue JSON::ParserError
        warn "[Ractor #{wid}] Invalid JSON"
      rescue => e
        warn "[Ractor #{wid}] Error: #{e.class} #{e.message}"
      end
    end
  end
end
# RPS-мониторинг
Thread.new do
  loop do
    sleep 1
    count = rps_mutex.synchronize { rps_count.tap { rps_count = 0 } }
    puts "[RPS] #{count} events/sec"
  end
end
# HTTP endpoint для приёма событий
post '/events' do
  body = request.body.read
  halt 413, 'Payload too large' if body.bytesize > MAX_SIZE
  halt 400, 'Empty payload' if body.empty?
  rps_mutex.synchronize { rps_count += 1 }
  begin
    # контроль переполнения
    input_queue.push(body, exception: true)
  rescue ThreadError
    halt 429, 'Too many requests'
  end
  status 202
  'Accepted'
end
# Завершение работы
at_exit do
  puts "[Main] Shutting down..."
  # 1. Отправить сигнал завершения всем Ractor'ам
  workers.each { | worker | worker.send(:shutdown) }
  # 2. Дождаться завершения всех Ractor'ов
  workers.each(&:take)
  # 3. Отправить сигнал завершения потоку Kafka
  events_queue << :flush
  # 4. Дождаться отправки всех сообщений
  producer&.flush(5000)
  puts "[Main] All workers stopped."
end
И в дополнение к нему ещё и тест нагрузки:
# encoding: utf-8
# frozen_string_literal: true
#
# @author ESV Corp. © 08.2025
# test_load.rb
require 'net/http'
require 'json'
require 'uri'
require 'benchmark'
# Конфигурация теста
URL = URI.parse('http://localhost:4567/events') # Указываем правильный адрес сервера
EVENT_SIZE = 2000                               # Размер тела запроса (в байтах) - регулируем по нуждам
TOTAL_REQUESTS = 10_000                         # Количество запросов для теста
CONCURRENCY = 100                               # Число потоков, которые будут посылать запросы
# Сгенерируем случайный JSON для запроса
def generate_random_event
  {
    type: 'greeting',
    name: ('a'..'z').to_a.sample(10).join
  }.to_json
end
# Функция для отправки одного запроса
def send_request(event)
  http = Net::HTTP.new(URL.host, URL.port)
  request = Net::HTTP::Post.new(URL.path, {'Content-Type' => 'application/json'})
  request.body = event
  http.request(request)
end
# Тест нагрузки
Benchmark.bm do | x |
  x.report("load test:") do
    # Используем потоки для параллельной отправки запросов
    threads = Array.new(CONCURRENCY) do
      Thread.new do
        TOTAL_REQUESTS / CONCURRENCY.times do
          event = generate_random_event
          send_request(event)
        end
      end
    end
    threads.each(&:join) # Ждём завершения всех потоков
  end
end
Этот код – просто пример использвания Ruby с Ractor (параллельная обработка) и Kafka, но, мало ли, вдруг кому и пригодится для реальных проектов. Рекомендую использовать Ruby с YJIT для более быстрого исполнения программы.