Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 63 additions & 40 deletions src/modules/presence/hash.c
Original file line number Diff line number Diff line change
Expand Up @@ -252,33 +252,51 @@ subs_t *mem_copy_subs_noc(subs_t *s)
return dest;

error:
if(dest)
if(dest) {
if(dest->contact.s)
shm_free(dest->contact.s);
if(dest->record_route.s)
shm_free(dest->record_route.s);
shm_free(dest);
}
return NULL;
}

int insert_shtable(shtable_t htable, unsigned int hash_code, subs_t *subs)
int insert_and_replace_shtable(
shtable_t htable, unsigned int hash_code, subs_t *subs, int replace)
{
subs_t *new_rec = NULL;

if(pres_delete_same_subs) {
subs_t *rec = NULL, *prev_rec = NULL;
subs_t *rec = NULL, *prev_rec = NULL;

lock_get(&htable[hash_code].lock);
new_rec = mem_copy_subs_noc(subs);
if(new_rec == NULL) {
LM_ERR("copying in share memory a subs_t structure\n");
return -1;
}
if(subs->expires != 0 && subs->expires < ksr_time_sint(NULL, NULL))
new_rec->expires += ksr_time_sint(NULL, NULL);

lock_get(&htable[hash_code].lock);
if(replace || pres_delete_same_subs) {
/* search if there is another record with the same pres_uri & callid */
rec = htable[hash_code].entries->next;
while(rec) {
if(subs->pres_uri.len == rec->pres_uri.len
&& subs->callid.len == rec->callid.len
&& memcmp(subs->pres_uri.s, rec->pres_uri.s,
subs->pres_uri.len)
if(new_rec->pres_uri.len == rec->pres_uri.len
&& new_rec->callid.len == rec->callid.len
&& memcmp(new_rec->pres_uri.s, rec->pres_uri.s,
new_rec->pres_uri.len)
== 0
&& memcmp(subs->callid.s, rec->callid.s, subs->callid.len)
&& memcmp(new_rec->callid.s, rec->callid.s,
new_rec->callid.len)
== 0) {
LM_NOTICE("Found another record with the same pres_uri[%.*s] "
"and callid[%.*s]\n",
subs->pres_uri.len, subs->pres_uri.s, subs->callid.len,
subs->callid.s);
LM_DBG("Found another record with the same pres_uri[%.*s], "
"callid[%.*s],"
" from_tag[%.*s] and to_tag[%.*s]\n",
new_rec->pres_uri.len, new_rec->pres_uri.s,
new_rec->callid.len, new_rec->callid.s,
new_rec->from_tag.len, new_rec->from_tag.s,
new_rec->to_tag.len, new_rec->to_tag.s);
/* delete this record */

if(prev_rec) {
Expand All @@ -294,31 +312,41 @@ int insert_shtable(shtable_t htable, unsigned int hash_code, subs_t *subs)
if(rec->contact.s != NULL) {
shm_free(rec->contact.s);
}
if(rec->record_route.s != NULL) {
shm_free(rec->record_route.s);
}

shm_free(rec);
break;

if(prev_rec) {
rec = prev_rec->next;
} else {
rec = htable[hash_code].entries->next;
}
} else {
prev_rec = rec;
rec = rec->next;
}
prev_rec = rec;
rec = rec->next;
}
lock_release(&htable[hash_code].lock);
}

new_rec = mem_copy_subs_noc(subs);
if(new_rec == NULL) {
LM_ERR("copying in share memory a subs_t structure\n");
return -1;
}
new_rec->expires += ksr_time_sint(NULL, NULL);

lock_get(&htable[hash_code].lock);
new_rec->next = htable[hash_code].entries->next;
htable[hash_code].entries->next = new_rec;
lock_release(&htable[hash_code].lock);

return 0;
}

int insert_shtable(shtable_t htable, unsigned int hash_code, subs_t *subs)
{
return insert_and_replace_shtable(htable, hash_code, subs, 0);
}

int replace_shtable(shtable_t htable, unsigned int hash_code, subs_t *subs)
{
return insert_and_replace_shtable(htable, hash_code, subs, 1);
}

int delete_shtable(shtable_t htable, unsigned int hash_code, subs_t *subs)
{
subs_t *s = NULL, *ps = NULL;
Expand Down Expand Up @@ -355,15 +383,15 @@ int delete_shtable(shtable_t htable, unsigned int hash_code, subs_t *subs)
if(found == 0) {
found = s->local_cseq + 1;
ps->next = s->next;
if(s->contact.s != NULL) {
shm_free(s->contact.s);
s->contact.s = NULL;
}
if(s->record_route.s != NULL) {
shm_free(s->record_route.s);
s->record_route.s = NULL;
}
if(s) {
if(s->contact.s != NULL) {
shm_free(s->contact.s);
s->contact.s = NULL;
}
if(s->record_route.s != NULL) {
shm_free(s->record_route.s);
s->record_route.s = NULL;
}
shm_free(s);
s = NULL;
}
Expand Down Expand Up @@ -522,7 +550,6 @@ void destroy_phtable(void)
shm_free(pres_htable);
}
/* entry must be locked before calling this function */

pres_entry_t *search_phtable(str *pres_uri, int event, unsigned int hash_code)
{
pres_entry_t *p;
Expand Down Expand Up @@ -1061,15 +1088,11 @@ int ps_ptable_replace(ps_presentity_t *ptm, ps_presentity_t *pt)
} else {
_ps_ptable->slots[idx].plist = ptn->next;
}
break;
ps_presentity_free(ptn, 0);
}
ptn = ptn->next;
}

if(ptn != NULL) {
ps_presentity_free(ptn, 0);
}

ptn = ps_presentity_new(&ptv, 0);
if(ptn == NULL) {
lock_release(&_ps_ptable->slots[idx].lock);
Expand Down
6 changes: 6 additions & 0 deletions src/modules/presence/hash.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ struct subscription *search_shtable(shtable_t htable, str callid, str to_tag,
int insert_shtable(
shtable_t htable, unsigned int hash_code, struct subscription *subs);

int replace_shtable(
shtable_t htable, unsigned int hash_code, struct subscription *subs);

int delete_shtable(
shtable_t htable, unsigned int hash_code, struct subscription *subs);

Expand All @@ -102,6 +105,9 @@ typedef struct subscription *(*search_shtable_t)(shtable_t htable, str callid,
typedef int (*insert_shtable_t)(
shtable_t htable, unsigned int hash_code, struct subscription *subs);

typedef int (*replace_shtable_t)(
shtable_t htable, unsigned int hash_code, struct subscription *subs);

typedef int (*delete_shtable_t)(
shtable_t htable, unsigned int hash_code, struct subscription *subs);

Expand Down
9 changes: 9 additions & 0 deletions src/modules/presence/notify.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#include "presence.h"
#include "notify.h"
#include "utils_func.h"
#include "presence_dmq.h"
#include "../../core/receive.h"

#define ALLOC_SIZE 3000
Expand Down Expand Up @@ -1852,6 +1853,10 @@ int notify(subs_t *subs, subs_t *watcher_subs, str *n_body, int force_null_body,
}
pkg_free(aux_body);
}
if(pres_enable_dmq > 0 && pres_enable_subs_dmq > 0) {
pres_dmq_replicate_subscription(subs, NULL);
}

return 0;
}

Expand Down Expand Up @@ -1997,6 +2002,10 @@ void p_tm_callback(struct cell *t, int type, struct tmcb_params *ps)
|| pres_get_delete_sub()) {
delete_subs(&subs->pres_uri, &subs->event->name, &subs->to_tag,
&subs->from_tag, &subs->callid);
if(pres_enable_dmq > 0 && pres_enable_subs_dmq > 0) {
subs->expires = 0;
pres_dmq_replicate_subscription(subs, NULL);
}
}

shm_free(subs);
Expand Down
47 changes: 47 additions & 0 deletions src/modules/presence/presence.c
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,17 @@ str pres_xavp_cfg = {0};
int pres_retrieve_order = 0;
str pres_retrieve_order_by = str_init("priority");
int pres_enable_dmq = 0;
int pres_enable_pres_dmq = 1;
int pres_enable_pres_sync_dmq = 1;
int pres_enable_subs_dmq = 0;
int pres_enable_subs_sync_dmq = 1;
int pres_skip_notify_dmq = 0;
str pres_default_socket = {0, 0};
int pres_dmq_batch_size = 0;
int pres_dmq_batch_msg_pres = 1;
int pres_dmq_batch_msg_subs = 1;
int pres_dmq_batch_msg_size = 60000;
int pres_dmq_batch_usleep = 0;
int pres_delete_same_subs = 0;
int pres_subs_respond_200 = 1;

Expand Down Expand Up @@ -247,6 +258,17 @@ static param_export_t params[]={
{ "sip_uri_match", PARAM_INT, &pres_uri_match},
{ "cseq_offset", PARAM_INT, &pres_cseq_offset},
{ "enable_dmq", PARAM_INT, &pres_enable_dmq},
{ "enable_pres_dmq", PARAM_INT, &pres_enable_pres_dmq},
{ "enable_pres_sync_dmq", PARAM_INT, &pres_enable_pres_sync_dmq},
{ "enable_subs_dmq", PARAM_INT, &pres_enable_subs_dmq},
{ "enable_subs_sync_dmq", PARAM_INT, &pres_enable_subs_sync_dmq},
{ "batch_msg_pres", PARAM_INT, &pres_dmq_batch_msg_pres},
{ "batch_msg_subs", PARAM_INT, &pres_dmq_batch_msg_subs},
{ "batch_msg_size", PARAM_INT, &pres_dmq_batch_msg_size},
{ "batch_size", PARAM_INT, &pres_dmq_batch_size},
{ "batch_usleep", PARAM_INT, &pres_dmq_batch_usleep},
{ "skip_notify_dmq", PARAM_INT, &pres_skip_notify_dmq},
{ "default_socket", PARAM_STR, &pres_default_socket},
{ "pres_subs_mode", PARAM_INT, &_pres_subs_mode},
{ "delete_same_subs", PARAM_INT, &pres_delete_same_subs},
{ "timer_mode", PARAM_INT, &pres_timer_mode},
Expand Down Expand Up @@ -345,6 +367,30 @@ static int mod_init(void)
LM_DBG("server_address parameter not set in configuration file\n");
}

if(pres_default_socket.s != NULL && pres_default_socket.len != 0
&& lookup_local_socket(&pres_default_socket) == NULL) {
LM_ERR("default_socket is not a socket the proxy is listening on\n");
return -1;
}

if(pres_dmq_batch_msg_size > 60000) {
LM_ERR("batch_msg_size too high[%d] setting to [60000]\n",
pres_dmq_batch_msg_size);
pres_dmq_batch_msg_size = 60000;
}

if(pres_dmq_batch_msg_pres > 150) {
LM_ERR("batch_msg_pres too high[%d] setting to [150]\n",
pres_dmq_batch_msg_pres);
pres_dmq_batch_msg_pres = 150;
}

if(pres_dmq_batch_msg_subs > 150) {
LM_ERR("batch_msg_subs too high[%d] setting to [150]\n",
pres_dmq_batch_msg_subs);
pres_dmq_batch_msg_subs = 150;
}

/* bind the SL API */
if(sl_load_api(&_pres_slb) != 0) {
LM_ERR("cannot bind to SL API\n");
Expand Down Expand Up @@ -1440,6 +1486,7 @@ static int update_pw_dialogs(
if(subs->status == TERMINATED_STATUS) {
ps->next = s->next;
shm_free(s->contact.s);
shm_free(s->record_route.s);
shm_free(s);
LM_DBG(" deleted terminated dialog from hash table\n");
} else
Expand Down
11 changes: 11 additions & 0 deletions src/modules/presence/presence.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,17 @@ extern str pres_xavp_cfg;
extern int pres_retrieve_order;
extern str pres_retrieve_order_by;
extern int pres_enable_dmq;
extern int pres_enable_pres_dmq;
extern int pres_enable_pres_sync_dmq;
extern int pres_enable_subs_dmq;
extern int pres_enable_subs_sync_dmq;
extern int pres_skip_notify_dmq;
extern str pres_default_socket;
extern int pres_dmq_batch_size;
extern int pres_dmq_batch_msg_pres;
extern int pres_dmq_batch_msg_subs;
extern int pres_dmq_batch_msg_size;
extern int pres_dmq_batch_usleep;
extern int pres_subs_respond_200;

extern int phtable_size;
Expand Down
Loading
Loading