Skip to content

Rails Integration for OpenSearch Profiling

Thomas Witt edited this page Aug 13, 2025 · 1 revision

Inspired by the Riak integration, I wanted a better overview of the queries actually being issued by my non-ActiveRecord app, which uses OpenSearch:

# config/initializers/opensearch_profiler.rb

# Instrument OpenSearch::Client calls so they appear in Rack::MiniProfiler's SQL Summary.
# Works with both standard OpenSearch::Client and OpenSearch::Aws::Sigv4Client.

begin
  require 'rack-mini-profiler'
rescue LoadError
  # Rack::MiniProfiler not present; no-op.
end

begin
  require 'opensearch'
rescue LoadError
  # OpenSearch not present; no-op.
end

require 'json'

if defined?(::Rack::MiniProfiler) && defined?(::OpenSearch)
  Rails.application.config.after_initialize do
    # Namespace for the instrumentation module; the guarded prepend calls further down avoid prepending it twice.
    module RackMiniProfilerOpenSearch
      module Instrumentation
        # Intercept perform_request to profile OpenSearch operations.
        #
        # Wraps the request in a MiniProfiler step (timeline view) and records it as
        # an SQL timing (SQL Summary). When MiniProfiler is not measuring, the call
        # is passed straight through.
        #
        # This module is prepended to more than one client class. If one instrumented
        # client forwards the request to another instrumented client on the same
        # thread (presumably OpenSearch::Client -> its transport client; verify
        # against the gem version in use), the nested call is passed through
        # untouched so the request is only recorded once.
        #
        # Exceptions raised by the underlying request propagate unchanged.
        #
        # @param method [String] HTTP method (GET, POST, PUT, DELETE)
        # @param path [String] Request path (e.g., "index/_search", "/index/_doc/123")
        # @param params [Hash] Query parameters
        # @param body [Hash, String, nil] Request body
        # @param headers [Hash, nil] Request headers
        # @return [Object] Whatever the wrapped perform_request returns
        def perform_request(method, path, params = {}, body = nil, headers = nil)
          # Pass through when an outer instrumented call is already recording this
          # request, or when the profiler is not active.
          return super if Thread.current[:rack_mini_profiler_opensearch_active] || !mp_should_measure?

          # Parse operation type and details from path and body
          operation_label = build_opensearch_label(method, path, params, body)

          Thread.current[:rack_mini_profiler_opensearch_active] = true
          begin
            # Wrap in a step for the timeline view AND record SQL timing
            ::Rack::MiniProfiler.step("OpenSearch: #{operation_label}") do
              start_mono = Process.clock_gettime(Process::CLOCK_MONOTONIC)

              # Execute the actual OpenSearch request (implicit super forwards all arguments)
              response = super

              elapsed_ms = mp_elapsed_ms(start_mono)

              # Build the final label with response metrics
              response_info = extract_response_info(response, path)
              sql_text = "#{operation_label} #{response_info}".strip

              # Record as SQL timing (shows in SQL Summary); a profiler failure must
              # never break the request itself.
              begin
                ::Rack::MiniProfiler.record_sql(sql_text, elapsed_ms, nil)
              rescue => e
                Rails.logger.debug { "OpenSearch profiling record_sql failed: #{e.class}: #{e.message}" }
              end

              response
            end
          ensure
            # Always clear the flag, including when the request raised.
            Thread.current[:rack_mini_profiler_opensearch_active] = false
          end
        end

        private

        # Reports whether MiniProfiler is measuring the current request.
        #
        # @return [Boolean] true only when a current profiler context exists and both
        #   its +measure+ flag and its +current_timer+ are truthy.
        def mp_should_measure?
          context = ::Rack::MiniProfiler.current
          return false unless context

          context.measure && context.current_timer ? true : false
        end

        # Milliseconds elapsed since a monotonic clock reading.
        #
        # @param start_mono [Float] Monotonic clock reading (seconds) taken before the operation.
        # @return [Float] Elapsed milliseconds, rounded to one decimal place.
        def mp_elapsed_ms(start_mono)
          now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
          elapsed_seconds = now - start_mono
          (elapsed_seconds * 1000.0).round(1)
        end

        # Produces the short label shown in the profiler for one OpenSearch request.
        #
        # The label is the detected operation name, followed by " [index]" when an
        # index name other than "_all" was found, followed by operation-specific
        # details. If anything raises while building it, the failure is logged at
        # debug level and "METHOD PATH" is returned instead.
        #
        # @param method [String] HTTP method
        # @param path [String] Request path
        # @param params [Hash] Query parameters
        # @param body [Hash, String, nil] Request body
        # @return [String]
        def build_opensearch_label(method, path, params, body)
          index_name = extract_index_name(path)
          operation = detect_operation(method, path)

          index_suffix = index_name && index_name != '_all' ? " [#{index_name}]" : ''

          detail_suffix =
            case operation
            when :search
              extract_search_details(body, params)
            when :multi_search, :msearch
              extract_multi_search_details(body)
            when :bulk
              extract_bulk_details(body)
            when :index, :create, :update
              doc_id = extract_doc_id(path)
              doc_id ? " id:#{truncate_value(doc_id, 20)}" : ''
            when :delete_by_query
              extract_delete_by_query_details(body)
            else
              ''
            end

          "#{operation}#{index_suffix}#{detail_suffix}"
        rescue => e
          Rails.logger.debug { "OpenSearch profiling label build failed: #{e.class}: #{e.message}" }
          "#{method} #{path}"
        end

        # Extract the index name from the request path.
        #
        # The first path segment is taken as the index name unless it starts with
        # "_" (an API endpoint such as "_search" or "_bulk"). Paths are accepted
        # with or without a leading slash, so "/products/_search" and
        # "products/_search" both yield "products".
        #
        # @param path [String, nil]
        # @return [String, nil] The index name (possibly a comma-separated list as
        #   written in the path), or nil when the path does not start with one.
        def extract_index_name(path)
          # Drop any leading slashes so relative and absolute paths behave alike.
          first_segment = path.to_s.sub(%r{\A/+}, '').split('/').first
          return nil if first_segment.nil? || first_segment.empty?
          return nil if first_segment.start_with?('_')

          first_segment
        end

        # Detect the operation type from method and path.
        #
        # Endpoint keywords are matched as whole path segments, so a document id or
        # index name that merely ends in "_search", "_count", etc. is not mistaken
        # for that endpoint. Update requests addressed as ".../_update/ID" are
        # recognised, as are index requests without an id ("POST index/_doc").
        #
        # @param method [String, Symbol] HTTP method, matched case-insensitively
        # @param path [String] Request path, with or without a leading slash
        # @return [Symbol] One of :search, :multi_search, :bulk, :delete_by_query,
        #   :update, :create, :refresh, :count, :index, :delete, :get or :unknown
        def detect_operation(method, path)
          request_path = path.to_s

          case request_path
          when %r{(?:\A|/)_search\z}
            :search
          when %r{(?:\A|/)_msearch\z}
            :multi_search
          when %r{(?:\A|/)_bulk\z}
            :bulk
          when %r{(?:\A|/)_delete_by_query\z}
            :delete_by_query
          when %r{(?:\A|/)_update(?:/[^/]+)?\z}
            :update
          when %r{(?:\A|/)_create\z}
            :create
          when %r{(?:\A|/)_refresh\z}
            :refresh
          when %r{(?:\A|/)_count\z}
            :count
          else
            # No dedicated endpoint keyword: fall back to the HTTP verb.
            case method.to_s.upcase
            when 'PUT', 'POST'
              request_path.match?(%r{/_(?:doc|create)(?:/|\z)}) ? :index : :unknown
            when 'DELETE'
              :delete
            when 'GET'
              request_path.include?('/_doc/') ? :get : :unknown
            else
              :unknown
            end
          end
        end

        # Extract the document ID from the request path.
        #
        # Recognises the id segment that follows "_doc", "_create" or "_update",
        # e.g. "index/_doc/ID", "index/_create/ID", "index/_update/ID".
        #
        # @param path [String, nil]
        # @return [String, nil] The id segment, or nil when the path carries none.
        def extract_doc_id(path)
          path.to_s[%r{/_(?:doc|create|update)/([^/?]+)}, 1]
        end

        # Extract search query details from the request body and query parameters.
        #
        # Reports the query (or the URI-search "q" parameter), aggregations, "size"
        # and a positive "from". Values supplied as query parameters are preferred
        # over the same values in the body (this is believed to match how the search
        # endpoint resolves duplicates; confirm against the server documentation).
        # A body that is missing or cannot be parsed into a Hash contributes nothing.
        #
        # @param body [Hash, String, nil] Request body
        # @param params [Hash, nil] Query parameters (string or symbol keys)
        # @return [String] " (detail, detail, ...)" or "" when there is nothing to show
        def extract_search_details(body, params)
          body_hash = parse_body(body)
          body_hash = {} unless body_hash.is_a?(Hash)
          query_params = params.is_a?(Hash) ? params : {}

          details = []

          # Query: URI search string first, then the body query DSL
          uri_query = query_params['q'] || query_params[:q]
          if uri_query
            details << "q:#{truncate_value(uri_query, 30)}"
          elsif (query = body_hash['query'] || body_hash[:query])
            query_desc = describe_query(query)
            details << "query:#{query_desc}" if query_desc
          end

          # Aggregations
          aggs = body_hash['aggs'] || body_hash[:aggs] || body_hash['aggregations'] || body_hash[:aggregations]
          if aggs
            agg_desc = describe_aggregations(aggs)
            details << "aggs:#{agg_desc}" if agg_desc
          end

          # Paging
          size = query_params['size'] || query_params[:size] || body_hash['size'] || body_hash[:size]
          from = query_params['from'] || query_params[:from] || body_hash['from'] || body_hash[:from]
          details << "size:#{size}" if size
          # "from" may arrive as a String (e.g. from request params); compare numerically.
          details << "from:#{from}" if from.respond_to?(:to_i) && from.to_i > 0

          details.empty? ? '' : " (#{details.join(', ')})"
        end

        # Summarises a query DSL hash in a few characters.
        #
        # Checks, in order: match, term, bool, query_string, range. Keys may be
        # strings or symbols. Any other query is summarised by its first key.
        # Any error raised while inspecting the structure yields nil.
        #
        # @param query [Hash]
        # @return [String, nil] nil when +query+ is not a Hash or cannot be described
        def describe_query(query)
          return nil unless query.is_a?(Hash)

          # String key first, then the symbol form of the same key.
          lookup = ->(hash, key) { hash[key] || hash[key.to_sym] }

          # match / term: show the first field and its (truncated) value
          %w[match term].each do |kind|
            clause = lookup.call(query, kind)
            next unless clause

            pair = clause.to_a.first
            return "{#{kind}:{#{pair[0]}:#{truncate_value(pair[1].to_s, 20)}}}" if pair
          end

          # bool: list which clause groups are present
          if (bool_clause = lookup.call(query, 'bool'))
            present = %w[must should filter must_not].select { |name| lookup.call(bool_clause, name) }
            return "{bool:[#{present.join(',')}]}" unless present.empty?
          end

          # query_string: show the (truncated) query text
          if (query_string = lookup.call(query, 'query_string'))
            text = lookup.call(query_string, 'query')
            return "{query_string:#{truncate_value(text.to_s, 30)}}" if text
          end

          # range: show the first field name
          if (range_clause = lookup.call(query, 'range'))
            field = range_clause.keys.first
            return "{range:#{field}}" if field
          end

          # Anything else: just the query type
          "{#{query.keys.first}}"
        rescue StandardError
          nil
        end

        # Summarises the top-level aggregations of a request.
        #
        # For each named aggregation whose definition is a Hash, the aggregation type
        # is the first key that is not a nested-aggregation key. A "terms"
        # aggregation with a field is shown as "terms:FIELD"; every other one by its
        # type only. Any error raised while inspecting the structure yields nil.
        #
        # @param aggs [Hash]
        # @return [String, nil] "{type,type,...}", or nil when nothing can be described
        def describe_aggregations(aggs)
          return nil unless aggs.is_a?(Hash)

          nesting_keys = ['aggs', 'aggregations', :aggs, :aggregations]

          labels = aggs.values.each_with_object([]) do |definition, collected|
            next unless definition.is_a?(Hash)

            kind = definition.keys.find { |key| !nesting_keys.include?(key) }
            next unless kind

            field = nil
            if kind.to_s == 'terms'
              settings = definition[kind]
              field = settings['field'] || settings[:field]
            end

            collected << (field ? "terms:#{field}" : kind.to_s)
          end

          labels.empty? ? nil : "{#{labels.join(',')}}"
        rescue StandardError
          nil
        end

        # Counts the searches contained in a multi-search body.
        #
        # A String body is treated as newline-delimited JSON where each search takes
        # two non-empty lines (header + body). An Array body is counted one search
        # per element. Anything else yields no detail.
        #
        # @param body [String, Array, nil]
        # @return [String] " [N queries]" or ""
        def extract_multi_search_details(body)
          case body
          when String
            non_empty_lines = body.split("\n").reject(&:empty?)
            " [#{non_empty_lines.size / 2} queries]"
          when Array
            " [#{body.size} queries]"
          else
            ''
          end
        end

        # Extract bulk operation details: how many index/create/update/delete
        # actions the request contains.
        #
        # A String body is read as newline-delimited JSON in bulk format: an action
        # line, followed by a source line for every action except delete. Only
        # action lines are parsed and counted, so a source document that happens to
        # contain a field named "index", "create", "update" or "delete" is not
        # counted as an operation. Lines that cannot be parsed are skipped.
        #
        # An Array body is counted per Hash element by its action key (string or
        # symbol).
        #
        # @param body [String, Array, nil]
        # @return [String] e.g. " [2 index, 1 delete]", or "" when nothing was counted
        def extract_bulk_details(body)
          return '' unless body

          action_types = %i[index create update delete]
          counts = action_types.each_with_object({}) { |type, tally| tally[type] = 0 }

          case body
          when String
            skip_source_line = false

            body.each_line do |raw_line|
              line = raw_line.strip
              next if line.empty?

              # The line after an index/create/update action is its source document.
              if skip_source_line
                skip_source_line = false
                next
              end

              action = begin
                parsed = JSON.parse(line)
                parsed.is_a?(Hash) ? action_types.find { |type| parsed[type.to_s] } : nil
              rescue StandardError
                nil # malformed line: skip it and try the next line as an action
              end
              next unless action

              counts[action] += 1
              skip_source_line = action != :delete
            end
          when Array
            body.each do |item|
              next unless item.is_a?(Hash)

              action_types.each { |type| counts[type] += 1 if item[type.to_s] || item[type] }
            end
          end

          summary = counts.select { |_type, count| count > 0 }.map { |type, count| "#{count} #{type}" }
          summary.empty? ? '' : " [#{summary.join(', ')}]"
        end

        # Describes the query of a delete_by_query request.
        #
        # @param body [Hash, String, nil] Request body
        # @return [String] " query:DESCRIPTION", or "" when the body is missing, does
        #   not parse to a Hash, has no query, or the query cannot be described
        def extract_delete_by_query_details(body)
          parsed = body ? parse_body(body) : nil
          return '' unless parsed.is_a?(Hash)

          query = parsed['query'] || parsed[:query]
          description = query ? describe_query(query) : nil

          description ? " query:#{description}" : ''
        end

        # Summarises a response for the profiler label.
        #
        # Reports, when present in the response body: "took", the hit total (either a
        # plain value or the "value" entry of a Hash), the "created" / "updated" /
        # "deleted" counters, and the response size. Keys may be strings or symbols.
        # Any error is logged at debug level and results in an empty string.
        #
        # @param response [Object] Response object exposing +body+
        # @param path [String] Request path (currently unused)
        # @return [String] "(item, item, ...)" or ""
        def extract_response_info(response, path)
          return '' unless response && response.body

          payload = parse_body(response.body)
          return '' unless payload.is_a?(Hash)

          # String key first, then the symbol form of the same key.
          read = ->(hash, key) { hash[key] || hash[key.to_sym] }
          parts = []

          # Server-side duration
          took = read.call(payload, 'took')
          parts << "took:#{took}ms" if took

          # Hit count
          hits = read.call(payload, 'hits')
          if hits
            total = read.call(hits, 'total')
            total = read.call(total, 'value') if total.is_a?(Hash)
            parts << "#{total} hits" if total
          end

          # Document counters
          %w[created updated deleted].each do |counter|
            value = read.call(payload, counter)
            parts << "#{counter}:#{value}" if value
          end

          # Human-readable response size
          bytes = response_size_bytes(response)
          if bytes > 0
            readable =
              if bytes >= 1024 * 1024
                "#{(bytes / (1024.0 * 1024.0)).round(1)}MB"
              elsif bytes >= 1024
                "#{(bytes / 1024.0).round(1)}KB"
              else
                "#{bytes} bytes"
              end
            parts << "[#{readable}]"
          end

          parts.empty? ? '' : "(#{parts.join(', ')})"
        rescue => e
          Rails.logger.debug { "OpenSearch profiling response info failed: #{e.class}: #{e.message}" }
          ''
        end

        # Normalises a request/response body for inspection.
        #
        # A Hash is returned as-is; a String is parsed as JSON (so the result is
        # whatever JSON value the string encodes); anything else, or a string that
        # fails to parse, gives nil.
        #
        # @param body [String, Hash, nil]
        # @return [Object, nil]
        def parse_body(body)
          return body if body.is_a?(Hash)
          return nil unless body.is_a?(String)

          JSON.parse(body)
        rescue StandardError
          nil
        end

        # Size of a response body in bytes.
        #
        # A String body is measured directly; a Hash body is measured by the size of
        # its generated JSON. Objects without +body+, other body types, and any
        # error all give 0.
        #
        # @param response [Object]
        # @return [Integer]
        def response_size_bytes(response)
          return 0 unless response.respond_to?(:body)

          payload = response.body
          return payload.bytesize if payload.is_a?(String)
          return JSON.generate(payload).bytesize if payload.is_a?(Hash)

          0
        rescue StandardError
          0
        end

        # Truncate long values for display.
        #
        # Values longer than +max+ characters are cut to +max+ characters in total,
        # the last three being "...". When +max+ is too small to fit the ellipsis
        # (less than 3), the value is simply cut to +max+ characters (none for a
        # negative +max+).
        #
        # @param val [Object] Converted with +to_s+
        # @param max [Integer] Maximum length of the result
        # @return [String]
        def truncate_value(val, max = 40)
          s = val.to_s
          return s if s.length <= max
          # No room for an ellipsis: a negative slice length would return nil.
          return s[0, [max, 0].max] if max < 3

          "#{s[0, max - 3]}..."
        end
      end
    end

    # Install the instrumentation on OpenSearch::Client and, when that constant
    # is defined, on OpenSearch::Transport::Client, so requests are caught
    # regardless of which client type is used. A class that already lists the
    # module among its ancestors is left alone.
    instrumentation = RackMiniProfilerOpenSearch::Instrumentation

    target_classes = [::OpenSearch::Client]
    target_classes << ::OpenSearch::Transport::Client if defined?(::OpenSearch::Transport::Client)

    target_classes.each do |client_class|
      next if client_class.ancestors.include?(instrumentation)

      client_class.prepend(instrumentation)
    end

    Rails.logger.info 'Rack::MiniProfiler OpenSearch profiling enabled'
  end
end

Clone this wiki locally