diff --git a/src/dreyfus_index.erl b/src/dreyfus_index.erl index 19dce4f..e36eb13 100644 --- a/src/dreyfus_index.erl +++ b/src/dreyfus_index.erl @@ -157,6 +157,24 @@ handle_call(info, _From, State) -> % obsolete Reply = info_int(State#state.index_pid), {reply, Reply, State}. +handle_cast({ddoc_updated, DDocResult}, #state{} = State) -> + #index{sig = Sig} = State#state.index, + KeepIndexProcess = case DDocResult of + {not_found, deleted} -> + false; + {ok, DDoc} -> + Indexes = design_doc_to_indexes(DDoc), + % find if any index of new DDoc has the same Sig + lists:foldl(fun(#index{sig=SigNew}, Acc) -> + (SigNew == Sig) or Acc + end, false, Indexes) + end, + case KeepIndexProcess of + false -> + {stop, {shutdown, ddoc_updated}, State}; + true -> + {noreply, State} + end; handle_cast(_Msg, State) -> {noreply, State}. @@ -214,6 +232,9 @@ handle_info({'DOWN',_,_,Pid,Reason}, #state{ [gen_server:reply(P, {error, Reason}) || {P, _} <- WaitList], {stop, normal, State}. +terminate({shutdown, ddoc_updated}, State) -> + Waiters = State#state.waiting_list, + [gen_server:reply(From, ddoc_updated) || {From, _} <- Waiters]; terminate(_Reason, _State) -> ok. diff --git a/src/dreyfus_index_manager.erl b/src/dreyfus_index_manager.erl index 2575294..c79a416 100644 --- a/src/dreyfus_index_manager.erl +++ b/src/dreyfus_index_manager.erl @@ -21,6 +21,7 @@ -define(BY_SIG, dreyfus_by_sig). -define(BY_PID, dreyfus_by_pid). +-define(BY_DB, dreyfus_by_db). % public api. -export([start_link/0, get_index/2, get_disk_size/2]). @@ -44,8 +45,9 @@ get_disk_size(DbName, Index) -> % gen_server functions. init([]) -> - ets:new(?BY_SIG, [set, private, named_table]), + ets:new(?BY_SIG, [set, protected, named_table]), ets:new(?BY_PID, [set, private, named_table]), + ets:new(?BY_DB, [bag, protected, named_table]), couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]), process_flag(trap_exit, true), {ok, nil}. 
@@ -61,20 +63,22 @@ handle_call({get_index, DbName, #index{sig=Sig}=Index}, From, State) -> ets:insert(?BY_SIG, {{DbName, Sig}, [From | WaitList]}), {noreply, State}; [{_, ExistingPid}] -> + DDocId = Index#index.ddoc_id, + ets:insert(?BY_DB, {DbName, {DDocId, Sig}}), {reply, {ok, ExistingPid}, State} end; -handle_call({get_disk_size, DbName, #index{sig=Sig}=Index}, From, State) -> +handle_call({get_disk_size, DbName, #index{sig=Sig}}, _From, State) -> Path = <<DbName/binary,"/",Sig/binary>>, Reply = clouseau_rpc:disk_size(Path), {reply, Reply, State}; -handle_call({open_ok, DbName, Sig, NewPid}, {OpenerPid, _}, State) -> +handle_call({open_ok, DbName, DDocId, Sig, NewPid}, {OpenerPid, _}, State) -> link(NewPid), [{_, WaitList}] = ets:lookup(?BY_SIG, {DbName, Sig}), [gen_server:reply(From, {ok, NewPid}) || From <- WaitList], ets:delete(?BY_PID, OpenerPid), - add_to_ets(NewPid, DbName, Sig), + add_to_ets(NewPid, DbName, DDocId, Sig), {reply, ok, State}; handle_call({open_error, DbName, Sig, Error}, {OpenerPid, _}, State) -> @@ -86,6 +90,9 @@ handle_call({open_error, DbName, Sig, Error}, {OpenerPid, _}, State) -> handle_cast({cleanup, DbName}, State) -> clouseau_rpc:cleanup(DbName), + {noreply, State}; +handle_cast({rem_from_ets, [DbName, DDocId, Sig]}, State) -> + ets:delete_object(?BY_DB, {DbName, {DDocId, Sig}}), {noreply, State}. handle_info({'EXIT', FromPid, Reason}, State) -> @@ -115,19 +122,63 @@ code_change(_OldVsn, nil, _Extra) -> % private functions -handle_db_event(DbName, created, _St) -> +handle_db_event(DbName, created, St) -> gen_server:cast(?MODULE, {cleanup, DbName}), - {ok, nil}; -handle_db_event(DbName, deleted, St) -> + {ok, St}; +handle_db_event(DbName, deleted, St) -> gen_server:cast(?MODULE, {cleanup, DbName}), - {ok, nil}; -handle_db_event(_DbName, _Event, _St) -> - {ok, nil}. 
- -new_index(DbName, #index{sig=Sig}=Index) -> + {ok, St}; +handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, + DDocId}, St) -> + DDocResult = couch_util:with_db(DbName, fun(Db) -> + couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX]) + end), + DbShards = [mem3:name(Sh) || Sh <- mem3:local_shards(mem3:dbname(DbName))], + lists:foreach(fun(DbShard) -> + lists:foreach(fun({_DbShard, {_DDocId, Sig}}) -> + % check if there are other ddocs with the same Sig for the same db + SigDDocs = ets:match_object(?BY_DB, {DbShard, {'_', Sig}}), + if length(SigDDocs) > 1 -> + % remove a record from this DDoc from ?BY_DB + Args = [DbShard, DDocId, Sig], + gen_server:cast(?MODULE, {rem_from_ets, Args}); + true -> + % single DDoc with this Sig - maybe close dreyfus_index process + case ets:lookup(?BY_SIG, {DbShard, Sig}) of + [{_, IndexPid}] -> (catch + gen_server:cast(IndexPid, {ddoc_updated, DDocResult})); + [] -> [] + end + end + end, ets:match_object(?BY_DB, {DbShard, {DDocId, '_'}})) + end, DbShards), + {ok, St}; +handle_db_event(DbName, {ddoc_updated, DDocId}, St) -> + DDocResult = couch_util:with_db(DbName, fun(Db) -> + couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX]) + end), + lists:foreach(fun({_DbName, {_DDocId, Sig}}) -> + SigDDocs = ets:match_object(?BY_DB, {DbName, {'_', Sig}}), + if length(SigDDocs) > 1 -> + Args = [DbName, DDocId, Sig], + gen_server:cast(?MODULE, {rem_from_ets, Args}); + true -> + case ets:lookup(?BY_SIG, {DbName, Sig}) of + [{_, IndexPid}] -> (catch + gen_server:cast(IndexPid, {ddoc_updated, DDocResult})); + [] -> [] + end + end + end, ets:match_object(?BY_DB, {DbName, {DDocId, '_'}})), + {ok, St}; +handle_db_event(_DbName, _Event, St) -> + {ok, St}. 
+ + +new_index(DbName, #index{ddoc_id=DDocId, sig=Sig}=Index) -> case (catch dreyfus_index:start_link(DbName, Index)) of {ok, NewPid} -> - Msg = {open_ok, DbName, Sig, NewPid}, + Msg = {open_ok, DbName, DDocId, Sig, NewPid}, ok = gen_server:call(?MODULE, Msg, infinity), unlink(NewPid); Error -> @@ -135,11 +186,13 @@ new_index(DbName, #index{sig=Sig}=Index) -> ok = gen_server:call(?MODULE, Msg, infinity) end. -add_to_ets(Pid, DbName, Sig) -> +add_to_ets(Pid, DbName, DDocId, Sig) -> true = ets:insert(?BY_PID, {Pid, {DbName, Sig}}), - true = ets:insert(?BY_SIG, {{DbName, Sig}, Pid}). + true = ets:insert(?BY_SIG, {{DbName, Sig}, Pid}), + true = ets:insert(?BY_DB, {DbName, {DDocId, Sig}}). delete_from_ets(Pid, DbName, Sig) -> + true = ets:match_delete(?BY_DB, {DbName, {'_', Sig}}), true = ets:delete(?BY_PID, Pid), true = ets:delete(?BY_SIG, {DbName, Sig}). diff --git a/test/search_index_ddoc_updated_tests.erl b/test/search_index_ddoc_updated_tests.erl new file mode 100644 index 0000000..ca2c55a --- /dev/null +++ b/test/search_index_ddoc_updated_tests.erl @@ -0,0 +1,254 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(search_index_ddoc_updated_tests). + +-include_lib("couch/include/couch_db.hrl"). +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("dreyfus/include/dreyfus.hrl"). + +-define(DDOCID1, <<"_design/ddoc1">>). +-define(DDOCID2, <<"_design/ddoc2">>). +-define(DDOCID3, <<"_design/ddoc3">>). + +-define(IDXNAME, <<"index1">>). 
+-define(IDXNAME2, <<"index2">>). + +-define(DDOC1, couch_doc:from_json_obj({[ + {<<"_id">>,?DDOCID1}, + {<<"indexes">>, {[ + {?IDXNAME, {[ + {<<"index">>, + <<"function(doc){if(doc.f){index(\\\"f\\\", doc.f);}}">>} + ]}} + ]}} +]})). + +-define(DDOC2, couch_doc:from_json_obj({[ + {<<"_id">>, ?DDOCID2}, + {<<"indexes">>, {[ + {?IDXNAME, {[ + {<<"index">>, + <<"function(doc){if(doc.f){index(\\\"f\\\", doc.f);}}">>} + ]}} + ]}} +]})). + +-define(DDOC3, couch_doc:from_json_obj({[ + {<<"_id">>, ?DDOCID3}, + {<<"indexes">>, {[ + {?IDXNAME, {[ + {<<"index">>, + <<"function(doc){if(doc.f1){index(\\\"f1\\\", doc.f1);}}">>} + ]}}, + {?IDXNAME2, {[ + {<<"index">>, + <<"function(doc){if(doc.f2){index(\\\"f2\\\", doc.f2);}}">>} + ]}} + ]}} +]})). + + +-define(DDOC31IND, + {[ + {?IDXNAME, {[ + {<<"index">>, + <<"function(doc){if(doc.f11){index(\\\"f11\\\", doc.f11);}}">>} + ]}}, + {?IDXNAME2, {[ + {<<"index">>, + <<"function(doc){if(doc.f2){index(\\\"f2\\\", doc.f2);}}">>} + ]}} + ]} +). + + + +setup() -> + Name = ?tempdb(), + couch_server:delete(Name, [?ADMIN_CTX]), + {ok, Db} = couch_db:create(Name, [?ADMIN_CTX]), + Db. + +teardown(Db) -> + couch_db:close(Db), + couch_server:delete(Db#db.name, [?ADMIN_CTX]), + ok. + + +ddoc_update_test_() -> + { + "Check ddoc update actions", + { + setup, + fun() -> + Ctx = test_util:start_couch([dreyfus]), + fake_rexi(), + fake_clouseau(), + fake_dreyfus_index(), + Ctx + end, + fun(Ctx) -> + (catch meck:unload(rexi)), + (catch meck:unload(clouseau_rpc)), + (catch meck:unload(dreyfus_index)), + test_util:stop_couch(Ctx) + end, + { + foreach, + fun setup/0, fun teardown/1, + [ + fun should_stop_indexes_on_delete_single_ddoc/1, + fun should_not_stop_indexes_on_delete_multiple_ddoc/1, + fun should_stop_indexes_on_update/1 + ] + } + } + }. 
+ + +should_stop_indexes_on_delete_single_ddoc(Db0) -> + ?_test(begin + {ok, _} = couch_db:update_docs(Db0, [?DDOC1], []), + {ok, Db} = couch_db:reopen(Db0), + {ok, DDoc1} = couch_db:open_doc( + Db, ?DDOCID1, [ejson_body, ?ADMIN_CTX]), + + dreyfus_rpc:call( + search, Db#db.name, DDoc1, ?IDXNAME, #index_query_args{}), + IndsBefore = get_indexes_by_ddoc(Db#db.name, ?DDOCID1, 1), + ?assertEqual(1, length(IndsBefore)), + AliveBefore = lists:filter(fun erlang:is_process_alive/1, IndsBefore), + ?assertEqual(1, length(AliveBefore)), + + % delete DDoc1 + DDocJson11 = couch_doc:from_json_obj({[ + {<<"_id">>, ?DDOCID1}, + {<<"_deleted">>, true}, + {<<"_rev">>, couch_doc:rev_to_str(DDoc1#doc.revs)} + ]}), + {ok, _} = couch_db:update_doc(Db, DDocJson11, []), + + %% assert that previously running indexes are gone + IndsAfter = get_indexes_by_ddoc(Db#db.name, ?DDOCID1, 0), + ?assertEqual(0, length(IndsAfter)), + AliveAfter = lists:filter(fun erlang:is_process_alive/1, IndsBefore), + ?assertEqual(0, length(AliveAfter)) + end). 
+ + +should_not_stop_indexes_on_delete_multiple_ddoc(Db0) -> + ?_test(begin + % create DDOC1 and DDOC2 with the same Sig + {ok, _} = couch_db:update_docs(Db0, [?DDOC1, ?DDOC2], []), + {ok, Db} = couch_db:reopen(Db0), + {ok, DDoc1} = couch_db:open_doc( + Db, ?DDOCID1, [ejson_body, ?ADMIN_CTX]), + {ok, DDoc2} = couch_db:open_doc( + Db, ?DDOCID2, [ejson_body, ?ADMIN_CTX]), + + dreyfus_rpc:call( + search, Db#db.name, DDoc1, ?IDXNAME, #index_query_args{}), + dreyfus_rpc:call( + search, Db#db.name, DDoc2, ?IDXNAME, #index_query_args{}), + IndsBefore = get_indexes_by_ddoc(Db#db.name, ?DDOCID1, 1), + ?assertEqual(1, length(IndsBefore)), + AliveBefore = lists:filter(fun erlang:is_process_alive/1, IndsBefore), + ?assertEqual(1, length(AliveBefore)), + + % delete DDoc1 + DDocJson11 = couch_doc:from_json_obj({[ + {<<"_id">>, ?DDOCID1}, + {<<"_deleted">>, true}, + {<<"_rev">>, couch_doc:rev_to_str(DDoc1#doc.revs)} + ]}), + {ok, _} = couch_db:update_doc(Db, DDocJson11, []), + + %% assert that previously running indexes are still there + IndsAfter = get_indexes_by_ddoc(Db#db.name, ?DDOCID1, 1), + ?assertEqual(1, length(IndsAfter)), + AliveAfter = lists:filter(fun erlang:is_process_alive/1, IndsBefore), + ?assertEqual(1, length(AliveAfter)) + end). 
+ + +should_stop_indexes_on_update(Db0) -> + ?_test(begin + {ok, _} = couch_db:update_docs(Db0, [?DDOC3], []), + {ok, Db} = couch_db:reopen(Db0), + {ok, DDoc3} = couch_db:open_doc( + Db, ?DDOCID3, [ejson_body, ?ADMIN_CTX]), + + dreyfus_rpc:call( + search, Db#db.name, DDoc3, ?IDXNAME, #index_query_args{}), + dreyfus_rpc:call( + search, Db#db.name, DDoc3, ?IDXNAME2, #index_query_args{}), + IndsBefore = get_indexes_by_ddoc(Db#db.name, ?DDOCID3, 2), + ?assertEqual(2, length(IndsBefore)), + AliveBefore = lists:filter(fun erlang:is_process_alive/1, IndsBefore), + ?assertEqual(2, length(AliveBefore)), + + % update <<"index1">> of DDoc3 + DDocJson31 = couch_doc:from_json_obj({[ + {<<"_id">>, ?DDOCID3}, + {<<"indexes">>, ?DDOC31IND}, + {<<"_rev">>, couch_doc:rev_to_str(DDoc3#doc.revs)} + ]}), + {ok, _} = couch_db:update_doc(Db, DDocJson31, []), + + %% assert that one index process is gone (for <<"index1">>), + %% and one is still running (for <<"index2>>") + IndsAfter = get_indexes_by_ddoc(Db#db.name, ?DDOCID3, 1), + ?assertEqual(1, length(IndsAfter)), + AliveAfter = lists:filter(fun erlang:is_process_alive/1, IndsBefore), + ?assertEqual(1, length(AliveAfter)) + end). + + +fake_rexi() -> + meck:new([rexi]), + meck:expect(rexi, reply, fun(Msg) -> Msg end). + + +fake_clouseau() -> + ok = meck:new([clouseau_rpc], [non_strict]), + ok = meck:expect(clouseau_rpc, open_index, ['_', '_', '_'], {ok, self()}), + ok = meck:expect(clouseau_rpc, get_update_seq, ['_'], {ok, 10}). + + +fake_dreyfus_index() -> + ok = meck:new([dreyfus_index], [passthrough]), + ok = meck:expect(dreyfus_index, await, ['_', '_'], {ok, 0, 0}), + ok = meck:expect(dreyfus_index, search, ['_', '_'], + {ok, #top_docs{ + update_seq = 10, + total_hits = 0, + hits = []}}). 
+ + +get_indexes_by_ddoc(DbName, DDocID, N) -> + Indexes = test_util:wait(fun() -> + Idxs = ets:match_object( + dreyfus_by_db, {DbName, {DDocID, '_'}}), + case length(Idxs) == N of + true -> + Idxs; + false -> + wait + end + end), + lists:foldl(fun({DBName, {_DDocID, Sig}}, Acc) -> + case ets:lookup(dreyfus_by_sig, {DBName, Sig}) of + [{_, Pid}] -> [Pid|Acc]; + _ -> Acc + end + end, [], Indexes).