-
Notifications
You must be signed in to change notification settings - Fork 423
Rails Integration for OpenSearch Profiling
Thomas Witt edited this page Aug 13, 2025
·
1 revision
Inspired by the Riak integration, I wanted a better overview of which queries my non-ActiveRecord, OpenSearch-backed app actually issues:
# config/initializers/opensearch_profiler.rb
# Instrument OpenSearch::Client calls so they appear in Rack::MiniProfiler's SQL Summary.
# Works with both standard OpenSearch::Client and OpenSearch::Aws::Sigv4Client.
begin
require 'rack-mini-profiler'
rescue LoadError
# Rack::MiniProfiler not present; no-op.
end
begin
require 'opensearch'
rescue LoadError
# OpenSearch not present; no-op.
end
require 'json'
if defined?(::Rack::MiniProfiler) && defined?(::OpenSearch)
Rails.application.config.after_initialize do
# Profiling hooks that report OpenSearch requests to Rack::MiniProfiler.
# Prepend RackMiniProfilerOpenSearch::Instrumentation onto any client class that
# responds to #perform_request. Check `ancestors` before prepending so the module
# is never prepended twice (e.g. if this initializer is evaluated again).
module RackMiniProfilerOpenSearch
  module Instrumentation
    # Intercept perform_request to profile OpenSearch operations.
    #
    # Only requests issued while MiniProfiler is measuring are profiled. This
    # module may be prepended to several client classes. If one instrumented
    # client delegates to another, both would otherwise see the same request.
    # A fiber-local flag (Thread.current[]) makes the outermost instrumented call
    # the only one that times and records it. As a result, each request appears
    # exactly once in the timeline and the SQL Summary.
    #
    # @param method [String] HTTP method (GET, POST, PUT, DELETE)
    # @param path [String] Request path, with or without a leading slash
    #   (e.g. "_search", "my_index/_doc/123", "/my_index/_search")
    # @param params [Hash] Query parameters
    # @param body [Hash, String, nil] Request body
    # @param headers [Hash, nil] Request headers
    # @return [OpenSearch::Transport::Transport::Response] the wrapped call's response, unchanged
    # Exceptions raised by the underlying request propagate unchanged.
    def perform_request(method, path, params = {}, body = nil, headers = nil)
      return super if Thread.current[:rack_mini_profiler_opensearch_active] || !mp_should_measure?

      # Built before the request so the timeline step gets a descriptive name.
      operation_label = build_opensearch_label(method, path, params, body)

      # Wrap in a step for the timeline view AND record SQL timing.
      ::Rack::MiniProfiler.step("OpenSearch: #{operation_label}") do
        start_mono = Process.clock_gettime(Process::CLOCK_MONOTONIC)
        response =
          begin
            # Nested instrumented clients see this flag and pass straight through.
            Thread.current[:rack_mini_profiler_opensearch_active] = true
            super
          ensure
            Thread.current[:rack_mini_profiler_opensearch_active] = nil
          end
        elapsed_ms = mp_elapsed_ms(start_mono)

        # Append response metrics (took, hits, counters, size) to the label.
        sql_text = "#{operation_label} #{extract_response_info(response, path)}".strip

        # Record as SQL timing so it shows in the SQL Summary. Profiling is
        # best-effort and must never break the actual request.
        begin
          ::Rack::MiniProfiler.record_sql(sql_text, elapsed_ms, nil)
        rescue StandardError => e
          Rails.logger.debug { "OpenSearch profiling record_sql failed: #{e.class}: #{e.message}" }
        end

        response
      end
    end

    private

    # True if MiniProfiler is currently measuring this request.
    #
    # @return [Boolean]
    def mp_should_measure?
      c = ::Rack::MiniProfiler.current
      !!(c && c.measure && c.current_timer)
    end

    # Converts a monotonic start time to elapsed milliseconds.
    #
    # @param start_mono [Float] Monotonic clock reading captured before the operation.
    # @return [Float] Elapsed time in milliseconds, rounded to 0.1ms.
    def mp_elapsed_ms(start_mono)
      ((Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_mono) * 1000.0).round(1)
    end

    # Looks up +key+ in a Hash whose keys may be Strings (parsed JSON) or
    # Symbols (Ruby literals). The String key is preferred.
    #
    # @param hash [Object] anything; non-Hash values yield nil
    # @param key [String, Symbol]
    # @return [Object, nil]
    def mp_value(hash, key)
      return nil unless hash.is_a?(Hash)

      hash[key.to_s] || hash[key.to_sym]
    end

    # Builds a concise, informative label for the OpenSearch operation.
    #
    # If anything goes wrong, the label falls back to "METHOD path". That way a
    # labelling bug never breaks the request being profiled.
    #
    # @param method [String] HTTP method
    # @param path [String] Request path
    # @param params [Hash] Query parameters
    # @param body [Hash, String, nil] Request body
    # @return [String] e.g. "search [my_index] (query:{match:{title:foo}}, size:10)"
    def build_opensearch_label(method, path, params, body)
      index_name = extract_index_name(path)
      operation = detect_operation(method, path)

      label = operation.to_s
      label += " [#{index_name}]" if index_name && index_name != '_all'

      # Add operation-specific details
      case operation
      when :search
        label += extract_search_details(body, params)
      when :multi_search
        label += extract_multi_search_details(body)
      when :bulk
        label += extract_bulk_details(body)
      when :index, :create, :update, :get, :delete
        doc_id = extract_doc_id(path)
        label += " id:#{truncate_value(doc_id, 20)}" if doc_id
      when :delete_by_query
        label += extract_delete_by_query_details(body)
      end
      label
    rescue StandardError => e
      Rails.logger.debug { "OpenSearch profiling label build failed: #{e.class}: #{e.message}" }
      "#{method} #{path}"
    end

    # Extract the target index (or comma-separated index list) from a request path.
    #
    # Paths may arrive with or without a leading slash ("my_index/_search" or
    # "/my_index/_search"); both forms are handled. Paths whose first segment is
    # an API endpoint ("_search", "/_bulk", "_all/...") have no index.
    #
    # @param path [String]
    # @return [String, nil]
    def extract_index_name(path)
      first_segment = path.to_s.sub(%r{\A/+}, '').split('/', 2).first
      return nil if first_segment.nil? || first_segment.empty? || first_segment.start_with?('_')

      first_segment
    end

    # Detect the operation type from method and path.
    #
    # Endpoint names are matched as whole path segments. As a result, an index
    # that merely ends in e.g. "_search" is not mistaken for a search request.
    # Both "index/_update/ID" and paths ending in "/_update" are recognised as
    # :update (likewise for _create).
    #
    # @param method [String, Symbol]
    # @param path [String]
    # @return [Symbol] one of :search, :multi_search, :bulk, :delete_by_query,
    #   :update, :create, :refresh, :count, :index, :delete, :get, :unknown
    def detect_operation(method, path)
      path = path.to_s
      case path
      when %r{(?:\A|/)_search\z} then :search
      when %r{(?:\A|/)_msearch\z} then :multi_search
      when %r{(?:\A|/)_bulk\z} then :bulk
      when %r{(?:\A|/)_delete_by_query\z} then :delete_by_query
      when %r{(?:\A|/)_update(?:/|\z)} then :update
      when %r{(?:\A|/)_create(?:/|\z)} then :create
      when %r{(?:\A|/)_refresh\z} then :refresh
      when %r{(?:\A|/)_count\z} then :count
      else
        case method.to_s.upcase
        when 'PUT', 'POST' then path.match?(%r{/_doc(?:/|\z)}) ? :index : :unknown
        when 'DELETE' then :delete
        when 'GET' then path.match?(%r{/_doc/}) ? :get : :unknown
        else :unknown
        end
      end
    end

    # Extract the document ID from a path such as /index/_doc/ID,
    # /index/_create/ID or /index/_update/ID.
    #
    # @param path [String]
    # @return [String, nil]
    def extract_doc_id(path)
      match = path.to_s.match(%r{/_(?:doc|create|update)/([^/?]+)})
      match && match[1]
    end

    # Extract search query details.
    #
    # size/from are read from the body first. If the body has none, they fall
    # back to the query params.
    #
    # @param body [Hash, String, nil]
    # @param params [Hash, nil]
    # @return [String] e.g. " (query:{term:{status:open}}, size:10)", or '' if nothing to show
    def extract_search_details(body, params)
      body_hash = parse_body(body)
      details = []

      query_desc = describe_query(mp_value(body_hash, :query))
      details << "query:#{query_desc}" if query_desc

      aggs = mp_value(body_hash, :aggs) || mp_value(body_hash, :aggregations)
      agg_desc = describe_aggregations(aggs)
      details << "aggs:#{agg_desc}" if agg_desc

      size = mp_value(body_hash, :size) || mp_value(params, :size)
      from = mp_value(body_hash, :from) || mp_value(params, :from)
      details << "size:#{size}" if size
      # from may be an Integer or a numeric String; only a non-zero offset is shown.
      details << "from:#{from}" if from && from.to_s.to_i.positive?

      details.empty? ? '' : " (#{details.join(', ')})"
    end

    # Describe a query structure concisely.
    #
    # @param query [Hash]
    # @return [String, nil] nil when +query+ is not a Hash or cannot be described
    def describe_query(query)
      return nil unless query.is_a?(Hash)

      if (match = mp_value(query, :match))
        pair = match.to_a.first
        return "{match:{#{pair[0]}:#{truncate_value(pair[1].to_s, 20)}}}" if pair
      end

      if (term = mp_value(query, :term))
        pair = term.to_a.first
        return "{term:{#{pair[0]}:#{truncate_value(pair[1].to_s, 20)}}}" if pair
      end

      if (bool_query = mp_value(query, :bool))
        clauses = %w[must should filter must_not].select { |clause| mp_value(bool_query, clause) }
        return "{bool:[#{clauses.join(',')}]}" unless clauses.empty?
      end

      if (q = mp_value(mp_value(query, :query_string), :query))
        return "{query_string:#{truncate_value(q.to_s, 30)}}"
      end

      range = mp_value(query, :range)
      if range.is_a?(Hash) && (field = range.keys.first)
        return "{range:#{field}}"
      end

      # For other query types, just show the type
      "{#{query.keys.first}}"
    rescue StandardError
      nil
    end

    # Describe aggregations concisely, e.g. "{terms:status,avg}".
    #
    # Malformed entries are skipped individually rather than discarding the summary.
    #
    # @param aggs [Hash]
    # @return [String, nil]
    def describe_aggregations(aggs)
      return nil unless aggs.is_a?(Hash)

      agg_types = []
      aggs.each_value do |agg_def|
        next unless agg_def.is_a?(Hash)

        # The aggregation type is the first key that is not a nested sub-aggregation block.
        agg_type = agg_def.keys.find { |k| !%w[aggs aggregations].include?(k.to_s) }
        next unless agg_type

        field = agg_type.to_s == 'terms' ? mp_value(agg_def[agg_type], :field) : nil
        agg_types << (field ? "terms:#{field}" : agg_type.to_s)
      end

      agg_types.empty? ? nil : "{#{agg_types.join(',')}}"
    rescue StandardError
      nil
    end

    # Extract multi-search details (number of searches).
    #
    # @param body [String, Array, nil]
    # @return [String]
    def extract_multi_search_details(body)
      case body
      when String
        # Newline-delimited JSON: every search is a header line plus a body line.
        lines = body.split("\n").reject(&:empty?)
        " [#{lines.size / 2} queries]"
      when Array
        " [#{body.size} queries]"
      else
        ''
      end
    end

    # Extract bulk operation details, e.g. " [3 index, 1 delete]".
    #
    # Only action entries are counted. Each index/create/update action is
    # followed by a source entry, which is skipped. Otherwise a document that
    # happens to contain an "index"/"update"/... field would be miscounted as an
    # extra operation. Delete actions have no source entry. An action whose
    # metadata carries the document under "data" is also treated as having no
    # separate source entry (presumably a combined Array form — TODO confirm
    # against the client's bulk docs).
    #
    # @param body [String, Array, nil]
    # @return [String]
    def extract_bulk_details(body)
      entries =
        case body
        when String then body.split("\n").reject { |line| line.strip.empty? }
        when Array then body
        else []
        end

      operations = { index: 0, create: 0, update: 0, delete: 0 }
      skip_source = false
      entries.each do |entry|
        if skip_source
          skip_source = false
          next
        end

        action, meta = mp_bulk_action(entry)
        next unless action

        operations[action] += 1
        skip_source = action != :delete && !mp_value(meta, :data)
      end

      ops = operations.select { |_type, count| count > 0 }.map { |type, count| "#{count} #{type}" }
      ops.empty? ? '' : " [#{ops.join(', ')}]"
    end

    # Identifies a bulk action entry. An action entry is a Hash (or a JSON
    # String) with exactly one key, and that key is index/create/update/delete.
    # Malformed or non-action entries yield nil.
    #
    # @param entry [Hash, String, Object]
    # @return [Array(Symbol, Object), nil] the action and its metadata
    def mp_bulk_action(entry)
      entry = parse_body(entry) if entry.is_a?(String)
      return nil unless entry.is_a?(Hash) && entry.size == 1

      key, meta = entry.first
      action = %i[index create update delete].find { |name| name.to_s == key.to_s }
      action && [action, meta]
    end

    # Extract delete_by_query details.
    #
    # @param body [Hash, String, nil]
    # @return [String]
    def extract_delete_by_query_details(body)
      query_desc = describe_query(mp_value(parse_body(body), :query))
      query_desc ? " query:#{query_desc}" : ''
    end

    # Summarises the response: server-side took time, hit count,
    # created/updated/deleted counters and payload size.
    #
    # @param response [OpenSearch::Transport::Transport::Response]
    # @param _path [String] request path (currently unused)
    # @return [String] e.g. "(took:5ms, 42 hits, [1.2KB])", or '' when nothing is known
    def extract_response_info(response, _path)
      return '' unless response && response.body

      body = parse_body(response.body)
      return '' unless body.is_a?(Hash)

      info = []
      took = mp_value(body, :took)
      info << "took:#{took}ms" if took

      # hits.total may be a plain number or a Hash carrying a "value" key.
      total = mp_value(mp_value(body, :hits), :total)
      total = mp_value(total, :value) if total.is_a?(Hash)
      info << "#{total} hits" if total

      %i[created updated deleted].each do |counter|
        value = mp_value(body, counter)
        info << "#{counter}:#{value}" if value
      end

      size = response_size_bytes(response)
      if size > 0
        size_str =
          if size < 1024
            "#{size} bytes"
          elsif size < 1024 * 1024
            "#{(size / 1024.0).round(1)}KB"
          else
            "#{(size / (1024.0 * 1024.0)).round(1)}MB"
          end
        info << "[#{size_str}]"
      end

      info.empty? ? '' : "(#{info.join(', ')})"
    rescue StandardError => e
      Rails.logger.debug { "OpenSearch profiling response info failed: #{e.class}: #{e.message}" }
      ''
    end

    # Parse a body that may be a Hash or a JSON String.
    #
    # This is best-effort: unparsable or unsupported bodies yield nil, so they
    # are simply not described.
    #
    # @param body [String, Hash, nil]
    # @return [Object, nil] a Hash for JSON objects; nil on failure
    def parse_body(body)
      case body
      when Hash then body
      when String then JSON.parse(body)
      end
    rescue StandardError
      nil
    end

    # Calculate response size in bytes. Hash bodies are re-serialised, so their
    # size is an approximation of the payload.
    #
    # @param response [Object]
    # @return [Integer]
    def response_size_bytes(response)
      return 0 unless response.respond_to?(:body)

      body = response.body
      case body
      when String then body.bytesize
      when Hash then JSON.generate(body).bytesize
      else 0
      end
    rescue StandardError
      0
    end

    # Truncate long values for display, appending "..." when shortened.
    #
    # @param val [Object]
    # @param max [Integer] maximum length of the result
    # @return [String]
    def truncate_value(val, max = 40)
      s = val.to_s
      return s if s.length <= max
      # Too short to fit an ellipsis: hard-cut instead (also guards a negative max).
      return s[0, [max, 0].max] if max < 3

      "#{s[0, max - 3]}..."
    end
  end
end
# Attach the profiling module to each OpenSearch client class that may receive
# #perform_request. OpenSearch::Client is always instrumented.
# OpenSearch::Transport::Client is instrumented only when that constant is
# defined. Each class is checked for the module immediately before it is
# prepended, so re-evaluating this code never prepends it twice.
[
  ::OpenSearch::Client,
  (::OpenSearch::Transport::Client if defined?(::OpenSearch::Transport::Client))
].compact.each do |client_class|
  next if client_class.ancestors.include?(RackMiniProfilerOpenSearch::Instrumentation)

  client_class.prepend(RackMiniProfilerOpenSearch::Instrumentation)
end
Rails.logger.info 'Rack::MiniProfiler OpenSearch profiling enabled'
end
end