3636 dw ,
3737 pw ,
3838 rw ,
39+ search_queries ,
40+ query_step_interval ,
41+ start_time ,
3942 keylist_length ,
4043 preloaded_keys ,
4144 timeout_general ,
@@ -79,6 +82,8 @@ new(Id) ->
7982 RW = basho_bench_config :get (riakc_pb_rw , Replies ),
8083 PW = basho_bench_config :get (riakc_pb_pw , Replies ),
8184 PR = basho_bench_config :get (riakc_pb_pr , Replies ),
85+ SearchQs = basho_bench_config :get (riakc_pb_search_queries , []),
86+ SearchQStepIval = basho_bench_config :get (query_step_interval , 60 ),
8287 Bucket = basho_bench_config :get (riakc_pb_bucket , <<" test" >>),
8388 KeylistLength = basho_bench_config :get (riakc_pb_keylist_length , 1000 ),
8489 PreloadedKeys = basho_bench_config :get (
@@ -99,6 +104,9 @@ new(Id) ->
99104 dw = DW ,
100105 rw = RW ,
101106 pw = PW ,
107+ search_queries = SearchQs ,
108+ query_step_interval = SearchQStepIval ,
109+ start_time = erlang :now (),
102110 keylist_length = KeylistLength ,
103111 preloaded_keys = PreloadedKeys ,
104112 timeout_general = get_timeout_general (),
@@ -230,6 +238,34 @@ run(listkeys, _KeyGen, _ValueGen, State) ->
230238 {error , Reason } ->
231239 {error , Reason , State }
232240 end ;
241+ run (search , _KeyGen , _ValueGen , # state {search_queries = SearchQs }= State ) ->
242+ [{Index , Query , Options }|_ ] = SearchQs ,
243+
244+ NewState = State # state {search_queries = roll_list (SearchQs )},
245+
246+ case riakc_pb_socket :search (NewState # state .pid , Index , Query , Options , NewState # state .timeout_read ) of
247+ {ok , _Results } ->
248+ {ok , NewState };
249+ {error , Reason } ->
250+ {error , Reason , NewState }
251+ end ;
252+ run (search_interval , _KeyGen , _ValueGen , # state {search_queries = SearchQs , start_time = StartTime , query_step_interval = Interval }= State ) ->
253+ [{Index , Query , Options }|_ ] = SearchQs ,
254+
255+ Now = erlang :now (),
256+ case timer :now_diff (Now , StartTime ) of
257+ _MicroSec when _MicroSec > (Interval * 1000000 ) ->
258+ NewState = State # state {search_queries = roll_list (SearchQs ),start_time = Now };
259+ _MicroSec ->
260+ NewState = State
261+ end ,
262+
263+ case riakc_pb_socket :search (NewState # state .pid , Index , Query , Options , NewState # state .timeout_read ) of
264+ {ok , _Results } ->
265+ {ok , NewState };
266+ {error , Reason } ->
267+ {error , Reason , NewState }
268+ end ;
233269run (mr_bucket_erlang , _KeyGen , _ValueGen , State ) ->
234270 mapred (State , State # state .bucket , ? ERLANG_MR );
235271run (mr_bucket_js , _KeyGen , _ValueGen , State ) ->
@@ -336,6 +372,9 @@ make_keylist(Bucket, KeyGen, Count) ->
336372 [{Bucket , list_to_binary (KeyGen ())}
337373 |make_keylist (Bucket , KeyGen , Count - 1 )].
338374
375+ roll_list (List ) ->
376+ [lists :last (List ) | lists :sublist (List , length (List ) - 1 )].
377+
339378mapred_valgen (_Id , MaxRand ) ->
340379 fun () ->
341380 list_to_binary (integer_to_list (random :uniform (MaxRand )))
0 commit comments