diff --git a/lib/mosquitto_internal.h b/lib/mosquitto_internal.h index f1a5bb1f..be0fda70 100644 --- a/lib/mosquitto_internal.h +++ b/lib/mosquitto_internal.h @@ -205,6 +205,10 @@ struct mosquitto_msg_data{ struct mosquitto { +#if defined(WITH_BROKER) && defined(WITH_EPOLL) + /* This *must* be the first element in the struct. */ + int ident; +#endif mosq_sock_t sock; #ifndef WITH_BROKER mosq_sock_t sockpairR, sockpairW; diff --git a/src/context.c b/src/context.c index 4159b531..e7d53ef1 100644 --- a/src/context.c +++ b/src/context.c @@ -38,6 +38,9 @@ struct mosquitto *context__init(struct mosquitto_db *db, mosq_sock_t sock) context = mosquitto__calloc(1, sizeof(struct mosquitto)); if(!context) return NULL; +#ifdef WITH_EPOLL + context->ident = id_client; +#endif context->pollfd_index = -1; mosquitto__set_state(context, mosq_cs_new); context->sock = sock; diff --git a/src/loop.c b/src/loop.c index 23be05b6..a975f4e6 100644 --- a/src/loop.c +++ b/src/loop.c @@ -125,7 +125,7 @@ void queue_plugin_msgs(struct mosquitto_db *db) } -int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count) +int mosquitto_main_loop(struct mosquitto_db *db, struct mosquitto__listener_sock *listensock, int listensock_count) { #ifdef WITH_SYS_TREE time_t start_time = mosquitto_time(); diff --git a/src/mosquitto.c b/src/mosquitto.c index 70a3fe3a..582b6a98 100644 --- a/src/mosquitto.c +++ b/src/mosquitto.c @@ -205,16 +205,16 @@ void listeners__reload_all_certificates(struct mosquitto_db *db) } -int listeners__start_single_mqtt(struct mosquitto_db *db, mosq_sock_t **listensock, int *listensock_count, int *listensock_index, struct mosquitto__listener *listener) +int listeners__start_single_mqtt(struct mosquitto_db *db, struct mosquitto__listener_sock **listensock, int *listensock_count, int *listensock_index, struct mosquitto__listener *listener) { int i; - mosq_sock_t *listensock_new; + struct mosquitto__listener_sock *listensock_new; if(net__socket_listen(listener)){ return 1; } (*listensock_count) += listener->sock_count; - listensock_new = mosquitto__realloc(*listensock, sizeof(mosq_sock_t)*(size_t)(*listensock_count)); + listensock_new = mosquitto__realloc(*listensock, sizeof(struct mosquitto__listener_sock)*(size_t)(*listensock_count)); if(!listensock_new){ return 1; } @@ -224,47 +224,53 @@ int listeners__start_single_mqtt(struct mosquitto_db *db, mosq_sock_t **listenso if(listener->socks[i] == INVALID_SOCKET){ return 1; } - (*listensock)[*listensock_index] = listener->socks[i]; + (*listensock)[*listensock_index].sock = listener->socks[i]; + (*listensock)[*listensock_index].listener = listener; +#ifdef WITH_EPOLL + (*listensock)[*listensock_index].ident = id_listener; +#endif (*listensock_index)++; } return MOSQ_ERR_SUCCESS; } -int listeners__add_local(struct mosquitto_db *db, mosq_sock_t **listensock, int *listensock_count, int *listensock_index, const char *host, uint16_t port) +int listeners__add_local(struct mosquitto_db *db, struct mosquitto__listener_sock **listensock, int *listensock_count, int *listensock_index, const char *host, uint16_t port) { struct mosquitto__listener *listeners; + listeners = db->config->listeners; - listeners = mosquitto__realloc(db->config->listeners, (size_t)(db->config->listener_count+1)*sizeof(struct mosquitto__listener)); - if(listeners == NULL){ - return MOSQ_ERR_NOMEM; - } - db->config->listener_count++; - db->config->listeners = listeners; - memset(&listeners[db->config->listener_count-1], 0, sizeof(struct mosquitto__listener)); - - listener__set_defaults(&listeners[db->config->listener_count-1]); - listeners[db->config->listener_count-1].security_options.allow_anonymous = true; - listeners[db->config->listener_count-1].port = port; - listeners[db->config->listener_count-1].host = mosquitto__strdup(host); - if(listeners[db->config->listener_count-1].host == NULL){ + listener__set_defaults(&listeners[db->config->listener_count]); + listeners[db->config->listener_count].security_options.allow_anonymous = true; + listeners[db->config->listener_count].port = port; + listeners[db->config->listener_count].host = mosquitto__strdup(host); + if(listeners[db->config->listener_count].host == NULL){ return MOSQ_ERR_NOMEM; } - if(listeners__start_single_mqtt(db, listensock, listensock_count, listensock_index, &listeners[db->config->listener_count-1])){ + if(listeners__start_single_mqtt(db, listensock, listensock_count, listensock_index, &listeners[db->config->listener_count])){ mosquitto__free(listeners[db->config->listener_count-1].host); - listeners[db->config->listener_count-1].host = NULL; - db->config->listener_count--; + listeners[db->config->listener_count].host = NULL; return MOSQ_ERR_UNKNOWN; } + db->config->listener_count++; return MOSQ_ERR_SUCCESS; } -int listeners__start_local_only(struct mosquitto_db *db, mosq_sock_t **listensock, int *listensock_count) +int listeners__start_local_only(struct mosquitto_db *db, struct mosquitto__listener_sock **listensock, int *listensock_count) { /* Attempt to open listeners bound to 127.0.0.1 and ::1 only */ int i; int listensock_index = 0; int rc; + struct mosquitto__listener *listeners; + + listeners = mosquitto__realloc(db->config->listeners, 2*sizeof(struct mosquitto__listener)); + if(listeners == NULL){ + return MOSQ_ERR_NOMEM; + } + memset(listeners, 0, 2*sizeof(struct mosquitto__listener)); + db->config->listener_count = 0; + db->config->listeners = listeners; log__printf(NULL, MOSQ_LOG_WARNING, "Starting in local only mode. Connections will only be possible from clients running on this machine."); log__printf(NULL, MOSQ_LOG_WARNING, "Create a configuration file which defines a listener to allow remote access."); @@ -290,7 +296,7 @@ int listeners__start_local_only(struct mosquitto_db *db, mosq_sock_t **listensoc } -int listeners__start(struct mosquitto_db *db, mosq_sock_t **listensock, int *listensock_count) +int listeners__start(struct mosquitto_db *db, struct mosquitto__listener_sock **listensock, int *listensock_count) { int i; int listensock_index = 0; @@ -336,7 +342,7 @@ int listeners__start(struct mosquitto_db *db, mosq_sock_t **listensock, int *lis } -void listeners__stop(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count) +void listeners__stop(struct mosquitto_db *db, struct mosquitto__listener_sock *listensock, int listensock_count) { int i; @@ -355,8 +361,8 @@ void listeners__stop(struct mosquitto_db *db, mosq_sock_t *listensock, int liste } for(i=0; iepollfd, EPOLL_CTL_ADD, listensock[i], &ev) == -1) { + if (epoll_ctl(db->epollfd, EPOLL_CTL_ADD, listensock[i].sock, &ev) == -1) { log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll initial registering: %s", strerror(errno)); (void)close(db->epollfd); db->epollfd = 0; @@ -115,7 +115,7 @@ int mux_epoll__add_out(struct mosquitto_db *db, struct mosquitto *context) memset(&ev, 0, sizeof(struct epoll_event)); if(!(context->events & EPOLLOUT)) { - ev.data.fd = context->sock; + ev.data.ptr = context; ev.events = EPOLLIN | EPOLLOUT; if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) { if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) { @@ -134,7 +134,7 @@ int mux_epoll__remove_out(struct mosquitto_db *db, struct mosquitto *context) memset(&ev, 0, sizeof(struct epoll_event)); if(context->events & EPOLLOUT) { - ev.data.fd = context->sock; + ev.data.ptr = context; ev.events = EPOLLIN; if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) { if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) { @@ -153,7 +153,7 @@ int mux_epoll__add_in(struct mosquitto_db *db, struct mosquitto *context) memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; - ev.data.fd = context->sock; + ev.data.ptr = context; if (epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) { log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll accepting: %s", strerror(errno)); } @@ -176,21 +176,22 @@ int mux_epoll__delete(struct mosquitto_db *db, struct mosquitto *context) } -int mux_epoll__handle(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count) +int mux_epoll__handle(struct mosquitto_db *db) { int i; - int j; struct epoll_event ev; sigset_t origsig; struct mosquitto *context; - int fdcount; + struct mosquitto__listener_sock *listensock; + int event_count; + int sock; memset(&ev, 0, sizeof(struct epoll_event)); sigprocmask(SIG_SETMASK, &my_sigblock, &origsig); - fdcount = epoll_wait(db->epollfd, ep_events, MAX_EVENTS, 100); + event_count = epoll_wait(db->epollfd, ep_events, MAX_EVENTS, 100); sigprocmask(SIG_SETMASK, &origsig, NULL); - switch(fdcount){ + switch(event_count){ case -1: if(errno != EINTR){ log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll waiting: %s.", strerror(errno)); @@ -199,26 +200,25 @@ int mux_epoll__handle(struct mosquitto_db *db, mosq_sock_t *listensock, int list case 0: break; default: - for(i=0; icontexts_by_sock, &(ev.data.fd), sizeof(mosq_sock_t), context); - if(!context) { - log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll accepting: no context"); - } - context->events = EPOLLIN; - mux__add_in(db, context); + for(i=0; iident == id_client){ + loop_handle_reads_writes(db, context, ep_events[i].events); + }else if(context->ident == id_listener){ + listensock = ep_events[i].data.ptr; + + if (ep_events[i].events & (EPOLLIN | EPOLLPRI)){ + while((sock = net__socket_accept(db, listensock)) != -1){ + context = NULL; + HASH_FIND(hh_sock, db->contexts_by_sock, &sock, sizeof(mosq_sock_t), context); + if(!context) { + log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll accepting: no context"); } + context->events = EPOLLIN; + mux__add_in(db, context); } - break; } } - if (j == listensock_count) { - loop_handle_reads_writes(db, ep_events[i].data.fd, ep_events[i].events); - } } } return MOSQ_ERR_SUCCESS; @@ -233,99 +233,74 @@ int mux_epoll__cleanup(struct mosquitto_db *db) } -static void loop_handle_reads_writes(struct mosquitto_db *db, mosq_sock_t sock, uint32_t events) +static void loop_handle_reads_writes(struct mosquitto_db *db, struct mosquitto *context, uint32_t events) { - struct mosquitto *context; int err; socklen_t len; int rc; - int i; - - context = NULL; - HASH_FIND(hh_sock, db->contexts_by_sock, &sock, sizeof(mosq_sock_t), context); - if(!context) { - return; - } - for (i=0;i<1;i++) { #ifdef WITH_WEBSOCKETS - if(context->wsi){ - struct lws_pollfd wspoll; - wspoll.fd = context->sock; - wspoll.events = (int16_t)context->events; - wspoll.revents = (int16_t)events; + if(context->wsi){ + struct lws_pollfd wspoll; + wspoll.fd = context->sock; + wspoll.events = (int16_t)context->events; + wspoll.revents = (int16_t)events; #ifdef LWS_LIBRARY_VERSION_NUMBER - lws_service_fd(lws_get_context(context->wsi), &wspoll); + lws_service_fd(lws_get_context(context->wsi), &wspoll); #else - lws_service_fd(context->ws_context, &wspoll); + lws_service_fd(context->ws_context, &wspoll); #endif - continue; - } + return; + } #endif + if(events & EPOLLOUT #ifdef WITH_TLS - if(events & EPOLLOUT || - context->want_write || - (context->ssl && context->state == mosq_cs_new)){ -#else - if(events & EPOLLOUT){ + || context->want_write + || (context->ssl && context->state == mosq_cs_new) #endif - if(context->state == mosq_cs_connect_pending){ - len = sizeof(int); - if(!getsockopt(context->sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){ - if(err == 0){ - mosquitto__set_state(context, mosq_cs_new); + ){ + + if(context->state == mosq_cs_connect_pending){ + len = sizeof(int); + if(!getsockopt(context->sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){ + if(err == 0){ + mosquitto__set_state(context, mosq_cs_new); #if defined(WITH_ADNS) && defined(WITH_BRIDGE) - if(context->bridge){ - bridge__connect_step3(db, context); - continue; - } -#endif + if(context->bridge){ + bridge__connect_step3(db, context); } - }else{ - do_disconnect(db, context, MOSQ_ERR_CONN_LOST); - continue; +#endif } - } - rc = packet__write(context); - if(rc){ - do_disconnect(db, context, rc); - continue; + }else{ + do_disconnect(db, context, MOSQ_ERR_CONN_LOST); + return; } } - } - - context = NULL; - HASH_FIND(hh_sock, db->contexts_by_sock, &sock, sizeof(mosq_sock_t), context); - if(!context) { - return; - } - for (i=0;i<1;i++) { -#ifdef WITH_WEBSOCKETS - if(context->wsi){ - // Websocket are already handled above - continue; + rc = packet__write(context); + if(rc){ + do_disconnect(db, context, rc); + return; } -#endif + } + if(events & EPOLLIN #ifdef WITH_TLS - if(events & EPOLLIN || - (context->ssl && context->state == mosq_cs_new)){ -#else - if(events & EPOLLIN){ + || (context->ssl && context->state == mosq_cs_new) #endif - do{ - rc = packet__read(db, context); - if(rc){ - do_disconnect(db, context, rc); - continue; - } - }while(SSL_DATA_PENDING(context)); - }else{ - if(events & (EPOLLERR | EPOLLHUP)){ - do_disconnect(db, context, MOSQ_ERR_CONN_LOST); - continue; + ){ + + do{ + rc = packet__read(db, context); + if(rc){ + do_disconnect(db, context, rc); + return; } + }while(SSL_DATA_PENDING(context)); + }else{ + if(events & (EPOLLERR | EPOLLHUP)){ + do_disconnect(db, context, MOSQ_ERR_CONN_LOST); + return; } } } diff --git a/src/mux_poll.c b/src/mux_poll.c index cf0ff4f6..e2e66bdb 100644 --- a/src/mux_poll.c +++ b/src/mux_poll.c @@ -63,7 +63,7 @@ static size_t pollfd_max; static sigset_t my_sigblock; #endif -int mux_poll__init(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count) +int mux_poll__init(struct mosquitto_db *db, struct mosquitto__listener_sock *listensock, int listensock_count) { int i; int pollfd_index = 0; @@ -91,7 +91,7 @@ int mux_poll__init(struct mosquitto_db *db, mosq_sock_t *listensock, int listens memset(pollfds, -1, sizeof(struct pollfd)*pollfd_max); for(i=0; icontexts_by_sock, &sock, sizeof(mosq_sock_t), context); if(!context) { diff --git a/src/net.c b/src/net.c index 02bc9064..7f3f5544 100644 --- a/src/net.c +++ b/src/net.c @@ -105,10 +105,8 @@ static void net__print_error(unsigned int log, const char *format_str) } -int net__socket_accept(struct mosquitto_db *db, mosq_sock_t listensock) +int net__socket_accept(struct mosquitto_db *db, struct mosquitto__listener_sock *listensock) { - int i; - int j; mosq_sock_t new_sock = INVALID_SOCKET; struct mosquitto *new_context; #ifdef WITH_TLS @@ -122,7 +120,7 @@ int net__socket_accept(struct mosquitto_db *db, mosq_sock_t listensock) char address[1024]; #endif - new_sock = accept(listensock, NULL, 0); + new_sock = accept(listensock->sock, NULL, 0); if(new_sock == INVALID_SOCKET){ #ifdef WIN32 errno = WSAGetLastError(); @@ -138,7 +136,7 @@ int net__socket_accept(struct mosquitto_db *db, mosq_sock_t listensock) * but there are lots of reasons why this would be tricky (TLS * being the big one). */ COMPAT_CLOSE(spare_sock); - new_sock = accept(listensock, NULL, 0); + new_sock = accept(listensock->sock, NULL, 0); if(new_sock != INVALID_SOCKET){ COMPAT_CLOSE(new_sock); } @@ -187,19 +185,12 @@ int net__socket_accept(struct mosquitto_db *db, mosq_sock_t listensock) COMPAT_CLOSE(new_sock); return -1; } - for(i=0; iconfig->listener_count; i++){ - for(j=0; jconfig->listeners[i].sock_count; j++){ - if(db->config->listeners[i].socks[j] == listensock){ - new_context->listener = &db->config->listeners[i]; - new_context->listener->client_count++; - break; - } - } - } + new_context->listener = listensock->listener; if(!new_context->listener){ context__cleanup(db, new_context, true); return -1; } + new_context->listener->client_count++; if(new_context->listener->max_connections > 0 && new_context->listener->client_count > new_context->listener->max_connections){ if(db->config->connection_messages == true){