From 4c60fad52b7ae0ce4601f13c60df57a757d946f2 Mon Sep 17 00:00:00 2001 From: Roger Light Date: Sun, 21 Mar 2021 22:08:46 +0000 Subject: [PATCH] Enforce receive maximum on MQTT v5. --- ChangeLog.txt | 1 + include/mosquitto.h | 1 + src/database.c | 2 + src/handle_publish.c | 6 +++ src/loop.c | 3 ++ src/read_handle.c | 2 + .../03-publish-qos2-max-inflight-exceeded.py | 50 +++++++++++++++++++ test/broker/Makefile | 1 + test/broker/test.py | 1 + 9 files changed, 67 insertions(+) create mode 100755 test/broker/03-publish-qos2-max-inflight-exceeded.py diff --git a/ChangeLog.txt b/ChangeLog.txt index 1892b422..d794ae9e 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -10,6 +10,7 @@ Broker: - Increase maximum connection count on Windows from 2048 to 8192 where supported. Closes #2122. - Add kqueue support. +- Enforce receive maximum on MQTT v5. Client library: - Add MOSQ_OPT_DISABLE_SOCKETPAIR to allow the disabling of the socketpair diff --git a/include/mosquitto.h b/include/mosquitto.h index cb75a1c6..0dcb28eb 100644 --- a/include/mosquitto.h +++ b/include/mosquitto.h @@ -121,6 +121,7 @@ enum mosq_err_t { MOSQ_ERR_TOPIC_ALIAS_INVALID = 29, MOSQ_ERR_ADMINISTRATIVE_ACTION = 30, MOSQ_ERR_ALREADY_EXISTS = 31, + MOSQ_ERR_RECEIVE_MAXIMUM_EXCEEDED = 32, }; /* Option values */ diff --git a/src/database.c b/src/database.c index 77eb4520..6947987a 100644 --- a/src/database.c +++ b/src/database.c @@ -550,6 +550,8 @@ int db__message_insert(struct mosquitto *context, uint16_t mid, enum mosquitto_m if(dir == mosq_md_out && msg->qos > 0){ util__decrement_send_quota(context); + }else if(dir == mosq_md_in && msg->qos > 0){ + util__decrement_receive_quota(context); } if(dir == mosq_md_out && update){ diff --git a/src/handle_publish.c b/src/handle_publish.c index 68359b69..b215b907 100644 --- a/src/handle_publish.c +++ b/src/handle_publish.c @@ -286,6 +286,12 @@ int handle__publish(struct mosquitto *context) db__message_store_find(context, msg->source_mid, &stored); } if(!stored){ + if(msg->qos > 0 && context->msgs_in.inflight_quota == 0){ + /* Client isn't allowed any more incoming messages, so fail early */ + db__msg_store_free(msg); + return MOSQ_ERR_RECEIVE_MAXIMUM_EXCEEDED; + } + if(msg->qos == 0 || db__ready_for_flight(&context->msgs_in, msg->qos) || db__ready_for_queue(context, msg->qos, &context->msgs_in)){ diff --git a/src/loop.c b/src/loop.c index e17bae69..daee31c2 100644 --- a/src/loop.c +++ b/src/loop.c @@ -362,6 +362,9 @@ void do_disconnect(struct mosquitto *context, int reason) case MOSQ_ERR_ERRNO: log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected: %s.", id, strerror(errno)); break; + case MOSQ_ERR_RECEIVE_MAXIMUM_EXCEEDED: + log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected due to exceeding the receive maximum.", id); + break; default: log__printf(NULL, MOSQ_LOG_NOTICE, "Bad socket read/write on client %s: %s", id, mosquitto_strerror(reason)); break; diff --git a/src/read_handle.c b/src/read_handle.c index 416c32ab..f7026b16 100644 --- a/src/read_handle.c +++ b/src/read_handle.c @@ -89,6 +89,8 @@ int handle__packet(struct mosquitto *context) send__disconnect(context, MQTT_RC_RETAIN_NOT_SUPPORTED, NULL); }else if(rc == MOSQ_ERR_TOPIC_ALIAS_INVALID){ send__disconnect(context, MQTT_RC_TOPIC_ALIAS_INVALID, NULL); + }else if(rc == MOSQ_ERR_RECEIVE_MAXIMUM_EXCEEDED){ + send__disconnect(context, MQTT_RC_RECEIVE_MAXIMUM_EXCEEDED, NULL); }else if(rc == MOSQ_ERR_UNKNOWN || rc == MOSQ_ERR_NOMEM){ send__disconnect(context, MQTT_RC_UNSPECIFIED, NULL); } diff --git a/test/broker/03-publish-qos2-max-inflight-exceeded.py b/test/broker/03-publish-qos2-max-inflight-exceeded.py new file mode 100755 index 00000000..c5dd92ae --- /dev/null +++ b/test/broker/03-publish-qos2-max-inflight-exceeded.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python3 + +# What does the broker do if an MQTT v5 client doesn't respect max_inflight_messages? + +from mosq_test_helper import * + +def do_test(proto_ver): + port = mosq_test.get_port() + + rc = 1 + keepalive = 60 + connect_packet = mosq_test.gen_connect("pub-qos2-inflight-exceeded", keepalive=keepalive, proto_ver=proto_ver) + connack_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver) + + broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port) + + try: + sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port, timeout=10) + + for i in range(1, 21): + publish_packet = mosq_test.gen_publish("pub/qos2/max/inflight/exceeded", qos=2, mid=i, payload="message", proto_ver=proto_ver) + pubrec_packet = mosq_test.gen_pubrec(mid=i, proto_ver=proto_ver) + mosq_test.do_send_receive(sock, publish_packet, pubrec_packet) + + i = 21 + publish_packet = mosq_test.gen_publish("pub/qos2/max/inflight/exceeded", qos=2, mid=i, payload="message", proto_ver=proto_ver) + if proto_ver == 5: + disconnect_packet = mosq_test.gen_disconnect(reason_code=mqtt5_rc.MQTT_RC_RECEIVE_MAXIMUM_EXCEEDED, proto_ver=proto_ver) + else: + disconnect_packet = b"" + mosq_test.do_send_receive(sock, publish_packet, disconnect_packet, "disconnect") + + rc = 0 + + sock.close() + except mosq_test.TestError: + pass + finally: + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + if rc: + print(stde.decode('utf-8')) + print("proto_ver=%d" % (proto_ver)) + exit(rc) + + +do_test(proto_ver=4) +do_test(proto_ver=5) +exit(0) diff --git a/test/broker/Makefile b/test/broker/Makefile index d74b311b..58f5026c 100644 --- a/test/broker/Makefile +++ b/test/broker/Makefile @@ -98,6 +98,7 @@ test : test-compile 01 02 03 04 05 06 07 08 09 10 11 12 13 14 ./03-publish-qos1-no-subscribers-v5.py ./03-publish-qos1-retain-disabled.py ./03-publish-qos1.py + ./03-publish-qos2-max-inflight-exceeded.py ./03-publish-qos2-max-inflight.py ./03-publish-qos2.py diff --git a/test/broker/test.py b/test/broker/test.py index 2cf8a2b9..73c03ca4 100755 --- a/test/broker/test.py +++ b/test/broker/test.py @@ -81,6 +81,7 @@ tests = [ (1, './03-publish-qos1-no-subscribers-v5.py'), (1, './03-publish-qos1-retain-disabled.py'), (1, './03-publish-qos1.py'), + (1, './03-publish-qos2-max-inflight-exceeded.py'), (1, './03-publish-qos2-max-inflight.py'), (1, './03-publish-qos2.py'),