From 9dd8d1e054249c6c8202dc8847317f565d1cdd51 Mon Sep 17 00:00:00 2001 From: Roger Light Date: Wed, 23 Jan 2019 11:44:19 +0000 Subject: [PATCH] Handle mismatched handshakes properly. For example, a QoS1 PUBLISH with QoS2 reply. --- ChangeLog.txt | 1 + lib/handle_pubackcomp.c | 9 ++- lib/handle_pubrec.c | 4 +- lib/handle_pubrel.c | 10 ++- lib/messages_mosq.c | 17 ++++-- lib/messages_mosq.h | 6 +- src/database.c | 19 ++++-- src/mosquitto_broker_internal.h | 4 +- test/broker/02-subpub-qos1-bad-pubcomp.py | 69 +++++++++++++++++++++ test/broker/02-subpub-qos1-bad-pubrec.py | 65 ++++++++++++++++++++ test/broker/02-subpub-qos2-bad-puback-1.py | 68 +++++++++++++++++++++ test/broker/02-subpub-qos2-bad-puback-2.py | 71 ++++++++++++++++++++++ test/broker/02-subpub-qos2-bad-pubcomp.py | 68 +++++++++++++++++++++ test/broker/Makefile | 5 ++ test/broker/ptest.py | 5 ++ 15 files changed, 401 insertions(+), 20 deletions(-) create mode 100755 test/broker/02-subpub-qos1-bad-pubcomp.py create mode 100755 test/broker/02-subpub-qos1-bad-pubrec.py create mode 100755 test/broker/02-subpub-qos2-bad-puback-1.py create mode 100755 test/broker/02-subpub-qos2-bad-puback-2.py create mode 100755 test/broker/02-subpub-qos2-bad-pubcomp.py diff --git a/ChangeLog.txt b/ChangeLog.txt index b9faac5a..6bb4cae9 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -4,6 +4,7 @@ Broker: - Fixed comment handling for config options that have optional arguments. - Improved documentation around bridge topic remapping. +- Handle mismatched handshakes (e.g. QoS1 PUBLISH with QoS2 reply) properly. Library: - Fix TLS connections not working over SOCKS. diff --git a/lib/handle_pubackcomp.c b/lib/handle_pubackcomp.c index 37788456..b86c26d7 100644 --- a/lib/handle_pubackcomp.c +++ b/lib/handle_pubackcomp.c @@ -44,15 +44,17 @@ int handle__pubackcomp(struct mosquitto *mosq, const char *type) { uint16_t mid; int rc; + int qos; assert(mosq); rc = packet__read_uint16(&mosq->in_packet, &mid); if(rc) return rc; + qos = type[3] == 'A'?1:2; /* pubAck or pubComp */ #ifdef WITH_BROKER log__printf(NULL, MOSQ_LOG_DEBUG, "Received %s from %s (Mid: %d)", type, mosq->id, mid); if(mid){ - rc = db__message_delete(db, mosq, mid, mosq_md_out); + rc = db__message_delete(db, mosq, mid, mosq_md_out, qos); if(rc == MOSQ_ERR_NOT_FOUND){ log__printf(mosq, MOSQ_LOG_WARNING, "Warning: Received %s from %s for an unknown packet identifier %d.", type, mosq->id, mid); return MOSQ_ERR_SUCCESS; @@ -63,7 +65,10 @@ int handle__pubackcomp(struct mosquitto *mosq, const char *type) #else log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s received %s (Mid: %d)", mosq->id, type, mid); - if(!message__delete(mosq, mid, mosq_md_out)){ + rc = message__delete(mosq, mid, mosq_md_out, qos); + if(rc){ + return rc; + }else{ /* Only inform the client the message has been sent once. */ pthread_mutex_lock(&mosq->callback_mutex); if(mosq->on_publish){ diff --git a/lib/handle_pubrec.c b/lib/handle_pubrec.c index 59eea1b7..a610af39 100644 --- a/lib/handle_pubrec.c +++ b/lib/handle_pubrec.c @@ -46,11 +46,11 @@ int handle__pubrec(struct mosquitto *mosq) #ifdef WITH_BROKER log__printf(NULL, MOSQ_LOG_DEBUG, "Received PUBREC from %s (Mid: %d)", mosq->id, mid); - rc = db__message_update(mosq, mid, mosq_md_out, mosq_ms_wait_for_pubcomp); + rc = db__message_update(mosq, mid, mosq_md_out, mosq_ms_wait_for_pubcomp, 2); #else log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s received PUBREC (Mid: %d)", mosq->id, mid); - rc = message__out_update(mosq, mid, mosq_ms_wait_for_pubcomp); + rc = message__out_update(mosq, mid, mosq_ms_wait_for_pubcomp, 2); #endif if(rc == MOSQ_ERR_NOT_FOUND){ log__printf(mosq, MOSQ_LOG_WARNING, "Warning: Received PUBREC from %s for an unknown packet identifier %d.", mosq->id, mid); diff --git a/lib/handle_pubrel.c b/lib/handle_pubrel.c index 56660987..81f33ebb 100644 --- a/lib/handle_pubrel.c +++ b/lib/handle_pubrel.c @@ -55,7 +55,10 @@ int handle__pubrel(struct mosquitto_db *db, struct mosquitto *mosq) #ifdef WITH_BROKER log__printf(NULL, MOSQ_LOG_DEBUG, "Received PUBREL from %s (Mid: %d)", mosq->id, mid); - if(db__message_release(db, mosq, mid, mosq_md_in)){ + rc = db__message_release(db, mosq, mid, mosq_md_in); + if(rc == MOSQ_ERR_PROTOCOL){ + return rc; + }else if(rc != MOSQ_ERR_SUCCESS){ /* Message not found. Still send a PUBCOMP anyway because this could be * due to a repeated PUBREL after a client has reconnected. */ log__printf(mosq, MOSQ_LOG_WARNING, "Warning: Received PUBREL from %s for an unknown packet identifier %d.", mosq->id, mid); @@ -63,7 +66,10 @@ int handle__pubrel(struct mosquitto_db *db, struct mosquitto *mosq) #else log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s received PUBREL (Mid: %d)", mosq->id, mid); - if(!message__remove(mosq, mid, mosq_md_in, &message)){ + rc = message__remove(mosq, mid, mosq_md_in, &message, 2); + if(rc){ + return rc; + }else{ /* Only pass the message on if we have removed it from the queue - this * prevents multiple callbacks for the same message. */ pthread_mutex_lock(&mosq->callback_mutex); diff --git a/lib/messages_mosq.c b/lib/messages_mosq.c index f6c50b8b..c7359446 100644 --- a/lib/messages_mosq.c +++ b/lib/messages_mosq.c @@ -82,13 +82,13 @@ int mosquitto_message_copy(struct mosquitto_message *dst, const struct mosquitto return MOSQ_ERR_SUCCESS; } -int message__delete(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir) +int message__delete(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, int qos) { struct mosquitto_message_all *message; int rc; assert(mosq); - rc = message__remove(mosq, mid, dir, &message); + rc = message__remove(mosq, mid, dir, &message, qos); if(rc == MOSQ_ERR_SUCCESS){ message__cleanup(&message); } @@ -218,7 +218,7 @@ void message__reconnect_reset(struct mosquitto *mosq) pthread_mutex_unlock(&mosq->out_message_mutex); } -int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, struct mosquitto_message_all **message) +int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, struct mosquitto_message_all **message, int qos) { struct mosquitto_message_all *cur, *prev = NULL; bool found = false; @@ -231,6 +231,9 @@ int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_dir cur = mosq->out_messages; while(cur){ if(cur->msg.mid == mid){ + if(cur->msg.qos != qos){ + return MOSQ_ERR_PROTOCOL; + } if(prev){ prev->next = cur->next; }else{ @@ -287,6 +290,9 @@ int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_dir cur = mosq->in_messages; while(cur){ if(cur->msg.mid == mid){ + if(cur->msg.qos != qos){ + return MOSQ_ERR_PROTOCOL; + } if(prev){ prev->next = cur->next; }else{ @@ -370,7 +376,7 @@ void mosquitto_message_retry_set(struct mosquitto *mosq, unsigned int message_re { } -int message__out_update(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_state state) +int message__out_update(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_state state, int qos) { struct mosquitto_message_all *message; assert(mosq); @@ -379,6 +385,9 @@ int message__out_update(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg message = mosq->out_messages; while(message){ if(message->msg.mid == mid){ + if(message->msg.qos != qos){ + return MOSQ_ERR_PROTOCOL; + } message->state = state; message->timestamp = mosquitto_time(); pthread_mutex_unlock(&mosq->out_message_mutex); diff --git a/lib/messages_mosq.h b/lib/messages_mosq.h index 85b82185..42385003 100644 --- a/lib/messages_mosq.h +++ b/lib/messages_mosq.h @@ -21,11 +21,11 @@ Contributors: void message__cleanup_all(struct mosquitto *mosq); void message__cleanup(struct mosquitto_message_all **message); -int message__delete(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir); +int message__delete(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, int qos); int message__queue(struct mosquitto *mosq, struct mosquitto_message_all *message, enum mosquitto_msg_direction dir); void message__reconnect_reset(struct mosquitto *mosq); -int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, struct mosquitto_message_all **message); +int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, struct mosquitto_message_all **message, int qos); void message__retry_check(struct mosquitto *mosq); -int message__out_update(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_state state); +int message__out_update(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_state state, int qos); #endif diff --git a/src/database.c b/src/database.c index d2f482d1..2f76de12 100644 --- a/src/database.c +++ b/src/database.c @@ -288,7 +288,7 @@ void db__message_dequeue_first(struct mosquitto *context) msg->next = NULL; } -int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir) +int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos) { struct mosquitto_client_msg *tail, *last = NULL; int msg_index = 0; @@ -299,6 +299,11 @@ int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint1 while(tail){ msg_index++; if(tail->mid == mid && tail->direction == dir){ + if(tail->qos != qos){ + return MOSQ_ERR_PROTOCOL; + }else if(qos == 2 && tail->state != mosq_ms_wait_for_pubcomp){ + return MOSQ_ERR_PROTOCOL; + } msg_index--; db__message_remove(db, context, &tail, last); }else{ @@ -509,13 +514,16 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1 #endif } -int db__message_update(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, enum mosquitto_msg_state state) +int db__message_update(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, enum mosquitto_msg_state state, int qos) { struct mosquitto_client_msg *tail; tail = context->inflight_msgs; while(tail){ if(tail->mid == mid && tail->direction == dir){ + if(tail->qos != qos){ + return MOSQ_ERR_PROTOCOL; + } tail->state = state; tail->timestamp = mosquitto_time(); return MOSQ_ERR_SUCCESS; @@ -780,7 +788,6 @@ int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *conte int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir) { struct mosquitto_client_msg *tail, *last = NULL; - int qos; int retain; char *topic; char *source_id; @@ -793,7 +800,9 @@ int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint while(tail){ msg_index++; if(tail->mid == mid && tail->direction == dir){ - qos = tail->store->qos; + if(tail->store->qos != 2){ + return MOSQ_ERR_PROTOCOL; + } topic = tail->store->topic; retain = tail->retain; source_id = tail->store->source_id; @@ -802,7 +811,7 @@ int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint * denied/dropped and is being processed so the client doesn't * keep resending it. That means we don't send it to other * clients. */ - if(!topic || !sub__messages_queue(db, source_id, topic, qos, retain, &tail->store)){ + if(!topic || !sub__messages_queue(db, source_id, topic, 2, retain, &tail->store)){ db__message_remove(db, context, &tail, last); deleted = true; }else{ diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index bf13eebb..0b69a8cc 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -545,10 +545,10 @@ int persist__restore(struct mosquitto_db *db); void db__limits_set(int inflight, unsigned long inflight_bytes, int queued, unsigned long queued_bytes); /* Return the number of in-flight messages in count. */ int db__message_count(int *count); -int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir); +int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos); int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored); int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir); -int db__message_update(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, enum mosquitto_msg_state state); +int db__message_update(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, enum mosquitto_msg_state state, int qos); int db__message_write(struct mosquitto_db *db, struct mosquitto *context); void db__message_dequeue_first(struct mosquitto *context); int db__messages_delete(struct mosquitto_db *db, struct mosquitto *context); diff --git a/test/broker/02-subpub-qos1-bad-pubcomp.py b/test/broker/02-subpub-qos1-bad-pubcomp.py new file mode 100755 index 00000000..6c38d9b6 --- /dev/null +++ b/test/broker/02-subpub-qos1-bad-pubcomp.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python + +# Test what the broker does if receiving a PUBCOMP in response to a QoS 1 PUBLISH. + +import inspect, os, sys +# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder +cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],".."))) +if cmd_subfolder not in sys.path: + sys.path.insert(0, cmd_subfolder) + +import mosq_test +import time + +rc = 1 +keepalive = 60 + +connect_packet = mosq_test.gen_connect("subpub-qos1-test", keepalive=keepalive) +connack_packet = mosq_test.gen_connack(rc=0) + +mid = 1 +subscribe_packet = mosq_test.gen_subscribe(mid, "subpub/qos1", 1) +suback_packet = mosq_test.gen_suback(mid, 1) + +mid = 1 +publish_packet2 = mosq_test.gen_publish("subpub/qos1", qos=1, mid=mid, payload="message") + + +helper_connect = mosq_test.gen_connect("helper", keepalive=keepalive) +helper_connack = mosq_test.gen_connack(rc=0) + +mid = 1 +publish1s_packet = mosq_test.gen_publish("subpub/qos1", qos=1, mid=mid, payload="message") +puback1s_packet = mosq_test.gen_puback(mid) + +mid = 1 +publish1r_packet = mosq_test.gen_publish("subpub/qos1", qos=1, mid=mid, payload="message") +pubcomp1r_packet = mosq_test.gen_pubcomp(mid) + +pingreq_packet = mosq_test.gen_pingreq() +pingresp_packet = mosq_test.gen_pingresp() + +port = mosq_test.get_port() +broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port) + +try: + sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=20, port=port) + mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback") + + helper = mosq_test.do_client_connect(helper_connect, helper_connack, timeout=20, port=port) + mosq_test.do_send_receive(helper, publish1s_packet, puback1s_packet, "puback 1s") + helper.close() + + if mosq_test.expect_packet(sock, "publish 1r", publish1r_packet): + sock.send(pubcomp1r_packet) + sock.send(pingreq_packet) + p = sock.recv(len(pingresp_packet)) + if len(p) == 0: + rc = 0 + + sock.close() +finally: + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + if rc: + print(stde) + +exit(rc) + diff --git a/test/broker/02-subpub-qos1-bad-pubrec.py b/test/broker/02-subpub-qos1-bad-pubrec.py new file mode 100755 index 00000000..9f926fd0 --- /dev/null +++ b/test/broker/02-subpub-qos1-bad-pubrec.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python + +# Test what the broker does if receiving a PUBREC in response to a QoS 1 PUBLISH. + +import inspect, os, sys +# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder +cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],".."))) +if cmd_subfolder not in sys.path: + sys.path.insert(0, cmd_subfolder) + +import mosq_test +import time + +rc = 1 +keepalive = 60 + +connect_packet = mosq_test.gen_connect("subpub-qos1-test", keepalive=keepalive) +connack_packet = mosq_test.gen_connack(rc=0) + +mid = 1 +subscribe_packet = mosq_test.gen_subscribe(mid, "subpub/qos1", 1) +suback_packet = mosq_test.gen_suback(mid, 1) + +helper_connect = mosq_test.gen_connect("helper", keepalive=keepalive) +helper_connack = mosq_test.gen_connack(rc=0) + +mid = 1 +publish1s_packet = mosq_test.gen_publish("subpub/qos1", qos=1, mid=mid, payload="message") +puback1s_packet = mosq_test.gen_puback(mid) + +mid = 1 +publish1r_packet = mosq_test.gen_publish("subpub/qos1", qos=1, mid=mid, payload="message") +pubrec1r_packet = mosq_test.gen_pubrec(mid) + +pingreq_packet = mosq_test.gen_pingreq() +pingresp_packet = mosq_test.gen_pingresp() + +port = mosq_test.get_port() +broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port) + +try: + sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=20, port=port) + mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback") + + helper = mosq_test.do_client_connect(helper_connect, helper_connack, timeout=20, port=port) + mosq_test.do_send_receive(helper, publish1s_packet, puback1s_packet, "puback 1s") + helper.close() + + if mosq_test.expect_packet(sock, "publish 1r", publish1r_packet): + sock.send(pubrec1r_packet) + sock.send(pingreq_packet) + p = sock.recv(len(pingresp_packet)) + if len(p) == 0: + rc = 0 + + sock.close() +finally: + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + if rc: + print(stde) + +exit(rc) + diff --git a/test/broker/02-subpub-qos2-bad-puback-1.py b/test/broker/02-subpub-qos2-bad-puback-1.py new file mode 100755 index 00000000..e228c3d5 --- /dev/null +++ b/test/broker/02-subpub-qos2-bad-puback-1.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python + +# Test what the broker does if receiving a PUBACK in response to a QoS 2 PUBLISH. + +import inspect, os, sys +# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder +cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],".."))) +if cmd_subfolder not in sys.path: + sys.path.insert(0, cmd_subfolder) + +import mosq_test +import time + +rc = 1 +keepalive = 60 + +connect_packet = mosq_test.gen_connect("subpub-qos2-test", keepalive=keepalive) +connack_packet = mosq_test.gen_connack(rc=0) + +mid = 1 +subscribe_packet = mosq_test.gen_subscribe(mid, "subpub/qos2", 2) +suback_packet = mosq_test.gen_suback(mid, 2) + +helper_connect = mosq_test.gen_connect("helper", keepalive=keepalive) +helper_connack = mosq_test.gen_connack(rc=0) + +mid = 1 +publish1s_packet = mosq_test.gen_publish("subpub/qos2", qos=2, mid=mid, payload="message") +pubrec1s_packet = mosq_test.gen_pubrec(mid) +pubrel1s_packet = mosq_test.gen_pubrel(mid) +pubcomp1s_packet = mosq_test.gen_pubcomp(mid) + +mid = 1 +publish1r_packet = mosq_test.gen_publish("subpub/qos2", qos=2, mid=mid, payload="message") +puback1r_packet = mosq_test.gen_puback(mid) + +pingreq_packet = mosq_test.gen_pingreq() +pingresp_packet = mosq_test.gen_pingresp() + +port = mosq_test.get_port() +broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port) + +try: + sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=20, port=port) + mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback") + + helper = mosq_test.do_client_connect(helper_connect, helper_connack, timeout=20, port=port) + mosq_test.do_send_receive(helper, publish1s_packet, pubrec1s_packet, "pubrec 1s") + mosq_test.do_send_receive(helper, pubrel1s_packet, pubcomp1s_packet, "pubcomp 1s") + helper.close() + + if mosq_test.expect_packet(sock, "publish 1r", publish1r_packet): + sock.send(puback1r_packet) + sock.send(pingreq_packet) + p = sock.recv(len(pingresp_packet)) + if len(p) == 0: + rc = 0 + + sock.close() +finally: + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + if rc: + print(stde) + +exit(rc) + diff --git a/test/broker/02-subpub-qos2-bad-puback-2.py b/test/broker/02-subpub-qos2-bad-puback-2.py new file mode 100755 index 00000000..67189990 --- /dev/null +++ b/test/broker/02-subpub-qos2-bad-puback-2.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python + +# Test what the broker does if receiving a PUBACK in response to a QoS 2 PUBREL. + +import inspect, os, sys +# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder +cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],".."))) +if cmd_subfolder not in sys.path: + sys.path.insert(0, cmd_subfolder) + +import mosq_test +import time + +rc = 1 +keepalive = 60 + +connect_packet = mosq_test.gen_connect("subpub-qos2-test", keepalive=keepalive) +connack_packet = mosq_test.gen_connack(rc=0) + +mid = 1 +subscribe_packet = mosq_test.gen_subscribe(mid, "subpub/qos2", 2) +suback_packet = mosq_test.gen_suback(mid, 2) + +helper_connect = mosq_test.gen_connect("helper", keepalive=keepalive) +helper_connack = mosq_test.gen_connack(rc=0) + +mid = 1 +publish1s_packet = mosq_test.gen_publish("subpub/qos2", qos=2, mid=mid, payload="message") +pubrec1s_packet = mosq_test.gen_pubrec(mid) +pubrel1s_packet = mosq_test.gen_pubrel(mid) +pubcomp1s_packet = mosq_test.gen_pubcomp(mid) + +mid = 1 +publish1r_packet = mosq_test.gen_publish("subpub/qos2", qos=2, mid=mid, payload="message") +pubrec1r_packet = mosq_test.gen_pubrec(mid) +pubrel1r_packet = mosq_test.gen_pubrel(mid) +puback1r_packet = mosq_test.gen_puback(mid) + +pingreq_packet = mosq_test.gen_pingreq() +pingresp_packet = mosq_test.gen_pingresp() + +port = mosq_test.get_port() +broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port) + +try: + sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=20, port=port) + mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback") + + helper = mosq_test.do_client_connect(helper_connect, helper_connack, timeout=20, port=port) + mosq_test.do_send_receive(helper, publish1s_packet, pubrec1s_packet, "pubrec 1s") + mosq_test.do_send_receive(helper, pubrel1s_packet, pubcomp1s_packet, "pubcomp 1s") + helper.close() + + if mosq_test.expect_packet(sock, "publish 1r", publish1r_packet): + mosq_test.do_send_receive(sock, pubrec1s_packet, pubrel1s_packet, "pubrel 1r") + sock.send(puback1r_packet) + sock.send(pingreq_packet) + p = sock.recv(len(pingresp_packet)) + if len(p) == 0: + rc = 0 + + sock.close() +finally: + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + if rc: + print(stde) + +exit(rc) + diff --git a/test/broker/02-subpub-qos2-bad-pubcomp.py b/test/broker/02-subpub-qos2-bad-pubcomp.py new file mode 100755 index 00000000..acaac552 --- /dev/null +++ b/test/broker/02-subpub-qos2-bad-pubcomp.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python + +# Test what the broker does if receiving a PUBCOMP in response to a QoS 2 PUBLISH. + +import inspect, os, sys +# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder +cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],".."))) +if cmd_subfolder not in sys.path: + sys.path.insert(0, cmd_subfolder) + +import mosq_test +import time + +rc = 1 +keepalive = 60 + +connect_packet = mosq_test.gen_connect("subpub-qos2-test", keepalive=keepalive) +connack_packet = mosq_test.gen_connack(rc=0) + +mid = 1 +subscribe_packet = mosq_test.gen_subscribe(mid, "subpub/qos2", 2) +suback_packet = mosq_test.gen_suback(mid, 2) + +helper_connect = mosq_test.gen_connect("helper", keepalive=keepalive) +helper_connack = mosq_test.gen_connack(rc=0) + +mid = 1 +publish1s_packet = mosq_test.gen_publish("subpub/qos2", qos=2, mid=mid, payload="message") +pubrec1s_packet = mosq_test.gen_pubrec(mid) +pubrel1s_packet = mosq_test.gen_pubrel(mid) +pubcomp1s_packet = mosq_test.gen_pubcomp(mid) + +mid = 1 +publish1r_packet = mosq_test.gen_publish("subpub/qos2", qos=2, mid=mid, payload="message") +pubcomp1r_packet = mosq_test.gen_pubcomp(mid) + +pingreq_packet = mosq_test.gen_pingreq() +pingresp_packet = mosq_test.gen_pingresp() + +port = mosq_test.get_port() +broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port) + +try: + sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=20, port=port) + mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback") + + helper = mosq_test.do_client_connect(helper_connect, helper_connack, timeout=20, port=port) + mosq_test.do_send_receive(helper, publish1s_packet, pubrec1s_packet, "pubrec 1s") + mosq_test.do_send_receive(helper, pubrel1s_packet, pubcomp1s_packet, "pubcomp 1s") + helper.close() + + if mosq_test.expect_packet(sock, "publish 1r", publish1r_packet): + sock.send(pubcomp1r_packet) + sock.send(pingreq_packet) + p = sock.recv(len(pingresp_packet)) + if len(p) == 0: + rc = 0 + + sock.close() +finally: + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + if rc: + print(stde) + +exit(rc) + diff --git a/test/broker/Makefile b/test/broker/Makefile index c70f1f7d..08f1b0ff 100644 --- a/test/broker/Makefile +++ b/test/broker/Makefile @@ -45,6 +45,11 @@ endif ./02-subpub-qos0.py ./02-subpub-qos1.py ./02-subpub-qos2.py + ./02-subpub-qos1-bad-pubrec.py + ./02-subpub-qos1-bad-pubcomp.py + ./02-subpub-qos2-bad-puback-1.py + ./02-subpub-qos2-bad-puback-2.py + ./02-subpub-qos2-bad-pubcomp.py ./02-unsubscribe-qos0.py ./02-unsubscribe-qos1.py ./02-unsubscribe-qos2.py diff --git a/test/broker/ptest.py b/test/broker/ptest.py index f39049e1..a9d9a9a0 100755 --- a/test/broker/ptest.py +++ b/test/broker/ptest.py @@ -28,6 +28,11 @@ tests = [ (1, './02-subpub-qos0.py'), (1, './02-subpub-qos1.py'), (1, './02-subpub-qos2.py'), + (1, './02-subpub-qos1-bad-pubrec.py'), + (1, './02-subpub-qos1-bad-pubcomp.py'), + (1, './02-subpub-qos2-bad-puback-1.py'), + (1, './02-subpub-qos2-bad-puback-2.py'), + (1, './02-subpub-qos2-bad-pubcomp.py'), (1, './02-unsubscribe-qos0.py'), (1, './02-unsubscribe-qos1.py'), (1, './02-unsubscribe-qos2.py'),