From 108b23ce6d3a3d71bd3ac57bd6712010820c66c5 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Fri, 6 Nov 2020 12:11:40 +0000 Subject: [PATCH] Reduce use of mosquitto_time() and time(). --- apps/db_dump/stubs.c | 2 +- lib/handle_pubrec.c | 2 +- lib/packet_mosq.c | 10 +++++++++- lib/util_mosq.c | 8 +++++++- src/bridge.c | 27 ++++++++++++--------------- src/context.c | 6 +++--- src/database.c | 30 +++++++++++------------------- src/keepalive.c | 12 ++++++------ src/loop.c | 16 ++++++++-------- src/mosquitto.c | 2 ++ src/mosquitto_broker_internal.h | 14 ++++++++------ src/mux_epoll.c | 3 +++ src/mux_poll.c | 4 ++++ src/retain.c | 26 ++++++++++++-------------- src/session_expiry.c | 10 +++++----- src/sys_tree.c | 17 +++++++---------- src/websockets.c | 2 +- src/will_delay.c | 12 ++++++------ 18 files changed, 106 insertions(+), 97 deletions(-) diff --git a/apps/db_dump/stubs.c b/apps/db_dump/stubs.c index db5a1aa7..beefd873 100644 --- a/apps/db_dump/stubs.c +++ b/apps/db_dump/stubs.c @@ -63,7 +63,7 @@ int sub__messages_queue(struct mosquitto_db *db, const char *source_id, const ch return 0; } -int keepalive__update(struct mosquitto *context) +int keepalive__update(struct mosquitto_db *db, struct mosquitto *context) { return 0; } diff --git a/lib/handle_pubrec.c b/lib/handle_pubrec.c index 14ee776f..d35b8554 100644 --- a/lib/handle_pubrec.c +++ b/lib/handle_pubrec.c @@ -70,7 +70,7 @@ int handle__pubrec(struct mosquitto_db *db, struct mosquitto *mosq) log__printf(NULL, MOSQ_LOG_DEBUG, "Received PUBREC from %s (Mid: %d)", mosq->id, mid); if(reason_code < 0x80){ - rc = db__message_update_outgoing(mosq, mid, mosq_ms_wait_for_pubcomp, 2); + rc = db__message_update_outgoing(db, mosq, mid, mosq_ms_wait_for_pubcomp, 2); }else{ return db__message_delete_outgoing(db, mosq, mid, mosq_ms_wait_for_pubrec, 2); } diff --git a/lib/packet_mosq.c b/lib/packet_mosq.c index 2b9813a7..099d1eb1 100644 --- a/lib/packet_mosq.c +++ b/lib/packet_mosq.c @@ -303,9 +303,13 @@ int packet__write(struct mosquitto *mosq) packet__cleanup(packet); mosquitto__free(packet); +#ifdef WITH_BROKER + mosq->next_msg_out = mosquitto_time() + mosq->keepalive; +#else pthread_mutex_lock(&mosq->msgtime_mutex); mosq->next_msg_out = mosquitto_time() + mosq->keepalive; pthread_mutex_unlock(&mosq->msgtime_mutex); +#endif } pthread_mutex_unlock(&mosq->current_out_packet_mutex); return MOSQ_ERR_SUCCESS; @@ -466,7 +470,7 @@ int packet__read(struct mosquitto *mosq) * If a client can't send 1000 bytes in a second it * probably shouldn't be using a 1 second keep alive. */ #ifdef WITH_BROKER - keepalive__update(mosq); + keepalive__update(db, mosq); #else pthread_mutex_lock(&mosq->msgtime_mutex); mosq->last_msg_in = mosquitto_time(); @@ -502,8 +506,12 @@ int packet__read(struct mosquitto *mosq) /* Free data and reset values */ packet__cleanup(&mosq->in_packet); +#ifdef WITH_BROKER + keepalive__update(db, mosq); +#else pthread_mutex_lock(&mosq->msgtime_mutex); mosq->last_msg_in = mosquitto_time(); pthread_mutex_unlock(&mosq->msgtime_mutex); +#endif return rc; } diff --git a/lib/util_mosq.c b/lib/util_mosq.c index 7c886cf2..cddf64b5 100644 --- a/lib/util_mosq.c +++ b/lib/util_mosq.c @@ -65,13 +65,19 @@ int mosquitto__check_keepalive(struct mosquitto *mosq) { time_t next_msg_out; time_t last_msg_in; - time_t now = mosquitto_time(); + time_t now; #ifndef WITH_BROKER int rc; #endif int state; assert(mosq); +#ifdef WITH_BROKER + now = db->now_s; +#else + now = mosquitto_time(); +#endif + #if defined(WITH_BROKER) && defined(WITH_BRIDGE) /* Check if a lazy bridge should be timed out due to idle. */ if(mosq->bridge && mosq->bridge->start_type == bst_lazy diff --git a/src/bridge.c b/src/bridge.c index 6496b6fa..7e3034d4 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -155,8 +155,8 @@ int bridge__connect_step1(struct mosquitto_db *db, struct mosquitto *context) mosquitto__set_state(context, mosq_cs_new); context->sock = INVALID_SOCKET; - context->last_msg_in = mosquitto_time(); - context->next_msg_out = mosquitto_time() + context->bridge->keepalive; + context->last_msg_in = db->now_s; + context->next_msg_out = db->now_s + context->bridge->keepalive; context->keepalive = context->bridge->keepalive; context->clean_start = context->bridge->clean_start; context->in_packet.payload = NULL; @@ -297,7 +297,7 @@ int bridge__connect_step3(struct mosquitto_db *db, struct mosquitto *context) } if(context->bridge->round_robin == false && context->bridge->cur_address != 0){ - context->bridge->primary_retry = mosquitto_time() + 5; + context->bridge->primary_retry = db->now_s + 5; } rc = send__connect(context, context->keepalive, context->clean_start, NULL); @@ -333,8 +333,8 @@ int bridge__connect(struct mosquitto_db *db, struct mosquitto *context) mosquitto__set_state(context, mosq_cs_new); context->sock = INVALID_SOCKET; - context->last_msg_in = mosquitto_time(); - context->next_msg_out = mosquitto_time() + context->bridge->keepalive; + context->last_msg_in = db->now_s; + context->next_msg_out = db->now_s + context->bridge->keepalive; context->keepalive = context->bridge->keepalive; context->clean_start = context->bridge->clean_start; context->in_packet.payload = NULL; @@ -655,16 +655,13 @@ static void bridge__backoff_reset(struct mosquitto *context) void bridge_check(struct mosquitto_db *db) { static time_t last_check = 0; - time_t now; struct mosquitto *context = NULL; socklen_t len; int i; int rc; int err; - now = mosquitto_time(); - - if(now <= last_check) return; + if(db->now_s <= last_check) return; for(i=0; ibridge_count; i++){ if(!db->bridges[i]) continue; @@ -679,7 +676,7 @@ void bridge_check(struct mosquitto_db *db) if(context->bridge->round_robin == false && context->bridge->cur_address != 0 && context->bridge->primary_retry - && now > context->bridge->primary_retry){ + && db->now_s > context->bridge->primary_retry){ if(context->bridge->primary_retry_sock == INVALID_SOCKET){ rc = net__try_connect(context->bridge->addresses[0].address, @@ -706,12 +703,12 @@ void bridge_check(struct mosquitto_db *db) }else{ COMPAT_CLOSE(context->bridge->primary_retry_sock); context->bridge->primary_retry_sock = INVALID_SOCKET; - context->bridge->primary_retry = now+5; + context->bridge->primary_retry = db->now_s+5; } }else{ COMPAT_CLOSE(context->bridge->primary_retry_sock); context->bridge->primary_retry_sock = INVALID_SOCKET; - context->bridge->primary_retry = now+5; + context->bridge->primary_retry = db->now_s+5; } } } @@ -722,14 +719,14 @@ void bridge_check(struct mosquitto_db *db) if(context->sock == INVALID_SOCKET){ /* Want to try to restart the bridge connection */ if(!context->bridge->restart_t){ - context->bridge->restart_t = now+context->bridge->restart_timeout; + context->bridge->restart_t = db->now_s+context->bridge->restart_timeout; context->bridge->cur_address++; if(context->bridge->cur_address == context->bridge->address_count){ context->bridge->cur_address = 0; } }else{ if((context->bridge->start_type == bst_lazy && context->bridge->lazy_reconnect) - || (context->bridge->start_type == bst_automatic && now > context->bridge->restart_t)){ + || (context->bridge->start_type == bst_automatic && db->now_s > context->bridge->restart_t)){ #if defined(__GLIBC__) && defined(WITH_ADNS) if(context->adns){ @@ -780,7 +777,7 @@ void bridge_check(struct mosquitto_db *db) context->bridge->restart_t = 0; if(rc == MOSQ_ERR_SUCCESS){ if(context->bridge->round_robin == false && context->bridge->cur_address != 0){ - context->bridge->primary_retry = now + 5; + context->bridge->primary_retry = db->now_s + 5; } mux__add_in(db, context); if(context->current_out_packet){ diff --git a/src/context.c b/src/context.c index e7d53ef1..e1b9d0ab 100644 --- a/src/context.c +++ b/src/context.c @@ -44,8 +44,8 @@ struct mosquitto *context__init(struct mosquitto_db *db, mosq_sock_t sock) context->pollfd_index = -1; mosquitto__set_state(context, mosq_cs_new); context->sock = sock; - context->last_msg_in = mosquitto_time(); - context->next_msg_out = mosquitto_time() + 60; + context->last_msg_in = db->now_s; + context->next_msg_out = db->now_s + 60; context->keepalive = 60; /* Default to 60s */ context->clean_start = true; context->id = NULL; @@ -172,7 +172,7 @@ void context__send_will(struct mosquitto_db *db, struct mosquitto *ctxt) { if(ctxt->state != mosq_cs_disconnecting && ctxt->will){ if(ctxt->will_delay_interval > 0){ - will_delay__add(ctxt); + will_delay__add(db, ctxt); return; } diff --git a/src/database.c b/src/database.c index 17058961..718a6c96 100644 --- a/src/database.c +++ b/src/database.c @@ -331,7 +331,7 @@ int db__message_delete_outgoing(struct mosquitto_db *db, struct mosquitto *conte } msg_index++; - tail->timestamp = mosquitto_time(); + tail->timestamp = db->now_s; switch(tail->qos){ case 0: tail->state = mosq_ms_publish_qos0; @@ -476,7 +476,7 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1 msg->store = stored; db__msg_store_ref_inc(msg->store); msg->mid = mid; - msg->timestamp = mosquitto_time(); + msg->timestamp = db->now_s; msg->direction = dir; msg->state = state; msg->dup = false; @@ -543,7 +543,7 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1 return rc; } -int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos) +int db__message_update_outgoing(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos) { struct mosquitto_client_msg *tail; @@ -553,7 +553,7 @@ int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mo return MOSQ_ERR_PROTOCOL; } tail->state = state; - tail->timestamp = mosquitto_time(); + tail->timestamp = db->now_s; return MOSQ_ERR_SUCCESS; } } @@ -686,7 +686,7 @@ int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, s stored->mid = 0; stored->origin = origin; if(message_expiry_interval > 0){ - stored->message_expiry_time = time(NULL) + message_expiry_interval; + stored->message_expiry_time = db->now_real_s + message_expiry_interval; }else{ stored->message_expiry_time = 0; } @@ -921,7 +921,7 @@ int db__message_release_incoming(struct mosquitto_db *db, struct mosquitto *cont } msg_index++; - tail->timestamp = mosquitto_time(); + tail->timestamp = db->now_s; if(tail->qos == 2){ send__pubrec(context, tail->mid, 0, NULL); @@ -940,7 +940,6 @@ int db__message_write_inflight_in(struct mosquitto_db *db, struct mosquitto *con { struct mosquitto_client_msg *tail, *tmp; int rc; - time_t now = 0; if(context->state != mosq_cs_active){ return MOSQ_ERR_SUCCESS; @@ -948,10 +947,7 @@ int db__message_write_inflight_in(struct mosquitto_db *db, struct mosquitto *con DL_FOREACH_SAFE(context->msgs_in.inflight, tail, tmp){ if(tail->store->message_expiry_time){ - if(now == 0){ - now = time(NULL); - } - if(now > tail->store->message_expiry_time){ + if(db->now_real_s > tail->store->message_expiry_time){ /* Message is expired, must not send. */ db__message_remove(db, &context->msgs_in, tail); if(tail->qos > 0){ @@ -1008,15 +1004,11 @@ static int db__message_write_inflight_out_single(struct mosquitto_db *db, struct uint8_t qos; uint32_t payloadlen; const void *payload; - time_t now = 0; uint32_t expiry_interval; expiry_interval = 0; if(msg->store->message_expiry_time){ - if(now == 0){ - now = time(NULL); - } - if(now > msg->store->message_expiry_time){ + if(db->now_real_s > msg->store->message_expiry_time){ /* Message is expired, must not send. */ if(msg->direction == mosq_md_out && msg->qos > 0){ util__increment_send_quota(context); @@ -1024,7 +1016,7 @@ static int db__message_write_inflight_out_single(struct mosquitto_db *db, struct db__message_remove(db, &context->msgs_out, msg); return MOSQ_ERR_SUCCESS; }else{ - expiry_interval = (uint32_t)(msg->store->message_expiry_time - now); + expiry_interval = (uint32_t)(msg->store->message_expiry_time - db->now_real_s); } } mid = msg->mid; @@ -1050,7 +1042,7 @@ static int db__message_write_inflight_out_single(struct mosquitto_db *db, struct case mosq_ms_publish_qos1: rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval); if(rc == MOSQ_ERR_SUCCESS){ - msg->timestamp = mosquitto_time(); + msg->timestamp = db->now_s; msg->dup = 1; /* Any retry attempts are a duplicate. */ msg->state = mosq_ms_wait_for_puback; }else if(rc == MOSQ_ERR_OVERSIZE_PACKET){ @@ -1063,7 +1055,7 @@ static int db__message_write_inflight_out_single(struct mosquitto_db *db, struct case mosq_ms_publish_qos2: rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval); if(rc == MOSQ_ERR_SUCCESS){ - msg->timestamp = mosquitto_time(); + msg->timestamp = db->now_s; msg->dup = 1; /* Any retry attempts are a duplicate. */ msg->state = mosq_ms_wait_for_pubrec; }else if(rc == MOSQ_ERR_OVERSIZE_PACKET){ diff --git a/src/keepalive.c b/src/keepalive.c index 9c6c746d..b39454ce 100644 --- a/src/keepalive.c +++ b/src/keepalive.c @@ -29,12 +29,12 @@ int keepalive__add(struct mosquitto *context) } -void keepalive__check(struct mosquitto_db *db, time_t now) +void keepalive__check(struct mosquitto_db *db) { struct mosquitto *context, *ctxt_tmp; - if(last_keepalive_check + 5 < now){ - last_keepalive_check = now; + if(last_keepalive_check + 5 < db->now_s){ + last_keepalive_check = db->now_s; /* FIXME - this needs replacing with something more efficient */ HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){ @@ -42,7 +42,7 @@ void keepalive__check(struct mosquitto_db *db, time_t now) /* Local bridges never time out in this fashion. */ if(!(context->keepalive) || context->bridge - || now - context->last_msg_in <= (time_t)(context->keepalive)*3/2){ + || db->now_s - context->last_msg_in <= (time_t)(context->keepalive)*3/2){ }else{ /* Client has exceeded keepalive*1.5 */ @@ -65,8 +65,8 @@ void keepalive__remove_all(void) } -int keepalive__update(struct mosquitto *context) +int keepalive__update(struct mosquitto_db *db, struct mosquitto *context) { - context->last_msg_in = mosquitto_time(); + context->last_msg_in = db->now_s; return MOSQ_ERR_SUCCESS; } diff --git a/src/loop.c b/src/loop.c index a975f4e6..1b917dda 100644 --- a/src/loop.c +++ b/src/loop.c @@ -133,7 +133,6 @@ int mosquitto_main_loop(struct mosquitto_db *db, struct mosquitto__listener_sock #ifdef WITH_PERSISTENCE time_t last_backup = mosquitto_time(); #endif - time_t now = 0; #ifdef WITH_WEBSOCKETS int i; #endif @@ -144,6 +143,9 @@ int mosquitto_main_loop(struct mosquitto_db *db, struct mosquitto__listener_sock memset(&sul, 0, sizeof(struct lws_sorted_usec_list)); #endif + db->now_s = mosquitto_time(); + db->now_real_s = time(NULL); + rc = mux__init(db, listensock, listensock_count); if(rc) return rc; @@ -161,8 +163,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, struct mosquitto__listener_sock } #endif - now = mosquitto_time(); - keepalive__check(db, now); + keepalive__check(db); #ifdef WITH_BRIDGE bridge_check(db); @@ -171,9 +172,8 @@ int mosquitto_main_loop(struct mosquitto_db *db, struct mosquitto__listener_sock rc = mux__handle(db, listensock, listensock_count); if(rc) return rc; - now = time(NULL); - session_expiry__check(db, now); - will_delay__check(db, now); + session_expiry__check(db); + will_delay__check(db); #ifdef WITH_PERSISTENCE if(db->config->persistence && db->config->autosave_interval){ if(db->config->autosave_on_changes){ @@ -182,9 +182,9 @@ int mosquitto_main_loop(struct mosquitto_db *db, struct mosquitto__listener_sock db->persistence_changes = 0; } }else{ - if(last_backup + db->config->autosave_interval < mosquitto_time()){ + if(last_backup + db->config->autosave_interval < db->now_s){ persist__backup(db, false); - last_backup = mosquitto_time(); + last_backup = db->now_s; } } } diff --git a/src/mosquitto.c b/src/mosquitto.c index 582b6a98..def9a79a 100644 --- a/src/mosquitto.c +++ b/src/mosquitto.c @@ -450,6 +450,8 @@ int main(int argc, char *argv[]) #endif memset(&int_db, 0, sizeof(struct mosquitto_db)); + int_db.now_s = mosquitto_time(); + int_db.now_real_s = time(NULL); net__broker_init(); diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index d7c84cb9..d784e4f9 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -522,6 +522,8 @@ struct mosquitto_db{ struct clientid__index_hash *clientid_index_hash; struct mosquitto_msg_store *msg_store; struct mosquitto_msg_store_load *msg_store_load; + time_t now_s; /* Monotonic clock, where possible */ + time_t now_real_s; /* Read clock, for measuring session/message expiry */ #ifdef WITH_BRIDGE int bridge_count; #endif @@ -710,7 +712,7 @@ int db__message_count(int *count); int db__message_delete_outgoing(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state expect_state, int qos); int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, uint8_t qos, bool retain, struct mosquitto_msg_store *stored, mosquitto_property *properties, bool update); int db__message_release_incoming(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid); -int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos); +int db__message_update_outgoing(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos); void db__message_dequeue_first(struct mosquitto *context, struct mosquitto_msg_data *msg_data); int db__messages_delete(struct mosquitto_db *db, struct mosquitto *context, bool force_free); int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, uint8_t qos, uint32_t payloadlen, const void *payload, int retain, uint32_t message_expiry_interval, mosquitto_property **properties); @@ -829,10 +831,10 @@ void plugin__handle_tick(struct mosquitto_db *db); * Property related functions * ============================================================ */ int keepalive__add(struct mosquitto *context); -void keepalive__check(struct mosquitto_db *db, time_t now); +void keepalive__check(struct mosquitto_db *db); int keepalive__remove(struct mosquitto *context); void keepalive__remove_all(void); -int keepalive__update(struct mosquitto *context); +int keepalive__update(struct mosquitto_db *db, struct mosquitto *context); /* ============================================================ * Property related functions @@ -879,7 +881,7 @@ void unpwd__free_item(struct mosquitto__unpwd **unpwd, struct mosquitto__unpwd * int session_expiry__add(struct mosquitto_db *db, struct mosquitto *context); void session_expiry__remove(struct mosquitto *context); void session_expiry__remove_all(struct mosquitto_db *db); -void session_expiry__check(struct mosquitto_db *db, time_t now); +void session_expiry__check(struct mosquitto_db *db); void session_expiry__send_all(struct mosquitto_db *db); /* ============================================================ @@ -908,8 +910,8 @@ void do_disconnect(struct mosquitto_db *db, struct mosquitto *context, int reaso /* ============================================================ * Will delay * ============================================================ */ -int will_delay__add(struct mosquitto *context); -void will_delay__check(struct mosquitto_db *db, time_t now); +int will_delay__add(struct mosquitto_db *db, struct mosquitto *context); +void will_delay__check(struct mosquitto_db *db); void will_delay__send_all(struct mosquitto_db *db); void will_delay__remove(struct mosquitto *mosq); diff --git a/src/mux_epoll.c b/src/mux_epoll.c index 47e72674..c9ec599e 100644 --- a/src/mux_epoll.c +++ b/src/mux_epoll.c @@ -191,6 +191,9 @@ int mux_epoll__handle(struct mosquitto_db *db) event_count = epoll_wait(db->epollfd, ep_events, MAX_EVENTS, 100); sigprocmask(SIG_SETMASK, &origsig, NULL); + db->now_s = mosquitto_time(); + db->now_real_s = time(NULL); + switch(event_count){ case -1: if(errno != EINTR){ diff --git a/src/mux_poll.c b/src/mux_poll.c index e2e66bdb..60279f76 100644 --- a/src/mux_poll.c +++ b/src/mux_poll.c @@ -186,6 +186,10 @@ int mux_poll__handle(struct mosquitto_db *db, struct mosquitto__listener_sock *l #else fdcount = WSAPoll(pollfds, pollfd_max, 100); #endif + + db->now_s = mosquitto_time(); + db->now_real_s = time(NULL); + if(fdcount == -1){ # ifdef WIN32 if(WSAGetLastError() == WSAEINVAL){ diff --git a/src/retain.c b/src/retain.c index bbe4460c..da8265db 100644 --- a/src/retain.c +++ b/src/retain.c @@ -122,7 +122,7 @@ int retain__store(struct mosquitto_db *db, const char *topic, struct mosquitto_m } -static int retain__process(struct mosquitto_db *db, struct mosquitto__retainhier *branch, struct mosquitto *context, uint8_t sub_qos, uint32_t subscription_identifier, time_t now) +static int retain__process(struct mosquitto_db *db, struct mosquitto__retainhier *branch, struct mosquitto *context, uint8_t sub_qos, uint32_t subscription_identifier) { int rc = 0; uint8_t qos; @@ -130,7 +130,7 @@ static int retain__process(struct mosquitto_db *db, struct mosquitto__retainhier mosquitto_property *properties = NULL; struct mosquitto_msg_store *retained; - if(branch->retained->message_expiry_time > 0 && now >= branch->retained->message_expiry_time){ + if(branch->retained->message_expiry_time > 0 && db->now_real_s >= branch->retained->message_expiry_time){ db__msg_store_ref_dec(db, &branch->retained); branch->retained = NULL; #ifdef WITH_SYS_TREE @@ -188,7 +188,7 @@ static int retain__process(struct mosquitto_db *db, struct mosquitto__retainhier } -static int retain__search(struct mosquitto_db *db, struct mosquitto__retainhier *retainhier, char **split_topics, struct mosquitto *context, const char *sub, uint8_t sub_qos, uint32_t subscription_identifier, time_t now, int level) +static int retain__search(struct mosquitto_db *db, struct mosquitto__retainhier *retainhier, char **split_topics, struct mosquitto *context, const char *sub, uint8_t sub_qos, uint32_t subscription_identifier, int level) { struct mosquitto__retainhier *branch, *branch_tmp; int flag = 0; @@ -201,26 +201,26 @@ static int retain__search(struct mosquitto_db *db, struct mosquitto__retainhier */ flag = -1; if(branch->retained){ - retain__process(db, branch, context, sub_qos, subscription_identifier, now); + retain__process(db, branch, context, sub_qos, subscription_identifier); } if(branch->children){ - retain__search(db, branch, split_topics, context, sub, sub_qos, subscription_identifier, now, level+1); + retain__search(db, branch, split_topics, context, sub, sub_qos, subscription_identifier, level+1); } } }else{ if(!strcmp(split_topics[0], "+")){ HASH_ITER(hh, retainhier->children, branch, branch_tmp){ if(split_topics[1] != NULL){ - if(retain__search(db, branch, &(split_topics[1]), context, sub, sub_qos, subscription_identifier, now, level+1) == -1 + if(retain__search(db, branch, &(split_topics[1]), context, sub, sub_qos, subscription_identifier, level+1) == -1 || (split_topics[1] != NULL && !strcmp(split_topics[1], "#") && level>0)){ if(branch->retained){ - retain__process(db, branch, context, sub_qos, subscription_identifier, now); + retain__process(db, branch, context, sub_qos, subscription_identifier); } } }else{ if(branch->retained){ - retain__process(db, branch, context, sub_qos, subscription_identifier, now); + retain__process(db, branch, context, sub_qos, subscription_identifier); } } } @@ -228,16 +228,16 @@ static int retain__search(struct mosquitto_db *db, struct mosquitto__retainhier HASH_FIND(hh, retainhier->children, split_topics[0], strlen(split_topics[0]), branch); if(branch){ if(split_topics[1] != NULL){ - if(retain__search(db, branch, &(split_topics[1]), context, sub, sub_qos, subscription_identifier, now, level+1) == -1 + if(retain__search(db, branch, &(split_topics[1]), context, sub, sub_qos, subscription_identifier, level+1) == -1 || (split_topics[1] != NULL && !strcmp(split_topics[1], "#") && level>0)){ if(branch->retained){ - retain__process(db, branch, context, sub_qos, subscription_identifier, now); + retain__process(db, branch, context, sub_qos, subscription_identifier); } } }else{ if(branch->retained){ - retain__process(db, branch, context, sub_qos, subscription_identifier, now); + retain__process(db, branch, context, sub_qos, subscription_identifier); } } } @@ -252,7 +252,6 @@ int retain__queue(struct mosquitto_db *db, struct mosquitto *context, const char struct mosquitto__retainhier *retainhier; char *local_sub; char **split_topics; - time_t now; int rc; assert(db); @@ -265,8 +264,7 @@ int retain__queue(struct mosquitto_db *db, struct mosquitto *context, const char HASH_FIND(hh, db->retains, split_topics[0], strlen(split_topics[0]), retainhier); if(retainhier){ - now = time(NULL); - retain__search(db, retainhier, split_topics, context, sub, sub_qos, subscription_identifier, now, 0); + retain__search(db, retainhier, split_topics, context, sub, sub_qos, subscription_identifier, 0); } mosquitto__free(local_sub); mosquitto__free(split_topics); diff --git a/src/session_expiry.c b/src/session_expiry.c index 462cdab1..a121a9c0 100644 --- a/src/session_expiry.c +++ b/src/session_expiry.c @@ -57,7 +57,7 @@ int session_expiry__add(struct mosquitto_db *db, struct mosquitto *context) if(!item) return MOSQ_ERR_NOMEM; item->context = context; - item->context->session_expiry_time = time(NULL); + item->context->session_expiry_time = db->now_real_s; if(db->config->persistent_client_expiration == 0){ /* No global expiry, so use the client expiration interval */ @@ -107,17 +107,17 @@ void session_expiry__remove_all(struct mosquitto_db *db) } -void session_expiry__check(struct mosquitto_db *db, time_t now) +void session_expiry__check(struct mosquitto_db *db) { struct session_expiry_list *item, *tmp; struct mosquitto *context; - if(now <= last_check) return; + if(db->now_real_s <= last_check) return; - last_check = now; + last_check = db->now_real_s; DL_FOREACH_SAFE(expiry_list, item, tmp){ - if(item->context->session_expiry_time < now){ + if(item->context->session_expiry_time < db->now_real_s){ context = item->context; session_expiry__remove(context); diff --git a/src/sys_tree.c b/src/sys_tree.c index 03c96bd1..af39e137 100644 --- a/src/sys_tree.c +++ b/src/sys_tree.c @@ -160,7 +160,6 @@ static void calc_load(struct mosquitto_db *db, char *buf, const char *topic, boo void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time) { static time_t last_update = 0; - time_t now; time_t uptime; char buf[BUFLEN]; @@ -221,10 +220,8 @@ void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time) uint32_t len; bool initial_publish; - now = mosquitto_time(); - - if(interval && now - interval > last_update){ - uptime = now - start_time; + if(interval && db->now_s - interval > last_update){ + uptime = db->now_s - start_time; len = (uint32_t)snprintf(buf, BUFLEN, "%d seconds", (int)uptime); db__messages_easy_queue(db, NULL, "$SYS/broker/uptime", SYS_TREE_QOS, len, buf, 1, 60, NULL); @@ -235,7 +232,7 @@ void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time) last_update = 1; } if(last_update > 0){ - i_mult = 60.0/(double)(now-last_update); + i_mult = 60.0/(double)(db->now_s-last_update); msgs_received_interval = (double)(g_msgs_received - msgs_received)*i_mult; msgs_sent_interval = (double)(g_msgs_sent - msgs_sent)*i_mult; @@ -253,7 +250,7 @@ void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time) g_connection_count = 0; /* 1 minute load */ - exponent = exp(-1.0*(double)(now-last_update)/60.0); + exponent = exp(-1.0*(double)(db->now_s-last_update)/60.0); calc_load(db, buf, "$SYS/broker/load/messages/received/1min", initial_publish, exponent, msgs_received_interval, &msgs_received_load1); calc_load(db, buf, "$SYS/broker/load/messages/sent/1min", initial_publish, exponent, msgs_sent_interval, &msgs_sent_load1); @@ -266,7 +263,7 @@ void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time) calc_load(db, buf, "$SYS/broker/load/connections/1min", initial_publish, exponent, connection_interval, &connection_load1); /* 5 minute load */ - exponent = exp(-1.0*(double)(now-last_update)/300.0); + exponent = exp(-1.0*(double)(db->now_s-last_update)/300.0); calc_load(db, buf, "$SYS/broker/load/messages/received/5min", initial_publish, exponent, msgs_received_interval, &msgs_received_load5); calc_load(db, buf, "$SYS/broker/load/messages/sent/5min", initial_publish, exponent, msgs_sent_interval, &msgs_sent_load5); @@ -279,7 +276,7 @@ void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time) calc_load(db, buf, "$SYS/broker/load/connections/5min", initial_publish, exponent, connection_interval, &connection_load5); /* 15 minute load */ - exponent = exp(-1.0*(double)(now-last_update)/900.0); + exponent = exp(-1.0*(double)(db->now_s-last_update)/900.0); calc_load(db, buf, "$SYS/broker/load/messages/received/15min", initial_publish, exponent, msgs_received_interval, &msgs_received_load15); calc_load(db, buf, "$SYS/broker/load/messages/sent/15min", initial_publish, exponent, msgs_sent_interval, &msgs_sent_load15); @@ -381,7 +378,7 @@ void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time) db__messages_easy_queue(db, NULL, "$SYS/broker/publish/bytes/sent", SYS_TREE_QOS, len, buf, 1, 60, NULL); } - last_update = mosquitto_time(); + last_update = db->now_s; } } diff --git a/src/websockets.c b/src/websockets.c index 35d81054..5b59e00d 100644 --- a/src/websockets.c +++ b/src/websockets.c @@ -351,7 +351,7 @@ static int callback_mqtt(struct libwebsocket_context *context, packet__cleanup(packet); mosquitto__free(packet); - mosq->next_msg_out = mosquitto_time() + mosq->keepalive; + mosq->next_msg_out = db->now_s + mosq->keepalive; } if (mosq->state == mosq_cs_disconnect_ws || mosq->state == mosq_cs_disconnecting diff --git a/src/will_delay.c b/src/will_delay.c index 8a66f1fa..94f98d7f 100644 --- a/src/will_delay.c +++ b/src/will_delay.c @@ -34,7 +34,7 @@ static int will_delay__cmp(struct will_delay_list *i1, struct will_delay_list *i } -int will_delay__add(struct mosquitto *context) +int will_delay__add(struct mosquitto_db *db, struct mosquitto *context) { struct will_delay_list *item; @@ -43,7 +43,7 @@ int will_delay__add(struct mosquitto *context) item->context = context; context->will_delay_entry = item; - item->context->will_delay_time = time(NULL) + item->context->will_delay_interval; + item->context->will_delay_time = db->now_real_s + item->context->will_delay_interval; DL_INSERT_INORDER(delay_list, item, will_delay__cmp); @@ -66,16 +66,16 @@ void will_delay__send_all(struct mosquitto_db *db) } -void will_delay__check(struct mosquitto_db *db, time_t now) +void will_delay__check(struct mosquitto_db *db) { struct will_delay_list *item, *tmp; - if(now <= last_check) return; + if(db->now_real_s <= last_check) return; - last_check = now; + last_check = db->now_real_s; DL_FOREACH_SAFE(delay_list, item, tmp){ - if(item->context->will_delay_time < now){ + if(item->context->will_delay_time < db->now_real_s){ DL_DELETE(delay_list, item); item->context->will_delay_interval = 0; item->context->will_delay_entry = NULL;