diff --git a/src/index.ml b/src/index.ml index 3183b579..eb746b3d 100644 --- a/src/index.ml +++ b/src/index.ml @@ -303,7 +303,7 @@ struct hook `Before_offset_read; let h = IO.Header.get log.io in hook `After_offset_read; - if t.generation <> h.generation then ( + if t.generation < h.generation then ( (* If the generation has changed, then we need to reload both the [index] and the [log]. The new generation is the one on disk. *) Log.debug (fun l -> @@ -323,13 +323,14 @@ struct l "[%s] new entries detected, reading log from disk" (Filename.basename t.root)); sync_log_entries ~min:log_offset log) - else + else if log_offset = h.offset then (* Here the disk offset should be equal to the known one. A smaller offset should not be possible, because that would mean a [clear] or [merge] occurred, which should have changed the generation. *) (* TODO: Handle the "impossible" case differently? *) Log.debug (fun l -> l "[%s] no changes detected" (Filename.basename t.root)) + else assert false (** {1 Find and Mem}*) diff --git a/test/unix/concurrent.ml b/test/unix/concurrent.ml new file mode 100644 index 00000000..cebaec5f --- /dev/null +++ b/test/unix/concurrent.ml @@ -0,0 +1,154 @@ +module Hook = Index.Private.Hook +open Common + +let ( // ) = Filename.concat +let root = "_tests" // "unix.concurrent" +let src = Logs.Src.create "tests.unix.concurrent" ~doc:"Tests" + +module Log = (val Logs.src_log src : Logs.LOG) + +module Context = Common.Make_context (struct + let root = root +end) + +(* Helpers for Unix pipe: every message is the sender's PID in decimal. *) +let write input = + let m = Bytes.of_string (string_of_int (Unix.getpid ())) in + let n = Unix.write input m 0 (Bytes.length m) in + assert (n = Bytes.length m) + +let read output = + let buff = Bytes.create 16 in (* 16 bytes: a PID can have more than 5 decimal digits *) + match Unix.read output buff 0 (Bytes.length buff) with + | 0 -> Alcotest.fail "Something wrong when reading from the pipe" + | n -> int_of_string (Bytes.to_string (Bytes.sub buff 0 n)) + +let ignore_pid (_ : int) = () + +let wait pid = + let pid', status = Unix.waitpid [
Unix.WUNTRACED ] pid in + if pid <> pid' then + Alcotest.failf "I'm %d, expecting child %d, but got %d instead" + (Unix.getpid ()) pid pid'; + match status with + | Unix.WEXITED 0 -> Log.debug (fun l -> l "Child %d finished work" pid) + | _ -> Alcotest.failf "Child %d died unexpectedly" pid + +let lsof () = (* Appends the lsof output for this process to /tmp/tmp_<pid>. *) + let name = "/tmp/" in + let pid = string_of_int (Unix.getpid ()) in + let fd_file = name ^ "tmp_" ^ pid in + let lsof_command = "lsof -a -s -p " ^ pid ^ " >> " ^ fd_file in + match Unix.system lsof_command with + | Unix.WEXITED 0 -> () + | Unix.WEXITED _ -> + failwith "failing `lsof` command. Is `lsof` installed on your system?" + | Unix.WSIGNALED _ | Unix.WSTOPPED _ -> + failwith "`lsof` command was interrupted" + +module WriteBatch = struct + let nb_workers = 1 + let nb_batch_writes = 1000 + let batch_size = 5 + + let populate_array () = + Array.init (batch_size * nb_batch_writes) (fun _ -> + let k = Key.v () in + let v = Value.v () in + (k, v)) + + let test_find_present ~ro t arr batch = + let start_ = (batch - 1) * batch_size in + let end_ = (batch_size * batch) - 1 in + Fmt.epr "%b read %d from %d to %d\n%!" ro batch start_ end_; + for i = start_ to end_ do + let k, v = arr.(i) in + let v' = Index.find t k in + if not (v = v') then raise Not_found + done + + let test_find_absent ~ro t arr batch = + let start_ = (batch - 1) * batch_size in + let end_ = (batch_size * batch) - 1 in + Fmt.epr "%b read absent %d from %d to %d\n%!" ro batch start_ end_; + for i = start_ to end_ do + let k, _ = arr.(i) in + match Index.find t k with + | exception Not_found -> () + | _ -> Alcotest.failf "Found %s key after clear " k + done + + let add_values t arr batch = + Index.clear t; (* every batch starts from a cleared index *) + let start_ = (batch - 1) * batch_size in + let end_ = (batch_size * batch) - 1 in + Fmt.epr "add values %d to %d\n%!"
start_ end_; + for i = start_ to end_ do + let k, v = arr.(i) in + Index.replace t k v + done; + Index.flush t + + let worker input_write output_read name arr = (* Read-only side: after each pipe handshake, sync and check the batch. *) + let r = Index.v ~fresh:false ~log_size:4 ~readonly:true name in + for i = 1 to nb_batch_writes do + write input_write; + ignore (read output_read); + Index.sync r; + test_find_present ~ro:true r arr i; + Log.debug (fun l -> l "RO read by %d" (Unix.getpid ())) + done; + write input_write; + ignore (read output_read); + Index.sync r; + test_find_absent ~ro:true r arr nb_batch_writes + + let concurrent_reads () = + let output_write, input_write = Unix.pipe () + and output_read, input_read = Unix.pipe () in + let root = Context.fresh_name "empty_index" in + let arr = populate_array () in + match Unix.fork () with + | 0 -> + Log.debug (fun l -> l "I'm %d" (Unix.getpid ())); + worker input_write output_read root arr; + exit 0 + | pid -> + Log.debug (fun l -> + l "I'm main process %d, created %d" (Unix.getpid ()) pid); + let rw = Index.v ~fresh:true ~log_size:4 root in + for i = 1 to nb_batch_writes do + Log.debug (fun l -> l "Starting batch nb %d" i); + if i mod 100 = 0 then lsof (); + for _ = 0 to nb_workers - 1 do + let pid = read output_write in + Log.debug (fun l -> l "Ack from %d" pid) + done; + add_values rw arr i; + for _ = 0 to nb_workers - 1 do + write input_read + done; + test_find_present ~ro:false rw arr i; + Log.debug (fun l -> l "Write from rw index") + done; + for _ = 0 to nb_workers - 1 do + let pid = read output_write in + Log.debug (fun l -> l "Ack from %d" pid) + done; + Index.clear rw; + for _ = 0 to nb_workers - 1 do + write input_read + done; + wait pid; + Unix.close input_write; + Unix.close output_write; + Unix.close input_read; + Unix.close output_read + + let check_os () = if Sys.os_type = "Win32" then () else concurrent_reads () + let tests = [ ("concurrent reads and writes", `Quick, check_os) ] +end + +let () = + (* Common.report (); *) + Alcotest.run ~verbose:true "concurrent" [
("write batch", WriteBatch.tests) ] diff --git a/test/unix/dune b/test/unix/dune index c850fd8d..b9dd232e 100644 --- a/test/unix/dune +++ b/test/unix/dune @@ -1,5 +1,5 @@ (tests - (names main force_merge io_array) + (names main force_merge io_array concurrent) (package index) (libraries index index.unix alcotest fmt logs logs.fmt re stdlib-shims threads.posix repr semaphore-compat optint)) diff --git a/test/unix/main.ml b/test/unix/main.ml index 0368ade0..00f75031 100644 --- a/test/unix/main.ml +++ b/test/unix/main.ml @@ -592,6 +592,54 @@ module Readonly = struct Alcotest.(check int) "reloadings of log async per merge" 1 !reload_log_async; Index.await t |> check_completed + let race_log_async () = + let* Context.{ rw; clone; _ } = Context.with_empty_index () in + let ro = clone ~readonly:true () in + let merge = Semaphore.make false in + let sync = Semaphore.make false in + let merge2 = Semaphore.make false in + let merge_hook1 = + I.Private.Hook.v @@ function + | `Before -> + Semaphore.acquire merge; + Semaphore.release sync + | `After_clear -> Index.sync ro + | _ -> () + in + let merge_hook2 = + I.Private.Hook.v @@ function + | `Before -> + Semaphore.acquire merge2; + Semaphore.acquire merge + | _ -> () + in + let k1, v1 = (Key.v (), Value.v ()) in + let k2, v2 = (Key.v (), Value.v ()) in + let k3, v3 = (Key.v (), Value.v ()) in + let k4, v4 = (Key.v (), Value.v ()) in + Index.replace rw k1 v1; + Index.flush rw; + let t = Index.try_merge_aux ~force:true ~hook:merge_hook1 rw in + Semaphore.release merge; + Log.debug (fun l -> l "read log async"); + Index.replace rw k2 v2; + Index.flush rw; + Index.sync ro; + Index.check_binding ro k2 v2; + Semaphore.acquire sync; + Index.await t |> check_completed; + Index.replace rw k3 v3; + Index.flush rw; + let t = Index.try_merge_aux ~force:true ~hook:merge_hook2 rw in + Semaphore.release merge2; + Log.debug (fun l -> l "XXX read log async after second merge"); + Index.replace rw k4 v4; + Index.flush rw; + Index.sync ro; +
Index.check_binding ro k4 v4; + Semaphore.release merge; + Index.await t |> check_completed + let tests = [ ("add", `Quick, readonly); @@ -618,6 +666,7 @@ module Readonly = struct `Quick, readonly_sync_and_merge_clear ); ("reload log and log async", `Quick, reload_log_async); + ("race log async", `Quick, race_log_async); ] end @@ -899,6 +948,35 @@ module Throttle = struct ] end +module BatchReads = struct + let add_values ?(batch_size = 10) t tbl = + let rec loop i = + if i = 0 then Index.flush t + else + let k = Key.v () in + let v = Value.v () in + Index.replace t k v; + Hashtbl.replace tbl k v; + loop (i - 1) + in + loop batch_size + + let readonly_reads_rw_writes () = + let nb_batch_writes = 3 in + let* Context.{ rw; tbl; clone; _ } = Context.with_full_index () in + let ro = clone ~readonly:true () in + check_equivalence ro tbl; + + for _i = 0 to nb_batch_writes do (* NOTE(review): inclusive bounds, so nb_batch_writes + 1 rounds run; confirm intended *) + add_values rw tbl; + (* check_equivalence rw tbl; *) + Index.sync ro; + check_equivalence ro tbl + done + + let tests = [ ("reads and writes", `Quick, readonly_reads_rw_writes) ] +end + let () = Common.report (); Alcotest.run "index.unix" @@ -912,4 +990,5 @@ let () = ("filter", Filter.tests); ("flush_callback", Flush_callback.tests); ("throttle", Throttle.tests); + ("batch", BatchReads.tests); ]