Add bridge_session_expiry_interval option for MQTT v5.0 bridges.

pull/2438/head
Roger Light 4 years ago
parent cf3be2eb10
commit 99eddeb109

@ -85,6 +85,7 @@ Broker:
- Allow multiple instances of mosquitto to run as services on Windows. See
README-windows.txt.
- Add ability to deny wildcard subscriptions for a role to the dynsec plugin.
- Add bridge_session_expiry_interval option for MQTT v5.0 bridges.
Client library:
- Add MOSQ_OPT_DISABLE_SOCKETPAIR to allow the disabling of the socketpair

@ -1828,6 +1828,21 @@ openssl dhparam -out dhparam.pem 2048</programlisting>
<replaceable>mqttv311</replaceable>.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>bridge_session_expiry_interval</option> <replaceable>interval</replaceable></term>
<listitem>
<para>
If the bridge is using MQTT v5.0 then use
<option>bridge_session_expiry_interval</option>
to set the session expiry interval. Set to
<replaceable>0</replaceable> to have the session expire
immediately when the connection drops. Set to
<replaceable>0xFFFFFFFF</replaceable> to have an infinite
expiry interval. Otherwise the expiry interval is set to
the number of seconds that you specify. Defaults to 0.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>bridge_tcp_keepalive</option> <replaceable>idle</replaceable> <replaceable>interval</replaceable> <replaceable>counter</replaceable></term>
<listitem>

@ -778,6 +778,14 @@
# remote broker, and delivered when the bridge reconnects.
#cleansession false
# If the bridge is using MQTT v5.0 then use bridge_session_expiry_interval to
# set the session expiry interval. Set to 0 to have the session expire
# immediately when the connection drops. Set to 0xFFFFFFFF to have an infinite
# expiry interval. Otherwise the expiry interval is set to the number of
# seconds that you specify.
# Defaults to 0.
#bridge_session_expiry_interval 0
# Set the amount of time a bridge using the lazy start type must be idle before
# it will be stopped. Defaults to 60 seconds.
#idle_timeout 60

@ -366,7 +366,9 @@ static int bridge__connect_step2(struct mosquitto *context)
int bridge__connect_step3(struct mosquitto *context)
{
int rc;
mosquitto_property topic_alias_max, *topic_alias_max_prop = NULL;
mosquitto_property topic_alias_max;
mosquitto_property session_expiry_interval;
mosquitto_property *properties = NULL;
rc = net__socket_connect_step3(context, context->bridge->addresses[context->bridge->cur_address].address);
if(rc > 0){
@ -394,14 +396,21 @@ int bridge__connect_step3(struct mosquitto *context)
#endif
if(context->bridge->max_topic_alias != 0){
topic_alias_max.next = NULL;
topic_alias_max.value.i16 = context->bridge->max_topic_alias;
topic_alias_max.identifier = MQTT_PROP_TOPIC_ALIAS_MAXIMUM;
topic_alias_max.client_generated = false;
topic_alias_max_prop = &topic_alias_max;
topic_alias_max.next = properties;
properties = &topic_alias_max;
}
if(context->bridge->session_expiry_interval != 0){
session_expiry_interval.value.i32 = context->bridge->session_expiry_interval;
session_expiry_interval.identifier = MQTT_PROP_SESSION_EXPIRY_INTERVAL;
session_expiry_interval.client_generated = false;
session_expiry_interval.next = properties;
properties = &session_expiry_interval;
}
rc = send__connect(context, context->keepalive, context->clean_start, topic_alias_max_prop);
rc = send__connect(context, context->keepalive, context->clean_start, properties);
if(rc == MOSQ_ERR_SUCCESS){
return MOSQ_ERR_SUCCESS;
}else if(rc == MOSQ_ERR_ERRNO && errno == ENOTCONN){
@ -429,7 +438,9 @@ int bridge__connect(struct mosquitto *context)
uint8_t notification_payload;
struct mosquitto__bridge_topic *cur_topic;
uint8_t qos;
mosquitto_property topic_alias_max, *topic_alias_max_prop = NULL;
mosquitto_property topic_alias_max;
mosquitto_property session_expiry_interval;
mosquitto_property *properties = NULL;
if(!context || !context->bridge) return MOSQ_ERR_INVAL;
@ -550,14 +561,21 @@ int bridge__connect(struct mosquitto *context)
#endif
if(context->bridge->max_topic_alias){
topic_alias_max.next = NULL;
topic_alias_max.value.i16 = context->bridge->max_topic_alias;
topic_alias_max.identifier = MQTT_PROP_TOPIC_ALIAS_MAXIMUM;
topic_alias_max.client_generated = false;
topic_alias_max_prop = &topic_alias_max;
topic_alias_max.next = properties;
properties = &topic_alias_max;
}
if(context->bridge->session_expiry_interval != 0){
session_expiry_interval.value.i32 = context->bridge->session_expiry_interval;
session_expiry_interval.identifier = MQTT_PROP_SESSION_EXPIRY_INTERVAL;
session_expiry_interval.client_generated = false;
session_expiry_interval.next = properties;
properties = &session_expiry_interval;
}
rc2 = send__connect(context, context->keepalive, context->clean_start, topic_alias_max_prop);
rc2 = send__connect(context, context->keepalive, context->clean_start, properties);
if(rc2 == MOSQ_ERR_SUCCESS){
return rc;
}else if(rc2 == MOSQ_ERR_ERRNO && errno == ENOTCONN){

@ -1292,6 +1292,25 @@ static int config__read_file_core(struct mosquitto__config *config, bool reload,
}
#else
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Bridge support not available.");
#endif
}else if(!strcmp(token, "bridge_session_expiry_interval")){
#if defined(WITH_BRIDGE)
if(reload) continue; /* Bridges not valid for reloading. */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
}
if(conf__parse_int(&token, "bridge_session_expiry_interval", &tmp_int, &saveptr)) return MOSQ_ERR_INVAL;
if(tmp_int < 0){
log__printf(NULL, MOSQ_LOG_ERR, "Error: bridge_session_expiry_interval must not be negative.");
return MOSQ_ERR_INVAL;
}else if((uint64_t)tmp_int > (uint64_t)UINT32_MAX){
log__printf(NULL, MOSQ_LOG_ERR, "Error: bridge_session_expiry_interval must be lower than %u.", UINT32_MAX);
return MOSQ_ERR_INVAL;
}
cur_bridge->session_expiry_interval = (uint32_t)tmp_int;
#else
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Bridge support not available.");
#endif
}else if(!strcmp(token, "bridge_tcp_keepalive")){
#ifdef WITH_BRIDGE

@ -581,6 +581,7 @@ struct mosquitto__bridge{
int backoff_cap;
int threshold;
uint32_t maximum_packet_size;
uint32_t session_expiry_interval;
bool lazy_reconnect;
bool attempt_unsubscribe;
bool initial_notification_done;

Loading…
Cancel
Save