Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .ocamlformat
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version = 0.26.1
version = 0.27.0
profile = conventional

ocaml-version = 4.08
ocaml-version = 5.3
break-infix = fit-or-vertical
parse-docstrings = true
indicate-multiline-delimiters = no
Expand Down
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Changed

- Use atomics in stats (#403, @clecat)
- Lock files are now opened with O_CLOEXEC flag (#394, @vect0r-vicall)
- Update to cmdliner.1.1.0 (#382, @MisterDA)
- Mirage support: optional dependency to unix (#396, @art-w)
Expand Down
11 changes: 11 additions & 0 deletions bench.Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
FROM ocaml/opam:debian-ocaml-5.3
RUN sudo ln -sf /usr/bin/opam-2.1 /usr/bin/opam
RUN mkdir bench-dir && chown opam:opam bench-dir
WORKDIR bench-dir
RUN sudo chown opam .
COPY *.opam ./
RUN opam remote add origin https://github.com/ocaml/opam-repository.git && opam update
RUN opam pin -yn --with-version=dev .
RUN opam install -y --deps-only --with-test .
COPY . ./
ENTRYPOINT opam exec -- make bench
15 changes: 8 additions & 7 deletions bench/bench.ml
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,11 @@ module Benchmark = struct
Write amplification in syscalls: %f@,\
Write amplification in bytes: %f@,\
Last 10 merges cumulated duration (μs): %f@\n\
Number of merges : %d@]" Mtime.Span.pp result.time result.ops_per_sec
result.mbs_per_sec result.read_amplification_calls
result.read_amplification_size result.write_amplification_calls
result.write_amplification_size result.merges_duration result.nb_merges
Number of merges : %d@]"
Mtime.Span.pp result.time result.ops_per_sec result.mbs_per_sec
result.read_amplification_calls result.read_amplification_size
result.write_amplification_calls result.write_amplification_size
result.merges_duration result.nb_merges
end

let make_bindings_pool nb_entries =
Expand Down Expand Up @@ -382,9 +383,9 @@ let pp_config fmt config =
Log size: %d@,\
Seed: %d@,\
Metrics: %b@,\
Sampling interval: %d@]" config.key_size config.value_size
config.nb_entries config.log_size config.seed config.with_metrics
config.sampling_interval
Sampling interval: %d@]"
config.key_size config.value_size config.nb_entries config.log_size
config.seed config.with_metrics config.sampling_interval

let cleanup root =
let files = [ "data"; "log"; "lock"; "log_async"; "merge" ] in
Expand Down
7 changes: 3 additions & 4 deletions bench/replay.ml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ module Encoding = struct
end

let decoded_seq_of_encoded_chan_with_prefixes :
'a. 'a Repr.ty -> in_channel -> 'a Seq.t =
'a. 'a Repr.ty -> in_channel -> 'a Seq.t =
fun repr channel ->
let decode_bin = Repr.decode_bin repr |> Repr.unstage in
let decode_prefix = Repr.(decode_bin int32 |> unstage) in
Expand Down Expand Up @@ -171,9 +171,8 @@ end
let hash_of_string = Repr.of_string Encoding.Hash.t

