diff --git a/test/unix/concurrent_ro.ml b/test/unix/concurrent_ro.ml new file mode 100644 index 00000000..3bb0171c --- /dev/null +++ b/test/unix/concurrent_ro.ml @@ -0,0 +1,97 @@ +open Common + +let index_name = Filename.concat "_tests" "unix.concurrent_ro" + +let nb_workers = 2 + +let nb_batch_writes = 2 + +let batch_size = 5 + +let test_find_present t tbl = + Hashtbl.iter + (fun k v -> + match Index.find t k with + | exception Not_found -> + Alcotest.failf "Wrong insertion: %s key is missing." k + | v' -> + if not (v = v') then + Alcotest.failf "Wrong insertion: %s value is missing." v) + tbl + +let add_values t tbl = + let rec loop i = + if i = 0 then Index.flush t + else + let k = Key.v () in + let v = Value.v () in + Index.replace t k v; + Hashtbl.replace tbl k v; + loop (i - 1) + in + loop batch_size + +let write input = + try + let m = Bytes.of_string (string_of_int (Unix.getpid ())) in + ignore (Unix.write input m 0 (Bytes.length m)) + with Unix.Unix_error (n, f, arg) -> + failwith (f ^ arg ^ Unix.error_message n) + +let read output = + let buff = Bytes.create 5 in + match Unix.read output buff 0 5 with + | 0 -> failwith "Something wrong when reading from the pipe" + | n -> int_of_string (Bytes.to_string (Bytes.sub buff 0 n)) + +let worker input_write output_read tbl = + let r = Index.v ~fresh:false ~readonly:true ~log_size index_name in + test_find_present r tbl; + for _i = 0 to nb_batch_writes do + write input_write; + ignore (read output_read); + test_find_present r tbl; + Logs.debug (fun l -> l "Read from ro index by %d" (Unix.getpid ())) + done + +let concurrent_reads () = + let output_write, input_write = Unix.pipe () + and output_read, input_read = Unix.pipe () in + let tbl = tbl index_name in + let w = Index.v ~fresh:false ~log_size index_name in + match Unix.fork () with + | 0 -> + for _i = 0 to nb_workers - 1 do + match Unix.fork () with + | 0 -> Logs.debug (fun l -> l "I'm %d" (Unix.getpid ())) + | pid -> + Logs.debug (fun l -> + l "Child %d created by %d" pid (Unix.getpid ())); + worker input_write output_read tbl; + exit 0 + done; + exit 0 + | _ -> + for i = 0 to nb_batch_writes do + Printf.printf "Starting batch nb %d\n%!" i; + for _i = 0 to nb_workers - 1 do + let pid = read output_write in + Logs.debug (fun l -> l "Ack from %d" pid) + done; + add_values w tbl; + for _i = 0 to nb_workers - 1 do + write input_read + done; + test_find_present w tbl; + Logs.debug (fun l -> l "Write from rw index") + done; + Unix.close input_write; + Unix.close output_write; + Unix.close input_read; + Unix.close output_read + +let tests = ("concurrent", [ ("concurrent reads", `Quick, concurrent_reads) ]) + +let () = + Common.report (); + Alcotest.run "index" [ tests ] diff --git a/test/unix/dune b/test/unix/dune index 28ca2269..9bd78881 100644 --- a/test/unix/dune +++ b/test/unix/dune @@ -1,3 +1,3 @@ (tests - (names main force_merge io_array) + (names main force_merge io_array concurrent_ro) (libraries index.unix alcotest fmt logs logs.fmt re)) diff --git a/test/unix/force_merge.ml b/test/unix/force_merge.ml index cb1fe6e0..a73af31f 100644 --- a/test/unix/force_merge.ml +++ b/test/unix/force_merge.ml @@ -172,6 +172,3 @@ let () = ("readonly tests", readonly_tests); ("merge and readonly tests", merge_tests); ] - -(* Unix.sleep 10 *) -(* for `ps aux | grep force_merge` and `lsof -a -s -p pid` *) diff --git a/test/unix/main.ml b/test/unix/main.ml index 6a7ca656..c9e454e9 100644 --- a/test/unix/main.ml +++ b/test/unix/main.ml @@ -114,6 +114,32 @@ let readonly_clear () = Not_found (fun () -> ignore (Index.find r k))) tbl +let add_values ?(batch_size = (2 * log_size) + 2) t tbl = + let rec loop i = + if i = 0 then Index.flush t + else + let k = Key.v () in + let v = Value.v () in + Index.replace t k v; + Hashtbl.replace tbl k v; + loop (i - 1) + in + loop batch_size + +let readonly_read_rw_write () = + let nb_batch_writes = 2 in + let w = Index.v ~fresh:false ~log_size (root // "index2") in + Hashtbl.iter (fun k v -> Index.replace w k v) tbl; + test_find_present w; + let r = Index.v ~fresh:false ~readonly:true ~log_size (root // "index2") in + test_find_present r; + for _i = 0 to nb_batch_writes do + add_values w tbl; + test_find_present w; + Index.flush w; + test_find_present r + done + let close_reopen_rw () = let w = Index.v ~fresh:true ~readonly:false ~log_size (root // "test1") in Hashtbl.iter (fun k v -> Index.replace w k v) tbl; @@ -196,6 +222,21 @@ let open_twice_readonly () = Index.close r1; test_read_after_close_readonly r2 k +let open_twice_rw () = + let w1 = Index.v ~fresh:true ~readonly:false ~log_size (root // "test8") in + Hashtbl.iter (fun k v -> Index.replace w1 k v) tbl; + Index.flush w1; + let r1 = Index.v ~fresh:false ~readonly:true ~log_size (root // "test8") in + test_find_present r1; + let w2 = Index.v ~fresh:false ~readonly:false ~log_size (root // "test8") in + test_find_present w2; + let k = Key.v () in + let v = Value.v () in + Index.replace w2 k v; + test_find_present w1; + if Index.find w1 k <> v then + Alcotest.fail (Printf.sprintf "Wrong insertion: %s value was not added" v) + let live_tests = [ ("find (present)", `Quick, find_present_live); @@ -203,6 +244,7 @@ let live_tests = ("replace", `Quick, replace_live); ("fail add (key)", `Quick, different_size_for_key); ("fail add (value)", `Quick, different_size_for_value); + ("open twice rw", `Quick, open_twice_rw); ] let restart_tests = @@ -213,7 +255,11 @@ let restart_tests = ] let readonly_tests = - [ ("add", `Quick, readonly); ("read after clear", `Quick, readonly_clear) ] + [ + ("add", `Quick, readonly); + ("read after clear", `Quick, readonly_clear); + ("read after write in batches", `Quick, readonly_read_rw_write); + ] let close_tests = [