No local support.

pull/1203/head
Roger A. Light 7 years ago
parent db7901884f
commit 2919510384

@ -36,7 +36,6 @@ int handle__subscribe(struct mosquitto_db *db, struct mosquitto *context)
uint8_t subscription_options; uint8_t subscription_options;
uint8_t qos; uint8_t qos;
uint8_t retain_handling = 0; uint8_t retain_handling = 0;
uint8_t retain_as_published = 0;
uint8_t *payload = NULL, *tmp_payload; uint8_t *payload = NULL, *tmp_payload;
uint32_t payloadlen = 0; uint32_t payloadlen = 0;
int len; int len;
@ -95,17 +94,14 @@ int handle__subscribe(struct mosquitto_db *db, struct mosquitto *context)
if(context->protocol == mosq_p_mqtt31 || context->protocol == mosq_p_mqtt311){ if(context->protocol == mosq_p_mqtt31 || context->protocol == mosq_p_mqtt311){
qos = subscription_options; qos = subscription_options;
if(context->is_bridge){ if(context->is_bridge){
retain_as_published = 1; subscription_options = MQTT_SUB_OPT_RETAIN_AS_PUBLISHED | MQTT_SUB_OPT_NO_LOCAL;
} }
}else{ }else{
qos = subscription_options & 0x03; qos = subscription_options & 0x03;
subscription_options &= 0xFC;
retain_as_published = subscription_options & 0x08;
retain_handling = (subscription_options & 0x30) >> 4; retain_handling = (subscription_options & 0x30) >> 4;
if(retain_handling == 3){ if(retain_handling == 3 || (subscription_options & 0xC0) != 0){
mosquitto__free(sub);
mosquitto__free(payload);
return MOSQ_ERR_PROTOCOL; return MOSQ_ERR_PROTOCOL;
} }
} }
@ -151,7 +147,7 @@ int handle__subscribe(struct mosquitto_db *db, struct mosquitto *context)
} }
if(qos != 0x80){ if(qos != 0x80){
rc2 = sub__add(db, context, sub, qos, retain_as_published, &db->subs); rc2 = sub__add(db, context, sub, qos, subscription_options, &db->subs);
if(context->protocol == mosq_p_mqtt311 || context->protocol == mosq_p_mqtt31){ if(context->protocol == mosq_p_mqtt311 || context->protocol == mosq_p_mqtt31){
if(rc2 == MOSQ_ERR_SUCCESS || rc2 == MOSQ_ERR_SUB_EXISTS){ if(rc2 == MOSQ_ERR_SUCCESS || rc2 == MOSQ_ERR_SUB_EXISTS){
if(sub__retain_queue(db, context, sub, qos)) rc = 1; if(sub__retain_queue(db, context, sub, qos)) rc = 1;

@ -290,6 +290,7 @@ struct mosquitto__subleaf {
struct mosquitto__subleaf *next; struct mosquitto__subleaf *next;
struct mosquitto *context; struct mosquitto *context;
int qos; int qos;
bool no_local;
bool retain_as_published; bool retain_as_published;
}; };
@ -573,7 +574,7 @@ void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time);
/* ============================================================ /* ============================================================
* Subscription functions * Subscription functions
* ============================================================ */ * ============================================================ */
int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, bool retain_as_published, struct mosquitto__subhier **root); int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, int options, struct mosquitto__subhier **root);
struct mosquitto__subhier *sub__add_hier_entry(struct mosquitto__subhier *parent, struct mosquitto__subhier **sibling, const char *topic, size_t len); struct mosquitto__subhier *sub__add_hier_entry(struct mosquitto__subhier *parent, struct mosquitto__subhier **sibling, const char *topic, size_t len);
int sub__remove(struct mosquitto_db *db, struct mosquitto *context, const char *sub, struct mosquitto__subhier *root); int sub__remove(struct mosquitto_db *db, struct mosquitto *context, const char *sub, struct mosquitto__subhier *root);
void sub__tree_print(struct mosquitto__subhier *root, int level); void sub__tree_print(struct mosquitto__subhier *root, int level);

@ -97,7 +97,7 @@ static int subs__process(struct mosquitto_db *db, struct mosquitto__subhier *hie
} }
} }
while(source_id && leaf){ while(source_id && leaf){
if(!leaf->context->id || (leaf->context->is_bridge && !strcmp(leaf->context->id, source_id))){ if(!leaf->context->id || leaf->no_local){
leaf = leaf->next; leaf = leaf->next;
continue; continue;
} }
@ -240,7 +240,7 @@ static void sub__topic_tokens_free(struct sub__token *tokens)
} }
} }
static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context, int qos, bool retain_as_published, struct mosquitto__subhier *subhier, struct sub__token *tokens) static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context, int qos, int options, struct mosquitto__subhier *subhier, struct sub__token *tokens)
/* FIXME - this function has the potential to leak subhier, audit calling functions. */ /* FIXME - this function has the potential to leak subhier, audit calling functions. */
{ {
struct mosquitto__subhier *branch; struct mosquitto__subhier *branch;
@ -274,7 +274,8 @@ static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context,
leaf->next = NULL; leaf->next = NULL;
leaf->context = context; leaf->context = context;
leaf->qos = qos; leaf->qos = qos;
leaf->retain_as_published = retain_as_published; leaf->no_local = ((options & 0x04) != 0);
leaf->retain_as_published = ((options & 0x08) != 0);
for(i=0; i<context->sub_count; i++){ for(i=0; i<context->sub_count; i++){
if(!context->subs[i]){ if(!context->subs[i]){
context->subs[i] = subhier; context->subs[i] = subhier;
@ -307,13 +308,13 @@ static int sub__add_recurse(struct mosquitto_db *db, struct mosquitto *context,
HASH_FIND(hh, subhier->children, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len, branch); HASH_FIND(hh, subhier->children, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len, branch);
if(branch){ if(branch){
return sub__add_recurse(db, context, qos, retain_as_published, branch, tokens->next); return sub__add_recurse(db, context, qos, options, branch, tokens->next);
}else{ }else{
/* Not found */ /* Not found */
branch = sub__add_hier_entry(subhier, &subhier->children, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len); branch = sub__add_hier_entry(subhier, &subhier->children, UHPA_ACCESS_TOPIC(tokens), tokens->topic_len);
if(!branch) return MOSQ_ERR_NOMEM; if(!branch) return MOSQ_ERR_NOMEM;
return sub__add_recurse(db, context, qos, retain_as_published, branch, tokens->next); return sub__add_recurse(db, context, qos, options, branch, tokens->next);
} }
} }
@ -442,7 +443,7 @@ struct mosquitto__subhier *sub__add_hier_entry(struct mosquitto__subhier *parent
} }
int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, bool retain_as_published, struct mosquitto__subhier **root) int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, int options, struct mosquitto__subhier **root)
{ {
int rc = 0; int rc = 0;
struct mosquitto__subhier *subhier; struct mosquitto__subhier *subhier;
@ -464,7 +465,7 @@ int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub
} }
} }
rc = sub__add_recurse(db, context, qos, retain_as_published, subhier, tokens); rc = sub__add_recurse(db, context, qos, options, subhier, tokens);
sub__topic_tokens_free(tokens); sub__topic_tokens_free(tokens);

Loading…
Cancel
Save