diff --git a/include/mosquitto_broker.h b/include/mosquitto_broker.h index c1528094..19f9ab94 100644 --- a/include/mosquitto_broker.h +++ b/include/mosquitto_broker.h @@ -52,7 +52,8 @@ enum mosquitto_protocol { enum mosquitto_broker_msg_direction { mosq_bmd_in = 0, - mosq_bmd_out = 1 + mosq_bmd_out = 1, + mosq_bmd_all = 2 }; /* ========================================================================= @@ -298,7 +299,6 @@ struct mosquitto_evt_persist_base_msg { * it may change in a future minor release. */ struct mosquitto_evt_persist_retain_msg { const char *topic; - char *plugin_topic; uint64_t store_id; void *future[8]; }; @@ -911,16 +911,15 @@ int mosquitto_persist_client_delete(const char *client_id); * Use to add a client message for a particular client. * * Parameters: - * client_msg->plugin_client_id - the client id of the client that the - * message belongs to. The broker will *not* modify this string and it - * remains the property of the plugin. + * client_msg->client_id - the client id of the client that the + * message belongs to. * client_msg->store_id - the ID of the base message that this client * message references. * client_msg->cmsg_id - the client message ID of the new message * client_msg->mid - the MQTT message ID of the new message * client_msg->qos - the MQTT QoS of the new message * client_msg->direction - the direction of the new message from the perspective - * of the broker (mosq_md_in / mosq_md_out) + * of the broker (mosq_bmd_in / mosq_bmd_out) * client_msg->retain - the retain flag of the message * client_msg->subscription_identifier - the MQTT v5 subscription identifier, * for outgoing messages only. @@ -940,14 +939,13 @@ int mosquitto_persist_client_msg_add(struct mosquitto_evt_persist_client_msg *cl * Use to delete a client message for a particular client. * * Parameters: - * client_msg->plugin_client_id - the client id of the client that the - * message belongs to. The broker will *not* modify this string and it - * remains the property of the plugin. + * client_msg->client_id - the client id of the client that the + * message belongs to. * client_msg->cmsg_id - the client message id of the affected message * client_msg->mid - the MQTT message id of the affected message * client_msg->qos - the MQTT QoS of the affected message * client_msg->direction - the direction of the message from the perspective - * of the broker (mosq_md_in / mosq_md_out) + * of the broker (mosq_bmd_in / mosq_bmd_out) * * All other members of struct mosquitto_evt_persist_client_msg are unused. * @@ -964,14 +962,13 @@ int mosquitto_persist_client_msg_delete(struct mosquitto_evt_persist_client_msg * Use to update the state of a client message for a particular client. * * Parameters: - * client_msg->plugin_client_id - the client id of the client that the - * message belongs to. The broker will *not* modify this string and it - * remains the property of the plugin. + * client_msg->client_id - the client id of the client that the + * message belongs to. * client_msg->cmsg_id - the client message id of the affected message * client_msg->mid - the MQTT message id of the affected message * client_msg->qos - the MQTT QoS of the affected message * client_msg->direction - the direction of the message from the perspective - * of the broker (mosq_md_in / mosq_md_out) + * of the broker (mosq_bmd_in / mosq_bmd_out) * client_msg->state - the new state of the message * * All other members of struct mosquitto_evt_persist_client_msg are unused. @@ -984,6 +981,26 @@ int mosquitto_persist_client_msg_delete(struct mosquitto_evt_persist_client_msg int mosquitto_persist_client_msg_update(struct mosquitto_evt_persist_client_msg *client_msg); +/* Function: mosquitto_persist_client_msg_clear + * + * Clear all messages for the listed client and direction. + * + * Parameters: + * client_msg->client_id - the client id of the client that the + * message belongs to. + * client_msg->direction - the direction of the messages to be cleared, from + * the perspective of the broker (mosq_bmd_in / mosq_bmd_out / mosq_bmd_all) + * client_msg->state - the new state of the message + * + * All other members of struct mosquitto_evt_persist_client_msg are unused. + * + * Returns: + * MOSQ_ERR_SUCCESS - on success + * MOSQ_ERR_INVAL - if client_msg or client_msg->plugin_client_id is NULL + * MOSQ_ERR_NOT_FOUND - the client is not found + */ +int mosquitto_persist_client_msg_clear(struct mosquitto_evt_persist_client_msg *client_msg); + /* Function: mosquitto_persist_base_msg_add * * Use to add a new base message. Any client messages or retained messages diff --git a/src/database.c b/src/database.c index bd0c8f41..af9b53ce 100644 --- a/src/database.c +++ b/src/database.c @@ -704,38 +704,58 @@ static void db__messages_delete_list(struct mosquitto_client_msg **head) } +int db__messages_delete_incoming(struct mosquitto *context) +{ + if(!context) return MOSQ_ERR_INVAL; + + db__messages_delete_list(&context->msgs_in.inflight); + db__messages_delete_list(&context->msgs_in.queued); + context->msgs_in.inflight_bytes = 0; + context->msgs_in.inflight_bytes12 = 0; + context->msgs_in.inflight_count = 0; + context->msgs_in.inflight_count12 = 0; + context->msgs_in.queued_bytes = 0; + context->msgs_in.queued_bytes12 = 0; + context->msgs_in.queued_count = 0; + context->msgs_in.queued_count12 = 0; + plugin_persist__handle_client_msg_clear(context, mosq_md_in); + + return MOSQ_ERR_SUCCESS; +} + + +int db__messages_delete_outgoing(struct mosquitto *context) +{ + if(!context) return MOSQ_ERR_INVAL; + + db__messages_delete_list(&context->msgs_out.inflight); + db__messages_delete_list(&context->msgs_out.queued); + context->msgs_out.inflight_bytes = 0; + context->msgs_out.inflight_bytes12 = 0; + context->msgs_out.inflight_count = 0; + context->msgs_out.inflight_count12 = 0; + context->msgs_out.queued_bytes = 0; + context->msgs_out.queued_bytes12 = 0; + context->msgs_out.queued_count = 0; + context->msgs_out.queued_count12 = 0; + plugin_persist__handle_client_msg_clear(context, mosq_md_out); + + return MOSQ_ERR_SUCCESS; +} + + int db__messages_delete(struct mosquitto *context, bool force_free) { if(!context) return MOSQ_ERR_INVAL; if(force_free || context->clean_start || (context->bridge && context->bridge->clean_start)){ - db__messages_delete_list(&context->msgs_in.inflight); - db__messages_delete_list(&context->msgs_in.queued); - context->msgs_in.inflight_bytes = 0; - context->msgs_in.inflight_bytes12 = 0; - context->msgs_in.inflight_count = 0; - context->msgs_in.inflight_count12 = 0; - context->msgs_in.queued_bytes = 0; - context->msgs_in.queued_bytes12 = 0; - context->msgs_in.queued_count = 0; - context->msgs_in.queued_count12 = 0; - plugin_persist__handle_client_msg_clear(context, mosq_md_in); + db__messages_delete_incoming(context); } if(force_free || (context->bridge && context->bridge->clean_start_local) || (context->bridge == NULL && context->clean_start)){ - db__messages_delete_list(&context->msgs_out.inflight); - db__messages_delete_list(&context->msgs_out.queued); - context->msgs_out.inflight_bytes = 0; - context->msgs_out.inflight_bytes12 = 0; - context->msgs_out.inflight_count = 0; - context->msgs_out.inflight_count12 = 0; - context->msgs_out.queued_bytes = 0; - context->msgs_out.queued_bytes12 = 0; - context->msgs_out.queued_count = 0; - context->msgs_out.queued_count12 = 0; - plugin_persist__handle_client_msg_clear(context, mosq_md_out); + db__messages_delete_outgoing(context); } return MOSQ_ERR_SUCCESS; diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index ecb9d8e8..22e3988e 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -735,6 +735,8 @@ int db__message_release_incoming(struct mosquitto *context, uint16_t mid); int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos, bool persist); void db__message_dequeue_first(struct mosquitto *context, struct mosquitto_msg_data *msg_data); int db__messages_delete(struct mosquitto *context, bool force_free); +int db__messages_delete_incoming(struct mosquitto *context); +int db__messages_delete_outgoing(struct mosquitto *context); int db__messages_easy_queue(struct mosquitto *context, const char *topic, uint8_t qos, uint32_t payloadlen, const void *payload, int retain, uint32_t message_expiry_interval, mosquitto_property **properties); int db__message_store(const struct mosquitto *source, struct mosquitto_base_msg *base_msg, uint32_t message_expiry_interval, dbid_t store_id, enum mosquitto_msg_origin origin); int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_base_msg **base_msg); diff --git a/src/plugin_public.c b/src/plugin_public.c index 25f4f4ba..50ef4649 100644 --- a/src/plugin_public.c +++ b/src/plugin_public.c @@ -533,13 +533,13 @@ int mosquitto_persist_client_update(struct mosquitto_evt_persist_client *client) if(client == NULL){ return MOSQ_ERR_INVAL; } - if(client->plugin_client_id == NULL){ + if(client->client_id == NULL){ rc = MOSQ_ERR_INVAL; goto error; } context = NULL; - HASH_FIND(hh_id, db.contexts_by_id, client->plugin_client_id, strlen(client->plugin_client_id), context); + HASH_FIND(hh_id, db.contexts_by_id, client->client_id, strlen(client->client_id), context); if(context == NULL){ rc = MOSQ_ERR_NOT_FOUND; goto error; @@ -633,8 +633,10 @@ int mosquitto_persist_client_msg_add(struct mosquitto_evt_persist_client_msg *cl } return db__message_insert_outgoing(context, client_msg->cmsg_id, client_msg->mid, client_msg->qos, client_msg->retain, base_msg, client_msg->subscription_identifier, false, false); - }else{ + }else if(client_msg->direction == mosq_md_in){ return db__message_insert_incoming(context, client_msg->cmsg_id, base_msg, false); + }else{ + return MOSQ_ERR_INVAL; } return MOSQ_ERR_SUCCESS; } @@ -653,8 +655,10 @@ int mosquitto_persist_client_msg_delete(struct mosquitto_evt_persist_client_msg if(client_msg->direction == mosq_md_out){ return db__message_delete_outgoing(context, client_msg->mid, client_msg->state, client_msg->qos); - }else{ + }else if(client_msg->direction == mosq_md_in){ return db__message_remove_incoming(context, client_msg->mid); + }else{ + return MOSQ_ERR_INVAL; } return MOSQ_ERR_SUCCESS; } @@ -673,8 +677,30 @@ int mosquitto_persist_client_msg_update(struct mosquitto_evt_persist_client_msg if(client_msg->direction == mosq_md_out){ db__message_update_outgoing(context, client_msg->mid, client_msg->state, client_msg->qos, false); - }else{ + }else if(client_msg->direction == mosq_md_in){ // FIXME db__message_update_incoming(context, client_msg->mid, client_msg->state, client_msg->qos, false); + }else{ + return MOSQ_ERR_INVAL; + } + return MOSQ_ERR_SUCCESS; +} + + +int mosquitto_persist_client_msg_clear(struct mosquitto_evt_persist_client_msg *client_msg) +{ + struct mosquitto *context; + + if(client_msg == NULL || client_msg->client_id == NULL) return MOSQ_ERR_INVAL; + + HASH_FIND(hh_id, db.contexts_by_id, client_msg->client_id, strlen(client_msg->client_id), context); + if(context == NULL){ + return MOSQ_ERR_NOT_FOUND; + } + + if(client_msg->direction == mosq_bmd_in || client_msg->direction == mosq_bmd_all){ + db__messages_delete_incoming(context); + }else if(client_msg->direction == mosq_bmd_out || client_msg->direction == mosq_bmd_all){ + db__messages_delete_outgoing(context); } return MOSQ_ERR_SUCCESS; }