|
|
|
@ -150,7 +150,7 @@ static int subs__process(struct mosquitto__subhier *hier, const char *source_id,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static int sub__add_leaf(struct mosquitto *context, uint8_t qos, uint32_t identifier, int options, struct mosquitto__subleaf **head, struct mosquitto__subleaf **newleaf)
|
|
|
|
|
static int sub__add_leaf(struct mosquitto *context, const char *topic_filter, uint8_t qos, uint32_t identifier, int options, struct mosquitto__subleaf **head, struct mosquitto__subleaf **newleaf)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto__subleaf *leaf;
|
|
|
|
|
|
|
|
|
@ -164,17 +164,20 @@ static int sub__add_leaf(struct mosquitto *context, uint8_t qos, uint32_t identi
|
|
|
|
|
* indicate this to the calling function. */
|
|
|
|
|
leaf->qos = qos;
|
|
|
|
|
leaf->identifier = identifier;
|
|
|
|
|
leaf->no_local = ((options & MQTT_SUB_OPT_NO_LOCAL) != 0);
|
|
|
|
|
leaf->retain_as_published = ((options & MQTT_SUB_OPT_RETAIN_AS_PUBLISHED) != 0);
|
|
|
|
|
return MOSQ_ERR_SUB_EXISTS;
|
|
|
|
|
}
|
|
|
|
|
leaf = leaf->next;
|
|
|
|
|
}
|
|
|
|
|
leaf = mosquitto__calloc(1, sizeof(struct mosquitto__subleaf));
|
|
|
|
|
leaf = mosquitto__calloc(1, sizeof(struct mosquitto__subleaf) + strlen(topic_filter) + 1);
|
|
|
|
|
if(!leaf) return MOSQ_ERR_NOMEM;
|
|
|
|
|
leaf->context = context;
|
|
|
|
|
leaf->qos = qos;
|
|
|
|
|
leaf->identifier = identifier;
|
|
|
|
|
leaf->no_local = ((options & MQTT_SUB_OPT_NO_LOCAL) != 0);
|
|
|
|
|
leaf->retain_as_published = ((options & MQTT_SUB_OPT_RETAIN_AS_PUBLISHED) != 0);
|
|
|
|
|
strcpy(leaf->topic_filter, topic_filter);
|
|
|
|
|
|
|
|
|
|
DL_APPEND(*head, leaf);
|
|
|
|
|
*newleaf = leaf;
|
|
|
|
@ -190,7 +193,6 @@ static void sub__remove_shared_leaf(struct mosquitto__subhier *subhier, struct m
|
|
|
|
|
HASH_DELETE(hh, subhier->shared, shared);
|
|
|
|
|
mosquitto__FREE(shared);
|
|
|
|
|
}
|
|
|
|
|
mosquitto__FREE(leaf);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -198,8 +200,7 @@ static int sub__add_shared(struct mosquitto *context, const char *sub, uint8_t q
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto__subleaf *newleaf;
|
|
|
|
|
struct mosquitto__subshared *shared = NULL;
|
|
|
|
|
struct mosquitto__client_sub **subs;
|
|
|
|
|
struct mosquitto__client_sub *csub;
|
|
|
|
|
struct mosquitto__subleaf **subs;
|
|
|
|
|
int i;
|
|
|
|
|
size_t slen;
|
|
|
|
|
int rc;
|
|
|
|
@ -217,7 +218,7 @@ static int sub__add_shared(struct mosquitto *context, const char *sub, uint8_t q
|
|
|
|
|
HASH_ADD(hh, subhier->shared, name, slen, shared);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
rc = sub__add_leaf(context, qos, identifier, options, &shared->subs, &newleaf);
|
|
|
|
|
rc = sub__add_leaf(context, sub, qos, identifier, options, &shared->subs, &newleaf);
|
|
|
|
|
if(rc > 0){
|
|
|
|
|
if(shared->subs == NULL){
|
|
|
|
|
HASH_DELETE(hh, subhier->shared, shared);
|
|
|
|
@ -227,32 +228,27 @@ static int sub__add_shared(struct mosquitto *context, const char *sub, uint8_t q
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if(rc != MOSQ_ERR_SUB_EXISTS){
|
|
|
|
|
slen = strlen(sub);
|
|
|
|
|
csub = mosquitto__calloc(1, sizeof(struct mosquitto__client_sub) + slen + 1);
|
|
|
|
|
if(csub == NULL) return MOSQ_ERR_NOMEM;
|
|
|
|
|
memcpy(csub->topic_filter, sub, slen);
|
|
|
|
|
csub->hier = subhier;
|
|
|
|
|
csub->shared = shared;
|
|
|
|
|
newleaf->hier = subhier;
|
|
|
|
|
newleaf->shared = shared;
|
|
|
|
|
|
|
|
|
|
bool assigned = false;
|
|
|
|
|
for(i=0; i<context->sub_count; i++){
|
|
|
|
|
if(!context->subs[i]){
|
|
|
|
|
context->subs[i] = csub;
|
|
|
|
|
context->subs[i] = newleaf;
|
|
|
|
|
assigned = true;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if(assigned == false){
|
|
|
|
|
subs = mosquitto__realloc(context->subs, sizeof(struct mosquitto__client_sub *)*(size_t)(context->sub_count + 1));
|
|
|
|
|
subs = mosquitto__realloc(context->subs, sizeof(struct mosquitto__subleaf *)*(size_t)(context->sub_count + 1));
|
|
|
|
|
if(!subs){
|
|
|
|
|
sub__remove_shared_leaf(subhier, shared, newleaf);
|
|
|
|
|
mosquitto__FREE(newleaf);
|
|
|
|
|
mosquitto__FREE(csub);
|
|
|
|
|
return MOSQ_ERR_NOMEM;
|
|
|
|
|
}
|
|
|
|
|
context->subs = subs;
|
|
|
|
|
context->sub_count++;
|
|
|
|
|
context->subs[context->sub_count-1] = csub;
|
|
|
|
|
context->subs[context->sub_count-1] = newleaf;
|
|
|
|
|
}
|
|
|
|
|
#ifdef WITH_SYS_TREE
|
|
|
|
|
db.shared_subscription_count++;
|
|
|
|
@ -272,44 +268,37 @@ static int sub__add_shared(struct mosquitto *context, const char *sub, uint8_t q
|
|
|
|
|
static int sub__add_normal(struct mosquitto *context, const char *sub, uint8_t qos, uint32_t identifier, int options, struct mosquitto__subhier *subhier)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto__subleaf *newleaf = NULL;
|
|
|
|
|
struct mosquitto__client_sub **subs;
|
|
|
|
|
struct mosquitto__client_sub *csub;
|
|
|
|
|
struct mosquitto__subleaf **subs;
|
|
|
|
|
int i;
|
|
|
|
|
int rc;
|
|
|
|
|
size_t slen;
|
|
|
|
|
|
|
|
|
|
rc = sub__add_leaf(context, qos, identifier, options, &subhier->subs, &newleaf);
|
|
|
|
|
rc = sub__add_leaf(context, sub, qos, identifier, options, &subhier->subs, &newleaf);
|
|
|
|
|
if(rc > 0){
|
|
|
|
|
return rc;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if(rc != MOSQ_ERR_SUB_EXISTS){
|
|
|
|
|
slen = strlen(sub);
|
|
|
|
|
csub = mosquitto__calloc(1, sizeof(struct mosquitto__client_sub) + slen + 1);
|
|
|
|
|
if(csub == NULL) return MOSQ_ERR_NOMEM;
|
|
|
|
|
memcpy(csub->topic_filter, sub, slen);
|
|
|
|
|
csub->hier = subhier;
|
|
|
|
|
csub->shared = NULL;
|
|
|
|
|
newleaf->hier = subhier;
|
|
|
|
|
newleaf->shared = NULL;
|
|
|
|
|
|
|
|
|
|
bool assigned = false;
|
|
|
|
|
for(i=0; i<context->sub_count; i++){
|
|
|
|
|
if(!context->subs[i]){
|
|
|
|
|
context->subs[i] = csub;
|
|
|
|
|
context->subs[i] = newleaf;
|
|
|
|
|
assigned = true;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if(assigned == false){
|
|
|
|
|
subs = mosquitto__realloc(context->subs, sizeof(struct mosquitto__client_sub *)*(size_t)(context->sub_count + 1));
|
|
|
|
|
subs = mosquitto__realloc(context->subs, sizeof(struct mosquitto__subleaf *)*(size_t)(context->sub_count + 1));
|
|
|
|
|
if(!subs){
|
|
|
|
|
DL_DELETE(subhier->subs, newleaf);
|
|
|
|
|
mosquitto__FREE(newleaf);
|
|
|
|
|
mosquitto__FREE(csub);
|
|
|
|
|
return MOSQ_ERR_NOMEM;
|
|
|
|
|
}
|
|
|
|
|
context->subs = subs;
|
|
|
|
|
context->sub_count++;
|
|
|
|
|
context->subs[context->sub_count-1] = csub;
|
|
|
|
|
context->subs[context->sub_count-1] = newleaf;
|
|
|
|
|
}
|
|
|
|
|
#ifdef WITH_SYS_TREE
|
|
|
|
|
db.subscription_count++;
|
|
|
|
@ -373,14 +362,13 @@ static int sub__remove_normal(struct mosquitto *context, struct mosquitto__subhi
|
|
|
|
|
db.subscription_count--;
|
|
|
|
|
#endif
|
|
|
|
|
DL_DELETE(subhier->subs, leaf);
|
|
|
|
|
mosquitto__FREE(leaf);
|
|
|
|
|
|
|
|
|
|
/* Remove the reference to the sub that the client is keeping.
|
|
|
|
|
* It would be nice to be able to use the reference directly,
|
|
|
|
|
* but that would involve keeping a copy of the topic string in
|
|
|
|
|
* each subleaf. Might be worth considering though. */
|
|
|
|
|
for(i=0; i<context->sub_count; i++){
|
|
|
|
|
if(context->subs[i] && context->subs[i]->hier == subhier){
|
|
|
|
|
if(context->subs[i] == leaf){
|
|
|
|
|
mosquitto__FREE(context->subs[i]);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
@ -409,17 +397,13 @@ static int sub__remove_shared(struct mosquitto *context, struct mosquitto__subhi
|
|
|
|
|
db.shared_subscription_count--;
|
|
|
|
|
#endif
|
|
|
|
|
DL_DELETE(shared->subs, leaf);
|
|
|
|
|
mosquitto__FREE(leaf);
|
|
|
|
|
|
|
|
|
|
/* Remove the reference to the sub that the client is keeping.
|
|
|
|
|
* It would be nice to be able to use the reference directly,
|
|
|
|
|
* but that would involve keeping a copy of the topic string in
|
|
|
|
|
* each subleaf. Might be worth considering though. */
|
|
|
|
|
for(i=0; i<context->sub_count; i++){
|
|
|
|
|
if(context->subs[i]
|
|
|
|
|
&& context->subs[i]->hier == subhier
|
|
|
|
|
&& context->subs[i]->shared == shared){
|
|
|
|
|
|
|
|
|
|
if(context->subs[i] == leaf){
|
|
|
|
|
mosquitto__FREE(context->subs[i]);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
@ -697,7 +681,7 @@ int sub__clean_session(struct mosquitto *context)
|
|
|
|
|
struct mosquitto__subhier *hier;
|
|
|
|
|
|
|
|
|
|
for(i=0; i<context->sub_count; i++){
|
|
|
|
|
if(context->subs[i] == NULL){
|
|
|
|
|
if(context->subs[i] == NULL || context->subs[i]->hier == NULL){
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -724,7 +708,6 @@ int sub__clean_session(struct mosquitto *context)
|
|
|
|
|
db.subscription_count--;
|
|
|
|
|
#endif
|
|
|
|
|
DL_DELETE(hier->subs, leaf);
|
|
|
|
|
mosquitto__FREE(leaf);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
leaf = leaf->next;
|
|
|
|
|