Improve idle / mux timeout performance.

pull/2386/head
Roger A. Light 4 years ago
parent 88dfac8e88
commit 0f2f77c945

@ -64,6 +64,9 @@ Broker:
versions are allowed for a particular listener. versions are allowed for a particular listener.
- Add `disable_client_cert_date_checks` option to allow expired client - Add `disable_client_cert_date_checks` option to allow expired client
certificate to be considered valid. certificate to be considered valid.
- Improve idle performance. The broker now calculates when the next event of
interest is, and uses that as the timeout for e.g. epoll_wait(). This can
reduce the number of process wakeups by 100x on an idle broker.
Client library: Client library:
- Add MOSQ_OPT_DISABLE_SOCKETPAIR to allow the disabling of the socketpair - Add MOSQ_OPT_DISABLE_SOCKETPAIR to allow the disabling of the socketpair

@ -158,7 +158,7 @@ struct mosquitto_evt_message {
struct mosquitto_evt_tick { struct mosquitto_evt_tick {
void *future; void *future;
long now_ns; long now_ns;
long next_ns; long next_ms;
time_t now_s; time_t now_s;
time_t next_s; time_t next_s;
void *future2[4]; void *future2[4];

@ -63,3 +63,26 @@ time_t mosquitto_time(void)
#endif #endif
} }
void mosquitto_time_ns(time_t *s, long *ns)
{
#ifdef WIN32
SYSTEMTIME st;
GetLocalTime(&st);
*s = st.wSecond
*ns = st.wMilliseconds*1000000L;
gettimeofday(&tv, NULL);
srand((unsigned int)(tv.tv_sec + tv.tv_usec));
#elif _POSIX_TIMERS>0 && defined(_POSIX_MONOTONIC_CLOCK)
struct timespec tp;
clock_gettime(CLOCK_REALTIME, &tp);
*s = tp.tv_sec;
*ns = tp.tv_nsec;
#else
struct timeval tv;
gettimeofday(&tv, NULL);
*s = tv.tv_sec;
*ns = tv.tv_usec * 1000;
#endif
}

@ -20,5 +20,6 @@ Contributors:
#define TIME_MOSQ_H #define TIME_MOSQ_H
time_t mosquitto_time(void); time_t mosquitto_time(void);
void mosquitto_time_ns(time_t *s, long *ns);
#endif #endif

@ -107,16 +107,16 @@ static int basic_auth_callback(int event, void *event_data, void *userdata)
static int tick_callback(int event, void *event_data, void *userdata) static int tick_callback(int event, void *event_data, void *userdata)
{ {
struct mosquitto_evt_tick *ed = event_data;
struct client_list *client, *client_tmp; struct client_list *client, *client_tmp;
time_t now; time_t now;
long r; long r;
UNUSED(event); UNUSED(event);
UNUSED(event_data);
UNUSED(userdata); UNUSED(userdata);
now = time(NULL); now = time(NULL);
if(now > last_check){ if(now >= last_check){
HASH_ITER(hh, clients, client, client_tmp){ HASH_ITER(hh, clients, client, client_tmp){
if(authentication_check(client, now)){ if(authentication_check(client, now)){
/* Deny access 1/4 of the time, yes it's biased number generation. */ /* Deny access 1/4 of the time, yes it's biased number generation. */
@ -134,6 +134,9 @@ static int tick_callback(int event, void *event_data, void *userdata)
} }
last_check = now; last_check = now;
} }
/* Declare that we want another call in at most 1 second */
ed->next_s = 1;
return MOSQ_ERR_SUCCESS; return MOSQ_ERR_SUCCESS;
} }

@ -383,6 +383,7 @@ int bridge__connect_step3(struct mosquitto *context)
if(context->bridge->round_robin == false && context->bridge->cur_address != 0){ if(context->bridge->round_robin == false && context->bridge->cur_address != 0){
context->bridge->primary_retry = db.now_s + 5; context->bridge->primary_retry = db.now_s + 5;
loop__update_next_event(5000);
} }
if (bridge__set_tcp_keepalive(context) != MOSQ_ERR_SUCCESS) return MOSQ_ERR_UNKNOWN; if (bridge__set_tcp_keepalive(context) != MOSQ_ERR_SUCCESS) return MOSQ_ERR_UNKNOWN;
@ -928,7 +929,7 @@ void bridge_check(void)
if(context->bridge->round_robin == false if(context->bridge->round_robin == false
&& context->bridge->cur_address != 0 && context->bridge->cur_address != 0
&& context->bridge->primary_retry && context->bridge->primary_retry
&& db.now_s > context->bridge->primary_retry){ && db.now_s >= context->bridge->primary_retry){
if(context->bridge->primary_retry_sock == INVALID_SOCKET){ if(context->bridge->primary_retry_sock == INVALID_SOCKET){
rc = net__try_connect(context->bridge->addresses[0].address, rc = net__try_connect(context->bridge->addresses[0].address,
@ -958,11 +959,13 @@ void bridge_check(void)
COMPAT_CLOSE(context->bridge->primary_retry_sock); COMPAT_CLOSE(context->bridge->primary_retry_sock);
context->bridge->primary_retry_sock = INVALID_SOCKET; context->bridge->primary_retry_sock = INVALID_SOCKET;
context->bridge->primary_retry = db.now_s+5; context->bridge->primary_retry = db.now_s+5;
loop__update_next_event(5000);
} }
}else{ }else{
COMPAT_CLOSE(context->bridge->primary_retry_sock); COMPAT_CLOSE(context->bridge->primary_retry_sock);
context->bridge->primary_retry_sock = INVALID_SOCKET; context->bridge->primary_retry_sock = INVALID_SOCKET;
context->bridge->primary_retry = db.now_s+5; context->bridge->primary_retry = db.now_s+5;
loop__update_next_event(5000);
} }
} }
} }
@ -978,9 +981,10 @@ void bridge_check(void)
if(context->bridge->cur_address == context->bridge->address_count){ if(context->bridge->cur_address == context->bridge->address_count){
context->bridge->cur_address = 0; context->bridge->cur_address = 0;
} }
loop__update_next_event(context->bridge->restart_timeout*1000);
}else{ }else{
if((context->bridge->start_type == bst_lazy && context->bridge->lazy_reconnect) if((context->bridge->start_type == bst_lazy && context->bridge->lazy_reconnect)
|| (context->bridge->start_type == bst_automatic && db.now_s > context->bridge->restart_t)){ || (context->bridge->start_type == bst_automatic && db.now_s >= context->bridge->restart_t)){
#if defined(__GLIBC__) && defined(WITH_ADNS) #if defined(__GLIBC__) && defined(WITH_ADNS)
if(context->adns){ if(context->adns){
@ -1034,6 +1038,7 @@ void bridge_check(void)
if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_CONN_PENDING){ if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_CONN_PENDING){
if(context->bridge->round_robin == false && context->bridge->cur_address != 0){ if(context->bridge->round_robin == false && context->bridge->cur_address != 0){
context->bridge->primary_retry = db.now_s + 5; context->bridge->primary_retry = db.now_s + 5;
loop__update_next_event(5000);
} }
mux__new(context); mux__new(context);
if(context->out_packet){ if(context->out_packet){

@ -36,8 +36,16 @@ int keepalive__add(struct mosquitto *context)
void keepalive__check(void) void keepalive__check(void)
{ {
struct mosquitto *context, *ctxt_tmp; struct mosquitto *context, *ctxt_tmp;
time_t timeout;
if(last_keepalive_check + 5 < db.now_s){ if(db.contexts_by_sock){
timeout = (last_keepalive_check + 5 - db.now_s);
if(timeout <= 0){
timeout = 5;
}
loop__update_next_event(timeout*1000);
}
if(last_keepalive_check + 5 <= db.now_s){
last_keepalive_check = db.now_s; last_keepalive_check = db.now_s;
/* FIXME - this needs replacing with something more efficient */ /* FIXME - this needs replacing with something more efficient */

@ -161,6 +161,14 @@ static void queue_plugin_msgs(void)
} }
void loop__update_next_event(time_t new_ms)
{
if(new_ms > 0 && new_ms < db.next_event_ms){
db.next_event_ms = (int)new_ms;
}
}
int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listensock_count) int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listensock_count)
{ {
#ifdef WITH_PERSISTENCE #ifdef WITH_PERSISTENCE
@ -187,6 +195,8 @@ int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listens
while(g_run){ while(g_run){
queue_plugin_msgs(); queue_plugin_msgs();
context__free_disused(); context__free_disused();
db.next_event_ms = 86400000;
#ifdef WITH_SYS_TREE #ifdef WITH_SYS_TREE
if(db.config->sys_interval > 0){ if(db.config->sys_interval > 0){
sys_tree__update(); sys_tree__update();
@ -198,6 +208,7 @@ int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listens
#ifdef WITH_BRIDGE #ifdef WITH_BRIDGE
bridge_check(); bridge_check();
#endif #endif
plugin__handle_tick();
rc = mux__handle(listensock, listensock_count); rc = mux__handle(listensock, listensock_count);
if(rc) return rc; if(rc) return rc;
@ -269,7 +280,6 @@ int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listens
} }
} }
#endif #endif
plugin__handle_tick();
} }
mux__cleanup(); mux__cleanup();

@ -471,6 +471,7 @@ struct mosquitto_db{
struct mosquitto_msg_store_load *msg_store_load; struct mosquitto_msg_store_load *msg_store_load;
time_t now_s; /* Monotonic clock, where possible */ time_t now_s; /* Monotonic clock, where possible */
time_t now_real_s; /* Read clock, for measuring session/message expiry */ time_t now_real_s; /* Read clock, for measuring session/message expiry */
int next_event_ms; /* for mux timeout */
int msg_store_count; int msg_store_count;
unsigned long msg_store_bytes; unsigned long msg_store_bytes;
char *config_file; char *config_file;
@ -622,6 +623,7 @@ extern struct mosquitto_db db;
* Main functions * Main functions
* ============================================================ */ * ============================================================ */
int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listensock_count); int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listensock_count);
void loop__update_next_event(time_t new_ms);
/* ============================================================ /* ============================================================
* Config functions * Config functions

@ -35,7 +35,6 @@ Contributors:
static void loop_handle_reads_writes(struct mosquitto *context, uint32_t events); static void loop_handle_reads_writes(struct mosquitto *context, uint32_t events);
static sigset_t my_sigblock;
static struct epoll_event ep_events[MAX_EVENTS]; static struct epoll_event ep_events[MAX_EVENTS];
int mux_epoll__init(struct mosquitto__listener_sock *listensock, int listensock_count) int mux_epoll__init(struct mosquitto__listener_sock *listensock, int listensock_count)
@ -43,13 +42,6 @@ int mux_epoll__init(struct mosquitto__listener_sock *listensock, int listensock_
struct epoll_event ev; struct epoll_event ev;
int i; int i;
sigemptyset(&my_sigblock);
sigaddset(&my_sigblock, SIGINT);
sigaddset(&my_sigblock, SIGTERM);
sigaddset(&my_sigblock, SIGUSR1);
sigaddset(&my_sigblock, SIGUSR2);
sigaddset(&my_sigblock, SIGHUP);
memset(&ep_events, 0, sizeof(struct epoll_event)*MAX_EVENTS); memset(&ep_events, 0, sizeof(struct epoll_event)*MAX_EVENTS);
db.epollfd = 0; db.epollfd = 0;
@ -145,15 +137,16 @@ int mux_epoll__handle(void)
{ {
int i; int i;
struct epoll_event ev; struct epoll_event ev;
sigset_t origsig;
struct mosquitto *context; struct mosquitto *context;
struct mosquitto__listener_sock *listensock; struct mosquitto__listener_sock *listensock;
int event_count; int event_count;
memset(&ev, 0, sizeof(struct epoll_event)); memset(&ev, 0, sizeof(struct epoll_event));
sigprocmask(SIG_SETMASK, &my_sigblock, &origsig); #if defined(WITH_WEBSOCKETS)
event_count = epoll_wait(db.epollfd, ep_events, MAX_EVENTS, 100); event_count = epoll_wait(db.epollfd, ep_events, MAX_EVENTS, 100);
sigprocmask(SIG_SETMASK, &origsig, NULL); #else
event_count = epoll_wait(db.epollfd, ep_events, MAX_EVENTS, db.next_event_ms);
#endif
db.now_s = mosquitto_time(); db.now_s = mosquitto_time();
db.now_real_s = time(NULL); db.now_real_s = time(NULL);

@ -32,8 +32,6 @@ Contributors:
static void loop_handle_reads_writes(struct mosquitto *context, short events); static void loop_handle_reads_writes(struct mosquitto *context, short events);
static sigset_t my_sigblock;
static struct kevent event_list[MAX_EVENTS]; static struct kevent event_list[MAX_EVENTS];
int mux_kqueue__init(struct mosquitto__listener_sock *listensock, int listensock_count) int mux_kqueue__init(struct mosquitto__listener_sock *listensock, int listensock_count)
@ -41,13 +39,6 @@ int mux_kqueue__init(struct mosquitto__listener_sock *listensock, int listensock
struct kevent ev; struct kevent ev;
int i; int i;
sigemptyset(&my_sigblock);
sigaddset(&my_sigblock, SIGINT);
sigaddset(&my_sigblock, SIGTERM);
sigaddset(&my_sigblock, SIGUSR1);
sigaddset(&my_sigblock, SIGUSR2);
sigaddset(&my_sigblock, SIGHUP);
memset(&event_list, 0, sizeof(struct kevent)*MAX_EVENTS); memset(&event_list, 0, sizeof(struct kevent)*MAX_EVENTS);
db.kqueuefd = 0; db.kqueuefd = 0;
@ -138,19 +129,24 @@ int mux_kqueue__delete(struct mosquitto *context)
int mux_kqueue__handle(void) int mux_kqueue__handle(void)
{ {
int i; int i;
sigset_t origsig;
struct mosquitto *context; struct mosquitto *context;
struct mosquitto__listener_sock *listensock; struct mosquitto__listener_sock *listensock;
int event_count; int event_count;
struct timespec timeout = {0, 100000000}; /* 100 ms */ struct timespec timeout;
#ifdef WITH_WEBSOCKETS
timeout.tv_sec = 0;
timeout.tv_nsec = 100000000; /* 100 ms */
#else
timeout.tv_sec = db.next_event_ms/1000;
timeout.tv_nsec = (db.next_event_ms - timeout.tv_sec*100) * 1000000;
#endif
memset(&event_list, 0, sizeof(event_list)); memset(&event_list, 0, sizeof(event_list));
sigprocmask(SIG_SETMASK, &my_sigblock, &origsig);
event_count = kevent(db.kqueuefd, event_count = kevent(db.kqueuefd,
NULL, 0, NULL, 0,
event_list, MAX_EVENTS, event_list, MAX_EVENTS,
&timeout); &timeout);
sigprocmask(SIG_SETMASK, &origsig, NULL);
db.now_s = mosquitto_time(); db.now_s = mosquitto_time();
db.now_real_s = time(NULL); db.now_real_s = time(NULL);

@ -60,24 +60,12 @@ static void loop_handle_reads_writes(void);
static struct pollfd *pollfds = NULL; static struct pollfd *pollfds = NULL;
static size_t pollfd_max, pollfd_current_max; static size_t pollfd_max, pollfd_current_max;
#ifndef WIN32
static sigset_t my_sigblock;
#endif
int mux_poll__init(struct mosquitto__listener_sock *listensock, int listensock_count) int mux_poll__init(struct mosquitto__listener_sock *listensock, int listensock_count)
{ {
size_t i; size_t i;
size_t pollfd_index = 0; size_t pollfd_index = 0;
#ifndef WIN32
sigemptyset(&my_sigblock);
sigaddset(&my_sigblock, SIGINT);
sigaddset(&my_sigblock, SIGTERM);
sigaddset(&my_sigblock, SIGUSR1);
sigaddset(&my_sigblock, SIGUSR2);
sigaddset(&my_sigblock, SIGHUP);
#endif
#ifdef WIN32 #ifdef WIN32
pollfd_max = (size_t)_getmaxstdio(); pollfd_max = (size_t)_getmaxstdio();
#else #else
@ -195,13 +183,18 @@ int mux_poll__handle(struct mosquitto__listener_sock *listensock, int listensock
#ifndef WIN32 #ifndef WIN32
sigset_t origsig; sigset_t origsig;
#endif #endif
int timeout;
#ifdef WITH_WEBSOCKETS
timeout = 100;
#else
timeout = db.next_event_ms;
#endif
#ifndef WIN32 #ifndef WIN32
sigprocmask(SIG_SETMASK, &my_sigblock, &origsig); fdcount = poll(pollfds, pollfd_current_max+1, timeout);
fdcount = poll(pollfds, pollfd_current_max+1, 100);
sigprocmask(SIG_SETMASK, &origsig, NULL);
#else #else
fdcount = WSAPoll(pollfds, pollfd_current_max+1, 100); fdcount = WSAPoll(pollfds, pollfd_current_max+1, timeout);
#endif #endif
db.now_s = mosquitto_time(); db.now_s = mosquitto_time();

@ -219,11 +219,14 @@ static void plugin__handle_tick_single(struct mosquitto__security_options *opts)
struct mosquitto_evt_tick event_data; struct mosquitto_evt_tick event_data;
struct mosquitto__callback *cb_base; struct mosquitto__callback *cb_base;
/* FIXME - set now_s and now_ns to avoid need for multiple time lookups */
memset(&event_data, 0, sizeof(event_data)); memset(&event_data, 0, sizeof(event_data));
DL_FOREACH(opts->plugin_callbacks.tick, cb_base){ DL_FOREACH(opts->plugin_callbacks.tick, cb_base){
mosquitto_time_ns(&event_data.now_s, &event_data.now_ns);
event_data.next_s = 0;
event_data.next_ms = 0;
cb_base->cb(MOSQ_EVT_TICK, &event_data, cb_base->userdata); cb_base->cb(MOSQ_EVT_TICK, &event_data, cb_base->userdata);
loop__update_next_event(event_data.next_s * 1000 + event_data.next_ms);
} }
} }

@ -113,8 +113,20 @@ void session_expiry__check(void)
{ {
struct session_expiry_list *item, *tmp; struct session_expiry_list *item, *tmp;
struct mosquitto *context; struct mosquitto *context;
time_t timeout;
if(db.now_real_s <= last_check) return;
if(db.now_real_s <= last_check){
if(expiry_list){
/* Next event is the first item of the list, we must set the timeout even if we aren't
* checking the full list */
timeout = (expiry_list->context->session_expiry_time - db.now_real_s) * 1000;
if(timeout <= 0){
timeout = 100;
}
loop__update_next_event(timeout);
}
return;
}
last_check = db.now_real_s; last_check = db.now_real_s;
@ -137,6 +149,11 @@ void session_expiry__check(void)
context__send_will(context); context__send_will(context);
context__add_to_disused(context); context__add_to_disused(context);
}else{ }else{
timeout = (item->context->session_expiry_time - db.now_real_s + 1) * 1000;
if(timeout <= 0){
timeout = 100;
}
loop__update_next_event(timeout);
return; return;
} }
} }

@ -46,6 +46,7 @@ unsigned int g_socket_connections = 0;
unsigned int g_connection_count = 0; unsigned int g_connection_count = 0;
static time_t start_time = 0; static time_t start_time = 0;
static time_t last_update = 0;
void sys_tree__init(void) void sys_tree__init(void)
@ -62,6 +63,7 @@ void sys_tree__init(void)
db__messages_easy_queue(NULL, "$SYS/broker/version", SYS_TREE_QOS, len, buf, 1, 0, NULL); db__messages_easy_queue(NULL, "$SYS/broker/version", SYS_TREE_QOS, len, buf, 1, 0, NULL);
start_time = mosquitto_time(); start_time = mosquitto_time();
last_update = start_time;
} }
static void sys_tree__update_clients(char *buf) static void sys_tree__update_clients(char *buf)
@ -159,7 +161,6 @@ static void calc_load(char *buf, const char *topic, bool initial, double exponen
*/ */
void sys_tree__update(void) void sys_tree__update(void)
{ {
static time_t last_update = 0;
time_t uptime; time_t uptime;
char buf[BUFLEN]; char buf[BUFLEN];
@ -219,9 +220,18 @@ void sys_tree__update(void)
double i_mult; double i_mult;
uint32_t len; uint32_t len;
bool initial_publish; bool initial_publish;
time_t next_event;
if(db.config->sys_interval){
next_event = last_update - db.now_s + db.config->sys_interval;
if(next_event <= 0){
next_event = db.config->sys_interval;
}
loop__update_next_event(next_event*1000);
}
if(db.config->sys_interval if(db.config->sys_interval
&& db.now_s - db.config->sys_interval > last_update){ && db.now_s - db.config->sys_interval >= last_update){
uptime = db.now_s - start_time; uptime = db.now_s - start_time;
len = (uint32_t)snprintf(buf, BUFLEN, "%d seconds", (int)uptime); len = (uint32_t)snprintf(buf, BUFLEN, "%d seconds", (int)uptime);

@ -51,6 +51,8 @@ int mosquitto_plugin_cleanup(void *user_data, struct mosquitto_opt *auth_opts, i
static int tick_callback(int event, void *event_data, void *user_data) static int tick_callback(int event, void *event_data, void *user_data)
{ {
struct mosquitto_evt_tick *ed = event_data;
if(auth_delay == 0){ if(auth_delay == 0){
if(client_id && username && password if(client_id && username && password
&& !strcmp(username, "delayed-username") && !strcmp(password, "good")){ && !strcmp(username, "delayed-username") && !strcmp(password, "good")){
@ -69,6 +71,9 @@ static int tick_callback(int event, void *event_data, void *user_data)
auth_delay--; auth_delay--;
} }
/* fast turn around for quick testing */
ed->next_ms = 100;
return MOSQ_ERR_SUCCESS; return MOSQ_ERR_SUCCESS;
} }

Loading…
Cancel
Save