diff --git a/src/subs.c b/src/subs.c index 79def04a..75f66660 100644 --- a/src/subs.c +++ b/src/subs.c @@ -387,50 +387,67 @@ static int sub__remove_recurse(struct mosquitto_db *db, struct mosquitto *contex static int sub__search(struct mosquitto_db *db, struct mosquitto__subhier *subhier, struct sub__token *tokens, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored, bool set_retain) { /* FIXME - need to take into account source_id if the client is a bridge */ - struct mosquitto__subhier *branch, *branch_tmp; - bool sr; + struct mosquitto__subhier *branch; int rc; bool have_subscribers = false; - HASH_ITER(hh, subhier->children, branch, branch_tmp){ - sr = set_retain; - - if(tokens && tokens->topic - && (!strcmp(branch->topic, tokens->topic) - || !strcmp(branch->topic, "+"))){ - /* The topic matches this subscription. - * Doesn't include # wildcards */ - if(!strcmp(branch->topic, "+")){ - /* Don't set a retained message where + is in the hierarchy. */ - sr = false; - } - rc = sub__search(db, branch, tokens->next, source_id, topic, qos, retain, stored, sr); + if(tokens){ + /* Check for literal match */ + HASH_FIND(hh, subhier->children, tokens->topic, tokens->topic_len, branch); + + if(branch){ + rc = sub__search(db, branch, tokens->next, source_id, topic, qos, retain, stored, set_retain); if(rc == MOSQ_ERR_SUCCESS){ have_subscribers = true; }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){ return rc; } if(!tokens->next){ - rc = subs__process(db, branch, source_id, topic, qos, retain, stored, sr); + rc = subs__process(db, branch, source_id, topic, qos, retain, stored, set_retain); if(rc == MOSQ_ERR_SUCCESS){ have_subscribers = true; }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){ return rc; } } - }else if(!strcmp(branch->topic, "#") && !branch->children){ - /* The topic matches due to a # wildcard - process the - * subscriptions but *don't* return. Although this branch has ended - * there may still be other subscriptions to deal with. - */ - rc = subs__process(db, branch, source_id, topic, qos, retain, stored, false); + } + + /* Check for + match */ + HASH_FIND(hh, subhier->children, "+", 1, branch); + + if(branch){ + rc = sub__search(db, branch, tokens->next, source_id, topic, qos, retain, stored, false); if(rc == MOSQ_ERR_SUCCESS){ have_subscribers = true; }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){ return rc; } + if(!tokens->next){ + rc = subs__process(db, branch, source_id, topic, qos, retain, stored, false); + if(rc == MOSQ_ERR_SUCCESS){ + have_subscribers = true; + }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){ + return rc; + } + } } } + + /* Check for # match */ + HASH_FIND(hh, subhier->children, "#", 1, branch); + if(branch && !branch->children){ + /* The topic matches due to a # wildcard - process the + * subscriptions but *don't* return. Although this branch has ended + * there may still be other subscriptions to deal with. + */ + rc = subs__process(db, branch, source_id, topic, qos, retain, stored, false); + if(rc == MOSQ_ERR_SUCCESS){ + have_subscribers = true; + }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){ + return rc; + } + } + if(have_subscribers){ return MOSQ_ERR_SUCCESS; }else{ @@ -739,11 +756,8 @@ static int retain__search(struct mosquitto_db *db, struct mosquitto__subhier *su struct mosquitto__subhier *branch, *branch_tmp; int flag = 0; - HASH_ITER(hh, subhier->children, branch, branch_tmp){ - /* Subscriptions with wildcards in aren't really valid topics to publish to - * so they can't have retained messages. - */ - if(!strcmp(tokens->topic, "#") && !tokens->next){ + if(!strcmp(tokens->topic, "#") && !tokens->next){ + HASH_ITER(hh, subhier->children, branch, branch_tmp){ /* Set flag to indicate that we should check for retained messages * on "foo" when we are subscribing to e.g. "foo/#" and then exit * this function and return to an earlier retain__search(). @@ -755,20 +769,39 @@ static int retain__search(struct mosquitto_db *db, struct mosquitto__subhier *su if(branch->children){ retain__search(db, branch, tokens, context, sub, sub_qos, subscription_identifier, now, level+1); } - }else if(strcmp(branch->topic, "+") - && (!strcmp(branch->topic, tokens->topic) - || !strcmp(tokens->topic, "+"))){ - if(tokens->next){ - if(retain__search(db, branch, tokens->next, context, sub, sub_qos, subscription_identifier, now, level+1) == -1 - || (!branch_tmp && tokens->next && !strcmp(tokens->next->topic, "#") && level>0)){ - + } + }else{ + if(!strcmp(tokens->topic, "+")){ + HASH_ITER(hh, subhier->children, branch, branch_tmp){ + if(tokens->next){ + if(retain__search(db, branch, tokens->next, context, sub, sub_qos, subscription_identifier, now, level+1) == -1 + || (tokens->next && !strcmp(tokens->next->topic, "#") && level>0)){ + + if(branch->retained){ + retain__process(db, branch, context, sub_qos, subscription_identifier, now); + } + } + }else{ if(branch->retained){ retain__process(db, branch, context, sub_qos, subscription_identifier, now); } } - }else{ - if(branch->retained){ - retain__process(db, branch, context, sub_qos, subscription_identifier, now); + } + }else{ + HASH_FIND(hh, subhier->children, tokens->topic, tokens->topic_len, branch); + if(branch){ + if(tokens->next){ + if(retain__search(db, branch, tokens->next, context, sub, sub_qos, subscription_identifier, now, level+1) == -1 + || (tokens->next && !strcmp(tokens->next->topic, "#") && level>0)){ + + if(branch->retained){ + retain__process(db, branch, context, sub_qos, subscription_identifier, now); + } + } + }else{ + if(branch->retained){ + retain__process(db, branch, context, sub_qos, subscription_identifier, now); + } } } }