From b9b8e0ff2a8ae4240a82629a7bb9df8b2834d8dd Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Tue, 19 Feb 2019 14:57:31 +0000 Subject: [PATCH] Add client support for outgoing maximum packet size. --- lib/actions.c | 42 ++++++++++++++-- lib/handle_connack.c | 1 + lib/mosquitto.h | 16 +++++++ test/lib/11-prop-oversize-packet.py | 59 +++++++++++++++++++++++ test/lib/Makefile | 1 + test/lib/c/11-prop-oversize-packet.c | 72 ++++++++++++++++++++++++++++ test/lib/c/Makefile | 1 + test/lib/test.py | 1 + test/unit/publish_test.c | 43 +++++++++++++++++ 9 files changed, 233 insertions(+), 3 deletions(-) create mode 100755 test/lib/11-prop-oversize-packet.py create mode 100644 test/lib/c/11-prop-oversize-packet.c create mode 100644 test/unit/publish_test.c diff --git a/lib/actions.c b/lib/actions.c index 3779318e..cc3c4222 100644 --- a/lib/actions.c +++ b/lib/actions.c @@ -24,6 +24,7 @@ Contributors: #include "messages_mosq.h" #include "mqtt_protocol.h" #include "net_mosq.h" +#include "packet_mosq.h" #include "send_mosq.h" #include "util_mosq.h" @@ -43,6 +44,8 @@ int mosquitto_publish_v5(struct mosquitto *mosq, int *mid, const char *topic, in mosquitto_property local_property; bool have_topic_alias; int rc; + int tlen = 0; + uint32_t remaining_length; if(!mosq || qos<0 || qos>2) return MOSQ_ERR_INVAL; if(mosq->protocol != mosq_p_mqtt5 && properties) return MOSQ_ERR_NOT_SUPPORTED; @@ -81,13 +84,24 @@ int mosquitto_publish_v5(struct mosquitto *mosq, int *mid, const char *topic, in return MOSQ_ERR_INVAL; } }else{ - if(mosquitto_validate_utf8(topic, strlen(topic))) return MOSQ_ERR_MALFORMED_UTF8; + tlen = strlen(topic); + if(mosquitto_validate_utf8(topic, tlen)) return MOSQ_ERR_MALFORMED_UTF8; if(payloadlen < 0 || payloadlen > MQTT_MAX_PAYLOAD) return MOSQ_ERR_PAYLOAD_SIZE; if(mosquitto_pub_topic_check(topic) != MOSQ_ERR_SUCCESS){ return MOSQ_ERR_INVAL; } } + if(mosq->maximum_packet_size > 0){ + remaining_length = 1 + 2+tlen + payloadlen + property__get_length_all(outgoing_properties); + if(qos > 0){ + remaining_length++; + } + if(packet__check_oversize(mosq, remaining_length)){ + return MOSQ_ERR_OVERSIZE_PACKET; + } + } + local_mid = mosquitto__mid_generate(mosq); if(mid){ *mid = local_mid; @@ -161,6 +175,8 @@ int mosquitto_subscribe_multiple(struct mosquitto *mosq, int *mid, int sub_count mosquitto_property local_property; int i; int rc; + uint32_t remaining_length = 0; + int slen; if(!mosq || !sub_count || !sub) return MOSQ_ERR_INVAL; if(mosq->protocol != mosq_p_mqtt5 && properties) return MOSQ_ERR_NOT_SUPPORTED; @@ -183,7 +199,16 @@ int mosquitto_subscribe_multiple(struct mosquitto *mosq, int *mid, int sub_count for(i=0; imaximum_packet_size > 0){ + remaining_length += 2 + property__get_length_all(outgoing_properties); + if(packet__check_oversize(mosq, remaining_length)){ + return MOSQ_ERR_OVERSIZE_PACKET; + } } return send__subscribe(mosq, mid, sub_count, sub, qos|options, outgoing_properties); @@ -206,6 +231,8 @@ int mosquitto_unsubscribe_multiple(struct mosquitto *mosq, int *mid, int sub_cou mosquitto_property local_property; int rc; int i; + uint32_t remaining_length = 0; + int slen; if(!mosq) return MOSQ_ERR_INVAL; if(mosq->protocol != mosq_p_mqtt5 && properties) return MOSQ_ERR_NOT_SUPPORTED; @@ -226,7 +253,16 @@ int mosquitto_unsubscribe_multiple(struct mosquitto *mosq, int *mid, int sub_cou for(i=0; imaximum_packet_size > 0){ + remaining_length += 2 + property__get_length_all(outgoing_properties); + if(packet__check_oversize(mosq, remaining_length)){ + return MOSQ_ERR_OVERSIZE_PACKET; + } } return send__unsubscribe(mosq, mid, sub_count, sub, outgoing_properties); diff --git a/lib/handle_connack.c b/lib/handle_connack.c index 1b13e28a..1f8a9400 100644 --- a/lib/handle_connack.c +++ b/lib/handle_connack.c @@ -64,6 +64,7 @@ int handle__connack(struct mosquitto *mosq) mosquitto_property_read_byte(properties, MQTT_PROP_MAXIMUM_QOS, &mosq->maximum_qos, false); mosquitto_property_read_int16(properties, MQTT_PROP_RECEIVE_MAXIMUM, &mosq->send_maximum, false); mosquitto_property_read_int16(properties, MQTT_PROP_SERVER_KEEP_ALIVE, &mosq->keepalive, false); + mosquitto_property_read_int32(properties, MQTT_PROP_MAXIMUM_PACKET_SIZE, &mosq->maximum_packet_size, false); mosq->send_quota = mosq->send_maximum; diff --git a/lib/mosquitto.h b/lib/mosquitto.h index a11aa5dc..7a5e41d3 100644 --- a/lib/mosquitto.h +++ b/lib/mosquitto.h @@ -743,6 +743,8 @@ libmosq_EXPORT int mosquitto_disconnect_v5(struct mosquitto *mosq, int reason_co * MOSQ_ERR_MALFORMED_UTF8 - if the topic is not valid UTF-8 * MOSQ_ERR_QOS_NOT_SUPPORTED - if the QoS is greater than that supported by * the broker. + * MOSQ_ERR_OVERSIZE_PACKET - if the resulting packet would be larger than + * supported by the broker. * * See Also: * @@ -793,6 +795,8 @@ libmosq_EXPORT int mosquitto_publish(struct mosquitto *mosq, int *mid, const cha * MOSQ_ERR_PROTOCOL - if any property is invalid for use with PUBLISH. * MOSQ_ERR_QOS_NOT_SUPPORTED - if the QoS is greater than that supported by * the broker. + * MOSQ_ERR_OVERSIZE_PACKET - if the resulting packet would be larger than + * supported by the broker. */ libmosq_EXPORT int mosquitto_publish_v5( struct mosquitto *mosq, @@ -825,6 +829,8 @@ libmosq_EXPORT int mosquitto_publish_v5( * MOSQ_ERR_NOMEM - if an out of memory condition occurred. * MOSQ_ERR_NO_CONN - if the client isn't connected to a broker. * MOSQ_ERR_MALFORMED_UTF8 - if the topic is not valid UTF-8 + * MOSQ_ERR_OVERSIZE_PACKET - if the resulting packet would be larger than + * supported by the broker. */ libmosq_EXPORT int mosquitto_subscribe(struct mosquitto *mosq, int *mid, const char *sub, int qos); @@ -882,6 +888,8 @@ libmosq_EXPORT int mosquitto_subscribe(struct mosquitto *mosq, int *mid, const c * MOSQ_ERR_MALFORMED_UTF8 - if the topic is not valid UTF-8 * MOSQ_ERR_DUPLICATE_PROPERTY - if a property is duplicated where it is forbidden. * MOSQ_ERR_PROTOCOL - if any property is invalid for use with SUBSCRIBE. + * MOSQ_ERR_OVERSIZE_PACKET - if the resulting packet would be larger than + * supported by the broker. */ libmosq_EXPORT int mosquitto_subscribe_v5(struct mosquitto *mosq, int *mid, const char *sub, int qos, int options, const mosquitto_property *properties); @@ -936,6 +944,8 @@ libmosq_EXPORT int mosquitto_subscribe_v5(struct mosquitto *mosq, int *mid, cons * MOSQ_ERR_NOMEM - if an out of memory condition occurred. * MOSQ_ERR_NO_CONN - if the client isn't connected to a broker. * MOSQ_ERR_MALFORMED_UTF8 - if a topic is not valid UTF-8 + * MOSQ_ERR_OVERSIZE_PACKET - if the resulting packet would be larger than + * supported by the broker. */ int mosquitto_subscribe_multiple(struct mosquitto *mosq, int *mid, int sub_count, char *const *const sub, int qos, int options, const mosquitto_property *properties); @@ -958,6 +968,8 @@ int mosquitto_subscribe_multiple(struct mosquitto *mosq, int *mid, int sub_count * MOSQ_ERR_NOMEM - if an out of memory condition occurred. * MOSQ_ERR_NO_CONN - if the client isn't connected to a broker. * MOSQ_ERR_MALFORMED_UTF8 - if the topic is not valid UTF-8 + * MOSQ_ERR_OVERSIZE_PACKET - if the resulting packet would be larger than + * supported by the broker. */ libmosq_EXPORT int mosquitto_unsubscribe(struct mosquitto *mosq, int *mid, const char *sub); @@ -984,6 +996,8 @@ libmosq_EXPORT int mosquitto_unsubscribe(struct mosquitto *mosq, int *mid, const * MOSQ_ERR_MALFORMED_UTF8 - if the topic is not valid UTF-8 * MOSQ_ERR_DUPLICATE_PROPERTY - if a property is duplicated where it is forbidden. * MOSQ_ERR_PROTOCOL - if any property is invalid for use with UNSUBSCRIBE. + * MOSQ_ERR_OVERSIZE_PACKET - if the resulting packet would be larger than + * supported by the broker. */ libmosq_EXPORT int mosquitto_unsubscribe_v5(struct mosquitto *mosq, int *mid, const char *sub, const mosquitto_property *properties); @@ -1013,6 +1027,8 @@ libmosq_EXPORT int mosquitto_unsubscribe_v5(struct mosquitto *mosq, int *mid, co * MOSQ_ERR_NOMEM - if an out of memory condition occurred. * MOSQ_ERR_NO_CONN - if the client isn't connected to a broker. * MOSQ_ERR_MALFORMED_UTF8 - if a topic is not valid UTF-8 + * MOSQ_ERR_OVERSIZE_PACKET - if the resulting packet would be larger than + * supported by the broker. */ int mosquitto_unsubscribe_multiple(struct mosquitto *mosq, int *mid, int sub_count, char *const *const sub, const mosquitto_property *properties); diff --git a/test/lib/11-prop-oversize-packet.py b/test/lib/11-prop-oversize-packet.py new file mode 100755 index 00000000..946b7139 --- /dev/null +++ b/test/lib/11-prop-oversize-packet.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python + +# Test whether a client publishing an oversize packet correctly. +# The client should try to publish a message that is too big, then the one below which is ok. +# It should also try to subscribe with a too large topic + +from mosq_test_helper import * + +port = mosq_test.get_lib_port() + +rc = 1 +keepalive = 60 +connect_packet = mosq_test.gen_connect("publish-qos0-test", keepalive=keepalive, proto_ver=5) +props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_MAXIMUM_PACKET_SIZE, 30) +connack_packet = mosq_test.gen_connack(rc=0, proto_ver=5, properties=props) + +bad_publish_packet = mosq_test.gen_publish("pub/test", qos=0, payload="0123456789012345678", proto_ver=5) +publish_packet = mosq_test.gen_publish("pub/test", qos=0, payload="012345678901234567", proto_ver=5) + +disconnect_packet = mosq_test.gen_disconnect() + + +sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +sock.settimeout(10) +sock.bind(('', port)) +sock.listen(5) + +client_args = sys.argv[1:] +env = dict(os.environ) +env['LD_LIBRARY_PATH'] = '../../lib:../../lib/cpp' +try: + pp = env['PYTHONPATH'] +except KeyError: + pp = '' +env['PYTHONPATH'] = '../../lib/python:'+pp +client = mosq_test.start_client(filename=sys.argv[1].replace('/', '-'), cmd=client_args, env=env, port=port) + +try: + (conn, address) = sock.accept() + conn.settimeout(10) + + if mosq_test.expect_packet(conn, "connect", connect_packet): + conn.send(connack_packet) + + if mosq_test.expect_packet(conn, "publish", publish_packet): + if mosq_test.expect_packet(conn, "disconnect", disconnect_packet): + rc = 0 + + conn.close() +finally: + client.terminate() + client.wait() + if rc: + (stdo, stde) = client.communicate() + print(stde) + sock.close() + +exit(rc) diff --git a/test/lib/Makefile b/test/lib/Makefile index 9e55b52a..b9d9b96b 100644 --- a/test/lib/Makefile +++ b/test/lib/Makefile @@ -61,6 +61,7 @@ ifeq ($(WITH_TLS),yes) #./08-ssl-fake-cacert.py $@/08-ssl-fake-cacert.test endif ./09-util-topic-tokenise.py $@/09-util-topic-tokenise.test + ./11-prop-oversize-packet.py $@/11-prop-oversize-packet.test ./11-prop-send-payload-format.py $@/11-prop-send-payload-format.test ./11-prop-send-content-type.py $@/11-prop-send-content-type.test diff --git a/test/lib/c/11-prop-oversize-packet.c b/test/lib/c/11-prop-oversize-packet.c new file mode 100644 index 00000000..c96526f5 --- /dev/null +++ b/test/lib/c/11-prop-oversize-packet.c @@ -0,0 +1,72 @@ +#include +#include +#include +#include +#include + +static int run = -1; +static int sent_mid = -1; + +void on_connect(struct mosquitto *mosq, void *obj, int rc) +{ + if(rc){ + exit(1); + }else{ + rc = mosquitto_subscribe(mosq, NULL, "0123456789012345678901234567890", 0); + if(rc != MOSQ_ERR_OVERSIZE_PACKET){ + printf("Fail on subscribe\n"); + exit(1); + } + + rc = mosquitto_unsubscribe(mosq, NULL, "0123456789012345678901234567890"); + if(rc != MOSQ_ERR_OVERSIZE_PACKET){ + printf("Fail on unsubscribe\n"); + exit(1); + } + + rc = mosquitto_publish(mosq, &sent_mid, "pub/test", strlen("0123456789012345678"), "0123456789012345678", 0, false); + if(rc != MOSQ_ERR_OVERSIZE_PACKET){ + printf("Fail on publish 1\n"); + exit(1); + } + rc = mosquitto_publish(mosq, &sent_mid, "pub/test", strlen("012345678901234567"), "012345678901234567", 0, false); + if(rc != MOSQ_ERR_SUCCESS){ + printf("Fail on publish 2\n"); + exit(1); + } + } +} + +void on_publish(struct mosquitto *mosq, void *obj, int mid) +{ + if(mid == sent_mid){ + mosquitto_disconnect(mosq); + run = 0; + }else{ + exit(1); + } +} + +int main(int argc, char *argv[]) +{ + int rc; + struct mosquitto *mosq; + + int port = atoi(argv[1]); + + mosquitto_lib_init(); + + mosq = mosquitto_new("publish-qos0-test", true, NULL); + mosquitto_int_option(mosq, MOSQ_OPT_PROTOCOL_VERSION, MQTT_PROTOCOL_V5); + mosquitto_connect_callback_set(mosq, on_connect); + mosquitto_publish_callback_set(mosq, on_publish); + + rc = mosquitto_connect(mosq, "localhost", port, 60); + + while(run == -1){ + rc = mosquitto_loop(mosq, -1, 1); + } + + mosquitto_lib_cleanup(); + return run; +} diff --git a/test/lib/c/Makefile b/test/lib/c/Makefile index d89ab9ab..4eed7da0 100644 --- a/test/lib/c/Makefile +++ b/test/lib/c/Makefile @@ -43,6 +43,7 @@ SRC = \ 08-ssl-bad-cacert.c \ 08-ssl-fake-cacert.c \ 09-util-topic-tokenise.c \ + 11-prop-oversize-packet.c \ 11-prop-send-payload-format.c \ 11-prop-send-content-type.c diff --git a/test/lib/test.py b/test/lib/test.py index 68df8855..b6520ef5 100755 --- a/test/lib/test.py +++ b/test/lib/test.py @@ -41,6 +41,7 @@ tests = [ (1, ['./08-ssl-connect-cert-auth.py', 'c/08-ssl-connect-cert-auth.test']), (1, ['./08-ssl-connect-no-auth.py', 'c/08-ssl-connect-no-auth.test']), (1, ['./09-util-topic-tokenise.py', 'c/09-util-topic-tokenise.test']), + (1, ['./11-prop-oversize-packet.py', 'c/11-prop-oversize-packet.test']), (1, ['./11-prop-send-payload-format.py', 'c/11-prop-send-payload-format.test']), (1, ['./11-prop-send-content-type.py', 'c/11-prop-send-content-type.test']), diff --git a/test/unit/publish_test.c b/test/unit/publish_test.c new file mode 100644 index 00000000..6371db3c --- /dev/null +++ b/test/unit/publish_test.c @@ -0,0 +1,43 @@ +#include +#include + +#include +#include + + +static void TEST_maximum_packet_size(void) +{ + struct mosquitto mosq; + int rc; + + memset(&mosq, 0, sizeof(struct mosquitto)); + + mosq.maximum_packet_size = 5; + rc = mosquitto_publish(&mosq, NULL, "topic/oversize", strlen("payload"), "payload", 0, 0); + CU_ASSERT_EQUAL(rc, MOSQ_ERR_OVERSIZE_PACKET); +} + +/* ======================================================================== + * TEST SUITE SETUP + * ======================================================================== */ + +int init_publish_tests(void) +{ + CU_pSuite test_suite = NULL; + + test_suite = CU_add_suite("Publish", NULL, NULL); + if(!test_suite){ + printf("Error adding CUnit Publish test suite.\n"); + return 1; + } + + if(0 + || !CU_add_test(test_suite, "v5: Maximum packet size", TEST_maximum_packet_size) + ){ + + printf("Error adding Publish CUnit tests.\n"); + return 1; + } + + return 0; +}