Enforce receive maximum on MQTT v5.

pull/2157/head
Roger Light 5 years ago
parent 898da756ed
commit 4c60fad52b

@ -10,6 +10,7 @@ Broker:
- Increase maximum connection count on Windows from 2048 to 8192 where
supported. Closes #2122.
- Add kqueue support.
- Enforce receive maximum on MQTT v5.
Client library:
- Add MOSQ_OPT_DISABLE_SOCKETPAIR to allow the disabling of the socketpair

@ -121,6 +121,7 @@ enum mosq_err_t {
MOSQ_ERR_TOPIC_ALIAS_INVALID = 29,
MOSQ_ERR_ADMINISTRATIVE_ACTION = 30,
MOSQ_ERR_ALREADY_EXISTS = 31,
MOSQ_ERR_RECEIVE_MAXIMUM_EXCEEDED = 32,
};
/* Option values */

@ -550,6 +550,8 @@ int db__message_insert(struct mosquitto *context, uint16_t mid, enum mosquitto_m
if(dir == mosq_md_out && msg->qos > 0){
util__decrement_send_quota(context);
}else if(dir == mosq_md_in && msg->qos > 0){
util__decrement_receive_quota(context);
}
if(dir == mosq_md_out && update){

@ -286,6 +286,12 @@ int handle__publish(struct mosquitto *context)
db__message_store_find(context, msg->source_mid, &stored);
}
if(!stored){
if(msg->qos > 0 && context->msgs_in.inflight_quota == 0){
/* Client isn't allowed any more incoming messages, so fail early */
db__msg_store_free(msg);
return MOSQ_ERR_RECEIVE_MAXIMUM_EXCEEDED;
}
if(msg->qos == 0
|| db__ready_for_flight(&context->msgs_in, msg->qos)
|| db__ready_for_queue(context, msg->qos, &context->msgs_in)){

@ -362,6 +362,9 @@ void do_disconnect(struct mosquitto *context, int reason)
case MOSQ_ERR_ERRNO:
log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected: %s.", id, strerror(errno));
break;
case MOSQ_ERR_RECEIVE_MAXIMUM_EXCEEDED:
log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s disconnected due to exceeding the receive maximum.", id);
break;
default:
log__printf(NULL, MOSQ_LOG_NOTICE, "Bad socket read/write on client %s: %s", id, mosquitto_strerror(reason));
break;

@ -89,6 +89,8 @@ int handle__packet(struct mosquitto *context)
send__disconnect(context, MQTT_RC_RETAIN_NOT_SUPPORTED, NULL);
}else if(rc == MOSQ_ERR_TOPIC_ALIAS_INVALID){
send__disconnect(context, MQTT_RC_TOPIC_ALIAS_INVALID, NULL);
}else if(rc == MOSQ_ERR_RECEIVE_MAXIMUM_EXCEEDED){
send__disconnect(context, MQTT_RC_RECEIVE_MAXIMUM_EXCEEDED, NULL);
}else if(rc == MOSQ_ERR_UNKNOWN || rc == MOSQ_ERR_NOMEM){
send__disconnect(context, MQTT_RC_UNSPECIFIED, NULL);
}

@ -0,0 +1,50 @@
#!/usr/bin/env python3
# What does the broker do if an MQTT v5 client doesn't respect max_inflight_messages?
from mosq_test_helper import *
def do_test(proto_ver):
port = mosq_test.get_port()
rc = 1
keepalive = 60
connect_packet = mosq_test.gen_connect("pub-qos2-inflight-exceeded", keepalive=keepalive, proto_ver=proto_ver)
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)
try:
sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port, timeout=10)
for i in range(1, 21):
publish_packet = mosq_test.gen_publish("pub/qos2/max/inflight/exceeded", qos=2, mid=i, payload="message", proto_ver=proto_ver)
pubrec_packet = mosq_test.gen_pubrec(mid=i, proto_ver=proto_ver)
mosq_test.do_send_receive(sock, publish_packet, pubrec_packet)
i = 21
publish_packet = mosq_test.gen_publish("pub/qos2/max/inflight/exceeded", qos=2, mid=i, payload="message", proto_ver=proto_ver)
if proto_ver == 5:
disconnect_packet = mosq_test.gen_disconnect(reason_code=mqtt5_rc.MQTT_RC_RECEIVE_MAXIMUM_EXCEEDED, proto_ver=proto_ver)
else:
disconnect_packet = b""
mosq_test.do_send_receive(sock, publish_packet, disconnect_packet, "disconnect")
rc = 0
sock.close()
except mosq_test.TestError:
pass
finally:
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
if rc:
print(stde.decode('utf-8'))
print("proto_ver=%d" % (proto_ver))
exit(rc)
do_test(proto_ver=4)
do_test(proto_ver=5)
exit(0)

@ -98,6 +98,7 @@ test : test-compile 01 02 03 04 05 06 07 08 09 10 11 12 13 14
./03-publish-qos1-no-subscribers-v5.py
./03-publish-qos1-retain-disabled.py
./03-publish-qos1.py
./03-publish-qos2-max-inflight-exceeded.py
./03-publish-qos2-max-inflight.py
./03-publish-qos2.py

@ -81,6 +81,7 @@ tests = [
(1, './03-publish-qos1-no-subscribers-v5.py'),
(1, './03-publish-qos1-retain-disabled.py'),
(1, './03-publish-qos1.py'),
(1, './03-publish-qos2-max-inflight-exceeded.py'),
(1, './03-publish-qos2-max-inflight.py'),
(1, './03-publish-qos2.py'),

Loading…
Cancel
Save