diff --git a/include/mosquitto_broker.h b/include/mosquitto_broker.h index 613c7e80..05203b03 100644 --- a/include/mosquitto_broker.h +++ b/include/mosquitto_broker.h @@ -213,7 +213,7 @@ struct mosquitto_evt_disconnect { struct mosquitto_evt_subscribe { void *future; struct mosquitto *client; - char *topic; + char *topic_filter; const mosquitto_property *properties; uint32_t subscription_identifier; uint8_t subscription_options; @@ -227,7 +227,7 @@ struct mosquitto_evt_subscribe { struct mosquitto_evt_unsubscribe { void *future; struct mosquitto *client; - char *topic; + char *topic_filter; const mosquitto_property *properties; void *future2[8]; }; @@ -270,7 +270,7 @@ struct mosquitto_evt_persist_client { struct mosquitto_subscription { char *client_id; - char *topic; + char *topic_filter; mosquitto_property *properties; uint32_t identifier; uint8_t options; diff --git a/plugins/examples/topic-jail/mosquitto_topic_jail.c b/plugins/examples/topic-jail/mosquitto_topic_jail.c index 832b1133..3d8e21e8 100644 --- a/plugins/examples/topic-jail/mosquitto_topic_jail.c +++ b/plugins/examples/topic-jail/mosquitto_topic_jail.c @@ -173,7 +173,7 @@ static int callback_subscribe(int event, void *event_data, void *userdata) /* put the client_id on front of the topic */ /* calculate the length of the new payload */ - new_sub_len = strlen(client_id) + sizeof('/') + strlen(ed->topic) + 1; + new_sub_len = strlen(client_id) + sizeof('/') + strlen(ed->topic_filter) + 1; /* Allocate some memory - use * mosquitto_calloc/mosquitto_malloc/mosquitto_strdup when allocating, to @@ -184,12 +184,12 @@ static int callback_subscribe(int event, void *event_data, void *userdata) } /* prepend the client_id to the subscription */ - snprintf(new_sub, new_sub_len, "%s/%s", client_id, ed->topic); + snprintf(new_sub, new_sub_len, "%s/%s", client_id, ed->topic_filter); /* Assign the new topic to the event data structure. You * must *not* free the original topic, it will be handled by the * broker. */ - ed->topic = new_sub; + ed->topic_filter = new_sub; return MOSQ_ERR_SUCCESS; } @@ -213,7 +213,7 @@ static int callback_unsubscribe(int event, void *event_data, void *userdata) /* put the client_id on front of the topic */ /* calculate the length of the new payload */ - new_sub_len = strlen(client_id) + sizeof('/') + strlen(ed->topic) + 1; + new_sub_len = strlen(client_id) + sizeof('/') + strlen(ed->topic_filter) + 1; /* Allocate some memory - use * mosquitto_calloc/mosquitto_malloc/mosquitto_strdup when allocating, to @@ -224,12 +224,12 @@ static int callback_unsubscribe(int event, void *event_data, void *userdata) } /* prepend the client_id to the subscription */ - snprintf(new_sub, new_sub_len, "%s/%s", client_id, ed->topic); + snprintf(new_sub, new_sub_len, "%s/%s", client_id, ed->topic_filter); /* Assign the new topic to the event data structure. You * must *not* free the original topic, it will be handled by the * broker. */ - ed->topic = new_sub; + ed->topic_filter = new_sub; return MOSQ_ERR_SUCCESS; } diff --git a/plugins/persist-sqlite/restore.c b/plugins/persist-sqlite/restore.c index a5a900d7..746cdae7 100644 --- a/plugins/persist-sqlite/restore.c +++ b/plugins/persist-sqlite/restore.c @@ -251,7 +251,7 @@ static int subscription_restore(struct mosquitto_sqlite *ms) while(sqlite3_step(stmt) == SQLITE_ROW){ memset(&sub, 0, sizeof(sub)); sub.client_id = (char *)sqlite3_column_text(stmt, 0); - sub.topic = (char *)sqlite3_column_text(stmt, 1); + sub.topic_filter = (char *)sqlite3_column_text(stmt, 1); sub.options = (uint8_t)sqlite3_column_int(stmt, 2); sub.identifier = (uint32_t)sqlite3_column_int(stmt, 3); diff --git a/plugins/persist-sqlite/subscriptions.c b/plugins/persist-sqlite/subscriptions.c index eca71c1e..f45f4af8 100644 --- a/plugins/persist-sqlite/subscriptions.c +++ b/plugins/persist-sqlite/subscriptions.c @@ -35,7 +35,7 @@ int persist_sqlite__subscription_add_cb(int event, void *event_data, void *userd ed->data.client_id, (int)strlen(ed->data.client_id), SQLITE_STATIC) == SQLITE_OK){ if(sqlite3_bind_text(ms->subscription_add_stmt, 2, - ed->data.topic, (int)strlen(ed->data.topic), SQLITE_STATIC) == SQLITE_OK){ + ed->data.topic_filter, (int)strlen(ed->data.topic_filter), SQLITE_STATIC) == SQLITE_OK){ if(sqlite3_bind_int(ms->subscription_add_stmt, 3, ed->data.options) == SQLITE_OK){ @@ -71,7 +71,7 @@ int persist_sqlite__subscription_remove_cb(int event, void *event_data, void *us ed->data.client_id, (int)strlen(ed->data.client_id), SQLITE_STATIC) == SQLITE_OK){ if(sqlite3_bind_text(ms->subscription_remove_stmt, 2, - ed->data.topic, (int)strlen(ed->data.topic), SQLITE_STATIC) == SQLITE_OK){ + ed->data.topic_filter, (int)strlen(ed->data.topic_filter), SQLITE_STATIC) == SQLITE_OK){ ms->event_count++; rc = sqlite3_step(ms->subscription_remove_stmt); diff --git a/src/bridge.c b/src/bridge.c index dd4a91cd..ef6c8d3f 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -492,7 +492,7 @@ int bridge__connect(struct mosquitto *context) qos = cur_topic->qos; } struct mosquitto_subscription sub; - sub.topic = cur_topic->local_topic; + sub.topic_filter = cur_topic->local_topic; sub.identifier = 0; sub.options = MQTT_SUB_OPT_NO_LOCAL | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED | qos; if(sub__add(context, &sub) > 0){ @@ -695,7 +695,7 @@ int bridge__on_connect(struct mosquitto *context) struct mosquitto_subscription sub; memset(&sub, 0, sizeof(sub)); LL_FOREACH(context->bridge->topics, cur_topic){ - sub.topic = cur_topic->local_topic; + sub.topic_filter = cur_topic->local_topic; if(cur_topic->direction == bd_out || cur_topic->direction == bd_both){ if(cur_topic->qos > context->max_qos){ sub.options = context->max_qos; diff --git a/src/handle_subscribe.c b/src/handle_subscribe.c index 9c63da0f..1698a074 100644 --- a/src/handle_subscribe.c +++ b/src/handle_subscribe.c @@ -96,36 +96,36 @@ int handle__subscribe(struct mosquitto *context) memset(&sub, 0, sizeof(sub)); sub.identifier = subscription_identifier; sub.properties = properties; - if(packet__read_string(&context->in_packet, &sub.topic, &slen)){ + if(packet__read_string(&context->in_packet, &sub.topic_filter, &slen)){ mosquitto__FREE(payload); return MOSQ_ERR_MALFORMED_PACKET; } - if(sub.topic){ + if(sub.topic_filter){ if(!slen){ log__printf(NULL, MOSQ_LOG_INFO, "Empty subscription string from %s, disconnecting.", context->address); - mosquitto__FREE(sub.topic); + mosquitto__FREE(sub.topic_filter); mosquitto__FREE(payload); return MOSQ_ERR_MALFORMED_PACKET; } - if(mosquitto_sub_topic_check(sub.topic)){ + if(mosquitto_sub_topic_check(sub.topic_filter)){ log__printf(NULL, MOSQ_LOG_INFO, "Invalid subscription string from %s, disconnecting.", context->address); - mosquitto__FREE(sub.topic); + mosquitto__FREE(sub.topic_filter); mosquitto__FREE(payload); return MOSQ_ERR_MALFORMED_PACKET; } if(packet__read_byte(&context->in_packet, &sub.options)){ - mosquitto__FREE(sub.topic); + mosquitto__FREE(sub.topic_filter); mosquitto__FREE(payload); return MOSQ_ERR_MALFORMED_PACKET; } - if(sub.options & MQTT_SUB_OPT_NO_LOCAL && !strncmp(sub.topic, "$share/", strlen("$share/"))){ - mosquitto__FREE(sub.topic); + if(sub.options & MQTT_SUB_OPT_NO_LOCAL && !strncmp(sub.topic_filter, "$share/", strlen("$share/"))){ + mosquitto__FREE(sub.topic_filter); mosquitto__FREE(payload); return MOSQ_ERR_PROTOCOL; } @@ -142,7 +142,7 @@ int handle__subscribe(struct mosquitto *context) retain_handling = MQTT_SUB_OPT_GET_SEND_RETAIN(sub.options); if(retain_handling == 0x30 || (sub.options & 0xC0) != 0){ - mosquitto__FREE(sub.topic); + mosquitto__FREE(sub.topic_filter); mosquitto__FREE(payload); return MOSQ_ERR_MALFORMED_PACKET; } @@ -151,7 +151,7 @@ int handle__subscribe(struct mosquitto *context) log__printf(NULL, MOSQ_LOG_INFO, "Invalid QoS in subscription command from %s, disconnecting.", context->address); - mosquitto__FREE(sub.topic); + mosquitto__FREE(sub.topic_filter); mosquitto__FREE(payload); return MOSQ_ERR_MALFORMED_PACKET; } @@ -165,21 +165,21 @@ int handle__subscribe(struct mosquitto *context) len = strlen(context->listener->mount_point) + slen + 1; sub_mount = mosquitto__malloc(len+1); if(!sub_mount){ - mosquitto__FREE(sub.topic); + mosquitto__FREE(sub.topic_filter); mosquitto__FREE(payload); return MOSQ_ERR_NOMEM; } - snprintf(sub_mount, len, "%s%s", context->listener->mount_point, sub.topic); + snprintf(sub_mount, len, "%s%s", context->listener->mount_point, sub.topic_filter); sub_mount[len] = '\0'; - mosquitto__FREE(sub.topic); - sub.topic = sub_mount; + mosquitto__FREE(sub.topic_filter); + sub.topic_filter = sub_mount; } - log__printf(NULL, MOSQ_LOG_DEBUG, "\t%s (QoS %d)", sub.topic, qos); + log__printf(NULL, MOSQ_LOG_DEBUG, "\t%s (QoS %d)", sub.topic_filter, qos); allowed = true; - rc2 = mosquitto_acl_check(context, sub.topic, 0, NULL, qos, false, MOSQ_ACL_SUBSCRIBE); + rc2 = mosquitto_acl_check(context, sub.topic_filter, 0, NULL, qos, false, MOSQ_ACL_SUBSCRIBE); switch(rc2){ case MOSQ_ERR_SUCCESS: break; @@ -192,20 +192,20 @@ int handle__subscribe(struct mosquitto *context) } break; default: - mosquitto__FREE(sub.topic); + mosquitto__FREE(sub.topic_filter); return rc2; } if(allowed){ rc2 = plugin__handle_subscribe(context, &sub); if(rc2){ - mosquitto__FREE(sub.topic); + mosquitto__FREE(sub.topic_filter); return rc2; } rc2 = sub__add(context, &sub); if(rc2 > 0){ - mosquitto__FREE(sub.topic); + mosquitto__FREE(sub.topic_filter); return rc2; } if(context->protocol == mosq_p_mqtt311 || context->protocol == mosq_p_mqtt31){ @@ -220,16 +220,16 @@ int handle__subscribe(struct mosquitto *context) } } - log__printf(NULL, MOSQ_LOG_SUBSCRIBE, "%s %d %s", context->id, qos, sub.topic); + log__printf(NULL, MOSQ_LOG_SUBSCRIBE, "%s %d %s", context->id, qos, sub.topic_filter); rc = plugin__handle_subscribe(context, &sub); if(rc){ - mosquitto__FREE(sub.topic); + mosquitto__FREE(sub.topic_filter); return rc; } plugin_persist__handle_subscription_add(context, &sub); } - mosquitto__FREE(sub.topic); + mosquitto__FREE(sub.topic_filter); tmp_payload = mosquitto__realloc(payload, payloadlen + 1); if(tmp_payload){ diff --git a/src/handle_unsubscribe.c b/src/handle_unsubscribe.c index 189d1426..9b73c0ee 100644 --- a/src/handle_unsubscribe.c +++ b/src/handle_unsubscribe.c @@ -90,7 +90,7 @@ int handle__unsubscribe(struct mosquitto *context) while(context->in_packet.pos < context->in_packet.remaining_length){ memset(&sub, 0, sizeof(sub)); sub.properties = properties; - if(packet__read_string(&context->in_packet, &sub.topic, &slen)){ + if(packet__read_string(&context->in_packet, &sub.topic_filter, &slen)){ mosquitto__FREE(reason_codes); return MOSQ_ERR_MALFORMED_PACKET; } @@ -99,22 +99,22 @@ int handle__unsubscribe(struct mosquitto *context) log__printf(NULL, MOSQ_LOG_INFO, "Empty unsubscription string from %s, disconnecting.", context->id); - mosquitto__FREE(sub.topic); + mosquitto__FREE(sub.topic_filter); mosquitto__FREE(reason_codes); return MOSQ_ERR_MALFORMED_PACKET; } - if(mosquitto_sub_topic_check(sub.topic)){ + if(mosquitto_sub_topic_check(sub.topic_filter)){ log__printf(NULL, MOSQ_LOG_INFO, "Invalid unsubscription string from %s, disconnecting.", context->id); - mosquitto__FREE(sub.topic); + mosquitto__FREE(sub.topic_filter); mosquitto__FREE(reason_codes); return MOSQ_ERR_MALFORMED_PACKET; } /* ACL check */ allowed = true; - rc = mosquitto_acl_check(context, sub.topic, 0, NULL, 0, false, MOSQ_ACL_UNSUBSCRIBE); + rc = mosquitto_acl_check(context, sub.topic_filter, 0, NULL, 0, false, MOSQ_ACL_UNSUBSCRIBE); switch(rc){ case MOSQ_ERR_SUCCESS: break; @@ -123,26 +123,26 @@ int handle__unsubscribe(struct mosquitto *context) reason = MQTT_RC_NOT_AUTHORIZED; break; default: - mosquitto__FREE(sub.topic); + mosquitto__FREE(sub.topic_filter); mosquitto__FREE(reason_codes); return rc; } - log__printf(NULL, MOSQ_LOG_DEBUG, "\t%s", sub.topic); + log__printf(NULL, MOSQ_LOG_DEBUG, "\t%s", sub.topic_filter); if(allowed){ rc = plugin__handle_unsubscribe(context, &sub); if(rc){ - mosquitto__FREE(sub.topic); + mosquitto__FREE(sub.topic_filter); mosquitto__FREE(reason_codes); return rc; } - rc = sub__remove(context, sub.topic, &reason); - plugin_persist__handle_subscription_delete(context, sub.topic); + rc = sub__remove(context, sub.topic_filter, &reason); + plugin_persist__handle_subscription_delete(context, sub.topic_filter); }else{ rc = MOSQ_ERR_SUCCESS; } - log__printf(NULL, MOSQ_LOG_UNSUBSCRIBE, "%s %s", context->id, sub.topic); - mosquitto__FREE(sub.topic); + log__printf(NULL, MOSQ_LOG_UNSUBSCRIBE, "%s %s", context->id, sub.topic_filter); + mosquitto__FREE(sub.topic_filter); if(rc){ mosquitto__FREE(reason_codes); return rc; diff --git a/src/persist_read.c b/src/persist_read.c index ed5bf924..668f1db1 100644 --- a/src/persist_read.c +++ b/src/persist_read.c @@ -376,7 +376,7 @@ static int persist__sub_chunk_restore(FILE *db_fptr) } sub.client_id = chunk.client_id; - sub.topic = chunk.topic; + sub.topic_filter = chunk.topic; sub.options = chunk.F.qos | chunk.F.options; sub.identifier = chunk.F.identifier; rc = persist__restore_sub(&sub); @@ -548,7 +548,7 @@ static int persist__restore_sub(const struct mosquitto_subscription *sub) assert(sub); assert(sub->client_id); - assert(sub->topic); + assert(sub->topic_filter); context = persist__find_or_add_context(sub->client_id, 0); if(!context) return 1; diff --git a/src/plugin_persist.c b/src/plugin_persist.c index 1dc2b5e1..382e5fa3 100644 --- a/src/plugin_persist.c +++ b/src/plugin_persist.c @@ -153,7 +153,7 @@ void plugin_persist__handle_subscription_add(struct mosquitto *context, const st opts = &db.config->security_options; memset(&event_data, 0, sizeof(event_data)); event_data.data.client_id = context->id; - event_data.data.topic = sub->topic; + event_data.data.topic_filter = sub->topic_filter; event_data.data.identifier = sub->identifier; event_data.data.options = sub->options; @@ -175,7 +175,7 @@ void plugin_persist__handle_subscription_delete(struct mosquitto *context, char opts = &db.config->security_options; memset(&event_data, 0, sizeof(event_data)); event_data.data.client_id = context->id; - event_data.data.topic = sub; + event_data.data.topic_filter = sub; DL_FOREACH(opts->plugin_callbacks.persist_subscription_delete, cb_base){ cb_base->cb(MOSQ_EVT_PERSIST_SUBSCRIPTION_DELETE, &event_data, cb_base->userdata); diff --git a/src/plugin_public.c b/src/plugin_public.c index acd893fb..6eb110f2 100644 --- a/src/plugin_public.c +++ b/src/plugin_public.c @@ -713,7 +713,7 @@ BROKER_EXPORT int mosquitto_subscription_add(const struct mosquitto_subscription { struct mosquitto *context; - if(sub == NULL || sub->client_id == NULL || sub->topic == NULL || sub->client_id[0] == '\0' || sub->topic[0] == '\0'){ + if(sub == NULL || sub->client_id == NULL || sub->topic_filter == NULL || sub->client_id[0] == '\0' || sub->topic_filter[0] == '\0'){ return MOSQ_ERR_INVAL; } diff --git a/src/plugin_subscribe.c b/src/plugin_subscribe.c index eb3ae3fc..9f3e2731 100644 --- a/src/plugin_subscribe.c +++ b/src/plugin_subscribe.c @@ -36,7 +36,7 @@ static int plugin__handle_subscribe_single(struct mosquitto__security_options *o options = sub->options &= 0xFC; memset(&event_data, 0, sizeof(event_data)); event_data.client = context; - event_data.topic = sub->topic; + event_data.topic_filter = sub->topic_filter; event_data.qos = qos; event_data.subscription_options = options; event_data.subscription_identifier = sub->identifier; @@ -48,9 +48,9 @@ static int plugin__handle_subscribe_single(struct mosquitto__security_options *o break; } - if(sub->topic != event_data.topic){ - mosquitto__free(sub->topic); - sub->topic = event_data.topic; + if(sub->topic_filter != event_data.topic_filter){ + mosquitto__free(sub->topic_filter); + sub->topic_filter = event_data.topic_filter; } } if(event_data.qos < qos){ diff --git a/src/plugin_unsubscribe.c b/src/plugin_unsubscribe.c index 131c217c..00b4d6b0 100644 --- a/src/plugin_unsubscribe.c +++ b/src/plugin_unsubscribe.c @@ -32,7 +32,7 @@ static int plugin__handle_unsubscribe_single(struct mosquitto__security_options memset(&event_data, 0, sizeof(event_data)); event_data.client = context; - event_data.topic = sub->topic; + event_data.topic_filter = sub->topic_filter; event_data.properties = sub->properties; DL_FOREACH(opts->plugin_callbacks.unsubscribe, cb_base){ @@ -41,9 +41,9 @@ static int plugin__handle_unsubscribe_single(struct mosquitto__security_options break; } - if(sub->topic != event_data.topic){ - mosquitto__free(sub->topic); - sub->topic = event_data.topic; + if(sub->topic_filter != event_data.topic_filter){ + mosquitto__free(sub->topic_filter); + sub->topic_filter = event_data.topic_filter; } } diff --git a/src/retain.c b/src/retain.c index 4dc39046..5d9401cc 100644 --- a/src/retain.c +++ b/src/retain.c @@ -327,11 +327,11 @@ int retain__queue(struct mosquitto *context, const struct mosquitto_subscription assert(context); assert(sub); - if(!strncmp(sub->topic, "$share/", strlen("$share/"))){ + if(!strncmp(sub->topic_filter, "$share/", strlen("$share/"))){ return MOSQ_ERR_SUCCESS; } - rc = sub__topic_tokenise(sub->topic, &local_sub, &split_topics, NULL); + rc = sub__topic_tokenise(sub->topic_filter, &local_sub, &split_topics, NULL); if(rc) return rc; HASH_FIND(hh, db.retains, split_topics[0], strlen(split_topics[0]), retainhier); diff --git a/src/subs.c b/src/subs.c index ebac3d22..20540e13 100644 --- a/src/subs.c +++ b/src/subs.c @@ -168,12 +168,12 @@ static int sub__add_leaf(struct mosquitto *context, const struct mosquitto_subsc } leaf = leaf->next; } - leaf = mosquitto__calloc(1, sizeof(struct mosquitto__subleaf) + strlen(sub->topic) + 1); + leaf = mosquitto__calloc(1, sizeof(struct mosquitto__subleaf) + strlen(sub->topic_filter) + 1); if(!leaf) return MOSQ_ERR_NOMEM; leaf->context = context; leaf->identifier = sub->identifier; leaf->subscription_options = sub->options; - strcpy(leaf->topic_filter, sub->topic); + strcpy(leaf->topic_filter, sub->topic_filter); DL_APPEND(*head, leaf); *newleaf = leaf; @@ -555,9 +555,9 @@ int sub__add(struct mosquitto *context, const struct mosquitto_subscription *sub size_t topiclen; assert(sub); - assert(sub->topic); + assert(sub->topic_filter); - rc = sub__topic_tokenise(sub->topic, &local_sub, &topics, &sharename); + rc = sub__topic_tokenise(sub->topic_filter, &local_sub, &topics, &sharename); if(rc) return rc; topiclen = strlen(topics[0]); diff --git a/test/broker/c/plugin_evt_subscribe.c b/test/broker/c/plugin_evt_subscribe.c index 1752e9a0..5b135101 100644 --- a/test/broker/c/plugin_evt_subscribe.c +++ b/test/broker/c/plugin_evt_subscribe.c @@ -18,7 +18,7 @@ int callback_subscribe(int event, void *event_data, void *user_data) if(event != MOSQ_EVT_SUBSCRIBE){ abort(); } - ed->topic = mosquitto_strdup("new-topic"); + ed->topic_filter = mosquitto_strdup("new-topic"); ed->qos = 0; return MOSQ_ERR_SUCCESS; diff --git a/test/broker/c/plugin_evt_unsubscribe.c b/test/broker/c/plugin_evt_unsubscribe.c index 84e1b05f..e45d6419 100644 --- a/test/broker/c/plugin_evt_unsubscribe.c +++ b/test/broker/c/plugin_evt_unsubscribe.c @@ -18,7 +18,7 @@ int callback_unsubscribe(int event, void *event_data, void *user_data) if(event != MOSQ_EVT_UNSUBSCRIBE){ abort(); } - ed->topic = mosquitto_strdup("missing-topic"); + ed->topic_filter = mosquitto_strdup("missing-topic"); return MOSQ_ERR_SUCCESS; } diff --git a/test/unit/persist_read_stubs.c b/test/unit/persist_read_stubs.c index a9e783d2..528ebfd7 100644 --- a/test/unit/persist_read_stubs.c +++ b/test/unit/persist_read_stubs.c @@ -152,7 +152,7 @@ int sub__add(struct mosquitto *context, const struct mosquitto_subscription *sub { UNUSED(context); - last_sub = strdup(sub->topic); + last_sub = strdup(sub->topic_filter); last_qos = sub->options & 0x03; last_identifier = sub->identifier; diff --git a/test/unit/subs_test.c b/test/unit/subs_test.c index 2dbbdd4e..f326b347 100644 --- a/test/unit/subs_test.c +++ b/test/unit/subs_test.c @@ -54,7 +54,7 @@ static void TEST_sub_add_single(void) db__open(&config); - sub.topic = "a/b/c/d/e"; + sub.topic_filter = "a/b/c/d/e"; rc = sub__add(&context, &sub); CU_ASSERT_EQUAL(rc, MOSQ_ERR_SUCCESS); CU_ASSERT_PTR_NOT_NULL(db.subs);