Tests and support for QoS 1 reporting of no subscribers on publish.

pull/1203/head
Roger A. Light 7 years ago
parent 7b002abd7e
commit 105ad17dc6

@ -65,7 +65,7 @@ int handle__pubackcomp(struct mosquitto *mosq, const char *type)
}
#ifdef WITH_BROKER
log__printf(NULL, MOSQ_LOG_DEBUG, "Received %s from %s (Mid: %d)", type, mosq->id, mid);
log__printf(NULL, MOSQ_LOG_DEBUG, "Received %s from %s (Mid: %d, RC:%d)", type, mosq->id, mid, reason_code);
/* Immediately free, we don't do anything with Reason String or User Property at the moment */
mosquitto_property_free_all(&properties);
@ -78,7 +78,7 @@ int handle__pubackcomp(struct mosquitto *mosq, const char *type)
return rc;
}
#else
log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s received %s (Mid: %d)", mosq->id, type, mid);
log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s received %s (Mid: %d, RC:%d)", mosq->id, type, mid, reason_code);
rc = message__delete(mosq, mid, mosq_md_out, qos);
if(rc){

@ -66,6 +66,7 @@ extern "C" {
/* Error values */
enum mosq_err_t {
MOSQ_ERR_NO_SUBSCRIBERS = -3,
MOSQ_ERR_SUB_EXISTS = -2,
MOSQ_ERR_CONN_PENDING = -1,
MOSQ_ERR_SUCCESS = 0,

@ -40,6 +40,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
uint8_t dup, qos, retain;
uint16_t mid = 0;
int rc = 0;
int rc2;
uint8_t header = context->in_packet.command;
int res = 0;
struct mosquitto_msg_store *stored = NULL;
@ -304,11 +305,18 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
switch(qos){
case 0:
if(sub__messages_queue(db, context->id, topic, qos, retain, &stored)) rc = 1;
rc2 = sub__messages_queue(db, context->id, topic, qos, retain, &stored);
if(rc2 > 0) rc = 1;
break;
case 1:
if(sub__messages_queue(db, context->id, topic, qos, retain, &stored)) rc = 1;
if(send__puback(context, mid, 0)) rc = 1;
rc2 = sub__messages_queue(db, context->id, topic, qos, retain, &stored);
if(rc2 == MOSQ_ERR_SUCCESS || context->protocol != mosq_p_mqtt5){
if(send__puback(context, mid, 0)) rc = 1;
}else if(rc2 == MOSQ_ERR_NO_SUBSCRIBERS){
if(send__puback(context, mid, MQTT_RC_NO_MATCHING_SUBSCRIBERS)) rc = 1;
}else{
rc = rc2;
}
break;
case 2:
if(!dup){

@ -74,6 +74,9 @@ static int subs__process(struct mosquitto_db *db, struct mosquitto__subhier *hie
leaf = hier->subs;
if(topic[0] != '$'){
log__printf(NULL, MOSQ_LOG_INFO, "LEAF:%p", leaf);
}
if(retain && set_retain){
#ifdef WITH_PERSISTENCE
if(strncmp(topic, "$SYS", 4)){
@ -139,7 +142,11 @@ static int subs__process(struct mosquitto_db *db, struct mosquitto__subhier *hie
}
leaf = leaf->next;
}
return rc;
if(hier->subs){
return rc;
}else{
return MOSQ_ERR_NO_SUBSCRIBERS;
}
}
static struct sub__token *sub__topic_append(struct sub__token **tail, struct sub__token **topics, char *topic)
@ -378,11 +385,13 @@ static int sub__remove_recurse(struct mosquitto_db *db, struct mosquitto *contex
return MOSQ_ERR_SUCCESS;
}
static void sub__search(struct mosquitto_db *db, struct mosquitto__subhier *subhier, struct sub__token *tokens, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored, bool set_retain)
static int sub__search(struct mosquitto_db *db, struct mosquitto__subhier *subhier, struct sub__token *tokens, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored, bool set_retain)
{
/* FIXME - need to take into account source_id if the client is a bridge */
struct mosquitto__subhier *branch, *branch_tmp;
bool sr;
int rc;
bool have_subscribers = false;
HASH_ITER(hh, subhier->children, branch, branch_tmp){
sr = set_retain;
@ -396,18 +405,38 @@ static void sub__search(struct mosquitto_db *db, struct mosquitto__subhier *subh
/* Don't set a retained message where + is in the hierarchy. */
sr = false;
}
sub__search(db, branch, tokens->next, source_id, topic, qos, retain, stored, sr);
rc = sub__search(db, branch, tokens->next, source_id, topic, qos, retain, stored, sr);
if(rc == MOSQ_ERR_SUCCESS){
have_subscribers = true;
}else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){
return rc;
}
if(!tokens->next){
subs__process(db, branch, source_id, topic, qos, retain, stored, sr);
rc = subs__process(db, branch, source_id, topic, qos, retain, stored, sr);
if(rc == MOSQ_ERR_SUCCESS){
have_subscribers = true;
}else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){
return rc;
}
}
}else if(!strcmp(UHPA_ACCESS_TOPIC(branch), "#") && !branch->children){
/* The topic matches due to a # wildcard - process the
* subscriptions but *don't* return. Although this branch has ended
* there may still be other subscriptions to deal with.
*/
subs__process(db, branch, source_id, topic, qos, retain, stored, false);
rc = subs__process(db, branch, source_id, topic, qos, retain, stored, false);
if(rc == MOSQ_ERR_SUCCESS){
have_subscribers = true;
}else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){
return rc;
}
}
}
if(have_subscribers){
return MOSQ_ERR_SUCCESS;
}else{
return MOSQ_ERR_NO_SUBSCRIBERS;
}
}
@ -527,7 +556,7 @@ int sub__messages_queue(struct mosquitto_db *db, const char *source_id, const ch
*/
sub__add_recurse(db, NULL, 0, 0, 0, subhier, tokens);
}
sub__search(db, subhier, tokens, source_id, topic, qos, retain, *stored, true);
rc = sub__search(db, subhier, tokens, source_id, topic, qos, retain, *stored, true);
}
sub__topic_tokens_free(tokens);

@ -32,15 +32,15 @@ helper_connack = mosq_test.gen_connack(rc=0, proto_ver=5)
mid=1
props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_MESSAGE_EXPIRY_INTERVAL, 4)
publish1_packet = mosq_test.gen_publish("subpub/expired", mid=mid, qos=1, retain=True, payload="message1", proto_ver=5, properties=props)
puback1_packet = mosq_test.gen_puback(mid)
puback1_packet = mosq_test.gen_puback(mid, proto_ver=5, reason_code=mqtt5_rc.MQTT_RC_NO_MATCHING_SUBSCRIBERS)
mid=2
publish2s_packet = mosq_test.gen_publish("subpub/kept", mid=mid, qos=1, retain=True, payload="message2", proto_ver=5)
puback2s_packet = mosq_test.gen_puback(mid)
puback2s_packet = mosq_test.gen_puback(mid, proto_ver=5, reason_code=mqtt5_rc.MQTT_RC_NO_MATCHING_SUBSCRIBERS)
mid=1
publish2r_packet = mosq_test.gen_publish("subpub/kept", mid=mid, qos=1, retain=True, payload="message2", proto_ver=5)
puback2r_packet = mosq_test.gen_puback(mid)
puback2r_packet = mosq_test.gen_puback(mid, proto_ver=5, reason_code=mqtt5_rc.MQTT_RC_NO_MATCHING_SUBSCRIBERS)
pingreq_packet = mosq_test.gen_pingreq()
pingresp_packet = mosq_test.gen_pingresp()

