Bridge support for MQTT v5 maximum-qos.

pull/1920/head
Roger A. Light 5 years ago
parent ca4b23486b
commit 04c110183c

@ -97,6 +97,7 @@ Broker:
- Fix bridges incorrectly setting Wills to manage remote notifications when
`notifications_local_only` was set true. Closes #1902.
- Bridges now obey MQTT v5 server-keepalive.
- Add bridge support for the MQTT v5 maximum-qos property.
Client library:
- Client no longer generates random client ids for v3.1.1 clients, these are

@ -49,7 +49,7 @@ int mosquitto_publish_v5(struct mosquitto *mosq, int *mid, const char *topic, in
if(!mosq || qos<0 || qos>2) return MOSQ_ERR_INVAL;
if(mosq->protocol != mosq_p_mqtt5 && properties) return MOSQ_ERR_NOT_SUPPORTED;
if(qos > mosq->maximum_qos) return MOSQ_ERR_QOS_NOT_SUPPORTED;
if(qos > mosq->max_qos) return MOSQ_ERR_QOS_NOT_SUPPORTED;
if(!mosq->retain_available){
retain = false;

@ -98,7 +98,7 @@ int handle__connack(struct mosquitto *mosq)
}
mosquitto_property_read_byte(properties, MQTT_PROP_RETAIN_AVAILABLE, &mosq->retain_available, false);
mosquitto_property_read_byte(properties, MQTT_PROP_MAXIMUM_QOS, &mosq->maximum_qos, false);
mosquitto_property_read_byte(properties, MQTT_PROP_MAXIMUM_QOS, &mosq->max_qos, false);
mosquitto_property_read_int16(properties, MQTT_PROP_RECEIVE_MAXIMUM, &mosq->msgs_out.inflight_maximum, false);
mosquitto_property_read_int16(properties, MQTT_PROP_SERVER_KEEP_ALIVE, &mosq->keepalive, false);
mosquitto_property_read_int32(properties, MQTT_PROP_MAXIMUM_PACKET_SIZE, &mosq->maximum_packet_size, false);

@ -173,7 +173,7 @@ int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_st
mosq->ping_t = 0;
mosq->last_mid = 0;
mosq->state = mosq_cs_new;
mosq->maximum_qos = 2;
mosq->max_qos = 2;
mosq->msgs_in.inflight_maximum = 20;
mosq->msgs_out.inflight_maximum = 20;
mosq->msgs_in.inflight_quota = 20;

@ -335,7 +335,7 @@ struct mosquitto {
ares_channel achan;
# endif
#endif
uint8_t maximum_qos;
uint8_t max_qos;
uint8_t retain_available;
bool tcp_nodelay;

@ -1107,7 +1107,7 @@ log_timestamp_format %Y-%m-%dT%H:%M:%S
</listitem>
</varlistentry>
<varlistentry>
<term><option>maximum_qos</option> <replaceable>count</replaceable></term>
<term><option>max_qos</option> <replaceable>value</replaceable></term>
<listitem>
<para>Limit the QoS value allowed for clients connecting to this
listener. Defaults to 2, which means any QoS can be

@ -97,6 +97,10 @@
# be queued until the first limit is reached.
#max_queued_bytes 0
# Set the maximum QoS supported. Clients publishing at a QoS higher than
# specified here will be disconnected.
#max_qos 2
# The maximum number of QoS 1 and 2 messages to hold in a queue per client
# above those that are currently in-flight. Defaults to 1000. Set
# to 0 for no maximum (not recommended).

@ -149,6 +149,7 @@ int bridge__connect_step1(struct mosquitto *context)
size_t notification_topic_len;
uint8_t notification_payload;
int i;
uint8_t qos;
if(!context || !context->bridge) return MOSQ_ERR_INVAL;
@ -176,9 +177,14 @@ int bridge__connect_step1(struct mosquitto *context)
for(i=0; i<context->bridge->topic_count; i++){
if(context->bridge->topics[i].direction == bd_out || context->bridge->topics[i].direction == bd_both){
log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, context->bridge->topics[i].local_topic);
if(context->bridge->topics[i].qos > context->max_qos){
qos = context->max_qos;
}else{
qos = context->bridge->topics[i].qos;
}
if(sub__add(context,
context->bridge->topics[i].local_topic,
context->bridge->topics[i].qos,
qos,
0,
MQTT_SUB_OPT_NO_LOCAL | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED,
&db.subs) > 0){
@ -186,7 +192,7 @@ int bridge__connect_step1(struct mosquitto *context)
}
retain__queue(context,
context->bridge->topics[i].local_topic,
context->bridge->topics[i].qos, 0);
qos, 0);
}
}
@ -194,14 +200,19 @@ int bridge__connect_step1(struct mosquitto *context)
bridge__backoff_step(context);
if(context->bridge->notifications){
if(context->max_qos == 0){
qos = 0;
}else{
qos = 1;
}
if(context->bridge->notification_topic){
if(!context->bridge->initial_notification_done){
notification_payload = '0';
db__messages_easy_queue(context, context->bridge->notification_topic, 1, 1, &notification_payload, 1, 0, NULL);
db__messages_easy_queue(context, context->bridge->notification_topic, qos, 1, &notification_payload, 1, 0, NULL);
context->bridge->initial_notification_done = true;
}
notification_payload = '0';
rc = will__set(context, context->bridge->notification_topic, 1, &notification_payload, 1, true, NULL);
rc = will__set(context, context->bridge->notification_topic, 1, &notification_payload, qos, true, NULL);
if(rc != MOSQ_ERR_SUCCESS){
return rc;
}
@ -214,12 +225,12 @@ int bridge__connect_step1(struct mosquitto *context)
if(!context->bridge->initial_notification_done){
notification_payload = '0';
db__messages_easy_queue(context, notification_topic, 1, 1, &notification_payload, 1, 0, NULL);
db__messages_easy_queue(context, notification_topic, qos, 1, &notification_payload, 1, 0, NULL);
context->bridge->initial_notification_done = true;
}
notification_payload = '0';
rc = will__set(context, notification_topic, 1, &notification_payload, 1, true, NULL);
rc = will__set(context, notification_topic, 1, &notification_payload, qos, true, NULL);
mosquitto__free(notification_topic);
if(rc != MOSQ_ERR_SUCCESS){
return rc;
@ -326,6 +337,7 @@ int bridge__connect(struct mosquitto *context)
char *notification_topic = NULL;
size_t notification_topic_len;
uint8_t notification_payload;
uint8_t qos;
if(!context || !context->bridge) return MOSQ_ERR_INVAL;
@ -353,9 +365,14 @@ int bridge__connect(struct mosquitto *context)
for(i=0; i<context->bridge->topic_count; i++){
if(context->bridge->topics[i].direction == bd_out || context->bridge->topics[i].direction == bd_both){
log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, context->bridge->topics[i].local_topic);
if(context->bridge->topics[i].qos > context->max_qos){
qos = context->max_qos;
}else{
qos = context->bridge->topics[i].qos;
}
if(sub__add(context,
context->bridge->topics[i].local_topic,
context->bridge->topics[i].qos,
qos,
0,
MQTT_SUB_OPT_NO_LOCAL | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED,
&db.subs) > 0){
@ -369,15 +386,20 @@ int bridge__connect(struct mosquitto *context)
bridge__backoff_step(context);
if(context->bridge->notifications){
if(context->max_qos == 0){
qos = 0;
}else{
qos = 1;
}
if(context->bridge->notification_topic){
if(!context->bridge->initial_notification_done){
notification_payload = '0';
db__messages_easy_queue(context, context->bridge->notification_topic, 1, 1, &notification_payload, 1, 0, NULL);
db__messages_easy_queue(context, context->bridge->notification_topic, qos, 1, &notification_payload, 1, 0, NULL);
context->bridge->initial_notification_done = true;
}
notification_payload = '0';
rc = will__set(context, context->bridge->notification_topic, 1, &notification_payload, 1, true, NULL);
rc = will__set(context, context->bridge->notification_topic, 1, &notification_payload, qos, true, NULL);
if(rc != MOSQ_ERR_SUCCESS){
return rc;
}
@ -390,12 +412,12 @@ int bridge__connect(struct mosquitto *context)
if(!context->bridge->initial_notification_done){
notification_payload = '0';
db__messages_easy_queue(context, notification_topic, 1, 1, &notification_payload, 1, 0, NULL);
db__messages_easy_queue(context, notification_topic, qos, 1, &notification_payload, 1, 0, NULL);
context->bridge->initial_notification_done = true;
}
notification_payload = '0';
rc = will__set(context, notification_topic, 1, &notification_payload, 1, true, NULL);
rc = will__set(context, notification_topic, 1, &notification_payload, qos, true, NULL);
if(rc != MOSQ_ERR_SUCCESS){
mosquitto__free(notification_topic);
return rc;
@ -459,8 +481,14 @@ int bridge__on_connect(struct mosquitto *context)
char notification_payload;
int sub_opts;
bool retain = true;
uint8_t qos;
if(context->bridge->notifications){
if(context->max_qos == 0){
qos = 0;
}else{
qos = 1;
}
if(!context->retain_available){
retain = false;
}
@ -468,12 +496,12 @@ int bridge__on_connect(struct mosquitto *context)
if(context->bridge->notification_topic){
if(!context->bridge->notifications_local_only){
if(send__real_publish(context, mosquitto__mid_generate(context),
context->bridge->notification_topic, 1, &notification_payload, 1, retain, 0, NULL, NULL, 0)){
context->bridge->notification_topic, 1, &notification_payload, qos, retain, 0, NULL, NULL, 0)){
return 1;
}
}
db__messages_easy_queue(context, context->bridge->notification_topic, 1, 1, &notification_payload, 1, 0, NULL);
db__messages_easy_queue(context, context->bridge->notification_topic, qos, 1, &notification_payload, 1, 0, NULL);
}else{
notification_topic_len = strlen(context->bridge->remote_clientid)+strlen("$SYS/broker/connection//state");
notification_topic = mosquitto__malloc(sizeof(char)*(notification_topic_len+1));
@ -483,19 +511,23 @@ int bridge__on_connect(struct mosquitto *context)
notification_payload = '1';
if(!context->bridge->notifications_local_only){
if(send__real_publish(context, mosquitto__mid_generate(context),
notification_topic, 1, &notification_payload, 1, retain, 0, NULL, NULL, 0)){
notification_topic, 1, &notification_payload, qos, retain, 0, NULL, NULL, 0)){
mosquitto__free(notification_topic);
return 1;
}
}
db__messages_easy_queue(context, notification_topic, 1, 1, &notification_payload, 1, 0, NULL);
db__messages_easy_queue(context, notification_topic, qos, 1, &notification_payload, 1, 0, NULL);
mosquitto__free(notification_topic);
}
}
for(i=0; i<context->bridge->topic_count; i++){
if(context->bridge->topics[i].direction == bd_in || context->bridge->topics[i].direction == bd_both){
sub_opts = context->bridge->topics[i].qos;
if(context->bridge->topics[i].qos > context->max_qos){
sub_opts = context->max_qos;
}else{
sub_opts = context->bridge->topics[i].qos;
}
if(context->bridge->protocol_version == mosq_p_mqtt5){
sub_opts = sub_opts
| MQTT_SUB_OPT_NO_LOCAL
@ -518,9 +550,14 @@ int bridge__on_connect(struct mosquitto *context)
}
for(i=0; i<context->bridge->topic_count; i++){
if(context->bridge->topics[i].direction == bd_out || context->bridge->topics[i].direction == bd_both){
if(context->bridge->topics[i].qos > context->max_qos){
qos = context->max_qos;
}else{
qos = context->bridge->topics[i].qos;
}
retain__queue(context,
context->bridge->topics[i].local_topic,
context->bridge->topics[i].qos, 0);
qos, 0);
}
}

@ -441,7 +441,7 @@ int config__parse_args(struct mosquitto__config *config, int argc, char *argv[])
|| config->default_listener.host
|| config->default_listener.port
|| config->default_listener.max_connections != -1
|| config->default_listener.maximum_qos != 2
|| config->default_listener.max_qos != 2
|| config->default_listener.mount_point
|| config->default_listener.protocol != mp_mqtt
|| config->default_listener.socket_domain
@ -482,7 +482,7 @@ int config__parse_args(struct mosquitto__config *config, int argc, char *argv[])
config->listeners[config->listener_count-1].sock_count = 0;
config->listeners[config->listener_count-1].client_count = 0;
config->listeners[config->listener_count-1].use_username_as_clientid = config->default_listener.use_username_as_clientid;
config->listeners[config->listener_count-1].maximum_qos = config->default_listener.maximum_qos;
config->listeners[config->listener_count-1].max_qos = config->default_listener.max_qos;
config->listeners[config->listener_count-1].max_topic_alias = config->default_listener.max_topic_alias;
#ifdef WITH_TLS
config->listeners[config->listener_count-1].tls_version = config->default_listener.tls_version;
@ -1625,14 +1625,14 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_connections value in configuration.");
}
}else if(!strcmp(token, "maximum_qos")){
}else if(!strcmp(token, "maximum_qos") || !strcmp(token, "max_qos")){
if(reload) continue; /* Listeners not valid for reloading. */
if(conf__parse_int(&token, "maximum_qos", &tmp_int, saveptr)) return MOSQ_ERR_INVAL;
if(conf__parse_int(&token, token, &tmp_int, saveptr)) return MOSQ_ERR_INVAL;
if(tmp_int < 0 || tmp_int > 2){
log__printf(NULL, MOSQ_LOG_ERR, "Error: maximum_qos must be between 0 and 2 inclusive.");
log__printf(NULL, MOSQ_LOG_ERR, "Error: max_qos must be between 0 and 2 inclusive.");
return MOSQ_ERR_INVAL;
}
cur_listener->maximum_qos = (uint8_t)tmp_int;
cur_listener->max_qos = (uint8_t)tmp_int;
}else if(!strcmp(token, "max_inflight_bytes")){
if(conf__parse_int(&token, "max_inflight_bytes", &tmp_int, saveptr)) return MOSQ_ERR_INVAL;
if(tmp_int < 0) tmp_int = 0;

@ -83,7 +83,7 @@ struct mosquitto *context__init(mosq_sock_t sock)
context->msgs_out.inflight_maximum = db.config->max_inflight_messages;
context->msgs_in.inflight_quota = db.config->max_inflight_messages;
context->msgs_out.inflight_quota = db.config->max_inflight_messages;
context->maximum_qos = 2;
context->max_qos = 2;
#ifdef WITH_TLS
context->ssl = NULL;
#endif

@ -492,8 +492,8 @@ int db__message_insert(struct mosquitto *context, uint16_t mid, enum mosquitto_m
msg->direction = dir;
msg->state = state;
msg->dup = false;
if(qos > context->maximum_qos){
msg->qos = context->maximum_qos;
if(qos > context->max_qos){
msg->qos = context->max_qos;
}else{
msg->qos = qos;
}

@ -35,6 +35,7 @@ int handle__connack(struct mosquitto *context)
uint32_t maximum_packet_size;
uint8_t retain_available;
uint16_t server_keepalive;
uint8_t max_qos = 255;
if(!context){
return MOSQ_ERR_INVAL;
@ -56,9 +57,14 @@ int handle__connack(struct mosquitto *context)
context->bridge->protocol_version = mosq_p_mqtt311;
return MOSQ_ERR_PROTOCOL;
}
rc = property__read_all(CMD_CONNACK, &context->in_packet, &properties);
if(rc) return rc;
/* maximum-qos */
mosquitto_property_read_byte(properties, MQTT_PROP_MAXIMUM_QOS,
&max_qos, false);
/* maximum-packet-size */
if(mosquitto_property_read_int32(properties, MQTT_PROP_MAXIMUM_PACKET_SIZE,
&maximum_packet_size, false)){
@ -96,6 +102,9 @@ int handle__connack(struct mosquitto *context)
if(rc) return rc;
}
#endif
if(max_qos != 255){
context->max_qos = max_qos;
}
mosquitto__set_state(context, mosq_cs_active);
rc = db__message_write_queued_out(context);
if(rc) return rc;
@ -108,6 +117,16 @@ int handle__connack(struct mosquitto *context)
context->retain_available = 0;
log__printf(NULL, MOSQ_LOG_ERR, "Connection Refused: retain not available (will retry)");
return MOSQ_ERR_CONN_LOST;
case MQTT_RC_QOS_NOT_SUPPORTED:
if(max_qos == 255){
if(context->max_qos != 0){
context->max_qos--;
}
}else{
context->max_qos = max_qos;
}
log__printf(NULL, MOSQ_LOG_ERR, "Connection Refused: QoS not supported (will retry)");
return MOSQ_ERR_CONN_LOST;
default:
log__printf(NULL, MOSQ_LOG_ERR, "Connection Refused: %s", mosquitto_reason_string(reason_code));
return MOSQ_ERR_CONN_LOST;

@ -225,7 +225,7 @@ int connect__on_authorised(struct mosquitto *context, void *auth_data_out, uint1
db.persistence_changes++;
}
#endif
context->maximum_qos = context->listener->maximum_qos;
context->max_qos = context->listener->max_qos;
if(context->protocol == mosq_p_mqtt5){
if(context->listener->max_topic_alias > 0){
@ -234,12 +234,6 @@ int connect__on_authorised(struct mosquitto *context, void *auth_data_out, uint1
goto error;
}
}
if(context->maximum_qos != 2){
if(mosquitto_property_add_byte(&connack_props, MQTT_PROP_MAXIMUM_QOS, context->maximum_qos)){
rc = MOSQ_ERR_NOMEM;
goto error;
}
}
if(context->keepalive > db.config->max_keepalive){
context->keepalive = db.config->max_keepalive;
if(mosquitto_property_add_int16(&connack_props, MQTT_PROP_SERVER_KEEP_ALIVE, context->keepalive)){
@ -528,6 +522,14 @@ int handle__connect(struct mosquitto *context)
}
property__process_connect(context, &properties);
if(will && will_qos > context->listener->max_qos){
if(protocol_version == mosq_p_mqtt5){
send__connack(context, 0, MQTT_RC_QOS_NOT_SUPPORTED, NULL);
}
rc = MOSQ_ERR_NOT_SUPPORTED;
goto handle_connect_error;
}
if(mosquitto_property_read_string(properties, MQTT_PROP_AUTHENTICATION_METHOD, &context->auth_method, false)){
mosquitto_property_read_binary(properties, MQTT_PROP_AUTHENTICATION_DATA, &auth_data, &auth_data_len, false);
}

@ -68,7 +68,7 @@ int handle__publish(struct mosquitto *context)
db__msg_store_free(msg);
return MOSQ_ERR_MALFORMED_PACKET;
}
if(msg->qos > context->maximum_qos){
if(msg->qos > context->max_qos){
log__printf(NULL, MOSQ_LOG_INFO,
"Too high QoS in PUBLISH from %s, disconnecting.", context->id);
db__msg_store_free(msg);

@ -142,6 +142,9 @@ int handle__subscribe(struct mosquitto *context)
mosquitto__free(payload);
return MOSQ_ERR_MALFORMED_PACKET;
}
if(qos > context->max_qos){
qos = context->max_qos;
}
if(context->listener && context->listener->mount_point){

@ -314,6 +314,9 @@ void do_disconnect(struct mosquitto *context, int reason)
case MOSQ_ERR_NOMEM:
log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected due to out of memory.", id);
break;
case MOSQ_ERR_NOT_SUPPORTED:
log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected due to QoS too high or retain not supported.", id);
break;
case MOSQ_ERR_ADMINISTRATIVE_ACTION:
log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s been disconnected by administrative action.", id);
break;

@ -174,7 +174,7 @@ void listener__set_defaults(struct mosquitto__listener *listener)
listener->security_options.allow_zero_length_clientid = true;
listener->protocol = mp_mqtt;
listener->max_connections = -1;
listener->maximum_qos = 2;
listener->max_qos = 2;
listener->max_topic_alias = 10;
}

@ -199,7 +199,7 @@ struct mosquitto__listener {
enum mosquitto_protocol protocol;
int socket_domain;
bool use_username_as_clientid;
uint8_t maximum_qos;
uint8_t max_qos;
uint16_t max_topic_alias;
#ifdef WITH_TLS
char *cafile;
@ -273,6 +273,7 @@ struct mosquitto__config {
uint32_t message_size_limit;
uint16_t max_inflight_messages;
uint16_t max_keepalive;
uint8_t max_qos;
bool persistence;
char *persistence_location;
char *persistence_file;

@ -65,6 +65,13 @@ int send__connack(struct mosquitto *context, uint8_t ack, uint8_t reason_code, c
return rc;
}
}
if(context->listener->max_qos != 2){
rc = mosquitto_property_add_byte(&connack_props, MQTT_PROP_MAXIMUM_QOS, context->listener->max_qos);
if(rc){
mosquitto_property_free_all(&connack_props);
return rc;
}
}
remaining_length += property__get_remaining_length(connack_props);
}

@ -22,9 +22,12 @@ def do_test(publish_packet, reason_code, error_string):
keepalive = 10
connect_packet = mosq_test.gen_connect("test", proto_ver=5, keepalive=keepalive)
connack_props = mqtt5_props.gen_byte_prop(mqtt5_props.PROP_MAXIMUM_QOS, 1)
connack_props = mqtt5_props.gen_uint16_prop(mqtt5_props.PROP_TOPIC_ALIAS_MAXIMUM, 10)
connack_props += mqtt5_props.gen_byte_prop(mqtt5_props.PROP_RETAIN_AVAILABLE, 0)
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=5, properties=connack_props)
connack_props += mqtt5_props.gen_uint16_prop(mqtt5_props.PROP_RECEIVE_MAXIMUM, 20)
connack_props += mqtt5_props.gen_byte_prop(mqtt5_props.PROP_MAXIMUM_QOS, 1)
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=5, properties=connack_props, property_helper=False)
mid = 0
disconnect_packet = mosq_test.gen_disconnect(proto_ver=5, reason_code=reason_code)

Loading…
Cancel
Save