diff --git a/ChangeLog.txt b/ChangeLog.txt index f9620676..30b344bd 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -64,6 +64,9 @@ Broker: versions are allowed for a particular listener. - Add `disable_client_cert_date_checks` option to allow expired client certificate to be considered valid. +- Improve idle performance. The broker now calculates when the next event of + interest is, and uses that as the timeout for e.g. epoll_wait(). This can + reduce the number of process wakeups by 100x on an idle broker. Client library: - Add MOSQ_OPT_DISABLE_SOCKETPAIR to allow the disabling of the socketpair diff --git a/include/mosquitto_broker.h b/include/mosquitto_broker.h index cdda4b46..3b79bf1a 100644 --- a/include/mosquitto_broker.h +++ b/include/mosquitto_broker.h @@ -158,7 +158,7 @@ struct mosquitto_evt_message { struct mosquitto_evt_tick { void *future; long now_ns; - long next_ns; + long next_ms; time_t now_s; time_t next_s; void *future2[4]; diff --git a/lib/time_mosq.c b/lib/time_mosq.c index da4a9154..97d23d4b 100644 --- a/lib/time_mosq.c +++ b/lib/time_mosq.c @@ -63,3 +63,26 @@ time_t mosquitto_time(void) #endif } +void mosquitto_time_ns(time_t *s, long *ns) +{ +#ifdef WIN32 + SYSTEMTIME st; + GetLocalTime(&st); + *s = st.wSecond + *ns = st.wMilliseconds*1000000L; + gettimeofday(&tv, NULL); + srand((unsigned int)(tv.tv_sec + tv.tv_usec)); +#elif _POSIX_TIMERS>0 && defined(_POSIX_MONOTONIC_CLOCK) + struct timespec tp; + + clock_gettime(CLOCK_REALTIME, &tp); + *s = tp.tv_sec; + *ns = tp.tv_nsec; +#else + struct timeval tv; + + gettimeofday(&tv, NULL); + *s = tv.tv_sec; + *ns = tv.tv_usec * 1000; +#endif +} diff --git a/lib/time_mosq.h b/lib/time_mosq.h index 175d9732..9cceb672 100644 --- a/lib/time_mosq.h +++ b/lib/time_mosq.h @@ -20,5 +20,6 @@ Contributors: #define TIME_MOSQ_H time_t mosquitto_time(void); +void mosquitto_time_ns(time_t *s, long *ns); #endif diff --git a/plugins/examples/delayed-auth/mosquitto_delayed_auth.c b/plugins/examples/delayed-auth/mosquitto_delayed_auth.c index 16c3dd3a..7725e685 100644 --- a/plugins/examples/delayed-auth/mosquitto_delayed_auth.c +++ b/plugins/examples/delayed-auth/mosquitto_delayed_auth.c @@ -107,16 +107,16 @@ static int basic_auth_callback(int event, void *event_data, void *userdata) static int tick_callback(int event, void *event_data, void *userdata) { + struct mosquitto_evt_tick *ed = event_data; struct client_list *client, *client_tmp; time_t now; long r; UNUSED(event); - UNUSED(event_data); UNUSED(userdata); now = time(NULL); - if(now > last_check){ + if(now >= last_check){ HASH_ITER(hh, clients, client, client_tmp){ if(authentication_check(client, now)){ /* Deny access 1/4 of the time, yes it's biased number generation. */ @@ -134,6 +134,9 @@ static int tick_callback(int event, void *event_data, void *userdata) } last_check = now; } + /* Declare that we want another call in at most 1 second */ + ed->next_s = 1; + return MOSQ_ERR_SUCCESS; } diff --git a/src/bridge.c b/src/bridge.c index a6cd3fb1..5ee500f4 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -383,6 +383,7 @@ int bridge__connect_step3(struct mosquitto *context) if(context->bridge->round_robin == false && context->bridge->cur_address != 0){ context->bridge->primary_retry = db.now_s + 5; + loop__update_next_event(5000); } if (bridge__set_tcp_keepalive(context) != MOSQ_ERR_SUCCESS) return MOSQ_ERR_UNKNOWN; @@ -928,7 +929,7 @@ void bridge_check(void) if(context->bridge->round_robin == false && context->bridge->cur_address != 0 && context->bridge->primary_retry - && db.now_s > context->bridge->primary_retry){ + && db.now_s >= context->bridge->primary_retry){ if(context->bridge->primary_retry_sock == INVALID_SOCKET){ rc = net__try_connect(context->bridge->addresses[0].address, @@ -958,11 +959,13 @@ void bridge_check(void) COMPAT_CLOSE(context->bridge->primary_retry_sock); context->bridge->primary_retry_sock = INVALID_SOCKET; context->bridge->primary_retry = db.now_s+5; + loop__update_next_event(5000); } }else{ COMPAT_CLOSE(context->bridge->primary_retry_sock); context->bridge->primary_retry_sock = INVALID_SOCKET; context->bridge->primary_retry = db.now_s+5; + loop__update_next_event(5000); } } } @@ -978,9 +981,10 @@ void bridge_check(void) if(context->bridge->cur_address == context->bridge->address_count){ context->bridge->cur_address = 0; } + loop__update_next_event(context->bridge->restart_timeout*1000); }else{ if((context->bridge->start_type == bst_lazy && context->bridge->lazy_reconnect) - || (context->bridge->start_type == bst_automatic && db.now_s > context->bridge->restart_t)){ + || (context->bridge->start_type == bst_automatic && db.now_s >= context->bridge->restart_t)){ #if defined(__GLIBC__) && defined(WITH_ADNS) if(context->adns){ @@ -1034,6 +1038,7 @@ void bridge_check(void) if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_CONN_PENDING){ if(context->bridge->round_robin == false && context->bridge->cur_address != 0){ context->bridge->primary_retry = db.now_s + 5; + loop__update_next_event(5000); } mux__new(context); if(context->out_packet){ diff --git a/src/keepalive.c b/src/keepalive.c index 2e280e93..ce2d6852 100644 --- a/src/keepalive.c +++ b/src/keepalive.c @@ -36,8 +36,16 @@ int keepalive__add(struct mosquitto *context) void keepalive__check(void) { struct mosquitto *context, *ctxt_tmp; + time_t timeout; - if(last_keepalive_check + 5 < db.now_s){ + if(db.contexts_by_sock){ + timeout = (last_keepalive_check + 5 - db.now_s); + if(timeout <= 0){ + timeout = 5; + } + loop__update_next_event(timeout*1000); + } + if(last_keepalive_check + 5 <= db.now_s){ last_keepalive_check = db.now_s; /* FIXME - this needs replacing with something more efficient */ diff --git a/src/loop.c b/src/loop.c index 443d9924..948fff7f 100644 --- a/src/loop.c +++ b/src/loop.c @@ -161,6 +161,14 @@ static void queue_plugin_msgs(void) } +void loop__update_next_event(time_t new_ms) +{ + if(new_ms > 0 && new_ms < db.next_event_ms){ + db.next_event_ms = (int)new_ms; + } +} + + int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listensock_count) { #ifdef WITH_PERSISTENCE @@ -187,6 +195,8 @@ int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listens while(g_run){ queue_plugin_msgs(); context__free_disused(); + + db.next_event_ms = 86400000; #ifdef WITH_SYS_TREE if(db.config->sys_interval > 0){ sys_tree__update(); @@ -198,6 +208,7 @@ int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listens #ifdef WITH_BRIDGE bridge_check(); #endif + plugin__handle_tick(); rc = mux__handle(listensock, listensock_count); if(rc) return rc; @@ -269,7 +280,6 @@ int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listens } } #endif - plugin__handle_tick(); } mux__cleanup(); diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index c8691dd0..5bdfa783 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -471,6 +471,7 @@ struct mosquitto_db{ struct mosquitto_msg_store_load *msg_store_load; time_t now_s; /* Monotonic clock, where possible */ time_t now_real_s; /* Read clock, for measuring session/message expiry */ + int next_event_ms; /* for mux timeout */ int msg_store_count; unsigned long msg_store_bytes; char *config_file; @@ -622,6 +623,7 @@ extern struct mosquitto_db db; * Main functions * ============================================================ */ int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listensock_count); +void loop__update_next_event(time_t new_ms); /* ============================================================ * Config functions diff --git a/src/mux_epoll.c b/src/mux_epoll.c index 118f0256..466be8df 100644 --- a/src/mux_epoll.c +++ b/src/mux_epoll.c @@ -35,7 +35,6 @@ Contributors: static void loop_handle_reads_writes(struct mosquitto *context, uint32_t events); -static sigset_t my_sigblock; static struct epoll_event ep_events[MAX_EVENTS]; int mux_epoll__init(struct mosquitto__listener_sock *listensock, int listensock_count) @@ -43,13 +42,6 @@ int mux_epoll__init(struct mosquitto__listener_sock *listensock, int listensock_ struct epoll_event ev; int i; - sigemptyset(&my_sigblock); - sigaddset(&my_sigblock, SIGINT); - sigaddset(&my_sigblock, SIGTERM); - sigaddset(&my_sigblock, SIGUSR1); - sigaddset(&my_sigblock, SIGUSR2); - sigaddset(&my_sigblock, SIGHUP); - memset(&ep_events, 0, sizeof(struct epoll_event)*MAX_EVENTS); db.epollfd = 0; @@ -145,15 +137,16 @@ int mux_epoll__handle(void) { int i; struct epoll_event ev; - sigset_t origsig; struct mosquitto *context; struct mosquitto__listener_sock *listensock; int event_count; memset(&ev, 0, sizeof(struct epoll_event)); - sigprocmask(SIG_SETMASK, &my_sigblock, &origsig); +#if defined(WITH_WEBSOCKETS) event_count = epoll_wait(db.epollfd, ep_events, MAX_EVENTS, 100); - sigprocmask(SIG_SETMASK, &origsig, NULL); +#else + event_count = epoll_wait(db.epollfd, ep_events, MAX_EVENTS, db.next_event_ms); +#endif db.now_s = mosquitto_time(); db.now_real_s = time(NULL); diff --git a/src/mux_kqueue.c b/src/mux_kqueue.c index 46e3ea14..410aa17e 100644 --- a/src/mux_kqueue.c +++ b/src/mux_kqueue.c @@ -32,8 +32,6 @@ Contributors: static void loop_handle_reads_writes(struct mosquitto *context, short events); -static sigset_t my_sigblock; - static struct kevent event_list[MAX_EVENTS]; int mux_kqueue__init(struct mosquitto__listener_sock *listensock, int listensock_count) @@ -41,13 +39,6 @@ int mux_kqueue__init(struct mosquitto__listener_sock *listensock, int listensock struct kevent ev; int i; - sigemptyset(&my_sigblock); - sigaddset(&my_sigblock, SIGINT); - sigaddset(&my_sigblock, SIGTERM); - sigaddset(&my_sigblock, SIGUSR1); - sigaddset(&my_sigblock, SIGUSR2); - sigaddset(&my_sigblock, SIGHUP); - memset(&event_list, 0, sizeof(struct kevent)*MAX_EVENTS); db.kqueuefd = 0; @@ -138,19 +129,24 @@ int mux_kqueue__delete(struct mosquitto *context) int mux_kqueue__handle(void) { int i; - sigset_t origsig; struct mosquitto *context; struct mosquitto__listener_sock *listensock; int event_count; - struct timespec timeout = {0, 100000000}; /* 100 ms */ + struct timespec timeout; + +#ifdef WITH_WEBSOCKETS + timeout.tv_sec = 0; + timeout.tv_nsec = 100000000; /* 100 ms */ +#else + timeout.tv_sec = db.next_event_ms/1000; + timeout.tv_nsec = (db.next_event_ms - timeout.tv_sec*100) * 1000000; +#endif memset(&event_list, 0, sizeof(event_list)); - sigprocmask(SIG_SETMASK, &my_sigblock, &origsig); event_count = kevent(db.kqueuefd, NULL, 0, event_list, MAX_EVENTS, &timeout); - sigprocmask(SIG_SETMASK, &origsig, NULL); db.now_s = mosquitto_time(); db.now_real_s = time(NULL); diff --git a/src/mux_poll.c b/src/mux_poll.c index c9c9c827..9dae7b67 100644 --- a/src/mux_poll.c +++ b/src/mux_poll.c @@ -60,24 +60,12 @@ static void loop_handle_reads_writes(void); static struct pollfd *pollfds = NULL; static size_t pollfd_max, pollfd_current_max; -#ifndef WIN32 -static sigset_t my_sigblock; -#endif int mux_poll__init(struct mosquitto__listener_sock *listensock, int listensock_count) { size_t i; size_t pollfd_index = 0; -#ifndef WIN32 - sigemptyset(&my_sigblock); - sigaddset(&my_sigblock, SIGINT); - sigaddset(&my_sigblock, SIGTERM); - sigaddset(&my_sigblock, SIGUSR1); - sigaddset(&my_sigblock, SIGUSR2); - sigaddset(&my_sigblock, SIGHUP); -#endif - #ifdef WIN32 pollfd_max = (size_t)_getmaxstdio(); #else @@ -195,13 +183,18 @@ int mux_poll__handle(struct mosquitto__listener_sock *listensock, int listensock #ifndef WIN32 sigset_t origsig; #endif + int timeout; + +#ifdef WITH_WEBSOCKETS + timeout = 100; +#else + timeout = db.next_event_ms; +#endif #ifndef WIN32 - sigprocmask(SIG_SETMASK, &my_sigblock, &origsig); - fdcount = poll(pollfds, pollfd_current_max+1, 100); - sigprocmask(SIG_SETMASK, &origsig, NULL); + fdcount = poll(pollfds, pollfd_current_max+1, timeout); #else - fdcount = WSAPoll(pollfds, pollfd_current_max+1, 100); + fdcount = WSAPoll(pollfds, pollfd_current_max+1, timeout); #endif db.now_s = mosquitto_time(); diff --git a/src/plugin.c b/src/plugin.c index d15964d6..19011810 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -219,11 +219,14 @@ static void plugin__handle_tick_single(struct mosquitto__security_options *opts) struct mosquitto_evt_tick event_data; struct mosquitto__callback *cb_base; - /* FIXME - set now_s and now_ns to avoid need for multiple time lookups */ memset(&event_data, 0, sizeof(event_data)); DL_FOREACH(opts->plugin_callbacks.tick, cb_base){ + mosquitto_time_ns(&event_data.now_s, &event_data.now_ns); + event_data.next_s = 0; + event_data.next_ms = 0; cb_base->cb(MOSQ_EVT_TICK, &event_data, cb_base->userdata); + loop__update_next_event(event_data.next_s * 1000 + event_data.next_ms); } } diff --git a/src/session_expiry.c b/src/session_expiry.c index 1d2beb57..43d20b60 100644 --- a/src/session_expiry.c +++ b/src/session_expiry.c @@ -113,8 +113,20 @@ void session_expiry__check(void) { struct session_expiry_list *item, *tmp; struct mosquitto *context; - - if(db.now_real_s <= last_check) return; + time_t timeout; + + if(db.now_real_s <= last_check){ + if(expiry_list){ + /* Next event is the first item of the list, we must set the timeout even if we aren't + * checking the full list */ + timeout = (expiry_list->context->session_expiry_time - db.now_real_s) * 1000; + if(timeout <= 0){ + timeout = 100; + } + loop__update_next_event(timeout); + } + return; + } last_check = db.now_real_s; @@ -137,6 +149,11 @@ void session_expiry__check(void) context__send_will(context); context__add_to_disused(context); }else{ + timeout = (item->context->session_expiry_time - db.now_real_s + 1) * 1000; + if(timeout <= 0){ + timeout = 100; + } + loop__update_next_event(timeout); return; } } diff --git a/src/sys_tree.c b/src/sys_tree.c index 9a5d8f4e..a946461d 100644 --- a/src/sys_tree.c +++ b/src/sys_tree.c @@ -46,6 +46,7 @@ unsigned int g_socket_connections = 0; unsigned int g_connection_count = 0; static time_t start_time = 0; +static time_t last_update = 0; void sys_tree__init(void) @@ -62,6 +63,7 @@ void sys_tree__init(void) db__messages_easy_queue(NULL, "$SYS/broker/version", SYS_TREE_QOS, len, buf, 1, 0, NULL); start_time = mosquitto_time(); + last_update = start_time; } static void sys_tree__update_clients(char *buf) @@ -159,7 +161,6 @@ static void calc_load(char *buf, const char *topic, bool initial, double exponen */ void sys_tree__update(void) { - static time_t last_update = 0; time_t uptime; char buf[BUFLEN]; @@ -219,9 +220,18 @@ void sys_tree__update(void) double i_mult; uint32_t len; bool initial_publish; + time_t next_event; + + if(db.config->sys_interval){ + next_event = last_update - db.now_s + db.config->sys_interval; + if(next_event <= 0){ + next_event = db.config->sys_interval; + } + loop__update_next_event(next_event*1000); + } if(db.config->sys_interval - && db.now_s - db.config->sys_interval > last_update){ + && db.now_s - db.config->sys_interval >= last_update){ uptime = db.now_s - start_time; len = (uint32_t)snprintf(buf, BUFLEN, "%d seconds", (int)uptime); diff --git a/test/broker/c/auth_plugin_delayed.c b/test/broker/c/auth_plugin_delayed.c index 0a81dfea..05467a7d 100644 --- a/test/broker/c/auth_plugin_delayed.c +++ b/test/broker/c/auth_plugin_delayed.c @@ -51,6 +51,8 @@ int mosquitto_plugin_cleanup(void *user_data, struct mosquitto_opt *auth_opts, i static int tick_callback(int event, void *event_data, void *user_data) { + struct mosquitto_evt_tick *ed = event_data; + if(auth_delay == 0){ if(client_id && username && password && !strcmp(username, "delayed-username") && !strcmp(password, "good")){ @@ -69,6 +71,9 @@ static int tick_callback(int event, void *event_data, void *user_data) auth_delay--; } + /* fast turn around for quick testing */ + ed->next_ms = 100; + return MOSQ_ERR_SUCCESS; }