From 04c110183c5a4abd1eaa9e60b2d1898976e8bb9b Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Tue, 1 Dec 2020 16:08:05 +0000 Subject: [PATCH] Bridge support for MQTT v5 maximum-qos. --- ChangeLog.txt | 1 + lib/actions.c | 2 +- lib/handle_connack.c | 2 +- lib/mosquitto.c | 2 +- lib/mosquitto_internal.h | 2 +- man/mosquitto.conf.5.xml | 2 +- mosquitto.conf | 4 ++ src/bridge.c | 71 ++++++++++++++++++++------ src/conf.c | 12 ++--- src/context.c | 2 +- src/database.c | 4 +- src/handle_connack.c | 19 +++++++ src/handle_connect.c | 16 +++--- src/handle_publish.c | 2 +- src/handle_subscribe.c | 3 ++ src/loop.c | 3 ++ src/mosquitto.c | 2 +- src/mosquitto_broker_internal.h | 3 +- src/send_connack.c | 7 +++ test/broker/13-malformed-publish-v5.py | 7 ++- 20 files changed, 123 insertions(+), 43 deletions(-) diff --git a/ChangeLog.txt b/ChangeLog.txt index 3911485b..fa0e7f11 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -97,6 +97,7 @@ Broker: - Fix bridges incorrectly setting Wills to manage remote notifications when `notifications_local_only` was set true. Closes #1902. - Bridges now obey MQTT v5 server-keepalive. +- Add bridge support for the MQTT v5 maximum-qos property. Client library: - Client no longer generates random client ids for v3.1.1 clients, these are diff --git a/lib/actions.c b/lib/actions.c index 705cb208..da72482b 100644 --- a/lib/actions.c +++ b/lib/actions.c @@ -49,7 +49,7 @@ int mosquitto_publish_v5(struct mosquitto *mosq, int *mid, const char *topic, in if(!mosq || qos<0 || qos>2) return MOSQ_ERR_INVAL; if(mosq->protocol != mosq_p_mqtt5 && properties) return MOSQ_ERR_NOT_SUPPORTED; - if(qos > mosq->maximum_qos) return MOSQ_ERR_QOS_NOT_SUPPORTED; + if(qos > mosq->max_qos) return MOSQ_ERR_QOS_NOT_SUPPORTED; if(!mosq->retain_available){ retain = false; diff --git a/lib/handle_connack.c b/lib/handle_connack.c index f1cca2b1..11b03657 100644 --- a/lib/handle_connack.c +++ b/lib/handle_connack.c @@ -98,7 +98,7 @@ int handle__connack(struct mosquitto *mosq) } mosquitto_property_read_byte(properties, MQTT_PROP_RETAIN_AVAILABLE, &mosq->retain_available, false); - mosquitto_property_read_byte(properties, MQTT_PROP_MAXIMUM_QOS, &mosq->maximum_qos, false); + mosquitto_property_read_byte(properties, MQTT_PROP_MAXIMUM_QOS, &mosq->max_qos, false); mosquitto_property_read_int16(properties, MQTT_PROP_RECEIVE_MAXIMUM, &mosq->msgs_out.inflight_maximum, false); mosquitto_property_read_int16(properties, MQTT_PROP_SERVER_KEEP_ALIVE, &mosq->keepalive, false); mosquitto_property_read_int32(properties, MQTT_PROP_MAXIMUM_PACKET_SIZE, &mosq->maximum_packet_size, false); diff --git a/lib/mosquitto.c b/lib/mosquitto.c index fdaf862c..11149956 100644 --- a/lib/mosquitto.c +++ b/lib/mosquitto.c @@ -173,7 +173,7 @@ int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_st mosq->ping_t = 0; mosq->last_mid = 0; mosq->state = mosq_cs_new; - mosq->maximum_qos = 2; + mosq->max_qos = 2; mosq->msgs_in.inflight_maximum = 20; mosq->msgs_out.inflight_maximum = 20; mosq->msgs_in.inflight_quota = 20; diff --git a/lib/mosquitto_internal.h b/lib/mosquitto_internal.h index ace04a2e..18b7eb49 100644 --- a/lib/mosquitto_internal.h +++ b/lib/mosquitto_internal.h @@ -335,7 +335,7 @@ struct mosquitto { ares_channel achan; # endif #endif - uint8_t maximum_qos; + uint8_t max_qos; uint8_t retain_available; bool tcp_nodelay; diff --git a/man/mosquitto.conf.5.xml b/man/mosquitto.conf.5.xml index fa909a68..3af551c5 100644 --- a/man/mosquitto.conf.5.xml +++ b/man/mosquitto.conf.5.xml @@ -1107,7 +1107,7 @@ log_timestamp_format %Y-%m-%dT%H:%M:%S - count + value Limit the QoS value allowed for clients connecting to this listener. Defaults to 2, which means any QoS can be diff --git a/mosquitto.conf b/mosquitto.conf index 243964d0..216b8c7d 100644 --- a/mosquitto.conf +++ b/mosquitto.conf @@ -97,6 +97,10 @@ # be queued until the first limit is reached. #max_queued_bytes 0 +# Set the maximum QoS supported. Clients publishing at a QoS higher than +# specified here will be disconnected. +#max_qos 2 + # The maximum number of QoS 1 and 2 messages to hold in a queue per client # above those that are currently in-flight. Defaults to 1000. Set # to 0 for no maximum (not recommended). diff --git a/src/bridge.c b/src/bridge.c index 62ce2354..cd14a9f0 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -149,6 +149,7 @@ int bridge__connect_step1(struct mosquitto *context) size_t notification_topic_len; uint8_t notification_payload; int i; + uint8_t qos; if(!context || !context->bridge) return MOSQ_ERR_INVAL; @@ -176,9 +177,14 @@ int bridge__connect_step1(struct mosquitto *context) for(i=0; ibridge->topic_count; i++){ if(context->bridge->topics[i].direction == bd_out || context->bridge->topics[i].direction == bd_both){ log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, context->bridge->topics[i].local_topic); + if(context->bridge->topics[i].qos > context->max_qos){ + qos = context->max_qos; + }else{ + qos = context->bridge->topics[i].qos; + } if(sub__add(context, context->bridge->topics[i].local_topic, - context->bridge->topics[i].qos, + qos, 0, MQTT_SUB_OPT_NO_LOCAL | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED, &db.subs) > 0){ @@ -186,7 +192,7 @@ int bridge__connect_step1(struct mosquitto *context) } retain__queue(context, context->bridge->topics[i].local_topic, - context->bridge->topics[i].qos, 0); + qos, 0); } } @@ -194,14 +200,19 @@ int bridge__connect_step1(struct mosquitto *context) bridge__backoff_step(context); if(context->bridge->notifications){ + if(context->max_qos == 0){ + qos = 0; + }else{ + qos = 1; + } if(context->bridge->notification_topic){ if(!context->bridge->initial_notification_done){ notification_payload = '0'; - db__messages_easy_queue(context, context->bridge->notification_topic, 1, 1, ¬ification_payload, 1, 0, NULL); + db__messages_easy_queue(context, context->bridge->notification_topic, qos, 1, ¬ification_payload, 1, 0, NULL); context->bridge->initial_notification_done = true; } notification_payload = '0'; - rc = will__set(context, context->bridge->notification_topic, 1, ¬ification_payload, 1, true, NULL); + rc = will__set(context, context->bridge->notification_topic, 1, ¬ification_payload, qos, true, NULL); if(rc != MOSQ_ERR_SUCCESS){ return rc; } @@ -214,12 +225,12 @@ int bridge__connect_step1(struct mosquitto *context) if(!context->bridge->initial_notification_done){ notification_payload = '0'; - db__messages_easy_queue(context, notification_topic, 1, 1, ¬ification_payload, 1, 0, NULL); + db__messages_easy_queue(context, notification_topic, qos, 1, ¬ification_payload, 1, 0, NULL); context->bridge->initial_notification_done = true; } notification_payload = '0'; - rc = will__set(context, notification_topic, 1, ¬ification_payload, 1, true, NULL); + rc = will__set(context, notification_topic, 1, ¬ification_payload, qos, true, NULL); mosquitto__free(notification_topic); if(rc != MOSQ_ERR_SUCCESS){ return rc; @@ -326,6 +337,7 @@ int bridge__connect(struct mosquitto *context) char *notification_topic = NULL; size_t notification_topic_len; uint8_t notification_payload; + uint8_t qos; if(!context || !context->bridge) return MOSQ_ERR_INVAL; @@ -353,9 +365,14 @@ int bridge__connect(struct mosquitto *context) for(i=0; ibridge->topic_count; i++){ if(context->bridge->topics[i].direction == bd_out || context->bridge->topics[i].direction == bd_both){ log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, context->bridge->topics[i].local_topic); + if(context->bridge->topics[i].qos > context->max_qos){ + qos = context->max_qos; + }else{ + qos = context->bridge->topics[i].qos; + } if(sub__add(context, context->bridge->topics[i].local_topic, - context->bridge->topics[i].qos, + qos, 0, MQTT_SUB_OPT_NO_LOCAL | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED, &db.subs) > 0){ @@ -369,15 +386,20 @@ int bridge__connect(struct mosquitto *context) bridge__backoff_step(context); if(context->bridge->notifications){ + if(context->max_qos == 0){ + qos = 0; + }else{ + qos = 1; + } if(context->bridge->notification_topic){ if(!context->bridge->initial_notification_done){ notification_payload = '0'; - db__messages_easy_queue(context, context->bridge->notification_topic, 1, 1, ¬ification_payload, 1, 0, NULL); + db__messages_easy_queue(context, context->bridge->notification_topic, qos, 1, ¬ification_payload, 1, 0, NULL); context->bridge->initial_notification_done = true; } notification_payload = '0'; - rc = will__set(context, context->bridge->notification_topic, 1, ¬ification_payload, 1, true, NULL); + rc = will__set(context, context->bridge->notification_topic, 1, ¬ification_payload, qos, true, NULL); if(rc != MOSQ_ERR_SUCCESS){ return rc; } @@ -390,12 +412,12 @@ int bridge__connect(struct mosquitto *context) if(!context->bridge->initial_notification_done){ notification_payload = '0'; - db__messages_easy_queue(context, notification_topic, 1, 1, ¬ification_payload, 1, 0, NULL); + db__messages_easy_queue(context, notification_topic, qos, 1, ¬ification_payload, 1, 0, NULL); context->bridge->initial_notification_done = true; } notification_payload = '0'; - rc = will__set(context, notification_topic, 1, ¬ification_payload, 1, true, NULL); + rc = will__set(context, notification_topic, 1, ¬ification_payload, qos, true, NULL); if(rc != MOSQ_ERR_SUCCESS){ mosquitto__free(notification_topic); return rc; @@ -459,8 +481,14 @@ int bridge__on_connect(struct mosquitto *context) char notification_payload; int sub_opts; bool retain = true; + uint8_t qos; if(context->bridge->notifications){ + if(context->max_qos == 0){ + qos = 0; + }else{ + qos = 1; + } if(!context->retain_available){ retain = false; } @@ -468,12 +496,12 @@ int bridge__on_connect(struct mosquitto *context) if(context->bridge->notification_topic){ if(!context->bridge->notifications_local_only){ if(send__real_publish(context, mosquitto__mid_generate(context), - context->bridge->notification_topic, 1, ¬ification_payload, 1, retain, 0, NULL, NULL, 0)){ + context->bridge->notification_topic, 1, ¬ification_payload, qos, retain, 0, NULL, NULL, 0)){ return 1; } } - db__messages_easy_queue(context, context->bridge->notification_topic, 1, 1, ¬ification_payload, 1, 0, NULL); + db__messages_easy_queue(context, context->bridge->notification_topic, qos, 1, ¬ification_payload, 1, 0, NULL); }else{ notification_topic_len = strlen(context->bridge->remote_clientid)+strlen("$SYS/broker/connection//state"); notification_topic = mosquitto__malloc(sizeof(char)*(notification_topic_len+1)); @@ -483,19 +511,23 @@ int bridge__on_connect(struct mosquitto *context) notification_payload = '1'; if(!context->bridge->notifications_local_only){ if(send__real_publish(context, mosquitto__mid_generate(context), - notification_topic, 1, ¬ification_payload, 1, retain, 0, NULL, NULL, 0)){ + notification_topic, 1, ¬ification_payload, qos, retain, 0, NULL, NULL, 0)){ mosquitto__free(notification_topic); return 1; } } - db__messages_easy_queue(context, notification_topic, 1, 1, ¬ification_payload, 1, 0, NULL); + db__messages_easy_queue(context, notification_topic, qos, 1, ¬ification_payload, 1, 0, NULL); mosquitto__free(notification_topic); } } for(i=0; ibridge->topic_count; i++){ if(context->bridge->topics[i].direction == bd_in || context->bridge->topics[i].direction == bd_both){ - sub_opts = context->bridge->topics[i].qos; + if(context->bridge->topics[i].qos > context->max_qos){ + sub_opts = context->max_qos; + }else{ + sub_opts = context->bridge->topics[i].qos; + } if(context->bridge->protocol_version == mosq_p_mqtt5){ sub_opts = sub_opts | MQTT_SUB_OPT_NO_LOCAL @@ -518,9 +550,14 @@ int bridge__on_connect(struct mosquitto *context) } for(i=0; ibridge->topic_count; i++){ if(context->bridge->topics[i].direction == bd_out || context->bridge->topics[i].direction == bd_both){ + if(context->bridge->topics[i].qos > context->max_qos){ + qos = context->max_qos; + }else{ + qos = context->bridge->topics[i].qos; + } retain__queue(context, context->bridge->topics[i].local_topic, - context->bridge->topics[i].qos, 0); + qos, 0); } } diff --git a/src/conf.c b/src/conf.c index daef7417..f5b9046c 100644 --- a/src/conf.c +++ b/src/conf.c @@ -441,7 +441,7 @@ int config__parse_args(struct mosquitto__config *config, int argc, char *argv[]) || config->default_listener.host || config->default_listener.port || config->default_listener.max_connections != -1 - || config->default_listener.maximum_qos != 2 + || config->default_listener.max_qos != 2 || config->default_listener.mount_point || config->default_listener.protocol != mp_mqtt || config->default_listener.socket_domain @@ -482,7 +482,7 @@ int config__parse_args(struct mosquitto__config *config, int argc, char *argv[]) config->listeners[config->listener_count-1].sock_count = 0; config->listeners[config->listener_count-1].client_count = 0; config->listeners[config->listener_count-1].use_username_as_clientid = config->default_listener.use_username_as_clientid; - config->listeners[config->listener_count-1].maximum_qos = config->default_listener.maximum_qos; + config->listeners[config->listener_count-1].max_qos = config->default_listener.max_qos; config->listeners[config->listener_count-1].max_topic_alias = config->default_listener.max_topic_alias; #ifdef WITH_TLS config->listeners[config->listener_count-1].tls_version = config->default_listener.tls_version; @@ -1625,14 +1625,14 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct }else{ log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_connections value in configuration."); } - }else if(!strcmp(token, "maximum_qos")){ + }else if(!strcmp(token, "maximum_qos") || !strcmp(token, "max_qos")){ if(reload) continue; /* Listeners not valid for reloading. */ - if(conf__parse_int(&token, "maximum_qos", &tmp_int, saveptr)) return MOSQ_ERR_INVAL; + if(conf__parse_int(&token, token, &tmp_int, saveptr)) return MOSQ_ERR_INVAL; if(tmp_int < 0 || tmp_int > 2){ - log__printf(NULL, MOSQ_LOG_ERR, "Error: maximum_qos must be between 0 and 2 inclusive."); + log__printf(NULL, MOSQ_LOG_ERR, "Error: max_qos must be between 0 and 2 inclusive."); return MOSQ_ERR_INVAL; } - cur_listener->maximum_qos = (uint8_t)tmp_int; + cur_listener->max_qos = (uint8_t)tmp_int; }else if(!strcmp(token, "max_inflight_bytes")){ if(conf__parse_int(&token, "max_inflight_bytes", &tmp_int, saveptr)) return MOSQ_ERR_INVAL; if(tmp_int < 0) tmp_int = 0; diff --git a/src/context.c b/src/context.c index 999d50ec..715e59dc 100644 --- a/src/context.c +++ b/src/context.c @@ -83,7 +83,7 @@ struct mosquitto *context__init(mosq_sock_t sock) context->msgs_out.inflight_maximum = db.config->max_inflight_messages; context->msgs_in.inflight_quota = db.config->max_inflight_messages; context->msgs_out.inflight_quota = db.config->max_inflight_messages; - context->maximum_qos = 2; + context->max_qos = 2; #ifdef WITH_TLS context->ssl = NULL; #endif diff --git a/src/database.c b/src/database.c index 80158b38..2b8c5588 100644 --- a/src/database.c +++ b/src/database.c @@ -492,8 +492,8 @@ int db__message_insert(struct mosquitto *context, uint16_t mid, enum mosquitto_m msg->direction = dir; msg->state = state; msg->dup = false; - if(qos > context->maximum_qos){ - msg->qos = context->maximum_qos; + if(qos > context->max_qos){ + msg->qos = context->max_qos; }else{ msg->qos = qos; } diff --git a/src/handle_connack.c b/src/handle_connack.c index a1bb8967..14a4f2a3 100644 --- a/src/handle_connack.c +++ b/src/handle_connack.c @@ -35,6 +35,7 @@ int handle__connack(struct mosquitto *context) uint32_t maximum_packet_size; uint8_t retain_available; uint16_t server_keepalive; + uint8_t max_qos = 255; if(!context){ return MOSQ_ERR_INVAL; @@ -56,9 +57,14 @@ int handle__connack(struct mosquitto *context) context->bridge->protocol_version = mosq_p_mqtt311; return MOSQ_ERR_PROTOCOL; } + rc = property__read_all(CMD_CONNACK, &context->in_packet, &properties); if(rc) return rc; + /* maximum-qos */ + mosquitto_property_read_byte(properties, MQTT_PROP_MAXIMUM_QOS, + &max_qos, false); + /* maximum-packet-size */ if(mosquitto_property_read_int32(properties, MQTT_PROP_MAXIMUM_PACKET_SIZE, &maximum_packet_size, false)){ @@ -96,6 +102,9 @@ int handle__connack(struct mosquitto *context) if(rc) return rc; } #endif + if(max_qos != 255){ + context->max_qos = max_qos; + } mosquitto__set_state(context, mosq_cs_active); rc = db__message_write_queued_out(context); if(rc) return rc; @@ -108,6 +117,16 @@ int handle__connack(struct mosquitto *context) context->retain_available = 0; log__printf(NULL, MOSQ_LOG_ERR, "Connection Refused: retain not available (will retry)"); return MOSQ_ERR_CONN_LOST; + case MQTT_RC_QOS_NOT_SUPPORTED: + if(max_qos == 255){ + if(context->max_qos != 0){ + context->max_qos--; + } + }else{ + context->max_qos = max_qos; + } + log__printf(NULL, MOSQ_LOG_ERR, "Connection Refused: QoS not supported (will retry)"); + return MOSQ_ERR_CONN_LOST; default: log__printf(NULL, MOSQ_LOG_ERR, "Connection Refused: %s", mosquitto_reason_string(reason_code)); return MOSQ_ERR_CONN_LOST; diff --git a/src/handle_connect.c b/src/handle_connect.c index 509ba379..207fd5a1 100644 --- a/src/handle_connect.c +++ b/src/handle_connect.c @@ -225,7 +225,7 @@ int connect__on_authorised(struct mosquitto *context, void *auth_data_out, uint1 db.persistence_changes++; } #endif - context->maximum_qos = context->listener->maximum_qos; + context->max_qos = context->listener->max_qos; if(context->protocol == mosq_p_mqtt5){ if(context->listener->max_topic_alias > 0){ @@ -234,12 +234,6 @@ int connect__on_authorised(struct mosquitto *context, void *auth_data_out, uint1 goto error; } } - if(context->maximum_qos != 2){ - if(mosquitto_property_add_byte(&connack_props, MQTT_PROP_MAXIMUM_QOS, context->maximum_qos)){ - rc = MOSQ_ERR_NOMEM; - goto error; - } - } if(context->keepalive > db.config->max_keepalive){ context->keepalive = db.config->max_keepalive; if(mosquitto_property_add_int16(&connack_props, MQTT_PROP_SERVER_KEEP_ALIVE, context->keepalive)){ @@ -528,6 +522,14 @@ int handle__connect(struct mosquitto *context) } property__process_connect(context, &properties); + if(will && will_qos > context->listener->max_qos){ + if(protocol_version == mosq_p_mqtt5){ + send__connack(context, 0, MQTT_RC_QOS_NOT_SUPPORTED, NULL); + } + rc = MOSQ_ERR_NOT_SUPPORTED; + goto handle_connect_error; + } + if(mosquitto_property_read_string(properties, MQTT_PROP_AUTHENTICATION_METHOD, &context->auth_method, false)){ mosquitto_property_read_binary(properties, MQTT_PROP_AUTHENTICATION_DATA, &auth_data, &auth_data_len, false); } diff --git a/src/handle_publish.c b/src/handle_publish.c index d3dfd7a4..1fe3899f 100644 --- a/src/handle_publish.c +++ b/src/handle_publish.c @@ -68,7 +68,7 @@ int handle__publish(struct mosquitto *context) db__msg_store_free(msg); return MOSQ_ERR_MALFORMED_PACKET; } - if(msg->qos > context->maximum_qos){ + if(msg->qos > context->max_qos){ log__printf(NULL, MOSQ_LOG_INFO, "Too high QoS in PUBLISH from %s, disconnecting.", context->id); db__msg_store_free(msg); diff --git a/src/handle_subscribe.c b/src/handle_subscribe.c index 0a0313a3..b38558ae 100644 --- a/src/handle_subscribe.c +++ b/src/handle_subscribe.c @@ -142,6 +142,9 @@ int handle__subscribe(struct mosquitto *context) mosquitto__free(payload); return MOSQ_ERR_MALFORMED_PACKET; } + if(qos > context->max_qos){ + qos = context->max_qos; + } if(context->listener && context->listener->mount_point){ diff --git a/src/loop.c b/src/loop.c index 7aa7f33a..6826f2bc 100644 --- a/src/loop.c +++ b/src/loop.c @@ -314,6 +314,9 @@ void do_disconnect(struct mosquitto *context, int reason) case MOSQ_ERR_NOMEM: log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected due to out of memory.", id); break; + case MOSQ_ERR_NOT_SUPPORTED: + log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected due to QoS too high or retain not supported.", id); + break; case MOSQ_ERR_ADMINISTRATIVE_ACTION: log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s been disconnected by administrative action.", id); break; diff --git a/src/mosquitto.c b/src/mosquitto.c index f8fb7a8c..85bfc7c3 100644 --- a/src/mosquitto.c +++ b/src/mosquitto.c @@ -174,7 +174,7 @@ void listener__set_defaults(struct mosquitto__listener *listener) listener->security_options.allow_zero_length_clientid = true; listener->protocol = mp_mqtt; listener->max_connections = -1; - listener->maximum_qos = 2; + listener->max_qos = 2; listener->max_topic_alias = 10; } diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index 5dd69fdc..101f7c0b 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -199,7 +199,7 @@ struct mosquitto__listener { enum mosquitto_protocol protocol; int socket_domain; bool use_username_as_clientid; - uint8_t maximum_qos; + uint8_t max_qos; uint16_t max_topic_alias; #ifdef WITH_TLS char *cafile; @@ -273,6 +273,7 @@ struct mosquitto__config { uint32_t message_size_limit; uint16_t max_inflight_messages; uint16_t max_keepalive; + uint8_t max_qos; bool persistence; char *persistence_location; char *persistence_file; diff --git a/src/send_connack.c b/src/send_connack.c index c5982681..e5948484 100644 --- a/src/send_connack.c +++ b/src/send_connack.c @@ -65,6 +65,13 @@ int send__connack(struct mosquitto *context, uint8_t ack, uint8_t reason_code, c return rc; } } + if(context->listener->max_qos != 2){ + rc = mosquitto_property_add_byte(&connack_props, MQTT_PROP_MAXIMUM_QOS, context->listener->max_qos); + if(rc){ + mosquitto_property_free_all(&connack_props); + return rc; + } + } remaining_length += property__get_remaining_length(connack_props); } diff --git a/test/broker/13-malformed-publish-v5.py b/test/broker/13-malformed-publish-v5.py index d48b9cd4..ac24fc7f 100755 --- a/test/broker/13-malformed-publish-v5.py +++ b/test/broker/13-malformed-publish-v5.py @@ -22,9 +22,12 @@ def do_test(publish_packet, reason_code, error_string): keepalive = 10 connect_packet = mosq_test.gen_connect("test", proto_ver=5, keepalive=keepalive) - connack_props = mqtt5_props.gen_byte_prop(mqtt5_props.PROP_MAXIMUM_QOS, 1) + connack_props = mqtt5_props.gen_uint16_prop(mqtt5_props.PROP_TOPIC_ALIAS_MAXIMUM, 10) connack_props += mqtt5_props.gen_byte_prop(mqtt5_props.PROP_RETAIN_AVAILABLE, 0) - connack_packet = mosq_test.gen_connack(rc=0, proto_ver=5, properties=connack_props) + connack_props += mqtt5_props.gen_uint16_prop(mqtt5_props.PROP_RECEIVE_MAXIMUM, 20) + connack_props += mqtt5_props.gen_byte_prop(mqtt5_props.PROP_MAXIMUM_QOS, 1) + + connack_packet = mosq_test.gen_connack(rc=0, proto_ver=5, properties=connack_props, property_helper=False) mid = 0 disconnect_packet = mosq_test.gen_disconnect(proto_ver=5, reason_code=reason_code)