Merge branch 'improve_backoff' of https://github.com/abiliojr/mosquitto into abiliojr-improve_backoff

pull/2485/head
Roger A. Light 4 years ago
commit a85d9fb3e9

@ -2058,7 +2058,7 @@ openssl dhparam -out dhparam.pem 2048</programlisting>
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry> <varlistentry>
<term><option>restart_timeout</option> <replaceable>base cap</replaceable></term> <term><option>restart_timeout</option> <replaceable>base cap [stable]</replaceable></term>
<term><option>restart_timeout</option> <replaceable>constant</replaceable></term> <term><option>restart_timeout</option> <replaceable>constant</replaceable></term>
<listitem> <listitem>
<para>Set the amount of time a bridge using the automatic <para>Set the amount of time a bridge using the automatic
@ -2067,14 +2067,22 @@ openssl dhparam -out dhparam.pem 2048</programlisting>
time in seconds, or to use a backoff mechanism based on time in seconds, or to use a backoff mechanism based on
"Decorrelated Jitter", which adds a degree of "Decorrelated Jitter", which adds a degree of
randomness to when the restart occurs, starting at the randomness to when the restart occurs, starting at the
base and increasing up to the cap. Set a constant base and increasing up to the cap. The backoff time will be reset to
timeout of 20 seconds:</para> <replaceable>base</replaceable> after a successful connection.
When <replaceable>stable</replaceable> is specified, the backoff time
will be reset only if the connection remains open for at least
<replaceable>stable</replaceable> seconds.</para>
<para>Set a constant timeout of 20 seconds:</para>
<programlisting language="config"> <programlisting language="config">
restart_timeout 20</programlisting> restart_timeout 20</programlisting>
<para>Set backoff with a base (start value) of 10 seconds and a cap (upper <para>Set backoff with a base (start value) of 10 seconds and a cap (upper
limit) of 60 seconds:</para> limit) of 60 seconds:</para>
<programlisting language="config"> <programlisting language="config">
restart_timeout 10 30</programlisting> restart_timeout 10 60</programlisting>
<para>Same as previous example, but wait for the connection to be stable for
at least 30 seconds before resetting the backoff:</para>
<programlisting language="config">
restart_timeout 10 60 30</programlisting>
<para>Defaults to jitter with a base of 5 seconds and cap <para>Defaults to jitter with a base of 5 seconds and cap
of 30 seconds.</para> of 30 seconds.</para>
</listitem> </listitem>

@ -857,7 +857,11 @@
# #
# Set backoff with a base (start value) of 10 seconds and a cap (upper limit) of # Set backoff with a base (start value) of 10 seconds and a cap (upper limit) of
# 60 seconds: # 60 seconds:
# restart_timeout 10 30 # restart_timeout 10 60
#
# Same as previous example, but wait for the connection to be stable for
# at least 30 seconds before resetting the backoff:
# restart_timeout 10 60 30
# #
# Defaults to jitter with a base of 5 and cap of 30 # Defaults to jitter with a base of 5 and cap of 30
#restart_timeout 5 30 #restart_timeout 5 30

