From 26326dfbe3df9ddca49b1c3ac721df91c522610b Mon Sep 17 00:00:00 2001 From: Jesper Lundgren Date: Sun, 24 Jan 2021 20:31:17 +0800 Subject: [PATCH] Create Keyspace if missing --- README.md | 1 + benchmarks/benchmark.config | 1 + src/erlcass.erl | 71 +++++++++++++++++++++++++++++-------- 3 files changed, 58 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 052e3de..9c6a299 100644 --- a/README.md +++ b/README.md @@ -157,6 +157,7 @@ The cluster options can be set inside your `app.config` file under the `cluster_ {erlcass, [ {log_level, 3}, {keyspace, <<"keyspace">>}, + {keyspace_cql, <<"your CQL query to create keyspace if missing">>}, {cluster_options,[ {contact_points, <<"172.17.3.129,172.17.3.130,172.17.3.131">>}, {latency_aware_routing, true}, diff --git a/benchmarks/benchmark.config b/benchmarks/benchmark.config index 104691c..35dd4a9 100644 --- a/benchmarks/benchmark.config +++ b/benchmarks/benchmark.config @@ -14,6 +14,7 @@ {erlcass, [ {keyspace, <<"load_test_erlcass">>}, + {keyspace_cql, <<"CREATE KEYSPACE load_test_erlcass WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};">>}, {cluster_options,[ {contact_points, <<"172.17.3.129">>}, {latency_aware_routing, true}, diff --git a/src/erlcass.erl b/src/erlcass.erl index bece0e5..8bcab03 100644 --- a/src/erlcass.erl +++ b/src/erlcass.erl @@ -368,13 +368,11 @@ receive_response(Tag) -> {error, timeout} end. +do_connect(Session, Pid, Keyspace) -> + erlcass_nif:cass_session_connect(Session, Pid, Keyspace). + do_connect(Session, Pid) -> - case erlcass_utils:get_env(keyspace) of - {ok, Keyspace} -> - erlcass_nif:cass_session_connect(Session, Pid, Keyspace); - _ -> - erlcass_nif:cass_session_connect(Session, Pid) - end. + erlcass_nif:cass_session_connect(Session, Pid). do_close(undefined, _Pid, _Timeout) -> ok; @@ -393,16 +391,44 @@ session_create() -> case erlcass_nif:cass_session_new() of {ok, Session} -> Self = self(), - case do_connect(Session, Self) of + Keyspace = case erlcass_utils:get_env(keyspace) of + {ok, Space} -> Space; + _ -> "" + end, + KeyspaceCQL = case erlcass_utils:get_env(keyspace_cql) of + {ok, CQL} -> CQL; + _ -> "" + end, + Connect = case Keyspace of + "" -> do_connect(Session, Self); + Keyspace -> do_connect(Session, Self, Keyspace) + end, + case Connect of ok -> - receive - {session_connected, Self, Result} -> - ?INFO_MSG("session ~p connection complete result: ~p", [Self, Result]), - {ok, Session} - - after ?CONNECT_TIMEOUT -> - ?ERROR_MSG("session ~p connection timeout", [Self]), - {error, connect_session_timeout} + case receive_session_connect(Keyspace, Self) of + ok -> {ok, Session}; + {error, missing_keyspace} when KeyspaceCQL =/= "", Keyspace =/= "" -> + ?INFO_MSG("Keyspace '~s' is missing, will create using: '~s'", [Keyspace, KeyspaceCQL]), + ok = do_connect(Session, Self), + case receive_session_connect("", Self) of + ok -> + {ok, StmRef} = query_new_statement(KeyspaceCQL), + erlcass_nif:cass_session_execute(nil, Session, StmRef, Self, init_keyspace), + ?INFO_MSG("Creating Keyspace '~s'", [Keyspace]), + ok = receive_response(init_keyspace), + ?INFO_MSG("Keyspace '~s' Created", [Keyspace]), + ok = do_close(Session, Self, 5000), + ?INFO_MSG("Session Closed", []); + Error -> Error + end, + ?INFO_MSG("Reconnecting with Keyspace", []), + ok = do_connect(Session, Self, Keyspace), + ?INFO_MSG("Waiting for coonection", []), + case receive_session_connect(Keyspace, Self) of + ok -> {ok, Session}; + Err -> Err + end; + Error -> Error end; Error -> Error @@ -411,6 +437,21 @@ session_create() -> Error end. +receive_session_connect(Keyspace, Self) -> + MissingKeyspaceError = list_to_binary(lists:flatten(io_lib:format("Keyspace '~s' does not exist", [Keyspace]))), + receive + {session_connected, Self, {error, MissingKeyspaceError}} -> + {error, missing_keyspace}; + + {session_connected, Self, Result} -> + ?INFO_MSG("session ~p connection complete result: ~p", [Self, Result]), + ok + + after ?CONNECT_TIMEOUT -> + ?ERROR_MSG("session ~p connection timeout", [Self]), + {error, connect_session_timeout} + end. + session_prepare_cached_statements(SessionRef) -> FunPrepareStm = fun({Identifier, Query}) ->