From 105ad17dc6d51d04563701958936be58a65aa04d Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Mon, 4 Mar 2019 07:36:35 +0000 Subject: [PATCH] Tests and support for QoS 1 reporting of no subscribers on publish. --- lib/handle_pubackcomp.c | 4 +- lib/mosquitto.h | 1 + src/handle_publish.c | 14 +++- src/subs.c | 41 +++++++++-- .../02-subpub-qos1-message-expiry-retain.py | 6 +- test/broker/03-publish-dollar-v5.py | 2 +- .../03-publish-qos1-no-subscribers-v5.py | 72 +++++++++++++++++++ test/broker/Makefile | 1 + test/broker/test.py | 1 + 9 files changed, 127 insertions(+), 15 deletions(-) create mode 100755 test/broker/03-publish-qos1-no-subscribers-v5.py diff --git a/lib/handle_pubackcomp.c b/lib/handle_pubackcomp.c index 99b2539d..a3307a49 100644 --- a/lib/handle_pubackcomp.c +++ b/lib/handle_pubackcomp.c @@ -65,7 +65,7 @@ int handle__pubackcomp(struct mosquitto *mosq, const char *type) } #ifdef WITH_BROKER - log__printf(NULL, MOSQ_LOG_DEBUG, "Received %s from %s (Mid: %d)", type, mosq->id, mid); + log__printf(NULL, MOSQ_LOG_DEBUG, "Received %s from %s (Mid: %d, RC:%d)", type, mosq->id, mid, reason_code); /* Immediately free, we don't do anything with Reason String or User Property at the moment */ mosquitto_property_free_all(&properties); @@ -78,7 +78,7 @@ int handle__pubackcomp(struct mosquitto *mosq, const char *type) return rc; } #else - log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s received %s (Mid: %d)", mosq->id, type, mid); + log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s received %s (Mid: %d, RC:%d)", mosq->id, type, mid, reason_code); rc = message__delete(mosq, mid, mosq_md_out, qos); if(rc){ diff --git a/lib/mosquitto.h b/lib/mosquitto.h index 4f0903f7..08b086bf 100644 --- a/lib/mosquitto.h +++ b/lib/mosquitto.h @@ -66,6 +66,7 @@ extern "C" { /* Error values */ enum mosq_err_t { + MOSQ_ERR_NO_SUBSCRIBERS = -3, MOSQ_ERR_SUB_EXISTS = -2, MOSQ_ERR_CONN_PENDING = -1, MOSQ_ERR_SUCCESS = 0, diff --git a/src/handle_publish.c b/src/handle_publish.c index 2a3c9f5a..cdb4871e 100644 --- a/src/handle_publish.c +++ b/src/handle_publish.c @@ -40,6 +40,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context) uint8_t dup, qos, retain; uint16_t mid = 0; int rc = 0; + int rc2; uint8_t header = context->in_packet.command; int res = 0; struct mosquitto_msg_store *stored = NULL; @@ -304,11 +305,18 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context) switch(qos){ case 0: - if(sub__messages_queue(db, context->id, topic, qos, retain, &stored)) rc = 1; + rc2 = sub__messages_queue(db, context->id, topic, qos, retain, &stored); + if(rc2 > 0) rc = 1; break; case 1: - if(sub__messages_queue(db, context->id, topic, qos, retain, &stored)) rc = 1; - if(send__puback(context, mid, 0)) rc = 1; + rc2 = sub__messages_queue(db, context->id, topic, qos, retain, &stored); + if(rc2 == MOSQ_ERR_SUCCESS || context->protocol != mosq_p_mqtt5){ + if(send__puback(context, mid, 0)) rc = 1; + }else if(rc2 == MOSQ_ERR_NO_SUBSCRIBERS){ + if(send__puback(context, mid, MQTT_RC_NO_MATCHING_SUBSCRIBERS)) rc = 1; + }else{ + rc = rc2; + } break; case 2: if(!dup){ diff --git a/src/subs.c b/src/subs.c index ad71b73f..0f07d711 100644 --- a/src/subs.c +++ b/src/subs.c @@ -74,6 +74,9 @@ static int subs__process(struct mosquitto_db *db, struct mosquitto__subhier *hie leaf = hier->subs; + if(topic[0] != '$'){ + log__printf(NULL, MOSQ_LOG_INFO, "LEAF:%p", leaf); + } if(retain && set_retain){ #ifdef WITH_PERSISTENCE if(strncmp(topic, "$SYS", 4)){ @@ -139,7 +142,11 @@ static int subs__process(struct mosquitto_db *db, struct mosquitto__subhier *hie } leaf = leaf->next; } - return rc; + if(hier->subs){ + return rc; + }else{ + return MOSQ_ERR_NO_SUBSCRIBERS; + } } static struct sub__token *sub__topic_append(struct sub__token **tail, struct sub__token **topics, char *topic) @@ -378,11 +385,13 @@ static int sub__remove_recurse(struct mosquitto_db *db, struct mosquitto *contex return MOSQ_ERR_SUCCESS; } -static void sub__search(struct mosquitto_db *db, struct mosquitto__subhier *subhier, struct sub__token *tokens, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored, bool set_retain) +static int sub__search(struct mosquitto_db *db, struct mosquitto__subhier *subhier, struct sub__token *tokens, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored, bool set_retain) { /* FIXME - need to take into account source_id if the client is a bridge */ struct mosquitto__subhier *branch, *branch_tmp; bool sr; + int rc; + bool have_subscribers = false; HASH_ITER(hh, subhier->children, branch, branch_tmp){ sr = set_retain; @@ -396,18 +405,38 @@ static void sub__search(struct mosquitto_db *db, struct mosquitto__subhier *subh /* Don't set a retained message where + is in the hierarchy. */ sr = false; } - sub__search(db, branch, tokens->next, source_id, topic, qos, retain, stored, sr); + rc = sub__search(db, branch, tokens->next, source_id, topic, qos, retain, stored, sr); + if(rc == MOSQ_ERR_SUCCESS){ + have_subscribers = true; + }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){ + return rc; + } if(!tokens->next){ - subs__process(db, branch, source_id, topic, qos, retain, stored, sr); + rc = subs__process(db, branch, source_id, topic, qos, retain, stored, sr); + if(rc == MOSQ_ERR_SUCCESS){ + have_subscribers = true; + }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){ + return rc; + } } }else if(!strcmp(UHPA_ACCESS_TOPIC(branch), "#") && !branch->children){ /* The topic matches due to a # wildcard - process the * subscriptions but *don't* return. Although this branch has ended * there may still be other subscriptions to deal with. */ - subs__process(db, branch, source_id, topic, qos, retain, stored, false); + rc = subs__process(db, branch, source_id, topic, qos, retain, stored, false); + if(rc == MOSQ_ERR_SUCCESS){ + have_subscribers = true; + }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){ + return rc; + } } } + if(have_subscribers){ + return MOSQ_ERR_SUCCESS; + }else{ + return MOSQ_ERR_NO_SUBSCRIBERS; + } } @@ -527,7 +556,7 @@ int sub__messages_queue(struct mosquitto_db *db, const char *source_id, const ch */ sub__add_recurse(db, NULL, 0, 0, 0, subhier, tokens); } - sub__search(db, subhier, tokens, source_id, topic, qos, retain, *stored, true); + rc = sub__search(db, subhier, tokens, source_id, topic, qos, retain, *stored, true); } sub__topic_tokens_free(tokens); diff --git a/test/broker/02-subpub-qos1-message-expiry-retain.py b/test/broker/02-subpub-qos1-message-expiry-retain.py index 8ff512ea..e949937a 100755 --- a/test/broker/02-subpub-qos1-message-expiry-retain.py +++ b/test/broker/02-subpub-qos1-message-expiry-retain.py @@ -32,15 +32,15 @@ helper_connack = mosq_test.gen_connack(rc=0, proto_ver=5) mid=1 props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_MESSAGE_EXPIRY_INTERVAL, 4) publish1_packet = mosq_test.gen_publish("subpub/expired", mid=mid, qos=1, retain=True, payload="message1", proto_ver=5, properties=props) -puback1_packet = mosq_test.gen_puback(mid) +puback1_packet = mosq_test.gen_puback(mid, proto_ver=5, reason_code=mqtt5_rc.MQTT_RC_NO_MATCHING_SUBSCRIBERS) mid=2 publish2s_packet = mosq_test.gen_publish("subpub/kept", mid=mid, qos=1, retain=True, payload="message2", proto_ver=5) -puback2s_packet = mosq_test.gen_puback(mid) +puback2s_packet = mosq_test.gen_puback(mid, proto_ver=5, reason_code=mqtt5_rc.MQTT_RC_NO_MATCHING_SUBSCRIBERS) mid=1 publish2r_packet = mosq_test.gen_publish("subpub/kept", mid=mid, qos=1, retain=True, payload="message2", proto_ver=5) -puback2r_packet = mosq_test.gen_puback(mid) +puback2r_packet = mosq_test.gen_puback(mid, proto_ver=5, reason_code=mqtt5_rc.MQTT_RC_NO_MATCHING_SUBSCRIBERS) pingreq_packet = mosq_test.gen_pingreq() pingresp_packet = mosq_test.gen_pingresp() diff --git a/test/broker/03-publish-dollar-v5.py b/test/broker/03-publish-dollar-v5.py index d154a302..f8cce944 100755 --- a/test/broker/03-publish-dollar-v5.py +++ b/test/broker/03-publish-dollar-v5.py @@ -29,7 +29,7 @@ try: sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port) helper("$SYS/broker/uptime", mqtt5_rc.MQTT_RC_NOT_AUTHORIZED) helper("$SYS/broker/connection/me", mqtt5_rc.MQTT_RC_NOT_AUTHORIZED) - helper("$SYS/broker/connection/me/state", 0) + helper("$SYS/broker/connection/me/state", mqtt5_rc.MQTT_RC_NO_MATCHING_SUBSCRIBERS) helper("$share/share/topic", mqtt5_rc.MQTT_RC_NOT_AUTHORIZED) rc = 0 diff --git a/test/broker/03-publish-qos1-no-subscribers-v5.py b/test/broker/03-publish-qos1-no-subscribers-v5.py new file mode 100755 index 00000000..34c1d469 --- /dev/null +++ b/test/broker/03-publish-qos1-no-subscribers-v5.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python + +# Test whether a PUBLISH to a topic with QoS 1 results in the correct PUBACK +# packet when there are no subscribers. + +from mosq_test_helper import * + +rc = 1 +keepalive = 60 +connect_packet = mosq_test.gen_connect("pub-qos1-test", keepalive=keepalive, proto_ver=5) +connack_packet = mosq_test.gen_connack(rc=0, proto_ver=5) + + +mid = 1 +publish1_packet = mosq_test.gen_publish("pub", qos=1, mid=mid, payload="message", proto_ver=5) +puback1_packet = mosq_test.gen_puback(mid, proto_ver=5, reason_code=mqtt5_rc.MQTT_RC_NO_MATCHING_SUBSCRIBERS) + +mid = 2 +publish2_packet = mosq_test.gen_publish("pub/qos1", qos=1, mid=mid, payload="message", proto_ver=5) +puback2_packet = mosq_test.gen_puback(mid, proto_ver=5, reason_code=mqtt5_rc.MQTT_RC_NO_MATCHING_SUBSCRIBERS) + +mid = 3 +publish3_packet = mosq_test.gen_publish("pub/qos1/test", qos=1, mid=mid, payload="message", proto_ver=5) +puback3_packet = mosq_test.gen_puback(mid, proto_ver=5, reason_code=mqtt5_rc.MQTT_RC_NO_MATCHING_SUBSCRIBERS) + +mid = 4 +publish4_packet = mosq_test.gen_publish("pub/qos1/test", qos=1, mid=mid, payload="message", proto_ver=5, retain=True) +puback4_packet = mosq_test.gen_puback(mid, proto_ver=5, reason_code=mqtt5_rc.MQTT_RC_NO_MATCHING_SUBSCRIBERS) + +mid = 5 +publish1b_packet = mosq_test.gen_publish("pub", qos=1, mid=mid, payload="message", proto_ver=5) +puback1b_packet = mosq_test.gen_puback(mid, proto_ver=5, reason_code=mqtt5_rc.MQTT_RC_NO_MATCHING_SUBSCRIBERS) + +mid = 6 +publish2b_packet = mosq_test.gen_publish("pub/qos1", qos=1, mid=mid, payload="message", proto_ver=5) +puback2b_packet = mosq_test.gen_puback(mid, proto_ver=5, reason_code=mqtt5_rc.MQTT_RC_NO_MATCHING_SUBSCRIBERS) + +mid = 7 +publish3b_packet = mosq_test.gen_publish("pub/qos1/test", qos=1, mid=mid, payload="message", proto_ver=5) +puback3b_packet = mosq_test.gen_puback(mid, proto_ver=5, reason_code=mqtt5_rc.MQTT_RC_NO_MATCHING_SUBSCRIBERS) + +port = mosq_test.get_port() +broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port) + +try: + sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port) + + # None of the pub/qos1/test topic tree exists here + mosq_test.do_send_receive(sock, publish1_packet, puback1_packet, "puback1a") + mosq_test.do_send_receive(sock, publish2_packet, puback2_packet, "puback2a") + mosq_test.do_send_receive(sock, publish3_packet, puback3_packet, "puback3a") + + # This publish sets a retained message, which means the topic tree exists + mosq_test.do_send_receive(sock, publish4_packet, puback4_packet, "puback4") + + # So now test again + mosq_test.do_send_receive(sock, publish1b_packet, puback1b_packet, "puback1b") + mosq_test.do_send_receive(sock, publish2b_packet, puback2b_packet, "puback2b") + mosq_test.do_send_receive(sock, publish3b_packet, puback3b_packet, "puback3b") + + rc = 0 + + sock.close() +finally: + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + if rc: + print(stde) + +exit(rc) + diff --git a/test/broker/Makefile b/test/broker/Makefile index 7fc2b4e9..6f378508 100644 --- a/test/broker/Makefile +++ b/test/broker/Makefile @@ -94,6 +94,7 @@ endif ./03-publish-dollar-v5.py ./03-publish-dollar.py ./03-publish-invalid-utf8.py + ./03-publish-qos1-no-subscribers-v5.py ./03-publish-qos1-retain-disabled.py ./03-publish-qos1.py ./03-publish-qos2.py diff --git a/test/broker/test.py b/test/broker/test.py index 5c3f42d9..93583ee7 100755 --- a/test/broker/test.py +++ b/test/broker/test.py @@ -74,6 +74,7 @@ tests = [ (1, './03-publish-dollar-v5.py'), (1, './03-publish-dollar.py'), (1, './03-publish-invalid-utf8.py'), + (1, './03-publish-qos1-no-subscribers-v5.py'), (1, './03-publish-qos1-retain-disabled.py'), (1, './03-publish-qos1.py'), (1, './03-publish-qos2.py'),