module Bench_suite
(Store : S
with type key = Encoding.Hash.t
and type value = Int63.t * int * char) =
(Store :
S with type key = Encoding.Hash.t and type value = Int63.t * int * char) =
struct
let key_to_hash k =
match hash_of_string k with
Expand Down
2 changes: 1 addition & 1 deletion index-bench.opam
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ license: "MIT"
homepage: "https://github.com/mirage/index"
bug-reports: "https://github.com/mirage/index/issues"
depends: [
"ocaml" {>= "4.03.0"}
"ocaml" {>= "5.0.0"}
"cmdliner" {>= "1.1.0"}
"dune" {>= "2.7.0"}
"fmt"
Expand Down
2 changes: 1 addition & 1 deletion index.opam
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ build: [
]

depends: [
"ocaml" {>= "4.08.0"}
"ocaml" {>= "5.0.0"}
"dune" {>= "2.7.0"}
"optint" {>= "0.1.0"}
"repr" {>= "0.6.0"}
Expand Down
3 changes: 2 additions & 1 deletion src/data.ml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ module type Key = sig
(** The equality function for keys. *)

val hash : t -> int
(** Note: Unevenly distributed hash functions may result in performance drops. *)
(** Note: Unevenly distributed hash functions may result in performance drops.
*)

val hash_size : int
(** The number of bits necessary to encode the maximum output value of
Expand Down
21 changes: 13 additions & 8 deletions src/index.ml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ exception RO_not_allowed
(** Raised whenever a read-only instance performs a write action. *)

exception RW_not_allowed
(** Raised whenever a read-write instance performs a reserved read-only action. *)
(** Raised whenever a read-write instance performs a reserved read-only action.
*)

exception Closed
(** Raised whenever a closed instance is used. *)
Expand Down Expand Up @@ -167,15 +168,17 @@ struct
(** The log_async file contains bindings added concurrently to a [merge]
operation. It is only present when a merge is ongoing. *)
mutable open_instances : int;
(** The number of open instances that are shared through the [Cache.t]. *)
(** The number of open instances that are shared through the [Cache.t].
*)
mutable lru : Lru.t;
writer_lock : IO.Lock.t option;
(** A lock that prevents multiple RW instances to be open at the same
time. *)
sync_lock : Semaphore.t;
(** A lock that prevents multiple [sync] to happen at the same time. *)
merge_lock : Semaphore.t;
(** A lock that prevents multiple [merges] to happen at the same time. *)
(** A lock that prevents multiple [merges] to happen at the same time.
*)
rename_lock : Semaphore.t;
(** A lock used to protect a critical bit when finalizing a [merge]
operation. All operations should be guarded by this lock. *)
Expand Down Expand Up @@ -303,7 +306,8 @@ struct
if
(* the generation has changed *)
h.generation > Int63.succ old_generation
|| (* the last sync was done between clear(log) and clear(log_async) *)
||
(* the last sync was done between clear(log) and clear(log_async) *)
(h.generation = Int63.succ old_generation && h.offset = Int63.zero)
then (
(* close the file .*)
Expand All @@ -324,9 +328,9 @@ struct
"[%s] log_async IO header monotonicity violated during sync:@,\
\ offset: %a -> %a@,\
\ generation: %a -> %a@,\
Reloading the log to compensate." (Filename.basename t.root)
Int63.pp old_offset Int63.pp h.offset Int63.pp old_generation
Int63.pp h.generation);
Reloading the log to compensate."
(Filename.basename t.root) Int63.pp old_offset Int63.pp h.offset
Int63.pp old_generation Int63.pp h.generation);
Log_file.reload log)

(** Syncs the [index] of the instance by checking on-disk changes. *)
Expand Down Expand Up @@ -446,7 +450,8 @@ struct
Search.interpolation_search (IOArray.v index.io) key ~low ~high

(** Finds the value associated to [key] in [t]. In order, checks in
[log_async] (in memory), then [log] (in memory), then [index] (on disk). *)
[log_async] (in memory), then [log] (in memory), then [index] (on disk).
*)
let find_instance t key =
let find_if_exists ~name ~find db =
match db with
Expand Down
8 changes: 4 additions & 4 deletions src/index_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ module type S = sig
(including both automatic flushing and explicit calls to {!flush} or
{!close}).

This can be used to ensure certain pre-conditions are met before
bindings are persisted to disk. (For instance, if the index bindings are
pointers into another data-structure [d], it may be necessary to flush
[d] first to avoid creating dangling pointers.)
This can be used to ensure certain pre-conditions are met before bindings
are persisted to disk. (For instance, if the index bindings are pointers
into another data-structure [d], it may be necessary to flush [d] first to
avoid creating dangling pointers.)
@param cache used for instance sharing.
@param fresh whether an existing index should be overwritten.
@param read_only whether read-only mode is enabled for this index.
Expand Down
138 changes: 87 additions & 51 deletions src/stats.ml
Original file line number Diff line number Diff line change
@@ -1,62 +1,65 @@
open! Import