@ -29,7 +29,7 @@ try:
sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port)
helper("$SYS/broker/uptime", mqtt5_rc.MQTT_RC_NOT_AUTHORIZED)
helper("$SYS/broker/connection/me", mqtt5_rc.MQTT_RC_NOT_AUTHORIZED)
helper("$SYS/broker/connection/me/state", 0)
helper("$SYS/broker/connection/me/state", mqtt5_rc.MQTT_RC_NO_MATCHING_SUBSCRIBERS)
helper("$share/share/topic", mqtt5_rc.MQTT_RC_NOT_AUTHORIZED)
rc = 0

@ -0,0 +1,72 @@
#!/usr/bin/env python
# Test whether a PUBLISH to a topic with QoS 1 results in the correct PUBACK
# packet when there are no subscribers.
from mosq_test_helper import *
rc = 1
keepalive = 60
connect_packet = mosq_test.gen_connect("pub-qos1-test", keepalive=keepalive, proto_ver=5)
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=5)
mid = 1
publish1_packet = mosq_test.gen_publish("pub", qos=1, mid=mid, payload="message", proto_ver=5)
puback1_packet = mosq_test.gen_puback(mid, proto_ver=5, reason_code=mqtt5_rc.MQTT_RC_NO_MATCHING_SUBSCRIBERS)
mid = 2
publish2_packet = mosq_test.gen_publish("pub/qos1", qos=1, mid=mid, payload="message", proto_ver=5)
puback2_packet = mosq_test.gen_puback(mid, proto_ver=5, reason_code=mqtt5_rc.MQTT_RC_NO_MATCHING_SUBSCRIBERS)
mid = 3
publish3_packet = mosq_test.gen_publish("pub/qos1/test", qos=1, mid=mid, payload="message", proto_ver=5)
puback3_packet = mosq_test.gen_puback(mid, proto_ver=5, reason_code=mqtt5_rc.MQTT_RC_NO_MATCHING_SUBSCRIBERS)
mid = 4
publish4_packet = mosq_test.gen_publish("pub/qos1/test", qos=1, mid=mid, payload="message", proto_ver=5, retain=True)
puback4_packet = mosq_test.gen_puback(mid, proto_ver=5, reason_code=mqtt5_rc.MQTT_RC_NO_MATCHING_SUBSCRIBERS)
mid = 5
publish1b_packet = mosq_test.gen_publish("pub", qos=1, mid=mid, payload="message", proto_ver=5)
puback1b_packet = mosq_test.gen_puback(mid, proto_ver=5, reason_code=mqtt5_rc.MQTT_RC_NO_MATCHING_SUBSCRIBERS)
mid = 6
publish2b_packet = mosq_test.gen_publish("pub/qos1", qos=1, mid=mid, payload="message", proto_ver=5)
puback2b_packet = mosq_test.gen_puback(mid, proto_ver=5, reason_code=mqtt5_rc.MQTT_RC_NO_MATCHING_SUBSCRIBERS)
mid = 7
publish3b_packet = mosq_test.gen_publish("pub/qos1/test", qos=1, mid=mid, payload="message", proto_ver=5)
puback3b_packet = mosq_test.gen_puback(mid, proto_ver=5, reason_code=mqtt5_rc.MQTT_RC_NO_MATCHING_SUBSCRIBERS)
port = mosq_test.get_port()
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)
try:
sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port)
# None of the pub/qos1/test topic tree exists here
mosq_test.do_send_receive(sock, publish1_packet, puback1_packet, "puback1a")
mosq_test.do_send_receive(sock, publish2_packet, puback2_packet, "puback2a")
mosq_test.do_send_receive(sock, publish3_packet, puback3_packet, "puback3a")
# This publish sets a retained message, which means the topic tree exists
mosq_test.do_send_receive(sock, publish4_packet, puback4_packet, "puback4")
# So now test again
mosq_test.do_send_receive(sock, publish1b_packet, puback1b_packet, "puback1b")
mosq_test.do_send_receive(sock, publish2b_packet, puback2b_packet, "puback2b")
mosq_test.do_send_receive(sock, publish3b_packet, puback3b_packet, "puback3b")
rc = 0
sock.close()
finally:
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
if rc:
print(stde)
exit(rc)

@ -94,6 +94,7 @@ endif
./03-publish-dollar-v5.py
./03-publish-dollar.py
./03-publish-invalid-utf8.py
./03-publish-qos1-no-subscribers-v5.py
./03-publish-qos1-retain-disabled.py
./03-publish-qos1.py
./03-publish-qos2.py

@ -74,6 +74,7 @@ tests = [
(1, './03-publish-dollar-v5.py'),
(1, './03-publish-dollar.py'),
(1, './03-publish-invalid-utf8.py'),
(1, './03-publish-qos1-no-subscribers-v5.py'),
(1, './03-publish-qos1-retain-disabled.py'),
(1, './03-publish-qos1.py'),
(1, './03-publish-qos2.py'),

Loading…
Cancel
Save