diff --git a/src/common/util/messages.ml b/src/common/util/messages.ml index 7fe00a6708..6f49e215bd 100644 --- a/src/common/util/messages.ml +++ b/src/common/util/messages.ml @@ -8,6 +8,8 @@ module Category = MessageCategory open GobResult.Syntax +module Format = BatFormat + module Severity = struct @@ -195,10 +197,14 @@ let print ?(ppf= !formatter) (m: Message.t) = | Debug -> "white" (* non-bright white is actually some gray *) | Success -> "green" in - let pp_prefix = Format.dprintf "@{<%s>[%a]%a@}" severity_stag Severity.pp m.severity Tags.pp m.tags in + let pp_print_option ?(none = fun _ () -> ()) pp_v ppf = function + | None -> none ppf () + | Some v -> pp_v ppf v + in + let pp_prefix = (fun ppf -> Format.fprintf ppf "@{<%s>[%a]%a@}" severity_stag Severity.pp m.severity Tags.pp m.tags) in let pp_loc ppf = Format.fprintf ppf " @{(%a)@}" CilType.Location.pp in let pp_loc ppf loc = - Format.fprintf ppf "%a" (Format.pp_print_option pp_loc) (Option.map Location.to_cil loc) + Format.fprintf ppf "%a" (pp_print_option pp_loc) (Option.map Location.to_cil loc) in let pp_piece ppf piece = Format.fprintf ppf "@{<%s>%s@}%a" severity_stag (Piece.text_with_context piece) pp_loc piece.loc @@ -229,7 +235,7 @@ let print ?(ppf= !formatter) (m: Message.t) = let pp_quote ppf loc = if get_bool "warn.quote-code" then ( let pp_cut_quote ppf = Format.fprintf ppf "@,@[%a@,@]" pp_quote in - (Format.pp_print_option pp_cut_quote) ppf (Option.map Location.to_cil loc) + (pp_print_option pp_cut_quote) ppf (Option.map Location.to_cil loc) ) in let pp_piece ppf piece = Format.fprintf ppf "%a%a" pp_piece piece pp_quote piece.loc in diff --git a/src/solver/dune b/src/solver/dune index 805a6e060a..9a3a8ccc1d 100644 --- a/src/solver/dune +++ b/src/solver/dune @@ -5,6 +5,7 @@ (public_name goblint.solver) (libraries batteries.unthreaded + saturn goblint_std goblint_parallel goblint_logs diff --git a/src/solver/parallelStats.bak.ml b/src/solver/parallelStats.bak.ml new file mode 100644 index 0000000000..15a16f84ac --- /dev/null +++ b/src/solver/parallelStats.bak.ml @@ -0,0 +1,228 @@ +open Batteries +open GobConfig +open Goblint_constraint.ConstrSys + +module ParallelSolverStats (S:EqConstrSys) (HM:Hashtbl.S with type key = S.v) = +struct + open S + open Messages + + let cas_success = Atomic.make 0 + let cas_fail = Atomic.make 0 + + let cas_succsess_event () = Atomic.incr cas_success + let cas_fail_event () = Atomic.incr cas_fail + + (* (* This allows us to use a simple array to store thread specifics statistics *) *) + (* let maximal_number_of_threads = 10000 *) + (**) + (**) + (* let stack_d = Atomic.make 0 *) + (* let vars = Atomic.make 0 *) + (* let vars_by_thread = Array.init maximal_number_of_threads (fun _ -> Atomic.make 0) *) + (* let queries = Atomic.make 0 *) + (* let queries_by_thread = Array.init maximal_number_of_threads (fun _ -> Atomic.make 0) *) + (* let evals = Atomic.make 0 *) + (* let evals_by_thread = Array.init maximal_number_of_threads (fun _ -> Atomic.make 0) *) + (* let instant_returns = Atomic.make 0 *) + (* let thread_starts = Atomic.make 0 *) + (* let thread_creates = Atomic.make 0 *) + (* let thread_start_times = Array.init maximal_number_of_threads (fun _ -> 0.0) *) + (* let thread_end_times = Array.init maximal_number_of_threads (fun _ -> 0.0) *) + (**) + (* let active_threads = Atomic.make 0 *) + (* let first_thread_activation_time = Atomic.make 0.0 *) + (* let last_thread_activation_update_time = Atomic.make 0.0 *) + (* let total_thread_activation_time = Atomic.make 0.0 *) + (**) + (* let iterations = Atomic.make 0 *) + (**) + (* let updates = Atomic.make 0 *) + (* let updates_by_thread = Array.init maximal_number_of_threads (fun _ -> Atomic.make 0) *) + (**) + (* let full_trace = false *) + (* let start_c = 0 *) + (* let max_c : int ref = ref (-1) *) + (* let max_var : Var.t option ref = ref None *) + (**) + (* let histo = HM.create 1024 *) + (* let increase (v:Var.t) = *) + (* let set v c = *) + (* if not full_trace && (c > start_c && c > !max_c && not (GobOption.exists (Var.equal v) !max_var)) then begin *) + (* if tracing then trace "sol" "Switched tracing to %a" Var.pretty_trace v; *) + (* max_c := c; *) + (* max_var := Some v *) + (* end *) + (* in *) + (* try let c = HM.find histo v in *) + (* set v (c+1); *) + (* HM.replace histo v (c+1) *) + (* with Not_found -> begin *) + (* set v 1; *) + (* HM.add histo v 1 *) + (* end *) + (**) + (* let start_event () = () *) + (* let stop_event () = () *) + (**) + (* let new_var_event thread_id x = *) + (* Atomic.incr vars; *) + (* Atomic.incr vars_by_thread.(thread_id); *) + (* if tracing then trace "sol" "New %a" Var.pretty_trace x *) + (**) + (* let get_var_event x = *) + (* Atomic.incr queries; *) + (* if tracing && full_trace then trace "sol" "Querying %a" Var.pretty_trace x *) + (**) + (* let eval_rhs_event thread_id x = *) + (* if tracing && full_trace then trace "sol" "(Re-)evaluating %a" Var.pretty_trace x; *) + (* Atomic.incr evals; *) + (* Atomic.incr evals_by_thread.(thread_id); *) + (* if (get_bool "dbg.solver-progress") then (Atomic.incr stack_d; Logs.debug "%d" @@ Atomic.get stack_d) *) + (**) + (* let update_var_event thread_id x o n = *) + (* Atomic.incr updates; *) + (* Atomic.incr updates_by_thread.(thread_id); *) + (* if tracing then increase x; *) + (* if full_trace || (not (Dom.is_bot o) && GobOption.exists (Var.equal x) !max_var) then begin *) + (* if tracing then tracei "sol_max" "(%d) Update to %a" !max_c Var.pretty_trace x; *) + (* if tracing then traceu "sol_max" "%a" Dom.pretty_diff (n, o) *) + (* end *) + (**) + (* let instant_return_event () = *) + (* Atomic.incr instant_returns *) + (**) + (* let create_task_event thread_id = *) + (* Atomic.incr thread_creates *) + (**) + (* let rec thread_starts_solve_event thread_id = *) + (* Atomic.incr thread_starts; *) + (* let t = Unix.gettimeofday () in *) + (* thread_start_times.(thread_id) <- t *) + (**) + (* let rec thread_ends_solve_event thread_id = *) + (* let t = Unix.gettimeofday () in *) + (* thread_end_times.(thread_id) <- t *) + (**) + (* (* solvers can assign this to print solver specific statistics using their data structures *) *) + (* let print_solver_stats = ref (fun () -> ()) *) + (**) + (* (* this can be used in print_solver_stats *) *) + (* let ncontexts = ref 0 *) + (* let print_context_stats rho = *) + (* let histo = Hashtbl.create 13 in (* histogram: node id -> number of contexts *) *) + (* let str k = GobPretty.sprint S.Var.pretty_trace k in (* use string as key since k may have cycles which lead to exception *) *) + (* let is_fun k = match S.Var.node k with FunctionEntry _ -> true | _ -> false in (* only count function entries since other nodes in function will have leq number of contexts *) *) + (* HM.iter (fun k _ -> if is_fun k then Hashtbl.modify_def 0 (str k) ((+)1) histo) rho; *) + (* (* let max_k, n = Hashtbl.fold (fun k v (k',v') -> if v > v' then k,v else k',v') histo (Obj.magic (), 0) in *) *) + (* (* Logs.debug "max #contexts: %d for %s" n max_k; *) *) + (* ncontexts := Hashtbl.fold (fun _ -> (+)) histo 0; *) + (* let topn = 5 in *) + (* Logs.debug "Found %d contexts for %d functions. Top %d functions:" !ncontexts (Hashtbl.length histo) topn; *) + (* Hashtbl.to_list histo *) + (* |> List.sort (fun (_,n1) (_,n2) -> compare n2 n1) *) + (* |> List.take topn *) + (* |> List.iter @@ fun (k,n) -> Logs.debug "%d\tcontexts for %s" n k *) + (**) + (* let stats_csv = *) + (* let save_run_str = GobConfig.get_string "save_run" in *) + (* if save_run_str <> "" then ( *) + (* let save_run = Fpath.v save_run_str in *) + (* GobSys.mkdir_or_exists save_run; *) + (* Fpath.(to_string (save_run / "solver_stats.csv")) |> open_out |> Option.some *) + (* ) else None *) + (* let write_csv xs oc = output_string oc @@ String.concat ",\t" xs ^ "\n" *) + (**) + (* (* print generic and specific stats *) *) + (* let print_stats _ = *) + (* Logs.newline (); *) + (* (* print_endline "# Generic solver stats"; *) *) + (* Logs.info "runtime: %s" (GobSys.string_of_time ()); *) + (* Logs.info "vars: %d, evals: %d" (Atomic.get vars) (Atomic.get evals); *) + (* Logs.info "vars: %d" (Atomic.get vars); *) + (* Logs.info "queries: %d" (Atomic.get queries); *) + (**) + (* (* let threads_data = Seq.zip (Array.to_seq vars_by_thread) (Array.to_seq evals_by_thread) in *) *) + (* (* let threads_data = Seq.zip threads_data (Array.to_seq updates_by_thread) in *) *) + (* let threads_with_update = ref 0 in *) + (* let threads_with_anything = ref 0 in *) + (* (* The following is an important statistics, here left out for simplicity *) *) + (* (* Seq.iteri (fun i ((vars, evals), updates) -> *) *) + (* (* if (Atomic.get vars) <> 0 || (Atomic.get evals) > 0 || (Atomic.get updates) <> 0 then *) *) + (* (* begin *) *) + (* (* Logs.info "Thread %d: vars: %d, evals: %d, updates: %d" i (Atomic.get vars) (Atomic.get evals) (Atomic.get updates); *) *) + (* (* threads_with_anything := !threads_with_anything + 1; *) *) + (* (* end; *) *) + (* (* if (Atomic.get updates) <> 0 then threads_with_update := !threads_with_update + 1 *) *) + (* (* ) threads_data; *) *) + (* Logs.info "Threads with updates: %d" !threads_with_update; *) + (* Logs.info "Threads with anything: %d" !threads_with_anything; *) + (* Logs.info "Threads returned instantly: %d" (Atomic.get instant_returns); *) + (* Logs.info "Threads started: %d" (Atomic.get thread_starts); *) + (* Logs.info "Threads created: %d" (Atomic.get thread_creates); *) + (* Logs.info "Threads active: %d" (Atomic.get active_threads); *) + (**) + (* let non_zero_start_times = Array.filter (fun t -> t <> 0.0) thread_start_times in *) + (* let first_thread_start = Array.fold_left min infinity non_zero_start_times in *) + (* let current_time = Unix.gettimeofday () in *) + (* let total_runtime = Seq.zip (Array.to_seq thread_start_times) (Array.to_seq thread_end_times) *) + (* |> Seq.filter (fun (start, end_) -> start <> 0.0) *) + (* |> Seq.map (fun (start, end_) -> if end_ = 0.0 then (start, current_time) else (start, end_)) *) + (* |> Seq.fold_left (fun acc (start, end_) -> acc +. (end_ -. start)) 0.0 in *) + (* let walltime = current_time -. first_thread_start in *) + (* Logs.info "Total runtime: %f" total_runtime; *) + (* Logs.info "Walltime: %f" walltime; *) + (* let average_active_threads = total_runtime /. walltime in *) + (* Logs.info "Average active threads: %f" average_active_threads; *) + (**) + (**) + (**) + (**) + (* (* Logs.info "vars: %d" (Atomic.get vars); *) *) + (**) + (* (* Array.iteri (fun i v -> if Atomic.get v <> 0 then Logs.info "vars (%d): %d" i (Atomic.get v)) vars_by_thread; *) *) + (* (**) *) + (* (* Logs.info "evals: %d" (Atomic.get evals); *) *) + (* (* Array.iteri (fun i v -> if Atomic.get v <> 0 then Logs.info " evals (%d): %d" i (Atomic.get v)) evals_by_thread; *) *) + (* (**) *) + (* (* Logs.info "updates: %d" (Atomic.get updates); *) *) + (* (* jrray.iteri (fun i v -> Logs.info " updates (%d): %d" i (Atomic.get v)) updates_by_thread; *) *) + (**) + (* (* let first_thread_start = Atomic.get first_thread_activation_time in *) *) + (* (* let last_registered = Atomic.get last_thread_activation_update_time in *) *) + (* (* let average_thread_activation_time = Atomic.get total_thread_activation_time /. (last_registered -. first_thread_start) in *) *) + (* (* let total_time = last_registered -. first_thread_start in *) *) + (* (* Logs.info "average nr_threads: %f" (average_thread_activation_time); *) *) + (* (* Logs.info "Evaluations per CPU-second: %f" ((float_of_int @@ Atomic.get evals) /. average_thread_activation_time /. total_time); *) *) + (* (* Logs.info "Updates per CPU-second: %f" ((float_of_int @@ Atomic.get updates) /. average_thread_activation_time /. total_time); *) *) + (* (**) *) + (* (* Option.may (fun v -> ignore @@ Logs.info "max updates: %d for var %a" !max_c Var.pretty_trace v) !max_var; *) *) + (* (* Logs.newline (); *) *) + (* (* (* print_endline "# Solver specific stats"; *) *) *) + (* (* !print_solver_stats (); *) *) + (* (* Logs.newline (); *) *) + (* (* (* Timing.print (M.get_out "timing" Legacy.stdout) "Timings:\n"; *) *) *) + (* (* (* Gc.print_stat stdout; (* too verbose, slow and words instead of MB *) *) *) *) + (* (* let gc = GobGc.print_quick_stat Legacy.stderr in *) *) + (* (* Logs.newline (); *) *) + (* (* Option.may (write_csv [GobSys.string_of_time (); string_of_int !SolverStats.vars; string_of_int !SolverStats.evals; string_of_int !ncontexts; string_of_int gc.Gc.top_heap_words]) stats_csv *) *) + (**) + (* () *) + (* (* print_string "Do you want to continue? [Y/n]"; *) *) + (* (* flush stdout *) *) + (* (* if read_line () = "n" then raise Break *) *) + (**) + (* let () = *) + (* let write_header = write_csv ["runtime"; "vars"; "evals"; "contexts"; "max_heap"] (* TODO @ !solver_stats_headers *) in *) + (* Option.may write_header stats_csv; *) + (* (* call print_stats on dbg.solver-signal *) *) + (* Sys.set_signal (GobSys.signal_of_string (get_string "dbg.solver-signal")) (Signal_handle print_stats); *) + (* (* call print_stats every dbg.solver-stats-interval *) *) + (* Sys.set_signal Sys.sigvtalrm (Signal_handle print_stats); *) + (* (* https://ocaml.org/api/Unix.html#TYPEinterval_timer ITIMER_VIRTUAL is user time; sends sigvtalarm; ITIMER_PROF/sigprof is already used in Timeout.Unix.timeout *) *) + (* let ssi = get_int "dbg.solver-stats-interval" in *) + (* if ssi > 0 then *) + (* let it = float_of_int ssi in *) + (* ignore Unix.(setitimer ITIMER_VIRTUAL { it_interval = it; it_value = it }); *) +end + diff --git a/src/solver/parallelStats.ml b/src/solver/parallelStats.ml new file mode 100644 index 0000000000..ce9c56f334 --- /dev/null +++ b/src/solver/parallelStats.ml @@ -0,0 +1,40 @@ +open Batteries +open GobConfig +open Goblint_constraint.ConstrSys + +module ParallelSolverStats = +struct + open Messages + + let cas_success = Atomic.make 0 + let cas_fail = Atomic.make 0 + let nr_iterations = Atomic.make 0 + + let start_time = ref 0. + let end_time = ref 0. + + let solver_start_event () = + start_time := Unix.gettimeofday (); + Atomic.set cas_success 0; + Atomic.set cas_fail 0 + + let solver_end_event () = + end_time := Unix.gettimeofday () + + let start_iterate_event job_id = + Atomic.incr nr_iterations + + let cas_success_event () = Atomic.incr cas_success + let cas_fail_event () = Atomic.incr cas_fail + + let print_stats () = + Logs.info "Cas success: %d" (Atomic.get cas_success); + Logs.info "Cas fail: %d" (Atomic.get cas_fail); + + let duration = !end_time -. !start_time in + Logs.info "Solver duration: %.2f" duration; + + Logs.info "Iterations: %d" (Atomic.get nr_iterations); + +end + diff --git a/src/solver/postSolver.ml b/src/solver/postSolver.ml index 449e4e5bdd..ec0fce918f 100644 --- a/src/solver/postSolver.ml +++ b/src/solver/postSolver.ml @@ -349,3 +349,21 @@ module DemandEqIncrSolverFromEqSolver (Sol: GenericEqSolver): DemandEqIncrSolver Post.post xs vs vh; (vh, ()) end + + +module DemandEqIncrSolverFromDemandEqSolver (Sol: DemandEqSolver): DemandEqIncrSolver = + functor (Arg: IncrSolverArg) (S: DemandEqConstrSys) (VH: Hashtbl.S with type key = S.v) -> + struct + module EqSys = EqConstrSysFromDemandConstrSys (S) + module Sol = Sol (S) (VH) + module Post = MakeList (ListArgFromStdArg (EqSys) (VH) (Arg)) + + type marshal = unit + let copy_marshal () = () + let relift_marshal () = () + + let solve xs vs _ = + let vh = Sol.solve xs vs in + Post.post xs vs vh; + (vh, ()) + end diff --git a/src/solver/td_parallel_base.ml b/src/solver/td_parallel_base.ml new file mode 100644 index 0000000000..cd67de5ca6 --- /dev/null +++ b/src/solver/td_parallel_base.ml @@ -0,0 +1,403 @@ +(** Terminating, parallelized top-down solver with side effects. ([td_parallel_base]). *) + +(** Top-down solver that is parallelised with fine-grain-locked shared data + * + * The solver consists of multiple threads, that operate on the same data. + * The solvers starts with a single thread, and starts a new one at every `create` call it encounters. + * Create nodes are created by the analysis. For the purposes of this solver, they can be placed anywhere, + * however the solver benefits from having those at points where the analysis branches into mostly + * disjunt parts, such as thread creation in the analysed program. + * The starting points of the threads are memorized. If such a point is destabilized after thread + * termination, the thread is restarted. +*) +(* Options: + * - solvers.td3.parallel_domains (default: 0 - automatic selection): Maximal number of Domains that the solver can use in parallel. +*) + +open Batteries +open Goblint_constraint.ConstrSys +open Goblint_constraint.SolverTypes +open Goblint_parallel +open Saturn +open Messages + + +module Base : DemandEqSolver = + functor (S: DemandEqConstrSys) -> + functor (HM:Hashtbl.S with type key = S.v) -> + struct + open SolverBox.Warrow (S.Dom) + module VS = Set.Make (S.Var) + + (* TODO Introduce this module *) + open ParallelStats.ParallelSolverStats + + exception CasFailException + + (* Same as [Atomic.compare_and_set] but raises an exception on failure. *) + let cas r seen v = + if (Atomic.compare_and_set r seen v) then ( + cas_success_event (); + ) else ( + cas_fail_event (); + raise CasFailException + ) + (* if not (Atomic.compare_and_set r seen v) then raise CasFailException *) + + (** State for each unkown and a default factory. *) + module DefaultState = struct + type t = { + value: S.Dom.t; + infl: (S.Var.t, unit) Htbl.t; (** Unknowns influenced this unknown *) + wpoint: bool; (** Unkown is a widening point *) + stable: bool; (** Unknown is stable, i.e. its value is known unless its dependencies change *) + called: bool; (** Unknown is currently being solved by any thread *) + top_level: bool; (** Unknown is the starting point of a solver thread *) + } + let default () = { + value = S.Dom.bot (); + infl = Htbl.create ~hashed_type:(module S.Var) (); + called = false; + stable = false; + wpoint = false; + top_level = false; + } + let show s = + Printf.sprintf "{value: %s; infl: %d; wpoint: %b; stable: %b; called: %b; top_level: %b}" + (S.Dom.show s.value) (Htbl.length s.infl) s.wpoint s.stable s.called s.top_level + end + + (** Concurrency safe hashmap for the state of the unknowns. *) + module CM = Data.ConcurrentHashmap (S.Var) (DefaultState) (HM) + (* We need to keep track of this to avoid queueing multiple jobs for the same unknown. *) + let unknowns_with_running_jobs = Htbl.create () + let job_id_counter = (Atomic.make 1) + + let solve st vs = + solver_start_event (); + let nr_domains = GobConfig.get_int "solvers.td3.parallel_domains" in + let nr_domains = if nr_domains = 0 then (Domain.recommended_domain_count ()) else nr_domains in + + (* The argument of threadpool.create is the number of additional domains, hence -1 *) + (* This comes from domainslib *) + let pool = Threadpool.create (nr_domains-1) in + + let data = CM.create () + in + + (** Initialize or get the state for an unknown. + @param x The unknown to get the state for. + @param thread_id The id of the thread that is initializing the unknown. + @return true if the unknown was created, false otherwise. + *) + let init x = + let value, was_created = CM.find_create data x in + value + in + + (** Get the right-hand-side for an unknown. + @param x The unknown to get the rhs for. + @param get Function to return values for unknowns. + @param set Function to set values for unknowns. + @param create Function to handle create nodes/ initialize new solver threads. + @return The rhs for the unknown. + *) + let eq x get set create = + if tracing then trace "eq" "eq %a" S.Var.pretty_trace x; + match S.system x with + | None -> S.Dom.bot () + | Some f -> f get set create + in + + (** Check if the unknown is a global. + @param x The unknown to check. + @return true if the unknown is a global, false otherwise. + *) + let is_global x = S.system x = None in + + (** destabilizes vars from outer_w and their infl recursively. + If a variable was the root of a solver thread, a new thread is started for the variable. + @param prom The promises list to add new threads to. + @param outer_w The set of variables to destabilize. + *) + let rec destabilize prom outer_w = + let rec destab_single y = + try ( + let y_atom = CM.find data y in + let y_state = Atomic.get y_atom in + if not y_state.stable then ( + () + ) else if y_state.called then ( + (* If y is called, we do not need to destabilize, as it will happen after the value change anyway. *) + cas y_atom y_state {y_state with stable = false}; + if tracing then trace "destab" "stable remove %a (top_level:%b, called:%b)" S.Var.pretty_trace y y_state.top_level y_state.called; + ) else ( + let inner_w = y_state.infl in + cas y_atom y_state {y_state with stable = false}; + if tracing then trace "destab" "stable remove %a (top_level:%b, called:%b)" S.Var.pretty_trace y y_state.top_level y_state.called; + if y_state.top_level then create_task prom y; + destabilize prom inner_w + )) with CasFailException -> destab_single y + in + Htbl.remove_all outer_w |> Seq.map fst |> + Seq.iter destab_single + + (** Creates a task to solve for y + @param outer_prom The promises list to add the new task to. + @param y The variable to solve for. + *) + and create_task outer_prom y = + let work_fun () = + let job_id = Atomic.fetch_and_add job_id_counter 1 in + let y_atom = init y in + let s = Atomic.get y_atom in + if s.called then ( + (* instant_return_event () *) + ) else ( + let success = Atomic.compare_and_set y_atom s {s with called = true; stable = true; top_level = true} in + if success then ( + if tracing then trace "thread_pool" "starting task %d to iterate %a" job_id S.Var.pretty_trace y; + (* thread_starts_solve_event job_id; *) + let inner_prom = ref [] in + iterate None inner_prom y job_id y_atom; + (* This is safe to ignore, as it can only be removed by someone else. *) + (* For it to be added by another thread, it needs to be removed first *) + Htbl.try_remove unknowns_with_running_jobs y |> ignore; + (* thread_ends_solve_event job_id; *) + Threadpool.await_all pool (!inner_prom) + ) + (* Nothing to do if CAS fails, another thread is already working on this variable. *) + ) + in + if not (Htbl.mem unknowns_with_running_jobs y) then ( + if tracing then trace "create" "create_task %a" S.Var.pretty_trace y; + (* Here a failure can be ignored, as it is very rare and + a missing entry causes recalculation, not a unsound result. *) + Htbl.try_set unknowns_with_running_jobs y () |> ignore; + outer_prom := Threadpool.add_work pool work_fun :: (!outer_prom) + ) + + (** Iterates to solve for x (invoked from query to orig if present) + @param orig The variable whose query led to the iteration of x. + @param prom The promises list to add new tasks to. + @param x The variable to solve for. + @param job_id The id of the thread that is solving for x. + @param x_atom The atomic reference to the state of x, to prevent unnecessary lookups. + *) + and iterate orig prom x job_id x_atom = (* ~(inner) solve in td3*) + + (** Get the value for y, triggering an iteration if necessary, and performing a lookup otherwise. + @param x The unknown whose query led to the query for y, so that the infl of y can be updated. + @param y The unknown to get the value for. + @return The value of y. + *) + let rec query x y = (* ~eval in td3 *) + (* Query with atomics: if anything is changed, query is repeated and the initial call *) + (* has no side effects. Thus, imitating that the query just happend in a later point in time.*) + try ( + let y_atom = init y in + (* get_var_event y; *) + let y_state = Atomic.get y_atom in + if tracing then trace "query" "%d entering query for %a; stable %b; called %b" job_id S.Var.pretty_trace y y_state.stable y_state.called; + ignore @@ Htbl.try_add y_state.infl x (); + + if y_state.called then ( + if tracing then trace "infl" "add_infl %a %a" S.Var.pretty_trace y S.Var.pretty_trace x; + cas y_atom y_state {y_state with wpoint=true}; + y_state.value + ) + else if y_state.stable then ( + (Atomic.get y_atom).value + ) else ( + if is_global y then ( + if tracing then trace "infl" "add_infl %a %a" S.Var.pretty_trace y S.Var.pretty_trace x; + cas y_atom y_state {y_state with stable = true}; + y_state.value + ) else ( + if tracing then trace "infl" "add_infl %a %a" S.Var.pretty_trace y S.Var.pretty_trace x; + cas y_atom y_state {y_state with stable = true; called=true}; + if tracing then trace "called" "called %a" S.Var.pretty_trace y; + iterate (Some x) prom y job_id y_atom; + (Atomic.get y_atom).value + ) + ) ) with CasFailException -> query x y + in + + (** Apply a side effect to y + @param x The variable that caused the side effect. + @param y The variable to side-effect. + @param d The value to side-effect y with. + *) + let rec side x y d = + assert (is_global y); + try ( + let y_atom = init y in + let s = Atomic.get y_atom in + if tracing then trace "side" "%d side to %a from %a" job_id S.Var.pretty_trace y S.Var.pretty_trace x; + if tracing then trace "side-v" "%d side to %a (wpx: %b) from %a ## value: %a" job_id S.Var.pretty_trace y s.wpoint S.Var.pretty_trace x S.Dom.pretty d; + let old = s.value in + if S.Dom.leq d old then ( + () + ) else ( + let widen a b = + if tracing then trace "sidew" "%d side widen %a" job_id S.Var.pretty_trace y; + S.Dom.widen a (S.Dom.join a b) + in + if tracing then trace "update" "%d side update %a with \n\t%a" job_id S.Var.pretty_trace x S.Dom.pretty (widen old d); + let w = s.infl in + let new_s = {s with value = (widen old d); stable = true} in + cas y_atom s new_s; + if tracing then trace "destab" "destabilize %a" S.Var.pretty_trace y; + (* update_var_event job_id y old (new_s.value); *) + destabilize prom w + )) with CasFailException -> side x y d + in + + (** Handle create nodes + @param x The variable that caused the create node. + @param y The variable to create a solver task for. + *) + let create x y = (* create called from x on y *) + if tracing then trace "create" "create from td_parallel_base was executed from %a on %a" S.Var.pretty_trace x S.Var.pretty_trace y; + create_task prom y + in + + (* begining of iteration to update the value for x *) + start_iterate_event job_id; + assert (not @@ is_global x); + let x_state = Atomic.get x_atom in + + if tracing then trace "iter" "%d iterate %a, stable: %b, wpoint: %b" job_id S.Var.pretty_trace x x_state.stable x_state.wpoint; + let x_is_widening_point = x_state.wpoint in (* if x becomes a wpoint during eq, checking this will delay widening until next iterate *) + (* eval_rhs_event job_id x; *) + let value_from_rhs = eq x (query x) (side x) (create x) in + let x_state = Atomic.get x_atom in + let old_value = x_state.value in + let new_value = (* value after box operator (if wp: widening) *) + if not x_is_widening_point then + value_from_rhs + else (if tracing then trace "wpoint" "box widening %a" S.Var.pretty_trace x; box old_value value_from_rhs) + in + + if S.Dom.equal new_value old_value then ( + if x_state.stable then ( + (match orig with + | Some z -> ignore @@ Htbl.try_add x_state.infl z () + | None -> ()); + let x_state_new = {x_state with called = false; wpoint = false} in + try (cas x_atom x_state x_state_new; + if tracing then trace "called" "uncalled (A) %a" S.Var.pretty_trace x; + ) with CasFailException -> (iterate[@tailcall]) orig prom x job_id x_atom; + ) else ( + let x_state_new = {x_state with stable = true} in + (* No need to track cas success, as we will iterate again anyway. *) + ignore @@ Atomic.compare_and_set x_atom x_state x_state_new; + if tracing then trace "iter" "iterate still unstable %a" S.Var.pretty_trace x; + (iterate[@tailcall]) orig prom x job_id x_atom + ) + ) else ( + (* value has changed *) + if tracing then trace "update" "%d iterate update %a with \n\t%a" job_id S.Var.pretty_trace x S.Dom.pretty new_value; + let x_state_new = {x_state with value = new_value} in + try ( + cas x_atom x_state x_state_new; + if tracing then trace "destab" "destabilize %a" S.Var.pretty_trace x; + (* update_var_event job_id x old_value new_value; *) + destabilize prom x_state.infl; + + let rec finalize () = + let x_state = Atomic.get x_atom in + + if x_state.stable then ( + (match orig with + | Some z -> ignore @@ Htbl.try_add x_state.infl z () + | None -> ()); + let new_s = {x_state with called = false} in + try (cas x_atom x_state new_s; + if tracing then trace "called" "uncalled (B) %a" S.Var.pretty_trace x; + ) + with CasFailException -> (finalize[@tailcall]) () + ) else ( + let new_s = {x_state with stable = true} in + (* Here we cannot use the exception, because the handling would *) + (* break the tail-recursion in the call to iterate *) + let success = Atomic.compare_and_set x_atom x_state new_s in + if success then ( + if tracing then trace "iter" "iterate changed %a" S.Var.pretty_trace x; + (iterate[@tailcall]) orig prom x job_id x_atom + ) else (finalize[@tailcall]) () + ) in + finalize (); + ) with CasFailException -> (iterate[@tailcall]) orig prom x job_id x_atom; + ) in + + let set_start (x,d) = + let x_atom = init x in + let s = Atomic.get x_atom in + Atomic.set x_atom {s with value = d; stable = true} + in + + (* beginning of main solve *) + + (* start_event (); *) + List.iter set_start st; + + List.iter (fun x -> ignore @@ init x ) vs; + (* If we have multiple start variables vs, we might solve v1, then while solving v2 we + side some global which v1 depends on with a new value. + Then v1 is no longer stable and we have to solve it again. *) + let i = ref 0 in + let rec solver () = + incr i; + let unstable_vs = List.filter (fun v -> not (Atomic.get @@ CM.find data v).stable) vs in + if unstable_vs <> [] then ( + if Logs.Level.should_log Debug then ( + if !i = 1 then Logs.newline (); + Logs.debug "Unstable solver start vars in %d. phase:" !i; + List.iter (fun v -> Logs.debug "\t%a" S.Var.pretty_trace v) unstable_vs; + Logs.newline (); + flush_all (); + ); + List.iter (fun x -> + if tracing then trace "multivar" "solving for %a" S.Var.pretty_trace x; + Threadpool.run pool (fun () -> + let promises = ref [] in + create_task promises x; + Threadpool.await_all pool (!promises) + ); + ) unstable_vs; + solver (); + ) + in + solver (); + Threadpool.finished_with pool; + solver_end_event (); + + (* After termination, only those variables are stable which are + * - reachable from any of the queried variables vs, or + * - effected by side-effects and have no constraints on their own (this should be the case for all of our analyses). *) + (* print_stats (); *) + (* stop_event (); *) + + let data_ht = CM.to_hashtbl data in + let wpoint = HM.map (fun _ (s: DefaultState.t) -> s.wpoint) data_ht in + + if GobConfig.get_bool "dbg.print_wpoints" then ( + Logs.newline (); + Logs.debug "Widening points:"; + HM.iter (fun k wp -> if wp then Logs.debug "%a" S.Var.pretty_trace k) wpoint; + Logs.newline (); + ); + + print_stats (); + (* TODO reenable *) + (* if GobConfig.get_bool "dbg.timing.enabled" then LHM.print_stats data; *) + + let solution = HM.map (fun _ (s: DefaultState.t) -> s.value) data_ht in + Logs.info "Solver finished with %d unknowns." (HM.length solution); + Logs.info "Number of jobs: %d" (Atomic.get job_id_counter); + solution + end + +let () = + Selector.add_solver ("td_parallel_base", (module PostSolver.DemandEqIncrSolverFromDemandEqSolver (Base))) diff --git a/src/util/parallel/data.ml b/src/util/parallel/data.ml index b409c94490..bd44766bec 100644 --- a/src/util/parallel/data.ml +++ b/src/util/parallel/data.ml @@ -1,4 +1,5 @@ open Batteries +open Saturn module type DefaultType = sig type t @@ -69,7 +70,7 @@ end (* This is a custom implementation, because we leave out operations that we do not need to enable a more efficient implementation. *) -module ConcurrentHashmap = +module OwnConcurrentHashmap = functor (H: Hashtbl.HashedType) -> functor (D: DefaultType) -> functor (HM:Hashtbl.S with type key = H.t) -> @@ -191,3 +192,39 @@ module ConcurrentHashmap = Seq.iter (fun (k, v) -> HM.add ht k (Atomic.get v)) seq; ht end + + +module SaturnConcurrentHashmap (H: Hashtbl.HashedType) (D: DefaultType) (HM:Hashtbl.S with type key = H.t) = struct + type t = (H.t, D.t Atomic.t) Htbl.t + type key = H.t + type value = D.t Atomic.t + + let create () = Htbl.create ~hashed_type:(module H) () + + let to_seq = Htbl.to_seq + let to_list hm = to_seq hm |> List.of_seq + + let find_option = Htbl.find_opt + let find = Htbl.find_exn + let mem = Htbl.mem + + let find_create (hm : t) (key : H.t) = + let found_val = Htbl.find_opt hm key in + match found_val with + | Some found_val -> (found_val, false) + | None -> begin + let new_val = Atomic.make @@ D.default () in + let added = Htbl.try_add hm key new_val in + if added then (new_val, true) else (Htbl.find_exn hm key, false) + end + + let to_hashtbl hm = + let ht = HM.create 10 in + let seq = to_seq hm in + Seq.iter (fun (k, v) -> HM.add ht k (Atomic.get v)) seq; + ht + +end + + +module ConcurrentHashmap = OwnConcurrentHashmap diff --git a/src/util/parallel/data.mli b/src/util/parallel/data.mli index 84430f6b3c..6d95c17da8 100644 --- a/src/util/parallel/data.mli +++ b/src/util/parallel/data.mli @@ -21,7 +21,7 @@ sig val to_list : t -> (key * value) list val to_seq : t -> (key * value) Seq.t - val to_seq_values : t -> value Seq.t + (* val to_seq_values : t -> value Seq.t *) val to_hashtbl : t -> D.t HM.t val find_option : t -> key -> value option