Fix "round_robin false" behaviour.

Closes #481.
pull/971/head
Roger A. Light 7 years ago
parent 3ae387e232
commit 2b4ba10b3d

@ -7,6 +7,7 @@ Broker:
- Fix excessive CPU usage when the number of sockets exceeds the system limit.
Closes #948.
- Fix for bridge connections when using WITH_ADNS=yes.
- Fix round_robin false behaviour. Closes #481.
Library:
- Fix situation where username and password is used with SOCKS5 proxy. Closes

@ -249,6 +249,10 @@ int bridge__connect_step3(struct mosquitto_db *db, struct mosquitto *context)
return rc;
}
if(context->bridge->round_robin == false && context->bridge->cur_address != 0){
context->bridge->primary_retry = mosquitto_time() + 5;
}
rc = send__connect(context, context->keepalive, context->clean_session);
if(rc == MOSQ_ERR_SUCCESS){
return MOSQ_ERR_SUCCESS;
@ -270,7 +274,7 @@ int bridge__connect_step3(struct mosquitto_db *db, struct mosquitto *context)
int bridge__connect(struct mosquitto_db *db, struct mosquitto *context)
{
int rc;
int rc, rc2;
int i;
char *notification_topic;
int notification_topic_len;
@ -348,7 +352,7 @@ int bridge__connect(struct mosquitto_db *db, struct mosquitto *context)
log__printf(NULL, MOSQ_LOG_NOTICE, "Connecting bridge %s (%s:%d)", context->bridge->name, context->bridge->addresses[context->bridge->cur_address].address, context->bridge->addresses[context->bridge->cur_address].port);
rc = net__socket_connect(context, context->bridge->addresses[context->bridge->cur_address].address, context->bridge->addresses[context->bridge->cur_address].port, NULL, false);
if(rc > 0 ){
if(rc > 0){
if(rc == MOSQ_ERR_TLS){
net__socket_close(db, context);
return rc; /* Error already printed */
@ -359,28 +363,27 @@ int bridge__connect(struct mosquitto_db *db, struct mosquitto *context)
}
return rc;
}else if(rc == MOSQ_ERR_CONN_PENDING){
context->state = mosq_cs_connect_pending;
}
HASH_ADD(hh_sock, db->contexts_by_sock, sock, sizeof(context->sock), context);
if(rc == MOSQ_ERR_CONN_PENDING){
context->state = mosq_cs_connect_pending;
}
rc = send__connect(context, context->keepalive, context->clean_session);
if(rc == MOSQ_ERR_SUCCESS){
return MOSQ_ERR_SUCCESS;
}else if(rc == MOSQ_ERR_ERRNO && errno == ENOTCONN){
rc2 = send__connect(context, context->keepalive, context->clean_session);
if(rc2 == MOSQ_ERR_SUCCESS){
return rc;
}else if(rc2 == MOSQ_ERR_ERRNO && errno == ENOTCONN){
return MOSQ_ERR_SUCCESS;
}else{
if(rc == MOSQ_ERR_TLS){
return rc; /* Error already printed */
}else if(rc == MOSQ_ERR_ERRNO){
if(rc2 == MOSQ_ERR_TLS){
return rc2; /* Error already printed */
}else if(rc2 == MOSQ_ERR_ERRNO){
log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", strerror(errno));
}else if(rc == MOSQ_ERR_EAI){
}else if(rc2 == MOSQ_ERR_EAI){
log__printf(NULL, MOSQ_LOG_ERR, "Error creating bridge: %s.", gai_strerror(errno));
}
net__socket_close(db, context);
return rc;
return rc2;
}
}
#endif

@ -1157,6 +1157,7 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
cur_bridge->try_private = true;
cur_bridge->attempt_unsubscribe = true;
cur_bridge->protocol_version = mosq_p_mqtt311;
cur_bridge->primary_retry_sock = INVALID_SOCKET;
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty connection value in configuration.");
return MOSQ_ERR_INVAL;

@ -125,11 +125,12 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
int pollfd_max;
#endif
#ifdef WITH_BRIDGE
mosq_sock_t bridge_sock;
int rc;
#endif
time_t expiration_check_time = 0;
char *id;
int err;
socklen_t len;
#ifndef WIN32
sigemptyset(&sigblock);
@ -231,12 +232,40 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
mosquitto__check_keepalive(db, context);
if(context->bridge->round_robin == false
&& context->bridge->cur_address != 0
&& context->bridge->primary_retry
&& now > context->bridge->primary_retry){
if(net__try_connect(context, context->bridge->addresses[0].address, context->bridge->addresses[0].port, &bridge_sock, NULL, false) <= 0){
COMPAT_CLOSE(bridge_sock);
net__socket_close(db, context);
context->bridge->cur_address = context->bridge->address_count-1;
if(context->bridge->primary_retry_sock == INVALID_SOCKET){
rc = net__try_connect(context, context->bridge->addresses[0].address,
context->bridge->addresses[0].port,
&context->bridge->primary_retry_sock, NULL, false);
if(rc == 0){
COMPAT_CLOSE(context->bridge->primary_retry_sock);
context->bridge->primary_retry_sock = INVALID_SOCKET;
context->bridge->primary_retry = 0;
net__socket_close(db, context);
context->bridge->cur_address = 0;
}
}else{
len = sizeof(int);
if(!getsockopt(context->bridge->primary_retry_sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){
if(err == 0){
COMPAT_CLOSE(context->bridge->primary_retry_sock);
context->bridge->primary_retry_sock = INVALID_SOCKET;
context->bridge->primary_retry = 0;
net__socket_close(db, context);
context->bridge->cur_address = context->bridge->address_count-1;
}else{
COMPAT_CLOSE(context->bridge->primary_retry_sock);
context->bridge->primary_retry_sock = INVALID_SOCKET;
context->bridge->primary_retry = now+5;
}
}else{
COMPAT_CLOSE(context->bridge->primary_retry_sock);
context->bridge->primary_retry_sock = INVALID_SOCKET;
context->bridge->primary_retry = now+5;
}
}
}
}
@ -324,9 +353,6 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
if(context->bridge->cur_address == context->bridge->address_count){
context->bridge->cur_address = 0;
}
if(context->bridge->round_robin == false && context->bridge->cur_address != 0){
context->bridge->primary_retry = now + 5;
}
}else{
if((context->bridge->start_type == bst_lazy && context->bridge->lazy_reconnect)
|| (context->bridge->start_type == bst_automatic && now > context->bridge->restart_t)){
@ -397,6 +423,10 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
{
rc = bridge__connect(db, context);
if(rc == MOSQ_ERR_SUCCESS){
context->bridge->restart_t = 0;
if(context->bridge->round_robin == false && context->bridge->cur_address != 0){
context->bridge->primary_retry = now + 5;
}
#ifdef WITH_EPOLL
ev.data.fd = context->sock;
ev.events = EPOLLIN;

@ -428,6 +428,7 @@ struct mosquitto__bridge{
int cur_address;
int address_count;
time_t primary_retry;
mosq_sock_t primary_retry_sock;
bool round_robin;
bool try_private;
bool try_private_accepted;

Loading…
Cancel
Save