type t = {
mutable bytes_read : int;
mutable nb_reads : int;
mutable bytes_written : int;
mutable nb_writes : int;
mutable nb_merge : int;
mutable merge_durations : float list;
mutable nb_replace : int;
mutable replace_durations : float list;
mutable nb_sync : int;
mutable time_sync : float;
mutable lru_hits : int;
mutable lru_misses : int;
type _t = {
bytes_read : int Atomic.t;
nb_reads : int Atomic.t;
bytes_written : int Atomic.t;
nb_writes : int Atomic.t;
nb_merge : int Atomic.t;
merge_durations : float list Atomic.t;
nb_replace : int Atomic.t;
replace_durations : float list Atomic.t;
nb_sync : int Atomic.t;
time_sync : float Atomic.t;
lru_hits : int Atomic.t;
lru_misses : int Atomic.t;
}

let fresh_stats () =
{
bytes_read = 0;
nb_reads = 0;
bytes_written = 0;
nb_writes = 0;
nb_merge = 0;
merge_durations = [];
nb_replace = 0;
replace_durations = [];
nb_sync = 0;
time_sync = 0.0;
lru_hits = 0;
lru_misses = 0;
bytes_read = Atomic.make 0;
nb_reads = Atomic.make 0;
bytes_written = Atomic.make 0;
nb_writes = Atomic.make 0;
nb_merge = Atomic.make 0;
merge_durations = Atomic.make [];
nb_replace = Atomic.make 0;
replace_durations = Atomic.make [];
nb_sync = Atomic.make 0;
time_sync = Atomic.make 0.0;
lru_hits = Atomic.make 0;
lru_misses = Atomic.make 0;
}

let stats = fresh_stats ()
let get () = stats

let reset_stats () =
stats.bytes_read <- 0;
stats.nb_reads <- 0;
stats.bytes_written <- 0;
stats.nb_writes <- 0;
stats.nb_merge <- 0;
stats.merge_durations <- [];
stats.nb_replace <- 0;
stats.replace_durations <- [];
stats.nb_sync <- 0;
stats.time_sync <- 0.0;
stats.lru_hits <- 0;
stats.lru_misses <- 0

let incr_bytes_read n = stats.bytes_read <- stats.bytes_read + n
let incr_bytes_written n = stats.bytes_written <- stats.bytes_written + n
let incr_nb_reads () = stats.nb_reads <- succ stats.nb_reads
let incr_nb_writes () = stats.nb_writes <- succ stats.nb_writes
let incr_nb_merge () = stats.nb_merge <- succ stats.nb_merge
let incr_nb_replace () = stats.nb_replace <- succ stats.nb_replace
let incr_nb_sync () = stats.nb_sync <- succ stats.nb_sync
let incr_nb_lru_hits () = stats.lru_hits <- succ stats.lru_hits
let incr_nb_lru_misses () = stats.lru_misses <- succ stats.lru_misses
Atomic.set stats.bytes_read 0;
Atomic.set stats.nb_reads 0;
Atomic.set stats.bytes_written 0;
Atomic.set stats.nb_writes 0;
Atomic.set stats.nb_merge 0;
Atomic.set stats.merge_durations [];
Atomic.set stats.nb_replace 0;
Atomic.set stats.replace_durations [];
Atomic.set stats.nb_sync 0;
Atomic.set stats.time_sync 0.0;
Atomic.set stats.lru_hits 0;
Atomic.set stats.lru_misses 0

let incr_bytes_read n =
Atomic.set stats.bytes_read (Atomic.get stats.bytes_read + n)

let incr_bytes_written n =
Atomic.set stats.bytes_written (Atomic.get stats.bytes_written + n)

let incr_nb_reads () = Atomic.incr stats.nb_reads
let incr_nb_writes () = Atomic.incr stats.nb_writes
let incr_nb_merge () = Atomic.incr stats.nb_merge
let incr_nb_replace () = Atomic.incr stats.nb_replace
let incr_nb_sync () = Atomic.incr stats.nb_sync
let incr_nb_lru_hits () = Atomic.incr stats.lru_hits
let incr_nb_lru_misses () = Atomic.incr stats.lru_misses

let add_read n =
incr_bytes_read n;
Expand All @@ -66,6 +69,37 @@ let add_write n =
incr_bytes_written n;
incr_nb_writes ()

type t = {
bytes_read : int;
nb_reads : int;
bytes_written : int;
nb_writes : int;
nb_merge : int;
merge_durations : float list;
nb_replace : int;
replace_durations : float list;
nb_sync : int;
time_sync : float;
lru_hits : int;
lru_misses : int;
}

let get () =
{
bytes_read = Atomic.get stats.bytes_read;
nb_reads = Atomic.get stats.nb_reads;
bytes_written = Atomic.get stats.bytes_written;
nb_writes = Atomic.get stats.nb_writes;
nb_merge = Atomic.get stats.nb_merge;
merge_durations = Atomic.get stats.merge_durations;
nb_replace = Atomic.get stats.nb_replace;
replace_durations = Atomic.get stats.replace_durations;
nb_sync = Atomic.get stats.nb_sync;
time_sync = Atomic.get stats.time_sync;
lru_hits = Atomic.get stats.lru_hits;
lru_misses = Atomic.get stats.lru_misses;
}

module Make (Clock : Platform.CLOCK) = struct
let replace_timer = ref (Clock.counter ())
let nb_replace = ref 0
Expand All @@ -78,19 +112,21 @@ module Make (Clock : Platform.CLOCK) = struct
if !nb_replace = sampling_interval then (
let span = Clock.count !replace_timer in
let average = Mtime.span_to_us span /. float_of_int !nb_replace in
stats.replace_durations <- average :: stats.replace_durations;
Atomic.set stats.replace_durations
(average :: Atomic.get stats.replace_durations);
replace_timer := Clock.counter ();
nb_replace := 0)

let sync_with_timer f =
let timer = Clock.counter () in
f ();
let span = Clock.count timer in
stats.time_sync <- Mtime.span_to_us span
Atomic.set stats.time_sync (Mtime.span_to_us span)

let drop_head l = if List.length l >= 10 then List.tl l else l

let add_merge_duration span =
let span = Mtime.span_to_us span in
stats.merge_durations <- drop_head stats.merge_durations @ [ span ]
Atomic.set stats.merge_durations
(drop_head (Atomic.get stats.merge_durations) @ [ span ])
end
24 changes: 12 additions & 12 deletions src/stats.mli
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
open! Import

type t = {
mutable bytes_read : int;
mutable nb_reads : int;
mutable bytes_written : int;
mutable nb_writes : int;
mutable nb_merge : int;
mutable merge_durations : float list;
mutable nb_replace : int;
mutable replace_durations : float list;
mutable nb_sync : int;
mutable time_sync : float;
mutable lru_hits : int;
mutable lru_misses : int;
bytes_read : int;
nb_reads : int;
bytes_written : int;
nb_writes : int;
nb_merge : int;
merge_durations : float list;
nb_replace : int;
replace_durations : float list;
nb_sync : int;
time_sync : float;
lru_hits : int;
lru_misses : int;
}
(** The type for stats for an index I.

Expand Down
Loading