Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions test/unix/concurrent_ro.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
open Common

let index_name = Filename.concat "_tests" "unix.concurrent_ro"

let nb_workers = 2

let nb_batch_writes = 2

let batch_size = 5

let test_find_present t tbl =
Hashtbl.iter
(fun k v ->
match Index.find t k with
| exception Not_found ->
Alcotest.failf "Wrong insertion: %s key is missing." k
| v' ->
if not (v = v') then
Alcotest.failf "Wrong insertion: %s value is missing." v)
tbl

let add_values t tbl =
let rec loop i =
if i = 0 then Index.flush t
else
let k = Key.v () in
let v = Value.v () in
Index.replace t k v;
Hashtbl.replace tbl k v;
loop (i - 1)
in
loop batch_size

let write input =
try
let m = Bytes.of_string (string_of_int (Unix.getpid ())) in
ignore (Unix.write input m 0 (Bytes.length m))
with Unix.Unix_error (n, f, arg) ->
failwith (f ^ arg ^ Unix.error_message n)

let read output =
let buff = Bytes.create 5 in
match Unix.read output buff 0 5 with
| 0 -> failwith "Something wrong when reading from the pipe"
| n -> int_of_string (Bytes.to_string (Bytes.sub buff 0 n))

let worker input_write output_read tbl =
let r = Index.v ~fresh:false ~readonly:true ~log_size index_name in
test_find_present r tbl;
for _i = 0 to nb_batch_writes do
write input_write;
ignore (read output_read);
test_find_present r tbl;
Logs.debug (fun l -> l "Read from ro index by %d" (Unix.getpid ()))
done

let concurrent_reads () =
let output_write, input_write = Unix.pipe ()
and output_read, input_read = Unix.pipe () in
let tbl = tbl index_name in
let w = Index.v ~fresh:false ~log_size index_name in
match Unix.fork () with
| 0 ->
for _i = 0 to nb_workers - 1 do
match Unix.fork () with
| 0 -> Logs.debug (fun l -> l "I'm %d" (Unix.getpid ()))
| pid ->
Logs.debug (fun l ->
l "Child %d created by %d" pid (Unix.getpid ()));
worker input_write output_read tbl;
exit 0
done;
exit 0
| _ ->
for i = 0 to nb_batch_writes do
Printf.printf "Starting batch nb %d\n%!" i;
for _i = 0 to nb_workers - 1 do
let pid = read output_write in
Logs.debug (fun l -> l "Ack from %d" pid)
done;
add_values w tbl;
for _i = 0 to nb_workers - 1 do
write input_read
done;
test_find_present w tbl;
Logs.debug (fun l -> l "Write from rw index")
done;
Unix.close input_write;
Unix.close output_write;
Unix.close input_read;
Unix.close output_read

let tests = ("concurrent", [ ("concurrent reads", `Quick, concurrent_reads) ])

let () =
Common.report ();
Alcotest.run "index" [ tests ]
2 changes: 1 addition & 1 deletion test/unix/dune
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
(tests
(names main force_merge io_array)
(names main force_merge io_array concurrent_ro)
(libraries index.unix alcotest fmt logs logs.fmt re))
3 changes: 0 additions & 3 deletions test/unix/force_merge.ml
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,3 @@ let () =
("readonly tests", readonly_tests);
("merge and readonly tests", merge_tests);
]

(* Unix.sleep 10 *)
(* for `ps aux | grep force_merge` and `lsof -a -s -p pid` *)
48 changes: 47 additions & 1 deletion test/unix/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,32 @@ let readonly_clear () =
Not_found (fun () -> ignore (Index.find r k)))
tbl

let add_values ?(batch_size = (2 * log_size) + 2) t tbl =
let rec loop i =
if i = 0 then Index.flush t
else
let k = Key.v () in
let v = Value.v () in
Index.replace t k v;
Hashtbl.replace tbl k v;
loop (i - 1)
in
loop batch_size

let readonly_read_rw_write () =
let nb_batch_writes = 2 in
let w = Index.v ~fresh:false ~log_size (root // "index2") in
Hashtbl.iter (fun k v -> Index.replace w k v) tbl;
test_find_present w;
let r = Index.v ~fresh:false ~readonly:true ~log_size (root // "index2") in
test_find_present r;
for _i = 0 to nb_batch_writes do
add_values w tbl;
test_find_present w;
Index.flush w;
test_find_present r
done

let close_reopen_rw () =
let w = Index.v ~fresh:true ~readonly:false ~log_size (root // "test1") in
Hashtbl.iter (fun k v -> Index.replace w k v) tbl;
Expand Down Expand Up @@ -196,13 +222,29 @@ let open_twice_readonly () =
Index.close r1;
test_read_after_close_readonly r2 k

let open_twice_rw () =
let w1 = Index.v ~fresh:true ~readonly:false ~log_size (root // "test8") in
Hashtbl.iter (fun k v -> Index.replace w1 k v) tbl;
Index.flush w1;
let r1 = Index.v ~fresh:false ~readonly:true ~log_size (root // "test8") in
test_find_present r1;
let w2 = Index.v ~fresh:false ~readonly:false ~log_size (root // "test8") in
test_find_present w2;
let k = Key.v () in
let v = Value.v () in
Index.replace w2 k v;
test_find_present w1;
if Index.find w1 k <> v then
Alcotest.fail (Printf.sprintf "Wrong insertion: %s value was not added" v)

let live_tests =
[
("find (present)", `Quick, find_present_live);
("find (absent)", `Quick, find_absent_live);
("replace", `Quick, replace_live);
("fail add (key)", `Quick, different_size_for_key);
("fail add (value)", `Quick, different_size_for_value);
("open twice rw", `Quick, open_twice_rw);
]

let restart_tests =
Expand All @@ -213,7 +255,11 @@ let restart_tests =
]

let readonly_tests =
[ ("add", `Quick, readonly); ("read after clear", `Quick, readonly_clear) ]
[
("add", `Quick, readonly);
("read after clear", `Quick, readonly_clear);
("read after write in batches", `Quick, readonly_read_rw_write);
]

let close_tests =
[
Expand Down