From a57bba0aaeb5310e5cb9b2e1f3c7b0e93f69c737 Mon Sep 17 00:00:00 2001 From: Roger Light Date: Fri, 8 Mar 2019 23:11:21 +0000 Subject: [PATCH] Don't use uhpa for topics, incompatible with uthash. --- src/database.c | 2 +- src/mosquitto_broker_internal.h | 12 +---- src/persist.c | 4 +- src/subs.c | 87 +++++++++++++++------------------ 4 files changed, 43 insertions(+), 62 deletions(-) diff --git a/src/database.c b/src/database.c index dafb0fcd..221a2f0c 100644 --- a/src/database.c +++ b/src/database.c @@ -153,7 +153,7 @@ static void subhier_clean(struct mosquitto_db *db, struct mosquitto__subhier **s db__msg_store_deref(db, &peer->retained); } subhier_clean(db, &peer->children); - UHPA_FREE_TOPIC(peer); + mosquitto__free(peer->topic); HASH_DELETE(hh, *subhier, peer); mosquitto__free(peer); diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index 7d327931..5055fe66 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -125,16 +125,6 @@ typedef union { #define UHPA_FREE_PAYLOAD(A) UHPA_FREE((A)->payload, (A)->payloadlen) #define UHPA_MOVE_PAYLOAD(DEST, SRC) UHPA_MOVE((DEST)->payload, (SRC)->payload, (SRC)->payloadlen) -#define MOSQ_TOPIC_ELEMENT_UNION_SIZE 8 -typedef union { - char *ptr; - char array[MOSQ_TOPIC_ELEMENT_UNION_SIZE]; -} mosquitto__topic_element_uhpa; -#define UHPA_ALLOC_TOPIC(A) UHPA_ALLOC((A)->topic, (A)->topic_len+1) -#define UHPA_ACCESS_TOPIC(A) UHPA_ACCESS((A)->topic, (A)->topic_len+1) -#define UHPA_FREE_TOPIC(A) UHPA_FREE((A)->topic, (A)->topic_len+1) -#define UHPA_MOVE_TOPIC(DEST, SRC) UHPA_MOVE((DEST)->topic, (SRC)->topic, (SRC)->topic_len+1) - /* ======================================== * End UHPA data types * ======================================== */ @@ -312,7 +302,7 @@ struct mosquitto__subhier { struct mosquitto__subhier *children; struct mosquitto__subleaf *subs; struct mosquitto_msg_store *retained; - mosquitto__topic_element_uhpa topic; + char *topic; uint16_t topic_len; }; diff --git a/src/persist.c b/src/persist.c index 98f79146..a8fb75a0 100644 --- a/src/persist.c +++ b/src/persist.c @@ -349,9 +349,9 @@ static int persist__subs_retain_write(struct mosquitto_db *db, FILE *db_fptr, st thistopic = mosquitto__malloc(sizeof(char)*slen); if(!thistopic) return MOSQ_ERR_NOMEM; if(level > 1 || strlen(topic)){ - snprintf(thistopic, slen, "%s/%s", topic, UHPA_ACCESS_TOPIC(node)); + snprintf(thistopic, slen, "%s/%s", topic, node->topic); }else{ - snprintf(thistopic, slen, "%s", UHPA_ACCESS_TOPIC(node)); + snprintf(thistopic, slen, "%s", node->topic); } sub = node->subs; diff --git a/src/subs.c b/src/subs.c index 0f07d711..768c3bc8 100644 --- a/src/subs.c +++ b/src/subs.c @@ -58,7 +58,7 @@ Contributors: struct sub__token { struct sub__token *next; - mosquitto__topic_element_uhpa topic; + char *topic; uint16_t topic_len; }; @@ -74,9 +74,6 @@ static int subs__process(struct mosquitto_db *db, struct mosquitto__subhier *hie leaf = hier->subs; - if(topic[0] != '$'){ - log__printf(NULL, MOSQ_LOG_INFO, "LEAF:%p", leaf); - } if(retain && set_retain){ #ifdef WITH_PERSISTENCE if(strncmp(topic, "$SYS", 4)){ @@ -162,11 +159,12 @@ static struct sub__token *sub__topic_append(struct sub__token **tail, struct sub } new_topic->next = NULL; new_topic->topic_len = strlen(topic); - if(UHPA_ALLOC_TOPIC(new_topic) == 0){ + new_topic->topic = mosquitto__malloc(new_topic->topic_len+1); + if(!new_topic->topic){ mosquitto__free(new_topic); return NULL; } - strncpy(UHPA_ACCESS_TOPIC(new_topic), topic, new_topic->topic_len+1); + strncpy(new_topic->topic, topic, new_topic->topic_len+1); if(*tail){ (*tail)->next = new_topic; @@ -184,7 +182,7 @@ static int sub__topic_tokenise(const char *subtopic, struct sub__token **topics) int len; int start, stop, tlen; int i; - mosquitto__topic_element_uhpa topic; + char *topic; assert(subtopic); assert(topics); @@ -213,11 +211,12 @@ static int sub__topic_tokenise(const char *subtopic, struct sub__token **topics) if(start != stop){ tlen = stop-start; - if(UHPA_ALLOC(topic, tlen+1) == 0) goto cleanup; - memcpy(UHPA_ACCESS(topic, tlen+1), &subtopic[start], tlen); - UHPA_ACCESS(topic, tlen+1)[tlen] = '\0'; - new_topic = sub__topic_append(&tail, topics, UHPA_ACCESS(topic, tlen+1)); - UHPA_FREE(topic, tlen+1); + topic = mosquitto__malloc(tlen+1); + if(!topic) goto cleanup; + memcpy(topic, &subtopic[start], tlen); + topic[tlen] = '\0'; + new_topic = sub__topic_append(&tail, topics, topic); + mosquitto__free(topic); }else{ new_topic = sub__topic_append(&tail, topics, ""); } @@ -232,7 +231,7 @@ cleanup: tail = *topics; *topics = NULL; while(tail){ - UHPA_FREE_TOPIC(tail); + mosquitto__free(tail->topic); new_topic = tail->next; mosquitto__free(tail); tail = new_topic; @@ -246,7 +245,7 @@ static void sub__topic_tokens_free(struct sub__token *tokens) while(tokens){ tail = tokens->next; - UHPA_FREE_TOPIC(tokens); + mosquitto__free(tokens->topic); mosquitto__free(tokens); tokens = tail; } @@ -320,12 +319,12 @@ static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context, return MOSQ_ERR_SUCCESS; } - HASH_FIND(hh, subhier->children, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len, branch); + HASH_FIND(hh, subhier->children, tokens->topic, tokens->topic_len, branch); if(branch){ return sub__add_recurse(db, context, qos, identifier, options, branch, tokens->next); }else{ /* Not found */ - branch = sub__add_hier_entry(subhier, &subhier->children, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len); + branch = sub__add_hier_entry(subhier, &subhier->children, tokens->topic, tokens->topic_len); if(!branch) return MOSQ_ERR_NOMEM; return sub__add_recurse(db, context, qos, identifier, options, branch, tokens->next); @@ -373,12 +372,12 @@ static int sub__remove_recurse(struct mosquitto_db *db, struct mosquitto *contex return MOSQ_ERR_SUCCESS; } - HASH_FIND(hh, subhier->children, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len, branch); + HASH_FIND(hh, subhier->children, tokens->topic, tokens->topic_len, branch); if(branch){ sub__remove_recurse(db, context, branch, tokens->next, reason); if(!branch->children && !branch->subs && !branch->retained){ HASH_DELETE(hh, subhier->children, branch); - UHPA_FREE_TOPIC(branch); + mosquitto__free(branch->topic); mosquitto__free(branch); } } @@ -396,12 +395,12 @@ static int sub__search(struct mosquitto_db *db, struct mosquitto__subhier *subhi HASH_ITER(hh, subhier->children, branch, branch_tmp){ sr = set_retain; - if(tokens && UHPA_ACCESS_TOPIC(tokens) - && (!strcmp(UHPA_ACCESS_TOPIC(branch), UHPA_ACCESS_TOPIC(tokens)) - || !strcmp(UHPA_ACCESS_TOPIC(branch), "+"))){ + if(tokens && tokens->topic + && (!strcmp(branch->topic, tokens->topic) + || !strcmp(branch->topic, "+"))){ /* The topic matches this subscription. * Doesn't include # wildcards */ - if(!strcmp(UHPA_ACCESS_TOPIC(branch), "+")){ + if(!strcmp(branch->topic, "+")){ /* Don't set a retained message where + is in the hierarchy. */ sr = false; } @@ -419,7 +418,7 @@ static int sub__search(struct mosquitto_db *db, struct mosquitto__subhier *subhi return rc; } } - }else if(!strcmp(UHPA_ACCESS_TOPIC(branch), "#") && !branch->children){ + }else if(!strcmp(branch->topic, "#") && !branch->children){ /* The topic matches due to a # wildcard - process the * subscriptions but *don't* return. Although this branch has ended * there may still be other subscriptions to deal with. @@ -453,28 +452,20 @@ struct mosquitto__subhier *sub__add_hier_entry(struct mosquitto__subhier *parent } child->parent = parent; child->topic_len = len; - if(UHPA_ALLOC_TOPIC(child) == 0){ + child->topic = malloc(len+1); + if(!child->topic){ child->topic_len = 0; mosquitto__free(child); log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); return NULL; }else{ - strncpy(UHPA_ACCESS_TOPIC(child), topic, child->topic_len+1); + strncpy(child->topic, topic, child->topic_len+1); } child->subs = NULL; child->children = NULL; child->retained = NULL; - if(child->topic_len+1 > sizeof(child->topic.array)){ - if(child->topic.ptr){ - HASH_ADD_KEYPTR(hh, *sibling, child->topic.ptr, child->topic_len, child); - }else{ - mosquitto__free(child); - return NULL; - } - }else{ - HASH_ADD(hh, *sibling, topic.array, child->topic_len, child); - } + HASH_ADD_KEYPTR(hh, *sibling, child->topic, child->topic_len, child); return child; } @@ -492,9 +483,9 @@ int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub if(sub__topic_tokenise(sub, &tokens)) return 1; - HASH_FIND(hh, *root, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len, subhier); + HASH_FIND(hh, *root, tokens->topic, tokens->topic_len, subhier); if(!subhier){ - subhier = sub__add_hier_entry(NULL, root, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len); + subhier = sub__add_hier_entry(NULL, root, tokens->topic, tokens->topic_len); if(!subhier){ sub__topic_tokens_free(tokens); log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); @@ -520,7 +511,7 @@ int sub__remove(struct mosquitto_db *db, struct mosquitto *context, const char * if(sub__topic_tokenise(sub, &tokens)) return 1; - HASH_FIND(hh, root, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len, subhier); + HASH_FIND(hh, root, tokens->topic, tokens->topic_len, subhier); if(subhier){ *reason = MQTT_RC_NO_SUBSCRIPTION_EXISTED; rc = sub__remove_recurse(db, context, subhier, tokens, reason); @@ -548,7 +539,7 @@ int sub__messages_queue(struct mosquitto_db *db, const char *source_id, const ch */ (*stored)->ref_count++; - HASH_FIND(hh, db->subs, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len, subhier); + HASH_FIND(hh, db->subs, tokens->topic, tokens->topic_len, subhier); if(subhier){ if(retain){ /* We have a message that needs to be retained, so ensure that the subscription @@ -582,7 +573,7 @@ static struct mosquitto__subhier *tmp_remove_subs(struct mosquitto__subhier *sub parent = sub->parent; HASH_DELETE(hh, parent->children, sub); - UHPA_FREE_TOPIC(sub); + mosquitto__free(sub->topic); mosquitto__free(sub); if(parent->subs == NULL @@ -658,7 +649,7 @@ void sub__tree_print(struct mosquitto__subhier *root, int level) for(i=0; i<(level+2)*2; i++){ printf(" "); } - printf("%s", UHPA_ACCESS_TOPIC(branch)); + printf("%s", branch->topic); leaf = branch->subs; while(leaf){ if(leaf->context){ @@ -752,7 +743,7 @@ static int retain__search(struct mosquitto_db *db, struct mosquitto__subhier *su /* Subscriptions with wildcards in aren't really valid topics to publish to * so they can't have retained messages. */ - if(!strcmp(UHPA_ACCESS_TOPIC(tokens), "#") && !tokens->next){ + if(!strcmp(tokens->topic, "#") && !tokens->next){ /* Set flag to indicate that we should check for retained messages * on "foo" when we are subscribing to e.g. "foo/#" and then exit * this function and return to an earlier retain__search(). @@ -764,12 +755,12 @@ static int retain__search(struct mosquitto_db *db, struct mosquitto__subhier *su if(branch->children){ retain__search(db, branch, tokens, context, sub, sub_qos, subscription_identifier, now, level+1); } - }else if(strcmp(UHPA_ACCESS_TOPIC(branch), "+") - && (!strcmp(UHPA_ACCESS_TOPIC(branch), UHPA_ACCESS_TOPIC(tokens)) - || !strcmp(UHPA_ACCESS_TOPIC(tokens), "+"))){ + }else if(strcmp(branch->topic, "+") + && (!strcmp(branch->topic, tokens->topic) + || !strcmp(tokens->topic, "+"))){ if(tokens->next){ if(retain__search(db, branch, tokens->next, context, sub, sub_qos, subscription_identifier, now, level+1) == -1 - || (!branch_tmp && tokens->next && !strcmp(UHPA_ACCESS_TOPIC(tokens->next), "#") && level>0)){ + || (!branch_tmp && tokens->next && !strcmp(tokens->next->topic, "#") && level>0)){ if(branch->retained){ retain__process(db, branch, context, sub, sub_qos, subscription_identifier, now); @@ -797,7 +788,7 @@ int sub__retain_queue(struct mosquitto_db *db, struct mosquitto *context, const if(sub__topic_tokenise(sub, &tokens)) return 1; - HASH_FIND(hh, db->subs, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len, subhier); + HASH_FIND(hh, db->subs, tokens->topic, tokens->topic_len, subhier); if(subhier){ now = time(NULL); @@ -805,7 +796,7 @@ int sub__retain_queue(struct mosquitto_db *db, struct mosquitto *context, const } while(tokens){ tail = tokens->next; - UHPA_FREE_TOPIC(tokens); + mosquitto__free(tokens->topic); mosquitto__free(tokens); tokens = tail; }