Add bridge_receive_maximum option for MQTT v5.0 bridges.

pull/2438/head
Roger Light 4 years ago
parent 99eddeb109
commit ff69dc8db2

@ -85,6 +85,7 @@ Broker:
- Allow multiple instances of mosquitto to run as services on Windows. See
README-windows.txt.
- Add ability to deny wildcard subscriptions for a role to the dynsec plugin.
- Add bridge_receive_maximum option for MQTT v5.0 bridges.
- Add bridge_session_expiry_interval option for MQTT v5.0 bridges.
Client library:

@ -1828,6 +1828,19 @@ openssl dhparam -out dhparam.pem 2048</programlisting>
<replaceable>mqttv311</replaceable>.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>bridge_receive_maximum</option> <replaceable>count</replaceable></term>
<listitem>
<para>
If the bridge is using MQTT v5.0 then use
<option>bridge_receive_maximum</option>
to limit the number QoS 1 or 2 messages that can be
in-flight at once. Must be 1-65535. Defaults to
<option>max_inflight_messages</options>, which defaults
to 20.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>bridge_session_expiry_interval</option> <replaceable>interval</replaceable></term>
<listitem>

@ -366,8 +366,9 @@ static int bridge__connect_step2(struct mosquitto *context)
int bridge__connect_step3(struct mosquitto *context)
{
int rc;
mosquitto_property topic_alias_max;
mosquitto_property receive_maximum;
mosquitto_property session_expiry_interval;
mosquitto_property topic_alias_max;
mosquitto_property *properties = NULL;
rc = net__socket_connect_step3(context, context->bridge->addresses[context->bridge->cur_address].address);
@ -395,12 +396,12 @@ int bridge__connect_step3(struct mosquitto *context)
if(bridge__set_tcp_user_timeout(context)) return MOSQ_ERR_UNKNOWN;
#endif
if(context->bridge->max_topic_alias != 0){
topic_alias_max.value.i16 = context->bridge->max_topic_alias;
topic_alias_max.identifier = MQTT_PROP_TOPIC_ALIAS_MAXIMUM;
topic_alias_max.client_generated = false;
topic_alias_max.next = properties;
properties = &topic_alias_max;
if(context->bridge->receive_maximum != 0){
receive_maximum.value.i16 = context->bridge->receive_maximum;
receive_maximum.identifier = MQTT_PROP_RECEIVE_MAXIMUM;
receive_maximum.client_generated = false;
receive_maximum.next = properties;
properties = &receive_maximum;
}
if(context->bridge->session_expiry_interval != 0){
session_expiry_interval.value.i32 = context->bridge->session_expiry_interval;
@ -409,6 +410,13 @@ int bridge__connect_step3(struct mosquitto *context)
session_expiry_interval.next = properties;
properties = &session_expiry_interval;
}
if(context->bridge->max_topic_alias != 0){
topic_alias_max.value.i16 = context->bridge->max_topic_alias;
topic_alias_max.identifier = MQTT_PROP_TOPIC_ALIAS_MAXIMUM;
topic_alias_max.client_generated = false;
topic_alias_max.next = properties;
properties = &topic_alias_max;
}
rc = send__connect(context, context->keepalive, context->clean_start, properties);
if(rc == MOSQ_ERR_SUCCESS){
@ -438,8 +446,9 @@ int bridge__connect(struct mosquitto *context)
uint8_t notification_payload;
struct mosquitto__bridge_topic *cur_topic;
uint8_t qos;
mosquitto_property topic_alias_max;
mosquitto_property receive_maximum;
mosquitto_property session_expiry_interval;
mosquitto_property topic_alias_max;
mosquitto_property *properties = NULL;
if(!context || !context->bridge) return MOSQ_ERR_INVAL;
@ -560,12 +569,12 @@ int bridge__connect(struct mosquitto *context)
if(bridge__set_tcp_user_timeout(context)) return MOSQ_ERR_UNKNOWN;
#endif
if(context->bridge->max_topic_alias){
topic_alias_max.value.i16 = context->bridge->max_topic_alias;
topic_alias_max.identifier = MQTT_PROP_TOPIC_ALIAS_MAXIMUM;
topic_alias_max.client_generated = false;
topic_alias_max.next = properties;
properties = &topic_alias_max;
if(context->bridge->receive_maximum != 0){
receive_maximum.value.i16 = context->bridge->receive_maximum;
receive_maximum.identifier = MQTT_PROP_RECEIVE_MAXIMUM;
receive_maximum.client_generated = false;
receive_maximum.next = properties;
properties = &receive_maximum;
}
if(context->bridge->session_expiry_interval != 0){
session_expiry_interval.value.i32 = context->bridge->session_expiry_interval;
@ -574,6 +583,13 @@ int bridge__connect(struct mosquitto *context)
session_expiry_interval.next = properties;
properties = &session_expiry_interval;
}
if(context->bridge->max_topic_alias != 0){
topic_alias_max.value.i16 = context->bridge->max_topic_alias;
topic_alias_max.identifier = MQTT_PROP_TOPIC_ALIAS_MAXIMUM;
topic_alias_max.client_generated = false;
topic_alias_max.next = properties;
properties = &topic_alias_max;
}
rc2 = send__connect(context, context->keepalive, context->clean_start, properties);
if(rc2 == MOSQ_ERR_SUCCESS){

@ -1269,6 +1269,25 @@ static int config__read_file_core(struct mosquitto__config *config, bool reload,
if(conf__parse_string(&token, "bridge_psk", &cur_bridge->tls_psk, &saveptr)) return MOSQ_ERR_INVAL;
#else
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Bridge and/or TLS-PSK support not available.");
#endif
}else if(!strcmp(token, "bridge_receive_maximum")){
#if defined(WITH_BRIDGE)
if(reload) continue; /* Bridges not valid for reloading. */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
}
if(conf__parse_int(&token, "bridge_receive_maximum", &tmp_int, &saveptr)) return MOSQ_ERR_INVAL;
if(tmp_int <= 0){
log__printf(NULL, MOSQ_LOG_ERR, "Error: bridge_receive_maximum must be greater than 0.");
return MOSQ_ERR_INVAL;
}else if((uint64_t)tmp_int > (uint64_t)UINT16_MAX){
log__printf(NULL, MOSQ_LOG_ERR, "Error: bridge_receive_maximum must be lower than %u.", UINT16_MAX);
return MOSQ_ERR_INVAL;
}
cur_bridge->receive_maximum = (uint16_t)tmp_int;
#else
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Bridge support not available.");
#endif
}else if(!strcmp(token, "bridge_reload_type")){
#ifdef WITH_BRIDGE

@ -582,6 +582,7 @@ struct mosquitto__bridge{
int threshold;
uint32_t maximum_packet_size;
uint32_t session_expiry_interval;
uint16_t receive_maximum;
bool lazy_reconnect;
bool attempt_unsubscribe;
bool initial_notification_done;

Loading…
Cancel
Save