diff --git a/.gitignore b/.gitignore index 830e627..160f35c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +_build *.beam *.eqc .eunit/* diff --git a/Makefile b/Makefile deleted file mode 100644 index 1de9a9d..0000000 --- a/Makefile +++ /dev/null @@ -1,39 +0,0 @@ -DIALYZER_APPS=erts kernel stdlib crypto -DIALYZER_FLAGS ?= -Wunmatched_returns -Werror_handling -Wrace_conditions - -.PHONY: all compile clean deps test dialyzer typer - -all: deps compile - -clean: - $(REBAR) clean - -deps: - $(REBAR) get-deps - $(REBAR) compile - -compile: - $(REBAR) skip_deps=true compile - -testdeps: deps - $(REBAR) -C rebar.test.config get-deps - $(REBAR) -C rebar.test.config compile - -runtests: testdeps compile - bash test/run.sh - -update-doc-lines: - @escript doc/update_line_numbers.erl ebin doc/*.md - -typer: - typer --plt $(DEPS_PLT) -I include -r ./src - -include tools.mk - -ifeq ($(REBAR),) -$(error "Rebar not found. Please set REBAR variable or update PATH") -endif - -## Override test after tools.mk; use custom test runner for isolation. -test: testdeps - bash test/run.sh diff --git a/README.md b/README.md index 9959898..210e4b1 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,16 @@ +riak_ensemble_ng +------------------- + +This repo and the corresponding hex package differ from the `basho/riak_ensemble` in the following ways: + +* Removed use of `riak_ensemble_clock` replacing with OTP-18 time functionality +* `eleveldb` is no longer the default `synctree` + +To use `eleveldb` as the `synctree` add the `eleveldb` dep to your projects `rebar.config` `dep` list. Add `eleveldb` to your app's `application` list *before* the entry for `riak_ensemble`. And add to your configuration `{riak_ensemble, [{synctree_backend, synctree_leveldb}]}`. + +Original README +--------------- + (Note: Work-in-progress documentation [here](https://github.com/basho/riak_ensemble/blob/develop/doc/Readme.md)) `riak_ensemble` is a consensus library that supports creating multiple diff --git a/c_src/riak_ensemble_clock.c b/c_src/riak_ensemble_clock.c deleted file mode 100644 index de5a7d2..0000000 --- a/c_src/riak_ensemble_clock.c +++ /dev/null @@ -1,184 +0,0 @@ -/******************************************************************** - * - * Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved. - * - * This file is provided to you under the Apache License, - * Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtaine - * a copy of the License at - * - * http: www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - ********************************************************************/ -#include "erl_nif.h" - -#include -#include -#include -#include - -#if defined(__MACH__) && defined(__APPLE__) -#include -#include -#endif - -static ERL_NIF_TERM ATOM_OK; -static ERL_NIF_TERM ATOM_ERROR; - -#if defined(__MACH__) && defined(__APPLE__) -static mach_timebase_info_data_t timebase_info; -#endif - -/*********************************************************************/ - -#if defined(_POSIX_TIMERS) && (_POSIX_TIMERS > 0) -uint64_t posix_get_clock(clockid_t clock) -{ - struct timespec ts; - if(clock_gettime(clock, &ts) == -1) - return 0; - return ((uint64_t)ts.tv_sec * 1000000000) + ts.tv_nsec; -} - -/* Note: Prefer CLOCK_BOOTTIME on Linux where supported, as this - includes time spent in suspend. CLOCK_MONOTONIC may or may - not include time spent in suspend -- it's CPU dependent. In - practice, this shouldn't matter -- people don't typically - suspend/resume production servers while under client load. - Likewise, client TCP connections are unlikely to survive - across reasonable suspend durations. -*/ - -uint64_t posix_monotonic_time(void) -{ - uint64_t time; -#if defined(CLOCK_BOOTTIME) - if((time = posix_get_clock(CLOCK_BOOTTIME))) - return time; -#elif defined(CLOCK_MONOTONIC) - if((time = posix_get_clock(CLOCK_MONOTONIC))) - return time; -#endif - return 0; -} -#endif - -/********************************************************************* - * See Apple technical note: * - * https://developer.apple.com/library/mac/qa/qa1398/_index.html * - *********************************************************************/ - -/* Note: mach_absolute_time() is based on the CPU timestamp counter, - which is synchronized across all CPUs since Intel Nehalem. - Earlier CPUs do not provide this guarantee. It's unclear if - Apple provides any correction for this behavior on older CPUs. - We assume this doesn't matter in practice -- people don't use - ancient OS X machines as production servers. -*/ - -#if defined(__MACH__) && defined(__APPLE__) -uint64_t osx_monotonic_time(void) -{ - uint64_t time; - uint64_t timeNano; - - time = mach_absolute_time(); - - // Do the maths. We hope that the multiplication doesn't - // overflow; the price you pay for working in fixed point. - - timeNano = time * timebase_info.numer / timebase_info.denom; - - return timeNano; -} -#endif - -/*********************************************************************/ - -static uint64_t get_monotonic_time() -{ - uint64_t time = 0; - -#if defined(__MACH__) && defined(__APPLE__) - time = osx_monotonic_time(); -#endif - -#if defined(_POSIX_TIMERS) && (_POSIX_TIMERS > 0) - time = posix_monotonic_time(); -#endif - - return time; -} - -/*********************************************************************/ - -static ERL_NIF_TERM monotonic_time(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ - uint64_t time = get_monotonic_time(); - - if(time) { - return enif_make_tuple2(env, ATOM_OK, enif_make_uint64(env, time)); - } - else { - return ATOM_ERROR; - } -} - -/*********************************************************************/ - -static ERL_NIF_TERM monotonic_time_ms(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ - uint64_t time = get_monotonic_time() / 1000000; - - if(time) { - return enif_make_tuple2(env, ATOM_OK, enif_make_uint64(env, time)); - } - else { - return ATOM_ERROR; - } -} - -/*********************************************************************/ - -static void init(ErlNifEnv *env) -{ - ATOM_OK = enif_make_atom(env, "ok"); - ATOM_ERROR = enif_make_atom(env, "error"); - -#if defined(__MACH__) && defined(__APPLE__) - (void) mach_timebase_info(&timebase_info); -#endif -} - -static int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) -{ - init(env); - return 0; -} - -static int on_upgrade(ErlNifEnv* env, void** priv_data, void** old_priv_data, - ERL_NIF_TERM load_info) -{ - init(env); - return 0; -} - -static void on_unload(ErlNifEnv *env, void *priv_data) -{ -} - -/*********************************************************************/ - -static ErlNifFunc nif_funcs[] = { - {"monotonic_time", 0, monotonic_time}, - {"monotonic_time_ms", 0, monotonic_time_ms} -}; - -ERL_NIF_INIT(riak_ensemble_clock, nif_funcs, &on_load, NULL, &on_upgrade, &on_unload) diff --git a/rebar b/rebar deleted file mode 100755 index a7eccc6..0000000 Binary files a/rebar and /dev/null differ diff --git a/rebar.config b/rebar.config index 71af809..ffe964f 100644 --- a/rebar.config +++ b/rebar.config @@ -1,20 +1,18 @@ +{deps, [{gen_fsm_compat, "~>0.3.0"}, {lager, "~>3.6.0"}]}. + {erl_opts, [debug_info, warnings_as_errors, warn_untyped_record, - {platform_define, "^[0-9]+", namespaced_types}, {parse_transform, lager_transform}]}. + {eunit_opts, [verbose]}. {edoc_opts, [preprocess, {dir, "edoc"}]}. {cover_enabled, true}. {xref_checks, [undefined_function_calls]}. -{deps, [ - {lager, ".*", {git, "git://github.com/basho/lager.git", {tag, "3.2.2"}}}, - {eleveldb, ".*", {git, "git://github.com/basho/eleveldb.git", {tag, "2.0.32"}}} -]}. -{port_specs, - [{".*", "priv/riak_ensemble.so", - ["c_src/*.c*"], - [{env, [{"CFLAGS", "$CFLAGS"}]}] - }]}. +{profiles, [{test, [{deps, [{riak_test, ".*", {git, "git://github.com/basho/riak_test", {branch, "develop"}}}]}]}]}. + +{plugins, [{rebar_erl_vsn, "~>0.2.0"}]}. +{provider_hooks, [{pre, [{compile, erl_vsn}]}]}. + diff --git a/rebar.lock b/rebar.lock new file mode 100644 index 0000000..1e4210c --- /dev/null +++ b/rebar.lock @@ -0,0 +1,10 @@ +{"1.1.0", +[{<<"gen_fsm_compat">>,{pkg,<<"gen_fsm_compat">>,<<"0.3.0">>},0}, + {<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.9">>},1}, + {<<"lager">>,{pkg,<<"lager">>,<<"3.6.3">>},0}]}. +[ +{pkg_hash,[ + {<<"gen_fsm_compat">>, <<"5903549F67D595F58A7101154CBE0FDD46955FBFBE40813F1E53C23A970FF5F4">>}, + {<<"goldrush">>, <<"F06E5D5F1277DA5C413E84D5A2924174182FB108DABB39D5EC548B27424CD106">>}, + {<<"lager">>, <<"FE78951D174616273F87F0DBC3374D1430B1952E5EFC4E1C995592D30A207294">>}]} +]. diff --git a/rebar.real b/rebar.real deleted file mode 100755 index 18ccf12..0000000 Binary files a/rebar.real and /dev/null differ diff --git a/rebar.test.config b/rebar.test.config deleted file mode 100644 index 5860da2..0000000 --- a/rebar.test.config +++ /dev/null @@ -1,11 +0,0 @@ -{erl_opts, [debug_info, - warnings_as_errors, - warn_untyped_record, - {parse_transform, lager_transform}]}. -{eunit_opts, [verbose]}. -{cover_enabled, true}. -{xref_checks, [undefined_function_calls]}. -{deps_dir, "deps.test"}. -{lib_dirs, ["deps"]}. -{deps, [{riak_test, ".*", {git, "git://github.com/basho/riak_test", {branch, "riak/2.0"}}}]}. - diff --git a/src/riak_ensemble.app.src b/src/riak_ensemble.app.src index 76b08aa..a8631bd 100644 --- a/src/riak_ensemble.app.src +++ b/src/riak_ensemble.app.src @@ -1,12 +1,11 @@ -{application, riak_ensemble, - [ - {description, ""}, - {vsn, git}, - {registered, []}, - {applications, [ - kernel, - stdlib - ]}, - {mod, { riak_ensemble_app, []}}, - {env, []} - ]}. +{application,riak_ensemble, + [{description,"Multi-Paxos framework in Erlang"}, + {vsn,"2.4.4"}, + {registered,[]}, + {applications,[kernel,stdlib,lager,gen_fsm_compat]}, + {mod,{riak_ensemble_app,[]}}, + {env,[]}, + {maintainers,["basho","Tristan Sloughter"]}, + {pkg_name,riak_ensemble_ng}, + {licenses,["Apache 2.0"]}, + {links,[{"Github","https://github.com/basho/riak_ensemble"}]}]}. diff --git a/src/riak_ensemble_clock.erl b/src/riak_ensemble_clock.erl index 6bd6b4e..a89a31f 100644 --- a/src/riak_ensemble_clock.erl +++ b/src/riak_ensemble_clock.erl @@ -18,25 +18,11 @@ %% %% ------------------------------------------------------------------- -module(riak_ensemble_clock). --on_load(init/0). + -export([monotonic_time/0, monotonic_time_ms/0]). monotonic_time() -> - erlang:nif_error({error, not_loaded}). + {ok, erlang:monotonic_time()}. monotonic_time_ms() -> - erlang:nif_error({error, not_loaded}). - -init() -> - case code:priv_dir(riak_ensemble) of - {error, bad_name} -> - case code:which(?MODULE) of - Filename when is_list(Filename) -> - SoName = filename:join([filename:dirname(Filename),"../priv", "riak_ensemble"]); - _ -> - SoName = filename:join("../priv", "riak_ensemble") - end; - Dir -> - SoName = filename:join(Dir, "riak_ensemble") - end, - erlang:load_nif(SoName, 0). + {ok, erlang:monotonic_time(milli_seconds)}. diff --git a/src/riak_ensemble_config.erl b/src/riak_ensemble_config.erl index 7d4aa3d..531be4b 100644 --- a/src/riak_ensemble_config.erl +++ b/src/riak_ensemble_config.erl @@ -18,9 +18,15 @@ %% %% ------------------------------------------------------------------- -module(riak_ensemble_config). --compile(export_all). -include_lib("riak_ensemble_types.hrl"). +-export([tick/0, lease/0, trust_lease/0, follower_timeout/0, + election_timeout/0, prefollow_timeout/0, pending_timeout/0, + probe_delay/0, local_get_timeout/0, local_put_timeout/0, + alive_ticks/0, peer_workers/0, storage_delay/0, storage_tick/0, + tree_validation/0, synchronous_tree_updates/0, + notfound_read_delay/0, get_env/2]). + %% @doc %% The primary ensemble tick that determines the rate at which an elected %% leader attempts to refresh its lease. @@ -51,7 +57,7 @@ follower_timeout() -> %% The election timeout used for randomized election. election_timeout() -> Timeout = follower_timeout(), - Timeout + random:uniform(Timeout). + Timeout + riak_ensemble_util:random_uniform(Timeout). %% @doc %% The prefollow timeout determines how long a peer waits to hear from the diff --git a/src/riak_ensemble_exchange.erl b/src/riak_ensemble_exchange.erl index d0a620a..b120f49 100644 --- a/src/riak_ensemble_exchange.erl +++ b/src/riak_ensemble_exchange.erl @@ -18,18 +18,30 @@ %% %% ------------------------------------------------------------------- -module(riak_ensemble_exchange). --compile(export_all). +-export([start_exchange/7, perform_exchange/7, perform_exchange2/5, exchange/5, + exchange_get/4, trust_majority/4, all_trust_majority/3]). +-ifndef('21.0'). start_exchange(Ensemble, Peer, Id, Tree, Peers, Views, Trusted) -> spawn(fun() -> try perform_exchange(Ensemble, Peer, Id, Tree, Peers, Views, Trusted) catch Class:Reason -> io:format("CAUGHT: ~p/~p~n~p~n", [Class, Reason, erlang:get_stacktrace()]), - gen_fsm:send_event(Peer, exchange_failed) + gen_fsm_compat:send_event(Peer, exchange_failed) end end). - +-else. +start_exchange(Ensemble, Peer, Id, Tree, Peers, Views, Trusted) -> + spawn(fun() -> + try + perform_exchange(Ensemble, Peer, Id, Tree, Peers, Views, Trusted) + catch Class:Reason:Stack -> + io:format("CAUGHT: ~p/~p~n~p~n", [Class, Reason, Stack]), + gen_fsm_compat:send_event(Peer, exchange_failed) + end + end). +-endif. perform_exchange(Ensemble, Peer, Id, Tree, Peers, Views, Trusted) -> Required = case Trusted of true -> quorum; @@ -49,7 +61,7 @@ perform_exchange(Ensemble, Peer, Id, Tree, Peers, Views, Trusted) -> end, case RemotePeers of failed -> - gen_fsm:send_event(Peer, exchange_failed), + gen_fsm_compat:send_event(Peer, exchange_failed), ok; _ -> perform_exchange2(Ensemble, Peer, Id, Tree, RemotePeers) @@ -61,14 +73,14 @@ perform_exchange2(Ensemble, Peer, Id, Tree, RemotePeers) -> exchange(Ensemble, Peer, Id, Tree, RemotePeers); false -> %% io:format(user, "~p: tree_corrupted (perform_exchange2)~n", [Id]), - gen_fsm:sync_send_event(Peer, tree_corrupted, infinity) + gen_fsm_compat:sync_send_event(Peer, tree_corrupted, infinity) end. exchange(_Ensemble, Peer, _Id, _Tree, []) -> - gen_fsm:send_event(Peer, exchange_complete); + gen_fsm_compat:send_event(Peer, exchange_complete); exchange(Ensemble, Peer, Id, Tree, [RemotePeer|RemotePeers]) -> RemotePid = riak_ensemble_manager:get_peer_pid(Ensemble, RemotePeer), - RemoteTree = gen_fsm:sync_send_event(RemotePid, tree_pid, infinity), + RemoteTree = gen_fsm_compat:sync_send_event(RemotePid, tree_pid, infinity), Local = fun(exchange_get, {L,B}) -> exchange_get(L, B, Peer, Tree); (start_exchange_level, _) -> @@ -100,7 +112,7 @@ exchange(Ensemble, Peer, Id, Tree, [RemotePeer|RemotePeers]) -> exchange_get(L, B, PeerPid, Tree) -> case riak_ensemble_peer_tree:exchange_get(L, B, Tree) of corrupted -> - gen_fsm:sync_send_event(PeerPid, tree_corrupted, infinity), + gen_fsm_compat:sync_send_event(PeerPid, tree_corrupted, infinity), throw(corrupted); Hashes -> Hashes diff --git a/src/riak_ensemble_manager.erl b/src/riak_ensemble_manager.erl index f0a1085..1f565f4 100644 --- a/src/riak_ensemble_manager.erl +++ b/src/riak_ensemble_manager.erl @@ -487,7 +487,7 @@ reload_state() -> -spec initial_state() -> state(). initial_state() -> ets:insert(?ETS, {enabled, false}), - ClusterName = {node(), erlang:now()}, + ClusterName = {node(), erlang:unique_integer([positive])}, CS = riak_ensemble_state:new(ClusterName), State=#state{version=0, ensemble_data=[], diff --git a/src/riak_ensemble_msg.erl b/src/riak_ensemble_msg.erl index c4ea0fa..da735b7 100644 --- a/src/riak_ensemble_msg.erl +++ b/src/riak_ensemble_msg.erl @@ -85,7 +85,7 @@ send_all(Msg, Id, Peers, Views) -> -spec send_all(msg(), peer_id(), peer_pids(), views(), required()) -> msg_state(). send_all(_Msg, Id, _Peers=[{Id,_}], _Views, _Required) -> ?OUT("~p: self-sending~n", [Id]), - gen_fsm:send_event(self(), {quorum_met, []}), + gen_fsm_compat:send_event(self(), {quorum_met, []}), #msgstate{awaiting=undefined, timer=undefined, replies=[], id=Id}; send_all(Msg, Id, Peers, Views, Required) -> ?OUT("~p/~p: sending to ~p: ~p~n", [Id, self(), Peers, Msg]), @@ -138,7 +138,7 @@ send_request({PeerId, PeerPid}, ReqId, Event) -> reply(From, PeerId, nack); _ -> ?OUT("~p: Sending to ~p: ~p~n", [self(), PeerId, Event]), - gen_fsm:send_event(PeerPid, Event) + gen_fsm_compat:send_event(PeerPid, Event) end. %%%=================================================================== @@ -172,14 +172,14 @@ send_cast({_PeerId, PeerPid}, Event) -> ok; _ -> ?OUT("~p: Sending to ~p: ~p~n", [self(), _PeerId, Event]), - gen_fsm:send_event(PeerPid, Event) + gen_fsm_compat:send_event(PeerPid, Event) end. %%%=================================================================== -spec reply(msg_from(), peer_id(), any()) -> ok. reply({riak_ensemble_msg, Sender, ReqId}, Id, Reply) -> - gen_fsm:send_all_state_event(Sender, {reply, ReqId, Id, Reply}). + gen_fsm_compat:send_all_state_event(Sender, {reply, ReqId, Id, Reply}). %%%=================================================================== @@ -349,7 +349,7 @@ add_reply(Peer, Reply, MsgState=#msgstate{timer=Timer}) -> true -> cancel_timer(Timer), {Valid, _Nacks} = find_valid(Replies), - gen_fsm:send_event(self(), {quorum_met, Valid}), + gen_fsm_compat:send_event(self(), {quorum_met, Valid}), MsgState#msgstate{replies=[], awaiting=undefined, timer=undefined}; false -> MsgState#msgstate{replies=Replies}; @@ -361,7 +361,7 @@ add_reply(Peer, Reply, MsgState=#msgstate{timer=Timer}) -> -spec quorum_timeout(msg_state()) -> msg_state(). quorum_timeout(#msgstate{replies=Replies}) -> {Valid, _Nacks} = find_valid(Replies), - gen_fsm:send_event(self(), {timeout, Valid}), + gen_fsm_compat:send_event(self(), {timeout, Valid}), #msgstate{awaiting=undefined, timer=undefined, replies=[]}. %%%=================================================================== diff --git a/src/riak_ensemble_peer.erl b/src/riak_ensemble_peer.erl index 5166413..b2dae18 100644 --- a/src/riak_ensemble_peer.erl +++ b/src/riak_ensemble_peer.erl @@ -21,7 +21,7 @@ %% TODO: Before PR. Module + other edocs, general cleanup/refactor. -module(riak_ensemble_peer). --behaviour(gen_fsm). +-behaviour(gen_fsm_compat). -include_lib("riak_ensemble_types.hrl"). @@ -54,9 +54,10 @@ -export([do_kupdate/4, do_kput_once/4, do_kmodify/4]). -compile({pulse_replace_module, - [{gen_fsm, pulse_gen_fsm}]}). + [{gen_fsm, pulse_gen_fsm}, + {gen_fsm_compat, pulse_gen_fsm}]}). -%% gen_fsm callbacks +%% gen_fsm_compat callbacks -export([init/1, handle_event/3, handle_sync_event/4, handle_info/3, terminate/3, code_change/4]). @@ -154,12 +155,12 @@ -spec start_link(module(), ensemble_id(), peer_id(), [any()]) -> ignore | {error, _} | {ok, pid()}. start_link(Mod, Ensemble, Id, Args) -> - gen_fsm:start_link(?MODULE, [Mod, Ensemble, Id, Args], []). + gen_fsm_compat:start_link(?MODULE, [Mod, Ensemble, Id, Args], []). -spec start(module(), ensemble_id(), peer_id(), [any()]) -> ignore | {error, _} | {ok, pid()}. start(Mod, Ensemble, Id, Args) -> - gen_fsm:start(?MODULE, [Mod, Ensemble, Id, Args], []). + gen_fsm_compat:start(?MODULE, [Mod, Ensemble, Id, Args], []). %% TODO: Do we want this to be routable by ensemble/id instead? -spec join(pid(), peer_id()) -> ok | timeout | {error, [{already_member, peer_id()}]}. @@ -207,33 +208,33 @@ stable_views(Ensemble, Timeout) -> -spec get_leader(pid()) -> peer_id(). get_leader(Pid) when is_pid(Pid) -> - gen_fsm:sync_send_all_state_event(Pid, get_leader, infinity). + gen_fsm_compat:sync_send_all_state_event(Pid, get_leader, infinity). -spec watch_leader_status(pid()) -> ok. watch_leader_status(Pid) when is_pid(Pid) -> - gen_fsm:send_all_state_event(Pid, {watch_leader_status, self()}). + gen_fsm_compat:send_all_state_event(Pid, {watch_leader_status, self()}). -spec stop_watching(pid()) -> ok. stop_watching(Pid) when is_pid(Pid) -> - gen_fsm:send_all_state_event(Pid, {stop_watching, self()}). + gen_fsm_compat:send_all_state_event(Pid, {stop_watching, self()}). -ifdef(TEST). -spec get_watchers(pid()) -> [{pid(), reference()}]. get_watchers(Pid) when is_pid(Pid) -> - gen_fsm:sync_send_all_state_event(Pid, get_watchers). + gen_fsm_compat:sync_send_all_state_event(Pid, get_watchers). -endif. get_info(Pid) when is_pid(Pid) -> - gen_fsm:sync_send_all_state_event(Pid, get_info, infinity). + gen_fsm_compat:sync_send_all_state_event(Pid, get_info, infinity). tree_info(Pid) when is_pid(Pid) -> - gen_fsm:sync_send_all_state_event(Pid, tree_info, infinity). + gen_fsm_compat:sync_send_all_state_event(Pid, tree_info, infinity). backend_pong(Pid) when is_pid(Pid) -> - gen_fsm:send_event(Pid, backend_pong). + gen_fsm_compat:send_event(Pid, backend_pong). force_state(Pid, EpochSeq) -> - gen_fsm:sync_send_event(Pid, {force_state, EpochSeq}). + gen_fsm_compat:sync_send_event(Pid, {force_state, EpochSeq}). %%%=================================================================== %%% K/V API @@ -350,7 +351,7 @@ local_put(Pid, Key, Obj, Timeout) when is_pid(Pid) -> %% this function provide no consistency guarantees whatsoever. -spec debug_local_get(pid(), term()) -> std_reply(). debug_local_get(Pid, Key) -> - gen_fsm:sync_send_all_state_event(Pid, {debug_local_get, Key}). + gen_fsm_compat:sync_send_all_state_event(Pid, {debug_local_get, Key}). -endif. %%%=================================================================== @@ -474,7 +475,7 @@ exchange(Msg, State) -> common(Msg, State, exchange). exchange(tree_corrupted, From, State) -> - gen_fsm:reply(From, ok), + gen_fsm_compat:reply(From, ok), repair(init, State); exchange(Msg, From, State) -> common(Msg, From, State, exchange). @@ -698,7 +699,7 @@ leading(ping_quorum, From, State=#state{fact=Fact, id=Id, members=Members, %% io:format("timeout~n"), Extra end, - gen_fsm:reply(From, {Id, TreeReady, Result}) + gen_fsm_compat:reply(From, {Id, TreeReady, Result}) end), {next_state, leading, State3}; leading(stable_views, _From, State=#state{fact=Fact}) -> @@ -863,7 +864,7 @@ following(Msg, From, State) -> -spec forward(_, fsm_from(), state()) -> {next_state, following, state()}. forward(Msg, From, State) -> - catch gen_fsm:send_event(peer(leader(State), State), {forward, From, Msg}), + catch gen_fsm_compat:send_event(peer(leader(State), State), {forward, From, Msg}), {next_state, following, State}. -spec valid_request(_,_,state()) -> boolean(). @@ -1027,13 +1028,13 @@ common(Msg, State, StateName) -> -spec common(_, fsm_from(), state(), StateName) -> {next_state, StateName, state()}. common({force_state, {Epoch, Seq}}, From, State, StateName) -> State2 = set_epoch(Epoch, set_seq(Seq, State)), - gen_fsm:reply(From, ok), + gen_fsm_compat:reply(From, ok), {next_state, StateName, State2}; common(tree_pid, From, State, StateName) -> - gen_fsm:reply(From, State#state.tree), + gen_fsm_compat:reply(From, State#state.tree), {next_state, StateName, State}; common(tree_corrupted, From, State, StateName) -> - gen_fsm:reply(From, ok), + gen_fsm_compat:reply(From, ok), lager:debug("~p: tree_corrupted in state ~p", [State#state.id, StateName]), repair(init, State); common(_Msg, From, State, StateName) -> @@ -1363,7 +1364,7 @@ send_reply(From, Reply) -> nack -> ok; {ok,_} -> ok end, - gen_fsm:reply(From, Reply), + gen_fsm_compat:reply(From, Reply), ok. do_put_fsm(Key, Fun, Args, From, Self, State=#state{tree=Tree}) -> @@ -1371,7 +1372,7 @@ do_put_fsm(Key, Fun, Args, From, Self, State=#state{tree=Tree}) -> corrupted -> %% io:format("Tree corrupted (put)!~n"), send_reply(From, failed), - gen_fsm:sync_send_event(Self, tree_corrupted, infinity); + gen_fsm_compat:sync_send_event(Self, tree_corrupted, infinity); KnownHash -> do_put_fsm(Key, Fun, Args, From, Self, KnownHash, State) end. @@ -1383,7 +1384,7 @@ do_put_fsm(Key, Fun, Args, From, Self, KnownHash, State) -> case is_current(Local, Key, KnownHash, State2) of local_timeout -> %% TODO: Should this send a request_failed? - %% gen_fsm:sync_send_event(Self, request_failed, infinity), + %% gen_fsm_compat:sync_send_event(Self, request_failed, infinity), send_reply(From, unavailable); true -> do_modify_fsm(Key, Local, Fun, Args, From, State2); @@ -1393,9 +1394,9 @@ do_put_fsm(Key, Fun, Args, From, Self, KnownHash, State) -> do_modify_fsm(Key, Current, Fun, Args, From, State2); {corrupted, _State2} -> send_reply(From, failed), - gen_fsm:sync_send_event(Self, tree_corrupted, infinity); + gen_fsm_compat:sync_send_event(Self, tree_corrupted, infinity); {failed, _State3} -> - gen_fsm:sync_send_event(Self, request_failed, infinity), + gen_fsm_compat:sync_send_event(Self, request_failed, infinity), send_reply(From, unavailable) end end. @@ -1407,11 +1408,11 @@ do_modify_fsm(Key, Current, Fun, Args, From, State=#state{self=Self}) -> send_reply(From, {ok, New}); {corrupted, _State2} -> send_reply(From, failed), - gen_fsm:sync_send_event(Self, tree_corrupted, infinity); + gen_fsm_compat:sync_send_event(Self, tree_corrupted, infinity); {precondition, _State2} -> send_reply(From, failed); {failed, _State2} -> - gen_fsm:sync_send_event(Self, request_failed, infinity), + gen_fsm_compat:sync_send_event(Self, request_failed, infinity), send_reply(From, timeout) end. @@ -1425,9 +1426,9 @@ do_overwrite_fsm(Key, Val, From, Self, State0=#state{ets=ETS}) -> send_reply(From, {ok, Result}); {corrupted, _State2} -> send_reply(From, timeout), - gen_fsm:sync_send_event(Self, tree_corrupted, infinity); + gen_fsm_compat:sync_send_event(Self, tree_corrupted, infinity); {failed, _State2} -> - gen_fsm:sync_send_event(Self, request_failed, infinity), + gen_fsm_compat:sync_send_event(Self, request_failed, infinity), send_reply(From, timeout) end. @@ -1437,7 +1438,7 @@ do_get_fsm(Key, From, Self, Opts, State=#state{tree=Tree}) -> corrupted -> %% io:format("Tree corrupted (get)!~n"), send_reply(From, failed), - gen_fsm:sync_send_event(Self, tree_corrupted, infinity); + gen_fsm_compat:sync_send_event(Self, tree_corrupted, infinity); KnownHash -> do_get_fsm(Key, From, Self, KnownHash, Opts, State) end. @@ -1452,7 +1453,7 @@ do_get_fsm(Key, From, Self, KnownHash, Opts, State0) -> case is_current(Local, Key, KnownHash, State) of local_timeout -> %% TODO: Should this send a request_failed? - %% gen_fsm:sync_send_event(Self, request_failed, infinity), + %% gen_fsm_compat:sync_send_event(Self, request_failed, infinity), send_reply(From, timeout); true -> case LocalOnly of @@ -1464,7 +1465,7 @@ do_get_fsm(Key, From, Self, KnownHash, Opts, State0) -> %% TODO: If there's a new leader, we could forward %% instead of timeout. send_reply(From, timeout), - gen_fsm:sync_send_event(Self, request_failed, infinity) + gen_fsm_compat:sync_send_event(Self, request_failed, infinity) end; false -> case get_latest_obj(Key, Local, KnownHash, State) of @@ -1482,11 +1483,11 @@ do_get_fsm(Key, From, Self, KnownHash, Opts, State0) -> send_reply(From, {ok, Current}); {corrupted, _State2} -> send_reply(From, failed), - gen_fsm:sync_send_event(Self, tree_corrupted, infinity); + gen_fsm_compat:sync_send_event(Self, tree_corrupted, infinity); {failed, _State2} -> %% TODO: Should this be failed or unavailable? send_reply(From, failed), - gen_fsm:sync_send_event(Self, request_failed, infinity) + gen_fsm_compat:sync_send_event(Self, request_failed, infinity) end end. @@ -1675,7 +1676,7 @@ put_obj(Key, Obj, Seq, State=#state{id=Id, members=Members, self=Self}) -> case local_put(Self, Key, Obj2, ?LOCAL_PUT_TIMEOUT) of failed -> lager:warning("Failed local_put for Key ~p, Id = ~p", [Key, Id]), - gen_fsm:sync_send_event(Self, request_failed, infinity), + gen_fsm_compat:sync_send_event(Self, request_failed, infinity), {failed, State2}; Local -> case wait_for_quorum(Future) of @@ -1811,16 +1812,13 @@ get_value(Obj, Default, State) -> end. %%%=================================================================== -%%% gen_fsm callbacks +%%% gen_fsm_compat callbacks %%%=================================================================== -spec init([any(),...]) -> {ok, setup, state()}. init([Mod, Ensemble, Id, Args]) -> lager:debug("~p: starting peer", [Id]), - {A,B,C} = os:timestamp(), - _ = random:seed(A + erlang:phash2(Id), - B + erlang:phash2(node()), - C), + _ = riak_ensemble_util:random_seed(Id), ETS = ets:new(x, [public, {read_concurrency, true}, {write_concurrency, true}]), TreeTrust = case riak_ensemble_config:tree_validation() of false -> @@ -1835,7 +1833,7 @@ init([Mod, Ensemble, Id, Args]) -> tree_trust=TreeTrust, alive=?ALIVE, mod=Mod}, - gen_fsm:send_event(self(), {init, Args}), + gen_fsm_compat:send_event(self(), {init, Args}), riak_ensemble_peer_sup:register_peer(Ensemble, Id, self(), ETS), {ok, setup, State}. @@ -2230,13 +2228,13 @@ save_fact(#state{ensemble=Ensemble, id=Id, fact=Fact}) -> -spec set_timer(non_neg_integer(), any(), state()) -> state(). set_timer(Time, Event, State) -> State2 = cancel_timer(State), - Timer = gen_fsm:send_event_after(Time, Event), + Timer = gen_fsm_compat:send_event_after(Time, Event), State2#state{timer=Timer}. -spec cancel_timer(state()) -> state(). cancel_timer(State=#state{timer=undefined}) -> State; cancel_timer(State=#state{timer=Timer}) -> - %% Note: gen_fsm cancel_timer discards timer message if already sent - catch gen_fsm:cancel_timer(Timer), + %% Note: gen_fsm_compat cancel_timer discards timer message if already sent + catch gen_fsm_compat:cancel_timer(Timer), State#state{timer=undefined}. diff --git a/src/riak_ensemble_peer_tree.erl b/src/riak_ensemble_peer_tree.erl index 9f548e3..5ee9d24 100644 --- a/src/riak_ensemble_peer_tree.erl +++ b/src/riak_ensemble_peer_tree.erl @@ -133,8 +133,9 @@ async_repair(Pid) -> %%%=================================================================== init([Id, TreeId, Path]) -> - Tree = synctree:newdb(Id, [{path, Path}, - {tree_id, TreeId}]), + TreeType = application:get_env(riak_ensemble, synctree_backend, synctree_ets), + Tree = synctree:new(Id, default, default, TreeType, [{path, Path}, + {tree_id, TreeId}]), State = #state{tree=Tree}, {ok, State}. @@ -209,7 +210,7 @@ code_change(_OldVsn, State, _Extra) -> %% Hardcoded to send FSM event as expected by riak_ensemble_peer async_reply(From, Reply) when is_pid(From) -> - gen_fsm:send_event(From, Reply). + gen_fsm_compat:send_event(From, Reply). -spec do_get(_,state()) -> {any(), state()}. do_get(Key, State=#state{tree=Tree}) -> diff --git a/src/riak_ensemble_router.erl b/src/riak_ensemble_router.erl index fa714a0..3b70e61 100644 --- a/src/riak_ensemble_router.erl +++ b/src/riak_ensemble_router.erl @@ -38,18 +38,20 @@ %% additional concurrency and not have a single router bottleneck traffic. %% %% A secondary purpose of this module is to provide an isolated version -%% of `gen_fsm:send_sync_event' that converts timeouts into error tuples +%% of `gen_fsm_compat:send_sync_event' that converts timeouts into error tuples %% rather than exit conditions, as well as discarding late/delayed messages. %% This isolation is provided by spawning an intermediary proxy process. -module(riak_ensemble_router). --compile(export_all). -behaviour(gen_server). -include_lib("riak_ensemble_types.hrl"). %% API --export([start_link/1]). +-export([start_link/1, sync_send_event/3, sync_send_event/4, + sync_proxy/6, sync_proxy_direct/5, sync_proxy_router/6, + cast/2, cast/3, noconnect_cast/2, routers/0, random/1, + fail_cast/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -95,7 +97,7 @@ sync_proxy(From, Ref, Node, Target, Event, Timeout) -> -spec sync_proxy_direct(pid(), reference(), pid(), msg(), timeout()) -> ok. sync_proxy_direct(From, Ref, Pid, Event, Timeout) -> try - Result = gen_fsm:sync_send_event(Pid, Event, Timeout), + Result = gen_fsm_compat:sync_send_event(Pid, Event, Timeout), From ! {Ref, Result}, ok catch @@ -235,7 +237,7 @@ ensemble_cast(Ensemble, Msg) -> handle_ensemble_cast({sync_send_event, From, Ref, Event, Timeout}, Pid) -> spawn(fun() -> try - Result = gen_fsm:sync_send_event(Pid, Event, Timeout), + Result = gen_fsm_compat:sync_send_event(Pid, Event, Timeout), From ! {Ref, Result} catch _:_ -> diff --git a/src/riak_ensemble_sup.erl b/src/riak_ensemble_sup.erl index d3db50c..d3dc870 100644 --- a/src/riak_ensemble_sup.erl +++ b/src/riak_ensemble_sup.erl @@ -47,7 +47,6 @@ start_link() -> init([]) -> riak_ensemble_test:setup(), - synctree_leveldb:init_ets(), Children = [?CHILD(riak_ensemble_router_sup, supervisor), ?CHILD(riak_ensemble_storage, worker), ?CHILD(riak_ensemble_peer_sup, supervisor), diff --git a/src/riak_ensemble_test.erl b/src/riak_ensemble_test.erl index 0f40fa8..c8bf3d0 100644 --- a/src/riak_ensemble_test.erl +++ b/src/riak_ensemble_test.erl @@ -18,8 +18,7 @@ %% %% ------------------------------------------------------------------- -module(riak_ensemble_test). --compile(export_all). - +-export([setup/0, maybe_drop/2]). -define(ETS_TEST, riak_ensemble_test). -ifdef(TEST). diff --git a/src/riak_ensemble_util.erl b/src/riak_ensemble_util.erl index e2e5abd..2140ae8 100644 --- a/src/riak_ensemble_util.erl +++ b/src/riak_ensemble_util.erl @@ -24,7 +24,9 @@ md5/1, orddict_delta/2, shuffle/1, - cast_unreliable/2]). + cast_unreliable/2, + random_seed/1, + random_uniform/1]). %%=================================================================== @@ -148,7 +150,7 @@ shuffle(L=[_]) -> L; shuffle(L) -> Range = length(L), - L2 = [{random:uniform(Range), E} || E <- L], + L2 = [{random_uniform(Range), E} || E <- L], [E || {_, E} <- lists:sort(L2)]. %% Copied from riak_core_send_msg.erl @@ -158,3 +160,21 @@ cast_unreliable(Dest, Request) -> bang_unreliable(Dest, Msg) -> catch erlang:send(Dest, Msg, [noconnect, nosuspend]), Msg. + +-ifdef(rand). +random_seed(Id) -> + {A,B,C} = os:timestamp(), + _ = rand:seed(exsplus, {A + erlang:phash2(Id), B + erlang:phash2(node()), C}). +-else. +random_seed(Id) -> + {A,B,C} = os:timestamp(), + _ = random:seed(A + erlang:phash2(Id), B + erlang:phash2(node()), C). +-endif. + +-ifdef(rand). +random_uniform(Range) -> + rand:uniform(Range). +-else. +random_uniform(Range) -> + random:uniform(Range). +-endif. diff --git a/src/synctree_leveldb.erl b/src/synctree_leveldb.erl index 69d005d..43b0c0d 100644 --- a/src/synctree_leveldb.erl +++ b/src/synctree_leveldb.erl @@ -70,16 +70,16 @@ maybe_open_leveldb(Path, Retries) -> [{_, DB}] -> {ok, DB}; _ -> - ok = filelib:ensure_dir(Path), - case eleveldb:open(Path, leveldb_opts()) of - {ok, DB} -> - %% If eleveldb:open succeeded, we should have the only ref - true = ets:insert_new(?MODULE, {Path, DB}), - {ok, DB}; - _ when Retries > 0 -> - timer:sleep(100), - maybe_open_leveldb(Path, Retries - 1) - end + ok = filelib:ensure_dir(Path), + case eleveldb:open(Path, leveldb_opts()) of + {ok, DB} -> + %% If eleveldb:open succeeded, we should have the only ref + true = ets:insert_new(?MODULE, {Path, DB}), + {ok, DB}; + _ when Retries > 0 -> + timer:sleep(100), + maybe_open_leveldb(Path, Retries - 1) + end end. @@ -87,7 +87,7 @@ get_path(Opts) -> case proplists:get_value(path, Opts) of undefined -> Base = "/tmp/ST", - Name = integer_to_list(timestamp(erlang:now())), + Name = integer_to_list(erlang:system_time(micro_seconds)), filename:join(Base, Name); Path -> Path @@ -151,12 +151,8 @@ store(Updates, State=?STATE{id=Id, db=DB}) -> _ = eleveldb:write(DB, DBUpdates, []), State. -timestamp({Mega, Secs, Micro}) -> - Mega*1000*1000*1000*1000 + Secs * 1000 * 1000 + Micro. - leveldb_opts() -> [{is_internal_db, true}, {write_buffer_size, 4 * 1024 * 1024}, {use_bloomfilter, true}, {create_if_missing, true}]. - diff --git a/tools.mk b/tools.mk deleted file mode 100644 index 8ecd985..0000000 --- a/tools.mk +++ /dev/null @@ -1,112 +0,0 @@ -REBAR ?= ./rebar - -compile-no-deps: - ${REBAR} compile skip_deps=true - -test: compile - ${REBAR} eunit skip_deps=true - -docs: - ${REBAR} doc skip_deps=true - -xref: compile - ${REBAR} xref skip_deps=true - -PLT ?= $(HOME)/.combo_dialyzer_plt -LOCAL_PLT = .local_dialyzer_plt -DIALYZER_FLAGS ?= -Wunmatched_returns - -${PLT}: compile - @if [ -f $(PLT) ]; then \ - dialyzer --check_plt --plt $(PLT) --apps $(DIALYZER_APPS) && \ - dialyzer --add_to_plt --plt $(PLT) --output_plt $(PLT) --apps $(DIALYZER_APPS) ; test $$? -ne 1; \ - else \ - dialyzer --build_plt --output_plt $(PLT) --apps $(DIALYZER_APPS); test $$? -ne 1; \ - fi - -${LOCAL_PLT}: compile - @if [ -d deps ]; then \ - if [ -f $(LOCAL_PLT) ]; then \ - dialyzer --check_plt --plt $(LOCAL_PLT) deps/*/ebin && \ - dialyzer --add_to_plt --plt $(LOCAL_PLT) --output_plt $(LOCAL_PLT) deps/*/ebin ; test $$? -ne 1; \ - else \ - dialyzer --build_plt --output_plt $(LOCAL_PLT) deps/*/ebin ; test $$? -ne 1; \ - fi \ - fi - -dialyzer-run: - @echo "==> $(shell basename $(shell pwd)) (dialyzer)" -# The bulk of the code below deals with the dialyzer.ignore-warnings file -# which contains strings to ignore if output by dialyzer. -# Typically the strings include line numbers. Using them exactly is hard -# to maintain as the code changes. This approach instead ignores the line -# numbers, but takes into account the number of times a string is listed -# for a given file. So if one string is listed once, for example, and it -# appears twice in the warnings, the user is alerted. It is possible but -# unlikely that this approach could mask a warning if one ignored warning -# is removed and two warnings of the same kind appear in the file, for -# example. But it is a trade-off that seems worth it. -# Details of the cryptic commands: -# - Remove line numbers from dialyzer.ignore-warnings -# - Pre-pend duplicate count to each warning with sort | uniq -c -# - Remove annoying white space around duplicate count -# - Save in dialyer.ignore-warnings.tmp -# - Do the same to dialyzer_warnings -# - Remove matches from dialyzer.ignore-warnings.tmp from output -# - Remove duplicate count -# - Escape regex special chars to use lines as regex patterns -# - Add pattern to match any line number (file.erl:\d+:) -# - Anchor to match the entire line (^entire line$) -# - Save in dialyzer_unhandled_warnings -# - Output matches for those patterns found in the original warnings - @if [ -f $(LOCAL_PLT) ]; then \ - PLTS="$(PLT) $(LOCAL_PLT)"; \ - else \ - PLTS=$(PLT); \ - fi; \ - if [ -f dialyzer.ignore-warnings ]; then \ - if [ $$(grep -cvE '[^[:space:]]' dialyzer.ignore-warnings) -ne 0 ]; then \ - echo "ERROR: dialyzer.ignore-warnings contains a blank/empty line, this will match all messages!"; \ - exit 1; \ - fi; \ - dialyzer $(DIALYZER_FLAGS) --plts $${PLTS} -c ebin > dialyzer_warnings ; \ - cat dialyzer.ignore-warnings \ - | sed -E 's/^([^:]+:)[^:]+:/\1/' \ - | sort \ - | uniq -c \ - | sed -E '/.*\.erl: /!s/^[[:space:]]*[0-9]+[[:space:]]*//' \ - > dialyzer.ignore-warnings.tmp ; \ - egrep -v "^[[:space:]]*(done|Checking|Proceeding|Compiling)" dialyzer_warnings \ - | sed -E 's/^([^:]+:)[^:]+:/\1/' \ - | sort \ - | uniq -c \ - | sed -E '/.*\.erl: /!s/^[[:space:]]*[0-9]+[[:space:]]*//' \ - | grep -F -f dialyzer.ignore-warnings.tmp -v \ - | sed -E 's/^[[:space:]]*[0-9]+[[:space:]]*//' \ - | sed -E 's/([]\^:+?|()*.$${}\[])/\\\1/g' \ - | sed -E 's/(\\\.erl\\\:)/\1\\d+:/g' \ - | sed -E 's/^(.*)$$/^\1$$/g' \ - > dialyzer_unhandled_warnings ; \ - rm dialyzer.ignore-warnings.tmp; \ - if [ $$(cat dialyzer_unhandled_warnings | wc -l) -gt 0 ]; then \ - egrep -f dialyzer_unhandled_warnings dialyzer_warnings ; \ - found_warnings=1; \ - fi; \ - [ "$$found_warnings" != 1 ] ; \ - else \ - dialyzer $(DIALYZER_FLAGS) --plts $${PLTS} -c ebin; \ - fi - -dialyzer-quick: compile-no-deps dialyzer-run - -dialyzer: ${PLT} ${LOCAL_PLT} dialyzer-run - -cleanplt: - @echo - @echo "Are you sure? It takes several minutes to re-build." - @echo Deleting $(PLT) and $(LOCAL_PLT) in 5 seconds. - @echo - sleep 5 - rm $(PLT) - rm $(LOCAL_PLT) -