diff --git a/lib/mosquitto_internal.h b/lib/mosquitto_internal.h index 5b01bfc0..5612cc9c 100644 --- a/lib/mosquitto_internal.h +++ b/lib/mosquitto_internal.h @@ -201,6 +201,8 @@ struct mosquitto { int pollfd_index; struct _mosquitto_packet *out_packet_last; bool is_dropping; + struct _mosquitto_subhier **subs; + int sub_count; # ifdef WITH_WEBSOCKETS struct libwebsocket_context *ws_context; struct libwebsocket *wsi; diff --git a/src/bridge.c b/src/bridge.c index 55a3ad52..89beb32f 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -156,7 +156,7 @@ int mqtt3_bridge_connect(struct mosquitto_db *db, struct mosquitto *context) * remove any messages and the next loop carries out the resubscription * anyway. This means any unwanted subs will be removed. */ - mqtt3_subs_clean_session(db, context, &db->subs); + mqtt3_subs_clean_session(db, context); for(i=0; ibridge->topic_count; i++){ if(context->bridge->topics[i].direction == bd_out || context->bridge->topics[i].direction == bd_both){ diff --git a/src/context.c b/src/context.c index 9b200c2f..615069a3 100644 --- a/src/context.c +++ b/src/context.c @@ -134,7 +134,7 @@ void mqtt3_context_cleanup(struct mosquitto_db *db, struct mosquitto *context, b #endif _mosquitto_socket_close(db, context); if((do_free || context->clean_session) && db){ - mqtt3_subs_clean_session(db, context, &db->subs); + mqtt3_subs_clean_session(db, context); mqtt3_db_messages_delete(context); } if(context->address){ diff --git a/src/mosquitto_broker.h b/src/mosquitto_broker.h index 2c94bfce..166ae030 100644 --- a/src/mosquitto_broker.h +++ b/src/mosquitto_broker.h @@ -393,7 +393,7 @@ int mqtt3_sub_add(struct mosquitto_db *db, struct mosquitto *context, const char int mqtt3_sub_remove(struct mosquitto_db *db, struct mosquitto *context, const char *sub, struct _mosquitto_subhier *root); int mqtt3_sub_search(struct mosquitto_db *db, struct _mosquitto_subhier *root, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored); void mqtt3_sub_tree_print(struct _mosquitto_subhier *root, int level); -int mqtt3_subs_clean_session(struct mosquitto_db *db, struct mosquitto *context, struct _mosquitto_subhier *root); +int mqtt3_subs_clean_session(struct mosquitto_db *db, struct mosquitto *context); /* ============================================================ * Context functions diff --git a/src/subs.c b/src/subs.c index 188e80eb..80f9b4d7 100644 --- a/src/subs.c +++ b/src/subs.c @@ -247,6 +247,8 @@ static int _sub_add(struct mosquitto_db *db, struct mosquitto *context, int qos, { struct _mosquitto_subhier *branch, *last = NULL; struct _mosquitto_subleaf *leaf, *last_leaf; + struct _mosquitto_subhier **subs; + int i; if(!tokens){ if(context){ @@ -281,6 +283,32 @@ static int _sub_add(struct mosquitto_db *db, struct mosquitto *context, int qos, subhier->subs = leaf; leaf->prev = NULL; } + if(context->subs){ + for(i=0; isub_count; i++){ + if(!context->subs[i]){ + context->subs[i] = subhier; + } + break; + } + if(i == context->sub_count){ + context->sub_count++; + subs = _mosquitto_realloc(context->subs, sizeof(struct _mosquitto_subhier *)*context->sub_count); + if(!subs){ + _mosquitto_free(leaf); + return MOSQ_ERR_NOMEM; + } + context->subs = subs; + context->subs[context->sub_count-1] = subhier; + } + }else{ + context->sub_count = 1; + context->subs = _mosquitto_malloc(sizeof(struct _mosquitto_subhier *)*context->sub_count); + if(!context->subs){ + _mosquitto_free(leaf); + return MOSQ_ERR_NOMEM; + } + context->subs[0] = subhier; + } #ifdef WITH_SYS_TREE db->subscription_count++; #endif @@ -316,6 +344,7 @@ static int _sub_remove(struct mosquitto_db *db, struct mosquitto *context, struc { struct _mosquitto_subhier *branch, *last = NULL; struct _mosquitto_subleaf *leaf; + int i; if(!tokens){ leaf = subhier->subs; @@ -333,6 +362,17 @@ static int _sub_remove(struct mosquitto_db *db, struct mosquitto *context, struc leaf->next->prev = leaf->prev; } _mosquitto_free(leaf); + + /* Remove the reference to the sub that the client is keeping. + * It would be nice to be able to use the reference directly, + * but that would involve keeping a copy of the topic string in + * each subleaf. Might be worth considering though. */ + for(i=0; isub_count; i++){ + if(context->subs[i] == subhier){ + context->subs[i] = NULL; + break; + } + } return MOSQ_ERR_SUCCESS; } leaf = leaf->next; @@ -499,71 +539,37 @@ int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, cons return rc; } -static int _subs_clean_session(struct mosquitto_db *db, struct mosquitto *context, struct _mosquitto_subhier *root) +/* Remove all subscriptions for a client. + */ +int mqtt3_subs_clean_session(struct mosquitto_db *db, struct mosquitto *context) { - int rc = 0; - struct _mosquitto_subhier *child, *last = NULL; - struct _mosquitto_subleaf *leaf, *next; - - if(!root) return MOSQ_ERR_SUCCESS; + int i; + struct _mosquitto_subleaf *leaf; - leaf = root->subs; - while(leaf){ - if(leaf->context == context){ + for(i=0; isub_count; i++){ + leaf = context->subs[i]->subs; + while(leaf){ + if(leaf->context==context){ #ifdef WITH_SYS_TREE - db->subscription_count--; + db->subscription_count--; #endif - if(leaf->prev){ - leaf->prev->next = leaf->next; - }else{ - root->subs = leaf->next; - } - if(leaf->next){ - leaf->next->prev = leaf->prev; + if(leaf->prev){ + leaf->prev->next = leaf->next; + }else{ + context->subs[i]->subs = leaf->next; + } + if(leaf->next){ + leaf->next->prev = leaf->prev; + } + _mosquitto_free(leaf); + break; } - next = leaf->next; - _mosquitto_free(leaf); - leaf = next; - }else{ leaf = leaf->next; } } - - child = root->children; - while(child){ - _subs_clean_session(db, context, child); - if(!child->children && !child->subs && !child->retained){ - if(last){ - last->next = child->next; - }else{ - root->children = child->next; - } - _mosquitto_free(child->topic); - _mosquitto_free(child); - if(last){ - child = last->next; - }else{ - child = root->children; - } - }else{ - last = child; - child = child->next; - } - } - return rc; -} - -/* Remove all subscriptions for a client. - */ -int mqtt3_subs_clean_session(struct mosquitto_db *db, struct mosquitto *context, struct _mosquitto_subhier *root) -{ - struct _mosquitto_subhier *child; - - child = root->children; - while(child){ - _subs_clean_session(db, context, child); - child = child->next; - } + _mosquitto_free(context->subs); + context->subs = NULL; + context->sub_count = 0; return MOSQ_ERR_SUCCESS; }