Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
- Changed the implementation of the write-ahead log to significantly reduce its
memory usage (at the cost of some additional disk IO). (#355)

- Separate the IO stats recorded in the Raw module, from the index ones. (#353)

# 1.4.1 (2021-07-16)

## Fixed
Expand Down
83 changes: 47 additions & 36 deletions bench/bench.ml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ let src =
in
let open Stats in
let tags = Tags.[] in
let data t =
let data (t, bytes_read, bytes_written) =
Data.v
[
int "bytes_read" t.bytes_read;
int "bytes_written" t.bytes_written;
int "bytes_read" bytes_read;
int "bytes_written" bytes_written;
int "merge" t.nb_merge;
int "replace" t.nb_replace;
head "replace_durations" t.replace_durations;
Expand Down Expand Up @@ -58,12 +58,6 @@ module Context = struct
end
end

let with_stats f =
Stats.reset_stats ();
let _, duration = Common.with_timer f in
let stats = Stats.get () in
(duration, stats)

module Mtime = struct
include Mtime

Expand All @@ -75,7 +69,7 @@ module Mtime = struct
end
end

module Benchmark = struct
module Make_benchmark (Io_stats : Index.Platform.IO_STATS) = struct
type result = {
time : Mtime.Span.t;
ops_per_sec : float;
Expand All @@ -90,21 +84,27 @@ module Benchmark = struct
}
[@@deriving to_yojson]

let with_stats f =
Stats.reset ();
Io_stats.reset_all ();
let _, duration = Common.with_timer f in
let stats = Stats.get () in
let raw = Io_stats.get () in
(duration, stats, raw)

let run ~nb_entries f =
let time, stats = with_stats (fun () -> f ()) in
let time, stats, (raw : Io_stats.t) = with_stats (fun () -> f ()) in
let time_sec = Mtime.Span.to_s time in
let nb_entriesf = float_of_int nb_entries in
let entry_sizef = float_of_int entry_size in
let read_amplification_size =
float_of_int stats.bytes_read /. (entry_sizef *. nb_entriesf)
float_of_int raw.bytes_read /. (entry_sizef *. nb_entriesf)
in
let read_amplification_calls = float_of_int stats.nb_reads /. nb_entriesf in
let read_amplification_calls = float_of_int raw.nb_reads /. nb_entriesf in
let write_amplification_size =
float_of_int stats.bytes_written /. (entry_sizef *. nb_entriesf)
in
let write_amplification_calls =
float_of_int stats.nb_writes /. nb_entriesf
float_of_int raw.bytes_written /. (entry_sizef *. nb_entriesf)
in
let write_amplification_calls = float_of_int raw.nb_writes /. nb_entriesf in
let ops_per_sec = nb_entriesf /. time_sec in
let mbs_per_sec = entry_sizef *. nb_entriesf /. 1_048_576. /. time_sec in
let replace_durations = stats.replace_durations in
Expand Down Expand Up @@ -151,13 +151,18 @@ let sorted_bindings_pool = ref [||]

module Index_lib = Index

module Index = struct
module Index =
Index_unix.Private.Make (Context.Key) (Context.Value) (Index.Cache.Noop)
module Index =
Index_unix.Private.Make (Context.Key) (Context.Value) (Index.Cache.Noop)

module Io_stats = Index.Io_stats
module Benchmark = Make_benchmark (Io_stats)

module Index_bench = struct
let add_metrics =
let no_tags x = x in
fun () -> Metrics.add src no_tags (fun m -> m (Stats.get ()))
fun () ->
Metrics.add src no_tags (fun m ->
m (Stats.get (), Io_stats.bytes_read (), Io_stats.bytes_written ()))

let write ~with_metrics ?(with_flush = false) ?sampling_interval bindings rw =
Array.iter
Expand Down Expand Up @@ -336,23 +341,27 @@ module Index = struct
end

let list_benches () =
let pp_bench ppf b = Fmt.pf ppf "%s\t-- %s" b.Index.name b.synopsis in
Index.suite |> Fmt.(pr "%a" (list ~sep:Fmt.(const string "\n") pp_bench))
let pp_bench ppf b = Fmt.pf ppf "%s\t-- %s" b.Index_bench.name b.synopsis in
Index_bench.suite
|> Fmt.(pr "%a" (list ~sep:Fmt.(const string "\n") pp_bench))

let schedule p s =
let todos = List.map fst in
let init = ref (s |> List.map (fun b -> (p b.Index.name, b))) in
let init = ref (s |> List.map (fun b -> (p b.Index_bench.name, b))) in
let apply_dep s =
let deps =
s
|> List.fold_left
(fun acc (todo, b) ->
if todo then
match b.Index.dependency with Some s -> s :: acc | None -> acc
match b.Index_bench.dependency with
| Some s -> s :: acc
| None -> acc
else acc)
[]
in
s |> List.map (fun (todo, b) -> (todo || List.mem b.Index.name deps, b))
s
|> List.map (fun (todo, b) -> (todo || List.mem b.Index_bench.name deps, b))
in
let next = ref (apply_dep !init) in
while todos !init <> todos !next do
Expand Down Expand Up @@ -389,14 +398,14 @@ let pp_config fmt config =
let cleanup root =
let files = [ "data"; "log"; "lock"; "log_async"; "merge" ] in
List.iter
(fun (b : Index.suite_elt) ->
(fun (b : Index_bench.suite_elt) ->
let dir = root // b.name // "index" in
List.iter
(fun file ->
let file = dir // file in
if Sys.file_exists file then Unix.unlink file)
files)
Index.suite
Index_bench.suite

let init config =
Printexc.record_backtrace true;
Expand All @@ -413,8 +422,8 @@ let init config =

let print fmt (config, results) =
let pp_bench fmt (b, result) =
Format.fprintf fmt "@[<v 4>%s@,%a@]" b.Index.synopsis Benchmark.pp_result
result
Format.fprintf fmt "@[<v 4>%s@,%a@]" b.Index_bench.synopsis
Benchmark.pp_result result
in
Format.fprintf fmt "@[<v 4>Configuration:@,%a@,@]@,@[<v 4>Results:@,%a@]@."
pp_config config
Expand All @@ -433,7 +442,7 @@ let print_json fmt (config, results) =
(fun (b, result) ->
`Assoc
[
("name", `String b.Index.name);
("name", `String b.Index_bench.name);
("metrics", Benchmark.result_to_yojson result);
])
results) );
Expand All @@ -443,8 +452,10 @@ let print_json fmt (config, results) =

let get_suite_list minimal_flag =
if minimal_flag then
List.filter (fun bench -> bench.Index.speed = `Quick) Index.suite
else Index.suite
List.filter
(fun bench -> bench.Index_bench.speed = `Quick)
Index_bench.suite
else Index_bench.suite

let repeat n f l =
let rec aux i acc = if i = n then acc else aux (i + 1) (f l :: acc) in
Expand All @@ -460,7 +471,7 @@ let mean l =
(fun acc bresult ->
List.fold_left2
(fun acc (tsm, resultm) (ts, result) ->
assert (Index.(tsm.name = ts.name));
assert (Index_bench.(tsm.name = ts.name));
( tsm,
Benchmark.
{
Expand Down Expand Up @@ -534,12 +545,12 @@ let run filter root output seed with_metrics log_size nb_entries nb_exec json
current_suite
|> schedule name_filter
|> repeat nb_exec
(List.map (fun (b : Index.suite_elt) ->
(List.map (fun (b : Index_bench.suite_elt) ->
let name =
match b.dependency with None -> b.name | Some name -> name
in
let result =
Index.run ~with_metrics ~nb_entries ~log_size ~root ~name
Index_bench.run ~with_metrics ~nb_entries ~log_size ~root ~name
~fresh:b.fresh ~readonly:b.readonly b.benchmark
in
(b, result)))
Expand Down
2 changes: 2 additions & 0 deletions src/index.ml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ struct
include Stats
end

module Io_stats = Platform.Io_stats

module IO = struct
include Io.Extend (IO)

Expand Down
5 changes: 5 additions & 0 deletions src/index_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ module type S = sig
include Checks.S
(** @inline *)
end

module Io_stats : sig
include Platform.IO_STATS
(** @inline *)
end
end

module Private_types = struct
Expand Down
30 changes: 30 additions & 0 deletions src/platform.ml
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,39 @@ module type THREAD = sig
(** Re-schedule the calling thread without suspending it. *)
end

module type RAW_STATS = sig
(** Stats for IO per file descriptor. *)

type t = {
mutable bytes_read : int;
mutable nb_reads : int;
mutable bytes_written : int;
mutable nb_writes : int;
}

val fresh_stats : unit -> t
val reset : t -> unit
end

module type IO_STATS = sig
(** Accumulated stats for all IO handlers in Index. *)

include RAW_STATS

val get : unit -> t
val get_by_file : string -> t
val get_all : unit -> (string * t) list
val reset_all : unit -> unit
val bytes_read : unit -> int
val bytes_written : unit -> int
val nb_reads : unit -> int
val nb_writes : unit -> int
end

module type S = sig
module IO : IO
module Semaphore : SEMAPHORE
module Thread : THREAD
module Clock : CLOCK
module Io_stats : IO_STATS
end
67 changes: 42 additions & 25 deletions src/stats.ml
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
open! Import

type t = {
mutable bytes_read : int;
mutable nb_reads : int;
mutable bytes_written : int;
mutable nb_writes : int;
mutable nb_merge : int;
mutable merge_durations : float list;
mutable nb_replace : int;
Expand All @@ -15,10 +11,6 @@ type t = {

let fresh_stats () =
{
bytes_read = 0;
nb_reads = 0;
bytes_written = 0;
nb_writes = 0;
nb_merge = 0;
merge_durations = [];
nb_replace = 0;
Expand All @@ -30,34 +22,18 @@ let fresh_stats () =
let stats = fresh_stats ()
let get () = stats

let reset_stats () =
stats.bytes_read <- 0;
stats.nb_reads <- 0;
stats.bytes_written <- 0;
stats.nb_writes <- 0;
let reset () =
stats.nb_merge <- 0;
stats.merge_durations <- [];
stats.nb_replace <- 0;
stats.replace_durations <- [];
stats.nb_sync <- 0;
stats.time_sync <- 0.0

let incr_bytes_read n = stats.bytes_read <- stats.bytes_read + n
let incr_bytes_written n = stats.bytes_written <- stats.bytes_written + n
let incr_nb_reads () = stats.nb_reads <- succ stats.nb_reads
let incr_nb_writes () = stats.nb_writes <- succ stats.nb_writes
let incr_nb_merge () = stats.nb_merge <- succ stats.nb_merge
let incr_nb_replace () = stats.nb_replace <- succ stats.nb_replace
let incr_nb_sync () = stats.nb_sync <- succ stats.nb_sync

let add_read n =
incr_bytes_read n;
incr_nb_reads ()

let add_write n =
incr_bytes_written n;
incr_nb_writes ()

module Make (Clock : Platform.CLOCK) = struct
let replace_timer = ref (Clock.counter ())
let nb_replace = ref 0
Expand Down Expand Up @@ -86,3 +62,44 @@ module Make (Clock : Platform.CLOCK) = struct
let span = Mtime.Span.to_us span in
stats.merge_durations <- drop_head stats.merge_durations @ [ span ]
end

module Io_stats (R : Platform.RAW_STATS) = struct
include R

let tbl : (string, t) Hashtbl.t = Hashtbl.create 13

let get_by_file file =
try Hashtbl.find tbl file
with Not_found ->
let stats = R.fresh_stats () in
Hashtbl.add tbl file stats;
stats

let get_all () =
Hashtbl.fold (fun file stats acc -> (file, stats) :: acc) tbl []

let bytes_read () =
Hashtbl.fold (fun _file stats acc -> stats.R.bytes_read + acc) tbl 0

let bytes_written () =
Hashtbl.fold (fun _file stats acc -> stats.R.bytes_written + acc) tbl 0

let nb_reads () =
Hashtbl.fold (fun _file stats acc -> stats.R.nb_reads + acc) tbl 0

let nb_writes () =
Hashtbl.fold (fun _file stats acc -> stats.R.nb_writes + acc) tbl 0

let get () =
let acc = R.fresh_stats () in
Hashtbl.iter
(fun _file stats ->
acc.bytes_read <- stats.bytes_read + acc.bytes_read;
acc.R.nb_reads <- stats.R.nb_reads + acc.R.nb_reads;
acc.R.bytes_written <- stats.R.bytes_written + acc.R.bytes_written;
acc.R.nb_writes <- stats.R.nb_writes + acc.R.nb_writes)
tbl;
acc

let reset_all () = Hashtbl.iter (fun _file stats -> R.reset stats) tbl
end
Loading