Rename topic -> topic_filter in events.

pull/2735/head
Roger A. Light 3 years ago
parent b94d0d5c81
commit 3e17494d34

@ -213,7 +213,7 @@ struct mosquitto_evt_disconnect {
struct mosquitto_evt_subscribe {
void *future;
struct mosquitto *client;
char *topic;
char *topic_filter;
const mosquitto_property *properties;
uint32_t subscription_identifier;
uint8_t subscription_options;
@ -227,7 +227,7 @@ struct mosquitto_evt_subscribe {
struct mosquitto_evt_unsubscribe {
void *future;
struct mosquitto *client;
char *topic;
char *topic_filter;
const mosquitto_property *properties;
void *future2[8];
};
@ -270,7 +270,7 @@ struct mosquitto_evt_persist_client {
struct mosquitto_subscription {
char *client_id;
char *topic;
char *topic_filter;
mosquitto_property *properties;
uint32_t identifier;
uint8_t options;

@ -173,7 +173,7 @@ static int callback_subscribe(int event, void *event_data, void *userdata)
/* put the client_id on front of the topic */
/* calculate the length of the new payload */
new_sub_len = strlen(client_id) + sizeof('/') + strlen(ed->topic) + 1;
new_sub_len = strlen(client_id) + sizeof('/') + strlen(ed->topic_filter) + 1;
/* Allocate some memory - use
* mosquitto_calloc/mosquitto_malloc/mosquitto_strdup when allocating, to
@ -184,12 +184,12 @@ static int callback_subscribe(int event, void *event_data, void *userdata)
}
/* prepend the client_id to the subscription */
snprintf(new_sub, new_sub_len, "%s/%s", client_id, ed->topic);
snprintf(new_sub, new_sub_len, "%s/%s", client_id, ed->topic_filter);
/* Assign the new topic to the event data structure. You
* must *not* free the original topic, it will be handled by the
* broker. */
ed->topic = new_sub;
ed->topic_filter = new_sub;
return MOSQ_ERR_SUCCESS;
}
@ -213,7 +213,7 @@ static int callback_unsubscribe(int event, void *event_data, void *userdata)
/* put the client_id on front of the topic */
/* calculate the length of the new payload */
new_sub_len = strlen(client_id) + sizeof('/') + strlen(ed->topic) + 1;
new_sub_len = strlen(client_id) + sizeof('/') + strlen(ed->topic_filter) + 1;
/* Allocate some memory - use
* mosquitto_calloc/mosquitto_malloc/mosquitto_strdup when allocating, to
@ -224,12 +224,12 @@ static int callback_unsubscribe(int event, void *event_data, void *userdata)
}
/* prepend the client_id to the subscription */
snprintf(new_sub, new_sub_len, "%s/%s", client_id, ed->topic);
snprintf(new_sub, new_sub_len, "%s/%s", client_id, ed->topic_filter);
/* Assign the new topic to the event data structure. You
* must *not* free the original topic, it will be handled by the
* broker. */
ed->topic = new_sub;
ed->topic_filter = new_sub;
return MOSQ_ERR_SUCCESS;
}

@ -251,7 +251,7 @@ static int subscription_restore(struct mosquitto_sqlite *ms)
while(sqlite3_step(stmt) == SQLITE_ROW){
memset(&sub, 0, sizeof(sub));
sub.client_id = (char *)sqlite3_column_text(stmt, 0);
sub.topic = (char *)sqlite3_column_text(stmt, 1);
sub.topic_filter = (char *)sqlite3_column_text(stmt, 1);
sub.options = (uint8_t)sqlite3_column_int(stmt, 2);
sub.identifier = (uint32_t)sqlite3_column_int(stmt, 3);

@ -35,7 +35,7 @@ int persist_sqlite__subscription_add_cb(int event, void *event_data, void *userd
ed->data.client_id, (int)strlen(ed->data.client_id), SQLITE_STATIC) == SQLITE_OK){
if(sqlite3_bind_text(ms->subscription_add_stmt, 2,
ed->data.topic, (int)strlen(ed->data.topic), SQLITE_STATIC) == SQLITE_OK){
ed->data.topic_filter, (int)strlen(ed->data.topic_filter), SQLITE_STATIC) == SQLITE_OK){
if(sqlite3_bind_int(ms->subscription_add_stmt, 3,
ed->data.options) == SQLITE_OK){
@ -71,7 +71,7 @@ int persist_sqlite__subscription_remove_cb(int event, void *event_data, void *us
ed->data.client_id, (int)strlen(ed->data.client_id), SQLITE_STATIC) == SQLITE_OK){
if(sqlite3_bind_text(ms->subscription_remove_stmt, 2,
ed->data.topic, (int)strlen(ed->data.topic), SQLITE_STATIC) == SQLITE_OK){
ed->data.topic_filter, (int)strlen(ed->data.topic_filter), SQLITE_STATIC) == SQLITE_OK){
ms->event_count++;
rc = sqlite3_step(ms->subscription_remove_stmt);

@ -492,7 +492,7 @@ int bridge__connect(struct mosquitto *context)
qos = cur_topic->qos;
}
struct mosquitto_subscription sub;
sub.topic = cur_topic->local_topic;
sub.topic_filter = cur_topic->local_topic;
sub.identifier = 0;
sub.options = MQTT_SUB_OPT_NO_LOCAL | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED | qos;
if(sub__add(context, &sub) > 0){
@ -695,7 +695,7 @@ int bridge__on_connect(struct mosquitto *context)
struct mosquitto_subscription sub;
memset(&sub, 0, sizeof(sub));
LL_FOREACH(context->bridge->topics, cur_topic){
sub.topic = cur_topic->local_topic;
sub.topic_filter = cur_topic->local_topic;
if(cur_topic->direction == bd_out || cur_topic->direction == bd_both){
if(cur_topic->qos > context->max_qos){
sub.options = context->max_qos;

@ -96,36 +96,36 @@ int handle__subscribe(struct mosquitto *context)
memset(&sub, 0, sizeof(sub));
sub.identifier = subscription_identifier;
sub.properties = properties;
if(packet__read_string(&context->in_packet, &sub.topic, &slen)){
if(packet__read_string(&context->in_packet, &sub.topic_filter, &slen)){
mosquitto__FREE(payload);
return MOSQ_ERR_MALFORMED_PACKET;
}
if(sub.topic){
if(sub.topic_filter){
if(!slen){
log__printf(NULL, MOSQ_LOG_INFO,
"Empty subscription string from %s, disconnecting.",
context->address);
mosquitto__FREE(sub.topic);
mosquitto__FREE(sub.topic_filter);
mosquitto__FREE(payload);
return MOSQ_ERR_MALFORMED_PACKET;
}
if(mosquitto_sub_topic_check(sub.topic)){
if(mosquitto_sub_topic_check(sub.topic_filter)){
log__printf(NULL, MOSQ_LOG_INFO,
"Invalid subscription string from %s, disconnecting.",
context->address);
mosquitto__FREE(sub.topic);
mosquitto__FREE(sub.topic_filter);
mosquitto__FREE(payload);
return MOSQ_ERR_MALFORMED_PACKET;
}
if(packet__read_byte(&context->in_packet, &sub.options)){
mosquitto__FREE(sub.topic);
mosquitto__FREE(sub.topic_filter);
mosquitto__FREE(payload);
return MOSQ_ERR_MALFORMED_PACKET;
}
if(sub.options & MQTT_SUB_OPT_NO_LOCAL && !strncmp(sub.topic, "$share/", strlen("$share/"))){
mosquitto__FREE(sub.topic);
if(sub.options & MQTT_SUB_OPT_NO_LOCAL && !strncmp(sub.topic_filter, "$share/", strlen("$share/"))){
mosquitto__FREE(sub.topic_filter);
mosquitto__FREE(payload);
return MOSQ_ERR_PROTOCOL;
}
@ -142,7 +142,7 @@ int handle__subscribe(struct mosquitto *context)
retain_handling = MQTT_SUB_OPT_GET_SEND_RETAIN(sub.options);
if(retain_handling == 0x30 || (sub.options & 0xC0) != 0){
mosquitto__FREE(sub.topic);
mosquitto__FREE(sub.topic_filter);
mosquitto__FREE(payload);
return MOSQ_ERR_MALFORMED_PACKET;
}
@ -151,7 +151,7 @@ int handle__subscribe(struct mosquitto *context)
log__printf(NULL, MOSQ_LOG_INFO,
"Invalid QoS in subscription command from %s, disconnecting.",
context->address);
mosquitto__FREE(sub.topic);
mosquitto__FREE(sub.topic_filter);
mosquitto__FREE(payload);
return MOSQ_ERR_MALFORMED_PACKET;
}
@ -165,21 +165,21 @@ int handle__subscribe(struct mosquitto *context)
len = strlen(context->listener->mount_point) + slen + 1;
sub_mount = mosquitto__malloc(len+1);
if(!sub_mount){
mosquitto__FREE(sub.topic);
mosquitto__FREE(sub.topic_filter);
mosquitto__FREE(payload);
return MOSQ_ERR_NOMEM;
}
snprintf(sub_mount, len, "%s%s", context->listener->mount_point, sub.topic);
snprintf(sub_mount, len, "%s%s", context->listener->mount_point, sub.topic_filter);
sub_mount[len] = '\0';
mosquitto__FREE(sub.topic);
sub.topic = sub_mount;
mosquitto__FREE(sub.topic_filter);
sub.topic_filter = sub_mount;
}
log__printf(NULL, MOSQ_LOG_DEBUG, "\t%s (QoS %d)", sub.topic, qos);
log__printf(NULL, MOSQ_LOG_DEBUG, "\t%s (QoS %d)", sub.topic_filter, qos);
allowed = true;
rc2 = mosquitto_acl_check(context, sub.topic, 0, NULL, qos, false, MOSQ_ACL_SUBSCRIBE);
rc2 = mosquitto_acl_check(context, sub.topic_filter, 0, NULL, qos, false, MOSQ_ACL_SUBSCRIBE);
switch(rc2){
case MOSQ_ERR_SUCCESS:
break;
@ -192,20 +192,20 @@ int handle__subscribe(struct mosquitto *context)
}
break;
default:
mosquitto__FREE(sub.topic);
mosquitto__FREE(sub.topic_filter);
return rc2;
}
if(allowed){
rc2 = plugin__handle_subscribe(context, &sub);
if(rc2){
mosquitto__FREE(sub.topic);
mosquitto__FREE(sub.topic_filter);
return rc2;
}
rc2 = sub__add(context, &sub);
if(rc2 > 0){
mosquitto__FREE(sub.topic);
mosquitto__FREE(sub.topic_filter);
return rc2;
}
if(context->protocol == mosq_p_mqtt311 || context->protocol == mosq_p_mqtt31){
@ -220,16 +220,16 @@ int handle__subscribe(struct mosquitto *context)
}
}
log__printf(NULL, MOSQ_LOG_SUBSCRIBE, "%s %d %s", context->id, qos, sub.topic);
log__printf(NULL, MOSQ_LOG_SUBSCRIBE, "%s %d %s", context->id, qos, sub.topic_filter);
rc = plugin__handle_subscribe(context, &sub);
if(rc){
mosquitto__FREE(sub.topic);
mosquitto__FREE(sub.topic_filter);
return rc;
}
plugin_persist__handle_subscription_add(context, &sub);
}
mosquitto__FREE(sub.topic);
mosquitto__FREE(sub.topic_filter);
tmp_payload = mosquitto__realloc(payload, payloadlen + 1);
if(tmp_payload){

@ -90,7 +90,7 @@ int handle__unsubscribe(struct mosquitto *context)
while(context->in_packet.pos < context->in_packet.remaining_length){
memset(&sub, 0, sizeof(sub));
sub.properties = properties;
if(packet__read_string(&context->in_packet, &sub.topic, &slen)){
if(packet__read_string(&context->in_packet, &sub.topic_filter, &slen)){
mosquitto__FREE(reason_codes);
return MOSQ_ERR_MALFORMED_PACKET;
}
@ -99,22 +99,22 @@ int handle__unsubscribe(struct mosquitto *context)
log__printf(NULL, MOSQ_LOG_INFO,
"Empty unsubscription string from %s, disconnecting.",
context->id);
mosquitto__FREE(sub.topic);
mosquitto__FREE(sub.topic_filter);
mosquitto__FREE(reason_codes);
return MOSQ_ERR_MALFORMED_PACKET;
}
if(mosquitto_sub_topic_check(sub.topic)){
if(mosquitto_sub_topic_check(sub.topic_filter)){
log__printf(NULL, MOSQ_LOG_INFO,
"Invalid unsubscription string from %s, disconnecting.",
context->id);
mosquitto__FREE(sub.topic);
mosquitto__FREE(sub.topic_filter);
mosquitto__FREE(reason_codes);
return MOSQ_ERR_MALFORMED_PACKET;
}
/* ACL check */
allowed = true;
rc = mosquitto_acl_check(context, sub.topic, 0, NULL, 0, false, MOSQ_ACL_UNSUBSCRIBE);
rc = mosquitto_acl_check(context, sub.topic_filter, 0, NULL, 0, false, MOSQ_ACL_UNSUBSCRIBE);
switch(rc){
case MOSQ_ERR_SUCCESS:
break;
@ -123,26 +123,26 @@ int handle__unsubscribe(struct mosquitto *context)
reason = MQTT_RC_NOT_AUTHORIZED;
break;
default:
mosquitto__FREE(sub.topic);
mosquitto__FREE(sub.topic_filter);
mosquitto__FREE(reason_codes);
return rc;
}
log__printf(NULL, MOSQ_LOG_DEBUG, "\t%s", sub.topic);
log__printf(NULL, MOSQ_LOG_DEBUG, "\t%s", sub.topic_filter);
if(allowed){
rc = plugin__handle_unsubscribe(context, &sub);
if(rc){
mosquitto__FREE(sub.topic);
mosquitto__FREE(sub.topic_filter);
mosquitto__FREE(reason_codes);
return rc;
}
rc = sub__remove(context, sub.topic, &reason);
plugin_persist__handle_subscription_delete(context, sub.topic);
rc = sub__remove(context, sub.topic_filter, &reason);
plugin_persist__handle_subscription_delete(context, sub.topic_filter);
}else{
rc = MOSQ_ERR_SUCCESS;
}
log__printf(NULL, MOSQ_LOG_UNSUBSCRIBE, "%s %s", context->id, sub.topic);
mosquitto__FREE(sub.topic);
log__printf(NULL, MOSQ_LOG_UNSUBSCRIBE, "%s %s", context->id, sub.topic_filter);
mosquitto__FREE(sub.topic_filter);
if(rc){
mosquitto__FREE(reason_codes);
return rc;

@ -376,7 +376,7 @@ static int persist__sub_chunk_restore(FILE *db_fptr)
}
sub.client_id = chunk.client_id;
sub.topic = chunk.topic;
sub.topic_filter = chunk.topic;
sub.options = chunk.F.qos | chunk.F.options;
sub.identifier = chunk.F.identifier;
rc = persist__restore_sub(&sub);
@ -548,7 +548,7 @@ static int persist__restore_sub(const struct mosquitto_subscription *sub)
assert(sub);
assert(sub->client_id);
assert(sub->topic);
assert(sub->topic_filter);
context = persist__find_or_add_context(sub->client_id, 0);
if(!context) return 1;

@ -153,7 +153,7 @@ void plugin_persist__handle_subscription_add(struct mosquitto *context, const st
opts = &db.config->security_options;
memset(&event_data, 0, sizeof(event_data));
event_data.data.client_id = context->id;
event_data.data.topic = sub->topic;
event_data.data.topic_filter = sub->topic_filter;
event_data.data.identifier = sub->identifier;
event_data.data.options = sub->options;
@ -175,7 +175,7 @@ void plugin_persist__handle_subscription_delete(struct mosquitto *context, char
opts = &db.config->security_options;
memset(&event_data, 0, sizeof(event_data));
event_data.data.client_id = context->id;
event_data.data.topic = sub;
event_data.data.topic_filter = sub;
DL_FOREACH(opts->plugin_callbacks.persist_subscription_delete, cb_base){
cb_base->cb(MOSQ_EVT_PERSIST_SUBSCRIPTION_DELETE, &event_data, cb_base->userdata);

@ -713,7 +713,7 @@ BROKER_EXPORT int mosquitto_subscription_add(const struct mosquitto_subscription
{
struct mosquitto *context;
if(sub == NULL || sub->client_id == NULL || sub->topic == NULL || sub->client_id[0] == '\0' || sub->topic[0] == '\0'){
if(sub == NULL || sub->client_id == NULL || sub->topic_filter == NULL || sub->client_id[0] == '\0' || sub->topic_filter[0] == '\0'){
return MOSQ_ERR_INVAL;
}

@ -36,7 +36,7 @@ static int plugin__handle_subscribe_single(struct mosquitto__security_options *o
options = sub->options &= 0xFC;
memset(&event_data, 0, sizeof(event_data));
event_data.client = context;
event_data.topic = sub->topic;
event_data.topic_filter = sub->topic_filter;
event_data.qos = qos;
event_data.subscription_options = options;
event_data.subscription_identifier = sub->identifier;
@ -48,9 +48,9 @@ static int plugin__handle_subscribe_single(struct mosquitto__security_options *o
break;
}
if(sub->topic != event_data.topic){
mosquitto__free(sub->topic);
sub->topic = event_data.topic;
if(sub->topic_filter != event_data.topic_filter){
mosquitto__free(sub->topic_filter);
sub->topic_filter = event_data.topic_filter;
}
}
if(event_data.qos < qos){

@ -32,7 +32,7 @@ static int plugin__handle_unsubscribe_single(struct mosquitto__security_options
memset(&event_data, 0, sizeof(event_data));
event_data.client = context;
event_data.topic = sub->topic;
event_data.topic_filter = sub->topic_filter;
event_data.properties = sub->properties;
DL_FOREACH(opts->plugin_callbacks.unsubscribe, cb_base){
@ -41,9 +41,9 @@ static int plugin__handle_unsubscribe_single(struct mosquitto__security_options
break;
}
if(sub->topic != event_data.topic){
mosquitto__free(sub->topic);
sub->topic = event_data.topic;
if(sub->topic_filter != event_data.topic_filter){
mosquitto__free(sub->topic_filter);
sub->topic_filter = event_data.topic_filter;
}
}

@ -327,11 +327,11 @@ int retain__queue(struct mosquitto *context, const struct mosquitto_subscription
assert(context);
assert(sub);
if(!strncmp(sub->topic, "$share/", strlen("$share/"))){
if(!strncmp(sub->topic_filter, "$share/", strlen("$share/"))){
return MOSQ_ERR_SUCCESS;
}
rc = sub__topic_tokenise(sub->topic, &local_sub, &split_topics, NULL);
rc = sub__topic_tokenise(sub->topic_filter, &local_sub, &split_topics, NULL);
if(rc) return rc;
HASH_FIND(hh, db.retains, split_topics[0], strlen(split_topics[0]), retainhier);

@ -168,12 +168,12 @@ static int sub__add_leaf(struct mosquitto *context, const struct mosquitto_subsc
}
leaf = leaf->next;
}
leaf = mosquitto__calloc(1, sizeof(struct mosquitto__subleaf) + strlen(sub->topic) + 1);
leaf = mosquitto__calloc(1, sizeof(struct mosquitto__subleaf) + strlen(sub->topic_filter) + 1);
if(!leaf) return MOSQ_ERR_NOMEM;
leaf->context = context;
leaf->identifier = sub->identifier;
leaf->subscription_options = sub->options;
strcpy(leaf->topic_filter, sub->topic);
strcpy(leaf->topic_filter, sub->topic_filter);
DL_APPEND(*head, leaf);
*newleaf = leaf;
@ -555,9 +555,9 @@ int sub__add(struct mosquitto *context, const struct mosquitto_subscription *sub
size_t topiclen;
assert(sub);
assert(sub->topic);
assert(sub->topic_filter);
rc = sub__topic_tokenise(sub->topic, &local_sub, &topics, &sharename);
rc = sub__topic_tokenise(sub->topic_filter, &local_sub, &topics, &sharename);
if(rc) return rc;
topiclen = strlen(topics[0]);

@ -18,7 +18,7 @@ int callback_subscribe(int event, void *event_data, void *user_data)
if(event != MOSQ_EVT_SUBSCRIBE){
abort();
}
ed->topic = mosquitto_strdup("new-topic");
ed->topic_filter = mosquitto_strdup("new-topic");
ed->qos = 0;
return MOSQ_ERR_SUCCESS;

@ -18,7 +18,7 @@ int callback_unsubscribe(int event, void *event_data, void *user_data)
if(event != MOSQ_EVT_UNSUBSCRIBE){
abort();
}
ed->topic = mosquitto_strdup("missing-topic");
ed->topic_filter = mosquitto_strdup("missing-topic");
return MOSQ_ERR_SUCCESS;
}

@ -152,7 +152,7 @@ int sub__add(struct mosquitto *context, const struct mosquitto_subscription *sub
{
UNUSED(context);
last_sub = strdup(sub->topic);
last_sub = strdup(sub->topic_filter);
last_qos = sub->options & 0x03;
last_identifier = sub->identifier;

@ -54,7 +54,7 @@ static void TEST_sub_add_single(void)
db__open(&config);
sub.topic = "a/b/c/d/e";
sub.topic_filter = "a/b/c/d/e";
rc = sub__add(&context, &sub);
CU_ASSERT_EQUAL(rc, MOSQ_ERR_SUCCESS);
CU_ASSERT_PTR_NOT_NULL(db.subs);

Loading…
Cancel
Save