diff --git a/ChangeLog.txt b/ChangeLog.txt index 6a5a8537..fd643a8b 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -16,8 +16,8 @@ Broker: publish messages. - Add `mosquitto_client_protocol_version()` function which can be used by plugins to determine which version of MQTT a client has connected with. -- Send DISCONNECT with `malformed-packet` reason code on invalid SUBSCRIBE, - UNSUBSCRIBE packets. +- Send DISCONNECT with `malformed-packet` reason code on invalid PUBLISH, + SUBSCRIBE, and UNSUBSCRIBE packets. Client library: - Client no longer generates random client ids for v3.1.1 clients, these are diff --git a/lib/mosquitto.h b/lib/mosquitto.h index 44d7c83f..9f9f996f 100644 --- a/lib/mosquitto.h +++ b/lib/mosquitto.h @@ -99,6 +99,8 @@ enum mosq_err_t { MOSQ_ERR_OVERSIZE_PACKET = 25, MOSQ_ERR_OCSP = 26, MOSQ_ERR_TIMEOUT = 27, + MOSQ_ERR_RETAIN_NOT_SUPPORTED = 28, + MOSQ_ERR_TOPIC_ALIAS_INVALID = 29, }; /* Option values */ diff --git a/src/handle_connect.c b/src/handle_connect.c index 1d721996..39ba66d7 100644 --- a/src/handle_connect.c +++ b/src/handle_connect.c @@ -228,14 +228,14 @@ int connect__on_authorised(struct mosquitto_db *db, struct mosquitto *context, v context->maximum_qos = context->listener->maximum_qos; if(context->protocol == mosq_p_mqtt5){ - if(context->maximum_qos != 2){ - if(mosquitto_property_add_byte(&connack_props, MQTT_PROP_MAXIMUM_QOS, context->maximum_qos)){ + if(context->listener->max_topic_alias > 0){ + if(mosquitto_property_add_int16(&connack_props, MQTT_PROP_TOPIC_ALIAS_MAXIMUM, context->listener->max_topic_alias)){ rc = MOSQ_ERR_NOMEM; goto error; } } - if(context->listener->max_topic_alias > 0){ - if(mosquitto_property_add_int16(&connack_props, MQTT_PROP_TOPIC_ALIAS_MAXIMUM, context->listener->max_topic_alias)){ + if(context->maximum_qos != 2){ + if(mosquitto_property_add_byte(&connack_props, MQTT_PROP_MAXIMUM_QOS, context->maximum_qos)){ rc = MOSQ_ERR_NOMEM; goto error; } diff --git a/src/handle_publish.c b/src/handle_publish.c index bd45aab9..ec7b3ead 100644 --- a/src/handle_publish.c +++ b/src/handle_publish.c @@ -66,38 +66,35 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context) log__printf(NULL, MOSQ_LOG_INFO, "Invalid QoS in PUBLISH from %s, disconnecting.", context->id); db__msg_store_free(msg); - return 1; + return MOSQ_ERR_MALFORMED_PACKET; } if(msg->qos > context->maximum_qos){ log__printf(NULL, MOSQ_LOG_INFO, "Too high QoS in PUBLISH from %s, disconnecting.", context->id); db__msg_store_free(msg); - return 1; + return MOSQ_ERR_QOS_NOT_SUPPORTED; } msg->retain = (header & 0x01); if(msg->retain && db->config->retain_available == false){ - if(context->protocol == mosq_p_mqtt5){ - send__disconnect(context, MQTT_RC_RETAIN_NOT_SUPPORTED, NULL); - } db__msg_store_free(msg); - return 1; + return MOSQ_ERR_RETAIN_NOT_SUPPORTED; } if(packet__read_string(&context->in_packet, &msg->topic, &slen)){ db__msg_store_free(msg); - return 1; + return MOSQ_ERR_MALFORMED_PACKET; } if(!slen && context->protocol != mosq_p_mqtt5){ /* Invalid publish topic, disconnect client. */ db__msg_store_free(msg); - return 1; + return MOSQ_ERR_MALFORMED_PACKET; } if(msg->qos > 0){ if(packet__read_uint16(&context->in_packet, &mid)){ db__msg_store_free(msg); - return 1; + return MOSQ_ERR_MALFORMED_PACKET; } if(mid == 0){ db__msg_store_free(msg); @@ -113,7 +110,11 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context) rc = property__read_all(CMD_PUBLISH, &context->in_packet, &properties); if(rc){ db__msg_store_free(msg); - return rc; + if(rc == MOSQ_ERR_PROTOCOL){ + return MOSQ_ERR_MALFORMED_PACKET; + }else{ + return rc; + } } p = properties; @@ -171,8 +172,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context) if(topic_alias == 0 || (context->listener && topic_alias > context->listener->max_topic_alias)){ db__msg_store_free(msg); - send__disconnect(context, MQTT_RC_TOPIC_ALIAS_INVALID, NULL); - return MOSQ_ERR_PROTOCOL; + return MOSQ_ERR_TOPIC_ALIAS_INVALID; }else if(topic_alias > 0){ if(msg->topic){ rc = alias__add(context, msg->topic, topic_alias); @@ -183,9 +183,8 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context) }else{ rc = alias__find(context, &msg->topic, topic_alias); if(rc){ - send__disconnect(context, MQTT_RC_TOPIC_ALIAS_INVALID, NULL); db__msg_store_free(msg); - return rc; + return MOSQ_ERR_TOPIC_ALIAS_INVALID; } } } @@ -201,7 +200,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context) if(mosquitto_pub_topic_check(msg->topic) != MOSQ_ERR_SUCCESS){ /* Invalid publish topic, just swallow it. */ db__msg_store_free(msg); - return 1; + return MOSQ_ERR_PROTOCOL; } msg->payloadlen = context->in_packet.remaining_length - context->in_packet.pos; @@ -233,7 +232,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context) if(packet__read_bytes(&context->in_packet, UHPA_ACCESS(msg->payload, msg->payloadlen), msg->payloadlen)){ db__msg_store_free(msg); - return MOSQ_ERR_UNKNOWN; + return MOSQ_ERR_MALFORMED_PACKET; } } diff --git a/src/read_handle.c b/src/read_handle.c index c2aa05e9..e068dd9b 100644 --- a/src/read_handle.c +++ b/src/read_handle.c @@ -46,7 +46,8 @@ int handle__packet(struct mosquitto_db *db, struct mosquitto *context) case CMD_PUBCOMP: return handle__pubackcomp(db, context, "PUBCOMP"); case CMD_PUBLISH: - return handle__publish(db, context); + rc = handle__publish(db, context); + break; case CMD_PUBREC: return handle__pubrec(db, context); case CMD_PUBREL: @@ -80,6 +81,12 @@ int handle__packet(struct mosquitto_db *db, struct mosquitto *context) send__disconnect(context, MQTT_RC_PROTOCOL_ERROR, NULL); }else if(rc == MOSQ_ERR_MALFORMED_PACKET){ send__disconnect(context, MQTT_RC_MALFORMED_PACKET, NULL); + }else if(rc == MOSQ_ERR_QOS_NOT_SUPPORTED){ + send__disconnect(context, MQTT_RC_QOS_NOT_SUPPORTED, NULL); + }else if(rc == MOSQ_ERR_RETAIN_NOT_SUPPORTED){ + send__disconnect(context, MQTT_RC_RETAIN_NOT_SUPPORTED, NULL); + }else if(rc == MOSQ_ERR_TOPIC_ALIAS_INVALID){ + send__disconnect(context, MQTT_RC_TOPIC_ALIAS_INVALID, NULL); }else if(rc == MOSQ_ERR_UNKNOWN || rc == MOSQ_ERR_NOMEM){ send__disconnect(context, MQTT_RC_UNSPECIFIED, NULL); } diff --git a/test/broker/03-publish-invalid-utf8.py b/test/broker/03-publish-invalid-utf8.py index 333e9d38..e22b7012 100755 --- a/test/broker/03-publish-invalid-utf8.py +++ b/test/broker/03-publish-invalid-utf8.py @@ -25,7 +25,11 @@ def do_test(proto_ver): time.sleep(0.5) sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port) - mosq_test.do_send_receive(sock, publish_packet, b"", "puback") + if proto_ver == 4: + mosq_test.do_send_receive(sock, publish_packet, b"", "puback") + else: + disconnect_packet = mosq_test.gen_disconnect(proto_ver=5, reason_code=mqtt5_rc.MQTT_RC_MALFORMED_PACKET) + mosq_test.do_send_receive(sock, publish_packet, disconnect_packet, "puback") rc = 0 diff --git a/test/broker/03-publish-long-topic.py b/test/broker/03-publish-long-topic.py index 54fb919f..23d32628 100755 --- a/test/broker/03-publish-long-topic.py +++ b/test/broker/03-publish-long-topic.py @@ -22,7 +22,11 @@ def do_test(proto_ver): try: sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port) - mosq_test.do_send_receive(sock, publish_packet, b"", "puback") + if proto_ver == 4: + mosq_test.do_send_receive(sock, publish_packet, b"", "puback") + else: + disconnect_packet = mosq_test.gen_disconnect(proto_ver=5, reason_code=mqtt5_rc.MQTT_RC_PROTOCOL_ERROR) + mosq_test.do_send_receive(sock, publish_packet, disconnect_packet, "puback") rc = 0 diff --git a/test/broker/13-malformed-publish-v5.py b/test/broker/13-malformed-publish-v5.py new file mode 100755 index 00000000..e40975a6 --- /dev/null +++ b/test/broker/13-malformed-publish-v5.py @@ -0,0 +1,93 @@ +#!/usr/bin/env python3 + +# Test whether the broker handles malformed packets correctly - PUBLISH +# MQTTv5 + +from mosq_test_helper import * + +rc = 1 + +def write_config(filename, port): + with open(filename, 'w') as f: + f.write("listener %d\n" % (port)) + f.write("maximum_qos 1\n") + f.write("retain_available false\n") + +def do_test(publish_packet, reason_code, error_string): + global rc + + rc = 1 + + keepalive = 10 + connect_packet = mosq_test.gen_connect("test", proto_ver=5, keepalive=keepalive) + + connack_props = mqtt5_props.gen_byte_prop(mqtt5_props.PROP_MAXIMUM_QOS, 1) + connack_props += mqtt5_props.gen_byte_prop(mqtt5_props.PROP_RETAIN_AVAILABLE, 0) + connack_packet = mosq_test.gen_connack(rc=0, proto_ver=5, properties=connack_props) + + mid = 0 + disconnect_packet = mosq_test.gen_disconnect(proto_ver=5, reason_code=reason_code) + + sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port) + mosq_test.do_send_receive(sock, publish_packet, disconnect_packet, error_string=error_string) + rc = 0 + + +port = mosq_test.get_port() +conf_file = os.path.basename(__file__).replace('.py', '.conf') +write_config(conf_file, port) +broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port) + +try: + # mid == 0 + publish_packet = mosq_test.gen_publish(topic="test/topic", qos=1, mid=0, proto_ver=5) + do_test(publish_packet, mqtt5_rc.MQTT_RC_PROTOCOL_ERROR, "mid == 0") + + # qos > 2 + publish_packet = mosq_test.gen_publish(topic="test/topic", qos=3, mid=1, proto_ver=5) + do_test(publish_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "qos > 2") + + # qos > maximum qos + publish_packet = mosq_test.gen_publish(topic="test/topic", qos=2, mid=1, proto_ver=5) + do_test(publish_packet, mqtt5_rc.MQTT_RC_QOS_NOT_SUPPORTED, "qos > maximum qos") + + # retain not supported + publish_packet = mosq_test.gen_publish(topic="test/topic", qos=0, retain=True, proto_ver=5, payload="a") + do_test(publish_packet, mqtt5_rc.MQTT_RC_RETAIN_NOT_SUPPORTED, "retain not supported") + + # Incorrect property + props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, 0) + publish_packet = mosq_test.gen_publish(topic="test/topic", qos=1, mid=1, proto_ver=5, properties=props) + do_test(publish_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "Incorrect property") + + # Truncated packet, remaining length only + publish_packet = struct.pack("!BB", 48, 0) + do_test(publish_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "Truncated packet, remaining length only") + + # Truncated packet, empty topic + publish_packet = struct.pack("!BBH", 48, 2, 0) + do_test(publish_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "Truncated packet, empty topic") + + # Truncated packet, with topic, no properties + publish_packet = struct.pack("!BBH1s", 48, 3, 1, b"a") + do_test(publish_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "Truncated packet, with topic, no properties") + + # Truncated packet, with topic, no mid + publish_packet = struct.pack("!BBH1s", 48+2, 3, 1, b"a") + do_test(publish_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "Truncated packet, with topic, no mid") + + # Truncated packet, with topic, with mid, no properties + publish_packet = struct.pack("!BBH1sH", 48+2, 5, 1, b"a", 1) + do_test(publish_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "Truncated packet, with topic, with mid, no properties") + + # Bad topic + publish_packet = mosq_test.gen_publish(topic="#/test/topic", qos=1, mid=1, proto_ver=5) + do_test(publish_packet, mqtt5_rc.MQTT_RC_PROTOCOL_ERROR, "Bad topic") +finally: + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + os.remove(conf_file) + if rc: + print(stde.decode('utf-8')) + exit(rc) diff --git a/test/broker/Makefile b/test/broker/Makefile index a1d4709d..673c640a 100644 --- a/test/broker/Makefile +++ b/test/broker/Makefile @@ -218,5 +218,6 @@ endif ./12-prop-topic-alias-invalid.py 13 : + ./13-malformed-publish-v5.py ./13-malformed-subscribe-v5.py ./13-malformed-unsubscribe-v5.py diff --git a/test/broker/test.py b/test/broker/test.py index a4354aff..324b89ee 100755 --- a/test/broker/test.py +++ b/test/broker/test.py @@ -182,6 +182,7 @@ tests = [ (1, './12-prop-subpub-payload-format.py'), (1, './12-prop-topic-alias-invalid.py'), + (1, './13-malformed-publish-v5.py'), (1, './13-malformed-subscribe-v5.py'), (1, './13-malformed-unsubscribe-v5.py'), ]