From 5d534a4ea7ccea9aa77b6b25a851e99f65e06807 Mon Sep 17 00:00:00 2001 From: Levi McAuley Date: Fri, 26 Feb 2016 17:35:46 -0800 Subject: [PATCH] Continue to reopen DB and enum changes until DB reopens up-to-date --- src/couch_index_updater.erl | 134 +++++++++++++++++++++--------------- 1 file changed, 80 insertions(+), 54 deletions(-) diff --git a/src/couch_index_updater.erl b/src/couch_index_updater.erl index ad48f40..b231102 100644 --- a/src/couch_index_updater.erl +++ b/src/couch_index_updater.erl @@ -119,8 +119,6 @@ code_change(_OldVsn, State, _Extra) -> update(Idx, Mod, IdxState) -> - DbName = Mod:get(db_name, IdxState), - CurrSeq = Mod:get(update_seq, IdxState), UpdateOpts = Mod:get(update_options, IdxState), CommittedOnly = lists:member(committed_only, UpdateOpts), IncludeDesign = lists:member(include_design, UpdateOpts), @@ -129,73 +127,101 @@ update(Idx, Mod, IdxState) -> _ -> [conflicts, deleted_conflicts] end, - couch_util:with_db(DbName, fun(Db) -> - DbUpdateSeq = couch_db:get_update_seq(Db), - DbCommittedSeq = couch_db:get_committed_update_seq(Db), + GetSeq = fun + (#full_doc_info{update_seq=Seq}) -> Seq; + (#doc_info{high_seq=Seq}) -> Seq + end, - PurgedIdxState = case purge_index(Db, Mod, IdxState) of - {ok, IdxState0} -> IdxState0; - reset -> exit({reset, self()}) - end, + GetInfo = fun + (#full_doc_info{id=Id, update_seq=Seq, deleted=Del}=FDI) -> + {Id, Seq, Del, couch_doc:to_doc_info(FDI)}; + (#doc_info{id=Id, high_seq=Seq, revs=[RI|_]}=DI) -> + {Id, Seq, RI#rev_info.deleted, DI} + end, - NumChanges = couch_db:count_changes_since(Db, CurrSeq), + LoadDoc = fun(Db, DI) -> + {DocId, Seq, Deleted, DocInfo} = GetInfo(DI), - GetSeq = fun - (#full_doc_info{update_seq=Seq}) -> Seq; - (#doc_info{high_seq=Seq}) -> Seq - end, + case {IncludeDesign, DocId} of + {false, <<"_design/", _/binary>>} -> + {nil, Seq}; + _ when Deleted -> + {#doc{id=DocId, deleted=true}, Seq}; + _ -> + {ok, Doc} = couch_db:open_doc_int(Db, DocInfo, DocOpts), + {Doc, Seq} + end + end, - GetInfo = fun - (#full_doc_info{id=Id, update_seq=Seq, deleted=Del}=FDI) -> - {Id, Seq, Del, couch_doc:to_doc_info(FDI)}; - (#doc_info{id=Id, high_seq=Seq, revs=[RI|_]}=DI) -> - {Id, Seq, RI#rev_info.deleted, DI} - end, + EnumProc = fun(Db, DbCommittedSeq, DocInfo, IdxStateAcc) -> + case CommittedOnly and (GetSeq(DocInfo) > DbCommittedSeq) of + true -> + {stop, {IdxStateAcc, false}}; + false -> + {Doc, Seq} = LoadDoc(Db, DocInfo), + {ok, NewSt} = Mod:process_doc(Doc, Seq, IdxStateAcc), + garbage_collect(), + {ok, {NewSt, true}} + end + end, + + UpdateOnceProc = fun(Db0, IdxState0) -> + {ok, Db} = couch_db:reopen(Db0), - LoadDoc = fun(DI) -> - {DocId, Seq, Deleted, DocInfo} = GetInfo(DI), - - case {IncludeDesign, DocId} of - {false, <<"_design/", _/binary>>} -> - {nil, Seq}; - _ when Deleted -> - {#doc{id=DocId, deleted=true}, Seq}; - _ -> - {ok, Doc} = couch_db:open_doc_int(Db, DocInfo, DocOpts), - {Doc, Seq} - end + DbUpdateSeq = couch_db:get_update_seq(Db), + DbCommittedSeq = couch_db:get_committed_update_seq(Db), + DbRelevantSeq = if + CommittedOnly -> DbCommittedSeq; + true -> DbUpdateSeq end, - Proc = fun(DocInfo, _, {IdxStateAcc, _}) -> - case CommittedOnly and (GetSeq(DocInfo) > DbCommittedSeq) of - true -> - {stop, {IdxStateAcc, false}}; - false -> - {Doc, Seq} = LoadDoc(DocInfo), - {ok, NewSt} = Mod:process_doc(Doc, Seq, IdxStateAcc), - garbage_collect(), - {ok, {NewSt, true}} - end + PurgedIdxState = case purge_index(Db, Mod, IdxState0) of + {ok, St} -> St; + reset -> exit({reset, self()}) end, - {ok, InitIdxState} = Mod:start_update(Idx, PurgedIdxState, NumChanges), - Acc0 = {InitIdxState, true}, - {ok, _, Acc} = couch_db:enum_docs_since(Db, CurrSeq, Proc, Acc0, []), - {ProcIdxSt, SendLast} = Acc, + CurrSeq = Mod:get(update_seq, PurgedIdxState), - % If we didn't bail due to hitting the last committed seq we need - % to send our last update_seq through. - {ok, LastIdxSt} = case SendLast of - true -> - Mod:process_doc(nil, DbUpdateSeq, ProcIdxSt); + case CurrSeq of + DbUpdateSeq -> + {Db, PurgedIdxState, false}; _ -> - {ok, ProcIdxSt} - end, + Proc = fun(DocInfo, _, {IdxStateAcc, _}) -> + EnumProc(Db, DbCommittedSeq, DocInfo, IdxStateAcc) + end, + Acc0 = {PurgedIdxState, true}, + {ok, _, Acc} = couch_db:enum_docs_since(Db, CurrSeq, Proc, Acc0, []), + {MaybeLastIdxState, SendLast} = Acc, + + % If we didn't bail due to hitting the last committed seq we need + % to send our last update_seq through. + {ok, LastIdxState} = case SendLast of + true -> Mod:process_doc(nil, DbUpdateSeq, MaybeLastIdxState); + _ -> {ok, MaybeLastIdxState} + end, + + {Db, LastIdxState, true} + end + end, + + update(Idx, Mod, IdxState, UpdateOnceProc). - {ok, FinalIdxState} = Mod:finish_update(LastIdxSt), + +update(Idx, Mod, OuterIdxState, UpdateOnceProc) -> + DbName = Mod:get(db_name, OuterIdxState), + couch_util:with_db(DbName, fun(Db) -> + {ok, InitialIdxState} = Mod:start_update(Idx, OuterIdxState, 0), + ModifiedIdxState = update_until_done(Db, InitialIdxState, UpdateOnceProc), + {ok, FinalIdxState} = Mod:finish_update(ModifiedIdxState), exit({updated, self(), FinalIdxState}) end). +update_until_done(Db0, IdxState0, UpdateOnceProc) -> + {Db, IdxState, IsLast} = UpdateOnceProc(Db0, IdxState0), + case IsLast of + true -> IdxState; + _ -> update_until_done(Db, IdxState, UpdateOnceProc) + end. purge_index(Db, Mod, IdxState) -> DbPurgeSeq = couch_db:get_purge_seq(Db),