Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/index.ml
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ struct
hook `Before_offset_read;
let h = IO.Header.get log.io in
hook `After_offset_read;
if t.generation <> h.generation then (
if t.generation < h.generation then (
(* If the generation has changed, then we need to reload both the
[index] and the [log]. The new generation is the one on disk. *)
Log.debug (fun l ->
Expand All @@ -323,13 +323,14 @@ struct
l "[%s] new entries detected, reading log from disk"
(Filename.basename t.root));
sync_log_entries ~min:log_offset log)
else
else if log_offset = h.offset then
(* Here the disk offset should be equal to the known one. A smaller
offset should not be possible, because that would mean a [clear] or
[merge] occurred, which should have changed the generation. *)
(* TODO: Handle the "impossible" case differently? *)
Log.debug (fun l ->
l "[%s] no changes detected" (Filename.basename t.root))
else assert false

(** {1 Find and Mem}*)

Expand Down
154 changes: 154 additions & 0 deletions test/unix/concurrent.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
module Hook = Index.Private.Hook
open Common

let ( // ) = Filename.concat
let root = "_tests" // "unix.concurrent"
let src = Logs.Src.create "tests.unix.concurrent" ~doc:"Tests"

module Log = (val Logs.src_log src : Logs.LOG)

module Context = Common.Make_context (struct
let root = root
end)

(* Helpers for Unix pipe *)
let write input =
let m = Bytes.of_string (string_of_int (Unix.getpid ())) in
let n = Unix.write input m 0 (Bytes.length m) in
assert (n = Bytes.length m)

let read output =
let buff = Bytes.create 5 in
match Unix.read output buff 0 5 with
| 0 -> Alcotest.fail "Something wrong when reading from the pipe"
| n -> int_of_string (Bytes.to_string (Bytes.sub buff 0 n))

let ignore_pid (_ : int) = ()

let wait pid =
let pid', status = Unix.waitpid [ Unix.WUNTRACED ] pid in
if pid <> pid' then
Alcotest.failf "I'm %d, expecting child %d, but got %d instead"
(Unix.getpid ()) pid pid';
match status with
| Unix.WEXITED 0 -> Log.debug (fun l -> l "Child %d finished work" pid)
| _ -> Alcotest.failf "Child %d died unexpectedly" pid

let lsof () =
let name = "/tmp/" in
let pid = string_of_int (Unix.getpid ()) in
let fd_file = name ^ "tmp_" ^ pid in
let lsof_command = "lsof -a -s -p " ^ pid ^ " >> " ^ fd_file in
match Unix.system lsof_command with
| Unix.WEXITED 0 -> ()
| Unix.WEXITED _ ->
failwith "failing `lsof` command. Is `lsof` installed on your system?"
| Unix.WSIGNALED _ | Unix.WSTOPPED _ ->
failwith "`lsof` command was interrupted"

module WriteBatch = struct
let nb_workers = 1
let nb_batch_writes = 1000
let batch_size = 5

let populate_array () =
Array.init (batch_size * nb_batch_writes) (fun _ ->
let k = Key.v () in
let v = Value.v () in
(k, v))

let test_find_present ~ro t arr batch =
let start_ = (batch - 1) * batch_size in
let end_ = (batch_size * batch) - 1 in
Fmt.epr "%b read %d from %d to %d\n%!" ro batch start_ end_;
for i = start_ to end_ do
let k, v = arr.(i) in
let v' = Index.find t k in
if not (v = v') then raise Not_found
done

let test_find_absent ~ro t arr batch =
let start_ = (batch - 1) * batch_size in
let end_ = (batch_size * batch) - 1 in
Fmt.epr "%b read absent %d from %d to %d\n%!" ro batch start_ end_;
for i = start_ to end_ do
let k, _ = arr.(i) in
match Index.find t k with
| exception Not_found -> ()
| _ -> Alcotest.failf "Found %s key after clear " k
done

let add_values t arr batch =
Index.clear t;
let start_ = (batch - 1) * batch_size in
let end_ = (batch_size * batch) - 1 in
Fmt.epr "add values %d to %d\n%!" start_ end_;
for i = start_ to end_ do
let k, v = arr.(i) in
Index.replace t k v
done;
Index.flush t

let worker input_write output_read name arr =
let r = Index.v ~fresh:false ~log_size:4 ~readonly:true name in
for i = 1 to nb_batch_writes do
write input_write;
ignore (read output_read);
Index.sync r;
test_find_present ~ro:true r arr i;
Log.debug (fun l -> l "RO read by %d" (Unix.getpid ()))
done;
write input_write;
ignore (read output_read);
Index.sync r;
test_find_absent ~ro:true r arr nb_batch_writes

let concurrent_reads () =
let output_write, input_write = Unix.pipe ()
and output_read, input_read = Unix.pipe () in
let root = Context.fresh_name "empty_index" in
let arr = populate_array () in
match Unix.fork () with
| 0 ->
Log.debug (fun l -> l "I'm %d" (Unix.getpid ()));
worker input_write output_read root arr;
exit 0
| pid ->
Log.debug (fun l ->
l "I'm main process %d, created %d" (Unix.getpid ()) pid);
let rw = Index.v ~fresh:true ~log_size:4 root in
for i = 1 to nb_batch_writes do
Log.debug (fun l -> l "Starting batch nb %d" i);
if i mod 100 = 0 then lsof ();
for _ = 0 to nb_workers - 1 do
let pid = read output_write in
Log.debug (fun l -> l "Ack from %d" pid)
done;
add_values rw arr i;
for _ = 0 to nb_workers - 1 do
write input_read
done;
test_find_present ~ro:false rw arr i;
Log.debug (fun l -> l "Write from rw index")
done;
for _ = 0 to nb_workers - 1 do
let pid = read output_write in
Log.debug (fun l -> l "Ack from %d" pid)
done;
Index.clear rw;
for _ = 0 to nb_workers - 1 do
write input_read
done;
wait pid;
Unix.close input_write;
Unix.close output_write;
Unix.close input_read;
Unix.close output_read

let check_os () = if Sys.os_type = "Win32" then () else concurrent_reads ()
let tests = [ ("concurrent reads and writes", `Quick, check_os) ]
end

let () =
(* Common.report (); *)
Alcotest.run ~verbose:true "concurrent" [ ("write batch", WriteBatch.tests) ]
2 changes: 1 addition & 1 deletion test/unix/dune
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
(tests
(names main force_merge io_array)
(names main force_merge io_array concurrent)
(package index)
(libraries index index.unix alcotest fmt logs logs.fmt re stdlib-shims
threads.posix repr semaphore-compat optint))
79 changes: 79 additions & 0 deletions test/unix/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,54 @@ module Readonly = struct
Alcotest.(check int) "reloadings of log async per merge" 1 !reload_log_async;
Index.await t |> check_completed

let race_log_async () =
let* Context.{ rw; clone; _ } = Context.with_empty_index () in
let ro = clone ~readonly:true () in
let merge = Semaphore.make false in
let sync = Semaphore.make false in
let merge2 = Semaphore.make false in
let merge_hook1 =
I.Private.Hook.v @@ function
| `Before ->
Semaphore.acquire merge;
Semaphore.release sync
| `After_clear -> Index.sync ro
| _ -> ()
in
let merge_hook2 =
I.Private.Hook.v @@ function
| `Before ->
Semaphore.acquire merge2;
Semaphore.acquire merge
| _ -> ()
in
let k1, v1 = (Key.v (), Value.v ()) in
let k2, v2 = (Key.v (), Value.v ()) in
let k3, v3 = (Key.v (), Value.v ()) in
let k4, v4 = (Key.v (), Value.v ()) in
Index.replace rw k1 v1;
Index.flush rw;
let t = Index.try_merge_aux ~force:true ~hook:merge_hook1 rw in
Semaphore.release merge;
Log.debug (fun l -> l "read log async");
Index.replace rw k2 v2;
Index.flush rw;
Index.sync ro;
Index.check_binding ro k2 v2;
Semaphore.acquire sync;
Index.await t |> check_completed;
Index.replace rw k3 v3;
Index.flush rw;
let t = Index.try_merge_aux ~force:true ~hook:merge_hook2 rw in
Semaphore.release merge2;
Log.debug (fun l -> l "XXX read log async after second merge");
Index.replace rw k4 v4;
Index.flush rw;
Index.sync ro;
Index.check_binding ro k4 v4;
Semaphore.release merge;
Index.await t |> check_completed

let tests =
[
("add", `Quick, readonly);
Expand All @@ -618,6 +666,7 @@ module Readonly = struct
`Quick,
readonly_sync_and_merge_clear );
("reload log and log async", `Quick, reload_log_async);
("race log async", `Quick, race_log_async);
]
end

Expand Down Expand Up @@ -899,6 +948,35 @@ module Throttle = struct
]
end

module BatchReads = struct
let add_values ?(batch_size = 10) t tbl =
let rec loop i =
if i = 0 then Index.flush t
else
let k = Key.v () in
let v = Value.v () in
Index.replace t k v;
Hashtbl.replace tbl k v;
loop (i - 1)
in
loop batch_size

let readonly_reads_rw_writes () =
let nb_batch_writes = 3 in
let* Context.{ rw; tbl; clone; _ } = Context.with_full_index () in
let ro = clone ~readonly:true () in
check_equivalence ro tbl;

for _i = 0 to nb_batch_writes do
add_values rw tbl;
(* check_equivalence rw tbl; *)
Index.sync ro;
check_equivalence ro tbl
done

let tests = [ ("reads and writes", `Quick, readonly_reads_rw_writes) ]
end

let () =
Common.report ();
Alcotest.run "index.unix"
Expand All @@ -912,4 +990,5 @@ let () =
("filter", Filter.tests);
("flush_callback", Flush_callback.tests);
("throttle", Throttle.tests);
("batch", BatchReads.tests);
]