diff --git a/config.mk b/config.mk index 087766c5..3d2aa499 100644 --- a/config.mk +++ b/config.mk @@ -156,6 +156,10 @@ ifeq ($(UNAME),QNX) LIB_LIBS:=$(LIB_LIBS) -lsocket endif +ifeq ($(UNAME),Linux) + BROKER_LIBS:=$(BROKER_LIBS) -lanl +endif + ifeq ($(WITH_WRAP),yes) BROKER_LIBS:=$(BROKER_LIBS) -lwrap BROKER_CFLAGS:=$(BROKER_CFLAGS) -DWITH_WRAP diff --git a/lib/mosquitto_internal.h b/lib/mosquitto_internal.h index 4b4cf858..640de160 100644 --- a/lib/mosquitto_internal.h +++ b/lib/mosquitto_internal.h @@ -56,6 +56,9 @@ Contributors: #include "mosquitto.h" #include "time_mosq.h" #ifdef WITH_BROKER +# ifdef __linux__ +# include +# endif # include "uthash.h" struct mosquitto_client_msg; #endif @@ -151,6 +154,9 @@ struct mosquitto { mosq_sock_t sock; #ifndef WITH_BROKER mosq_sock_t sockpairR, sockpairW; +#endif +#ifdef __linux__ + struct gaicb *adns; /* For getaddrinfo_a */ #endif enum _mosquitto_protocol protocol; char *address; diff --git a/lib/net_mosq.c b/lib/net_mosq.c index 26f98554..24174f09 100644 --- a/lib/net_mosq.c +++ b/lib/net_mosq.c @@ -14,12 +14,15 @@ Contributors: Roger Light - initial implementation and documentation. */ +#define _GNU_SOURCE + #include #include #include #include #include #ifndef WIN32 +#define _GNU_SOURCE #include #include #include @@ -268,6 +271,88 @@ static unsigned int psk_client_callback(SSL *ssl, const char *hint, } #endif +#if defined(WITH_BROKER) && defined(__linux__) +/* Async connect, part 1 (dns lookup) */ +int _mosquitto_try_connect_step1(struct mosquitto *mosq, const char *host) +{ + int s; + void *sevp = NULL; + + if(mosq->adns){ + _mosquitto_free(mosq->adns); + } + mosq->adns = _mosquitto_calloc(1, sizeof(struct gaicb)); + if(!mosq->adns){ + return MOSQ_ERR_NOMEM; + } + mosq->adns->ar_name = host; + + s = getaddrinfo_a(GAI_NOWAIT, &mosq->adns, 1, sevp); + if(s){ + errno = s; + return MOSQ_ERR_EAI; + } + + return MOSQ_ERR_SUCCESS; +} + +/* Async connect part 2, the connection. */ +int _mosquitto_try_connect_step2(struct mosquitto *mosq, uint16_t port, mosq_sock_t *sock) +{ + struct addrinfo *ainfo, *rp; + int rc; + + ainfo = mosq->adns->ar_result; + + for(rp = ainfo; rp != NULL; rp = rp->ai_next){ + *sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); + if(*sock == INVALID_SOCKET) continue; + + if(rp->ai_family == PF_INET){ + ((struct sockaddr_in *)rp->ai_addr)->sin_port = htons(port); + }else if(rp->ai_family == PF_INET6){ + ((struct sockaddr_in6 *)rp->ai_addr)->sin6_port = htons(port); + }else{ + COMPAT_CLOSE(*sock); + continue; + } + + /* Set non-blocking */ + if(_mosquitto_socket_nonblock(*sock)){ + COMPAT_CLOSE(*sock); + continue; + } + + rc = connect(*sock, rp->ai_addr, rp->ai_addrlen); +#ifdef WIN32 + errno = WSAGetLastError(); +#endif + if(rc == 0 || errno == EINPROGRESS || errno == COMPAT_EWOULDBLOCK){ + if(rc < 0 && (errno == EINPROGRESS || errno == COMPAT_EWOULDBLOCK)){ + rc = MOSQ_ERR_CONN_PENDING; + } + + /* Set non-blocking */ + if(_mosquitto_socket_nonblock(*sock)){ + COMPAT_CLOSE(*sock); + continue; + } + break; + } + + COMPAT_CLOSE(*sock); + *sock = INVALID_SOCKET; + } + freeaddrinfo(ainfo); + if(!rp){ + return MOSQ_ERR_ERRNO; + } + return rc; +} + +#endif + + int _mosquitto_try_connect(struct mosquitto *mosq, const char *host, uint16_t port, mosq_sock_t *sock, const char *bind_address, bool blocking) { struct addrinfo hints; @@ -303,7 +388,7 @@ int _mosquitto_try_connect(struct mosquitto *mosq, const char *host, uint16_t po for(rp = ainfo; rp != NULL; rp = rp->ai_next){ *sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); if(*sock == INVALID_SOCKET) continue; - + if(rp->ai_family == PF_INET){ ((struct sockaddr_in *)rp->ai_addr)->sin_port = htons(port); }else if(rp->ai_family == PF_INET6){ @@ -396,24 +481,13 @@ int mosquitto__socket_connect_tls(struct mosquitto *mosq) } #endif -/* Create a socket and connect it to 'ip' on port 'port'. - * Returns -1 on failure (ip is NULL, socket creation/connection error) - * Returns sock number on success. - */ -int _mosquitto_socket_connect(struct mosquitto *mosq, const char *host, uint16_t port, const char *bind_address, bool blocking) +int _mosquitto_socket_connect_step3(struct mosquitto *mosq, const char *host, uint16_t port, const char *bind_address, bool blocking) { - mosq_sock_t sock = INVALID_SOCKET; - int rc; #ifdef WITH_TLS int ret; BIO *bio; #endif - if(!mosq || !host || !port) return MOSQ_ERR_INVAL; - - rc = _mosquitto_try_connect(mosq, host, port, &sock, bind_address, blocking); - if(rc > 0) return rc; - #ifdef WITH_TLS if(mosq->tls_cafile || mosq->tls_capath || mosq->tls_psk){ #if OPENSSL_VERSION_NUMBER >= 0x10001000L @@ -425,7 +499,7 @@ int _mosquitto_socket_connect(struct mosquitto *mosq, const char *host, uint16_t mosq->ssl_ctx = SSL_CTX_new(TLSv1_client_method()); }else{ _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Protocol %s not supported.", mosq->tls_version); - COMPAT_CLOSE(sock); + COMPAT_CLOSE(mosq->sock); return MOSQ_ERR_INVAL; } #else @@ -433,13 +507,13 @@ int _mosquitto_socket_connect(struct mosquitto *mosq, const char *host, uint16_t mosq->ssl_ctx = SSL_CTX_new(TLSv1_client_method()); }else{ _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Protocol %s not supported.", mosq->tls_version); - COMPAT_CLOSE(sock); + COMPAT_CLOSE(mosq->sock); return MOSQ_ERR_INVAL; } #endif if(!mosq->ssl_ctx){ _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to create TLS context."); - COMPAT_CLOSE(sock); + COMPAT_CLOSE(mosq->sock); return MOSQ_ERR_TLS; } @@ -456,7 +530,7 @@ int _mosquitto_socket_connect(struct mosquitto *mosq, const char *host, uint16_t ret = SSL_CTX_set_cipher_list(mosq->ssl_ctx, mosq->tls_ciphers); if(ret == 0){ _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to set TLS ciphers. Check cipher list \"%s\".", mosq->tls_ciphers); - COMPAT_CLOSE(sock); + COMPAT_CLOSE(mosq->sock); return MOSQ_ERR_TLS; } } @@ -480,7 +554,7 @@ int _mosquitto_socket_connect(struct mosquitto *mosq, const char *host, uint16_t _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check capath \"%s\".", mosq->tls_capath); } #endif - COMPAT_CLOSE(sock); + COMPAT_CLOSE(mosq->sock); return MOSQ_ERR_TLS; } if(mosq->tls_cert_reqs == 0){ @@ -502,7 +576,7 @@ int _mosquitto_socket_connect(struct mosquitto *mosq, const char *host, uint16_t #else _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load client certificate \"%s\".", mosq->tls_certfile); #endif - COMPAT_CLOSE(sock); + COMPAT_CLOSE(mosq->sock); return MOSQ_ERR_TLS; } } @@ -514,13 +588,13 @@ int _mosquitto_socket_connect(struct mosquitto *mosq, const char *host, uint16_t #else _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load client key file \"%s\".", mosq->tls_keyfile); #endif - COMPAT_CLOSE(sock); + COMPAT_CLOSE(mosq->sock); return MOSQ_ERR_TLS; } ret = SSL_CTX_check_private_key(mosq->ssl_ctx); if(ret != 1){ _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Client certificate/key are inconsistent."); - COMPAT_CLOSE(sock); + COMPAT_CLOSE(mosq->sock); return MOSQ_ERR_TLS; } } @@ -532,27 +606,43 @@ int _mosquitto_socket_connect(struct mosquitto *mosq, const char *host, uint16_t mosq->ssl = SSL_new(mosq->ssl_ctx); if(!mosq->ssl){ - COMPAT_CLOSE(sock); + COMPAT_CLOSE(mosq->sock); return MOSQ_ERR_TLS; } SSL_set_ex_data(mosq->ssl, tls_ex_index_mosq, mosq); - bio = BIO_new_socket(sock, BIO_NOCLOSE); + bio = BIO_new_socket(mosq->sock, BIO_NOCLOSE); if(!bio){ - COMPAT_CLOSE(sock); + COMPAT_CLOSE(mosq->sock); return MOSQ_ERR_TLS; } SSL_set_bio(mosq->ssl, bio, bio); - mosq->sock = sock; if(mosquitto__socket_connect_tls(mosq)){ return MOSQ_ERR_TLS; } } #endif + return MOSQ_ERR_SUCCESS; +} + +/* Create a socket and connect it to 'ip' on port 'port'. + * Returns -1 on failure (ip is NULL, socket creation/connection error) + * Returns sock number on success. + */ +int _mosquitto_socket_connect(struct mosquitto *mosq, const char *host, uint16_t port, const char *bind_address, bool blocking) +{ + mosq_sock_t sock = INVALID_SOCKET; + int rc; + + if(!mosq || !host || !port) return MOSQ_ERR_INVAL; + + rc = _mosquitto_try_connect(mosq, host, port, &sock, bind_address, blocking); + if(rc > 0) return rc; mosq->sock = sock; + rc = _mosquitto_socket_connect_step3(mosq, host, port, bind_address, blocking); return rc; } diff --git a/lib/net_mosq.h b/lib/net_mosq.h index b101746d..569e9452 100644 --- a/lib/net_mosq.h +++ b/lib/net_mosq.h @@ -61,6 +61,8 @@ int _mosquitto_socket_close(struct mosquitto_db *db, struct mosquitto *mosq); int _mosquitto_socket_close(struct mosquitto *mosq); #endif int _mosquitto_try_connect(struct mosquitto *mosq, const char *host, uint16_t port, mosq_sock_t *sock, const char *bind_address, bool blocking); +int _mosquitto_try_connect_step1(struct mosquitto *mosq, const char *host); +int _mosquitto_try_connect_step2(struct mosquitto *mosq, uint16_t port, mosq_sock_t *sock); int _mosquitto_socket_nonblock(mosq_sock_t sock); int _mosquitto_socketpair(mosq_sock_t *sp1, mosq_sock_t *sp2); diff --git a/src/bridge.c b/src/bridge.c index a7d5ad09..e9a3d0ee 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -131,10 +131,15 @@ int mqtt3_bridge_new(struct mosquitto_db *db, struct _mqtt3_bridge *bridge) return MOSQ_ERR_NOMEM; } +#ifdef __linux__ + new_context->bridge->restart_t = 1; /* force quick restart of bridge */ + return mqtt3_bridge_connect_step1(db, new_context); +#else return mqtt3_bridge_connect(db, new_context); +#endif } -int mqtt3_bridge_connect(struct mosquitto_db *db, struct mosquitto *context) +int mqtt3_bridge_connect_step1(struct mosquitto_db *db, struct mosquitto *context) { int rc; int i; @@ -207,6 +212,31 @@ int mqtt3_bridge_connect(struct mosquitto_db *db, struct mosquitto *context) } } + _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Connecting bridge %s (%s:%d)", context->bridge->name, context->bridge->addresses[context->bridge->cur_address].address, context->bridge->addresses[context->bridge->cur_address].port); + rc = _mosquitto_try_connect_step1(context, context->bridge->addresses[context->bridge->cur_address].address); + if(rc > 0 ){ + if(rc == MOSQ_ERR_TLS){ + _mosquitto_socket_close(db, context); + return rc; /* Error already printed */ + }else if(rc == MOSQ_ERR_ERRNO){ + _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", strerror(errno)); + }else if(rc == MOSQ_ERR_EAI){ + _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", gai_strerror(errno)); + } + + return rc; + } + + return MOSQ_ERR_SUCCESS; +} + + +int mqtt3_bridge_connect_step2(struct mosquitto_db *db, struct mosquitto *context) +{ + int rc; + + if(!context || !context->bridge) return MOSQ_ERR_INVAL; + _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Connecting bridge %s (%s:%d)", context->bridge->name, context->bridge->addresses[context->bridge->cur_address].address, context->bridge->addresses[context->bridge->cur_address].port); rc = _mosquitto_socket_connect(context, context->bridge->addresses[context->bridge->cur_address].address, context->bridge->addresses[context->bridge->cur_address].port, NULL, false); if(rc > 0 ){ @@ -245,6 +275,18 @@ int mqtt3_bridge_connect(struct mosquitto_db *db, struct mosquitto *context) } } + +int mqtt3_bridge_connect(struct mosquitto_db *db, struct mosquitto *context) +{ + int rc; + + rc = mqtt3_bridge_connect_step1(db, context); + if(rc) return rc; + + return mqtt3_bridge_connect_step2(db, context); +} + + void mqtt3_bridge_packet_cleanup(struct mosquitto *context) { struct _mosquitto_packet *packet; diff --git a/src/loop.c b/src/loop.c index 07e02c5b..d3d5e6a8 100644 --- a/src/loop.c +++ b/src/loop.c @@ -244,45 +244,57 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li context->bridge->primary_retry = now + 5; } }else{ - if(context->bridge->start_type == bst_lazy && context->bridge->lazy_reconnect){ - rc = mqtt3_bridge_connect(db, context); - if(rc == MOSQ_ERR_SUCCESS){ - pollfds[pollfd_index].fd = context->sock; - pollfds[pollfd_index].events = POLLIN; - pollfds[pollfd_index].revents = 0; - if(context->current_out_packet){ - pollfds[pollfd_index].events |= POLLOUT; + if((context->bridge->start_type == bst_lazy && context->bridge->lazy_reconnect) + || (context->bridge->start_type == bst_automatic && now > context->bridge->restart_t)){ + +#ifdef __linux__ + if(context->adns){ + /* Waiting on DNS lookup */ + rc = mqtt3_bridge_connect_step2(db, context); + if(rc == MOSQ_ERR_SUCCESS){ + pollfds[pollfd_index].fd = context->sock; + pollfds[pollfd_index].events = POLLIN; + pollfds[pollfd_index].revents = 0; + if(context->current_out_packet){ + pollfds[pollfd_index].events |= POLLOUT; + } + context->pollfd_index = pollfd_index; + pollfd_index++; + }else{ + context->bridge->cur_address++; + if(context->bridge->cur_address == context->bridge->address_count){ + context->bridge->cur_address = 0; + } } - context->pollfd_index = pollfd_index; - pollfd_index++; }else{ - context->bridge->cur_address++; - if(context->bridge->cur_address == context->bridge->address_count){ - context->bridge->cur_address = 0; + rc = mqtt3_bridge_connect_step1(db, context); + if(rc){ + context->bridge->cur_address++; + if(context->bridge->cur_address == context->bridge->address_count){ + context->bridge->cur_address = 0; + } } } - } - if(context->bridge->start_type == bst_automatic && now > context->bridge->restart_t){ - context->bridge->restart_t = 0; - rc = mqtt3_bridge_connect(db, context); - if(rc == MOSQ_ERR_SUCCESS){ - pollfds[pollfd_index].fd = context->sock; - pollfds[pollfd_index].events = POLLIN; - pollfds[pollfd_index].revents = 0; - if(context->current_out_packet){ - pollfds[pollfd_index].events |= POLLOUT; - } - context->pollfd_index = pollfd_index; - pollfd_index++; - }else{ - /* Retry later. */ - context->bridge->restart_t = now+context->bridge->restart_timeout; - - context->bridge->cur_address++; - if(context->bridge->cur_address == context->bridge->address_count){ - context->bridge->cur_address = 0; +#else + { + rc = mqtt3_bridge_connect(db, context); + if(rc == MOSQ_ERR_SUCCESS){ + pollfds[pollfd_index].fd = context->sock; + pollfds[pollfd_index].events = POLLIN; + pollfds[pollfd_index].revents = 0; + if(context->current_out_packet){ + pollfds[pollfd_index].events |= POLLOUT; + } + context->pollfd_index = pollfd_index; + pollfd_index++; + }else{ + context->bridge->cur_address++; + if(context->bridge->cur_address == context->bridge->address_count){ + context->bridge->cur_address = 0; + } } } +#endif } } } diff --git a/src/mosquitto_broker.h b/src/mosquitto_broker.h index bd6a0ef7..c89489b6 100644 --- a/src/mosquitto_broker.h +++ b/src/mosquitto_broker.h @@ -466,6 +466,8 @@ int _mosquitto_log_printf(struct mosquitto *mosq, int level, const char *fmt, .. #ifdef WITH_BRIDGE int mqtt3_bridge_new(struct mosquitto_db *db, struct _mqtt3_bridge *bridge); int mqtt3_bridge_connect(struct mosquitto_db *db, struct mosquitto *context); +int mqtt3_bridge_connect_step1(struct mosquitto_db *db, struct mosquitto *context); +int mqtt3_bridge_connect_step2(struct mosquitto_db *db, struct mosquitto *context); void mqtt3_bridge_packet_cleanup(struct mosquitto *context); #endif