Search sub topics, don't iterate (where possible).

pull/1203/head
Roger A. Light 7 years ago
parent 5be83ec1d7
commit 89f51aa54a

@ -387,38 +387,55 @@ static int sub__remove_recurse(struct mosquitto_db *db, struct mosquitto *contex
static int sub__search(struct mosquitto_db *db, struct mosquitto__subhier *subhier, struct sub__token *tokens, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored, bool set_retain) static int sub__search(struct mosquitto_db *db, struct mosquitto__subhier *subhier, struct sub__token *tokens, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored, bool set_retain)
{ {
/* FIXME - need to take into account source_id if the client is a bridge */ /* FIXME - need to take into account source_id if the client is a bridge */
struct mosquitto__subhier *branch, *branch_tmp; struct mosquitto__subhier *branch;
bool sr;
int rc; int rc;
bool have_subscribers = false; bool have_subscribers = false;
HASH_ITER(hh, subhier->children, branch, branch_tmp){ if(tokens){
sr = set_retain; /* Check for literal match */
HASH_FIND(hh, subhier->children, tokens->topic, tokens->topic_len, branch);
if(tokens && tokens->topic
&& (!strcmp(branch->topic, tokens->topic) if(branch){
|| !strcmp(branch->topic, "+"))){ rc = sub__search(db, branch, tokens->next, source_id, topic, qos, retain, stored, set_retain);
/* The topic matches this subscription. if(rc == MOSQ_ERR_SUCCESS){
* Doesn't include # wildcards */ have_subscribers = true;
if(!strcmp(branch->topic, "+")){ }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){
/* Don't set a retained message where + is in the hierarchy. */ return rc;
sr = false; }
} if(!tokens->next){
rc = sub__search(db, branch, tokens->next, source_id, topic, qos, retain, stored, sr); rc = subs__process(db, branch, source_id, topic, qos, retain, stored, set_retain);
if(rc == MOSQ_ERR_SUCCESS){
have_subscribers = true;
}else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){
return rc;
}
}
}
/* Check for + match */
HASH_FIND(hh, subhier->children, "+", 1, branch);
if(branch){
rc = sub__search(db, branch, tokens->next, source_id, topic, qos, retain, stored, false);
if(rc == MOSQ_ERR_SUCCESS){ if(rc == MOSQ_ERR_SUCCESS){
have_subscribers = true; have_subscribers = true;
}else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){ }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){
return rc; return rc;
} }
if(!tokens->next){ if(!tokens->next){
rc = subs__process(db, branch, source_id, topic, qos, retain, stored, sr); rc = subs__process(db, branch, source_id, topic, qos, retain, stored, false);
if(rc == MOSQ_ERR_SUCCESS){ if(rc == MOSQ_ERR_SUCCESS){
have_subscribers = true; have_subscribers = true;
}else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){ }else if(rc != MOSQ_ERR_NO_SUBSCRIBERS){
return rc; return rc;
} }
} }
}else if(!strcmp(branch->topic, "#") && !branch->children){ }
}
/* Check for # match */
HASH_FIND(hh, subhier->children, "#", 1, branch);
if(branch && !branch->children){
/* The topic matches due to a # wildcard - process the /* The topic matches due to a # wildcard - process the
* subscriptions but *don't* return. Although this branch has ended * subscriptions but *don't* return. Although this branch has ended
* there may still be other subscriptions to deal with. * there may still be other subscriptions to deal with.
@ -430,7 +447,7 @@ static int sub__search(struct mosquitto_db *db, struct mosquitto__subhier *subhi
return rc; return rc;
} }
} }
}
if(have_subscribers){ if(have_subscribers){
return MOSQ_ERR_SUCCESS; return MOSQ_ERR_SUCCESS;
}else{ }else{
@ -739,11 +756,8 @@ static int retain__search(struct mosquitto_db *db, struct mosquitto__subhier *su
struct mosquitto__subhier *branch, *branch_tmp; struct mosquitto__subhier *branch, *branch_tmp;
int flag = 0; int flag = 0;
HASH_ITER(hh, subhier->children, branch, branch_tmp){
/* Subscriptions with wildcards in aren't really valid topics to publish to
* so they can't have retained messages.
*/
if(!strcmp(tokens->topic, "#") && !tokens->next){ if(!strcmp(tokens->topic, "#") && !tokens->next){
HASH_ITER(hh, subhier->children, branch, branch_tmp){
/* Set flag to indicate that we should check for retained messages /* Set flag to indicate that we should check for retained messages
* on "foo" when we are subscribing to e.g. "foo/#" and then exit * on "foo" when we are subscribing to e.g. "foo/#" and then exit
* this function and return to an earlier retain__search(). * this function and return to an earlier retain__search().
@ -755,12 +769,13 @@ static int retain__search(struct mosquitto_db *db, struct mosquitto__subhier *su
if(branch->children){ if(branch->children){
retain__search(db, branch, tokens, context, sub, sub_qos, subscription_identifier, now, level+1); retain__search(db, branch, tokens, context, sub, sub_qos, subscription_identifier, now, level+1);
} }
}else if(strcmp(branch->topic, "+") }
&& (!strcmp(branch->topic, tokens->topic) }else{
|| !strcmp(tokens->topic, "+"))){ if(!strcmp(tokens->topic, "+")){
HASH_ITER(hh, subhier->children, branch, branch_tmp){
if(tokens->next){ if(tokens->next){
if(retain__search(db, branch, tokens->next, context, sub, sub_qos, subscription_identifier, now, level+1) == -1 if(retain__search(db, branch, tokens->next, context, sub, sub_qos, subscription_identifier, now, level+1) == -1
|| (!branch_tmp && tokens->next && !strcmp(tokens->next->topic, "#") && level>0)){ || (tokens->next && !strcmp(tokens->next->topic, "#") && level>0)){
if(branch->retained){ if(branch->retained){
retain__process(db, branch, context, sub_qos, subscription_identifier, now); retain__process(db, branch, context, sub_qos, subscription_identifier, now);
@ -772,6 +787,24 @@ static int retain__search(struct mosquitto_db *db, struct mosquitto__subhier *su
} }
} }
} }
}else{
HASH_FIND(hh, subhier->children, tokens->topic, tokens->topic_len, branch);
if(branch){
if(tokens->next){
if(retain__search(db, branch, tokens->next, context, sub, sub_qos, subscription_identifier, now, level+1) == -1
|| (tokens->next && !strcmp(tokens->next->topic, "#") && level>0)){
if(branch->retained){
retain__process(db, branch, context, sub_qos, subscription_identifier, now);
}
}
}else{
if(branch->retained){
retain__process(db, branch, context, sub_qos, subscription_identifier, now);
}
}
}
}
} }
return flag; return flag;
} }

Loading…
Cancel
Save