diff --git a/.gitignore b/.gitignore index 7d3bff3..c8b3877 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,2 @@ ebin/*.beam -ebin/*.app +.eunit diff --git a/Emakefile b/Emakefile index 4311581..d59c341 100644 --- a/Emakefile +++ b/Emakefile @@ -1,5 +1,7 @@ -{"src/*", [ - debug_info, - {i, "include/"}, - {outdir, "ebin/"} -]}. \ No newline at end of file +% -*- mode: erlang -*- + +{[ 'src/*' ], + [ {i, "include"}, + {outdir, "ebin"}, + debug_info ] +}. \ No newline at end of file diff --git a/Makefile b/Makefile index 7d300d3..ce7b429 100644 --- a/Makefile +++ b/Makefile @@ -1,36 +1,13 @@ -PKGNAME=emongo -ROOTDIR=`erl -eval 'io:format("~s~n", [code:root_dir()])' -s init stop -noshell` -LIBDIR=$(shell erl -eval 'io:format("~s~n", [code:lib_dir()])' -s init stop -noshell) -# get application vsn from app file -VERSION=$(shell erl -pa ebin/ -eval 'application:load(${PKGNAME}), {ok, Vsn} = application:get_key(${PKGNAME}, vsn), io:format("~s~n", [Vsn])' -s init stop -noshell) +all: emake -all: src +emake: + erl -make -src: FORCE - @erl -make - @cp src/${PKGNAME}.app.src ebin/${PKGNAME}.app - -test: src +test: emake prove t/*.t -clean: - rm -rf erl_crash.dump *.boot *.rel *.script ebin/*.beam ebin/emongo.app - -package: clean - @mkdir $(PKGNAME)-$(VERSION)/ && cp -rf ebin include Emakefile Makefile priv README.markdown src t $(PKGNAME)-$(VERSION) - @COPYFILE_DISABLE=true tar zcf $(PKGNAME)-$(VERSION).tgz $(PKGNAME)-$(VERSION) - @rm -rf $(PKGNAME)-$(VERSION)/ - -install: src - @mkdir -p $(DESTDIR)/$(LIBDIR)/$(PKGNAME)-$(VERSION)/ebin - @mkdir -p $(DESTDIR)/$(LIBDIR)/$(PKGNAME)-$(VERSION)/include - for i in ebin/*.beam include/*.hrl ebin/*.app; do install $$i $(DESTDIR)/$(LIBDIR)/$(PKGNAME)-$(VERSION)/$$i ; done +check: emake + @./rebar eunit skip_deps=true - -plt: src - @dialyzer --check_plt -q -r . -I include/ - -check: src - @dialyzer --src -r . -I include/ - -FORCE: \ No newline at end of file +clean: + rm -rf $(wildcard ebin/*.beam) erl_crash.dump .eunit/ diff --git a/README.markdown b/README.markdown index 4c70541..1b5c547 100644 --- a/README.markdown +++ b/README.markdown @@ -1,20 +1,19 @@ #### The goal of emongo is to be stable, fast and easy to use. -## Compile and install +## Build make - sudo make install - + ## Start emongo application:start(emongo). - + ## Connecting to mongodb #### Option 1 - Config file example.config: - + [{emongo, [ {pools, [ {pool1, [ @@ -25,7 +24,7 @@ example.config: ]} ]} ]}]. - + specify the config file path when starting Erlang erl -config priv/example @@ -33,17 +32,17 @@ specify the config file path when starting Erlang start the application application:start(emongo). - + #### Option 2 - Add pool start the app and then add as many pools as you like application:start(emongo). emongo:add_pool(pool1, "localhost", 27017, "testdatabase", 1). - + ## API Type Reference -__PoolName__ = atom() +__PoolId__ = atom() __Host__ = string() __Port__ = integer() __Database__ = string() @@ -53,44 +52,42 @@ __Selector__ = Document __Document__ = [{Key, Val}] __Key__ = string() | atom() | binary() | integer() __Val__ = float() | string() | binary() | Document | {array, [term()]} | {binary, BinSubType, binary()} | {oid, binary()} | {oid, string()} | bool() | now() | datetime() | undefined | {regexp, string(), string()} | integer() -__BinSubType__ = integer() +__BinSubType__ = integer() ## Add Pool - emongo:add_pool(PoolName, Host, Port, Database, PoolSize) -> ok + emongo:add_pool(PoolId, Host, Port, Database, PoolSize) -> ok ## Insert -__PoolName__ = atom() +__PoolId__ = atom() __CollectionName__ = string() __Document__ = [{Key, Val}] -__Documents__ = [Document] - - emongo:insert(PoolName, CollectionName, Document) -> ok - emongo:insert(PoolName, CollectionName, Documents) -> ok +__Documents__ = [Document] + emongo:insert(PoolId, CollectionName, Document) -> ok + emongo:insert(PoolId, CollectionName, Documents) -> ok + ### Examples %% insert a single document with two fields into the "collection" collection emongo:insert(test, "collection", [{"field1", "value1"}, {"field2", "value2"}]). - + %% insert two documents, each with a single field into the "collection" collection - emongo:insert(test, "collection", [[{"document1_field1", "value1"}], [{"document2_field1", "value1"}]]). + emongo:insert(test, "collection", [[{"document1_field1", "value1"}], [{"document2_field1", "value1"}]]). ## Update -__PoolName__ = atom() +__PoolId__ = atom() __CollectionName__ = string() __Selector__ = Document __Document__ = [{Key, Val}] __Upsert__ = true | false (insert a new document if the selector does not match an existing document) -__MultiUpdate__ = true | false (if all documents matching selector should be updated) - - %% by default upsert == false and multiupdate == false - emongo:update(PoolName, CollectionName, Selector, Document) -> ok - emongo:update(PoolName, CollectionName, Selector, Document, Upsert) -> ok - emongo:update(PoolName, CollectionName, Selector, Document, Upsert, MultiUpdate) -> ok + %% by default upsert == false + emongo:update(PoolId, CollectionName, Selector, Document) -> ok + emongo:update(PoolId, CollectionName, Selector, Document, Upsert) -> ok + ### Examples %% update the document that matches "field1" == "value1" @@ -98,18 +95,18 @@ __MultiUpdate__ = true | false (if all documents matching selector should be upd ## Delete -__PoolName__ = atom() +__PoolId__ = atom() __CollectionName__ = string() -__Selector__ = Document +__Selector__ = Document %% delete all documents in a collection - emongo:delete(PoolName, CollectionName) -> ok - + emongo:delete(PoolId, CollectionName) -> ok + %% delete all documents in a collection that match a selector - emongo:delete(PoolName, CollectionName, Selector) -> ok - + emongo:delete(PoolId, CollectionName, Selector) -> ok + ## Find - + __Options__ = {timeout, Timeout} | {limit, Limit} | {offset, Offset} | {orderby, Orderby} | {fields, Fields} | response_options __Timeout__ = integer (timeout in milliseconds) __Limit__ = integer @@ -118,12 +115,12 @@ __Orderby__ = [{Key, Direction}] __Direction__ = 1 (Asc) | -1 (Desc) __Fields__ = [Key] = specifies a list of fields to return in the result set __response_options__ = return #response{header, response_flag, cursor_id, offset, limit, documents} -__Result__ = [Document] | response() - - emongo:find_all(PoolName, CollectionName) -> Result - emongo:find_all(PoolName, CollectionName, Selector) -> Result - emongo:find_all(PoolName, CollectionName, Selector, Options) -> Result - +__Result__ = [Document] | response() + + emongo:find(PoolId, CollectionName) -> Result + emongo:find(PoolId, CollectionName, Selector) -> Result + emongo:find(PoolId, CollectionName, Selector, Options) -> Result + ### Examples __limit, offset, timeout, orderby, fields__ @@ -132,81 +129,66 @@ __limit, offset, timeout, orderby, fields__ %% limit the number of results to 100 and offset the first document 10 documents from the beginning %% return documents in ascending order, sorted by the value of field1 %% limit the fields in the return documents to field1 (the _id field is always included in the results) - emongo:find_all(test, "collection", [{"field1", 1}], [{limit, 100}, {offset, 10}, {timeout, 5000}, {orderby, [{"field1", asc}]}, {fields, ["field1"]}]). - + emongo:find(test, "collection", [{"field1", 1}], [{limit, 100}, {offset, 10}, {timeout, 5000}, {orderby, [{"field1", asc}]}, {fields, ["field1"]}]). + __great than, less than, great than or equal, less than or equal__ %% find documents where field1 is greater than 5 and less than 10 - emongo:find_all(test, "collection", [{"field1", [{gt, 5}, {lt, 10}]}]). - + emongo:find(test, "collection", [{"field1", [{gt, 5}, {lt, 10}]}]). + %% find documents where field1 is greater than or equal to 5 and less than or equal to 10 - emongo:find_all(test, "collection", [{"field1", [{gte, 5}, {lte, 10}]}]). - + emongo:find(test, "collection", [{"field1", [{gte, 5}, {lte, 10}]}]). + %% find documents where field1 is greater than 5 and less than 10 - emongo:find_all(test, "collection", [{"field1", [{'>', 5}, {'<', 10}]}]). - + emongo:find(test, "collection", [{"field1", [{'>', 5}, {'<', 10}]}]). + %% find documents where field1 is greater than or equal to 5 and less than or equal to 10 - emongo:find_all(test, "collection", [{"field1", [{'>=', 5}, {'=<', 10}]}]). - + emongo:find(test, "collection", [{"field1", [{'>=', 5}, {'=<', 10}]}]). + __not equal__ %% find documents where field1 is not equal to 5 or 10 - emongo:find_all(test, "collection", [{"field1", [{ne, 5}, {ne, 10}]}]). - + emongo:find(test, "collection", [{"field1", [{ne, 5}, {ne, 10}]}]). + %% find documents where field1 is not equal to 5 - emongo:find_all(test, "collection", [{"field1", [{'=/=', 5}]}]). - + emongo:find(test, "collection", [{"field1", [{'=/=', 5}]}]). + %% find documents where field1 is not equal to 5 - emongo:find_all(test, "collection", [{"field1", [{'/=', 5}]}]). - + emongo:find(test, "collection", [{"field1", [{'/=', 5}]}]). + __in__ %% find documents where the value of field1 is one of the values in the list [1,2,3,4,5] - emongo:find_all(test, "collection", [{"field1", [{in, [1,2,3,4,5]}]}]). + emongo:find(test, "collection", [{"field1", [{in, [1,2,3,4,5]}]}]). __not in__ - + %% find documents where the value of field1 is NOT one of the values in the list [1,2,3,4,5] - emongo:find_all(test, "collection", [{"field1", [{nin, [1,2,3,4,5]}]}]). - + emongo:find(test, "collection", [{"field1", [{nin, [1,2,3,4,5]}]}]). + __all__ %% find documents where the value of field1 is an array and contains all of the values in the list [1,2,3,4,5] - emongo:find_all(test, "collection", [{"field1", [{all, [1,2,3,4,5]}]}]). - + emongo:find(test, "collection", [{"field1", [{all, [1,2,3,4,5]}]}]). + __size__ %% find documents where the value of field1 is an array of size 10 - emongo:find_all(test, "collection", [{"field1", [{size, 10}]}]). - + emongo:find(test, "collection", [{"field1", [{size, 10}]}]). + __exists__ %% find documents where field1 exists - emongo:find_all(test, "collection", [{"field1", [{exists, true}]}]). - + emongo:find(test, "collection", [{"field1", [{exists, true}]}]). + __where__ %% find documents where the value of field1 is greater than 10 - emongo:find_all(test, "collection", [{where, "this.field1 > 10"}]). - + emongo:find(test, "collection", [{where, "this.field1 > 10"}]). + __nested queries__ - %% find documents with an address field containing a sub-document + %% find documents with an address field containing a sub-document %% with street equal to "Maple Drive". %% ie: [{"address", [{"street", "Maple Drive"}, {"zip", 94114}] - emongo:find_all(test, "people", [{"address.street", "Maple Drive"}]). - -## Drop database - - %% drop current database - emongo:drop_database(PoolName) -> ok - -## Tests - -Ensure you have [etap](https://github.com/ngerakines/etap). - - git clone https://github.com/ngerakines/etap.git - cd etap && make && cd .. - export ERL_LIBS="etap" - - make test + emongo:find(test, "people", [{"address.street", "Maple Drive"}]). \ No newline at end of file diff --git a/debian/changelog b/debian/changelog deleted file mode 100644 index ded2c07..0000000 --- a/debian/changelog +++ /dev/null @@ -1,12 +0,0 @@ -emongo (0.2.2-1) unstable; urgency=low - - * Added unique index support to ensure_index. - - -- Oleg Smirnov Wed, 17 Aug 2011 19:25:16 +0300 - -emongo (0.2.1-1) unstable; urgency=low - - * initial prerelease - - -- Oleg Smirnov Sat, 06 Aug 2011 14:58:22 +0300 - diff --git a/debian/compat b/debian/compat deleted file mode 100644 index 7f8f011..0000000 --- a/debian/compat +++ /dev/null @@ -1 +0,0 @@ -7 diff --git a/debian/control b/debian/control deleted file mode 100644 index 3802da5..0000000 --- a/debian/control +++ /dev/null @@ -1,13 +0,0 @@ -Source: emongo -Section: libs -Priority: extra -Maintainer: Oleg Smirnov -Build-Depends: debhelper (>= 7.3~), erlang-base -Standards-Version: 3.9.1 -Homepage: https://github.com/master/emongo - -Package: emongo -Architecture: any -Depends: erlang -Suggests: mongodb -Description: the most Emo of mongo drivers diff --git a/debian/copyright b/debian/copyright deleted file mode 100644 index 14f8a9e..0000000 --- a/debian/copyright +++ /dev/null @@ -1 +0,0 @@ -First debianized by Oleg Smirnov on Sat, 06 Aug 2011 14:58:22 +0300. diff --git a/debian/rules b/debian/rules deleted file mode 100755 index 328323e..0000000 --- a/debian/rules +++ /dev/null @@ -1,5 +0,0 @@ -#!/usr/bin/make -f -%: - dh $@ - -override_dh_auto_test: diff --git a/ebin/emongo.app b/ebin/emongo.app new file mode 100644 index 0000000..174397a --- /dev/null +++ b/ebin/emongo.app @@ -0,0 +1,14 @@ +{application, emongo, [ + {description, "Erlang MongoDB Driver"}, + {vsn, "0.2"}, + {modules, [ + emongo, + emongo_app, + emongo_bson, + emongo_conn, + emongo_packet + ]}, + {registered, []}, + {mod, {emongo_app, []}}, + {applications, [kernel, stdlib, sasl]} +]}. diff --git a/ebin/emongo.appup b/ebin/emongo.appup deleted file mode 100644 index c6b184d..0000000 --- a/ebin/emongo.appup +++ /dev/null @@ -1,30 +0,0 @@ -{"0.2.2", - [{"0.0.5", [{load_module, emongo}, - {load_module, emongo_packet}]}], - [{"0.0.5", [{load_module, emongo}, - {load_module, emongo_packet}]}], -}. -{"0.0.5", - [{"0.0.4", [ - {add_module, pqueue}, - {load_module, emongo_sup}, - {load_module, emongo_server}, - {load_module, emongo_router}, - {load_module, emongo_bson}, - {load_module, emongo}, - {update, emongo_pool, {advanced, []}}, - {delete_module, emongo_collection} - ]} - ], - [{"0.0.4", [ - {delete_module, pqueue}, - {load_module, emongo_sup}, - {load_module, emongo_server}, - {load_module, emongo_router}, - {load_module, emongo_bson}, - {load_module, emongo}, - {load_module, emongo_pool}, - {add_module, emongo_collection} - ]} - ] -}. diff --git a/include/emongo.hrl b/include/emongo.hrl index 55a152b..3b6fa0a 100644 --- a/include/emongo.hrl +++ b/include/emongo.hrl @@ -1,15 +1,22 @@ +-include_lib("emongo_public.hrl"). + +-record(pool, {id, host, port, database, size=1, conn_pids=queue:new(), req_id=1}). -record(header, {message_length, request_id, response_to, op_code}). --record(response, { - header, - response_flag, - cursor_id, - offset, - limit, - documents, - pool_id - }). -record(emo_query, {opts=[], offset=0, limit=0, q=[], field_selector=[]}). +-define(IS_DOCUMENT(Doc), (is_list(Doc) andalso (Doc == [] orelse (is_tuple(hd(Doc)) andalso tuple_size(hd(Doc)) == 2)))). +-define(IS_LIST_OF_DOCUMENTS(Docs), ( + is_list(Docs) andalso ( + Docs == [] orelse ( + is_list(hd(Docs)) andalso ( + hd(Docs) == [] orelse ( + is_tuple(hd(hd(Docs))) andalso + tuple_size(hd(hd(Docs))) == 2 + ) + ) + ) + ))). + -define(TIMEOUT, 5000). -define(OP_REPLY, 1). @@ -20,10 +27,3 @@ -define(OP_GET_MORE, 2005). -define(OP_DELETE, 2006). -define(OP_KILL_CURSORS, 2007). - --define(TAILABLE_CURSOR, 2). --define(SLAVE_OK, 4). --define(OPLOG, 8). --define(NO_CURSOR_TIMEOUT, 16). - --define(DUPLICATE_KEY_ERROR, 11000). diff --git a/include/emongo_public.hrl b/include/emongo_public.hrl new file mode 100644 index 0000000..df98e44 --- /dev/null +++ b/include/emongo_public.hrl @@ -0,0 +1,12 @@ +-ifndef(EMONGO_PUBLIC). + +-record(response, {header, response_flag, cursor_id, offset, limit, documents}). + +% Additional options that can be passed to emongo:find() +-define(TAILABLE_CURSOR, 2). +-define(SLAVE_OK, 4). +-define(OPLOG, 8). +-define(NO_CURSOR_TIMEOUT, 16). + +-define(EMONGO_PUBLIC, true). +-endif. diff --git a/priv/erlang-emongo.spec b/priv/erlang-emongo.spec deleted file mode 100644 index 514c3cb..0000000 --- a/priv/erlang-emongo.spec +++ /dev/null @@ -1,65 +0,0 @@ -%define realname emongo -Name: erlang-%{realname} -Version: 0.2.1 -Release: 1%{?dist} -Summary: the most Emo of mongo drivers -Group: Development/Languages -License: MIT -URL: https://github.com/master/emongo -Source0: http://cloud.github.com/downloads/master/%{realname}/%{realname}-%{version}.tar.gz -BuildRoot: %(mktemp -ud %{_tmppath}/%{name}-%{version}-%{release}-XXXXXX) -BuildRequires: erlang -%if 0%{?el4}%{?el5}%{?fc11} -Requires: erlang -%else -Requires: erlang-erts -Requires: erlang-kernel -Requires: erlang-stdlib -%endif -Provides: %{realname} = %{version}-%{release} - - -%description -the most Emo of mongo drivers - - -%prep -%setup -q -n %{realname}-%{version} - - -%build -make %{?_smp_mflags} - -%install -rm -rf $RPM_BUILD_ROOT -mkdir -p $RPM_BUILD_ROOT%{_libdir}/erlang/lib/%{realname}-%{version}/ebin -mkdir -p $RPM_BUILD_ROOT%{_libdir}/erlang/lib/%{realname}-%{version}/include -for i in ebin/*.beam include/*.hrl ebin/*.app ebin/*.appup; do install $i $RPM_BUILD_ROOT%{_libdir}/erlang/lib/%{realname}-%{version}/$i ; done - -%clean -rm -rf $RPM_BUILD_ROOT - -%files -%defattr(-,root,root,-) -%doc README.markdown -%dir %{_libdir}/erlang/lib/%{realname}-%{version} -%dir %{_libdir}/erlang/lib/%{realname}-%{version}/ebin -%dir %{_libdir}/erlang/lib/%{realname}-%{version}/include -%{_libdir}/erlang/lib/%{realname}-%{version}/ebin/emongo.app -%{_libdir}/erlang/lib/%{realname}-%{version}/ebin/emongo_app.beam -%{_libdir}/erlang/lib/%{realname}-%{version}/ebin/emongo.appup -%{_libdir}/erlang/lib/%{realname}-%{version}/ebin/emongo.beam -%{_libdir}/erlang/lib/%{realname}-%{version}/ebin/emongo_bson.beam -%{_libdir}/erlang/lib/%{realname}-%{version}/ebin/emongo_packet.beam -%{_libdir}/erlang/lib/%{realname}-%{version}/ebin/emongo_pool.beam -%{_libdir}/erlang/lib/%{realname}-%{version}/ebin/emongo_router.beam -%{_libdir}/erlang/lib/%{realname}-%{version}/ebin/emongo_server.beam -%{_libdir}/erlang/lib/%{realname}-%{version}/ebin/emongo_sup.beam -%{_libdir}/erlang/lib/%{realname}-%{version}/ebin/pqueue.beam -%{_libdir}/erlang/lib/%{realname}-%{version}/include/emongo.hrl - - -%changelog - -* Thu Mar 24 2011 Oleg Smirnov 0.2.1-1 -- Initial package diff --git a/priv/example.config b/priv/example.config index c326de8..c5d41e5 100644 --- a/priv/example.config +++ b/priv/example.config @@ -5,12 +5,6 @@ {host, "localhost"}, {port, 27017}, {database, "testdatabase"} - ]}, - {test2, [ - {size, 10}, - {host, "localhost"}, - {port, 27017}, - {database, "testdatabase"} ]} ]} -]}]. +]}]. \ No newline at end of file diff --git a/rebar b/rebar new file mode 100755 index 0000000..1cac23b Binary files /dev/null and b/rebar differ diff --git a/src/emongo.app.src b/src/emongo.app.src deleted file mode 100644 index 3ab183a..0000000 --- a/src/emongo.app.src +++ /dev/null @@ -1,11 +0,0 @@ -{application, emongo, [ - {description, "Erlang MongoDB Driver"}, - {vsn, "0.2.2"}, - {modules, [ - emongo, emongo_app, emongo_sup, emongo_bson, emongo_packet, - emongo_server, emongo_pool, emongo_router, pqueue - ]}, - {registered, [emongo_sup, emongo]}, - {mod, {emongo_app, []}}, - {applications, [kernel, stdlib]} -]}. diff --git a/src/emongo.erl b/src/emongo.erl index db86e36..3e120ca 100644 --- a/src/emongo.erl +++ b/src/emongo.erl @@ -1,7 +1,4 @@ %% Copyright (c) 2009 Jacob Vorreuter -%% Jacob Perkins -%% Belyaev Dmitry -%% François de Metz %% %% Permission is hereby granted, free of charge, to any person %% obtaining a copy of this software and associated documentation @@ -26,43 +23,29 @@ -module(emongo). -behaviour(gen_server). --export([pools/0, oid/0, add_pool/5, del_pool/1]). - --export([fold_all/6, - find_all/2, find_all/3, find_all/4, - find_one/3, find_one/4, - find_and_modify/5]). - --export([insert/3, update/4, update/5, update/6, delete/2, delete/3]). - --export([ensure_index/3, ensure_index/4, count/2, count/3, distinct/3, - distinct/4]). - --export([dec2hex/1, hex2dec/1]). - --export([sequence/2, synchronous/0, no_response/0, - find_all_seq/3, fold_all_seq/5, - insert_seq/3, update_seq/6, delete_seq/3]). - --export([update_sync/5, update_sync/6, delete_sync/3, insert_sync/3]). - --export([drop_database/1]). - --deprecated([update_sync/5, delete_sync/3]). - -%% internal -export([start_link/0, init/1, handle_call/3, handle_cast/2, - handle_info/2, terminate/2, code_change/3]). + handle_info/2, terminate/2, code_change/3]). + +-export([pools/0, oid/0, oid_generation_time/1, add_pool/5, remove_pool/1, + auth/3, find_all/2, find_all/3, find_all/4, + get_more/4, get_more/5, find_one/3, find_one/4, kill_cursors/2, + insert/3, insert_sync/3, insert_sync/4, update/4, update/5, + update_all/4, update_sync/4, update_sync/5, update_sync/6, + update_all_sync/4, update_all_sync/5, delete/2, delete/3, + delete_sync/2, delete_sync/3, delete_sync/4, ensure_index/3, count/2, + count/3, count/4, find_and_modify/4, find_and_modify/5, dec2hex/1, + hex2dec/1]). -include("emongo.hrl"). --record(state, {oid_index, hashed_hostn}). +-record(state, {pools, oid_index, hashed_hostn}). %%==================================================================== %% Types %%==================================================================== %% pool_id() = atom() %% collection() = string() +%% response() = {response, header, response_flag, cursor_id, offset, limit, documents} %% documents() = [document()] %% document() = [{term(), term()}] @@ -74,279 +57,334 @@ %% Description: Starts the server %%-------------------------------------------------------------------- start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). pools() -> - emongo_sup:pools(). + gen_server:call(?MODULE, pools, infinity). oid() -> - gen_server:call(?MODULE, oid, infinity). + gen_server:call(?MODULE, oid, infinity). -add_pool(PoolId, Host, Port, Database, Size) -> - emongo_sup:start_pool(PoolId, Host, Port, Database, Size). +oid_generation_time({oid, Oid}) -> + oid_generation_time(Oid); +oid_generation_time(Oid) when is_binary(Oid) andalso size(Oid) =:= 12 -> + <> = Oid, + UnixTime. -del_pool(PoolId) -> - emongo_sup:stop_pool(PoolId). +add_pool(PoolId, Host, Port, Database, Size) -> + gen_server:call(?MODULE, {add_pool, PoolId, Host, Port, Database, Size}, infinity). +remove_pool(PoolId) -> + gen_server:call(?MODULE, {remove_pool, PoolId}). %%------------------------------------------------------------------------------ -%% sequences of operations +%% authenticate %%------------------------------------------------------------------------------ - -sequence(_PoolId, []) -> - ok; -sequence(PoolId, Sequence) -> - {Pid, Database, ReqId} = get_pid_pool(PoolId, length(Sequence)), - sequence(Sequence, Pid, Database, ReqId). - - -sequence([Operation|Tail], Pid, Database, ReqId) -> - Result = Operation(Pid, Database, ReqId), - case Tail of - [] -> Result; - _ -> sequence(Tail, Pid, Database, ReqId + 1) +auth(PoolId, User, Pass) -> + % Authentication is a two step process. First be must run getnonce command to get + % nonce that we are going to use in step two. We need to authenticate also every + % connection. + Pools = pools(), + {Pool, _} = get_pool(PoolId, Pools), + PoolPids = queue:to_list(Pool#pool.conn_pids), + + F = fun(Pid) -> + case getnonce(Pid, Pool) of + error -> + throw(getnonce); + Nonce -> + do_auth(Nonce, Pid, Pool, User, Pass) + end + end, + lists:foreach(F, PoolPids). + +do_auth(Nonce, Pid, Pool, User, Pass) -> + Hash = emongo:dec2hex(erlang:md5(User ++ ":mongo:" ++ Pass)), + Digest = emongo:dec2hex(erlang:md5(binary_to_list(Nonce) ++ User ++ Hash)), + Query = #emo_query{q=[{<<"authenticate">>, 1}, {<<"user">>, User}, {<<"nonce">>, Nonce}, {<<"key">>, Digest}], limit=1}, + Packet = emongo_packet:do_query(Pool#pool.database, "$cmd", Pool#pool.req_id, Query), + + Resp = emongo_conn:send_recv(Pid, Pool#pool.req_id, Packet, ?TIMEOUT), + case lists:nth(1, Resp#response.documents) of + [{<<"ok">>, 1.0}] -> + {ok, authenticated}; + L -> + case lists:keyfind(<<"errmsg">>, 1, L) of + false -> + throw(no_error_message); + {<<"errmsg">>, Error} -> + throw(Error) + end end. +getnonce(Pid, Pool) -> + Query1 = #emo_query{q=[{<<"getnonce">>, 1}], limit=1}, + Packet = emongo_packet:do_query(Pool#pool.database, "$cmd", Pool#pool.req_id, Query1), + Resp1 = emongo_conn:send_recv(Pid, Pool#pool.req_id, Packet, ?TIMEOUT), + case lists:keyfind(<<"nonce">>, 1, lists:nth(1, Resp1#response.documents)) of + false -> + error; + {<<"nonce">>, Nonce} -> + Nonce + end. -synchronous() -> - synchronous(?TIMEOUT). - -synchronous(Timeout) -> - [fun(_, _, _) -> ok end, - fun(Pid, Database, ReqId) -> - PacketGetLastError = emongo_packet:get_last_error(Database, ReqId), - Resp = emongo_server:send_recv(Pid, ReqId, PacketGetLastError, Timeout), - Resp#response.documents - end]. +%%------------------------------------------------------------------------------ +%% find +%%------------------------------------------------------------------------------ +%find(PoolId, Collection) -> +% find(PoolId, Collection, [], [{timeout, ?TIMEOUT}]). -no_response() -> - []. +%find(PoolId, Collection, Selector) when ?IS_DOCUMENT(Selector) -> +% find(PoolId, Collection, Selector, [{timeout, ?TIMEOUT}]); +%%% this function has been deprecated +%find(PoolId, Collection, Query) when is_record(Query, emo_query) -> +% {Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity), +% Packet = emongo_packet:do_query(Pool#pool.database, Collection, Pool#pool.req_id, Query), +% emongo_conn:send_recv(Pid, Pool#pool.req_id, Packet, ?TIMEOUT). %% @spec find(PoolId, Collection, Selector, Options) -> Result -%% PoolId = atom() -%% Collection = string() -%% Selector = document() -%% Options = [Option] -%% Option = {timeout, Timeout} | {limit, Limit} | {offset, Offset} | {orderby, Orderby} | {fields, Fields} | explain -%% Timeout = integer (timeout in milliseconds) -%% Limit = integer -%% Offset = integer -%% Orderby = [{Key, Direction}] -%% Key = string() | binary() | atom() | integer() -%% Direction = asc | desc -%% Fields = [Field] -%% Field = string() | binary() | atom() | integer() = specifies a field to return in the result set -%% response_options = return {response, header, response_flag, cursor_id, offset, limit, documents} -%% Result = documents() | response() +%% PoolId = atom() +%% Collection = string() +%% Selector = document() +%% Options = [Option] +%% Option = {timeout, Timeout} | {limit, Limit} | {offset, Offset} | {orderby, Orderby} | {fields, Fields} | response_options +%% Timeout = integer (timeout in milliseconds) +%% Limit = integer +%% Offset = integer +%% Orderby = [{Key, Direction}] +%% Key = string() | binary() | atom() | integer() +%% Direction = asc | desc +%% Fields = [Field] +%% Field = string() | binary() | atom() | integer() = specifies a field to return in the result set +%% response_options = return {response, header, response_flag, cursor_id, offset, limit, documents} +%% Result = documents() | response() +find(PoolId, Collection, Selector, Options) when ?IS_DOCUMENT(Selector), is_list(Options) -> + {Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity), + Query = create_query(Options, Selector), + Packet = emongo_packet:do_query(Pool#pool.database, Collection, Pool#pool.req_id, Query), + Resp = emongo_conn:send_recv(Pid, Pool#pool.req_id, Packet, proplists:get_value(timeout, Options, ?TIMEOUT)), + case lists:member(response_options, Options) of + true -> Resp; + false -> Resp#response.documents + end. %%------------------------------------------------------------------------------ %% find_all %%------------------------------------------------------------------------------ find_all(PoolId, Collection) -> - find_all(PoolId, Collection, [], []). + find_all(PoolId, Collection, [], [{timeout, ?TIMEOUT}]). -find_all(PoolId, Collection, Selector) -> - find_all(PoolId, Collection, Selector, []). +find_all(PoolId, Collection, Selector) when ?IS_DOCUMENT(Selector) -> + find_all(PoolId, Collection, Selector, [{timeout, ?TIMEOUT}]). -find_all(PoolId, Collection, Selector, Options) -> - sequence(PoolId, find_all_seq(Collection, Selector, Options)). +find_all(PoolId, Collection, Selector, Options) when ?IS_DOCUMENT(Selector), is_list(Options) -> + Resp = find(PoolId, Collection, Selector, [response_options|Options]), + find_all(PoolId, Collection, Selector, Options, Resp). +find_all(_PoolId, _Collection, _Selector, Options, Resp) when is_record(Resp, response), Resp#response.cursor_id == 0 -> + case lists:member(response_options, Options) of + true -> Resp; + false -> Resp#response.documents + end; -find_all_seq(Collection, Selector, Options) -> - [Fun0,Fun1] = fold_all_seq(fun(I, A) -> [I | A] end, [], Collection, Selector, Options), - - [Fun0, - fun(Pid, Database, ReqId) -> - lists:reverse(Fun1(Pid, Database, ReqId)) - end]. +find_all(PoolId, Collection, Selector, Options, Resp) when is_record(Resp, response) -> + Resp1 = get_more(PoolId, Collection, Resp#response.cursor_id, proplists:get_value(timeout, Options, ?TIMEOUT)), + Documents = lists:append(Resp#response.documents, Resp1#response.documents), + find_all(PoolId, Collection, Selector, Options, Resp1#response{documents=Documents}). %%------------------------------------------------------------------------------ -%% find_and_modify +%% find_one %%------------------------------------------------------------------------------ -find_and_modify(PoolId, Collection, Selector, Update, Options) -> - Selector1 = transform_selector(Selector), - Collection1 = unicode:characters_to_binary(Collection), - OptionsDoc = fam_options(Options, [{<<"query">>, Selector1}, - {<<"update">>, Update}]), - Query = #emo_query{q=[{<<"findandmodify">>, Collection1} | OptionsDoc], - limit=1}, - {Pid, Database, ReqId} = get_pid_pool(PoolId, 1), - Packet = emongo_packet:do_query(Database, "$cmd", - ReqId, Query), - Resp = emongo_server:send_recv(Pid, ReqId, Packet, - proplists:get_value(timeout, Options, ?TIMEOUT)), - case lists:member(response_options, Options) of - true -> Resp; - false -> Resp#response.documents - end. +find_one(PoolId, Collection, Selector) when ?IS_DOCUMENT(Selector) -> + find_one(PoolId, Collection, Selector, [{timeout, ?TIMEOUT}]). + +find_one(PoolId, Collection, Selector, Options) when ?IS_DOCUMENT(Selector), is_list(Options) -> + Options1 = [{limit, 1} | lists:keydelete(limit, 1, Options)], + find(PoolId, Collection, Selector, Options1). %%------------------------------------------------------------------------------ -%% fold_all +%% get_more %%------------------------------------------------------------------------------ -fold_all(F, Value, PoolId, Collection, Selector, Options) -> - sequence(PoolId, fold_all_seq(F, Value, Collection, Selector, Options)). - - -fold_all_seq(F, Value, Collection, Selector, Options) -> - Timeout = proplists:get_value(timeout, Options, ?TIMEOUT), - Query = create_query(Options, Selector), - [fun(_, _, _) -> ok end, - fun(Pid, Database, ReqId) -> - Packet = emongo_packet:do_query(Database, Collection, ReqId, Query), - Resp = emongo_server:send_recv(Pid, ReqId, Packet, Timeout), - - NewValue = fold_documents(F, Value, Resp), - case Query#emo_query.limit of - 0 -> - fold_more(F, NewValue, Collection, Resp#response{documents=[]}, Timeout); - _ -> - kill_cursor(Resp#response.pool_id, Resp#response.cursor_id), - NewValue - end - end]. +get_more(PoolId, Collection, CursorID, Timeout) -> + get_more(PoolId, Collection, CursorID, 0, Timeout). -fold_more(_F, Value, _Collection, #response{cursor_id=0}, _Timeout) -> - Value; - -fold_more(F, Value, Collection, #response{pool_id=PoolId, cursor_id=CursorID}, Timeout) -> - {Pid, Database, ReqId} = get_pid_pool(PoolId, 2), - Packet = emongo_packet:get_more(Database, Collection, ReqId, 0, CursorID), - Resp1 = emongo_server:send_recv(Pid, ReqId, Packet, Timeout), - - NewValue = fold_documents(F, Value, Resp1), - fold_more(F, NewValue, Collection, Resp1#response{documents=[]}, Timeout). +get_more(PoolId, Collection, CursorID, NumToReturn, Timeout) -> + {Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity), + Packet = emongo_packet:get_more(Pool#pool.database, Collection, Pool#pool.req_id, NumToReturn, CursorID), + emongo_conn:send_recv(Pid, Pool#pool.req_id, Packet, Timeout). %%------------------------------------------------------------------------------ -%% find_one +%% kill_cursors %%------------------------------------------------------------------------------ -find_one(PoolId, Collection, Selector) -> - find_one(PoolId, Collection, Selector, []). +kill_cursors(PoolId, CursorID) when is_integer(CursorID) -> + kill_cursors(PoolId, [CursorID]); -find_one(PoolId, Collection, Selector, Options) -> - Options1 = [{limit, 1} | lists:keydelete(limit, 1, Options)], - find_all(PoolId, Collection, Selector, Options1). +kill_cursors(PoolId, CursorIDs) when is_list(CursorIDs) -> + {Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity), + Packet = emongo_packet:kill_cursors(Pool#pool.req_id, CursorIDs), + emongo_conn:send(Pid, Pool#pool.req_id, Packet). %%------------------------------------------------------------------------------ %% insert %%------------------------------------------------------------------------------ -insert(PoolId, Collection, Documents) -> - sequence(PoolId, insert_seq(Collection, Documents, no_response())). - -insert_seq(Collection, [[_|_]|_]=Documents, Next) -> - [fun(Pid, Database, ReqId) -> - Packet = emongo_packet:insert(Database, Collection, ReqId, Documents), - emongo_server:send(Pid, Packet) - end | Next]; -insert_seq(Collection, Document, Next) -> - insert_seq(Collection, [Document], Next). +insert(PoolId, Collection, Document) when ?IS_DOCUMENT(Document) -> + insert(PoolId, Collection, [Document]); +insert(PoolId, Collection, Documents) when ?IS_LIST_OF_DOCUMENTS(Documents) -> + {Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity), + Packet = emongo_packet:insert(Pool#pool.database, Collection, Pool#pool.req_id, Documents), + emongo_conn:send(Pid, Pool#pool.req_id, Packet). +%%------------------------------------------------------------------------------ +%% insert_sync that runs db.$cmd.findOne({getlasterror: 1}); +%%------------------------------------------------------------------------------ insert_sync(PoolId, Collection, Documents) -> - sequence(PoolId, insert_seq(Collection, Documents, synchronous())). + insert_sync(PoolId, Collection, Documents, []). + +insert_sync(PoolId, Collection, DocumentsIn, Options) -> + Documents = if + ?IS_LIST_OF_DOCUMENTS(DocumentsIn) -> DocumentsIn; + ?IS_DOCUMENT(DocumentsIn) -> [DocumentsIn] + end, + + {Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity), + Packet1 = emongo_packet:insert(Pool#pool.database, Collection, Pool#pool.req_id, Documents), + sync_command({Pid, Pool}, Packet1, Options). %%------------------------------------------------------------------------------ %% update %%------------------------------------------------------------------------------ -update(PoolId, Collection, Selector, Document) -> - update(PoolId, Collection, Selector, Document, false). +update(PoolId, Collection, Selector, Document) when ?IS_DOCUMENT(Selector), ?IS_DOCUMENT(Document) -> + update(PoolId, Collection, Selector, Document, false). -update(PoolId, Collection, Selector, Document, Upsert) -> - update(PoolId, Collection, Selector, Document, Upsert, false). +update(PoolId, Collection, Selector, Document, Upsert) when ?IS_DOCUMENT(Selector), ?IS_DOCUMENT(Document) -> + {Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity), + Packet = emongo_packet:update(Pool#pool.database, Collection, Pool#pool.req_id, Upsert, false, Selector, Document), + emongo_conn:send(Pid, Pool#pool.req_id, Packet). -update(PoolId, Collection, Selector, Document, Upsert, MultiUpdate) -> - sequence(PoolId, update_seq(Collection, Selector, Document, Upsert, - MultiUpdate, no_response())). +%%------------------------------------------------------------------------------ +%% update_all +%%------------------------------------------------------------------------------ +update_all(PoolId, Collection, Selector, Document) when ?IS_DOCUMENT(Selector), ?IS_DOCUMENT(Document) -> + {Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity), + Packet = emongo_packet:update(Pool#pool.database, Collection, Pool#pool.req_id, false, true, Selector, Document), + emongo_conn:send(Pid, Pool#pool.req_id, Packet). -update_seq(Collection, Selector, Document, Upsert, MultiUpdate, Next) -> - [fun(Pid, Database, ReqId) -> - Packet = emongo_packet:update(Database, Collection, ReqId, Upsert, - MultiUpdate, - transform_selector(Selector), Document), - emongo_server:send(Pid, Packet) - end | Next]. +%%------------------------------------------------------------------------------ +%% update_sync that runs db.$cmd.findOne({getlasterror: 1}); +%%------------------------------------------------------------------------------ +update_sync(PoolId, Collection, Selector, Document) when ?IS_DOCUMENT(Selector), ?IS_DOCUMENT(Document) -> + update_sync(PoolId, Collection, Selector, Document, false). +update_sync(PoolId, Collection, Selector, Document, Upsert) when ?IS_DOCUMENT(Selector), ?IS_DOCUMENT(Document) -> + update_sync(PoolId, Collection, Selector, Document, Upsert, []). -update_sync(PoolId, Collection, Selector, Document, Upsert) -> - update_sync(PoolId, Collection, Selector, Document, Upsert, false). +update_sync(PoolId, Collection, Selector, Document, Upsert, Options) when ?IS_DOCUMENT(Selector), ?IS_DOCUMENT(Document) -> + {Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity), + Packet1 = emongo_packet:update(Pool#pool.database, Collection, Pool#pool.req_id, Upsert, false, Selector, Document), + sync_command({Pid, Pool}, Packet1, Options). -update_sync(PoolId, Collection, Selector, Document, Upsert, MultiUpdate) -> - sequence(PoolId, update_seq(Collection, Selector, Document, Upsert, MultiUpdate, synchronous())). +%%------------------------------------------------------------------------------ +%% update_all_sync that runs db.$cmd.findOne({getlasterror: 1}); +%%------------------------------------------------------------------------------ +update_all_sync(PoolId, Collection, Selector, Document) when ?IS_DOCUMENT(Selector), ?IS_DOCUMENT(Document) -> + update_all_sync(PoolId, Collection, Selector, Document, []). + +update_all_sync(PoolId, Collection, Selector, Document, Options) when ?IS_DOCUMENT(Selector), ?IS_DOCUMENT(Document) -> + {Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity), + Packet1 = emongo_packet:update(Pool#pool.database, Collection, Pool#pool.req_id, false, true, Selector, Document), + % We could check <<"n">> as the update_sync(...) functions do, but + % update_all_sync(...) isn't targeting a specific number of documents, so 0 + % updates is legitimate. + sync_command({Pid, Pool}, Packet1, Options). %%------------------------------------------------------------------------------ %% delete %%------------------------------------------------------------------------------ delete(PoolId, Collection) -> - delete(PoolId, Collection, []). + delete(PoolId, Collection, []). delete(PoolId, Collection, Selector) -> - sequence(PoolId, delete_seq(Collection, Selector, no_response())). - - -delete_seq(Collection, Selector, Next) -> - [fun(Pid, Database, ReqId) -> - Packet = emongo_packet:delete(Database, Collection, ReqId, transform_selector(Selector)), - emongo_server:send(Pid, Packet) - end | Next]. + {Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity), + Packet = emongo_packet:delete(Pool#pool.database, Collection, Pool#pool.req_id, transform_selector(Selector)), + emongo_conn:send(Pid, Pool#pool.req_id, Packet). +%%------------------------------------------------------------------------------ +%% delete_sync that runs db.$cmd.findOne({getlasterror: 1}); +%%------------------------------------------------------------------------------ +delete_sync(PoolId, Collection) -> + delete_sync(PoolId, Collection, []). delete_sync(PoolId, Collection, Selector) -> - sequence(PoolId, delete_seq(Collection, Selector, synchronous())). + delete_sync(PoolId, Collection, Selector, []). +delete_sync(PoolId, Collection, Selector, Options) -> + {Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity), + Packet1 = emongo_packet:delete(Pool#pool.database, Collection, Pool#pool.req_id, transform_selector(Selector)), + sync_command({Pid, Pool}, Packet1, Options). %%------------------------------------------------------------------------------ %% ensure index %%------------------------------------------------------------------------------ -%% ensure_index(pool, "collection", [{"fieldname1", 1}, {"fieldname2", -1}]). -ensure_index(PoolId, Collection, Keys) -> - ensure_index(PoolId, Collection, Keys, false). - -ensure_index(PoolId, Collection, Keys, Unique) -> - {Pid, Database, ReqId} = get_pid_pool(PoolId, 1), - Packet = emongo_packet:ensure_index(Database, Collection, ReqId, Keys, Unique), - emongo_server:send(Pid, Packet). - - -count(PoolId, Collection) -> count(PoolId, Collection, []). +ensure_index(PoolId, Collection, Keys) when ?IS_DOCUMENT(Keys)-> + {Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity), + Packet = emongo_packet:ensure_index(Pool#pool.database, Collection, Pool#pool.req_id, Keys), + emongo_conn:send(Pid, Pool#pool.req_id, Packet). +%%------------------------------------------------------------------------------ +%% count +%%------------------------------------------------------------------------------ +count(PoolId, Collection) -> + count(PoolId, Collection, [], []). count(PoolId, Collection, Selector) -> - {Pid, Database, ReqId} = get_pid_pool(PoolId, 2), - Q = [{<<"count">>, Collection}, {<<"ns">>, Database}, - {<<"query">>, transform_selector(Selector)}], - Query = #emo_query{q=Q, limit=1}, - Packet = emongo_packet:do_query(Database, "$cmd", ReqId, Query), - case emongo_server:send_recv(Pid, ReqId, Packet, ?TIMEOUT) of - #response{documents=[[{<<"n">>,Count}|_]]} -> - round(Count); - _ -> - undefined - end. - - -distinct(PoolId, Collection, Key) -> distinct(PoolId, Collection, Key, []). - -distinct(PoolId, Collection, Key, Selector) -> - {Pid, Database, ReqId} = get_pid_pool(PoolId, 2), - Q = [{<<"distinct">>, Collection}, {<<"key">>, Key}, {<<"ns">>, Database}, - {<<"query">>, transform_selector(Selector)}], - Query = #emo_query{q=Q, limit=1}, - Packet = emongo_packet:do_query(Database, "$cmd", ReqId, Query), - case emongo_server:send_recv(Pid, ReqId, Packet, ?TIMEOUT) of - #response{documents=[[{<<"values">>, {array, Vals}} | _]]} -> - Vals; - _ -> - undefined - end. + count(PoolId, Collection, Selector, []). + +count(PoolId, Collection, Selector, Options) -> + {Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity), + Query = create_query([{<<"count">>, Collection}, {limit, 1} | Options], + Selector), + Packet = emongo_packet:do_query(Pool#pool.database, "$cmd", Pool#pool.req_id, + Query), + case emongo_conn:send_recv(Pid, Pool#pool.req_id, Packet, ?TIMEOUT) of + #response{documents=[[{<<"n">>,Count}|_]]} -> + round(Count); + _ -> + undefined + end. %%------------------------------------------------------------------------------ -%% drop database +%% find_and_modify %%------------------------------------------------------------------------------ -drop_database(PoolId) -> - {Pid, Database, ReqId} = get_pid_pool(PoolId, 1), - Query = #emo_query{q=[{<<"dropDatabase">>, 1}], limit=1}, - Packet = emongo_packet:do_query(Database, "$cmd", ReqId, Query), - emongo_server:send(Pid, ReqId, Packet). +find_and_modify(PoolId, Collection, Selector, Update) -> + find_and_modify(PoolId, Collection, Selector, Update, []). + +find_and_modify(PoolId, Collection, Selector, Update, Options) + when ?IS_DOCUMENT(Selector), ?IS_DOCUMENT(Update), is_list(Options) -> + {Pid, Pool} = gen_server:call(?MODULE, {pid, PoolId}, infinity), + Collection1 = to_binary(Collection), + Selector1 = transform_selector(Selector), + Fields = proplists:get_value(fields, Options, []), + FieldSelector = convert_fields(Fields), + Options1 = proplists:delete(fields, Options), + Options2 = [{to_binary(Opt), Val} || {Opt, Val} <- Options1], + Query = #emo_query{q = [{<<"findandmodify">>, Collection1}, + {<<"query">>, Selector1}, + {<<"update">>, Update}, + {<<"fields">>, FieldSelector} + | Options2], + limit = 1}, + Packet = emongo_packet:do_query(Pool#pool.database, "$cmd", Pool#pool.req_id, + Query), + Resp = emongo_conn:send_recv(Pid, Pool#pool.req_id, Packet, + proplists:get_value(timeout, Options, ?TIMEOUT)), + case lists:member(response_options, Options) of + true -> Resp; + false -> Resp#response.documents + end. + +%drop_collection(PoolId, Collection) when is_atom(PoolId), is_list(Collection) -> %%==================================================================== %% gen_server callbacks @@ -360,9 +398,11 @@ drop_database(PoolId) -> %% Description: Initiates the server %%-------------------------------------------------------------------- init(_) -> - {ok, HN} = inet:gethostname(), - <> = erlang:md5(HN), - {ok, #state{oid_index=1, hashed_hostn=HashedHN}}. + process_flag(trap_exit, true), + Pools = initialize_pools(), + {ok, HN} = inet:gethostname(), + <> = erlang:md5(HN), + {ok, #state{pools=Pools, oid_index=1, hashed_hostn=HashedHN}}. %%-------------------------------------------------------------------- %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | @@ -373,12 +413,60 @@ init(_) -> %% {stop, Reason, State} %% Description: Handling call messages %%-------------------------------------------------------------------- +handle_call(pools, _From, State) -> + {reply, State#state.pools, State}; + handle_call(oid, _From, State) -> - {Total_Wallclock_Time, _} = erlang:statistics(wall_clock), - Front = Total_Wallclock_Time rem 16#ffffffff, - <<_:20/binary,PID:2/binary,_/binary>> = term_to_binary(self()), - Index = State#state.oid_index rem 16#ffffff, - {reply, <>, State#state{oid_index = State#state.oid_index + 1}}; + {MegaSecs, Secs, _} = now(), + UnixTime = MegaSecs * 1000000 + Secs, + <<_:20/binary,PID:2/binary,_/binary>> = term_to_binary(self()), + Index = State#state.oid_index rem 16#ffffff, + {reply, <>, State#state{oid_index = State#state.oid_index + 1}}; + +handle_call({add_pool, PoolId, Host, Port, Database, Size}, _From, #state{pools=Pools}=State) -> + {Result, Pools1} = + case proplists:is_defined(PoolId, Pools) of + true -> + Pool = proplists:get_value(PoolId, Pools), + Pool1 = do_open_connections(Pool), + {ok, [{PoolId, Pool1}|proplists:delete(PoolId, Pools)]}; + false -> + Pool = #pool{ + id=PoolId, + host=Host, + port=Port, + database=Database, + size=Size + }, + Pool1 = do_open_connections(Pool), + {ok, [{PoolId, Pool1}|Pools]} + end, + {reply, Result, State#state{pools=Pools1}}; + +handle_call({remove_pool, PoolId}, _From, #state{pools=Pools}=State) -> + {Result, Pools1} = + case proplists:is_defined(PoolId, Pools) of + true -> + {ok, lists:keydelete(PoolId, 1, Pools)}; + false -> + {not_found, Pools} + end, + {reply, Result, State#state{pools=Pools1}}; + +handle_call({pid, PoolId}, _From, #state{pools=Pools}=State) -> + case get_pool(PoolId, Pools) of + undefined -> + {reply, {undefined, undefined}, State}; + {Pool, Others} -> + case queue:out(Pool#pool.conn_pids) of + {{value, Pid}, Q2} -> + Pool1 = Pool#pool{conn_pids = queue:in(Pid, Q2), req_id = ((Pool#pool.req_id)+1)}, + Pools1 = [{PoolId, Pool1}|Others], + {reply, {Pid, Pool}, State#state{pools=Pools1}}; + {empty, _} -> + {reply, {undefined, Pool}, State} + end + end; handle_call(_, _From, State) -> {reply, {error, invalid_call}, State}. @@ -397,8 +485,29 @@ handle_cast(_Msg, State) -> %% {stop, Reason, State} %% Description: Handling all non call/cast messages %%-------------------------------------------------------------------- -handle_info(_Info, State) -> - {noreply, State}. +handle_info({'EXIT', Pid, {emongo_conn, PoolId, Error}}, #state{pools=Pools}=State) -> + io:format("EXIT ~p, {emongo_conn, ~p, ~p} in ~p~n", + [Pid, PoolId, Error, ?MODULE]), + State1 = + case get_pool(PoolId, Pools) of + undefined -> + State; + {Pool, Others} -> + Pids1 = queue:filter(fun(Item) -> Item =/= Pid end, Pool#pool.conn_pids), + Pool1 = Pool#pool{conn_pids = Pids1}, + case do_open_connections(Pool1) of + {error, _Reason} -> + Pools1 = Others; + Pool2 -> + Pools1 = [{PoolId, Pool2}|Others] + end, + State#state{pools=Pools1} + end, + {noreply, State1}; + +handle_info(Info, State) -> + io:format("WARNING: unrecognized message in ~p: ~p~n", [?MODULE, Info]), + {noreply, State}. %%-------------------------------------------------------------------- %% Function: terminate(Reason, State) -> void() @@ -420,154 +529,158 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- %%% Internal functions %%-------------------------------------------------------------------- -get_pid_pool(PoolId, RequestCount) -> - case emongo_sup:worker_pid(PoolId, emongo_sup:pools(), RequestCount) of - undefined -> throw(emongo_busy); - Val -> Val - end. - - -fold_documents(F, Value, Resp) -> - try - lists:foldl(F, Value, Resp#response.documents) - catch - Class:Exception -> - kill_cursor(Resp#response.pool_id, Resp#response.cursor_id), - erlang:Class(Exception) - end. - - -kill_cursor(_, 0) -> - ok; -kill_cursor(PoolId, CursorID) -> - {Pid, _Database, ReqId} = get_pid_pool(PoolId, 1), - Packet = emongo_packet:kill_cursors(ReqId, [CursorID]), - emongo_server:send(Pid, ReqId, Packet). - +initialize_pools() -> + case application:get_env(emongo, pools) of + undefined -> + []; + {ok, Pools} -> + [begin + Pool = #pool{ + id = PoolId, + size = proplists:get_value(size, Props, 1), + host = proplists:get_value(host, Props, "localhost"), + port = proplists:get_value(port, Props, 27017), + database = proplists:get_value(database, Props, "test") + }, + {PoolId, do_open_connections(Pool)} + end || {PoolId, Props} <- Pools] + end. + +do_open_connections(#pool{conn_pids=Pids, size=Size}=Pool) -> + case queue:len(Pids) < Size of + true -> + case emongo_conn:start_link(Pool#pool.id, Pool#pool.host, Pool#pool.port) of + {error, Reason} -> + throw({error, Reason}); + Pid -> + do_open_connections(Pool#pool{conn_pids = queue:in(Pid, Pids)}) + end; + false -> + Pool + end. + +get_pool(PoolId, Pools) -> + get_pool(PoolId, Pools, []). + +get_pool(_, [], _) -> + undefined; + +get_pool(PoolId, [{PoolId, Pool}|Tail], Others) -> + {Pool, lists:append(Tail, Others)}; + +get_pool(PoolId, [Pool|Tail], Others) -> + get_pool(PoolId, Tail, [Pool|Others]). dec2hex(Dec) -> - dec2hex(<<>>, Dec). + dec2hex(<<>>, Dec). dec2hex(N, <>) -> - dec2hex(<>, Rem); + dec2hex(<>, Rem); dec2hex(N,<<>>) -> - N. + N. hex2dec(Hex) when is_list(Hex) -> - hex2dec(list_to_binary(Hex)); + hex2dec(list_to_binary(Hex)); hex2dec(Hex) -> - hex2dec(<<>>, Hex). + hex2dec(<<>>, Hex). hex2dec(N,<>) -> - hex2dec(<>, Rem); + hex2dec(<>, Rem); hex2dec(N,<<>>) -> - N. + N. create_query(Options, Selector) -> - Selector1 = transform_selector(Selector), - create_query(Options, #emo_query{}, Selector1, []). + Selector1 = transform_selector(Selector), + create_query(Options, #emo_query{}, Selector1, []). create_query([], QueryRec, QueryDoc, []) -> - QueryRec#emo_query{q=QueryDoc}; + QueryRec#emo_query{q=QueryDoc}; create_query([], QueryRec, [], OptDoc) -> - QueryRec#emo_query{q=(OptDoc ++ [{<<"$query">>, [{none, none}]}])}; + QueryRec#emo_query{q=OptDoc}; create_query([], QueryRec, QueryDoc, OptDoc) -> - QueryRec#emo_query{q=(OptDoc ++ [{<<"$query">>, QueryDoc}])}; + QueryRec#emo_query{q=(OptDoc ++ [{<<"query">>, QueryDoc}])}; create_query([{limit, Limit}|Options], QueryRec, QueryDoc, OptDoc) -> - QueryRec1 = QueryRec#emo_query{limit=Limit}, - create_query(Options, QueryRec1, QueryDoc, OptDoc); + QueryRec1 = QueryRec#emo_query{limit=Limit}, + create_query(Options, QueryRec1, QueryDoc, OptDoc); create_query([{offset, Offset}|Options], QueryRec, QueryDoc, OptDoc) -> - QueryRec1 = QueryRec#emo_query{offset=Offset}, - create_query(Options, QueryRec1, QueryDoc, OptDoc); + QueryRec1 = QueryRec#emo_query{offset=Offset}, + create_query(Options, QueryRec1, QueryDoc, OptDoc); create_query([{orderby, Orderby}|Options], QueryRec, QueryDoc, OptDoc) -> - Orderby1 = [{Key, case Dir of desc -> -1; _ -> 1 end}|| {Key, Dir} <- Orderby], - OptDoc1 = [{<<"$orderby">>, Orderby1}|OptDoc], - create_query(Options, QueryRec, QueryDoc, OptDoc1); + Orderby1 = [{Key, case Dir of desc -> -1; _ -> 1 end}|| {Key, Dir} <- Orderby], + OptDoc1 = [{<<"orderby">>, Orderby1}|OptDoc], + create_query(Options, QueryRec, QueryDoc, OptDoc1); create_query([{fields, Fields}|Options], QueryRec, QueryDoc, OptDoc) -> - QueryRec1 = QueryRec#emo_query{field_selector=[{Field, 1} || Field <- Fields]}, - create_query(Options, QueryRec1, QueryDoc, OptDoc); + QueryRec1 = QueryRec#emo_query{field_selector=convert_fields(Fields)}, + create_query(Options, QueryRec1, QueryDoc, OptDoc); -create_query([explain | Options], QueryRec, QueryDoc, OptDoc) -> - create_query(Options, QueryRec, QueryDoc, [{<<"$explain">>,true}|OptDoc]); +create_query([Opt|Options], QueryRec, QueryDoc, OptDoc) when is_integer(Opt) -> + QueryRec1 = QueryRec#emo_query{opts=[Opt|QueryRec#emo_query.opts]}, + create_query(Options, QueryRec1, QueryDoc, OptDoc); + +create_query([{<<_/binary>>, _} = NV | Options], QueryRec, QueryDoc, OptDoc) -> + create_query(Options, QueryRec, QueryDoc, [NV | OptDoc]); create_query([_|Options], QueryRec, QueryDoc, OptDoc) -> - create_query(Options, QueryRec, QueryDoc, OptDoc). - -fam_options([], OptDoc) -> OptDoc; -fam_options([{sort, _}=Opt | Options], OptDoc) -> - fam_options(Options, [opt(Opt) | OptDoc]); -fam_options([{remove, _}=Opt | Options], OptDoc) -> - fam_options(Options, [opt(Opt) | OptDoc]); -fam_options([{update, _} | Options], OptDoc) -> - fam_options(Options, OptDoc); % update is a param to find_and_modify/5 -fam_options([{new, _}=Opt | Options], OptDoc) -> - fam_options(Options, [opt(Opt) | OptDoc]); -fam_options([{fields, _}=Opt | Options], OptDoc) -> - fam_options(Options, [opt(Opt) | OptDoc]); -fam_options([{upsert, _}=Opt | Options], OptDoc) -> - fam_options(Options, [opt(Opt) | OptDoc]); -fam_options([_ | Options], OptDoc) -> - fam_options(Options, OptDoc). - -opt({Atom, Val}) when is_atom(Atom) -> - {list_to_binary(atom_to_list(Atom)), Val}. + create_query(Options, QueryRec, QueryDoc, OptDoc). transform_selector(Selector) -> - transform_selector(Selector, []). + transform_selector(Selector, []). transform_selector([], Acc) -> - lists:reverse(Acc); + lists:reverse(Acc); transform_selector([{where, Val}|Tail], Acc) when is_list(Val) -> - transform_selector(Tail, [{<<"$where">>, Val}|Acc]); + transform_selector(Tail, [{<<"$where">>, Val}|Acc]); transform_selector([{Key, [{_,_}|_]=Vals}|Tail], Acc) -> - Vals1 = - [case Operator of - O when O == '>'; O == gt -> - {<<"$gt">>, Val}; - O when O == '<'; O == lt -> - {<<"$lt">>, Val}; - O when O == '>='; O == gte -> - {<<"$gte">>, Val}; - O when O == '=<'; O == lte -> - {<<"$lte">>, Val}; - O when O == '=/='; O == '/='; O == ne -> - {<<"$ne">>, Val}; - in when is_list(Val) -> - {<<"$in">>, {array, Val}}; - nin when is_list(Val) -> - {<<"$nin">>, {array, Val}}; - mod when is_list(Val), length(Val) == 2 -> - {<<"$mod">>, {array, Val}}; - all when is_list(Val) -> - {<<"$all">>, {array, Val}}; - size when is_integer(Val) -> - {<<"$size">>, Val}; - exists when is_boolean(Val) -> - {<<"$exists">>, Val}; - _ -> - {Operator, Val} - end || {Operator, Val} <- Vals], - transform_selector(Tail, [{Key, Vals1}|Acc]); + Vals1 = + [case Operator of + O when O == '>'; O == gt -> + {<<"$gt">>, Val}; + O when O == '<'; O == lt -> + {<<"$lt">>, Val}; + O when O == '>='; O == gte -> + {<<"$gte">>, Val}; + O when O == '=<'; O == lte -> + {<<"$lte">>, Val}; + O when O == '=/='; O == '/='; O == ne -> + {<<"$ne">>, Val}; + in when is_list(Val) -> + {<<"$in">>, {array, Val}}; + nin when is_list(Val) -> + {<<"$nin">>, {array, Val}}; + mod when is_list(Val), length(Val) == 2 -> + {<<"$mod">>, {array, Val}}; + all when is_list(Val) -> + {<<"$all">>, {array, Val}}; + size when is_integer(Val) -> + {<<"$size">>, Val}; + exists when is_boolean(Val) -> + {<<"$exists">>, Val}; + near when is_list(Val) -> + {<<"$near">>, {array, Val}}; + _ -> + {Operator, Val} + end || {Operator, Val} <- Vals], + transform_selector(Tail, [{Key, Vals1}|Acc]); transform_selector([Other|Tail], Acc) -> - transform_selector(Tail, [Other|Acc]). + transform_selector(Tail, [Other|Acc]). -dec0($a) -> 10; -dec0($b) -> 11; -dec0($c) -> 12; -dec0($d) -> 13; -dec0($e) -> 14; -dec0($f) -> 15; -dec0(X) -> X - $0. +dec0($a) -> 10; +dec0($b) -> 11; +dec0($c) -> 12; +dec0($d) -> 13; +dec0($e) -> 14; +dec0($f) -> 15; +dec0(X) -> X - $0. hex0(10) -> $a; hex0(11) -> $b; @@ -576,3 +689,28 @@ hex0(13) -> $d; hex0(14) -> $e; hex0(15) -> $f; hex0(I) -> $0 + I. + +sync_command({Pid, Pool}, Packet1, Options) -> + Query1 = #emo_query{q=[{<<"getlasterror">>, 1}], limit=1}, + Packet2 = emongo_packet:do_query(Pool#pool.database, "$cmd", Pool#pool.req_id, Query1), + Resp = emongo_conn:send_sync(Pid, Pool#pool.req_id, Packet1, Packet2, ?TIMEOUT), + case lists:member(response_options, Options) of + true -> Resp; + false -> get_sync_result(Resp) + end. + +get_sync_result(#response{documents = [Doc]}) -> + case lists:keysearch(<<"err">>, 1, Doc) of + {value, {_, undefined}} -> ok; + {value, {_, Msg}} -> {error, Msg}; + _ -> {error, {invalid_error_message, Doc}} + end; +get_sync_result(Resp) -> + {error, {invalid_response, Resp}}. + +to_binary(V) when is_binary(V) -> V; +to_binary(V) when is_list(V) -> list_to_binary(V); +to_binary(V) when is_atom(V) -> list_to_binary(atom_to_list(V)). + +convert_fields(Fields) -> + [{Field, 1} || Field <- Fields]. diff --git a/src/emongo_app.erl b/src/emongo_app.erl index b749ae2..b216dc5 100644 --- a/src/emongo_app.erl +++ b/src/emongo_app.erl @@ -21,35 +21,17 @@ %% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR %% OTHER DEALINGS IN THE SOFTWARE. -module(emongo_app). - -behaviour(application). --include("emongo.hrl"). - --export([start/2, stop/1, initialize_pools/0]). +-export([start/2, stop/1]). start(_, _) -> - {ok, Pid} = emongo_sup:start_link(), - % Pools must be initialized after emongo_sup is started instead of in - % emongo:init, because emongo_server_sup instances are dynamically added - % to the emongo_sup supervisor, which also supervises emongo gen_server. - % (otherwise get a deadlock where emongo is waiting on emongo_sup, which - % is waiting on emongo) - initialize_pools(), - {ok, Pid}. + emongo:start_link(). + %supervisor:start_link({local, ?MODULE}, ?MODULE, []). stop(_) -> ok. -initialize_pools() -> - F = fun({PoolId, Props}) -> - Host = proplists:get_value(host, Props, "localhost"), - Port = proplists:get_value(port, Props, 27017), - Database = proplists:get_value(database, Props, "test"), - Size = proplists:get_value(size, Props, 1), - emongo:add_pool(PoolId, Host, Port, Database, Size) - end, - - case application:get_env(emongo, pools) of - undefined -> ok; - {ok, Pools} -> lists:foreach(F, Pools) - end. +%init(_) -> +% {ok, {{one_for_one, 10, 10}, [ +% {emongo, {emongo, start_link, []}, permanent, 5000, worker, [emongo]} +% ]}}. diff --git a/src/emongo_bson.erl b/src/emongo_bson.erl index 7ae0048..4ab8fac 100644 --- a/src/emongo_bson.erl +++ b/src/emongo_bson.erl @@ -31,21 +31,13 @@ encode([{_,_}|_]=List) when is_list(List) -> Bin = iolist_to_binary([encode_key_value(Key, Val) || {Key, Val} <- List]), <<(size(Bin)+5):32/little-signed, Bin/binary, 0:8>>. -encode_key_value(none, none) -> - <<>>; - %% FLOAT encode_key_value(Key, Val) when is_float(Val) -> Key1 = encode_key(Key), <<1, Key1/binary, 0, Val:64/little-signed-float>>; %% STRING -%% binary string must be already in utf8 -encode_key_value(Key, Val) when is_binary(Val) -> - Key1 = encode_key(Key), - <<2, Key1/binary, 0, (byte_size(Val)+1):32/little-signed, Val/binary, 0:8>>; - -encode_key_value(Key, Val) when Val == [] orelse (is_list(Val) andalso length(Val) > 0 andalso is_integer(hd(Val))) -> +encode_key_value(Key, Val) when is_binary(Val) orelse Val == [] orelse (is_list(Val) andalso length(Val) > 0 andalso is_integer(hd(Val))) -> Key1 = encode_key(Key), case unicode:characters_to_binary(Val) of {error, Bin, RestData} -> @@ -84,11 +76,6 @@ encode_key_value(Key, {binary, SubType, Val}) when is_integer(SubType), is_binar encode_key_value(Key, {oid, HexString}) when is_list(HexString) -> encode_key_value(Key, {oid, emongo:hex2dec(HexString)}); -%% UNDEFINED -encode_key_value(Key, undefined) -> - Key1 = encode_key(Key), - <<6, Key1/binary, 0>>; - encode_key_value(Key, {oid, OID}) when is_binary(OID) -> Key1 = encode_key(Key), <<7, Key1/binary, 0, OID/binary>>; @@ -119,8 +106,8 @@ encode_key_value(Key, {datetime, Val}) -> encode_key_value(Key, {{Year, Month, Day}, {Hour, Min, Secs}}) when is_integer(Year), is_integer(Month), is_integer(Day), is_integer(Hour), is_integer(Min), is_integer(Secs) -> encode_key_value(Key, {datetime, {{Year, Month, Day}, {Hour, Min, Secs}}}); -%% NULL -encode_key_value(Key, null) -> +%% VOID +encode_key_value(Key, undefined) -> Key1 = encode_key(Key), <<10, Key1/binary, 0>>; @@ -142,7 +129,7 @@ encode_key_value(Key, Val) when is_integer(Val) -> <<18, Key1/binary, 0, Val:64/little-signed>>; encode_key_value(Key, Val) -> - exit({oh_balls, Key, Val}). + exit({emongo_bson_encode_error, Key, Val}). encode_key(Key) when is_binary(Key) -> Key; @@ -211,10 +198,6 @@ decode_value(5, <<_Size:32/little-signed, 2:8/little, BinSize:32/little-signed, decode_value(5, <>) -> {{binary, SubType, BinData}, Tail}; -%% VOID -decode_value(6, Tail) -> - {undefined, Tail}; - %% OID decode_value(7, <>) -> {{oid, OID}, Tail}; @@ -236,19 +219,15 @@ decode_value(9, <>) -> %% VOID decode_value(10, Tail) -> - {null, Tail}; + {undefined, Tail}; %% INT decode_value(16, <>) -> {Int, Tail}; -%% Timestamp -decode_value(17, <>) -> - {{timestamp, Inc, Timestamp}, Tail}; - %% LONG decode_value(18, <>) -> {Int, Tail}; -decode_value(Type, Tail) -> - exit({emongo_unknown_type, Type, Tail}). +decode_value(Type, Value) -> + exit({emongo_bson_decode_error, Type, Value}). diff --git a/src/emongo_conn.erl b/src/emongo_conn.erl new file mode 100644 index 0000000..01b5bfc --- /dev/null +++ b/src/emongo_conn.erl @@ -0,0 +1,152 @@ +%% Copyright (c) 2009 Jacob Vorreuter +%% +%% Permission is hereby granted, free of charge, to any person +%% obtaining a copy of this software and associated documentation +%% files (the "Software"), to deal in the Software without +%% restriction, including without limitation the rights to use, +%% copy, modify, merge, publish, distribute, sublicense, and/or sell +%% copies of the Software, and to permit persons to whom the +%% Software is furnished to do so, subject to the following +%% conditions: +%% +%% The above copyright notice and this permission notice shall be +%% included in all copies or substantial portions of the Software. +%% +%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +%% EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +%% OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +%% NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +%% HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +%% WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +%% OTHER DEALINGS IN THE SOFTWARE. +-module(emongo_conn). + +-export([start_link/3, init/4, send/3, send_sync/5, send_recv/4]). + +-record(request, {req_id, requestor}). +-record(state, {pool_id, socket, requests}). + +-include("emongo.hrl"). + +start_link(PoolId, Host, Port) -> + proc_lib:start_link(?MODULE, init, [PoolId, Host, Port, self()]). + +init(PoolId, Host, Port, Parent) -> + Socket = open_socket(Host, Port), + proc_lib:init_ack(Parent, self()), + loop(#state{pool_id=PoolId, socket=Socket, requests=[]}, <<>>). + +send(Pid, ReqID, Packet) -> + case gen:call(Pid, '$emongo_conn_send', {ReqID, Packet}) of + {ok, Result} -> Result; + {error, Reason} -> exit(Reason) + end. + +send_sync(Pid, ReqID, Packet1, Packet2, Timeout) -> + try + {ok, Resp} = gen:call(Pid, '$emongo_conn_send_sync', {ReqID, Packet1, Packet2}, Timeout), + Documents = emongo_bson:decode(Resp#response.documents), + Resp#response{documents=Documents} + catch + exit:timeout-> + %Clear the state from the timed out call + gen:call(Pid, '$emongo_recv_timeout', ReqID, Timeout), + exit(timeout) + end. + +send_recv(Pid, ReqID, Packet, Timeout) -> + try + {ok, Resp} = gen:call(Pid, '$emongo_conn_send_recv', {ReqID, Packet}, Timeout), + Documents = emongo_bson:decode(Resp#response.documents), + Resp#response{documents=Documents} + catch + exit:timeout-> + %Clear the state from the timed out call + gen:call(Pid, '$emongo_recv_timeout', ReqID, Timeout), + exit(timeout) + end. + +loop(State, Leftover) -> + {NewState, NewLeftover} = try + Socket = State#state.socket, + receive + {'$emongo_conn_send', {From, Mref}, {_ReqID, Packet}} -> + gen_tcp:send(Socket, Packet), + gen:reply({From, Mref}, ok), + {State, Leftover}; + {'$emongo_conn_send_sync', {From, Mref}, {ReqID, Packet1, Packet2}} -> + % Packet2 is the packet containing getlasterror. + % Send both packets in the same TCP packet for performance reasons. It's + % about 3 times faster. + gen_tcp:send(Socket, <>), + Request = #request{req_id=ReqID, requestor={From, Mref}}, + State1 = State#state{requests=[{ReqID, Request}|State#state.requests]}, + {State1, Leftover}; + {'$emongo_conn_send_recv', {From, Mref}, {ReqID, Packet}} -> + gen_tcp:send(Socket, Packet), + Request = #request{req_id=ReqID, requestor={From, Mref}}, + State1 = State#state{requests=[{ReqID, Request}|State#state.requests]}, + {State1, Leftover}; + {'$emongo_recv_timeout', {From, Mref}, ReqID} -> + case lists:keytake(ReqID, 1, State#state.requests) of + false -> + gen:reply({From, Mref}, ok), + {State, Leftover}; + {value, _, Others} -> + gen:reply({From, Mref}, ok), + %Loop again, but drop any leftovers to + %prevent the loop response processing + %from getting out of sync and causing all + %subsequent calls to send_recv to fail. + %loop(State#state{requests=Others}, <<>>) + + % Leave Leftover there because it could be from a different + % request than the one timing out. This Pid is still in the + % pool and can still be used by other processes. If the + % data gets out of sync, the socket needs to be closed and + % reopened. + {State#state{requests=Others}, Leftover} + end; + {tcp, Socket, Data} -> + {_NewState, _NewLeftover} = + process_bin(State, <>); + {tcp_closed, Socket} -> + exit(tcp_closed); + {tcp_error, Socket, Reason} -> + exit(Reason) + end + catch _:Error -> + % The exit message has to include the pool_id and follow a format the emongo + % module expects so this process can be restarted. + exit({?MODULE, State#state.pool_id, Error}) + end, + loop(NewState, NewLeftover). + +open_socket(Host, Port) -> + case gen_tcp:connect(Host, Port, [binary, {active, true}]) of + {ok, Sock} -> + Sock; + {error, Reason} -> + exit({failed_to_open_socket, Reason}) + end. + +process_bin(State, <<>>) -> + {State, <<>>}; +process_bin(State, Bin) -> + case emongo_packet:decode_response(Bin) of + undefined -> + {State, Bin}; + {Resp, Tail} -> + ResponseTo = (Resp#response.header)#header.response_to, + NewState = case lists:keytake(ResponseTo, 1, State#state.requests) of + false -> + State; + {value, {_ReqID, Request}, Others} -> + gen:reply(Request#request.requestor, Resp), + State#state{requests=Others} + end, + % Continue processing Tail in case there's another complete message + % in it. + process_bin(NewState, Tail) + end. diff --git a/src/emongo_packet.erl b/src/emongo_packet.erl index d4b96b6..736602f 100644 --- a/src/emongo_packet.erl +++ b/src/emongo_packet.erl @@ -22,40 +22,23 @@ %% OTHER DEALINGS IN THE SOFTWARE. -module(emongo_packet). --export([update/7, insert/4, do_query/4, get_more/5, +-export([update/7, insert/4, do_query/4, get_more/5, delete/4, kill_cursors/2, msg/2, decode_response/1, - ensure_index/5, get_last_error/2, server_status/2]). + ensure_index/4]). -include("emongo.hrl"). -get_last_error(Database, ReqId) -> - %%Query = #emo_query{q=[{<<"getlasterror">>, 1}], limit=1}, - %%do_query(Database, "$cmd", ReqId, Query). - DatabaseLength = byte_size(Database), - <<(57+DatabaseLength):32/little-signed, ReqId:32/little-signed, 0:32, - ?OP_QUERY:32/little-signed, 0:32, Database/binary, ".$cmd", 0, 0:32, 1:32/little-signed, - %% Encoded document - 23:32/little-signed, 16, "getlasterror", 0, 1:32/little-signed, 0>>. - -server_status(Database, ReqId) -> - %%Query = #emo_query{q=[{<<"serverStatus">>, 1}], limit=1}, - %%do_query(Database, "$cmd", ReqId, Query). - DatabaseLength = byte_size(Database), - <<(57+DatabaseLength):32/little-signed, ReqId:32/little-signed, 0:32, - ?OP_QUERY:32/little-signed, 0:32, Database/binary, ".$cmd", 0, 0:32, 1:32/little-signed, - %% Encoded document - 23:32/little-signed, 16, "serverStatus", 0, 1:32/little-signed, 0>>. - -update(Database, Collection, ReqID, Upsert, MultiUpdate, Selector, Document) -> +update(Database, Collection, ReqID, Upsert, Multi, Selector, Document) -> FullName = unicode:characters_to_binary([Database, ".", Collection]), EncodedSelector = emongo_bson:encode(Selector), EncodedDocument = emongo_bson:encode(Document), BinUpsert = if Upsert == true -> 1; true -> 0 end, - BinMultiUpdate = if MultiUpdate == true -> 2#10; true -> 0 end, - OptsSum = BinUpsert + BinMultiUpdate, - Message = <<0:32, FullName/binary, 0, OptsSum:32/little-signed, EncodedSelector/binary, EncodedDocument/binary>>, + BinMulti = if Multi == true -> 1; true -> 0 end, + Flags = (BinMulti bsl 1) bor BinUpsert, + Message = <<0:32, FullName/binary, 0, Flags:32/little-signed, EncodedSelector/binary, EncodedDocument/binary>>, Length = byte_size(Message), - <<(Length+16):32/little-signed, ReqID:32/little-signed, 0:32, ?OP_UPDATE:32/little-signed, Message/binary>>. + <<(Length+16):32/little-signed, ReqID:32/little-signed, 0:32, + ?OP_UPDATE:32/little-signed, Message/binary>>. insert(Database, Collection, ReqID, Documents) -> FullName = unicode:characters_to_binary([Database, ".", Collection]), @@ -95,13 +78,12 @@ delete(Database, Collection, ReqID, Selector) -> Length = byte_size(Message), <<(Length+16):32/little-signed, ReqID:32/little-signed, 0:32, ?OP_DELETE:32/little-signed, Message/binary>>. -ensure_index(Database, Collection, ReqID, Keys, Unique) -> +ensure_index(Database, Collection, ReqID, Keys) -> FullName = unicode:characters_to_binary([Database, ".system.indexes"]), Selector = [ {<<"name">>, index_name(Keys, <<>>)}, {<<"ns">>, unicode:characters_to_binary([Database, ".", Collection])}, - {<<"key">>, Keys}, - {<<"unique">>, Unique}], + {<<"key">>, Keys}], EncodedDocument = emongo_bson:encode(Selector), Message = <<0:32, FullName/binary, 0, EncodedDocument/binary>>, Length = byte_size(Message), @@ -131,10 +113,7 @@ decode_response(<> = Message, Resp = #response{ - header = #header{message_length = Length, - request_id = ReqID, - response_to = RespTo, - op_code = Op}, + header = {header, Length, ReqID, RespTo, Op}, response_flag = RespFlag, cursor_id = CursorID, offset = StartingFrom, @@ -143,9 +122,10 @@ decode_response(< - undefined. + undefined. index_name([], Bin) -> Bin; index_name([{Key, Val}|Tail], Bin) -> diff --git a/src/emongo_pool.erl b/src/emongo_pool.erl deleted file mode 100644 index a4b1451..0000000 --- a/src/emongo_pool.erl +++ /dev/null @@ -1,213 +0,0 @@ -%%%------------------------------------------------------------------- -%%% Description : emongo pool supervisor -%%%------------------------------------------------------------------- --module(emongo_pool). - --behaviour(gen_server). - -%% API --export([start_link/5, pid/1, pid/2]). - --deprecated([pid/1]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --include("emongo.hrl"). - --define(POLL_INTERVAL, 10000). --define(POLL_TIMEOUT, 9000). - --record(pool, {id, - host, - port, - database, - size, - active=true, - poll=none, - conn_pid=pqueue:new(), - req_id=1}). - -%% messages --define(pid(RequestCount), {pid, RequestCount}). --define(poll(), poll). --define(poll_timeout(Pid, ReqId, Tag), {poll_timeout, Pid, ReqId, Tag}). - -%% to be removed next release --define(old_pid(), pid). - - -%%%%%%%%%%%%%%%% -%% public api %% -%%%%%%%%%%%%%%%% - -start_link(PoolId, Host, Port, Database, Size) -> - gen_server:start_link(?MODULE, [PoolId, Host, Port, Database, Size], []). - -pid(Pid) -> - gen_server:call(Pid, pid). - -pid(Pid, RequestCount) -> - gen_server:call(Pid, {pid, RequestCount}). - -%%%%%%%%%%%%%%%%%%%%%%%%%% -%% gen_server callbacks %% -%%%%%%%%%%%%%%%%%%%%%%%%%% - -%%-------------------------------------------------------------------- -%% Function: init(Args) -> {ok, State} | -%% {ok, State, Timeout} | -%% ignore | -%% {stop, Reason} -%% Description: Initiates the server -%%-------------------------------------------------------------------- -init([PoolId, Host, Port, Database, Size]) -> - process_flag(trap_exit, true), - - Pool0 = #pool{id = PoolId, - host = Host, - port = Port, - database = unicode:characters_to_binary(Database), - size = Size - }, - - {noreply, Pool} = handle_info(?poll(), Pool0), - {ok, Pool}. - -%%-------------------------------------------------------------------- -%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | -%% {reply, Reply, State, Timeout} | -%% {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, Reply, State} | -%% {stop, Reason, State} -%% Description: Handling call messages -%%-------------------------------------------------------------------- -handle_call(?old_pid(), _From, #pool{active=true}=State) -> - {Reply, NewState} = get_pid(State, 1), - {reply, Reply, NewState}; - -handle_call(?pid(RequestCount), _From, #pool{active=true}=State) -> - {Reply, NewState} = get_pid(State, RequestCount), - {reply, Reply, NewState}; - -handle_call(_Request, _From, State) -> - {reply, undefined, State}. - -%%-------------------------------------------------------------------- -%% Function: handle_cast(Msg, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% Description: Handling cast messages -%%-------------------------------------------------------------------- -handle_cast(_Msg, State) -> - {noreply, State}. - -%%-------------------------------------------------------------------- -%% Function: handle_info(Info, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% Description: Handling all non call/cast messages -%%-------------------------------------------------------------------- -handle_info({'EXIT', Pid, Reason}, #pool{conn_pid=Pids}=State) -> - error_logger:error_msg("Pool ~p deactivated by worker death: ~p~n", - [State#pool.id, Reason]), - - Pids1 = pqueue:filter(fun(Item) -> Item =/= Pid end, Pids), - {noreply, State#pool{conn_pid = Pids1, active=false}}; - -handle_info(?poll(), State) -> - erlang:send_after(?POLL_INTERVAL, self(), poll), - NewState = do_open_connections(State), - {noreply, NewState}; - -handle_info(?poll_timeout(Pid, ReqId, Tag), #pool{poll={Tag, _}}=State) -> - case catch emongo_server:recv(Pid, ReqId, 0, Tag) of - #response{} -> - {noreply, State#pool{active=true, poll=none}}; - _ -> - {noreply, State#pool{active=false, poll=none}} - end; - -handle_info({Tag, _}, #pool{poll={Tag, TimerRef}}=State) -> - _Time = erlang:cancel_timer(TimerRef), - %%io:format("polling ~p success: ~p~n", [State#pool.id, Time]), - {noreply, State#pool{active=true, poll=none}}; - -handle_info(Info, State) -> - error_logger:info_msg("Pool ~p unknown message: ~p~n", - [State#pool.id, Info]), - - {noreply, State}. - -%%-------------------------------------------------------------------- -%% Function: terminate(Reason, State) -> void() -%% Description: This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any necessary -%% cleaning up. When it returns, the gen_server terminates with Reason. -%% The return value is ignored. -%%-------------------------------------------------------------------- -terminate(_Reason, _State) -> - ok. - -%%-------------------------------------------------------------------- -%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} -%% Description: Convert process state when code is changed -%%-------------------------------------------------------------------- -code_change(OldVsn, State, _Extra) -> - error_logger:info_msg("emongo_pool:code_change(~p, ...)~n", [OldVsn]), - - State1 = case queue:is_queue(State#pool.conn_pid) of - false -> - State; - true -> - State#pool{conn_pid = queue2pqueue(State#pool.conn_pid, pqueue:new())} - end, - - {ok, State1}. - -%%-------------------------------------------------------------------- -%%% Internal functions -%%-------------------------------------------------------------------- -queue2pqueue(Queue, PQueue) -> - case queue:out(Queue) of - {empty, _} -> - PQueue; - {{value, Item}, NewQueue} -> - queue2pqueue(NewQueue, pqueue:push(1, Item, PQueue)) - end. - -get_pid(#pool{database=Database, conn_pid=Pids, req_id=ReqId}=State, RequestCount) -> - case pqueue:pop(Pids) of - {Pid, Q2} -> - NewState = State#pool{conn_pid=pqueue:push(RequestCount, Pid, Q2), - req_id=(ReqId + RequestCount)}, - {{Pid, Database, ReqId}, NewState}; - empty -> - {undefined, State} - end. - -do_open_connections(#pool{conn_pid=Pids, size=Size}=Pool) -> - case pqueue:size(Pids) < Size of - true -> - case emongo_server:start_link(Pool#pool.id, Pool#pool.host, Pool#pool.port) of - {error, _Reason} -> - Pool#pool{active=false}; - {ok, Pid} -> - do_open_connections(Pool#pool{conn_pid = pqueue:push(1, Pid, Pids)}) - end; - false -> - do_poll(Pool) - end. - -do_poll(Pool) -> - case get_pid(Pool, 2) of - {{Pid, Database, ReqId}, NewPool} -> - PacketLast = emongo_packet:get_last_error(Database, ReqId), - Tag = emongo_server:send_recv_nowait(Pid, ReqId, PacketLast), - TimerRef = erlang:send_after(?POLL_TIMEOUT, self(), ?poll_timeout(Pid, ReqId, Tag)), - NewPool#pool{poll={Tag, TimerRef}}; - _ -> - Pool#pool{active=false} - end. diff --git a/src/emongo_router.erl b/src/emongo_router.erl deleted file mode 100644 index 87effd7..0000000 --- a/src/emongo_router.erl +++ /dev/null @@ -1,191 +0,0 @@ -%%%------------------------------------------------------------------- -%%% Description : balancer for emongo_pool connections -%%%------------------------------------------------------------------- --module(emongo_router). - --behaviour(gen_server). - -%% API --export([start_link/2, pid/1, pid/2]). - --deprecated([pid/1]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --record(state, {id, - active, - passive, - timer=undefined - }). - --define(POOL_ID(BalancerId, PoolIdx), {BalancerId, PoolIdx}). --define(RECHECK_TIME, 9500). - -%% messages --define(pid(RequestCount), {pid, RequestCount}). - -%% to be removed next release --define(old_pid(), pid). - -%%==================================================================== -%% API -%%==================================================================== -%%-------------------------------------------------------------------- -%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} -%% Description: Starts the server -%%-------------------------------------------------------------------- -start_link(BalId, Pools) -> - gen_server:start_link(?MODULE, [BalId, Pools], []). - - -pid(BalancerPid) -> - pid(BalancerPid, 1). - - -pid(BalancerPid, RequestCount) -> - gen_server:call(BalancerPid, {pid, RequestCount}). -%%==================================================================== -%% gen_server callbacks -%%==================================================================== - -%%-------------------------------------------------------------------- -%% Function: init(Args) -> {ok, State} | -%% {ok, State, Timeout} | -%% ignore | -%% {stop, Reason} -%% Description: Initiates the server -%%-------------------------------------------------------------------- -init([BalId, Pools]) -> - self() ! {init, Pools}, - {ok, #state{id = BalId, - active = [], - passive = []}}. - -%%-------------------------------------------------------------------- -%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | -%% {reply, Reply, State, Timeout} | -%% {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, Reply, State} | -%% {stop, Reason, State} -%% Description: Handling call messagesp -%%-------------------------------------------------------------------- -handle_call(?old_pid(), _From, State) -> - {Pid, NewState} = get_pid(State, emongo_sup:pools(), 1), - {reply, Pid, NewState}; - -handle_call(?pid(RequestCount), _From, State) -> - {Pid, NewState} = get_pid(State, emongo_sup:pools(), RequestCount), - {reply, Pid, NewState}; - -handle_call(stop_children, _, #state{id=BalId, active=Active, passive=Passive}=State) -> - Fun = fun(PoolIdx) -> - emongo_sup:stop_pool(?POOL_ID(BalId, PoolIdx)), - false - end, - lists:foreach(Fun, Passive), - lists:foreach(Fun, Active), - - {reply, ok, State#state{active=[], passive=[]}}; - -handle_call(_Request, _From, State) -> - Reply = ok, - {reply, Reply, State}. - -%%-------------------------------------------------------------------- -%% Function: handle_cast(Msg, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% Description: Handling cast messages -%%-------------------------------------------------------------------- -handle_cast(_Msg, State) -> - {noreply, State}. - -%%-------------------------------------------------------------------- -%% Function: handle_info(Info, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% Description: Handling all non call/cast messages -%%-------------------------------------------------------------------- -handle_info({init, Pools}, #state{id=BalId, active=Active}=State) -> - Fun = fun({Host, Port, Database, Size}, {PoolIdx, PoolList}) -> - case emongo_sup:start_pool(?POOL_ID(BalId, PoolIdx), - Host, Port, Database, Size) of - {ok, _} -> - ok; - {error, {already_started, _}} -> - ok - end, - {PoolIdx + 1, [PoolIdx | PoolList]} - end, - {_, PoolList} = lists:foldl(Fun, {1, Active}, Pools), - {noreply, State#state{active=lists:sort(PoolList)}}; - -handle_info(recheck, State) -> - {noreply, activate(State, [], emongo_sup:pools())}; - -handle_info(_Info, State) -> - {noreply, State}. - -%%-------------------------------------------------------------------- -%% Function: terminate(Reason, State) -> void() -%% Description: This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any necessary -%% cleaning up. When it returns, the gen_server terminates with Reason. -%% The return value is ignored. -%%-------------------------------------------------------------------- -terminate(_Reason, _State) -> - ok. - -%%-------------------------------------------------------------------- -%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} -%% Description: Convert process state when code is changed -%%-------------------------------------------------------------------- -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- -%%% Internal functions -%%-------------------------------------------------------------------- - -get_pid(#state{id=BalId, active=Active, passive=Passive, timer=Timer}=State, Pools, RequestCount) -> - case Active of - [PoolIdx | Active2] -> - case emongo_sup:worker_pid(?POOL_ID(BalId, PoolIdx), Pools, RequestCount) of - undefined -> - error_logger:info_msg("pool ~p is disabled!~n", [?POOL_ID(BalId, PoolIdx)]), - - get_pid(State#state{active=Active2, - passive=[PoolIdx | Passive], - timer=set_timer(Timer) - }, Pools, RequestCount); - Pid -> - {Pid, State} - end; - [] -> - {undefined, State} - end. - - -set_timer(undefined) -> - erlang:send_after(?RECHECK_TIME, self(), recheck); -set_timer(TimerRef) -> - TimerRef. - - -activate(#state{passive=[], timer=_TimerRef}=State, [], _) -> - State#state{timer=undefined}; - -activate(#state{passive=[]}=State, Passive, _) -> - State#state{passive=Passive, timer=erlang:send_after(?RECHECK_TIME, self(), recheck)}; - -activate(#state{id=BalId, active=Active, passive=[PoolIdx | Passive]}=State, Acc, Pools) -> - case emongo_sup:worker_pid(?POOL_ID(BalId, PoolIdx), Pools, 0) of - undefined -> - activate(State#state{passive=Passive}, [PoolIdx | Acc], Pools); - _ -> - error_logger:info_msg("pool ~p is enabled!~n", [?POOL_ID(BalId, PoolIdx)]), - activate(State#state{active=lists:umerge([PoolIdx], Active), passive=Passive}, Acc, Pools) - end. diff --git a/src/emongo_server.erl b/src/emongo_server.erl deleted file mode 100644 index 638d927..0000000 --- a/src/emongo_server.erl +++ /dev/null @@ -1,171 +0,0 @@ --module(emongo_server). - --behaviour(gen_server). - --include("emongo.hrl"). - --export([start_link/3]). - --export([send/3, send/2, send_recv/4]). --export([send_recv_nowait/3, recv/4]). - --deprecated([send/3]). - -%% gen_server --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --record(state, {pool_id, socket, requests, leftover}). - -%% messages --define(abort(ReqId), {abort, ReqId}). --define(send(Packet), {send, Packet}). --define(send_recv(ReqId, Packet, From), - {send_recv, ReqID, Packet, From}). - -%% to be removed next release --define(old_send(ReqId, Packet), {send, ReqId, Packet}). - - -start_link(PoolId, Host, Port) -> - gen_server:start_link(?MODULE, [PoolId, Host, Port], []). - - -send(Pid, _ReqID, Packet) -> - send(Pid, Packet). - -send(Pid, Packet) -> - gen_server:cast(Pid, ?send(Packet)). - - -send_recv_nowait(Pid, ReqID, Packet) -> - Tag = make_ref(), - gen_server:cast(Pid, ?send_recv(ReqID, Packet, {self(), Tag})), - Tag. - - -recv(Pid, ReqID, 0, Tag) -> - Pid ! ?abort(ReqID), - receive - {Tag, Resp} -> - Documents = emongo_bson:decode(Resp#response.documents), - Resp#response{documents=Documents} - after 0 -> - exit(emongo_timeout) - end; - -recv(Pid, ReqID, Timeout, Tag) -> - receive - {Tag, Resp} -> - Documents = emongo_bson:decode(Resp#response.documents), - Resp#response{documents=Documents} - after Timeout -> - recv(Pid, ReqID, 0, Tag) - end. - - -send_recv(Pid, ReqID, Packet, Timeout) -> - Tag = send_recv_nowait(Pid, ReqID, Packet), - recv(Pid, ReqID, Timeout, Tag). - - -%% gen_server %% - -init([PoolId, Host, Port]) -> - case gen_tcp:connect(Host, Port, [binary, {active, true}, {nodelay, true}], ?TIMEOUT) of - {ok, Socket} -> - {ok, #state{pool_id=PoolId, socket=Socket, requests=[], leftover = <<>>}}; - {error, Reason} -> - {stop, {failed_to_open_socket, Reason}} - end. - - -handle_call(_Request, _From, State) -> - {reply, undefined, State}. - - -handle_cast(?send_recv(ReqID, Packet, From), State) -> - case is_aborted(ReqID) of - true -> - {noreply, State}; - _ -> - gen_tcp:send(State#state.socket, Packet), - State1 = State#state{requests=[{ReqID, From} | State#state.requests]}, - {noreply, State1} - end; - -handle_cast(?old_send(_ReqId, Packet), State) -> - gen_tcp:send(State#state.socket, Packet), - {noreply, State}; - -handle_cast(?send(Packet), State) -> - gen_tcp:send(State#state.socket, Packet), - {noreply, State}. - - -handle_info(?abort(ReqId), #state{requests=Requests}=State) -> - State1 = State#state{requests=lists:keydelete(ReqId, 1, Requests)}, - {noreply, State1}; - -handle_info({tcp, _Socket, Data}, State) -> - Leftover = <<(State#state.leftover)/binary, Data/binary>>, - {noreply, process_bin(State#state{leftover= <<>>}, Leftover)}; - -handle_info({tcp_closed, _Socket}, _State) -> - exit(tcp_closed); - -handle_info({tcp_error, _Socket, Reason}, _State) -> - exit({tcp_error, Reason}). - - -terminate(_, State) -> gen_tcp:close(State#state.socket). - - -code_change(_Old, State, _Extra) -> {ok, State}. - -%% internal - - -process_bin(State, <<>>) -> - State; - -process_bin(State, Bin) -> - case emongo_packet:decode_response(Bin) of - undefined -> - State#state{leftover=Bin}; - - {Resp, Tail} -> - ResponseTo = (Resp#response.header)#header.response_to, - - case lists:keytake(ResponseTo, 1, State#state.requests) of - false -> - cleanup_cursor(Resp, ResponseTo, State), - process_bin(State, Tail); - - {value, {_, From}, Requests} -> - case is_aborted(ResponseTo) of - false -> - gen_server:reply( - From, - Resp#response{pool_id=State#state.pool_id}); - true -> - cleanup_cursor(Resp, ResponseTo, State) - end, - process_bin(State#state{requests=Requests}, Tail) - end - end. - - -is_aborted(ReqId) -> - receive - ?abort(ReqId) -> - true - after 0 -> - false - end. - -cleanup_cursor(#response{cursor_id=0}, _, _) -> - ok; -cleanup_cursor(#response{cursor_id=CursorID}, ReqId, State) -> - Packet = emongo_packet:kill_cursors(ReqId, [CursorID]), - gen_tcp:send(State#state.socket, Packet). diff --git a/src/emongo_sup.erl b/src/emongo_sup.erl deleted file mode 100644 index 97645ad..0000000 --- a/src/emongo_sup.erl +++ /dev/null @@ -1,86 +0,0 @@ --module(emongo_sup). - --behaviour(supervisor). - --export([start_link/0, pools/0, worker_pid/2, worker_pid/3]). --export([start_pool/5, stop_pool/1]). --export([start_router/2, stop_router/1]). - --deprecated([worker_pid/2]). - -%% supervisor exports --export([init/1]). - -%%%%%%%%%%%%%%%% -%% public api %% -%%%%%%%%%%%%%%%% - -start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -start_router(BalId, Pools) -> - supervisor:start_child(?MODULE, - {BalId, - {emongo_router, start_link, [BalId, Pools]}, - permanent, 10000, worker, [emongo_router] - }). - -stop_router(BalId) -> - case [Pid || {PoolId, Pid, _, [emongo_router]} <- supervisor:which_children(?MODULE), PoolId =:= BalId] of - [Pid] -> - gen_server:call(Pid, stop_children), - stop_pool(BalId) - end. - - -start_pool(PoolId, Host, Port, Database, Size) -> - supervisor:start_child(?MODULE, {PoolId, - {emongo_pool, start_link, [PoolId, Host, Port, Database, Size]}, - permanent, 10000, worker, [emongo_pool] - }). - - -stop_pool(PoolPid) when is_pid(PoolPid) -> - case [PoolId || {PoolId, Pid,_,_} <- supervisor:which_children(?MODULE), Pid =:= PoolPid] of - [PoolId] -> stop_pool(PoolId); - _ -> {error, not_found} - end; - -stop_pool(PoolId) -> - supervisor:terminate_child(?MODULE, PoolId), - supervisor:delete_child(?MODULE, PoolId). - - -pools() -> - [{Id, Pid, Module} || {Id, Pid, _, [Module]} - <- supervisor:which_children(?MODULE), Module == emongo_pool]. - -worker_pid(PoolId, Pools) -> - worker_pid(PoolId, Pools, 1). - - -worker_pid(PoolPid, Pools, RequestCount) when is_pid(PoolPid) -> - case lists:keyfind(PoolPid, 2, Pools) of - {_, Pid, Module} -> - Module:pid(Pid, RequestCount); - _ -> - undefined - end; - -worker_pid(PoolId, Pools, RequestCount) -> - case lists:keyfind(PoolId, 1, Pools) of - {_, Pid, Module} -> - Module:pid(Pid, RequestCount); - _ -> - undefined - end. - - -%%%%%%%%%%%%%%%%%%%%%%%%%% -%% supervisor callbacks %% -%%%%%%%%%%%%%%%%%%%%%%%%%% - -init(_) -> - {ok, {{one_for_one, 10, 10}, [ - {emongo, {emongo, start_link, []}, - permanent, 5000, worker, [emongo]} - ]}}. diff --git a/src/pqueue.erl b/src/pqueue.erl deleted file mode 100644 index ab6feea..0000000 --- a/src/pqueue.erl +++ /dev/null @@ -1,107 +0,0 @@ -%% Copyright (c) 2010 Belyaev Dmitry -%% -%% Permission is hereby granted, free of charge, to any person obtaining a copy -%% of this software and associated documentation files (the "Software"), to deal -%% in the Software without restriction, including without limitation the rights -%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%% copies of the Software, and to permit persons to whom the Software is -%% furnished to do so, subject to the following conditions: -%% -%% The above copyright notice and this permission notice shall be included in -%% all copies or substantial portions of the Software. -%% -%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -%% THE SOFTWARE. - -%% Is pairing heap a better realization? --module(pqueue). - --export([ - new/0, - size/1, - push/3, - pop/1, - filter/2 - ]). - --type tail() :: {integer(), [term()]}. - --record(pqueue, {size :: integer(), - head :: [term()], - tails :: [tail()]}). - --spec new() -> #pqueue{}. -new() -> - #pqueue{size = 0, head = [], tails = []}. - - --spec size(#pqueue{}) -> integer(). -size(#pqueue{size = Size}) -> - Size. - - --spec push(integer(), term(), #pqueue{}) -> #pqueue{}. -push(Priority, Item, #pqueue{size = Size, head = Head, tails = Tails}) -> - #pqueue{size = Size + 1, head = Head, tails = push_item(Tails, Priority, Item)}. - - --spec pop(#pqueue{}) -> {term(), #pqueue{}} | empty. -pop(#pqueue{size = Size, head = [Item | Head], tails = Tails}) -> - {Item, #pqueue{size = Size - 1, head = Head, tails = Tails}}; - -pop(#pqueue{size = Size, head = [], tails = [{_, Tail} | Tails]}) -> - [Item | Head] = lists:reverse(Tail), - {Item, #pqueue{size = Size - 1, head = Head, tails = Tails}}; - -pop(_) -> - empty. - - --spec filter(fun((term()) -> boolean()), #pqueue{}) -> #pqueue{}. -filter(_, #pqueue{size = 0} = Q) -> - Q; -filter(Fun, #pqueue{head = Head, tails = Tails}) -> - NewHead = [I || I <- Head, Fun(I)], - NewTails = filter_tails(Fun, Tails), - - NewSize = length(Head) + lists:sum([length(Items) || {_, Items} <- NewTails]), - #pqueue{size = NewSize, head = NewHead, tails = NewTails}. - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --spec push_item([tail()], integer(), term()) -> [tail()]. -push_item([], Priority, Item) -> - [{Priority, [Item]}]; - -push_item([{Priority, Items} | Tails], Priority, Item) -> - [{Priority, [Item | Items]} | Tails]; - -push_item([{LowerP, _} = LowerI | Tails], Priority, Item) - when LowerP < Priority -> - [LowerI | push_item(Tails, Priority - LowerP, Item)]; - -push_item([{HigherP, Items} | Tails], Priority, Item) -> - [{Priority, [Item]}, {HigherP - Priority, Items} | Tails]. - - --spec filter_tails(fun((term()) -> boolean()), [tail()]) -> [tail()]. -filter_tails(_, []) -> - []; -filter_tails(Fun, [{Priority, Items} | Tails]) -> - case [I || I <- Items, Fun(I)] of - [] -> - filter_tails(Fun, add_priority(Priority, Tails)); - NewItems -> - [{Priority, NewItems} | filter_tails(Fun, Tails)] - end. - - --spec add_priority(integer(), [tail()]) -> [tail()]. -add_priority(_, []) -> - []; -add_priority(Priority, [{NextP, Items} | Tails]) -> - [{NextP + Priority, Items} | Tails]. diff --git a/t/001-load.t b/t/001-load.t index de3f44f..b15bf5a 100644 --- a/t/001-load.t +++ b/t/001-load.t @@ -1,12 +1,12 @@ -#!/usr/bin/env escript +#!/usr/local/bin/escript %% -*- erlang -*- %%! -pa ebin -sasl errlog_type error -boot start_sasl -noshell -config priv/example main(_) -> - etap:plan(unknown), + etap:plan(unknown), error_logger:tty(false), - etap:ok(application:start(emongo) == ok, "application 'emongo' started ok"), + etap_application:start_ok(emongo, "application 'emongo' started ok"), - etap:is(length(emongo:pools()), 2, "two pools exist in state"), + etap:is(length(emongo:pools()), 1, "one pool exists in state"), - etap:end_tests(). + etap:end_tests(). \ No newline at end of file diff --git a/t/002-bson.t b/t/002-bson.t index cb95f7a..621af01 100644 --- a/t/002-bson.t +++ b/t/002-bson.t @@ -1,153 +1,142 @@ -#!/usr/bin/env escript +#!/usr/local/bin/escript %% -*- erlang -*- %%! -pa ebin -sasl errlog_type error -boot start_sasl -noshell -config priv/example main(_) -> - etap:plan(unknown), + etap:plan(unknown), error_logger:tty(false), - etap:ok(application:start(emongo) == ok, "application 'emongo' started ok"), - - %% 1) data_number - (fun() -> - Val = 1.1, - BinVal = <>, - Size = size(BinVal) + 8, - Encoded = emongo_bson:encode([{<<"a">>, Val}]), - etap:is(Encoded, <>, "data_number encodes ok"), - etap:is(hd(emongo_bson:decode(Encoded)), [{<<"a">>, Val}], "data_number decodes ok"), - ok - end)(), - - %% 2) data_string - (fun() -> - Val = "abc", - Val1 = unicode:characters_to_binary(Val), - BinVal = <<(byte_size(Val1)+1):32/little-signed, Val1/binary, 0:8>>, - Size = size(BinVal) + 8, - Encoded = emongo_bson:encode([{<<"a">>, Val}]), - etap:is(Encoded, <>, "data_string encodes ok"), - etap:is(hd(emongo_bson:decode(Encoded)), [{<<"a">>, list_to_binary(Val)}], "data_string decodes ok"), - ok - end)(), - - %% 3) data_object - (fun() -> - Val = [{"b", "c"}], - BinVal = emongo_bson:encode(Val), - Size = size(BinVal) + 8, - Encoded = emongo_bson:encode([{<<"a">>, Val}]), - etap:is(Encoded, <>, "data_object encodes ok"), - etap:is(hd(emongo_bson:decode(Encoded)), [{<<"a">>, [{<<"b">>, <<"c">>}]}], "data_object decodes ok"), - ok - end)(), - - %% 4) data_array - (fun() -> - Val = {array, ["a", "b", "c"]}, - BinVal = emongo_bson:encode([{0, "a"}, {1, "b"}, {2, "c"}]), - Size = size(BinVal) + 8, - Encoded = emongo_bson:encode([{<<"a">>, Val}]), - etap:is(Encoded, <>, "data_array encodes ok"), - etap:is(hd(emongo_bson:decode(Encoded)), [{<<"a">>, {array, [<<"a">>, <<"b">>, <<"c">>]}}], "data_array decodes ok"), - ok - end)(), - - %% 5) data_binary - (fun() -> - Val = {binary, 2, <<"abc">>}, - BinVal = <<7:32/little-signed, 2:8, 3:32/little-signed, <<"abc">>/binary>>, - Size = size(BinVal) + 8, - Encoded = emongo_bson:encode([{<<"a">>, Val}]), - etap:is(Encoded, <>, "data_binary encodes ok"), - etap:is(hd(emongo_bson:decode(Encoded)), [{<<"a">>, Val}], "data_binary decodes ok"), - ok - end)(), - - %% 6) data_undefined - (fun() -> - Val = undefined, - BinVal = <<>>, - Size = size(BinVal) + 8, - Encoded = emongo_bson:encode([{<<"a">>, Val}]), - etap:is(Encoded, <>, "data_undefined encodes ok"), - etap:is(hd(emongo_bson:decode(Encoded)), [{<<"a">>, Val}], "data_null decodes ok"), - ok - end)(), - - %% 7) data_oid - (fun() -> - Val1 = {oid, <<255,255,255,255,255,255,255,255,255,255,255,255>>}, - BinVal1 = <<255,255,255,255,255,255,255,255,255,255,255,255>>, - Size1 = size(BinVal1) + 8, - Encoded1 = emongo_bson:encode([{<<"a">>, Val1}]), - etap:is(Encoded1, <>, "data_oid dec encodes ok"), - etap:is(hd(emongo_bson:decode(Encoded1)), [{<<"a">>, Val1}], "data_oid decodes ok"), - - Val2 = {oid, "ffffffffffffffffffffffff"}, - BinVal2 = emongo:hex2dec("ffffffffffffffffffffffff"), - Size2 = size(BinVal2) + 8, - Encoded2 = emongo_bson:encode([{<<"a">>, Val2}]), - etap:is(Encoded2, <>, "data_oid hex encodes ok"), - etap:is(hd(emongo_bson:decode(Encoded2)), [{<<"a">>, Val1}], "data_oid decodes ok"), - - ok - end)(), - - %% 8) data_boolean - (fun() -> - Val = true, - BinVal = <<1:8>>, - Size = size(BinVal) + 8, - Encoded = emongo_bson:encode([{<<"a">>, Val}]), - etap:is(Encoded, <>, "data_boolean encodes ok"), - etap:is(hd(emongo_bson:decode(Encoded)), [{<<"a">>, Val}], "data_boolean decodes ok"), - ok - end)(), - - %% 9) data_date - (fun() -> - {MegaSecs,Secs,MicroSecs} = Val = now(), - Secs1 = (MegaSecs * 1000000) + Secs, - Epoch = Secs1 * 1000 + trunc(MicroSecs / 1000), - BinVal = <>, - Size = size(BinVal) + 8, - Encoded = emongo_bson:encode([{<<"a">>, Val}]), - etap:is(Encoded, <>, "data_date encodes ok"), - etap:is(hd(emongo_bson:decode(Encoded)), [{<<"a">>, {MegaSecs,Secs,erlang:trunc(MicroSecs / 1000) * 1000}}], "data_date decodes ok"), - ok - end)(), - - %% 10) data_null - (fun() -> - Val = null, - BinVal = <<>>, - Size = size(BinVal) + 8, - Encoded = emongo_bson:encode([{<<"a">>, Val}]), - etap:is(Encoded, <>, "data_null encodes ok"), - etap:is(hd(emongo_bson:decode(Encoded)), [{<<"a">>, Val}], "data_null decodes ok"), - ok - end)(), - - %% 16) data_int - (fun() -> - Val = 11, - BinVal = <>, - Size = size(BinVal) + 8, - Encoded = emongo_bson:encode([{<<"a">>, Val}]), - etap:is(Encoded, <>, "data_int encodes ok"), - etap:is(hd(emongo_bson:decode(Encoded)), [{<<"a">>, Val}], "data_int decodes ok"), - ok - end)(), - - %% 18) data_long - (fun() -> - Val = 5275387038659964208, - BinVal = <>, - Size = size(BinVal) + 8, - Encoded = emongo_bson:encode([{<<"a">>, Val}]), - etap:is(Encoded, <>, "data_number encodes ok"), - etap:is(hd(emongo_bson:decode(Encoded)), [{<<"a">>, Val}], "data_number decodes ok"), - ok - end)(), - - etap:end_tests(). + etap_application:start_ok(emongo, "application 'emongo' started ok"), + + %% 1) data_number + (fun() -> + Val = 1.1, + BinVal = <>, + Size = size(BinVal) + 8, + Encoded = emongo_bson:encode([{<<"a">>, Val}]), + etap:is(Encoded, <>, "data_number encodes ok"), + etap:is(hd(emongo_bson:decode(Encoded)), [{<<"a">>, Val}], "data_number decodes ok"), + ok + end)(), + + %% 2) data_string + (fun() -> + Val = "abc", + Val1 = unicode:characters_to_binary(Val), + BinVal = <<(byte_size(Val1)+1):32/little-signed, Val1/binary, 0:8>>, + Size = size(BinVal) + 8, + Encoded = emongo_bson:encode([{<<"a">>, Val}]), + etap:is(Encoded, <>, "data_string encodes ok"), + etap:is(hd(emongo_bson:decode(Encoded)), [{<<"a">>, list_to_binary(Val)}], "data_string decodes ok"), + ok + end)(), + + %% 3) data_object + (fun() -> + Val = [{"b", "c"}], + BinVal = emongo_bson:encode(Val), + Size = size(BinVal) + 8, + Encoded = emongo_bson:encode([{<<"a">>, Val}]), + etap:is(Encoded, <>, "data_object encodes ok"), + etap:is(hd(emongo_bson:decode(Encoded)), [{<<"a">>, [{<<"b">>, <<"c">>}]}], "data_object decodes ok"), + ok + end)(), + + %% 4) data_array + (fun() -> + Val = {array, ["a", "b", "c"]}, + BinVal = emongo_bson:encode([{0, "a"}, {1, "b"}, {2, "c"}]), + Size = size(BinVal) + 8, + Encoded = emongo_bson:encode([{<<"a">>, Val}]), + etap:is(Encoded, <>, "data_array encodes ok"), + etap:is(hd(emongo_bson:decode(Encoded)), [{<<"a">>, {array, [<<"a">>, <<"b">>, <<"c">>]}}], "data_array decodes ok"), + ok + end)(), + + %% 5) data_binary + (fun() -> + Val = {binary, 2, <<"abc">>}, + BinVal = <<7:32/little-signed, 2:8, 3:32/little-signed, <<"abc">>/binary>>, + Size = size(BinVal) + 8, + Encoded = emongo_bson:encode([{<<"a">>, Val}]), + etap:is(Encoded, <>, "data_binary encodes ok"), + etap:is(hd(emongo_bson:decode(Encoded)), [{<<"a">>, Val}], "data_binary decodes ok"), + ok + end)(), + + %% 7) data_oid + (fun() -> + Val1 = {oid, <<255,255,255,255,255,255,255,255,255,255,255,255>>}, + BinVal1 = <<255,255,255,255,255,255,255,255,255,255,255,255>>, + Size1 = size(BinVal1) + 8, + Encoded1 = emongo_bson:encode([{<<"a">>, Val1}]), + etap:is(Encoded1, <>, "data_oid dec encodes ok"), + etap:is(hd(emongo_bson:decode(Encoded1)), [{<<"a">>, Val1}], "data_oid decodes ok"), + + Val2 = {oid, "ffffffffffffffffffffffff"}, + BinVal2 = emongo:hex2dec("ffffffffffffffffffffffff"), + Size2 = size(BinVal2) + 8, + Encoded2 = emongo_bson:encode([{<<"a">>, Val2}]), + etap:is(Encoded2, <>, "data_oid hex encodes ok"), + etap:is(hd(emongo_bson:decode(Encoded2)), [{<<"a">>, Val1}], "data_oid decodes ok"), + + ok + end)(), + + %% 8) data_boolean + (fun() -> + Val = true, + BinVal = <<1:8>>, + Size = size(BinVal) + 8, + Encoded = emongo_bson:encode([{<<"a">>, Val}]), + etap:is(Encoded, <>, "data_boolean encodes ok"), + etap:is(hd(emongo_bson:decode(Encoded)), [{<<"a">>, Val}], "data_boolean decodes ok"), + ok + end)(), + + %% 9) data_date + (fun() -> + {MegaSecs,Secs,MicroSecs} = Val = now(), + Secs1 = (MegaSecs * 1000000) + Secs, + Epoch = Secs1 * 1000 + trunc(MicroSecs / 1000), + BinVal = <>, + Size = size(BinVal) + 8, + Encoded = emongo_bson:encode([{<<"a">>, Val}]), + etap:is(Encoded, <>, "data_date encodes ok"), + etap:is(hd(emongo_bson:decode(Encoded)), [{<<"a">>, {MegaSecs,Secs,erlang:trunc(MicroSecs / 1000) * 1000}}], "data_date decodes ok"), + ok + end)(), + + %% 10) data_null + (fun() -> + Val = undefined, + BinVal = <<>>, + Size = size(BinVal) + 8, + Encoded = emongo_bson:encode([{<<"a">>, Val}]), + etap:is(Encoded, <>, "data_null encodes ok"), + etap:is(hd(emongo_bson:decode(Encoded)), [{<<"a">>, Val}], "data_null decodes ok"), + ok + end)(), + + %% 16) data_int + (fun() -> + Val = 11, + BinVal = <>, + Size = size(BinVal) + 8, + Encoded = emongo_bson:encode([{<<"a">>, Val}]), + etap:is(Encoded, <>, "data_int encodes ok"), + etap:is(hd(emongo_bson:decode(Encoded)), [{<<"a">>, Val}], "data_int decodes ok"), + ok + end)(), + + %% 18) data_long + (fun() -> + Val = 5275387038659964208, + BinVal = <>, + Size = size(BinVal) + 8, + Encoded = emongo_bson:encode([{<<"a">>, Val}]), + etap:is(Encoded, <>, "data_number encodes ok"), + etap:is(hd(emongo_bson:decode(Encoded)), [{<<"a">>, Val}], "data_number decodes ok"), + ok + end)(), + + etap:end_tests(). \ No newline at end of file diff --git a/t/003-find.t b/t/003-find.t index 80737ef..4e3e97d 100644 --- a/t/003-find.t +++ b/t/003-find.t @@ -1,65 +1,64 @@ -#!/usr/bin/env escript +#!/usr/local/bin/escript %% -*- erlang -*- %%! -pa ebin -sasl errlog_type error -boot start_sasl -noshell -config priv/example main(_) -> - etap:plan(unknown), + etap:plan(unknown), error_logger:tty(false), - etap:ok(application:start(emongo) == ok, "application 'emongo' started ok"), + etap_application:start_ok(emongo, "application 'emongo' started ok"), - emongo:delete(test1, "sushi"), - etap:is(emongo:find_all(test1, "sushi"), [], "sushi collection is empty"), + emongo:delete(test1, "sushi"), + etap:is(emongo:find(test1, "sushi"), [], "sushi collection is empty"), + + [emongo:insert(test1, "sushi", [{<<"rolls">>, I}, {<<"uni">>, <<"nigiri">>}]) || I <- lists:seq(1, 1000)], + + All = emongo:find_all(test1, "sushi", [], [{orderby, [{"rolls", asc}]}]), + etap:is(length(All), 1000, "inserted documents into sushi collection"), + + (fun() -> + OrderOK = lists:foldl( + fun(Item, I) -> + case Item of + [_, {<<"rolls">>, I}, _] -> I + 1; + _ -> -1 + end + end, 1, All), + etap:is(OrderOK, 1001, "sushi collection returned in correct order"), + ok + end)(), + + %% LIMIT && ORDERBY + (fun() -> + Docs = emongo:find(test1, "sushi", [], [{limit, 1}, {orderby, [{"rolls", asc}]}, {fields, [<<"rolls">>]}]), + etap:is(length(Docs), 1, "limit returns one document"), + etap:is(length(hd(Docs)), 2, "correct number of fields"), + etap:is(lists:nth(2, hd(Docs)), {<<"rolls">>, 1}, "returned correct document from limit query"), + ok + end)(), + + %% LIMIT && OFFSET && ORDERBY + (fun() -> + Docs = emongo:find(test1, "sushi", [], [{limit, 1}, {offset, 1}, {orderby, [{"rolls", desc}]}]), + etap:is(length(Docs), 1, "limit returns one document"), + etap:is(lists:nth(2, hd(Docs)), {<<"rolls">>, 999}, "returned correct document from offset query"), + ok + end)(), + + (fun() -> + Docs = emongo:find(test1, "sushi", [{"rolls", 100}], [{orderby, [{"rolls", desc}]}]), + etap:is(proplists:get_value(<<"rolls">>, hd(Docs)), 100, "query returned correct value"), + ok + end)(), + + %% NESTED QUERIES + [emongo:insert(test1, "sushi", [{<<"seaweed">>, [{<<"sheets">>, I}]}]) || I <- lists:seq(1,10)], + + (fun() -> + Docs = emongo:find(test1, "sushi", [{"seaweed.sheets", 5}]), + etap:is(length(Docs), 1, "correct number of results from nested query"), + etap:is(proplists:get_value(<<"seaweed">>, hd(Docs)), [{<<"sheets">>, 5}], "correct result returned"), + ok + end)(), + - [emongo:insert(test1, "sushi", [{<<"rolls">>, I}, {<<"uni">>, <<"nigiri">>}]) || I <- lists:seq(1, 1000)], - - All = emongo:find_all(test1, "sushi", [], [{orderby, [{"rolls", asc}]}]), - etap:is(length(All), 1000, "inserted documents into sushi collection"), - - (fun() -> - OrderOK = lists:foldl( - fun(Item, I) -> - case Item of - [_, {<<"rolls">>, I}, _] -> I + 1; - _ -> -1 - end - end, 1, All), - etap:is(OrderOK, 1001, "sushi collection returned in correct order"), - ok - end)(), - - %% LIMIT && ORDERBY - (fun() -> - Docs = emongo:find_all(test1, "sushi", [], [{limit, 1}, {orderby, [{"rolls", asc}]}, {fields, [<<"rolls">>]}]), - etap:is(length(Docs), 1, "limit returns one document"), - etap:is(length(hd(Docs)), 2, "correct number of fields"), - etap:is(lists:nth(2, hd(Docs)), {<<"rolls">>, 1}, "returned correct document from limit query"), - ok - end)(), - - %% LIMIT && OFFSET && ORDERBY - (fun() -> - Docs = emongo:find_all(test1, "sushi", [], [{limit, 1}, {offset, 1}, {orderby, [{"rolls", desc}]}]), - etap:is(length(Docs), 1, "limit returns one document"), - etap:is(lists:nth(2, hd(Docs)), {<<"rolls">>, 999}, "returned correct document from offset query"), - ok - end)(), - - (fun() -> - Docs = emongo:find_all(test1, "sushi", [{"rolls", 100}], [{orderby, [{"rolls", desc}]}]), - etap:is(proplists:get_value(<<"rolls">>, hd(Docs)), 100, "query returned correct value"), - ok - end)(), - - %% NESTED QUERIES - [emongo:insert(test1, "sushi", [{<<"seaweed">>, [{<<"sheets">>, I}]}]) || I <- lists:seq(1,10)], - - (fun() -> - Docs = emongo:find_all(test1, "sushi", [{"seaweed.sheets", 5}]), - etap:is(length(Docs), 1, "correct number of results from nested query"), - etap:is(proplists:get_value(<<"seaweed">>, hd(Docs)), [{<<"sheets">>, 5}], "correct result returned"), - ok - end)(), - - emongo:delete(test1, "sushi"), - - etap:end_tests(). + etap:end_tests(). \ No newline at end of file diff --git a/t/004-cond-exprs.t b/t/004-cond-exprs.t index 56b37b7..0bbe127 100644 --- a/t/004-cond-exprs.t +++ b/t/004-cond-exprs.t @@ -1,112 +1,110 @@ -#!/usr/bin/env escript +#!/usr/local/bin/escript %% -*- erlang -*- %%! -pa ebin -sasl errlog_type error -boot start_sasl -noshell -config priv/example main(_) -> - etap:plan(unknown), + etap:plan(unknown), error_logger:tty(false), - etap:ok(application:start(emongo) == ok, "application 'emongo' started ok"), - - emongo:delete(test1, "sushi"), - etap:is(emongo:find_all(test1, "sushi"), [], "sushi collection is empty"), - - [emongo:insert(test1, "sushi", [{<<"rolls">>, I}]) || I <- lists:seq(1, 50)], - - (fun() -> - Docs = emongo:find_all(test1, "sushi", [{<<"rolls">>, [{gt, 45}]}], [orderby, [{"rolls", asc}]]), - etap:is(length(Docs), 5, "correct number of results from gt query"), - etap:is(lists:sort([I || [_, {_, I}] <- Docs]), [46,47,48,49,50], "correct results from gt query"), - ok - end)(), - - (fun() -> - Docs = emongo:find_all(test1, "sushi", [{<<"rolls">>, [{lt, 5}]}], [orderby, [{"rolls", asc}]]), - etap:is(length(Docs), 4, "correct number of results from lt query"), - etap:is(lists:sort([I || [_, {_, I}] <- Docs]), [1, 2, 3, 4], "correct results from lt query"), - ok - end)(), - - (fun() -> - Docs = emongo:find_all(test1, "sushi", [{<<"rolls">>, [{gte, 45}]}], [orderby, [{"rolls", asc}]]), - etap:is(length(Docs), 6, "correct number of results from gte query"), - etap:is(lists:sort([I || [_, {_, I}] <- Docs]), [45,46,47,48,49,50], "correct results from gte query"), - ok - end)(), - - (fun() -> - Docs = emongo:find_all(test1, "sushi", [{<<"rolls">>, [{lte, 5}]}], [orderby, [{"rolls", asc}]]), - etap:is(length(Docs), 5, "correct number of results from lte query"), - etap:is(lists:sort([I || [_, {_, I}] <- Docs]), [1, 2, 3, 4, 5], "correct results from lte query"), - ok - end)(), - - (fun() -> - Docs = emongo:find_all(test1, "sushi", [{<<"rolls">>, [{ne, 1}, {ne, 50}]}], [orderby, [{"rolls", asc}]]), - etap:is(length(Docs), 48, "correct number of results from ne query"), - etap:is(lists:sort([I || [_, {_, I}] <- Docs]), lists:seq(2, 49), "correct results from ne query"), - ok - end)(), - - (fun() -> - Docs = emongo:find_all(test1, "sushi", [{<<"rolls">>, [{in, [1,2,3,4,5]}]}], [orderby, [{"rolls", asc}]]), - etap:is(length(Docs), 5, "correct number of results from in query"), - etap:is(lists:sort([I || [_, {_, I}] <- Docs]), [1,2,3,4,5], "correct results from in query"), - ok - end)(), - - (fun() -> - Docs = emongo:find_all(test1, "sushi", [{<<"rolls">>, [{nin, [1,2,3,4,5]}]}], [orderby, [{"rolls", asc}]]), - etap:is(length(Docs), 45, "correct number of results from nin query"), - etap:is(lists:sort([I || [_, {_, I}] <- Docs]), lists:seq(6,50), "correct results from nin query"), - ok - end)(), - - [emongo:insert(test1, "sushi", [{<<"maki">>, {array, [I,I+1,I+2]}}]) || I <- lists:seq(1, 10)], - - (fun() -> - Docs = emongo:find_all(test1, "sushi", [{<<"maki">>, [{all, [2,3]}]}], [orderby, [{"maki", asc}]]), - etap:is(length(Docs), 2, "correct number of results from all query"), - etap:is(lists:sort([I || [_, {_, I}] <- Docs]), [{array, [1,2,3]}, {array, [2,3,4]}], "correct results from all query"), - ok - end)(), - - emongo:insert(test1, "sushi", [{<<"maki">>, {array, [1,2,3,4,5]}}]), - - (fun() -> - Docs = emongo:find_all(test1, "sushi", [{<<"maki">>, [{size, 5}]}], [orderby, [{"maki", asc}]]), - etap:is(length(Docs), 1, "correct number of results from size query"), - etap:is(lists:sort([I || [_, {_, I}] <- Docs]), [{array, [1,2,3,4,5]}], "correct results from size query"), - ok - end)(), - - (fun() -> - Docs = emongo:find_all(test1, "sushi", [{<<"maki">>, [{exists, true}]}], [orderby, [{"maki", asc}]]), - etap:is(length(Docs), 11, "correct number of results from exists query"), - ok - end)(), - - (fun() -> - Docs = emongo:find_all(test1, "sushi", [{<<"maki">>, [{exists, false}]}], [orderby, [{"maki", asc}]]), - etap:is(length(Docs), 50, "correct number of results from exists query"), - ok - end)(), - - (fun() -> - Docs = emongo:find_all(test1, "sushi", [{where, "this.rolls > 45"}], [orderby, [{"rolls", asc}]]), - etap:is(length(Docs), 5, "correct number of results from where query"), - etap:is(lists:sort([I || [_, {_, I}] <- Docs]), [46,47,48,49,50], "correct results from where query"), - ok - end)(), - - [emongo:insert(test1, "sushi", [{<<"seaweed">>, [{<<"sheets">>, I}]}]) || I <- lists:seq(1,10)], - - (fun() -> - Docs = emongo:find_all(test1, "sushi", [{"seaweed.sheets", [{in, [3,4,5]}]}]), - etap:is(length(Docs), 3, "correct number of results from nested query"), - etap:is(lists:sort([I || [_, {<<"seaweed">>, [{<<"sheets">>, I}]}] <- Docs]), [3,4,5], "correct results from where query"), - ok - end)(), - - emongo:delete(test1, "sushi"), - - etap:end_tests(). + etap_application:start_ok(emongo, "application 'emongo' started ok"), + + emongo:delete(test1, "sushi"), + etap:is(emongo:find(test1, "sushi"), [], "sushi collection is empty"), + + [emongo:insert(test1, "sushi", [{<<"rolls">>, I}]) || I <- lists:seq(1, 50)], + + (fun() -> + Docs = emongo:find(test1, "sushi", [{<<"rolls">>, [{gt, 45}]}], [orderby, [{"rolls", asc}]]), + etap:is(length(Docs), 5, "correct number of results from gt query"), + etap:is([I || [_, {_, I}] <- Docs], [46,47,48,49,50], "correct results from gt query"), + ok + end)(), + + (fun() -> + Docs = emongo:find(test1, "sushi", [{<<"rolls">>, [{lt, 5}]}], [orderby, [{"rolls", asc}]]), + etap:is(length(Docs), 4, "correct number of results from lt query"), + etap:is([I || [_, {_, I}] <- Docs], [1, 2, 3, 4], "correct results from lt query"), + ok + end)(), + + (fun() -> + Docs = emongo:find(test1, "sushi", [{<<"rolls">>, [{gte, 45}]}], [orderby, [{"rolls", asc}]]), + etap:is(length(Docs), 6, "correct number of results from gte query"), + etap:is([I || [_, {_, I}] <- Docs], [45,46,47,48,49,50], "correct results from gte query"), + ok + end)(), + + (fun() -> + Docs = emongo:find(test1, "sushi", [{<<"rolls">>, [{lte, 5}]}], [orderby, [{"rolls", asc}]]), + etap:is(length(Docs), 5, "correct number of results from lte query"), + etap:is([I || [_, {_, I}] <- Docs], [1, 2, 3, 4, 5], "correct results from lte query"), + ok + end)(), + + (fun() -> + Docs = emongo:find(test1, "sushi", [{<<"rolls">>, [{ne, 1}, {ne, 50}]}], [orderby, [{"rolls", asc}]]), + etap:is(length(Docs), 48, "correct number of results from ne query"), + etap:is([I || [_, {_, I}] <- Docs], lists:seq(2, 49), "correct results from ne query"), + ok + end)(), + + (fun() -> + Docs = emongo:find(test1, "sushi", [{<<"rolls">>, [{in, [1,2,3,4,5]}]}], [orderby, [{"rolls", asc}]]), + etap:is(length(Docs), 5, "correct number of results from in query"), + etap:is([I || [_, {_, I}] <- Docs], [1,2,3,4,5], "correct results from in query"), + ok + end)(), + + (fun() -> + Docs = emongo:find(test1, "sushi", [{<<"rolls">>, [{nin, [1,2,3,4,5]}]}], [orderby, [{"rolls", asc}]]), + etap:is(length(Docs), 45, "correct number of results from nin query"), + etap:is([I || [_, {_, I}] <- Docs], lists:seq(6,50), "correct results from nin query"), + ok + end)(), + + [emongo:insert(test1, "sushi", [{<<"maki">>, {array, [I,I+1,I+2]}}]) || I <- lists:seq(1, 10)], + + (fun() -> + Docs = emongo:find(test1, "sushi", [{<<"maki">>, [{all, [2,3]}]}], [orderby, [{"maki", asc}]]), + etap:is(length(Docs), 2, "correct number of results from all query"), + etap:is([I || [_, {_, I}] <- Docs], [{array, [1,2,3]}, {array, [2,3,4]}], "correct results from all query"), + ok + end)(), + + emongo:insert(test1, "sushi", [{<<"maki">>, {array, [1,2,3,4,5]}}]), + + (fun() -> + Docs = emongo:find(test1, "sushi", [{<<"maki">>, [{size, 5}]}], [orderby, [{"maki", asc}]]), + etap:is(length(Docs), 1, "correct number of results from size query"), + etap:is([I || [_, {_, I}] <- Docs], [{array, [1,2,3,4,5]}], "correct results from size query"), + ok + end)(), + + (fun() -> + Docs = emongo:find(test1, "sushi", [{<<"maki">>, [{exists, true}]}], [orderby, [{"maki", asc}]]), + etap:is(length(Docs), 11, "correct number of results from exists query"), + ok + end)(), + + (fun() -> + Docs = emongo:find(test1, "sushi", [{<<"maki">>, [{exists, false}]}], [orderby, [{"maki", asc}]]), + etap:is(length(Docs), 50, "correct number of results from exists query"), + ok + end)(), + + (fun() -> + Docs = emongo:find(test1, "sushi", [{where, "this.rolls > 45"}], [orderby, [{"rolls", asc}]]), + etap:is(length(Docs), 5, "correct number of results from where query"), + etap:is([I || [_, {_, I}] <- Docs], [46,47,48,49,50], "correct results from where query"), + ok + end)(), + + [emongo:insert(test1, "sushi", [{<<"seaweed">>, [{<<"sheets">>, I}]}]) || I <- lists:seq(1,10)], + + (fun() -> + Docs = emongo:find(test1, "sushi", [{"seaweed.sheets", [{in, [3,4,5]}]}]), + etap:is(length(Docs), 3, "correct number of results from nested query"), + etap:is([I || [_, {<<"seaweed">>, [{<<"sheets">>, I}]}] <- Docs], [3,4,5], "correct results from where query"), + ok + end)(), + + etap:end_tests(). \ No newline at end of file diff --git a/t/005-drop.t b/t/005-drop.t deleted file mode 100644 index e127511..0000000 --- a/t/005-drop.t +++ /dev/null @@ -1,21 +0,0 @@ -#!/usr/bin/env escript -%% -*- erlang -*- -%%! -pa ebin -sasl errlog_type error -boot start_sasl -noshell -config priv/example - -main(_) -> - etap:plan(unknown), - error_logger:tty(false), - etap:ok(application:start(emongo) == ok, "application 'emongo' started ok"), - - [emongo:insert(test1, "sushi", [{<<"rolls">>, I}]) || I <- lists:seq(1, 50)], - [emongo:insert(test1, "sushi2", [{<<"rolls">>, I}]) || I <- lists:seq(1, 50)], - - ok = emongo:drop_database(test1), - - emongo:insert(test1, "sushi", [{<<"rolls">>, 1}]), - etap:is(length(emongo:find_all(test1, "sushi")), 1, "There is 1 doc"), - etap:is(length(emongo:find_all(test1, "sushi2")), 0, "there is 0 doc"), - - emongo:delete(test1, "sushi"), - - etap:end_tests(). diff --git a/t/006-multiup.t b/t/006-multiup.t deleted file mode 100644 index 1a22869..0000000 --- a/t/006-multiup.t +++ /dev/null @@ -1,25 +0,0 @@ -#!/usr/bin/env escript -%% -*- erlang -*- -%%! -pa ebin -sasl errlog_type error -boot start_sasl -noshell -config priv/example - -main(_) -> - etap:plan(unknown), - error_logger:tty(false), - etap:ok(application:start(emongo) == ok, "application 'emongo' started ok"), - - emongo:delete(test1, "sushi"), - etap:is(emongo:find_all(test1, "sushi"), [], "sushi collection is empty"), - - [emongo:insert(test1, "sushi", [{<<"rolls">>, I}]) || I <- lists:seq(1, 50)], - - (fun() -> - emongo:update(test1, "sushi", [{<<"rolls">>, [{gt, 45}]}], [{<<"$set">>, [{<<"rolls">>, 100}]}], false, true), - Docs = emongo:find_all(test1, "sushi", [{<<"rolls">>, 100}], []), - etap:is(length(Docs), 5, "correct number of results after multiupdate"), - etap:is([I || [_, {_, I}] <- Docs], [100,100,100,100,100], "correct results after multiupdate"), - ok - end)(), - - emongo:delete(test1, "sushi"), - - etap:end_tests(). diff --git a/t/007-performance.t b/t/007-performance.t deleted file mode 100644 index 44be02e..0000000 --- a/t/007-performance.t +++ /dev/null @@ -1,113 +0,0 @@ -#!/usr/bin/env escript -%% -*- erlang -*- -%%! -pa ebin -sasl errlog_type error -boot start_sasl -noshell -config priv/example - --define(NUM_PROCESSES, 500). --define(NUM_TESTS_PER_PID, 10). --define(POOL, test2). --define(COLL, <<"sushi">>). --define(TIMEOUT, 10000). --define(OUT(Format, Data), io:format(Format ++ "\n", Data)). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -main(_) -> - etap:plan(unknown), - error_logger:tty(false), - etap:ok(application:start(emongo) == ok, "application 'emongo' started ok"), - emongo:delete(?POOL, ?COLL, []), - (fun() -> - test_performance() - end)(), - emongo:delete(?POOL, ?COLL, []), - etap:end_tests(). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -test_performance() -> - ?OUT("Testing performance", []), - Start = cur_time_ms(), - try - start_processes(?NUM_PROCESSES), - block_until_done(?NUM_PROCESSES) - after - % Clean up in case something failed. - emongo:delete_sync(?POOL, ?COLL, []) - end, - End = cur_time_ms(), - ?OUT("Test passed in ~p ms\n", [End - Start]). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -start_processes(X) when X =< 0 -> ok; -start_processes(X) -> - Pid = self(), - proc_lib:spawn(fun() -> - run_tests(Pid, X, ?NUM_TESTS_PER_PID) - end), - start_processes(X - 1). - -run_tests(Pid, _, Y) when Y =< 0 -> - Pid ! done; -run_tests(Pid, X, Y) -> - Num = (X bsl 16) bor Y, % Make up a unique number for this run - try - IRes = emongo:insert_sync(?POOL, ?COLL, [{"_id", Num}]), - ok = check_result(IRes), - %etap:is(check_result(IRes), ok, "insert_sync ok"), - - FMRes = emongo:find_and_modify(?POOL, ?COLL, [{"_id", Num}], - [{<<"$set">>, [{<<"fm">>, Num}]}], [{new, true}]), - [[{<<"value">>, [{<<"_id">>, Num}, {<<"fm">>, Num}]}, {<<"ok">>, 1.0}]] = - FMRes, - %etap:is(FMRes, [[{<<"value">>, [{<<"_id">>, Num}, {<<"fm">>, Num}]}, - % {<<"ok">>, 1.0}]], "find_all ok"), - - URes = emongo:update_sync(?POOL, ?COLL, [{"_id", Num}], - [{<<"$set">>, [{<<"us">>, Num}]}], false), - ok = check_result(URes), - %etap:is(check_result(URes), ok, "update_sync ok"), - - FARes = emongo:find_all(?POOL, ?COLL, [{"_id", Num}]), - [[{<<"_id">>, Num}, {<<"fm">>, Num}, {<<"us">>, Num}]] = FARes, - %etap:is(FARes, [[{<<"_id">>, Num}, {<<"fm">>, Num}, {<<"us">>, Num}]], - % "find_all ok"), - - DRes = emongo:delete_sync(?POOL, ?COLL, [{"_id", Num}]), - ok = check_result(DRes) - %etap:is(check_result(DRes), ok, "delete_sync ok") - catch _:E -> - ?OUT("Exception occurred for test ~.16b: ~p\n~p\n", - [Num, E, erlang:get_stacktrace()]), - throw(test_failed) - end, - run_tests(Pid, X, Y - 1). - -block_until_done(X) when X =< 0 -> ok; -block_until_done(X) -> - receive done -> ok - after ?TIMEOUT -> - ?OUT("No response\n", []), - throw(test_failed) - end, - block_until_done(X - 1). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -ensure_started(App) -> - case application:start(App) of - ok -> ok; - {error, {already_started, App}} -> ok - end. - -cur_time_ms() -> - {MegaSec, Sec, MicroSec} = erlang:now(), - MegaSec * 1000000000 + Sec * 1000 + erlang:round(MicroSec / 1000). - -check_result([List]) when is_list(List) -> check_result_int(List); -check_result(_) -> {error, invalid_result}. - -check_result_int([]) -> {error, result_unknown}; -check_result_int([{<<"err">>, null} | _]) -> ok; -check_result_int([{<<"err">>, Error} | _]) -> {error, Error}; -check_result_int([_ | Rest]) -> check_result_int(Rest). diff --git a/t/pqueue_test.erl b/t/pqueue_test.erl deleted file mode 100644 index a62e704..0000000 --- a/t/pqueue_test.erl +++ /dev/null @@ -1,69 +0,0 @@ --module(pqueue_test). - --compile(export_all). - -test(Data, F) -> - {Time1, Sorted1} = timer:tc(lists, sort, [Data]), - {Time21, Pushed1} = timer:tc(lists, foldl, [fun({P, I}, Q) -> - pqueue:push(P, I, Q) - end, pqueue:new(), Data]), - - Sorted = [I || {_, V} = I <- Sorted1, F(V)], - Pushed = pqueue:filter(F, Pushed1), - - {Time22, _} = timer:tc(?MODULE, pop_items, [Pushed]), - Time2 = Time21 + Time22, - - io:format("Time sorted: ~p~nTime pushed: ~p~nDivide: ~p~n", [Time1, Time2, Time1/Time2]), - - LenS = length(Sorted), - LenP = pqueue:size(Pushed), - - LenS = LenP, - io:format("Length: ~p~n", [LenS]), - - lists:foldl(fun({_, I}, Q) -> - {I, NQ} = pqueue:pop(Q), - NQ - end, Pushed, Sorted). - -test() -> - Len = 10000, - Dev = 10, - Fun = fun(I) -> I rem 3 =/= 0 end, - - Data = [{random:uniform(Dev), I} || I <- lists:seq(1, Len)], - test(Data, Fun). - -test2() -> - Queue0 = push_items([{1, 1}, {1, 2}, {5, 3}, {2, 4}], pqueue:new()), - 4 = pqueue:size(Queue0), - - Queue1 = ensure_items([1, 2, 4], Queue0), - 1 = pqueue:size(Queue1), - - Queue2 = push_items([{2, 5}, {3, 6}], Queue1), - 3 = pqueue:size(Queue2), - - Queue3 = ensure_items([5, 3, 6], Queue2), - - Queue3 = pqueue:new(). - -push_items(Data, Queue) -> - lists:foldl(fun({P, I}, Q) -> pqueue:push(P, I, Q) end, - Queue, - Data). - -ensure_items(Items, Queue) -> - lists:foldl(fun(I, Q) -> {I, NQ} = pqueue:pop(Q), NQ end, - Queue, - Items). - -pop_items(Queue) -> - case pqueue:size(Queue) of - 0 -> - ok; - _ -> - {_Item, NewQueue} = pqueue:pop(Queue), - pop_items(NewQueue) - end. diff --git a/test/emongo_test.erl b/test/emongo_test.erl new file mode 100644 index 0000000..13d3a99 --- /dev/null +++ b/test/emongo_test.erl @@ -0,0 +1,113 @@ +-module(emongo_test). +-include_lib("eunit/include/eunit.hrl"). +-compile(export_all). + +-define(NUM_PROCESSES, 500). +-define(NUM_TESTS_PER_PID, 10). +-define(POOL, pool1). +-define(COLL, <<"test">>). +-define(TIMEOUT, 5000). +-define(OUT(F, D), ?debugFmt(F, D)). + +setup() -> + ensure_started(sasl), + ensure_started(emongo), + emongo:add_pool(?POOL, "localhost", 27017, "testdatabase", 10), + ok. + +cleanup(_) -> + ok. + +run_test_() -> + [{setup, + fun setup/0, + fun cleanup/1, + [ + fun test_performance/0 + ] + }]. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +test_performance() -> + ?OUT("Testing performance.", []), + emongo:delete_sync(?POOL, ?COLL), + Start = cur_time_ms(), + try + start_processes(?NUM_PROCESSES), + block_until_done(?NUM_PROCESSES) + after + % Clean up in case something failed. + emongo:delete_sync(?POOL, ?COLL) + end, + End = cur_time_ms(), + ?OUT("Test passed in ~p ms\n", [End - Start]). + +start_processes(X) when X =< 0 -> ok; +start_processes(X) -> + Pid = self(), + proc_lib:spawn(fun() -> + run_tests(Pid, X, ?NUM_TESTS_PER_PID) + end), + start_processes(X - 1). + +run_tests(Pid, _, Y) when Y =< 0 -> + Pid ! done; +run_tests(Pid, X, Y) -> + Num = (X bsl 16) bor Y, % Make up a unique number for this run + try + IRes = emongo:insert_sync(?POOL, ?COLL, [{"_id", Num}], [response_options]), + ok = check_result("insert_sync", IRes, 0), + + [FMRes] = emongo:find_and_modify(?POOL, ?COLL, [{"_id", Num}], + [{<<"$set">>, [{<<"fm">>, Num}]}], [{new, true}]), + FMVal = proplists:get_value(<<"value">>, FMRes), + [{<<"_id">>, Num}, {<<"fm">>, Num}] = FMVal, + + URes = emongo:update_sync(?POOL, ?COLL, [{"_id", Num}], + [{<<"$set">>, [{<<"us">>, Num}]}], false, [response_options]), + ok = check_result("update_sync", URes, 1), + + FARes = emongo:find_all(?POOL, ?COLL, [{"_id", Num}]), + [[{<<"_id">>, Num}, {<<"fm">>, Num}, {<<"us">>, Num}]] = FARes, + + DRes = emongo:delete_sync(?POOL, ?COLL, [{"_id", Num}], [response_options]), + ok = check_result("delete_sync", DRes, 1) + catch _:E -> + ?OUT("Exception occurred for test ~.16b: ~p\n~p\n", + [Num, E, erlang:get_stacktrace()]), + throw(test_failed) + end, + run_tests(Pid, X, Y - 1). + +block_until_done(X) when X =< 0 -> ok; +block_until_done(X) -> + receive done -> ok + after ?TIMEOUT -> + ?OUT("No response\n", []), + throw(test_failed) + end, + block_until_done(X - 1). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +ensure_started(App) -> + case application:start(App) of + ok -> ok; + {error, {already_started, App}} -> ok + end. + +cur_time_ms() -> + {MegaSec, Sec, MicroSec} = erlang:now(), + MegaSec * 1000000000 + Sec * 1000 + erlang:round(MicroSec / 1000). + +check_result(Desc, + {response, _,_,_,_,_, [List]}, + ExpectedN) when is_list(List) -> + {_, Err} = lists:keyfind(<<"err">>, 1, List), + {_, N} = lists:keyfind(<<"n">>, 1, List), + if Err == undefined, N == ExpectedN -> ok; + true -> + ?OUT("Unexpected result for ~p: Err = ~p; N = ~p", [Desc, Err, N]), + throw({error, invalid_db_response}) + end.