Reduce use of mosquitto_time() and time().

pull/1886/head
Roger A. Light 5 years ago
parent e6b8fc5bbd
commit 108b23ce6d

@ -63,7 +63,7 @@ int sub__messages_queue(struct mosquitto_db *db, const char *source_id, const ch
return 0;
}
int keepalive__update(struct mosquitto *context)
int keepalive__update(struct mosquitto_db *db, struct mosquitto *context)
{
return 0;
}

@ -70,7 +70,7 @@ int handle__pubrec(struct mosquitto_db *db, struct mosquitto *mosq)
log__printf(NULL, MOSQ_LOG_DEBUG, "Received PUBREC from %s (Mid: %d)", mosq->id, mid);
if(reason_code < 0x80){
rc = db__message_update_outgoing(mosq, mid, mosq_ms_wait_for_pubcomp, 2);
rc = db__message_update_outgoing(db, mosq, mid, mosq_ms_wait_for_pubcomp, 2);
}else{
return db__message_delete_outgoing(db, mosq, mid, mosq_ms_wait_for_pubrec, 2);
}

@ -303,9 +303,13 @@ int packet__write(struct mosquitto *mosq)
packet__cleanup(packet);
mosquitto__free(packet);
#ifdef WITH_BROKER
mosq->next_msg_out = mosquitto_time() + mosq->keepalive;
#else
pthread_mutex_lock(&mosq->msgtime_mutex);
mosq->next_msg_out = mosquitto_time() + mosq->keepalive;
pthread_mutex_unlock(&mosq->msgtime_mutex);
#endif
}
pthread_mutex_unlock(&mosq->current_out_packet_mutex);
return MOSQ_ERR_SUCCESS;
@ -466,7 +470,7 @@ int packet__read(struct mosquitto *mosq)
* If a client can't send 1000 bytes in a second it
* probably shouldn't be using a 1 second keep alive. */
#ifdef WITH_BROKER
keepalive__update(mosq);
keepalive__update(db, mosq);
#else
pthread_mutex_lock(&mosq->msgtime_mutex);
mosq->last_msg_in = mosquitto_time();
@ -502,8 +506,12 @@ int packet__read(struct mosquitto *mosq)
/* Free data and reset values */
packet__cleanup(&mosq->in_packet);
#ifdef WITH_BROKER
keepalive__update(db, mosq);
#else
pthread_mutex_lock(&mosq->msgtime_mutex);
mosq->last_msg_in = mosquitto_time();
pthread_mutex_unlock(&mosq->msgtime_mutex);
#endif
return rc;
}

