From 0af26c386bfefefd40bc73402a88550c19694b6a Mon Sep 17 00:00:00 2001 From: Arnaud Wetzel Date: Sat, 3 Aug 2013 01:10:26 +0200 Subject: [PATCH 01/12] dict instead of list to update file locks --- src/mi_locks.erl | 45 ++++++++++++++++++--------------------------- 1 file changed, 18 insertions(+), 27 deletions(-) diff --git a/src/mi_locks.erl b/src/mi_locks.erl index 7ae0944..136fdb0 100644 --- a/src/mi_locks.erl +++ b/src/mi_locks.erl @@ -38,49 +38,40 @@ funs=[] }). -new() -> []. +new() -> dict:new(). claim_many(Keys, Locks) -> lists:foldl(fun claim/2, Locks, Keys). claim(Key, Locks) -> - case lists:keyfind(Key, #lock.key, Locks) of - Lock = #lock { count=Count } -> - NewLock = Lock#lock { count=Count + 1 }, - lists:keystore(Key, #lock.key, Locks, NewLock); - - false -> - NewLock = #lock { key=Key, count=1, funs=[] }, - lists:keystore(Key, #lock.key, Locks, NewLock) + case dict:find(Key,Locks) of + {ok,#lock{count=Count}=Lock} -> + dict:store(Key,Lock#lock{count=Count + 1},Locks); + error -> + dict:store(Key,#lock{key=Key,count=1,funs=[]},Locks) end. release(Key, Locks) -> - case lists:keyfind(Key, #lock.key, Locks) of - #lock { count=1, funs=Funs } -> + case dict:find(Key,Locks) of + {ok,#lock{count=1,funs=Funs}} -> [X() || X <- Funs], - lists:keydelete(Key, #lock.key, Locks); - - Lock = #lock { count=Count } -> - NewLock = Lock#lock { count = Count - 1 }, - lists:keystore(Key, #lock.key, Locks, NewLock); - - false -> + dict:erase(Key,Locks); + {ok,#lock{count=Count}=Lock} -> + dict:store(Key,Lock#lock{count=Count - 1},Locks); + error -> throw({lock_does_not_exist, Key}) end. %% Run the provided function when the key is free. If the key is %% currently free, then this is run immeditaely. when_free(Key, Fun, Locks) -> - case lists:keyfind(Key, #lock.key, Locks) of - false -> + case dict:find(Key,Locks) of + error -> Fun(), Locks; - - #lock { count=0, funs=Funs } -> + {ok,#lock{count=0,funs=Funs}} -> [X() || X <- [Fun|Funs]], - lists:keydelete(Key, #lock.key, Locks); - - Lock = #lock { funs=Funs} -> - NewLock = Lock#lock { funs=[Fun|Funs] }, - lists:keystore(Key, #lock.key, Locks, NewLock) + dict:erase(Key,Locks); + {ok,#lock{funs=Funs}=Lock} -> + dict:store(Key,Lock#lock{funs=[Fun|Funs]},Locks) end. From 3d5477e3339847e9037c8cbd118fa1f2e3297ae4 Mon Sep 17 00:00:00 2001 From: Arnaud Wetzel Date: Sat, 5 Jul 2014 06:24:25 +0200 Subject: [PATCH 02/12] new segment selection algorithm - the same as cassandra : SizeTieredCompactionStrategy https://github.com/hobinyoon/apache-cassandra-1.2.9-src/blob/bb512f49bacb9425e1694cea1c1d89ba7bb455aa/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java --- src/mi_server.erl | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/src/mi_server.erl b/src/mi_server.erl index 79418b8..7b22a07 100644 --- a/src/mi_server.erl +++ b/src/mi_server.erl @@ -745,6 +745,34 @@ group_iterator(Iterator, eof) -> clear_deleteme_flag(Filename) -> file:delete(Filename ++ ?DELETEME_FLAG). +%% Figure out which files to merge. Take the average of file sizes, +%% return anything smaller than the average for merging. +get_segments_to_merge(Segments) -> + %% sort segs to group them in average size groups in a deterministic way + Buckets = get_buckets(lists:sort([{mi_segment:filesize(X),X} || X <- Segments])), + PrunedBuckets = dict:fold( + fun (_,Bucket,Acc0) when length(Bucket) > 4 -> Acc0; + (_,Bucket,Acc0)-> + ToMerge = lists:sublist(Bucket, min(length(Bucket),80)), + Avg = lists:sum([Size || {Size,_}<-Bucket]) div length(Bucket), + [{Avg,ToMerge}|Acc0] + end, [],Buckets) + [{_,Segs}|_] = lists:sort(PrunedBuckets), + Segs + +get_buckets(SortedSizedSegments)-> + lists:foldl(fun({Size,_}=Seg,Acc0)-> + NotSimilar = fun({AverageSize,Bucket})-> + (Size < AverageSize*0.5 or Size > AverageSize*1.5) and (Size > 50000000 or AverageSize > 50000000) + end, + case lists:dropwhile(NotSimilar,dict:to_list(Acc0)) of % if a bucket is similar, add seg to bucket and change averagesize + [{AverageSize,Bucket}|_] -> NbSeg = length(Bucket), + dict:store((AverageSize*NbSeg+Size)/(NbSeg+1),[Seg|Bucket],dict:erase(AverageSize,Acc0)); + [] -> % else create a single bucket with the seg + dict:store(Size,[Seg],Acc0) + end + end,dict:new(),SortedSizedSegments). + fold_itr(_Fun, Acc, eof) -> Acc; fold_itr(Fun, Acc, {Term, IteratorFun}) -> fold_itr(Fun, Fun(Term, Acc), IteratorFun()). From ae577ee403198a441f998763d72d343fb3fbe8b0 Mon Sep 17 00:00:00 2001 From: Arnaud Wetzel Date: Sat, 5 Jul 2014 15:35:29 +0200 Subject: [PATCH 03/12] Big commit untested for new compaction scheduler The goal is to allow parallel compactions. First try, without even try to compile : - throughput controlled scheduler with throttling - add lock management per segment compaction - add configuration of several params --- src/mi_locks.erl | 24 +++++- src/mi_scheduler.erl | 111 +++++++++++++++++++-------- src/mi_server.erl | 175 ++++++++++++++++++++++--------------------- 3 files changed, 188 insertions(+), 122 deletions(-) diff --git a/src/mi_locks.erl b/src/mi_locks.erl index 136fdb0..f6b552f 100644 --- a/src/mi_locks.erl +++ b/src/mi_locks.erl @@ -33,8 +33,8 @@ ]). -record(lock, { - key, count, + not_compacting, funs=[] }). @@ -48,7 +48,7 @@ claim(Key, Locks) -> {ok,#lock{count=Count}=Lock} -> dict:store(Key,Lock#lock{count=Count + 1},Locks); error -> - dict:store(Key,#lock{key=Key,count=1,funs=[]},Locks) + dict:store(Key,#lock{count=1,funs=[]},Locks) end. release(Key, Locks) -> @@ -75,3 +75,23 @@ when_free(Key, Fun, Locks) -> {ok,#lock{funs=Funs}=Lock} -> dict:store(Key,Lock#lock{funs=[Fun|Funs]},Locks) end. + +claim_compact(Key,Locks) -> + case dict:find(Key,Locks) of + {ok,#lock{not_compacting=false}} -> Locks + {ok,Lock} -> dict:store(Key,Lock#lock{not_compacting=false},Locks); + error -> dict:store(Key,#lock{count=0,funs=[],not_compacting=false},Locks) + end. + +release_compact(Key,Locks) -> + case dict:find(Key,Locks) of + {ok,#lock{not_compacting=true}} -> Locks + {ok,Lock} -> dict:store(Key,Lock#lock{not_compacting=true},Locks); + error -> dict:store(Key,#lock{count=0,funs=[],not_compacting=true},Locks) + end. + +is_compact_free(Key,Locks) -> + case dict:find(Key,Locks) of + {ok,Lock} -> Lock#lock.not_compacting + error -> true + end. diff --git a/src/mi_scheduler.erl b/src/mi_scheduler.erl index ef8d974..cbb52ec 100644 --- a/src/mi_scheduler.erl +++ b/src/mi_scheduler.erl @@ -38,6 +38,7 @@ terminate/2, code_change/3]). -record(state, { queue, + ready, worker }). %% ==================================================================== @@ -62,16 +63,23 @@ init([]) -> %% Trap exits of the actual worker process process_flag(trap_exit, true), + WantedThroughput = application:get_env(merge_index, compaction_throughput_mb_per_sec, 30), + %% Use a dedicated worker sub-process to do the actual merging. The %% process may ignore messages for a long while during the compaction %% and we want to ensure that our message queue doesn't fill up with %% a bunch of dup requests for the same directory. - Self = self(), - WorkerPid = spawn_link(fun() -> worker_loop(Self) end), - {ok, #state{ queue = queue:new(), - worker = WorkerPid }}. -handle_call({schedule_compaction, Pid}, _From, #state { queue = Q } = State) -> + WorkerPid = spawn_link(fun() -> worker_loop(worker_init_state(self(),WantedThroughput)) end), + {ok, #state{ queue = queue:new(), + worker = WorkerPid, + ready = true}}. + +handle_call({schedule_compaction, Pid}, _From, #state { ready = true, worker = WorkerPid } = State) -> + WorkerPid ! {compaction, Pid}, + {reply, ok, State#state {ready = false}}; + +handle_call({schedule_compaction, Pid}, _From, #state { ready = false, queue = Q } = State) -> case queue:member(Pid, Q) of true -> {reply, already_queued, State}; @@ -91,18 +99,18 @@ handle_cast(Msg, State) -> handle_info({worker_ready, WorkerPid}, #state { queue = Q } = State) -> case queue:out(Q) of {empty, Q} -> - {noreply, State}; + {noreply, State#state{ ready = true }}; {{value, Pid}, NewQ} -> WorkerPid ! {compaction, Pid}, - NewState = State#state { queue=NewQ }, + NewState = State#state { queue=NewQ , ready = false }, {noreply, NewState} end; handle_info({'EXIT', WorkerPid, Reason}, #state { worker = WorkerPid } = State) -> lager:error("Compaction worker ~p exited: ~p", [WorkerPid, Reason]), %% Start a new worker. - Self=self(), - NewWorkerPid = spawn_link(fun() -> worker_loop(Self) end), - NewState = State#state { worker=NewWorkerPid }, + WantedThroughput = application:get_env(merge_index, compaction_throughput_mb_per_sec, 30), + NewWorkerPid = spawn_link(fun() -> worker_loop(worker_init_state(self(),WantedThroughput)) end), + NewState = State#state { worker=NewWorkerPid , ready = true}, {noreply, NewState}; handle_info(Info, State) -> @@ -119,31 +127,68 @@ code_change(_OldVsn, State, _Extra) -> %% Internal worker %% ==================================================================== -worker_loop(Parent) -> - Parent ! {worker_ready, self()}, +%% THROTTLING: Run M processes in N seconds waiting ms_before_replace(Ring,N) before doing something +%% and replace_oldest (impl is a set of size M of timestamps representing running processes. +%% Replace the oldest one if older than N seconds else wait (oldest_ts-Nsec) : {Ts_Set, Oldest_Idx} +new_timering(M)->{make_tuple(M,now()),1}. +replace_oldest({Set,Idx})-> + {insert_element(Idx,Set,now()),case Idx+1 of X when X>tuple_size(Set) ->1; X->X end}. +ms_before_replace({Set,Idx},N)-> + max(0,timer:seconds(N) - trunc(timer:now_diff(now(),element(Idx,Set))/1000)). + +-define(TIMERING_SIZE, 10). +-define(TIMERING_SPAN_INIT, 10). +-record(wstate, { parent, + wanted_throughput, + timering, + timering_span, + test_start, + test_compactions}). + +worker_init_state(Parent,WantedThroughput)-> + #wstate{parent=Parent, timering=new_timering(?TIMERING_SIZE),wanted_throughput=WantedThroughput, + timering_span=?TIMERING_SPAN_INIT,test_start=now(),test_compactions=[]} + +worker_loop(#wstate{timering_span=TimeRingSpan,test_start=TestStart, + test_compactions=TestCompactions}=State) when length(TestCompactions)==?TIMERING_SIZE -> + TestBytes = lists:sum([OldBytes || {ok, _, OldBytes}<-TestCompactions]), + TestSegments = lists:sum([OldSegments || {ok, OldSegments, _}<-TestCompactions]), + TestElapsedSecs = timer:now_diff(os:timestamp(), TestStart) / 1000000, + Throughput = TestBytes/TestElapsedSecs/(1024*1024), + lager:info("Overall Compaction: ~p segments for ~p bytes in ~p seconds, ~.2f MB/sec", + [TestSegments, TestBytes, TestElapsedSecs, Throughput]), + worker_loop(State#wstate{timering_span=TimeRingSpan,test_start=now(),test_compactions=[]}); + +worker_loop(#wstate{parent=Parent,timering=TimeRing,timering_span=TimeRingSpan, + test_start=TestStart,test_compactions=TestCompactions}=State) -> + Worker = self(), receive + {compaction_res,Result}-> + ?MODULE:worker_loop(State#wstate{test_compactions=[Result|TestCompactions]}); {compaction, Pid} -> - Start = os:timestamp(), - Result = merge_index:compact(Pid), - ElapsedSecs = timer:now_diff(os:timestamp(), Start) / 1000000, - case Result of - {ok, OldSegments, OldBytes} -> - case ElapsedSecs > 1 of - true -> - lager:info( - "Pid ~p compacted ~p segments for ~p bytes in ~p seconds, ~.2f MB/sec", - [Pid, OldSegments, OldBytes, ElapsedSecs, OldBytes/ElapsedSecs/(1024*1024)]); - false -> - ok - end; - - {Error, Reason} when Error == error; Error == 'EXIT' -> - lager:error("Failed to compact ~p: ~p", [Pid, Reason]) - end, - ?MODULE:worker_loop(Parent); + spawn_link(fun()-> + Start = os:timestamp(), + Result = merge_index:compact(Pid), + Worker ! {compaction_res,Result} + ElapsedSecs = timer:now_diff(os:timestamp(), Start) / 1000000, + case Result of + {ok, OldSegments, OldBytes} -> + case ElapsedSecs > 1 of + true -> + lager:info( + "Single Compaction ~p: ~p segments for ~p bytes in ~p seconds, ~.2f MB/sec", + [Pid, OldSegments, OldBytes, ElapsedSecs, OldBytes/ElapsedSecs/(1024*1024)]); + false -> + ok + end; + + {Error, Reason} when Error == error; Error == 'EXIT' -> + lager:error("Failed to compact ~p: ~p", [Pid, Reason]) + end, + end), + send_after(ms_before_replace(TimeRing,TimeRingSpan),Parent,{worker_ready,Worker}), + ?MODULE:worker_loop(State#wstate{timering=replace_oldest(TimeRing)}); _ -> %% ignore unknown messages - ?MODULE:worker_loop(Parent) - after 1000 -> - ?MODULE:worker_loop(Parent) + ?MODULE:worker_loop(State) end. diff --git a/src/mi_server.erl b/src/mi_server.erl index 7b22a07..197d6f2 100644 --- a/src/mi_server.erl +++ b/src/mi_server.erl @@ -59,7 +59,7 @@ segments, buffers, next_id, - is_compacting, + config, lookup_range_pids, buffer_rollover_size, converter, @@ -74,6 +74,14 @@ segments }). +-record(config, { + max_compact_segments, + bucket_low, + bucket_high, + min_segment_size + }). +-define(MIN_COMPACT_SEGMENTS,5). + -define(RESULTVEC_SIZE, 1000). -define(DELETEME_FLAG, ".deleted"). @@ -146,6 +154,15 @@ init([Root]) -> %% don't pull down this merge_index if they fail process_flag(trap_exit, true), + %% Cache config in order to avoid application:get_env overhead + SegSimilarityRatio = application:get_env(merge_index, segment_similarity_ratio, 0.5), + Config = #config{ + max_compact_segments=application:get_env(merge_index, max_compact_segments, 20), + min_segment_size=application:get_env(merge_index, min_segment_size, 50000000), + bucket_low=1-SegSimilarityRatio, + bucket_high=1+SegSimilarityRatio + }, + %% Create the state... State = #state { root = Root, @@ -153,7 +170,8 @@ init([Root]) -> buffers = [Buffer], segments = Segments, next_id = NextID, - is_compacting = false, + config = Config, + compacting_pids = [], lookup_range_pids = [], buffer_rollover_size=fuzzed_rollover_size(), to_convert = queue:new() @@ -224,52 +242,40 @@ handle_call({index, Postings}, _From, State) -> end; handle_call(start_compaction, _From, State) - when is_tuple(State#state.is_compacting) -> - %% Don't compact if we are already compacting, or if we have fewer - %% than five open segments. + when length(State#state.segments) =< ?MIN_COMPACT_SEGMENTS -> + %% Don't compact if we have fewer than MIN_COMPACT_SEGMENTS {reply, {ok, 0, 0}, State}; handle_call(start_compaction, From, State) -> - %% Get list of segments to compact. Do this by getting filesizes, - %% and then lopping off files larger than the average. This could be - %% optimized with tuning, but probably a good enough solution. - Segments = State#state.segments, - {ok, MaxSegments} = application:get_env(merge_index, max_compact_segments), - {ok, {M,F}} = application:get_env(merge_index, compact_mod_fun), - SegmentsToCompact = case M:F(Segments) of - STC when length(STC) > MaxSegments -> - lists:sublist(STC, MaxSegments); - STC -> - STC - end, - - case SegmentsToCompact of - [] -> - {reply, {ok, 0, 0}, State}; - _ -> + %% Get list of segments to compact : as Cassandra SizeTieredCompactionStrategy + #state{segments=Segments,locks=Locks,config=Config,compacting_pids=CompactingPids} = State, + case get_segments_to_merge(Segments,Config,Locks) of + [] -> {reply, {ok, 0, 0}, State}; + SegmentsToCompact -> BytesToCompact = lists:sum([mi_segment:filesize(X) || X <- SegmentsToCompact]), %% Spawn a function to merge a bunch of segments into one... Pid = self(), - CF = - fun() -> - %% Create the group iterator... - SegmentIterators = [mi_segment:iterator(X) || X <- SegmentsToCompact], - GroupIterator = build_iterator_tree(SegmentIterators), - - %% Create the new compaction segment... - <> = erlang:md5(term_to_binary({now, make_ref()})), - SName = join(State, io_lib:format("segment.~.16B", [MD5])), - set_deleteme_flag(SName), - CompactSegment = mi_segment:open_write(SName), - - %% Run the compaction... - mi_segment:from_iterator(GroupIterator, CompactSegment), - gen_server:cast(Pid, {compacted, CompactSegment, SegmentsToCompact, BytesToCompact, From}) - end, - CompactingPid = spawn_opt(CF, [link, {fullsweep_after, 0}]), - {noreply, State#state { is_compacting={From, CompactingPid} }} - end; + CompactingPid = spawn_opt(fun() -> + %% Create the group iterator... + SegmentIterators = [mi_segment:iterator(X) || X <- SegmentsToCompact], + GroupIterator = build_iterator_tree(SegmentIterators), + + %% Create the new compaction segment... + <> = erlang:md5(term_to_binary({now, make_ref()})), + SName = join(State, io_lib:format("segment.~.16B", [MD5])), + set_deleteme_flag(SName), + CompactSegment = mi_segment:open_write(SName), + + %% Run the compaction... + mi_segment:from_iterator(GroupIterator, CompactSegment), + gen_server:cast(Pid, {compacted, CompactSegment, SegmentsToCompact, BytesToCompact, From}) + end, [link, {fullsweep_after, 0}]), + NewLocks = lists:foldl(fun (S,Acc)-> + mi_locks:claim_compact(mi_segment:filename(S),Acc) + end,Locks,SegmentsToCompact), + {noreply, State#state { compacting_pids=[{From, CompactingPid}|CompactingPids] , locks=NewLocks}} + end; handle_call({info, Index, Field, Term}, _From, State) -> %% Calculate the IFT... @@ -411,7 +417,7 @@ handle_call(Msg, _From, State) -> {reply, ok, State}. handle_cast({compacted, CompactSegmentWO, OldSegments, OldBytes, From}, State) -> - #state { locks=Locks, segments=Segments } = State, + #state { locks=Locks, segments=Segments , compacting_pids=CompactingPids } = State, %% Clean up. Remove delete flag on the new segment. Add delete %% flags to the old segments. Register to delete the old segments @@ -431,7 +437,7 @@ handle_cast({compacted, CompactSegmentWO, OldSegments, OldBytes, From}, State) - NewState = State#state { locks=NewLocks, segments=[CompactSegmentRO|(Segments -- OldSegments)], - is_compacting=false + compacting_pids=lists:keydelete(From,1,CompactingPids) }, %% Tell the awaiting process that we've finished compaction. @@ -440,8 +446,7 @@ handle_cast({compacted, CompactSegmentWO, OldSegments, OldBytes, From}, State) - handle_cast({buffer_to_segment, Buffer, SegmentWO}, State) -> #state { root=Root, locks=Locks, buffers=Buffers, segments=Segments, - to_convert=ToConvert, - is_compacting=IsCompacting } = State, + to_convert=ToConvert } = State, %% Clean up by clearing delete flag on the segment, adding delete %% flag to the buffer, and telling the system to delete the buffer @@ -481,14 +486,7 @@ handle_cast({buffer_to_segment, Buffer, SegmentWO}, State) -> }, %% Give us the opportunity to do a merge... - {ok, {M,F}} = application:get_env(merge_index, compact_mod_fun), - SegmentsToMerge = M:F(NewSegments), - case length(SegmentsToMerge) of - Num when Num =< 2 orelse is_tuple(IsCompacting) -> - ok; - _ -> - mi_scheduler:schedule_compaction(self()) - end, + mi_scheduler:schedule_compaction(self()), {noreply, NewState}; false -> lager:warning("`buffer_to_segment` cast received" @@ -501,27 +499,8 @@ handle_cast(Msg, State) -> lager:error("Unexpected cast ~p", [Msg]), {noreply, State}. -handle_info({'EXIT', CompactingPid, Reason}, - #state{is_compacting={From, CompactingPid}}=State) -> - %% the spawned compaction process exited - case Reason of - normal -> - %% compaction finished normally: nothing to be done - %% handle_call({compacted... already sent the reply - ok; - _ -> - %% compaction failed: not too much to worry about - %% (it should be safe to try again later) - %% but we need to let the compaction-requester know - %% that we're not compacting any more - gen_server:reply(From, {error, Reason}) - end, - - %% clear out compaction flags, so we try again when necessary - {noreply, State#state{is_compacting=false}}; - handle_info({'EXIT', Pid, Reason}, - #state{lookup_range_pids=SRPids}=State) -> + #state{lookup_range_pids=SRPids,compacting=Compacting}=State) -> case lists:keytake(Pid, #stream_range.pid, SRPids) of {value, SR, NewSRPids} -> @@ -553,8 +532,21 @@ handle_info({'EXIT', Pid, Reason}, {noreply, State#state { locks=NewLocks1, lookup_range_pids=NewSRPids }}; false -> - %% some random other process exited: ignore - {noreply, State} + case lists:keytake(Pid, 2, CompactingPids) of + {value, {From,_CompactingPid}, NewCompactingPids}-> + %% a spawned compaction process exited + case Reason of + normal -> ok; %% compaction finished normally: nothing to be done + _ -> %% compaction failed: not too much to worry about (it should be safe to try again later) + %% but we need to let the compaction-requester know + %% that we're not compacting any more + gen_server:reply(From, {error, Reason}) + end, + %% clear out compaction flags, so we try again when necessary + {noreply, State#state{compacting_pids=NewCompactingPids}}; + false -> %% some random other process exited: ignore + {noreply, State} + end end; handle_info(Msg, State) -> @@ -745,25 +737,34 @@ group_iterator(Iterator, eof) -> clear_deleteme_flag(Filename) -> file:delete(Filename ++ ?DELETEME_FLAG). -%% Figure out which files to merge. Take the average of file sizes, -%% return anything smaller than the average for merging. -get_segments_to_merge(Segments) -> - %% sort segs to group them in average size groups in a deterministic way - Buckets = get_buckets(lists:sort([{mi_segment:filesize(X),X} || X <- Segments])), +%% Figure out which files to merge +get_segments_to_merge(Segments,#{max_compact_segments=MaxCompactSegments}=Config,Locks) -> + NotCompactingSegments = lists:filter(fun(S)-> + mi_locks:is_compact_free(mi_segment:filename(S),Locks) + end,Segments), + %% Group segments by similar size (buckets) + Buckets = get_buckets(Segments,Config), + %% Take only the groups > min_compact_segments and take the max_compact_segments firsts PrunedBuckets = dict:fold( - fun (_,Bucket,Acc0) when length(Bucket) > 4 -> Acc0; + fun (_,Bucket,Acc0) when length(Bucket) < ?MIN_COMPACT_SEGMENTS -> Acc0; (_,Bucket,Acc0)-> - ToMerge = lists:sublist(Bucket, min(length(Bucket),80)), + SortedBucket = lists:reverse(Bucket), %% alreay sorted by construction but reversed + ToMerge = lists:sublist(SortedBucket, min(length(Bucket),MaxCompactSegments)), Avg = lists:sum([Size || {Size,_}<-Bucket]) div length(Bucket), [{Avg,ToMerge}|Acc0] - end, [],Buckets) - [{_,Segs}|_] = lists:sort(PrunedBuckets), - Segs + end, [],Buckets), + %% then take the group with the smallest segment average size + case lists:sort(PrunedBuckets) of + [] -> []; + [{_,Segs}|_] = [Seg || {_,Seg}<-Segs] + end. -get_buckets(SortedSizedSegments)-> +get_buckets(Segments,#config{bucket_low=BucketLow, bucket_high=BucketHigh, min_segment_size=MinSegSize})-> + %% sort segs to group them in average size groups in a deterministic way + SortedSizedSegments = lists:sort([{mi_segment:filesize(X),X} || X <- Segments]), lists:foldl(fun({Size,_}=Seg,Acc0)-> NotSimilar = fun({AverageSize,Bucket})-> - (Size < AverageSize*0.5 or Size > AverageSize*1.5) and (Size > 50000000 or AverageSize > 50000000) + not ((Size > AverageSize*BucketLow and Size < AverageSize*BucketHigh) or (Size < MinSegSize and AverageSize < MinSegSize)) end, case lists:dropwhile(NotSimilar,dict:to_list(Acc0)) of % if a bucket is similar, add seg to bucket and change averagesize [{AverageSize,Bucket}|_] -> NbSeg = length(Bucket), From 791652bdbb7b228ddf96bdcee9f48ed89eafd5b2 Mon Sep 17 00:00:00 2001 From: Arnaud Wetzel Date: Sat, 5 Jul 2014 16:02:59 +0200 Subject: [PATCH 04/12] now version compiles --- src/mi_locks.erl | 6 +++--- src/mi_scheduler.erl | 16 ++++++++-------- src/mi_server.erl | 14 ++++++++------ 3 files changed, 19 insertions(+), 17 deletions(-) diff --git a/src/mi_locks.erl b/src/mi_locks.erl index f6b552f..5d5aba4 100644 --- a/src/mi_locks.erl +++ b/src/mi_locks.erl @@ -78,20 +78,20 @@ when_free(Key, Fun, Locks) -> claim_compact(Key,Locks) -> case dict:find(Key,Locks) of - {ok,#lock{not_compacting=false}} -> Locks + {ok,#lock{not_compacting=false}} -> Locks; {ok,Lock} -> dict:store(Key,Lock#lock{not_compacting=false},Locks); error -> dict:store(Key,#lock{count=0,funs=[],not_compacting=false},Locks) end. release_compact(Key,Locks) -> case dict:find(Key,Locks) of - {ok,#lock{not_compacting=true}} -> Locks + {ok,#lock{not_compacting=true}} -> Locks; {ok,Lock} -> dict:store(Key,Lock#lock{not_compacting=true},Locks); error -> dict:store(Key,#lock{count=0,funs=[],not_compacting=true},Locks) end. is_compact_free(Key,Locks) -> case dict:find(Key,Locks) of - {ok,Lock} -> Lock#lock.not_compacting + {ok,Lock} -> Lock#lock.not_compacting; error -> true end. diff --git a/src/mi_scheduler.erl b/src/mi_scheduler.erl index cbb52ec..eda6880 100644 --- a/src/mi_scheduler.erl +++ b/src/mi_scheduler.erl @@ -130,9 +130,9 @@ code_change(_OldVsn, State, _Extra) -> %% THROTTLING: Run M processes in N seconds waiting ms_before_replace(Ring,N) before doing something %% and replace_oldest (impl is a set of size M of timestamps representing running processes. %% Replace the oldest one if older than N seconds else wait (oldest_ts-Nsec) : {Ts_Set, Oldest_Idx} -new_timering(M)->{make_tuple(M,now()),1}. +new_timering(M)->{erlang:make_tuple(M,now()),1}. replace_oldest({Set,Idx})-> - {insert_element(Idx,Set,now()),case Idx+1 of X when X>tuple_size(Set) ->1; X->X end}. + {erlang:insert_element(Idx,Set,now()),case Idx+1 of X when X>tuple_size(Set) ->1; X->X end}. ms_before_replace({Set,Idx},N)-> max(0,timer:seconds(N) - trunc(timer:now_diff(now(),element(Idx,Set))/1000)). @@ -147,7 +147,7 @@ ms_before_replace({Set,Idx},N)-> worker_init_state(Parent,WantedThroughput)-> #wstate{parent=Parent, timering=new_timering(?TIMERING_SIZE),wanted_throughput=WantedThroughput, - timering_span=?TIMERING_SPAN_INIT,test_start=now(),test_compactions=[]} + timering_span=?TIMERING_SPAN_INIT,test_start=now(),test_compactions=[]}. worker_loop(#wstate{timering_span=TimeRingSpan,test_start=TestStart, test_compactions=TestCompactions}=State) when length(TestCompactions)==?TIMERING_SIZE -> @@ -159,8 +159,8 @@ worker_loop(#wstate{timering_span=TimeRingSpan,test_start=TestStart, [TestSegments, TestBytes, TestElapsedSecs, Throughput]), worker_loop(State#wstate{timering_span=TimeRingSpan,test_start=now(),test_compactions=[]}); -worker_loop(#wstate{parent=Parent,timering=TimeRing,timering_span=TimeRingSpan, - test_start=TestStart,test_compactions=TestCompactions}=State) -> +worker_loop(#wstate{parent=Parent,timering=TimeRing, + timering_span=TimeRingSpan, test_compactions=TestCompactions}=State) -> Worker = self(), receive {compaction_res,Result}-> @@ -169,7 +169,7 @@ worker_loop(#wstate{parent=Parent,timering=TimeRing,timering_span=TimeRingSpan, spawn_link(fun()-> Start = os:timestamp(), Result = merge_index:compact(Pid), - Worker ! {compaction_res,Result} + Worker ! {compaction_res,Result}, ElapsedSecs = timer:now_diff(os:timestamp(), Start) / 1000000, case Result of {ok, OldSegments, OldBytes} -> @@ -184,9 +184,9 @@ worker_loop(#wstate{parent=Parent,timering=TimeRing,timering_span=TimeRingSpan, {Error, Reason} when Error == error; Error == 'EXIT' -> lager:error("Failed to compact ~p: ~p", [Pid, Reason]) - end, + end end), - send_after(ms_before_replace(TimeRing,TimeRingSpan),Parent,{worker_ready,Worker}), + erlang:send_after(ms_before_replace(TimeRing,TimeRingSpan),Parent,{worker_ready,Worker}), ?MODULE:worker_loop(State#wstate{timering=replace_oldest(TimeRing)}); _ -> %% ignore unknown messages diff --git a/src/mi_server.erl b/src/mi_server.erl index 197d6f2..2dc41d0 100644 --- a/src/mi_server.erl +++ b/src/mi_server.erl @@ -60,6 +60,7 @@ buffers, next_id, config, + compacting_pids, lookup_range_pids, buffer_rollover_size, converter, @@ -500,7 +501,7 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info({'EXIT', Pid, Reason}, - #state{lookup_range_pids=SRPids,compacting=Compacting}=State) -> + #state{lookup_range_pids=SRPids,compacting_pids=CompactingPids}=State) -> case lists:keytake(Pid, #stream_range.pid, SRPids) of {value, SR, NewSRPids} -> @@ -738,12 +739,12 @@ clear_deleteme_flag(Filename) -> file:delete(Filename ++ ?DELETEME_FLAG). %% Figure out which files to merge -get_segments_to_merge(Segments,#{max_compact_segments=MaxCompactSegments}=Config,Locks) -> +get_segments_to_merge(Segments,#config{max_compact_segments=MaxCompactSegments}=Config,Locks) -> NotCompactingSegments = lists:filter(fun(S)-> mi_locks:is_compact_free(mi_segment:filename(S),Locks) end,Segments), %% Group segments by similar size (buckets) - Buckets = get_buckets(Segments,Config), + Buckets = get_buckets(NotCompactingSegments,Config), %% Take only the groups > min_compact_segments and take the max_compact_segments firsts PrunedBuckets = dict:fold( fun (_,Bucket,Acc0) when length(Bucket) < ?MIN_COMPACT_SEGMENTS -> Acc0; @@ -756,15 +757,16 @@ get_segments_to_merge(Segments,#{max_compact_segments=MaxCompactSegments}=Config %% then take the group with the smallest segment average size case lists:sort(PrunedBuckets) of [] -> []; - [{_,Segs}|_] = [Seg || {_,Seg}<-Segs] + [{_,Segs}|_] -> [Seg || {_,Seg}<-Segs] end. get_buckets(Segments,#config{bucket_low=BucketLow, bucket_high=BucketHigh, min_segment_size=MinSegSize})-> %% sort segs to group them in average size groups in a deterministic way SortedSizedSegments = lists:sort([{mi_segment:filesize(X),X} || X <- Segments]), lists:foldl(fun({Size,_}=Seg,Acc0)-> - NotSimilar = fun({AverageSize,Bucket})-> - not ((Size > AverageSize*BucketLow and Size < AverageSize*BucketHigh) or (Size < MinSegSize and AverageSize < MinSegSize)) + NotSimilar = fun({AverageSize,_})-> + not ((Size > AverageSize*BucketLow andalso Size < AverageSize*BucketHigh) + orelse (Size < MinSegSize andalso AverageSize < MinSegSize)) end, case lists:dropwhile(NotSimilar,dict:to_list(Acc0)) of % if a bucket is similar, add seg to bucket and change averagesize [{AverageSize,Bucket}|_] -> NbSeg = length(Bucket), From c463db39dd5cf407b65907cb07ae44c477fc7445 Mon Sep 17 00:00:00 2001 From: Arnaud Wetzel Date: Sat, 5 Jul 2014 16:08:14 +0200 Subject: [PATCH 05/12] add configuration into .app --- src/merge_index.app.src | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/merge_index.app.src b/src/merge_index.app.src index 0a8696b..7c95d11 100644 --- a/src/merge_index.app.src +++ b/src/merge_index.app.src @@ -14,6 +14,9 @@ {compact_mod_fun, {mi_segment, compact_by_average}}, {compact_staleness_threshold, {1, hours}}, {max_compact_segments, 20}, + {segment_similarity_ratio, 0.5}, + {min_segment_size, 50000000}, + {compaction_throughput_mb_per_sec, 30}, {segment_query_read_ahead_size, 65536}, {segment_compact_read_ahead_size, 5242880}, {segment_file_buffer_size, 20971520}, From eef21a2ced0401f9e3470d7f7a71f3bc92bad37a Mon Sep 17 00:00:00 2001 From: Arnaud Wetzel Date: Sat, 5 Jul 2014 16:22:39 +0200 Subject: [PATCH 06/12] Adjust Throttling window to adjust throughput --- src/mi_scheduler.erl | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/src/mi_scheduler.erl b/src/mi_scheduler.erl index eda6880..ba262b3 100644 --- a/src/mi_scheduler.erl +++ b/src/mi_scheduler.erl @@ -126,6 +126,15 @@ code_change(_OldVsn, State, _Extra) -> %% ==================================================================== %% Internal worker %% ==================================================================== +-define(TIMERING_SIZE, 30). +-define(TIMERING_SPAN_INIT, 130). + +-record(wstate, { parent, + wanted_throughput, + timering, + timering_span, + test_start, + test_compactions}). %% THROTTLING: Run M processes in N seconds waiting ms_before_replace(Ring,N) before doing something %% and replace_oldest (impl is a set of size M of timestamps representing running processes. @@ -136,20 +145,11 @@ replace_oldest({Set,Idx})-> ms_before_replace({Set,Idx},N)-> max(0,timer:seconds(N) - trunc(timer:now_diff(now(),element(Idx,Set))/1000)). --define(TIMERING_SIZE, 10). --define(TIMERING_SPAN_INIT, 10). --record(wstate, { parent, - wanted_throughput, - timering, - timering_span, - test_start, - test_compactions}). - worker_init_state(Parent,WantedThroughput)-> #wstate{parent=Parent, timering=new_timering(?TIMERING_SIZE),wanted_throughput=WantedThroughput, timering_span=?TIMERING_SPAN_INIT,test_start=now(),test_compactions=[]}. -worker_loop(#wstate{timering_span=TimeRingSpan,test_start=TestStart, +worker_loop(#wstate{timering_span=TimeRingSpan,test_start=TestStart,wanted_throughput=WantedThroughput, test_compactions=TestCompactions}=State) when length(TestCompactions)==?TIMERING_SIZE -> TestBytes = lists:sum([OldBytes || {ok, _, OldBytes}<-TestCompactions]), TestSegments = lists:sum([OldSegments || {ok, OldSegments, _}<-TestCompactions]), @@ -157,7 +157,15 @@ worker_loop(#wstate{timering_span=TimeRingSpan,test_start=TestStart, Throughput = TestBytes/TestElapsedSecs/(1024*1024), lager:info("Overall Compaction: ~p segments for ~p bytes in ~p seconds, ~.2f MB/sec", [TestSegments, TestBytes, TestElapsedSecs, Throughput]), - worker_loop(State#wstate{timering_span=TimeRingSpan,test_start=now(),test_compactions=[]}); + AcceptableDiff = WantedThroughput*0.2, + case abs(Throughput-WantedThroughput) of + Diff when Diff < AcceptableDiff -> + worker_loop(State#wstate{timering_span=TimeRingSpan,test_start=now(),test_compactions=[]}); + _ -> %% We need to adjust timering span window in order to have the good throughput + NewTimeRingSpan = trunc(TimeRingSpan * (WantedThroughput/Throughput)), + lager:info("Adjust throttling to have ~p compaction every ~p seconds",[?TIMERING_SIZE,NewTimeRingSpan]), + worker_loop(State#wstate{timering_span=NewTimeRingSpan,test_start=now(),test_compactions=[]}) + end; worker_loop(#wstate{parent=Parent,timering=TimeRing, timering_span=TimeRingSpan, test_compactions=TestCompactions}=State) -> From 91e1c117d0bf13e50aa344d6cfb2817f975eef1e Mon Sep 17 00:00:00 2001 From: Arnaud Wetzel Date: Sat, 5 Jul 2014 16:28:54 +0200 Subject: [PATCH 07/12] forgot to add exports --- src/mi_locks.erl | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/mi_locks.erl b/src/mi_locks.erl index 5d5aba4..e8cab62 100644 --- a/src/mi_locks.erl +++ b/src/mi_locks.erl @@ -29,7 +29,10 @@ claim/2, claim_many/2, release/2, - when_free/3 + when_free/3, + claim_compact/2, + release_compact/2, + is_compact_free/2 ]). -record(lock, { From 4cfef5656f7d5b9c25495427e68ecb0618dbaf93 Mon Sep 17 00:00:00 2001 From: Arnaud Wetzel Date: Sat, 5 Jul 2014 22:18:52 +0200 Subject: [PATCH 08/12] Debug Configuration/Locks/ and forgotten exports --- src/mi_locks.erl | 15 +++++++-------- src/mi_scheduler.erl | 26 +++++++++++++++----------- src/mi_server.erl | 12 ++++++++---- 3 files changed, 30 insertions(+), 23 deletions(-) diff --git a/src/mi_locks.erl b/src/mi_locks.erl index e8cab62..f6a91df 100644 --- a/src/mi_locks.erl +++ b/src/mi_locks.erl @@ -37,7 +37,7 @@ -record(lock, { count, - not_compacting, + not_compacting=true, funs=[] }). @@ -81,17 +81,16 @@ when_free(Key, Fun, Locks) -> claim_compact(Key,Locks) -> case dict:find(Key,Locks) of - {ok,#lock{not_compacting=false}} -> Locks; - {ok,Lock} -> dict:store(Key,Lock#lock{not_compacting=false},Locks); - error -> dict:store(Key,#lock{count=0,funs=[],not_compacting=false},Locks) + {ok,#lock{count=Count}=Lock} -> dict:store(Key,Lock#lock{not_compacting=false,count=Count+1},Locks); + error -> dict:store(Key,#lock{count=1,funs=[],not_compacting=false},Locks) end. release_compact(Key,Locks) -> - case dict:find(Key,Locks) of - {ok,#lock{not_compacting=true}} -> Locks; + NewLocks = case dict:find(Key,Locks) of {ok,Lock} -> dict:store(Key,Lock#lock{not_compacting=true},Locks); - error -> dict:store(Key,#lock{count=0,funs=[],not_compacting=true},Locks) - end. + error -> throw({lock_does_not_exist, Key}) + end, + release(Key,NewLocks). is_compact_free(Key,Locks) -> case dict:find(Key,Locks) of diff --git a/src/mi_scheduler.erl b/src/mi_scheduler.erl index ba262b3..6036218 100644 --- a/src/mi_scheduler.erl +++ b/src/mi_scheduler.erl @@ -26,7 +26,10 @@ -export([ start_link/0, start/0, - schedule_compaction/1 + schedule_compaction/1, + ms_before_replace/2, + new_timering/1, + replace_oldest/1 ]). %% Private export -export([worker_loop/1]). @@ -63,14 +66,14 @@ init([]) -> %% Trap exits of the actual worker process process_flag(trap_exit, true), - WantedThroughput = application:get_env(merge_index, compaction_throughput_mb_per_sec, 30), + {ok,WantedThroughput} = application:get_env(merge_index, compaction_throughput_mb_per_sec), %% Use a dedicated worker sub-process to do the actual merging. The %% process may ignore messages for a long while during the compaction %% and we want to ensure that our message queue doesn't fill up with %% a bunch of dup requests for the same directory. - - WorkerPid = spawn_link(fun() -> worker_loop(worker_init_state(self(),WantedThroughput)) end), + Self = self(), + WorkerPid = spawn_link(fun() -> worker_loop(worker_init_state(Self,WantedThroughput)) end), {ok, #state{ queue = queue:new(), worker = WorkerPid, ready = true}}. @@ -108,8 +111,9 @@ handle_info({worker_ready, WorkerPid}, #state { queue = Q } = State) -> handle_info({'EXIT', WorkerPid, Reason}, #state { worker = WorkerPid } = State) -> lager:error("Compaction worker ~p exited: ~p", [WorkerPid, Reason]), %% Start a new worker. - WantedThroughput = application:get_env(merge_index, compaction_throughput_mb_per_sec, 30), - NewWorkerPid = spawn_link(fun() -> worker_loop(worker_init_state(self(),WantedThroughput)) end), + {ok,WantedThroughput} = application:get_env(merge_index, compaction_throughput_mb_per_sec), + Self = self(), + NewWorkerPid = spawn_link(fun() -> worker_loop(worker_init_state(Self,WantedThroughput)) end), NewState = State#state { worker=NewWorkerPid , ready = true}, {noreply, NewState}; @@ -126,8 +130,8 @@ code_change(_OldVsn, State, _Extra) -> %% ==================================================================== %% Internal worker %% ==================================================================== --define(TIMERING_SIZE, 30). --define(TIMERING_SPAN_INIT, 130). +-define(TIMERING_SIZE, 40). +-define(TIMERING_SPAN_INIT, 3). -record(wstate, { parent, wanted_throughput, @@ -141,9 +145,9 @@ code_change(_OldVsn, State, _Extra) -> %% Replace the oldest one if older than N seconds else wait (oldest_ts-Nsec) : {Ts_Set, Oldest_Idx} new_timering(M)->{erlang:make_tuple(M,now()),1}. replace_oldest({Set,Idx})-> - {erlang:insert_element(Idx,Set,now()),case Idx+1 of X when X>tuple_size(Set) ->1; X->X end}. + {erlang:setelement(Idx,Set,now()),case Idx+1 of X when X>erlang:size(Set) ->1; X->X end}. ms_before_replace({Set,Idx},N)-> - max(0,timer:seconds(N) - trunc(timer:now_diff(now(),element(Idx,Set))/1000)). + max(0,timer:seconds(N) - trunc(timer:now_diff(now(),element(Idx,Set))/1000)). worker_init_state(Parent,WantedThroughput)-> #wstate{parent=Parent, timering=new_timering(?TIMERING_SIZE),wanted_throughput=WantedThroughput, @@ -162,7 +166,7 @@ worker_loop(#wstate{timering_span=TimeRingSpan,test_start=TestStart,wanted_throu Diff when Diff < AcceptableDiff -> worker_loop(State#wstate{timering_span=TimeRingSpan,test_start=now(),test_compactions=[]}); _ -> %% We need to adjust timering span window in order to have the good throughput - NewTimeRingSpan = trunc(TimeRingSpan * (WantedThroughput/Throughput)), + NewTimeRingSpan = trunc(TimeRingSpan * (Throughput/WantedThroughput)) + 1, lager:info("Adjust throttling to have ~p compaction every ~p seconds",[?TIMERING_SIZE,NewTimeRingSpan]), worker_loop(State#wstate{timering_span=NewTimeRingSpan,test_start=now(),test_compactions=[]}) end; diff --git a/src/mi_server.erl b/src/mi_server.erl index 2dc41d0..f286643 100644 --- a/src/mi_server.erl +++ b/src/mi_server.erl @@ -156,10 +156,12 @@ init([Root]) -> process_flag(trap_exit, true), %% Cache config in order to avoid application:get_env overhead - SegSimilarityRatio = application:get_env(merge_index, segment_similarity_ratio, 0.5), + {ok,SegSimilarityRatio} = application:get_env(merge_index, segment_similarity_ratio), + {ok,MaxCompactSegments} = application:get_env(merge_index, max_compact_segments), + {ok,MinSegmentSize} = application:get_env(merge_index, min_segment_size), Config = #config{ - max_compact_segments=application:get_env(merge_index, max_compact_segments, 20), - min_segment_size=application:get_env(merge_index, min_segment_size, 50000000), + max_compact_segments=MaxCompactSegments, + min_segment_size=MinSegmentSize, bucket_low=1-SegSimilarityRatio, bucket_high=1+SegSimilarityRatio }, @@ -430,7 +432,9 @@ handle_cast({compacted, CompactSegmentWO, OldSegments, OldBytes, From}, State) - [set_deleteme_flag(mi_segment:filename(X)) || X <- OldSegments], F = fun(X, Acc) -> - mi_locks:when_free(mi_segment:filename(X), fun() -> mi_segment:delete(X) end, Acc) + Key = mi_segment:filename(X), + Acc1 = mi_locks:when_free(Key, fun() -> mi_segment:delete(X) end, Acc), + mi_locks:release_compact(Key,Acc1) end, NewLocks = lists:foldl(F, Locks, OldSegments), From e0384892c4e99b690af7266f5c30ab7bcbfb56c1 Mon Sep 17 00:00:00 2001 From: Arnaud Wetzel Date: Sun, 6 Jul 2014 03:28:57 +0200 Subject: [PATCH 09/12] Throttling issue : do not consider 0 Compation case --- src/mi_scheduler.erl | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/src/mi_scheduler.erl b/src/mi_scheduler.erl index 6036218..831397d 100644 --- a/src/mi_scheduler.erl +++ b/src/mi_scheduler.erl @@ -130,7 +130,7 @@ code_change(_OldVsn, State, _Extra) -> %% ==================================================================== %% Internal worker %% ==================================================================== --define(TIMERING_SIZE, 40). +-define(TIMERING_SIZE, 15). -define(TIMERING_SPAN_INIT, 3). -record(wstate, { parent, @@ -181,19 +181,14 @@ worker_loop(#wstate{parent=Parent,timering=TimeRing, spawn_link(fun()-> Start = os:timestamp(), Result = merge_index:compact(Pid), - Worker ! {compaction_res,Result}, - ElapsedSecs = timer:now_diff(os:timestamp(), Start) / 1000000, case Result of + {ok, 0, 0}->ok; {ok, OldSegments, OldBytes} -> - case ElapsedSecs > 1 of - true -> - lager:info( - "Single Compaction ~p: ~p segments for ~p bytes in ~p seconds, ~.2f MB/sec", - [Pid, OldSegments, OldBytes, ElapsedSecs, OldBytes/ElapsedSecs/(1024*1024)]); - false -> - ok - end; - + Worker ! {compaction_res,Result}, + ElapsedSecs = timer:now_diff(os:timestamp(), Start) / 1000000, + lager:debug( + "Single Compaction ~p: ~p segments for ~p bytes in ~p seconds, ~.2f MB/sec", + [Pid, OldSegments, OldBytes, ElapsedSecs, OldBytes/ElapsedSecs/(1024*1024)]); {Error, Reason} when Error == error; Error == 'EXIT' -> lager:error("Failed to compact ~p: ~p", [Pid, Reason]) end From 5eb5aa0c28157ce763d0180da9274e481dce72d9 Mon Sep 17 00:00:00 2001 From: Arnaud Wetzel Date: Mon, 7 Jul 2014 01:43:06 +0200 Subject: [PATCH 10/12] not clever to not allow dynamic throughput configuration --- src/mi_scheduler.erl | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/mi_scheduler.erl b/src/mi_scheduler.erl index 831397d..865f5da 100644 --- a/src/mi_scheduler.erl +++ b/src/mi_scheduler.erl @@ -66,14 +66,12 @@ init([]) -> %% Trap exits of the actual worker process process_flag(trap_exit, true), - {ok,WantedThroughput} = application:get_env(merge_index, compaction_throughput_mb_per_sec), - %% Use a dedicated worker sub-process to do the actual merging. The %% process may ignore messages for a long while during the compaction %% and we want to ensure that our message queue doesn't fill up with %% a bunch of dup requests for the same directory. Self = self(), - WorkerPid = spawn_link(fun() -> worker_loop(worker_init_state(Self,WantedThroughput)) end), + WorkerPid = spawn_link(fun() -> worker_loop(worker_init_state(Self)) end), {ok, #state{ queue = queue:new(), worker = WorkerPid, ready = true}}. @@ -111,9 +109,8 @@ handle_info({worker_ready, WorkerPid}, #state { queue = Q } = State) -> handle_info({'EXIT', WorkerPid, Reason}, #state { worker = WorkerPid } = State) -> lager:error("Compaction worker ~p exited: ~p", [WorkerPid, Reason]), %% Start a new worker. - {ok,WantedThroughput} = application:get_env(merge_index, compaction_throughput_mb_per_sec), Self = self(), - NewWorkerPid = spawn_link(fun() -> worker_loop(worker_init_state(Self,WantedThroughput)) end), + NewWorkerPid = spawn_link(fun() -> worker_loop(worker_init_state(Self)) end), NewState = State#state { worker=NewWorkerPid , ready = true}, {noreply, NewState}; @@ -134,7 +131,6 @@ code_change(_OldVsn, State, _Extra) -> -define(TIMERING_SPAN_INIT, 3). -record(wstate, { parent, - wanted_throughput, timering, timering_span, test_start, @@ -149,12 +145,13 @@ replace_oldest({Set,Idx})-> ms_before_replace({Set,Idx},N)-> max(0,timer:seconds(N) - trunc(timer:now_diff(now(),element(Idx,Set))/1000)). -worker_init_state(Parent,WantedThroughput)-> - #wstate{parent=Parent, timering=new_timering(?TIMERING_SIZE),wanted_throughput=WantedThroughput, +worker_init_state(Parent)-> + #wstate{parent=Parent, timering=new_timering(?TIMERING_SIZE), timering_span=?TIMERING_SPAN_INIT,test_start=now(),test_compactions=[]}. -worker_loop(#wstate{timering_span=TimeRingSpan,test_start=TestStart,wanted_throughput=WantedThroughput, +worker_loop(#wstate{timering_span=TimeRingSpan,test_start=TestStart, test_compactions=TestCompactions}=State) when length(TestCompactions)==?TIMERING_SIZE -> + {ok,WantedThroughput} = application:get_env(merge_index, compaction_throughput_mb_per_sec), TestBytes = lists:sum([OldBytes || {ok, _, OldBytes}<-TestCompactions]), TestSegments = lists:sum([OldSegments || {ok, OldSegments, _}<-TestCompactions]), TestElapsedSecs = timer:now_diff(os:timestamp(), TestStart) / 1000000, From fb92cb321cc427863241aebce028ce272ba958dd Mon Sep 17 00:00:00 2001 From: Arnaud Wetzel Date: Mon, 7 Jul 2014 23:54:53 +0200 Subject: [PATCH 11/12] adjust throttling in milliseconds, not seconds --- src/mi_scheduler.erl | 35 ++++++++++++++++++----------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/src/mi_scheduler.erl b/src/mi_scheduler.erl index 865f5da..10d04de 100644 --- a/src/mi_scheduler.erl +++ b/src/mi_scheduler.erl @@ -127,8 +127,8 @@ code_change(_OldVsn, State, _Extra) -> %% ==================================================================== %% Internal worker %% ==================================================================== --define(TIMERING_SIZE, 15). --define(TIMERING_SPAN_INIT, 3). +-define(TIMERING_SIZE, 10). +-define(TIMERING_AJUST_EVERY, 5). -record(wstate, { parent, timering, @@ -136,35 +136,36 @@ code_change(_OldVsn, State, _Extra) -> test_start, test_compactions}). -%% THROTTLING: Run M processes in N seconds waiting ms_before_replace(Ring,N) before doing something +%% THROTTLING: Run M processes in N milliseconds waiting ms_before_replace(Ring,N) before doing something %% and replace_oldest (impl is a set of size M of timestamps representing running processes. %% Replace the oldest one if older than N seconds else wait (oldest_ts-Nsec) : {Ts_Set, Oldest_Idx} new_timering(M)->{erlang:make_tuple(M,now()),1}. replace_oldest({Set,Idx})-> {erlang:setelement(Idx,Set,now()),case Idx+1 of X when X>erlang:size(Set) ->1; X->X end}. ms_before_replace({Set,Idx},N)-> - max(0,timer:seconds(N) - trunc(timer:now_diff(now(),element(Idx,Set))/1000)). + max(0,N - trunc(timer:now_diff(now(),element(Idx,Set))/1000)). worker_init_state(Parent)-> #wstate{parent=Parent, timering=new_timering(?TIMERING_SIZE), - timering_span=?TIMERING_SPAN_INIT,test_start=now(),test_compactions=[]}. + timering_span=0,test_start=now(),test_compactions=[]}. worker_loop(#wstate{timering_span=TimeRingSpan,test_start=TestStart, - test_compactions=TestCompactions}=State) when length(TestCompactions)==?TIMERING_SIZE -> + test_compactions=TestCompactions}=State) when length(TestCompactions)==(?TIMERING_SIZE*?TIMERING_AJUST_EVERY) -> {ok,WantedThroughput} = application:get_env(merge_index, compaction_throughput_mb_per_sec), TestBytes = lists:sum([OldBytes || {ok, _, OldBytes}<-TestCompactions]), TestSegments = lists:sum([OldSegments || {ok, OldSegments, _}<-TestCompactions]), - TestElapsedSecs = timer:now_diff(os:timestamp(), TestStart) / 1000000, - Throughput = TestBytes/TestElapsedSecs/(1024*1024), - lager:info("Overall Compaction: ~p segments for ~p bytes in ~p seconds, ~.2f MB/sec", - [TestSegments, TestBytes, TestElapsedSecs, Throughput]), - AcceptableDiff = WantedThroughput*0.2, - case abs(Throughput-WantedThroughput) of + TestElapsedMSecs = timer:now_diff(os:timestamp(), TestStart) / 1000, + ThroughputBms = TestBytes/TestElapsedMSecs, + WantedThroughputBms = WantedThroughput *1024*1024 / 1000, + lager:info("Overall Compaction: ~p segments for ~p MBytes in ~p milliseconds, ~.2f MB/sec", + [TestSegments, TestBytes/(1024*1024), TestElapsedMSecs, (ThroughputBms*1000) / (1024*1024)]), + AcceptableDiff = WantedThroughputBms*0.2, + case abs(ThroughputBms-WantedThroughputBms) of Diff when Diff < AcceptableDiff -> worker_loop(State#wstate{timering_span=TimeRingSpan,test_start=now(),test_compactions=[]}); _ -> %% We need to adjust timering span window in order to have the good throughput - NewTimeRingSpan = trunc(TimeRingSpan * (Throughput/WantedThroughput)) + 1, - lager:info("Adjust throttling to have ~p compaction every ~p seconds",[?TIMERING_SIZE,NewTimeRingSpan]), + NewTimeRingSpan = trunc(TestBytes/WantedThroughputBms/?TIMERING_AJUST_EVERY)+1, + lager:info("Adjust throttling to have ~p compaction every ~p milliseconds",[?TIMERING_SIZE,NewTimeRingSpan]), worker_loop(State#wstate{timering_span=NewTimeRingSpan,test_start=now(),test_compactions=[]}) end; @@ -182,10 +183,10 @@ worker_loop(#wstate{parent=Parent,timering=TimeRing, {ok, 0, 0}->ok; {ok, OldSegments, OldBytes} -> Worker ! {compaction_res,Result}, - ElapsedSecs = timer:now_diff(os:timestamp(), Start) / 1000000, + ElapsedMSecs = timer:now_diff(os:timestamp(), Start) / 1000, lager:debug( - "Single Compaction ~p: ~p segments for ~p bytes in ~p seconds, ~.2f MB/sec", - [Pid, OldSegments, OldBytes, ElapsedSecs, OldBytes/ElapsedSecs/(1024*1024)]); + "Single Compaction ~p: ~p segments for ~p bytes in ~p milliseconds, ~.2f MB/sec", + [Pid, OldSegments, OldBytes, ElapsedMSecs, OldBytes/ElapsedMSecs/1024]); {Error, Reason} when Error == error; Error == 'EXIT' -> lager:error("Failed to compact ~p: ~p", [Pid, Reason]) end From 314b0a2e957262b12019b33d05ffc1d95b4691fd Mon Sep 17 00:00:00 2001 From: Arnaud Wetzel Date: Tue, 8 Jul 2014 17:46:28 +0200 Subject: [PATCH 12/12] allow only 30% maximum adjustment to ensure a kind of continuity --- src/mi_scheduler.erl | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/src/mi_scheduler.erl b/src/mi_scheduler.erl index 10d04de..fd24edb 100644 --- a/src/mi_scheduler.erl +++ b/src/mi_scheduler.erl @@ -147,7 +147,7 @@ ms_before_replace({Set,Idx},N)-> worker_init_state(Parent)-> #wstate{parent=Parent, timering=new_timering(?TIMERING_SIZE), - timering_span=0,test_start=now(),test_compactions=[]}. + timering_span=1000,test_start=now(),test_compactions=[]}. worker_loop(#wstate{timering_span=TimeRingSpan,test_start=TestStart, test_compactions=TestCompactions}=State) when length(TestCompactions)==(?TIMERING_SIZE*?TIMERING_AJUST_EVERY) -> @@ -157,17 +157,16 @@ worker_loop(#wstate{timering_span=TimeRingSpan,test_start=TestStart, TestElapsedMSecs = timer:now_diff(os:timestamp(), TestStart) / 1000, ThroughputBms = TestBytes/TestElapsedMSecs, WantedThroughputBms = WantedThroughput *1024*1024 / 1000, - lager:info("Overall Compaction: ~p segments for ~p MBytes in ~p milliseconds, ~.2f MB/sec", - [TestSegments, TestBytes/(1024*1024), TestElapsedMSecs, (ThroughputBms*1000) / (1024*1024)]), - AcceptableDiff = WantedThroughputBms*0.2, - case abs(ThroughputBms-WantedThroughputBms) of - Diff when Diff < AcceptableDiff -> - worker_loop(State#wstate{timering_span=TimeRingSpan,test_start=now(),test_compactions=[]}); - _ -> %% We need to adjust timering span window in order to have the good throughput - NewTimeRingSpan = trunc(TestBytes/WantedThroughputBms/?TIMERING_AJUST_EVERY)+1, - lager:info("Adjust throttling to have ~p compaction every ~p milliseconds",[?TIMERING_SIZE,NewTimeRingSpan]), - worker_loop(State#wstate{timering_span=NewTimeRingSpan,test_start=now(),test_compactions=[]}) - end; + lager:info("Overall Compaction: ~p segments for ~p MBytes in ~.2f seconds, ~.2f MB/sec", + [TestSegments, TestBytes/(1024*1024), TestElapsedMSecs/1000, (ThroughputBms*1000) / (1024*1024)]), + %% We need to adjust timering span window in order to have the good throughput + %% ensure a kind of continuity not allowing more than 30% adjustment + Span = case trunc(TestBytes/WantedThroughputBms/?TIMERING_AJUST_EVERY)+1 of + NewTime when NewTime < TimeRingSpan -> max(trunc(TimeRingSpan*0.3)+1,NewTime); + NewTime -> min(trunc(TimeRingSpan*1.3)+1,NewTime) + end, + lager:info("Adjust throttling to have ~p compaction every ~p milliseconds",[?TIMERING_SIZE,Span]), + worker_loop(State#wstate{timering_span=Span,test_start=now(),test_compactions=[]}); worker_loop(#wstate{parent=Parent,timering=TimeRing, timering_span=TimeRingSpan, test_compactions=TestCompactions}=State) ->