diff --git a/src/bridge.c b/src/bridge.c index 703add5c..fdeb012c 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -646,7 +646,7 @@ int bridge__register_local_connections(void) HASH_ITER(hh_sock, db.contexts_by_sock, context, ctxt_tmp){ if(context->bridge){ - if(mux__add_in(context)){ + if(mux__new(context)){ log__printf(NULL, MOSQ_LOG_ERR, "Error in initial bridge registration: %s", strerror(errno)); return MOSQ_ERR_UNKNOWN; } @@ -971,12 +971,12 @@ void bridge_check(void) }else if(rc == 0){ rc = bridge__connect_step2(context); if(rc == MOSQ_ERR_SUCCESS){ - mux__add_in(context); + mux__new(context); if(context->out_packet){ mux__add_out(context); } }else if(rc == MOSQ_ERR_CONN_PENDING){ - mux__add_in(context); + mux__new(context); mux__add_out(context); context->bridge->restart_t = 0; }else{ @@ -1015,7 +1015,7 @@ void bridge_check(void) if(context->bridge->round_robin == false && context->bridge->cur_address != 0){ context->bridge->primary_retry = db.now_s + 5; } - mux__add_in(context); + mux__new(context); if(context->out_packet){ mux__add_out(context); } diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index 026ad7e6..f8c179c0 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -757,9 +757,9 @@ int bridge__remap_topic_in(struct mosquitto *context, char **topic); * ============================================================ */ int mux__init(struct mosquitto__listener_sock *listensock, int listensock_count); int mux__loop_prepare(void); +int mux__new(struct mosquitto *context); int mux__add_out(struct mosquitto *context); int mux__remove_out(struct mosquitto *context); -int mux__add_in(struct mosquitto *context); int mux__delete(struct mosquitto *context); int mux__wait(void); int mux__handle(struct mosquitto__listener_sock *listensock, int listensock_count); diff --git a/src/mux.c b/src/mux.c index 0f7354ef..9e863203 100644 --- a/src/mux.c +++ b/src/mux.c @@ -54,14 +54,14 @@ int mux__remove_out(struct mosquitto *context) } -int mux__add_in(struct mosquitto *context) +int mux__new(struct mosquitto *context) { #ifdef WITH_EPOLL - return mux_epoll__add_in(context); + return mux_epoll__new(context); #elif defined(WITH_KQUEUE) - return mux_kqueue__add_in(context); + return mux_kqueue__new(context); #else - return mux_poll__add_in(context); + return mux_poll__new(context); #endif } diff --git a/src/mux.h b/src/mux.h index 9384c839..3f0e651d 100644 --- a/src/mux.h +++ b/src/mux.h @@ -22,25 +22,25 @@ Contributors: #include "mosquitto_broker_internal.h" int mux_epoll__init(struct mosquitto__listener_sock *listensock, int listensock_count); +int mux_epoll__new(struct mosquitto *context); int mux_epoll__add_out(struct mosquitto *context); int mux_epoll__remove_out(struct mosquitto *context); -int mux_epoll__add_in(struct mosquitto *context); int mux_epoll__delete(struct mosquitto *context); int mux_epoll__handle(void); int mux_epoll__cleanup(void); int mux_kqueue__init(struct mosquitto__listener_sock *listensock, int listensock_count); +int mux_kqueue__new(struct mosquitto *context); int mux_kqueue__add_out(struct mosquitto *context); int mux_kqueue__remove_out(struct mosquitto *context); -int mux_kqueue__add_in(struct mosquitto *context); int mux_kqueue__delete(struct mosquitto *context); int mux_kqueue__handle(void); int mux_kqueue__cleanup(void); int mux_poll__init(struct mosquitto__listener_sock *listensock, int listensock_count); +int mux_poll__new(struct mosquitto *context); int mux_poll__add_out(struct mosquitto *context); int mux_poll__remove_out(struct mosquitto *context); -int mux_poll__add_in(struct mosquitto *context); int mux_poll__delete(struct mosquitto *context); int mux_poll__handle(struct mosquitto__listener_sock *listensock, int listensock_count); int mux_poll__cleanup(void); diff --git a/src/mux_epoll.c b/src/mux_epoll.c index 8aeda43b..118f0256 100644 --- a/src/mux_epoll.c +++ b/src/mux_epoll.c @@ -110,7 +110,7 @@ int mux_epoll__remove_out(struct mosquitto *context) } -int mux_epoll__add_in(struct mosquitto *context) +int mux_epoll__new(struct mosquitto *context) { struct epoll_event ev; @@ -176,8 +176,6 @@ int mux_epoll__handle(void) if (ep_events[i].events & (EPOLLIN | EPOLLPRI)){ while((context = net__socket_accept(listensock)) != NULL){ - context->events = EPOLLIN; - mux__add_in(context); } } #ifdef WITH_WEBSOCKETS diff --git a/src/mux_kqueue.c b/src/mux_kqueue.c index b1c8e7cf..46e3ea14 100644 --- a/src/mux_kqueue.c +++ b/src/mux_kqueue.c @@ -106,7 +106,7 @@ int mux_kqueue__remove_out(struct mosquitto *context) } -int mux_kqueue__add_in(struct mosquitto *context) +int mux_kqueue__new(struct mosquitto *context) { struct kevent ev; @@ -173,8 +173,6 @@ int mux_kqueue__handle(void) if(event_list[i].filter == EVFILT_READ){ while((context = net__socket_accept(listensock)) != NULL){ - context->events = EVFILT_READ; - mux__add_in(context); } } #ifdef WITH_WEBSOCKETS diff --git a/src/mux_poll.c b/src/mux_poll.c index e977c209..732a6f18 100644 --- a/src/mux_poll.c +++ b/src/mux_poll.c @@ -139,14 +139,14 @@ int mux_poll__add_out(struct mosquitto *context) int mux_poll__remove_out(struct mosquitto *context) { if(context->events & POLLOUT) { - return mux_poll__add_in(context); + return mux_poll__new(context); }else{ return MOSQ_ERR_SUCCESS; } } -int mux_poll__add_in(struct mosquitto *context) +int mux_poll__new(struct mosquitto *context) { size_t i; @@ -247,8 +247,6 @@ int mux_poll__handle(struct mosquitto__listener_sock *listensock, int listensock #endif { while((context = net__socket_accept(&listensock[i])) != NULL){ - context->pollfd_index = -1; - mux__add_in(context); } } } diff --git a/src/net.c b/src/net.c index dc16c94f..4d635b34 100644 --- a/src/net.c +++ b/src/net.c @@ -228,6 +228,8 @@ struct mosquitto *net__socket_accept(struct mosquitto__listener_sock *listensock new_context->address, new_context->remote_port, new_context->listener->port); } + mux__new(new_context); + return new_context; }