@ -65,13 +65,19 @@ int mosquitto__check_keepalive(struct mosquitto *mosq)
{
time_t next_msg_out;
time_t last_msg_in;
time_t now = mosquitto_time();
time_t now;
#ifndef WITH_BROKER
int rc;
#endif
int state;
assert(mosq);
#ifdef WITH_BROKER
now = db->now_s;
#else
now = mosquitto_time();
#endif
#if defined(WITH_BROKER) && defined(WITH_BRIDGE)
/* Check if a lazy bridge should be timed out due to idle. */
if(mosq->bridge && mosq->bridge->start_type == bst_lazy

@ -155,8 +155,8 @@ int bridge__connect_step1(struct mosquitto_db *db, struct mosquitto *context)
mosquitto__set_state(context, mosq_cs_new);
context->sock = INVALID_SOCKET;
context->last_msg_in = mosquitto_time();
context->next_msg_out = mosquitto_time() + context->bridge->keepalive;
context->last_msg_in = db->now_s;
context->next_msg_out = db->now_s + context->bridge->keepalive;
context->keepalive = context->bridge->keepalive;
context->clean_start = context->bridge->clean_start;
context->in_packet.payload = NULL;
@ -297,7 +297,7 @@ int bridge__connect_step3(struct mosquitto_db *db, struct mosquitto *context)
}
if(context->bridge->round_robin == false && context->bridge->cur_address != 0){
context->bridge->primary_retry = mosquitto_time() + 5;
context->bridge->primary_retry = db->now_s + 5;
}
rc = send__connect(context, context->keepalive, context->clean_start, NULL);
@ -333,8 +333,8 @@ int bridge__connect(struct mosquitto_db *db, struct mosquitto *context)
mosquitto__set_state(context, mosq_cs_new);
context->sock = INVALID_SOCKET;
context->last_msg_in = mosquitto_time();
context->next_msg_out = mosquitto_time() + context->bridge->keepalive;
context->last_msg_in = db->now_s;
context->next_msg_out = db->now_s + context->bridge->keepalive;
context->keepalive = context->bridge->keepalive;
context->clean_start = context->bridge->clean_start;
context->in_packet.payload = NULL;
@ -655,16 +655,13 @@ static void bridge__backoff_reset(struct mosquitto *context)
void bridge_check(struct mosquitto_db *db)
{
static time_t last_check = 0;
time_t now;
struct mosquitto *context = NULL;
socklen_t len;
int i;
int rc;
int err;
now = mosquitto_time();
if(now <= last_check) return;
if(db->now_s <= last_check) return;
for(i=0; i<db->bridge_count; i++){
if(!db->bridges[i]) continue;
@ -679,7 +676,7 @@ void bridge_check(struct mosquitto_db *db)
if(context->bridge->round_robin == false
&& context->bridge->cur_address != 0
&& context->bridge->primary_retry
&& now > context->bridge->primary_retry){
&& db->now_s > context->bridge->primary_retry){
if(context->bridge->primary_retry_sock == INVALID_SOCKET){
rc = net__try_connect(context->bridge->addresses[0].address,
@ -706,12 +703,12 @@ void bridge_check(struct mosquitto_db *db)
}else{
COMPAT_CLOSE(context->bridge->primary_retry_sock);
context->bridge->primary_retry_sock = INVALID_SOCKET;
context->bridge->primary_retry = now+5;
context->bridge->primary_retry = db->now_s+5;
}
}else{
COMPAT_CLOSE(context->bridge->primary_retry_sock);
context->bridge->primary_retry_sock = INVALID_SOCKET;
context->bridge->primary_retry = now+5;
context->bridge->primary_retry = db->now_s+5;
}
}
}
@ -722,14 +719,14 @@ void bridge_check(struct mosquitto_db *db)
if(context->sock == INVALID_SOCKET){
/* Want to try to restart the bridge connection */
if(!context->bridge->restart_t){
context->bridge->restart_t = now+context->bridge->restart_timeout;
context->bridge->restart_t = db->now_s+context->bridge->restart_timeout;
context->bridge->cur_address++;
if(context->bridge->cur_address == context->bridge->address_count){
context->bridge->cur_address = 0;
}
}else{
if((context->bridge->start_type == bst_lazy && context->bridge->lazy_reconnect)
|| (context->bridge->start_type == bst_automatic && now > context->bridge->restart_t)){
|| (context->bridge->start_type == bst_automatic && db->now_s > context->bridge->restart_t)){
#if defined(__GLIBC__) && defined(WITH_ADNS)
if(context->adns){
@ -780,7 +777,7 @@ void bridge_check(struct mosquitto_db *db)
context->bridge->restart_t = 0;
if(rc == MOSQ_ERR_SUCCESS){
if(context->bridge->round_robin == false && context->bridge->cur_address != 0){
context->bridge->primary_retry = now + 5;
context->bridge->primary_retry = db->now_s + 5;
}
mux__add_in(db, context);
if(context->current_out_packet){

@ -44,8 +44,8 @@ struct mosquitto *context__init(struct mosquitto_db *db, mosq_sock_t sock)
context->pollfd_index = -1;
mosquitto__set_state(context, mosq_cs_new);
context->sock = sock;
context->last_msg_in = mosquitto_time();
context->next_msg_out = mosquitto_time() + 60;
context->last_msg_in = db->now_s;
context->next_msg_out = db->now_s + 60;
context->keepalive = 60; /* Default to 60s */
context->clean_start = true;
context->id = NULL;
@ -172,7 +172,7 @@ void context__send_will(struct mosquitto_db *db, struct mosquitto *ctxt)
{
if(ctxt->state != mosq_cs_disconnecting && ctxt->will){
if(ctxt->will_delay_interval > 0){
will_delay__add(ctxt);
will_delay__add(db, ctxt);
return;
}

@ -331,7 +331,7 @@ int db__message_delete_outgoing(struct mosquitto_db *db, struct mosquitto *conte
}
msg_index++;
tail->timestamp = mosquitto_time();
tail->timestamp = db->now_s;
switch(tail->qos){
case 0:
tail->state = mosq_ms_publish_qos0;
@ -476,7 +476,7 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
msg->store = stored;
db__msg_store_ref_inc(msg->store);
msg->mid = mid;
msg->timestamp = mosquitto_time();
msg->timestamp = db->now_s;
msg->direction = dir;
msg->state = state;
msg->dup = false;
@ -543,7 +543,7 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
return rc;
}
int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos)
int db__message_update_outgoing(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos)
{
struct mosquitto_client_msg *tail;
@ -553,7 +553,7 @@ int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mo
return MOSQ_ERR_PROTOCOL;
}
tail->state = state;
tail->timestamp = mosquitto_time();
tail->timestamp = db->now_s;
return MOSQ_ERR_SUCCESS;
}
}
@ -686,7 +686,7 @@ int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, s
stored->mid = 0;
stored->origin = origin;
if(message_expiry_interval > 0){
stored->message_expiry_time = time(NULL) + message_expiry_interval;
stored->message_expiry_time = db->now_real_s + message_expiry_interval;
}else{
stored->message_expiry_time = 0;
}
@ -921,7 +921,7 @@ int db__message_release_incoming(struct mosquitto_db *db, struct mosquitto *cont
}
msg_index++;
tail->timestamp = mosquitto_time();
tail->timestamp = db->now_s;
if(tail->qos == 2){
send__pubrec(context, tail->mid, 0, NULL);
@ -940,7 +940,6 @@ int db__message_write_inflight_in(struct mosquitto_db *db, struct mosquitto *con
{
struct mosquitto_client_msg *tail, *tmp;
int rc;
time_t now = 0;
if(context->state != mosq_cs_active){
return MOSQ_ERR_SUCCESS;
@ -948,10 +947,7 @@ int db__message_write_inflight_in(struct mosquitto_db *db, struct mosquitto *con
DL_FOREACH_SAFE(context->msgs_in.inflight, tail, tmp){
if(tail->store->message_expiry_time){
if(now == 0){
now = time(NULL);
}
if(now > tail->store->message_expiry_time){
if(db->now_real_s > tail->store->message_expiry_time){
/* Message is expired, must not send. */
db__message_remove(db, &context->msgs_in, tail);
if(tail->qos > 0){
@ -1008,15 +1004,11 @@ static int db__message_write_inflight_out_single(struct mosquitto_db *db, struct
uint8_t qos;
uint32_t payloadlen;
const void *payload;
time_t now = 0;
uint32_t expiry_interval;
expiry_interval = 0;
if(msg->store->message_expiry_time){
if(now == 0){
now = time(NULL);
}
if(now > msg->store->message_expiry_time){
if(db->now_real_s > msg->store->message_expiry_time){
/* Message is expired, must not send. */
if(msg->direction == mosq_md_out && msg->qos > 0){
util__increment_send_quota(context);
@ -1024,7 +1016,7 @@ static int db__message_write_inflight_out_single(struct mosquitto_db *db, struct
db__message_remove(db, &context->msgs_out, msg);
return MOSQ_ERR_SUCCESS;
}else{
expiry_interval = (uint32_t)(msg->store->message_expiry_time - now);
expiry_interval = (uint32_t)(msg->store->message_expiry_time - db->now_real_s);
}
}
mid = msg->mid;
@ -1050,7 +1042,7 @@ static int db__message_write_inflight_out_single(struct mosquitto_db *db, struct
case mosq_ms_publish_qos1:
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval);
if(rc == MOSQ_ERR_SUCCESS){
msg->timestamp = mosquitto_time();
msg->timestamp = db->now_s;
msg->dup = 1; /* Any retry attempts are a duplicate. */
msg->state = mosq_ms_wait_for_puback;
}else if(rc == MOSQ_ERR_OVERSIZE_PACKET){
@ -1063,7 +1055,7 @@ static int db__message_write_inflight_out_single(struct mosquitto_db *db, struct
case mosq_ms_publish_qos2:
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval);
if(rc == MOSQ_ERR_SUCCESS){
msg->timestamp = mosquitto_time();
msg->timestamp = db->now_s;
msg->dup = 1; /* Any retry attempts are a duplicate. */
msg->state = mosq_ms_wait_for_pubrec;
}else if(rc == MOSQ_ERR_OVERSIZE_PACKET){

@ -29,12 +29,12 @@ int keepalive__add(struct mosquitto *context)
}
void keepalive__check(struct mosquitto_db *db, time_t now)
void keepalive__check(struct mosquitto_db *db)
{
struct mosquitto *context, *ctxt_tmp;
if(last_keepalive_check + 5 < now){
last_keepalive_check = now;
if(last_keepalive_check + 5 < db->now_s){
last_keepalive_check = db->now_s;
/* FIXME - this needs replacing with something more efficient */
HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){
@ -42,7 +42,7 @@ void keepalive__check(struct mosquitto_db *db, time_t now)
/* Local bridges never time out in this fashion. */
if(!(context->keepalive)
|| context->bridge
|| now - context->last_msg_in <= (time_t)(context->keepalive)*3/2){
|| db->now_s - context->last_msg_in <= (time_t)(context->keepalive)*3/2){
}else{
/* Client has exceeded keepalive*1.5 */
@ -65,8 +65,8 @@ void keepalive__remove_all(void)
}
int keepalive__update(struct mosquitto *context)
int keepalive__update(struct mosquitto_db *db, struct mosquitto *context)
{
context->last_msg_in = mosquitto_time();
context->last_msg_in = db->now_s;
return MOSQ_ERR_SUCCESS;
}

@ -133,7 +133,6 @@ int mosquitto_main_loop(struct mosquitto_db *db, struct mosquitto__listener_sock
#ifdef WITH_PERSISTENCE
time_t last_backup = mosquitto_time();
#endif
time_t now = 0;
#ifdef WITH_WEBSOCKETS
int i;
#endif
@ -144,6 +143,9 @@ int mosquitto_main_loop(struct mosquitto_db *db, struct mosquitto__listener_sock
memset(&sul, 0, sizeof(struct lws_sorted_usec_list));
#endif
db->now_s = mosquitto_time();
db->now_real_s = time(NULL);
rc = mux__init(db, listensock, listensock_count);
if(rc) return rc;
@ -161,8 +163,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, struct mosquitto__listener_sock
}
#endif
now = mosquitto_time();
keepalive__check(db, now);
keepalive__check(db);
#ifdef WITH_BRIDGE
bridge_check(db);
@ -171,9 +172,8 @@ int mosquitto_main_loop(struct mosquitto_db *db, struct mosquitto__listener_sock
rc = mux__handle(db, listensock, listensock_count);
if(rc) return rc;
now = time(NULL);
session_expiry__check(db, now);
will_delay__check(db, now);
session_expiry__check(db);
will_delay__check(db);
#ifdef WITH_PERSISTENCE
if(db->config->persistence && db->config->autosave_interval){
if(db->config->autosave_on_changes){
@ -182,9 +182,9 @@ int mosquitto_main_loop(struct mosquitto_db *db, struct mosquitto__listener_sock
db->persistence_changes = 0;
}
}else{
if(last_backup + db->config->autosave_interval < mosquitto_time()){
if(last_backup + db->config->autosave_interval < db->now_s){
persist__backup(db, false);
last_backup = mosquitto_time();
last_backup = db->now_s;
}
}
}

@ -450,6 +450,8 @@ int main(int argc, char *argv[])
#endif
memset(&int_db, 0, sizeof(struct mosquitto_db));
int_db.now_s = mosquitto_time();
int_db.now_real_s = time(NULL);
net__broker_init();

@ -522,6 +522,8 @@ struct mosquitto_db{
struct clientid__index_hash *clientid_index_hash;
struct mosquitto_msg_store *msg_store;
struct mosquitto_msg_store_load *msg_store_load;
time_t now_s; /* Monotonic clock, where possible */
time_t now_real_s; /* Read clock, for measuring session/message expiry */
#ifdef WITH_BRIDGE
int bridge_count;
#endif
@ -710,7 +712,7 @@ int db__message_count(int *count);
int db__message_delete_outgoing(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state expect_state, int qos);
int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, uint8_t qos, bool retain, struct mosquitto_msg_store *stored, mosquitto_property *properties, bool update);
int db__message_release_incoming(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid);
int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos);
int db__message_update_outgoing(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos);
void db__message_dequeue_first(struct mosquitto *context, struct mosquitto_msg_data *msg_data);
int db__messages_delete(struct mosquitto_db *db, struct mosquitto *context, bool force_free);
int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, uint8_t qos, uint32_t payloadlen, const void *payload, int retain, uint32_t message_expiry_interval, mosquitto_property **properties);
@ -829,10 +831,10 @@ void plugin__handle_tick(struct mosquitto_db *db);
* Property related functions
* ============================================================ */
int keepalive__add(struct mosquitto *context);
void keepalive__check(struct mosquitto_db *db, time_t now);
void keepalive__check(struct mosquitto_db *db);
int keepalive__remove(struct mosquitto *context);
void keepalive__remove_all(void);
int keepalive__update(struct mosquitto *context);
int keepalive__update(struct mosquitto_db *db, struct mosquitto *context);
/* ============================================================
* Property related functions
@ -879,7 +881,7 @@ void unpwd__free_item(struct mosquitto__unpwd **unpwd, struct mosquitto__unpwd *
int session_expiry__add(struct mosquitto_db *db, struct mosquitto *context);
void session_expiry__remove(struct mosquitto *context);
void session_expiry__remove_all(struct mosquitto_db *db);
void session_expiry__check(struct mosquitto_db *db, time_t now);
void session_expiry__check(struct mosquitto_db *db);
void session_expiry__send_all(struct mosquitto_db *db);
/* ============================================================
@ -908,8 +910,8 @@ void do_disconnect(struct mosquitto_db *db, struct mosquitto *context, int reaso
/* ============================================================
* Will delay
* ============================================================ */
int will_delay__add(struct mosquitto *context);
void will_delay__check(struct mosquitto_db *db, time_t now);
int will_delay__add(struct mosquitto_db *db, struct mosquitto *context);
void will_delay__check(struct mosquitto_db *db);
void will_delay__send_all(struct mosquitto_db *db);
void will_delay__remove(struct mosquitto *mosq);

@ -191,6 +191,9 @@ int mux_epoll__handle(struct mosquitto_db *db)
event_count = epoll_wait(db->epollfd, ep_events, MAX_EVENTS, 100);
sigprocmask(SIG_SETMASK, &origsig, NULL);
db->now_s = mosquitto_time();
db->now_real_s = time(NULL);
switch(event_count){
case -1:
if(errno != EINTR){

@ -186,6 +186,10 @@ int mux_poll__handle(struct mosquitto_db *db, struct mosquitto__listener_sock *l
#else
fdcount = WSAPoll(pollfds, pollfd_max, 100);
#endif
db->now_s = mosquitto_time();
db->now_real_s = time(NULL);
if(fdcount == -1){
# ifdef WIN32
if(WSAGetLastError() == WSAEINVAL){

@ -122,7 +122,7 @@ int retain__store(struct mosquitto_db *db, const char *topic, struct mosquitto_m
}
static int retain__process(struct mosquitto_db *db, struct mosquitto__retainhier *branch, struct mosquitto *context, uint8_t sub_qos, uint32_t subscription_identifier, time_t now)
static int retain__process(struct mosquitto_db *db, struct mosquitto__retainhier *branch, struct mosquitto *context, uint8_t sub_qos, uint32_t subscription_identifier)
{
int rc = 0;
uint8_t qos;
@ -130,7 +130,7 @@ static int retain__process(struct mosquitto_db *db, struct mosquitto__retainhier
mosquitto_property *properties = NULL;
struct mosquitto_msg_store *retained;
if(branch->retained->message_expiry_time > 0 && now >= branch->retained->message_expiry_time){
if(branch->retained->message_expiry_time > 0 && db->now_real_s >= branch->retained->message_expiry_time){
db__msg_store_ref_dec(db, &branch->retained);
branch->retained = NULL;
#ifdef WITH_SYS_TREE
@ -188,7 +188,7 @@ static int retain__process(struct mosquitto_db *db, struct mosquitto__retainhier
}
static int retain__search(struct mosquitto_db *db, struct mosquitto__retainhier *retainhier, char **split_topics, struct mosquitto *context, const char *sub, uint8_t sub_qos, uint32_t subscription_identifier, time_t now, int level)
static int retain__search(struct mosquitto_db *db, struct mosquitto__retainhier *retainhier, char **split_topics, struct mosquitto *context, const char *sub, uint8_t sub_qos, uint32_t subscription_identifier, int level)
{
struct mosquitto__retainhier *branch, *branch_tmp;
int flag = 0;
@ -201,26 +201,26 @@ static int retain__search(struct mosquitto_db *db, struct mosquitto__retainhier
*/
flag = -1;
if(branch->retained){
retain__process(db, branch, context, sub_qos, subscription_identifier, now);
retain__process(db, branch, context, sub_qos, subscription_identifier);
}
if(branch->children){
retain__search(db, branch, split_topics, context, sub, sub_qos, subscription_identifier, now, level+1);
retain__search(db, branch, split_topics, context, sub, sub_qos, subscription_identifier, level+1);
}
}
}else{
if(!strcmp(split_topics[0], "+")){
HASH_ITER(hh, retainhier->children, branch, branch_tmp){
if(split_topics[1] != NULL){
if(retain__search(db, branch, &(split_topics[1]), context, sub, sub_qos, subscription_identifier, now, level+1) == -1
if(retain__search(db, branch, &(split_topics[1]), context, sub, sub_qos, subscription_identifier, level+1) == -1
|| (split_topics[1] != NULL && !strcmp(split_topics[1], "#") && level>0)){
if(branch->retained){
retain__process(db, branch, context, sub_qos, subscription_identifier, now);
retain__process(db, branch, context, sub_qos, subscription_identifier);
}
}
}else{
if(branch->retained){
retain__process(db, branch, context, sub_qos, subscription_identifier, now);
retain__process(db, branch, context, sub_qos, subscription_identifier);
}
}
}
@ -228,16 +228,16 @@ static int retain__search(struct mosquitto_db *db, struct mosquitto__retainhier
HASH_FIND(hh, retainhier->children, split_topics[0], strlen(split_topics[0]), branch);
if(branch){
if(split_topics[1] != NULL){
if(retain__search(db, branch, &(split_topics[1]), context, sub, sub_qos, subscription_identifier, now, level+1) == -1
if(retain__search(db, branch, &(split_topics[1]), context, sub, sub_qos, subscription_identifier, level+1) == -1
|| (split_topics[1] != NULL && !strcmp(split_topics[1], "#") && level>0)){
if(branch->retained){
retain__process(db, branch, context, sub_qos, subscription_identifier, now);
retain__process(db, branch, context, sub_qos, subscription_identifier);
}
}
}else{
if(branch->retained){
retain__process(db, branch, context, sub_qos, subscription_identifier, now);
retain__process(db, branch, context, sub_qos, subscription_identifier);
}
}
}
@ -252,7 +252,6 @@ int retain__queue(struct mosquitto_db *db, struct mosquitto *context, const char
struct mosquitto__retainhier *retainhier;
char *local_sub;
char **split_topics;
time_t now;
int rc;
assert(db);
@ -265,8 +264,7 @@ int retain__queue(struct mosquitto_db *db, struct mosquitto *context, const char
HASH_FIND(hh, db->retains, split_topics[0], strlen(split_topics[0]), retainhier);
if(retainhier){
now = time(NULL);
retain__search(db, retainhier, split_topics, context, sub, sub_qos, subscription_identifier, now, 0);
retain__search(db, retainhier, split_topics, context, sub, sub_qos, subscription_identifier, 0);
}
mosquitto__free(local_sub);
mosquitto__free(split_topics);

@ -57,7 +57,7 @@ int session_expiry__add(struct mosquitto_db *db, struct mosquitto *context)
if(!item) return MOSQ_ERR_NOMEM;
item->context = context;
item->context->session_expiry_time = time(NULL);
item->context->session_expiry_time = db->now_real_s;
if(db->config->persistent_client_expiration == 0){
/* No global expiry, so use the client expiration interval */
@ -107,17 +107,17 @@ void session_expiry__remove_all(struct mosquitto_db *db)
}
void session_expiry__check(struct mosquitto_db *db, time_t now)
void session_expiry__check(struct mosquitto_db *db)
{
struct session_expiry_list *item, *tmp;
struct mosquitto *context;
if(now <= last_check) return;
if(db->now_real_s <= last_check) return;
last_check = now;
last_check = db->now_real_s;
DL_FOREACH_SAFE(expiry_list, item, tmp){
if(item->context->session_expiry_time < now){
if(item->context->session_expiry_time < db->now_real_s){
context = item->context;
session_expiry__remove(context);

@ -160,7 +160,6 @@ static void calc_load(struct mosquitto_db *db, char *buf, const char *topic, boo
void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time)
{
static time_t last_update = 0;
time_t now;
time_t uptime;
char buf[BUFLEN];
@ -221,10 +220,8 @@ void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time)
uint32_t len;
bool initial_publish;
now = mosquitto_time();
if(interval && now - interval > last_update){
uptime = now - start_time;
if(interval && db->now_s - interval > last_update){
uptime = db->now_s - start_time;
len = (uint32_t)snprintf(buf, BUFLEN, "%d seconds", (int)uptime);
db__messages_easy_queue(db, NULL, "$SYS/broker/uptime", SYS_TREE_QOS, len, buf, 1, 60, NULL);
@ -235,7 +232,7 @@ void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time)
last_update = 1;
}
if(last_update > 0){
i_mult = 60.0/(double)(now-last_update);
i_mult = 60.0/(double)(db->now_s-last_update);
msgs_received_interval = (double)(g_msgs_received - msgs_received)*i_mult;
msgs_sent_interval = (double)(g_msgs_sent - msgs_sent)*i_mult;
@ -253,7 +250,7 @@ void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time)
g_connection_count = 0;
/* 1 minute load */
exponent = exp(-1.0*(double)(now-last_update)/60.0);
exponent = exp(-1.0*(double)(db->now_s-last_update)/60.0);
calc_load(db, buf, "$SYS/broker/load/messages/received/1min", initial_publish, exponent, msgs_received_interval, &msgs_received_load1);
calc_load(db, buf, "$SYS/broker/load/messages/sent/1min", initial_publish, exponent, msgs_sent_interval, &msgs_sent_load1);
@ -266,7 +263,7 @@ void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time)
calc_load(db, buf, "$SYS/broker/load/connections/1min", initial_publish, exponent, connection_interval, &connection_load1);
/* 5 minute load */
exponent = exp(-1.0*(double)(now-last_update)/300.0);
exponent = exp(-1.0*(double)(db->now_s-last_update)/300.0);
calc_load(db, buf, "$SYS/broker/load/messages/received/5min", initial_publish, exponent, msgs_received_interval, &msgs_received_load5);
calc_load(db, buf, "$SYS/broker/load/messages/sent/5min", initial_publish, exponent, msgs_sent_interval, &msgs_sent_load5);
@ -279,7 +276,7 @@ void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time)
calc_load(db, buf, "$SYS/broker/load/connections/5min", initial_publish, exponent, connection_interval, &connection_load5);
/* 15 minute load */
exponent = exp(-1.0*(double)(now-last_update)/900.0);
exponent = exp(-1.0*(double)(db->now_s-last_update)/900.0);
calc_load(db, buf, "$SYS/broker/load/messages/received/15min", initial_publish, exponent, msgs_received_interval, &msgs_received_load15);
calc_load(db, buf, "$SYS/broker/load/messages/sent/15min", initial_publish, exponent, msgs_sent_interval, &msgs_sent_load15);
@ -381,7 +378,7 @@ void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time)
db__messages_easy_queue(db, NULL, "$SYS/broker/publish/bytes/sent", SYS_TREE_QOS, len, buf, 1, 60, NULL);
}
last_update = mosquitto_time();
last_update = db->now_s;
}
}

@ -351,7 +351,7 @@ static int callback_mqtt(struct libwebsocket_context *context,
packet__cleanup(packet);
mosquitto__free(packet);
mosq->next_msg_out = mosquitto_time() + mosq->keepalive;
mosq->next_msg_out = db->now_s + mosq->keepalive;
}
if (mosq->state == mosq_cs_disconnect_ws
|| mosq->state == mosq_cs_disconnecting

@ -34,7 +34,7 @@ static int will_delay__cmp(struct will_delay_list *i1, struct will_delay_list *i
}
int will_delay__add(struct mosquitto *context)
int will_delay__add(struct mosquitto_db *db, struct mosquitto *context)
{
struct will_delay_list *item;
@ -43,7 +43,7 @@ int will_delay__add(struct mosquitto *context)
item->context = context;
context->will_delay_entry = item;
item->context->will_delay_time = time(NULL) + item->context->will_delay_interval;
item->context->will_delay_time = db->now_real_s + item->context->will_delay_interval;
DL_INSERT_INORDER(delay_list, item, will_delay__cmp);
@ -66,16 +66,16 @@ void will_delay__send_all(struct mosquitto_db *db)
}
void will_delay__check(struct mosquitto_db *db, time_t now)
void will_delay__check(struct mosquitto_db *db)
{
struct will_delay_list *item, *tmp;
if(now <= last_check) return;
if(db->now_real_s <= last_check) return;
last_check = now;
last_check = db->now_real_s;
DL_FOREACH_SAFE(delay_list, item, tmp){
if(item->context->will_delay_time < now){
if(item->context->will_delay_time < db->now_real_s){
DL_DELETE(delay_list, item);
item->context->will_delay_interval = 0;
item->context->will_delay_entry = NULL;

Loading…
Cancel
Save