Basic MQTT v5 support for bridges.

This gives equivalent behaviour as for v3.1.1/v3.1 bridges, there is no extra functionality yet.
pull/1480/head
Roger A. Light 6 years ago
parent 847c3f1f8b
commit b660283e64

@ -141,7 +141,7 @@ int send__connect(struct mosquitto *mosq, uint16_t keepalive, bool clean_session
packet__write_string(packet, PROTOCOL_NAME, strlen(PROTOCOL_NAME)); packet__write_string(packet, PROTOCOL_NAME, strlen(PROTOCOL_NAME));
} }
#if defined(WITH_BROKER) && defined(WITH_BRIDGE) #if defined(WITH_BROKER) && defined(WITH_BRIDGE)
if(mosq->bridge && mosq->bridge->try_private && mosq->bridge->try_private_accepted){ if(mosq->bridge && mosq->bridge->protocol_version != mosq_p_mqtt5 && mosq->bridge->try_private && mosq->bridge->try_private_accepted){
version |= 0x80; version |= 0x80;
}else{ }else{
} }

@ -1505,9 +1505,10 @@ openssl dhparam -out dhparam.pem 2048</programlisting>
<listitem> <listitem>
<para>Set the version of the MQTT protocol to use with for <para>Set the version of the MQTT protocol to use with for
this bridge. Can be one of this bridge. Can be one of
<replaceable>mqttv31</replaceable> or <replaceable>mqttv50</replaceable>,
<replaceable>mqttv311</replaceable>. Defaults to <replaceable>mqttv311</replaceable> or
<replaceable>mqttv31</replaceable>.</para> <replaceable>mqttv31</replaceable>. Defaults to
<replaceable>mqttv311</replaceable>.</para>
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry> <varlistentry>

@ -824,7 +824,7 @@
#bridge_attempt_unsubscribe true #bridge_attempt_unsubscribe true
# Set the version of the MQTT protocol to use with for this bridge. Can be one # Set the version of the MQTT protocol to use with for this bridge. Can be one
# of mqttv311 or mqttv11. Defaults to mqttv311. # of mqttv50, mqttv311 or mqttv31. Defaults to mqttv311.
#bridge_protocol_version mqttv311 #bridge_protocol_version mqttv311
# Set the clean session variable for this bridge. # Set the clean session variable for this bridge.

@ -445,6 +445,7 @@ int bridge__on_connect(struct mosquitto_db *db, struct mosquitto *context)
char *notification_topic; char *notification_topic;
int notification_topic_len; int notification_topic_len;
char notification_payload; char notification_payload;
int sub_opts;
if(context->bridge->notifications){ if(context->bridge->notifications){
notification_payload = '1'; notification_payload = '1';
@ -478,7 +479,14 @@ int bridge__on_connect(struct mosquitto_db *db, struct mosquitto *context)
} }
for(i=0; i<context->bridge->topic_count; i++){ for(i=0; i<context->bridge->topic_count; i++){
if(context->bridge->topics[i].direction == bd_in || context->bridge->topics[i].direction == bd_both){ if(context->bridge->topics[i].direction == bd_in || context->bridge->topics[i].direction == bd_both){
if(send__subscribe(context, NULL, 1, &context->bridge->topics[i].remote_topic, context->bridge->topics[i].qos, NULL)){ sub_opts = context->bridge->topics[i].qos;
if(context->bridge->protocol_version == mosq_p_mqtt5){
sub_opts = sub_opts
| MQTT_SUB_OPT_NO_LOCAL
| MQTT_SUB_OPT_RETAIN_AS_PUBLISHED
| MQTT_SUB_OPT_SEND_RETAIN_ALWAYS;
}
if(send__subscribe(context, NULL, 1, &context->bridge->topics[i].remote_topic, sub_opts, NULL)){
return 1; return 1;
} }
}else{ }else{

@ -1121,6 +1121,8 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
cur_bridge->protocol_version = mosq_p_mqtt31; cur_bridge->protocol_version = mosq_p_mqtt31;
}else if(!strcmp(token, "mqttv311")){ }else if(!strcmp(token, "mqttv311")){
cur_bridge->protocol_version = mosq_p_mqtt311; cur_bridge->protocol_version = mosq_p_mqtt311;
}else if(!strcmp(token, "mqttv50")){
cur_bridge->protocol_version = mosq_p_mqtt5;
}else{ }else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge_protocol_version value (%s).", token); log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge_protocol_version value (%s).", token);
return MOSQ_ERR_INVAL; return MOSQ_ERR_INVAL;

Loading…
Cancel
Save