diff --git a/.ocamlformat b/.ocamlformat index 1749d9a7..4cc35758 100644 --- a/.ocamlformat +++ b/.ocamlformat @@ -1,7 +1,7 @@ -version = 0.26.1 +version = 0.27.0 profile = conventional -ocaml-version = 4.08 +ocaml-version = 5.3 break-infix = fit-or-vertical parse-docstrings = true indicate-multiline-delimiters = no diff --git a/CHANGES.md b/CHANGES.md index 8dd8636a..f4a268ca 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,7 @@ ## Changed +- Use atomics in stats (#403, @clecat) - Lock files are now opened with O_CLOEXEC flag (#394, @vect0r-vicall) - Update to cmdliner.1.1.0 (#382, @MisterDA) - Mirage support: optional dependency to unix (#396, @art-w) diff --git a/bench.Dockerfile b/bench.Dockerfile new file mode 100644 index 00000000..4e7284cb --- /dev/null +++ b/bench.Dockerfile @@ -0,0 +1,11 @@ +FROM ocaml/opam:debian-ocaml-5.3 +RUN sudo ln -sf /usr/bin/opam-2.1 /usr/bin/opam +RUN mkdir bench-dir && chown opam:opam bench-dir +WORKDIR bench-dir +RUN sudo chown opam . +COPY *.opam ./ +RUN opam remote add origin https://github.com/ocaml/opam-repository.git && opam update +RUN opam pin -yn --with-version=dev . +RUN opam install -y --deps-only --with-test . +COPY . ./ +ENTRYPOINT opam exec -- make bench diff --git a/bench/bench.ml b/bench/bench.ml index 6f16f4e3..5bac911c 100644 --- a/bench/bench.ml +++ b/bench/bench.ml @@ -133,10 +133,11 @@ module Benchmark = struct Write amplification in syscalls: %f@,\ Write amplification in bytes: %f@,\ Last 10 merges cumulated duration (μs): %f@\n\ - Number of merges : %d@]" Mtime.Span.pp result.time result.ops_per_sec - result.mbs_per_sec result.read_amplification_calls - result.read_amplification_size result.write_amplification_calls - result.write_amplification_size result.merges_duration result.nb_merges + Number of merges : %d@]" + Mtime.Span.pp result.time result.ops_per_sec result.mbs_per_sec + result.read_amplification_calls result.read_amplification_size + result.write_amplification_calls result.write_amplification_size + result.merges_duration result.nb_merges end let make_bindings_pool nb_entries = @@ -382,9 +383,9 @@ let pp_config fmt config = Log size: %d@,\ Seed: %d@,\ Metrics: %b@,\ - Sampling interval: %d@]" config.key_size config.value_size - config.nb_entries config.log_size config.seed config.with_metrics - config.sampling_interval + Sampling interval: %d@]" + config.key_size config.value_size config.nb_entries config.log_size + config.seed config.with_metrics config.sampling_interval let cleanup root = let files = [ "data"; "log"; "lock"; "log_async"; "merge" ] in diff --git a/bench/replay.ml b/bench/replay.ml index 86feb426..76ee1add 100644 --- a/bench/replay.ml +++ b/bench/replay.ml @@ -88,7 +88,7 @@ module Encoding = struct end let decoded_seq_of_encoded_chan_with_prefixes : - 'a. 'a Repr.ty -> in_channel -> 'a Seq.t = + 'a. 'a Repr.ty -> in_channel -> 'a Seq.t = fun repr channel -> let decode_bin = Repr.decode_bin repr |> Repr.unstage in let decode_prefix = Repr.(decode_bin int32 |> unstage) in @@ -171,9 +171,8 @@ end let hash_of_string = Repr.of_string Encoding.Hash.t module Bench_suite - (Store : S - with type key = Encoding.Hash.t - and type value = Int63.t * int * char) = + (Store : + S with type key = Encoding.Hash.t and type value = Int63.t * int * char) = struct let key_to_hash k = match hash_of_string k with diff --git a/index-bench.opam b/index-bench.opam index 76b0b39e..f59f83f7 100644 --- a/index-bench.opam +++ b/index-bench.opam @@ -6,7 +6,7 @@ license: "MIT" homepage: "https://github.com/mirage/index" bug-reports: "https://github.com/mirage/index/issues" depends: [ - "ocaml" {>= "4.03.0"} + "ocaml" {>= "5.0.0"} "cmdliner" {>= "1.1.0"} "dune" {>= "2.7.0"} "fmt" diff --git a/index.opam b/index.opam index 47a6a8dc..82d96462 100644 --- a/index.opam +++ b/index.opam @@ -19,7 +19,7 @@ build: [ ] depends: [ - "ocaml" {>= "4.08.0"} + "ocaml" {>= "5.0.0"} "dune" {>= "2.7.0"} "optint" {>= "0.1.0"} "repr" {>= "0.6.0"} diff --git a/src/data.ml b/src/data.ml index b35f9909..52fd20bb 100644 --- a/src/data.ml +++ b/src/data.ml @@ -12,7 +12,8 @@ module type Key = sig (** The equality function for keys. *) val hash : t -> int - (** Note: Unevenly distributed hash functions may result in performance drops. *) + (** Note: Unevenly distributed hash functions may result in performance drops. + *) val hash_size : int (** The number of bits necessary to encode the maximum output value of diff --git a/src/index.ml b/src/index.ml index 6dde17a4..1c62a3f3 100644 --- a/src/index.ml +++ b/src/index.ml @@ -34,7 +34,8 @@ exception RO_not_allowed (** Raised whenever a read-only instance performs a write action. *) exception RW_not_allowed -(** Raised whenever a read-write instance performs a reserved read-only action. *) +(** Raised whenever a read-write instance performs a reserved read-only action. +*) exception Closed (** Raised whenever a closed instance is used. *) @@ -167,7 +168,8 @@ struct (** The log_async file contains bindings added concurrently to a [merge] operation. It is only present when a merge is ongoing. *) mutable open_instances : int; - (** The number of open instances that are shared through the [Cache.t]. *) + (** The number of open instances that are shared through the [Cache.t]. + *) mutable lru : Lru.t; writer_lock : IO.Lock.t option; (** A lock that prevents multiple RW instances to be open at the same @@ -175,7 +177,8 @@ struct sync_lock : Semaphore.t; (** A lock that prevents multiple [sync] to happen at the same time. *) merge_lock : Semaphore.t; - (** A lock that prevents multiple [merges] to happen at the same time. *) + (** A lock that prevents multiple [merges] to happen at the same time. + *) rename_lock : Semaphore.t; (** A lock used to protect a critical bit when finalizing a [merge] operation. All operations should be guarded by this lock. *) @@ -303,7 +306,8 @@ struct if (* the generation has changed *) h.generation > Int63.succ old_generation - || (* the last sync was done between clear(log) and clear(log_async) *) + || + (* the last sync was done between clear(log) and clear(log_async) *) (h.generation = Int63.succ old_generation && h.offset = Int63.zero) then ( (* close the file .*) @@ -324,9 +328,9 @@ struct "[%s] log_async IO header monotonicity violated during sync:@,\ \ offset: %a -> %a@,\ \ generation: %a -> %a@,\ - Reloading the log to compensate." (Filename.basename t.root) - Int63.pp old_offset Int63.pp h.offset Int63.pp old_generation - Int63.pp h.generation); + Reloading the log to compensate." + (Filename.basename t.root) Int63.pp old_offset Int63.pp h.offset + Int63.pp old_generation Int63.pp h.generation); Log_file.reload log) (** Syncs the [index] of the instance by checking on-disk changes. *) @@ -446,7 +450,8 @@ struct Search.interpolation_search (IOArray.v index.io) key ~low ~high (** Finds the value associated to [key] in [t]. In order, checks in - [log_async] (in memory), then [log] (in memory), then [index] (on disk). *) + [log_async] (in memory), then [log] (in memory), then [index] (on disk). + *) let find_instance t key = let find_if_exists ~name ~find db = match db with diff --git a/src/index_intf.ml b/src/index_intf.ml index 0ed8d11e..2921765d 100644 --- a/src/index_intf.ml +++ b/src/index_intf.ml @@ -60,10 +60,10 @@ module type S = sig (including both automatic flushing and explicit calls to {!flush} or {!close}). - This can be used to ensure certain pre-conditions are met before - bindings are persisted to disk. (For instance, if the index bindings are - pointers into another data-structure [d], it may be necessary to flush - [d] first to avoid creating dangling pointers.) + This can be used to ensure certain pre-conditions are met before bindings + are persisted to disk. (For instance, if the index bindings are pointers + into another data-structure [d], it may be necessary to flush [d] first to + avoid creating dangling pointers.) @param cache used for instance sharing. @param fresh whether an existing index should be overwritten. @param read_only whether read-only mode is enabled for this index. diff --git a/src/stats.ml b/src/stats.ml index 195a7f43..a4d0ae7a 100644 --- a/src/stats.ml +++ b/src/stats.ml @@ -1,62 +1,65 @@ open! Import -type t = { - mutable bytes_read : int; - mutable nb_reads : int; - mutable bytes_written : int; - mutable nb_writes : int; - mutable nb_merge : int; - mutable merge_durations : float list; - mutable nb_replace : int; - mutable replace_durations : float list; - mutable nb_sync : int; - mutable time_sync : float; - mutable lru_hits : int; - mutable lru_misses : int; +type _t = { + bytes_read : int Atomic.t; + nb_reads : int Atomic.t; + bytes_written : int Atomic.t; + nb_writes : int Atomic.t; + nb_merge : int Atomic.t; + merge_durations : float list Atomic.t; + nb_replace : int Atomic.t; + replace_durations : float list Atomic.t; + nb_sync : int Atomic.t; + time_sync : float Atomic.t; + lru_hits : int Atomic.t; + lru_misses : int Atomic.t; } let fresh_stats () = { - bytes_read = 0; - nb_reads = 0; - bytes_written = 0; - nb_writes = 0; - nb_merge = 0; - merge_durations = []; - nb_replace = 0; - replace_durations = []; - nb_sync = 0; - time_sync = 0.0; - lru_hits = 0; - lru_misses = 0; + bytes_read = Atomic.make 0; + nb_reads = Atomic.make 0; + bytes_written = Atomic.make 0; + nb_writes = Atomic.make 0; + nb_merge = Atomic.make 0; + merge_durations = Atomic.make []; + nb_replace = Atomic.make 0; + replace_durations = Atomic.make []; + nb_sync = Atomic.make 0; + time_sync = Atomic.make 0.0; + lru_hits = Atomic.make 0; + lru_misses = Atomic.make 0; } let stats = fresh_stats () -let get () = stats let reset_stats () = - stats.bytes_read <- 0; - stats.nb_reads <- 0; - stats.bytes_written <- 0; - stats.nb_writes <- 0; - stats.nb_merge <- 0; - stats.merge_durations <- []; - stats.nb_replace <- 0; - stats.replace_durations <- []; - stats.nb_sync <- 0; - stats.time_sync <- 0.0; - stats.lru_hits <- 0; - stats.lru_misses <- 0 - -let incr_bytes_read n = stats.bytes_read <- stats.bytes_read + n -let incr_bytes_written n = stats.bytes_written <- stats.bytes_written + n -let incr_nb_reads () = stats.nb_reads <- succ stats.nb_reads -let incr_nb_writes () = stats.nb_writes <- succ stats.nb_writes -let incr_nb_merge () = stats.nb_merge <- succ stats.nb_merge -let incr_nb_replace () = stats.nb_replace <- succ stats.nb_replace -let incr_nb_sync () = stats.nb_sync <- succ stats.nb_sync -let incr_nb_lru_hits () = stats.lru_hits <- succ stats.lru_hits -let incr_nb_lru_misses () = stats.lru_misses <- succ stats.lru_misses + Atomic.set stats.bytes_read 0; + Atomic.set stats.nb_reads 0; + Atomic.set stats.bytes_written 0; + Atomic.set stats.nb_writes 0; + Atomic.set stats.nb_merge 0; + Atomic.set stats.merge_durations []; + Atomic.set stats.nb_replace 0; + Atomic.set stats.replace_durations []; + Atomic.set stats.nb_sync 0; + Atomic.set stats.time_sync 0.0; + Atomic.set stats.lru_hits 0; + Atomic.set stats.lru_misses 0 + +let incr_bytes_read n = + Atomic.set stats.bytes_read (Atomic.get stats.bytes_read + n) + +let incr_bytes_written n = + Atomic.set stats.bytes_written (Atomic.get stats.bytes_written + n) + +let incr_nb_reads () = Atomic.incr stats.nb_reads +let incr_nb_writes () = Atomic.incr stats.nb_writes +let incr_nb_merge () = Atomic.incr stats.nb_merge +let incr_nb_replace () = Atomic.incr stats.nb_replace +let incr_nb_sync () = Atomic.incr stats.nb_sync +let incr_nb_lru_hits () = Atomic.incr stats.lru_hits +let incr_nb_lru_misses () = Atomic.incr stats.lru_misses let add_read n = incr_bytes_read n; @@ -66,6 +69,37 @@ let add_write n = incr_bytes_written n; incr_nb_writes () +type t = { + bytes_read : int; + nb_reads : int; + bytes_written : int; + nb_writes : int; + nb_merge : int; + merge_durations : float list; + nb_replace : int; + replace_durations : float list; + nb_sync : int; + time_sync : float; + lru_hits : int; + lru_misses : int; +} + +let get () = + { + bytes_read = Atomic.get stats.bytes_read; + nb_reads = Atomic.get stats.nb_reads; + bytes_written = Atomic.get stats.bytes_written; + nb_writes = Atomic.get stats.nb_writes; + nb_merge = Atomic.get stats.nb_merge; + merge_durations = Atomic.get stats.merge_durations; + nb_replace = Atomic.get stats.nb_replace; + replace_durations = Atomic.get stats.replace_durations; + nb_sync = Atomic.get stats.nb_sync; + time_sync = Atomic.get stats.time_sync; + lru_hits = Atomic.get stats.lru_hits; + lru_misses = Atomic.get stats.lru_misses; + } + module Make (Clock : Platform.CLOCK) = struct let replace_timer = ref (Clock.counter ()) let nb_replace = ref 0 @@ -78,7 +112,8 @@ module Make (Clock : Platform.CLOCK) = struct if !nb_replace = sampling_interval then ( let span = Clock.count !replace_timer in let average = Mtime.span_to_us span /. float_of_int !nb_replace in - stats.replace_durations <- average :: stats.replace_durations; + Atomic.set stats.replace_durations + (average :: Atomic.get stats.replace_durations); replace_timer := Clock.counter (); nb_replace := 0) @@ -86,11 +121,12 @@ module Make (Clock : Platform.CLOCK) = struct let timer = Clock.counter () in f (); let span = Clock.count timer in - stats.time_sync <- Mtime.span_to_us span + Atomic.set stats.time_sync (Mtime.span_to_us span) let drop_head l = if List.length l >= 10 then List.tl l else l let add_merge_duration span = let span = Mtime.span_to_us span in - stats.merge_durations <- drop_head stats.merge_durations @ [ span ] + Atomic.set stats.merge_durations + (drop_head (Atomic.get stats.merge_durations) @ [ span ]) end diff --git a/src/stats.mli b/src/stats.mli index a1d3ead3..b18d43f3 100644 --- a/src/stats.mli +++ b/src/stats.mli @@ -1,18 +1,18 @@ open! Import type t = { - mutable bytes_read : int; - mutable nb_reads : int; - mutable bytes_written : int; - mutable nb_writes : int; - mutable nb_merge : int; - mutable merge_durations : float list; - mutable nb_replace : int; - mutable replace_durations : float list; - mutable nb_sync : int; - mutable time_sync : float; - mutable lru_hits : int; - mutable lru_misses : int; + bytes_read : int; + nb_reads : int; + bytes_written : int; + nb_writes : int; + nb_merge : int; + merge_durations : float list; + nb_replace : int; + replace_durations : float list; + nb_sync : int; + time_sync : float; + lru_hits : int; + lru_misses : int; } (** The type for stats for an index I. diff --git a/src/unix/buffer.mli b/src/unix/buffer.mli index 6275a380..ead9ef5c 100644 --- a/src/unix/buffer.mli +++ b/src/unix/buffer.mli @@ -32,10 +32,12 @@ val clear : t -> unit val add_substring : t -> string -> off:int -> len:int -> unit (** [add_substring t s ~off ~len] appends the substring - [s.(off) .. s.(off + len - 1)] at the end of [t], resizing [t] if necessary. *) + [s.(off) .. s.(off + len - 1)] at the end of [t], resizing [t] if necessary. +*) val add_string : t -> string -> unit -(** [add_string t s] appends [s] at the end of [t], resizing [t] if necessary. *) +(** [add_string t s] appends [s] at the end of [t], resizing [t] if necessary. +*) val write_with : (string -> int -> int -> unit) -> t -> unit (** [write_with writer t] uses [writer] to write the contents of [t]. [writer] diff --git a/test/unix/main.ml b/test/unix/main.ml index 9393d90f..e53c5efe 100644 --- a/test/unix/main.ml +++ b/test/unix/main.ml @@ -922,7 +922,8 @@ module Filter = struct (* Filtering should also affect the in-memory LRU. *) let lru_size = 10 - (** Test that all bindings are kept when using [filter] with a true predicate. *) + (** Test that all bindings are kept when using [filter] with a true predicate. + *) let filter_none () = let* Context.{ rw; tbl; _ } = Context.with_full_index ~lru_size () in Index.filter rw (fun _ -> true);