Add client support for outgoing maximum packet size.

pull/1203/head
Roger A. Light 7 years ago
parent 1877f8a326
commit b9b8e0ff2a

@ -24,6 +24,7 @@ Contributors:
#include "messages_mosq.h"
#include "mqtt_protocol.h"
#include "net_mosq.h"
#include "packet_mosq.h"
#include "send_mosq.h"
#include "util_mosq.h"
@ -43,6 +44,8 @@ int mosquitto_publish_v5(struct mosquitto *mosq, int *mid, const char *topic, in
mosquitto_property local_property;
bool have_topic_alias;
int rc;
int tlen = 0;
uint32_t remaining_length;
if(!mosq || qos<0 || qos>2) return MOSQ_ERR_INVAL;
if(mosq->protocol != mosq_p_mqtt5 && properties) return MOSQ_ERR_NOT_SUPPORTED;
@ -81,13 +84,24 @@ int mosquitto_publish_v5(struct mosquitto *mosq, int *mid, const char *topic, in
return MOSQ_ERR_INVAL;
}
}else{
if(mosquitto_validate_utf8(topic, strlen(topic))) return MOSQ_ERR_MALFORMED_UTF8;
tlen = strlen(topic);
if(mosquitto_validate_utf8(topic, tlen)) return MOSQ_ERR_MALFORMED_UTF8;
if(payloadlen < 0 || payloadlen > MQTT_MAX_PAYLOAD) return MOSQ_ERR_PAYLOAD_SIZE;
if(mosquitto_pub_topic_check(topic) != MOSQ_ERR_SUCCESS){
return MOSQ_ERR_INVAL;
}
}
if(mosq->maximum_packet_size > 0){
remaining_length = 1 + 2+tlen + payloadlen + property__get_length_all(outgoing_properties);
if(qos > 0){
remaining_length++;
}
if(packet__check_oversize(mosq, remaining_length)){
return MOSQ_ERR_OVERSIZE_PACKET;
}
}
local_mid = mosquitto__mid_generate(mosq);
if(mid){
*mid = local_mid;
@ -161,6 +175,8 @@ int mosquitto_subscribe_multiple(struct mosquitto *mosq, int *mid, int sub_count
mosquitto_property local_property;
int i;
int rc;
uint32_t remaining_length = 0;
int slen;
if(!mosq || !sub_count || !sub) return MOSQ_ERR_INVAL;
if(mosq->protocol != mosq_p_mqtt5 && properties) return MOSQ_ERR_NOT_SUPPORTED;
@ -183,7 +199,16 @@ int mosquitto_subscribe_multiple(struct mosquitto *mosq, int *mid, int sub_count
for(i=0; i<sub_count; i++){
if(mosquitto_sub_topic_check(sub[i])) return MOSQ_ERR_INVAL;
if(mosquitto_validate_utf8(sub[i], strlen(sub[i]))) return MOSQ_ERR_MALFORMED_UTF8;
slen = strlen(sub[i]);
if(mosquitto_validate_utf8(sub[i], slen)) return MOSQ_ERR_MALFORMED_UTF8;
remaining_length += 2+slen + 1;
}
if(mosq->maximum_packet_size > 0){
remaining_length += 2 + property__get_length_all(outgoing_properties);
if(packet__check_oversize(mosq, remaining_length)){
return MOSQ_ERR_OVERSIZE_PACKET;
}
}
return send__subscribe(mosq, mid, sub_count, sub, qos|options, outgoing_properties);
@ -206,6 +231,8 @@ int mosquitto_unsubscribe_multiple(struct mosquitto *mosq, int *mid, int sub_cou
mosquitto_property local_property;
int rc;
int i;
uint32_t remaining_length = 0;
int slen;
if(!mosq) return MOSQ_ERR_INVAL;
if(mosq->protocol != mosq_p_mqtt5 && properties) return MOSQ_ERR_NOT_SUPPORTED;
@ -226,7 +253,16 @@ int mosquitto_unsubscribe_multiple(struct mosquitto *mosq, int *mid, int sub_cou
for(i=0; i<sub_count; i++){
if(mosquitto_sub_topic_check(sub[i])) return MOSQ_ERR_INVAL;
if(mosquitto_validate_utf8(sub[i], strlen(sub[i]))) return MOSQ_ERR_MALFORMED_UTF8;
slen = strlen(sub[i]);
if(mosquitto_validate_utf8(sub[i], slen)) return MOSQ_ERR_MALFORMED_UTF8;
remaining_length += 2+slen;
}
if(mosq->maximum_packet_size > 0){
remaining_length += 2 + property__get_length_all(outgoing_properties);
if(packet__check_oversize(mosq, remaining_length)){
return MOSQ_ERR_OVERSIZE_PACKET;
}
}
return send__unsubscribe(mosq, mid, sub_count, sub, outgoing_properties);

@ -64,6 +64,7 @@ int handle__connack(struct mosquitto *mosq)
mosquitto_property_read_byte(properties, MQTT_PROP_MAXIMUM_QOS, &mosq->maximum_qos, false);
mosquitto_property_read_int16(properties, MQTT_PROP_RECEIVE_MAXIMUM, &mosq->send_maximum, false);
mosquitto_property_read_int16(properties, MQTT_PROP_SERVER_KEEP_ALIVE, &mosq->keepalive, false);
mosquitto_property_read_int32(properties, MQTT_PROP_MAXIMUM_PACKET_SIZE, &mosq->maximum_packet_size, false);
mosq->send_quota = mosq->send_maximum;

@ -743,6 +743,8 @@ libmosq_EXPORT int mosquitto_disconnect_v5(struct mosquitto *mosq, int reason_co
* MOSQ_ERR_MALFORMED_UTF8 - if the topic is not valid UTF-8
* MOSQ_ERR_QOS_NOT_SUPPORTED - if the QoS is greater than that supported by
* the broker.
* MOSQ_ERR_OVERSIZE_PACKET - if the resulting packet would be larger than
* supported by the broker.
*
* See Also:
* <mosquitto_max_inflight_messages_set>
@ -793,6 +795,8 @@ libmosq_EXPORT int mosquitto_publish(struct mosquitto *mosq, int *mid, const cha
* MOSQ_ERR_PROTOCOL - if any property is invalid for use with PUBLISH.
* MOSQ_ERR_QOS_NOT_SUPPORTED - if the QoS is greater than that supported by
* the broker.
* MOSQ_ERR_OVERSIZE_PACKET - if the resulting packet would be larger than
* supported by the broker.
*/
libmosq_EXPORT int mosquitto_publish_v5(
struct mosquitto *mosq,
@ -825,6 +829,8 @@ libmosq_EXPORT int mosquitto_publish_v5(
* MOSQ_ERR_NOMEM - if an out of memory condition occurred.
* MOSQ_ERR_NO_CONN - if the client isn't connected to a broker.
* MOSQ_ERR_MALFORMED_UTF8 - if the topic is not valid UTF-8
* MOSQ_ERR_OVERSIZE_PACKET - if the resulting packet would be larger than
* supported by the broker.
*/
libmosq_EXPORT int mosquitto_subscribe(struct mosquitto *mosq, int *mid, const char *sub, int qos);
@ -882,6 +888,8 @@ libmosq_EXPORT int mosquitto_subscribe(struct mosquitto *mosq, int *mid, const c
* MOSQ_ERR_MALFORMED_UTF8 - if the topic is not valid UTF-8
* MOSQ_ERR_DUPLICATE_PROPERTY - if a property is duplicated where it is forbidden.
* MOSQ_ERR_PROTOCOL - if any property is invalid for use with SUBSCRIBE.
* MOSQ_ERR_OVERSIZE_PACKET - if the resulting packet would be larger than
* supported by the broker.
*/
libmosq_EXPORT int mosquitto_subscribe_v5(struct mosquitto *mosq, int *mid, const char *sub, int qos, int options, const mosquitto_property *properties);
@ -936,6 +944,8 @@ libmosq_EXPORT int mosquitto_subscribe_v5(struct mosquitto *mosq, int *mid, cons
* MOSQ_ERR_NOMEM - if an out of memory condition occurred.
* MOSQ_ERR_NO_CONN - if the client isn't connected to a broker.
* MOSQ_ERR_MALFORMED_UTF8 - if a topic is not valid UTF-8
* MOSQ_ERR_OVERSIZE_PACKET - if the resulting packet would be larger than
* supported by the broker.
*/
int mosquitto_subscribe_multiple(struct mosquitto *mosq, int *mid, int sub_count, char *const *const sub, int qos, int options, const mosquitto_property *properties);
@ -958,6 +968,8 @@ int mosquitto_subscribe_multiple(struct mosquitto *mosq, int *mid, int sub_count
* MOSQ_ERR_NOMEM - if an out of memory condition occurred.
* MOSQ_ERR_NO_CONN - if the client isn't connected to a broker.
* MOSQ_ERR_MALFORMED_UTF8 - if the topic is not valid UTF-8
* MOSQ_ERR_OVERSIZE_PACKET - if the resulting packet would be larger than
* supported by the broker.
*/
libmosq_EXPORT int mosquitto_unsubscribe(struct mosquitto *mosq, int *mid, const char *sub);
@ -984,6 +996,8 @@ libmosq_EXPORT int mosquitto_unsubscribe(struct mosquitto *mosq, int *mid, const
* MOSQ_ERR_MALFORMED_UTF8 - if the topic is not valid UTF-8
* MOSQ_ERR_DUPLICATE_PROPERTY - if a property is duplicated where it is forbidden.
* MOSQ_ERR_PROTOCOL - if any property is invalid for use with UNSUBSCRIBE.
* MOSQ_ERR_OVERSIZE_PACKET - if the resulting packet would be larger than
* supported by the broker.
*/
libmosq_EXPORT int mosquitto_unsubscribe_v5(struct mosquitto *mosq, int *mid, const char *sub, const mosquitto_property *properties);
@ -1013,6 +1027,8 @@ libmosq_EXPORT int mosquitto_unsubscribe_v5(struct mosquitto *mosq, int *mid, co
* MOSQ_ERR_NOMEM - if an out of memory condition occurred.
* MOSQ_ERR_NO_CONN - if the client isn't connected to a broker.
* MOSQ_ERR_MALFORMED_UTF8 - if a topic is not valid UTF-8
* MOSQ_ERR_OVERSIZE_PACKET - if the resulting packet would be larger than
* supported by the broker.
*/
int mosquitto_unsubscribe_multiple(struct mosquitto *mosq, int *mid, int sub_count, char *const *const sub, const mosquitto_property *properties);

@ -0,0 +1,59 @@
#!/usr/bin/env python
# Test whether a client publishing an oversize packet correctly.
# The client should try to publish a message that is too big, then the one below which is ok.
# It should also try to subscribe with a too large topic
from mosq_test_helper import *
port = mosq_test.get_lib_port()
rc = 1
keepalive = 60
connect_packet = mosq_test.gen_connect("publish-qos0-test", keepalive=keepalive, proto_ver=5)
props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_MAXIMUM_PACKET_SIZE, 30)
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=5, properties=props)
bad_publish_packet = mosq_test.gen_publish("pub/test", qos=0, payload="0123456789012345678", proto_ver=5)
publish_packet = mosq_test.gen_publish("pub/test", qos=0, payload="012345678901234567", proto_ver=5)
disconnect_packet = mosq_test.gen_disconnect()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(10)
sock.bind(('', port))
sock.listen(5)
client_args = sys.argv[1:]
env = dict(os.environ)
env['LD_LIBRARY_PATH'] = '../../lib:../../lib/cpp'
try:
pp = env['PYTHONPATH']
except KeyError:
pp = ''
env['PYTHONPATH'] = '../../lib/python:'+pp
client = mosq_test.start_client(filename=sys.argv[1].replace('/', '-'), cmd=client_args, env=env, port=port)
try:
(conn, address) = sock.accept()
conn.settimeout(10)
if mosq_test.expect_packet(conn, "connect", connect_packet):
conn.send(connack_packet)
if mosq_test.expect_packet(conn, "publish", publish_packet):
if mosq_test.expect_packet(conn, "disconnect", disconnect_packet):
rc = 0
conn.close()
finally:
client.terminate()
client.wait()
if rc:
(stdo, stde) = client.communicate()
print(stde)
sock.close()
exit(rc)

@ -61,6 +61,7 @@ ifeq ($(WITH_TLS),yes)
#./08-ssl-fake-cacert.py $@/08-ssl-fake-cacert.test
endif
./09-util-topic-tokenise.py $@/09-util-topic-tokenise.test
./11-prop-oversize-packet.py $@/11-prop-oversize-packet.test
./11-prop-send-payload-format.py $@/11-prop-send-payload-format.test
./11-prop-send-content-type.py $@/11-prop-send-content-type.test

@ -0,0 +1,72 @@
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mosquitto.h>
static int run = -1;
static int sent_mid = -1;
void on_connect(struct mosquitto *mosq, void *obj, int rc)
{
if(rc){
exit(1);
}else{
rc = mosquitto_subscribe(mosq, NULL, "0123456789012345678901234567890", 0);
if(rc != MOSQ_ERR_OVERSIZE_PACKET){
printf("Fail on subscribe\n");
exit(1);
}
rc = mosquitto_unsubscribe(mosq, NULL, "0123456789012345678901234567890");
if(rc != MOSQ_ERR_OVERSIZE_PACKET){
printf("Fail on unsubscribe\n");
exit(1);
}
rc = mosquitto_publish(mosq, &sent_mid, "pub/test", strlen("0123456789012345678"), "0123456789012345678", 0, false);
if(rc != MOSQ_ERR_OVERSIZE_PACKET){
printf("Fail on publish 1\n");
exit(1);
}
rc = mosquitto_publish(mosq, &sent_mid, "pub/test", strlen("012345678901234567"), "012345678901234567", 0, false);
if(rc != MOSQ_ERR_SUCCESS){
printf("Fail on publish 2\n");
exit(1);
}
}
}
void on_publish(struct mosquitto *mosq, void *obj, int mid)
{
if(mid == sent_mid){
mosquitto_disconnect(mosq);
run = 0;
}else{
exit(1);
}
}
int main(int argc, char *argv[])
{
int rc;
struct mosquitto *mosq;
int port = atoi(argv[1]);
mosquitto_lib_init();
mosq = mosquitto_new("publish-qos0-test", true, NULL);
mosquitto_int_option(mosq, MOSQ_OPT_PROTOCOL_VERSION, MQTT_PROTOCOL_V5);
mosquitto_connect_callback_set(mosq, on_connect);
mosquitto_publish_callback_set(mosq, on_publish);
rc = mosquitto_connect(mosq, "localhost", port, 60);
while(run == -1){
rc = mosquitto_loop(mosq, -1, 1);
}
mosquitto_lib_cleanup();
return run;
}

@ -43,6 +43,7 @@ SRC = \
08-ssl-bad-cacert.c \
08-ssl-fake-cacert.c \
09-util-topic-tokenise.c \
11-prop-oversize-packet.c \
11-prop-send-payload-format.c \
11-prop-send-content-type.c

@ -41,6 +41,7 @@ tests = [
(1, ['./08-ssl-connect-cert-auth.py', 'c/08-ssl-connect-cert-auth.test']),
(1, ['./08-ssl-connect-no-auth.py', 'c/08-ssl-connect-no-auth.test']),
(1, ['./09-util-topic-tokenise.py', 'c/09-util-topic-tokenise.test']),
(1, ['./11-prop-oversize-packet.py', 'c/11-prop-oversize-packet.test']),
(1, ['./11-prop-send-payload-format.py', 'c/11-prop-send-payload-format.test']),
(1, ['./11-prop-send-content-type.py', 'c/11-prop-send-content-type.test']),

@ -0,0 +1,43 @@
#include <CUnit/CUnit.h>
#include <CUnit/Basic.h>
#include <mosquitto_internal.h>
#include <util_mosq.h>
static void TEST_maximum_packet_size(void)
{
struct mosquitto mosq;
int rc;
memset(&mosq, 0, sizeof(struct mosquitto));
mosq.maximum_packet_size = 5;
rc = mosquitto_publish(&mosq, NULL, "topic/oversize", strlen("payload"), "payload", 0, 0);
CU_ASSERT_EQUAL(rc, MOSQ_ERR_OVERSIZE_PACKET);
}
/* ========================================================================
* TEST SUITE SETUP
* ======================================================================== */
int init_publish_tests(void)
{
CU_pSuite test_suite = NULL;
test_suite = CU_add_suite("Publish", NULL, NULL);
if(!test_suite){
printf("Error adding CUnit Publish test suite.\n");
return 1;
}
if(0
|| !CU_add_test(test_suite, "v5: Maximum packet size", TEST_maximum_packet_size)
){
printf("Error adding Publish CUnit tests.\n");
return 1;
}
return 0;
}
Loading…
Cancel
Save