diff --git a/lib/mosquitto.h b/lib/mosquitto.h index f4770bc5..a11aa5dc 100644 --- a/lib/mosquitto.h +++ b/lib/mosquitto.h @@ -93,6 +93,7 @@ enum mosq_err_t { MOSQ_ERR_DUPLICATE_PROPERTY = 22, MOSQ_ERR_TLS_HANDSHAKE = 23, MOSQ_ERR_QOS_NOT_SUPPORTED = 24, + MOSQ_ERR_OVERSIZE_PACKET = 25, }; /* Error values */ diff --git a/lib/mosquitto_internal.h b/lib/mosquitto_internal.h index a72fc722..001b99aa 100644 --- a/lib/mosquitto_internal.h +++ b/lib/mosquitto_internal.h @@ -186,6 +186,7 @@ struct mosquitto { struct mosquitto__packet *out_packet; struct mosquitto_message_all *will; struct mosquitto__alias *aliases; + uint32_t maximum_packet_size; int alias_count; #ifdef WITH_TLS SSL *ssl; diff --git a/lib/packet_mosq.c b/lib/packet_mosq.c index 509cd7d0..2dac17dc 100644 --- a/lib/packet_mosq.c +++ b/lib/packet_mosq.c @@ -150,6 +150,21 @@ int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet) } +int packet__check_oversize(struct mosquitto *mosq, uint32_t remaining_length) +{ + int len; + + if(mosq->maximum_packet_size == 0) return MOSQ_ERR_SUCCESS; + + len = remaining_length + packet__varint_bytes(remaining_length); + if(len > mosq->maximum_packet_size){ + return MOSQ_ERR_OVERSIZE_PACKET; + }else{ + return MOSQ_ERR_SUCCESS; + } +} + + int packet__write(struct mosquitto *mosq) { ssize_t write_length; diff --git a/lib/packet_mosq.h b/lib/packet_mosq.h index 22728cd7..503fb523 100644 --- a/lib/packet_mosq.h +++ b/lib/packet_mosq.h @@ -27,6 +27,8 @@ int packet__alloc(struct mosquitto__packet *packet); void packet__cleanup(struct mosquitto__packet *packet); int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet); +int packet__check_oversize(struct mosquitto *mosq, uint32_t remaining_length); + int packet__read_byte(struct mosquitto__packet *packet, uint8_t *byte); int packet__read_bytes(struct mosquitto__packet *packet, void *bytes, uint32_t count); int packet__read_binary(struct mosquitto__packet *packet, uint8_t **data, int *length); diff --git a/lib/send_publish.c b/lib/send_publish.c index 41b5e9ce..e72572c6 100644 --- a/lib/send_publish.c +++ b/lib/send_publish.c @@ -167,6 +167,15 @@ int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, packetlen += proplen + varbytes; } } + if(packet__check_oversize(mosq, packetlen)){ +#ifdef WITH_BROKER + log__printf(NULL, MOSQ_LOG_NOTICE, "Dropping too large outgoing PUBLISH for %s (%d bytes)", mosq->id, packetlen); +#else + log__printf(NULL, MOSQ_LOG_NOTICE, "Dropping too large outgoing PUBLISH (%d bytes)", packetlen); +#endif + return MOSQ_ERR_OVERSIZE_PACKET; + } + packet = mosquitto__calloc(1, sizeof(struct mosquitto__packet)); if(!packet) return MOSQ_ERR_NOMEM; diff --git a/src/database.c b/src/database.c index 750c6b4b..591684da 100644 --- a/src/database.c +++ b/src/database.c @@ -946,7 +946,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) switch(tail->state){ case mosq_ms_publish_qos0: rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval); - if(!rc){ + if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_OVERSIZE_PACKET){ db__message_remove(db, context, &tail, last); }else{ return rc; @@ -955,10 +955,12 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) case mosq_ms_publish_qos1: rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval); - if(!rc){ + if(rc == MOSQ_ERR_SUCCESS){ tail->timestamp = mosquitto_time(); tail->dup = 1; /* Any retry attempts are a duplicate. */ tail->state = mosq_ms_wait_for_puback; + }else if(rc == MOSQ_ERR_OVERSIZE_PACKET){ + db__message_remove(db, context, &tail, last); }else{ return rc; } @@ -968,10 +970,12 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) case mosq_ms_publish_qos2: rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval); - if(!rc){ + if(rc == MOSQ_ERR_SUCCESS){ tail->timestamp = mosquitto_time(); tail->dup = 1; /* Any retry attempts are a duplicate. */ tail->state = mosq_ms_wait_for_pubrec; + }else if(rc == MOSQ_ERR_OVERSIZE_PACKET){ + db__message_remove(db, context, &tail, last); }else{ return rc; } diff --git a/src/property_broker.c b/src/property_broker.c index ee76b2c4..654c802b 100644 --- a/src/property_broker.c +++ b/src/property_broker.c @@ -42,6 +42,11 @@ int property__process_connect(struct mosquitto *context, mosquitto_property *pro context->send_maximum = p->value.i16; context->send_quota = context->send_maximum; + }else if(p->identifier == MQTT_PROP_MAXIMUM_PACKET_SIZE){ + if(p->value.i32 == 0){ + return MOSQ_ERR_PROTOCOL; + } + context->maximum_packet_size = p->value.i32; } p = p->next; } diff --git a/src/send_connack.c b/src/send_connack.c index d51179b0..927d4d1e 100644 --- a/src/send_connack.c +++ b/src/send_connack.c @@ -29,6 +29,7 @@ int send__connack(struct mosquitto_db *db, struct mosquitto *context, int ack, i int rc; mosquitto_property *connack_props = NULL; int proplen, varbytes; + uint32_t remaining_length; rc = mosquitto_property_copy_all(&connack_props, properties); if(rc){ @@ -43,30 +44,40 @@ int send__connack(struct mosquitto_db *db, struct mosquitto *context, int ack, i } } - packet = mosquitto__calloc(1, sizeof(struct mosquitto__packet)); - if(!packet) return MOSQ_ERR_NOMEM; + remaining_length = 2; - packet->command = CMD_CONNACK; - packet->remaining_length = 2; if(context->protocol == mosq_p_mqtt5){ if(reason_code < 128 && db->config->retain_available == false){ rc = mosquitto_property_add_byte(&connack_props, MQTT_PROP_RETAIN_AVAILABLE, 0); if(rc){ - mosquitto__free(packet); + mosquitto_property_free_all(&connack_props); return rc; } } /* FIXME - disable support until available */ rc = mosquitto_property_add_byte(&connack_props, MQTT_PROP_SHARED_SUB_AVAILABLE, 0); if(rc){ - mosquitto__free(packet); + mosquitto_property_free_all(&connack_props); return rc; } proplen = property__get_length_all(connack_props); varbytes = packet__varint_bytes(proplen); - packet->remaining_length += proplen + varbytes; + remaining_length += proplen + varbytes; + } + + if(packet__check_oversize(context, remaining_length)){ + mosquitto_property_free_all(&connack_props); + mosquitto__free(packet); + return MOSQ_ERR_OVERSIZE_PACKET; } + + packet = mosquitto__calloc(1, sizeof(struct mosquitto__packet)); + if(!packet) return MOSQ_ERR_NOMEM; + + packet->command = CMD_CONNACK; + packet->remaining_length = remaining_length; + rc = packet__alloc(packet); if(rc){ mosquitto__free(packet); diff --git a/test/broker/12-prop-maximum-packet-size-connect.py b/test/broker/12-prop-maximum-packet-size-connect.py new file mode 100755 index 00000000..11936088 --- /dev/null +++ b/test/broker/12-prop-maximum-packet-size-connect.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python + +# Test whether setting maximum packet size to smaller than a CONNACK packet +# results in the CONNECT being rejected. +# MQTTv5 + +from mosq_test_helper import * + +rc = 1 + +keepalive = 10 +props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_MAXIMUM_PACKET_SIZE, 2) +connect_packet = mosq_test.gen_connect("test", proto_ver=5, keepalive=keepalive, properties=props) + +port = mosq_test.get_port() +broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port) + +try: + sock = mosq_test.do_client_connect(connect_packet, "", port=port) + # Exception occurs if connack packet returned + rc = 0 +finally: + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + if rc: + print(stde) + +exit(rc) + diff --git a/test/broker/12-prop-maximum-packet-size-publish.py b/test/broker/12-prop-maximum-packet-size-publish.py new file mode 100755 index 00000000..9db5ad39 --- /dev/null +++ b/test/broker/12-prop-maximum-packet-size-publish.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python + +# Test whether maximum packet size is honoured on a PUBLISH to a client +# MQTTv5 + +from mosq_test_helper import * + +rc = 1 + +keepalive = 10 +props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_MAXIMUM_PACKET_SIZE, 20) +connect_packet = mosq_test.gen_connect("test", proto_ver=5, keepalive=keepalive, properties=props) +connack_packet = mosq_test.gen_connack(rc=0, proto_ver=5) + +mid = 1 +subscribe_packet = mosq_test.gen_subscribe(mid, "test/topic", 0, proto_ver=5) +suback_packet = mosq_test.gen_suback(mid, 0, proto_ver=5) + +publish1_packet = mosq_test.gen_publish(topic="test/topic", qos=0, payload="12345678901234567890", proto_ver=5) +publish2_packet = mosq_test.gen_publish(topic="test/topic", qos=0, payload="67890", proto_ver=5) + +pingreq_packet = mosq_test.gen_pingreq() +pingresp_packet = mosq_test.gen_pingresp() + +port = mosq_test.get_port() +broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port) + +try: + sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port) + mosq_test.do_send_receive(sock, subscribe_packet, suback_packet) + sock.send(publish1_packet) + # We shouldn't receive the publish here because it is > MAXIMUM_PACKET_SIZE + mosq_test.do_send_receive(sock, pingreq_packet, pingresp_packet) + mosq_test.do_send_receive(sock, publish2_packet, publish2_packet) + mosq_test.do_send_receive(sock, pingreq_packet, pingresp_packet) + rc = 0 +finally: + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + if rc: + print(stde) + +exit(rc) + diff --git a/test/broker/Makefile b/test/broker/Makefile index 3b5b6fe1..95c0458f 100644 --- a/test/broker/Makefile +++ b/test/broker/Makefile @@ -175,3 +175,5 @@ endif ./12-prop-server-keepalive.py ./12-prop-response-topic.py ./12-prop-response-topic-correlation-data.py + ./12-prop-maximum-packet-size-connect.py + ./12-prop-maximum-packet-size-publish.py diff --git a/test/broker/test.py b/test/broker/test.py index 526f5860..e35627d6 100755 --- a/test/broker/test.py +++ b/test/broker/test.py @@ -142,6 +142,8 @@ tests = [ (1, './12-prop-server-keepalive.py'), (1, './12-prop-response-topic.py'), (1, './12-prop-response-topic-correlation-data.py'), + (1, './12-prop-maximum-packet-size-connect.py'), + (1, './12-prop-maximum-packet-size-publish.py'), ] ptest.run_tests(tests)