diff --git a/lib/connect.c b/lib/connect.c index ac15d2f8..35fa6497 100644 --- a/lib/connect.c +++ b/lib/connect.c @@ -79,20 +79,6 @@ static int mosquitto__connect_init(struct mosquitto *mosq, const char *host, int mosq->msgs_in.inflight_quota = mosq->msgs_in.inflight_maximum; mosq->msgs_out.inflight_quota = mosq->msgs_out.inflight_maximum; - if(mosq->sockpairR != INVALID_SOCKET){ - COMPAT_CLOSE(mosq->sockpairR); - mosq->sockpairR = INVALID_SOCKET; - } - if(mosq->sockpairW != INVALID_SOCKET){ - COMPAT_CLOSE(mosq->sockpairW); - mosq->sockpairW = INVALID_SOCKET; - } - - if(net__socketpair(&mosq->sockpairR, &mosq->sockpairW)){ - log__printf(mosq, MOSQ_LOG_WARNING, - "Warning: Unable to open socket pair, outgoing publish commands may be delayed."); - } - return MOSQ_ERR_SUCCESS; } diff --git a/lib/loop.c b/lib/loop.c index 42e6ba4d..dffb254b 100644 --- a/lib/loop.c +++ b/lib/loop.c @@ -193,6 +193,15 @@ int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets) int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets) { +#ifdef HAVE_PSELECT + struct timespec local_timeout; +#else + struct timeval local_timeout; +#endif + fd_set readfds; + int fdcount; + char pairbuf; + int maxfd = 0; int run = 1; int rc; unsigned long reconnect_delay; @@ -252,15 +261,42 @@ int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets) mosq->reconnects++; } -#ifdef WIN32 - Sleep(reconnect_delay*1000); + local_timeout.tv_sec = reconnect_delay; +#ifdef HAVE_PSELECT + local_timeout.tv_nsec = 0; #else - req.tv_sec = reconnect_delay; - req.tv_nsec = 0; - while(nanosleep(&req, &rem) == -1 && errno == EINTR){ - req = rem; + local_timeout.tv_usec = 0; +#endif + FD_ZERO(&readfds); + maxfd = 0; + if(mosq->sockpairR != INVALID_SOCKET){ + /* sockpairR is used to break out of select() before the + * timeout, when mosquitto_loop_stop() is called */ + FD_SET(mosq->sockpairR, &readfds); + maxfd = mosq->sockpairR; } +#ifdef HAVE_PSELECT + fdcount = pselect(maxfd+1, &readfds, NULL, NULL, &local_timeout, NULL); +#else + fdcount = select(maxfd+1, &readfds, NULL, NULL, &local_timeout); +#endif + if(fdcount == -1){ +#ifdef WIN32 + errno = WSAGetLastError(); #endif + if(errno == EINTR){ + return MOSQ_ERR_SUCCESS; + }else{ + return MOSQ_ERR_ERRNO; + } + }else if(mosq->sockpairR != INVALID_SOCKET && FD_ISSET(mosq->sockpairR, &readfds)){ +#ifndef WIN32 + if(read(mosq->sockpairR, &pairbuf, 1) == 0){ + } +#else + recv(mosq->sockpairR, &pairbuf, 1, 0); +#endif + } state = mosquitto__get_state(mosq); if(state == mosq_cs_disconnecting || state == mosq_cs_disconnected){ diff --git a/lib/mosquitto.c b/lib/mosquitto.c index f9be81b7..173e716e 100644 --- a/lib/mosquitto.c +++ b/lib/mosquitto.c @@ -92,8 +92,10 @@ struct mosquitto *mosquitto_new(const char *id, bool clean_start, void *userdata mosq = (struct mosquitto *)mosquitto__calloc(1, sizeof(struct mosquitto)); if(mosq){ mosq->sock = INVALID_SOCKET; - mosq->sockpairR = INVALID_SOCKET; - mosq->sockpairW = INVALID_SOCKET; + if(net__socketpair(&mosq->sockpairR, &mosq->sockpairW)){ + log__printf(mosq, MOSQ_LOG_WARNING, + "Warning: Unable to open socket pair, outgoing publish commands may be delayed."); + } #ifdef WITH_THREADING mosq->thread_id = pthread_self(); #endif @@ -131,8 +133,10 @@ int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_st } mosq->protocol = mosq_p_mqtt311; mosq->sock = INVALID_SOCKET; - mosq->sockpairR = INVALID_SOCKET; - mosq->sockpairW = INVALID_SOCKET; + if(net__socketpair(&mosq->sockpairR, &mosq->sockpairW)){ + log__printf(mosq, MOSQ_LOG_WARNING, + "Warning: Unable to open socket pair, outgoing publish commands may be delayed."); + } mosq->keepalive = 60; mosq->clean_start = clean_start; if(id){ diff --git a/lib/net_mosq.c b/lib/net_mosq.c index c6b46238..8d9d9a7c 100644 --- a/lib/net_mosq.c +++ b/lib/net_mosq.c @@ -1086,6 +1086,9 @@ int net__socketpair(mosq_sock_t *pairR, mosq_sock_t *pairW) #else int sv[2]; + *pairR = INVALID_SOCKET; + *pairW = INVALID_SOCKET; + if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1){ return MOSQ_ERR_ERRNO; }