@ -57,8 +57,7 @@ Contributors:
#ifdef WITH_BRIDGE #ifdef WITH_BRIDGE
static void bridge__backoff_step(struct mosquitto *context); static void bridge__update_backoff(struct mosquitto__bridge *bridge);
static void bridge__backoff_reset(struct mosquitto *context);
#if defined(__GLIBC__) && defined(WITH_ADNS) #if defined(__GLIBC__) && defined(WITH_ADNS)
static int bridge__connect_step1(struct mosquitto *context); static int bridge__connect_step1(struct mosquitto *context);
static int bridge__connect_step2(struct mosquitto *context); static int bridge__connect_step2(struct mosquitto *context);
@ -275,9 +274,6 @@ static int bridge__connect_step1(struct mosquitto *context)
} }
} }
/* prepare backoff for a possible failure. Restart timeout will be reset if connection gets established */
bridge__backoff_step(context);
if(context->bridge->notifications){ if(context->bridge->notifications){
if(context->max_qos == 0){ if(context->max_qos == 0){
qos = 0; qos = 0;
@ -500,9 +496,6 @@ int bridge__connect(struct mosquitto *context)
} }
} }
/* prepare backoff for a possible failure. Restart timeout will be reset if connection gets established */
bridge__backoff_step(context);
if(context->bridge->notifications){ if(context->bridge->notifications){
if(context->max_qos == 0){ if(context->max_qos == 0){
qos = 0; qos = 0;
@ -707,7 +700,7 @@ int bridge__on_connect(struct mosquitto *context)
} }
} }
bridge__backoff_reset(context); context->bridge->connected_at = db.now_s;
return MOSQ_ERR_SUCCESS; return MOSQ_ERR_SUCCESS;
} }
@ -863,38 +856,40 @@ static int rand_between(int low, int high)
return (abs(r) % (high - low)) + low; return (abs(r) % (high - low)) + low;
} }
static void bridge__backoff_step(struct mosquitto *context) static void bridge__backoff_step(struct mosquitto__bridge *bridge)
{ {
struct mosquitto__bridge *bridge; /*
if(!context || !context->bridge) return; Decorrelated Jitter calculation, according to:
https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
bridge = context->bridge; */
/* skip if not using backoff */ bridge->restart_timeout = rand_between(bridge->backoff_base, bridge->restart_timeout * 3);
if(bridge->backoff_cap){ if(bridge->restart_timeout > bridge->backoff_cap){
/* “Decorrelated Jitter” calculation, according to: bridge->restart_timeout = bridge->backoff_cap;
* https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
*/
bridge->restart_timeout = rand_between(bridge->backoff_base, bridge->restart_timeout * 3);
if(bridge->restart_timeout > bridge->backoff_cap){
bridge->restart_timeout = bridge->backoff_cap;
}
} }
} }
static void bridge__backoff_reset(struct mosquitto *context) static void bridge__backoff_reset(struct mosquitto__bridge *bridge)
{ {
struct mosquitto__bridge *bridge; bridge->restart_timeout = bridge->backoff_base;
if(!context || !context->bridge) return; }
bridge = context->bridge;
/* skip if not using backoff */ static void bridge__update_backoff(struct mosquitto__bridge *bridge)
if(bridge->backoff_cap){ {
bridge->restart_timeout = bridge->backoff_base; if(!bridge) return;
if(!bridge->backoff_cap) return; /* skip if not using jitter */
if (bridge->connected_at && db.now_s - bridge->connected_at >= bridge->stable_connection_period) {
log__printf(NULL, MOSQ_LOG_INFO, "Bridge %s connection was stable enough, resetting backoff", bridge->name);
bridge__backoff_reset(bridge);
} else {
bridge__backoff_step(bridge);
} }
}
bridge->connected_at = 0;
log__printf(NULL, MOSQ_LOG_INFO, "Bridge %s next backoff will be %d", bridge->name, bridge->restart_timeout);
}
static void bridge_check_pending(struct mosquitto *context) static void bridge_check_pending(struct mosquitto *context)
{ {
@ -1004,22 +999,21 @@ void bridge_check(void)
} }
} }
} }
} }else{
if(!net__is_connected(context)){
if(reload_if_needed(context)) continue; if(reload_if_needed(context)) continue;
/* Want to try to restart the bridge connection */ /* Want to try to restart the bridge connection */
if(!context->bridge->restart_t){ if(!context->bridge->restart_t){
context->bridge->restart_t = db.now_s+context->bridge->restart_timeout; bridge__update_backoff(context->bridge);
context->bridge->restart_t = 1000*db.now_s+context->bridge->restart_timeout;
context->bridge->cur_address++; context->bridge->cur_address++;
if(context->bridge->cur_address == context->bridge->address_count){ if(context->bridge->cur_address == context->bridge->address_count){
context->bridge->cur_address = 0; context->bridge->cur_address = 0;
} }
loop__update_next_event(context->bridge->restart_timeout*1000); loop__update_next_event(context->bridge->restart_timeout);
}else{ }else{
if((context->bridge->start_type == bst_lazy && context->bridge->lazy_reconnect) if((context->bridge->start_type == bst_lazy && context->bridge->lazy_reconnect)
|| (context->bridge->start_type == bst_automatic && db.now_s >= context->bridge->restart_t)){ || (context->bridge->start_type == bst_automatic && 1000*db.now_s >= context->bridge->restart_t)){
#if defined(__GLIBC__) && defined(WITH_ADNS) #if defined(__GLIBC__) && defined(WITH_ADNS)
if(context->adns){ if(context->adns){

@ -1501,6 +1501,7 @@ static int config__read_file_core(struct mosquitto__config *config, bool reload,
cur_bridge->restart_timeout = 0; cur_bridge->restart_timeout = 0;
cur_bridge->backoff_base = 5; cur_bridge->backoff_base = 5;
cur_bridge->backoff_cap = 30; cur_bridge->backoff_cap = 30;
cur_bridge->stable_connection_period = 0;
cur_bridge->threshold = 10; cur_bridge->threshold = 10;
cur_bridge->try_private = true; cur_bridge->try_private = true;
cur_bridge->attempt_unsubscribe = true; cur_bridge->attempt_unsubscribe = true;
@ -2142,6 +2143,7 @@ static int config__read_file_core(struct mosquitto__config *config, bool reload,
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration."); log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL; return MOSQ_ERR_INVAL;
} }
cur_bridge->backoff_cap = 0; /* set backoff to constant mode, unless cap is specified further down */
token = strtok_r(NULL, " ", &saveptr); token = strtok_r(NULL, " ", &saveptr);
if(!token){ if(!token){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty restart_timeout value in configuration."); log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty restart_timeout value in configuration.");
@ -2160,7 +2162,19 @@ static int config__read_file_core(struct mosquitto__config *config, bool reload,
log__printf(NULL, MOSQ_LOG_ERR, "Error: backoff cap is lower than the base in restart_timeout."); log__printf(NULL, MOSQ_LOG_ERR, "Error: backoff cap is lower than the base in restart_timeout.");
return MOSQ_ERR_INVAL; return MOSQ_ERR_INVAL;
} }
token = strtok_r(NULL, " ", &saveptr);
if(token){
cur_bridge->stable_connection_period = atoi(token);
if(cur_bridge->stable_connection_period < 0){
log__printf(NULL, MOSQ_LOG_ERR, "Error: stable connection period cannot be negative.");
return MOSQ_ERR_INVAL;
}
}
} }
cur_bridge->restart_timeout *= 1000; /* backoff is tracked in ms */
cur_bridge->backoff_base *= 1000;
cur_bridge->backoff_cap *= 1000;
#else #else
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Bridge support not available."); log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Bridge support not available.");
#endif #endif

@ -584,6 +584,7 @@ struct mosquitto__bridge{
bool topic_remapping; bool topic_remapping;
enum mosquitto__protocol protocol_version; enum mosquitto__protocol protocol_version;
time_t restart_t; time_t restart_t;
time_t connected_at;
char *remote_clientid; char *remote_clientid;
char *remote_username; char *remote_username;
char *remote_password; char *remote_password;
@ -599,6 +600,7 @@ struct mosquitto__bridge{
int restart_timeout; int restart_timeout;
int backoff_base; int backoff_base;
int backoff_cap; int backoff_cap;
int stable_connection_period;
int threshold; int threshold;
uint32_t maximum_packet_size; uint32_t maximum_packet_size;
uint32_t session_expiry_interval; uint32_t session_expiry_interval;

Loading…
Cancel
Save