Fix persist docs

Add mosquitto_persist_client_msg_clear() for completeness.
pull/2485/head
Roger A. Light 4 years ago
parent bf1d39746a
commit a61298d03a

@ -52,7 +52,8 @@ enum mosquitto_protocol {
enum mosquitto_broker_msg_direction {
mosq_bmd_in = 0,
mosq_bmd_out = 1
mosq_bmd_out = 1,
mosq_bmd_all = 2
};
/* =========================================================================
@ -298,7 +299,6 @@ struct mosquitto_evt_persist_base_msg {
* it may change in a future minor release. */
struct mosquitto_evt_persist_retain_msg {
const char *topic;
char *plugin_topic;
uint64_t store_id;
void *future[8];
};
@ -911,16 +911,15 @@ int mosquitto_persist_client_delete(const char *client_id);
* Use to add a client message for a particular client.
*
* Parameters:
* client_msg->plugin_client_id - the client id of the client that the
* message belongs to. The broker will *not* modify this string and it
* remains the property of the plugin.
* client_msg->client_id - the client id of the client that the
* message belongs to.
* client_msg->store_id - the ID of the base message that this client
* message references.
* client_msg->cmsg_id - the client message ID of the new message
* client_msg->mid - the MQTT message ID of the new message
* client_msg->qos - the MQTT QoS of the new message
* client_msg->direction - the direction of the new message from the perspective
* of the broker (mosq_md_in / mosq_md_out)
* of the broker (mosq_bmd_in / mosq_bmd_out)
* client_msg->retain - the retain flag of the message
* client_msg->subscription_identifier - the MQTT v5 subscription identifier,
* for outgoing messages only.
@ -940,14 +939,13 @@ int mosquitto_persist_client_msg_add(struct mosquitto_evt_persist_client_msg *cl
* Use to delete a client message for a particular client.
*
* Parameters:
* client_msg->plugin_client_id - the client id of the client that the
* message belongs to. The broker will *not* modify this string and it
* remains the property of the plugin.
* client_msg->client_id - the client id of the client that the
* message belongs to.
* client_msg->cmsg_id - the client message id of the affected message
* client_msg->mid - the MQTT message id of the affected message
* client_msg->qos - the MQTT QoS of the affected message
* client_msg->direction - the direction of the message from the perspective
* of the broker (mosq_md_in / mosq_md_out)
* of the broker (mosq_bmd_in / mosq_bmd_out)
*
* All other members of struct mosquitto_evt_persist_client_msg are unused.
*
@ -964,14 +962,13 @@ int mosquitto_persist_client_msg_delete(struct mosquitto_evt_persist_client_msg
* Use to update the state of a client message for a particular client.
*
* Parameters:
* client_msg->plugin_client_id - the client id of the client that the
* message belongs to. The broker will *not* modify this string and it
* remains the property of the plugin.
* client_msg->client_id - the client id of the client that the
* message belongs to.
* client_msg->cmsg_id - the client message id of the affected message
* client_msg->mid - the MQTT message id of the affected message
* client_msg->qos - the MQTT QoS of the affected message
* client_msg->direction - the direction of the message from the perspective
* of the broker (mosq_md_in / mosq_md_out)
* of the broker (mosq_bmd_in / mosq_bmd_out)
* client_msg->state - the new state of the message
*
* All other members of struct mosquitto_evt_persist_client_msg are unused.
@ -984,6 +981,26 @@ int mosquitto_persist_client_msg_delete(struct mosquitto_evt_persist_client_msg
int mosquitto_persist_client_msg_update(struct mosquitto_evt_persist_client_msg *client_msg);
/* Function: mosquitto_persist_client_msg_clear
*
* Clear all messages for the listed client and direction.
*
* Parameters:
* client_msg->client_id - the client id of the client that the
* message belongs to.
* client_msg->direction - the direction of the messages to be cleared, from
* the perspective of the broker (mosq_bmd_in / mosq_bmd_out / mosq_bmd_all)
* client_msg->state - the new state of the message
*
* All other members of struct mosquitto_evt_persist_client_msg are unused.
*
* Returns:
* MOSQ_ERR_SUCCESS - on success
* MOSQ_ERR_INVAL - if client_msg or client_msg->plugin_client_id is NULL
* MOSQ_ERR_NOT_FOUND - the client is not found
*/
int mosquitto_persist_client_msg_clear(struct mosquitto_evt_persist_client_msg *client_msg);
/* Function: mosquitto_persist_base_msg_add
*
* Use to add a new base message. Any client messages or retained messages

@ -704,38 +704,58 @@ static void db__messages_delete_list(struct mosquitto_client_msg **head)
}
int db__messages_delete_incoming(struct mosquitto *context)
{
if(!context) return MOSQ_ERR_INVAL;
db__messages_delete_list(&context->msgs_in.inflight);
db__messages_delete_list(&context->msgs_in.queued);
context->msgs_in.inflight_bytes = 0;
context->msgs_in.inflight_bytes12 = 0;
context->msgs_in.inflight_count = 0;
context->msgs_in.inflight_count12 = 0;
context->msgs_in.queued_bytes = 0;
context->msgs_in.queued_bytes12 = 0;
context->msgs_in.queued_count = 0;
context->msgs_in.queued_count12 = 0;
plugin_persist__handle_client_msg_clear(context, mosq_md_in);
return MOSQ_ERR_SUCCESS;
}
int db__messages_delete_outgoing(struct mosquitto *context)
{
if(!context) return MOSQ_ERR_INVAL;
db__messages_delete_list(&context->msgs_out.inflight);
db__messages_delete_list(&context->msgs_out.queued);
context->msgs_out.inflight_bytes = 0;
context->msgs_out.inflight_bytes12 = 0;
context->msgs_out.inflight_count = 0;
context->msgs_out.inflight_count12 = 0;
context->msgs_out.queued_bytes = 0;
context->msgs_out.queued_bytes12 = 0;
context->msgs_out.queued_count = 0;
context->msgs_out.queued_count12 = 0;
plugin_persist__handle_client_msg_clear(context, mosq_md_out);
return MOSQ_ERR_SUCCESS;
}
int db__messages_delete(struct mosquitto *context, bool force_free)
{
if(!context) return MOSQ_ERR_INVAL;
if(force_free || context->clean_start || (context->bridge && context->bridge->clean_start)){
db__messages_delete_list(&context->msgs_in.inflight);
db__messages_delete_list(&context->msgs_in.queued);
context->msgs_in.inflight_bytes = 0;
context->msgs_in.inflight_bytes12 = 0;
context->msgs_in.inflight_count = 0;
context->msgs_in.inflight_count12 = 0;
context->msgs_in.queued_bytes = 0;
context->msgs_in.queued_bytes12 = 0;
context->msgs_in.queued_count = 0;
context->msgs_in.queued_count12 = 0;
plugin_persist__handle_client_msg_clear(context, mosq_md_in);
db__messages_delete_incoming(context);
}
if(force_free || (context->bridge && context->bridge->clean_start_local)
|| (context->bridge == NULL && context->clean_start)){
db__messages_delete_list(&context->msgs_out.inflight);
db__messages_delete_list(&context->msgs_out.queued);
context->msgs_out.inflight_bytes = 0;
context->msgs_out.inflight_bytes12 = 0;
context->msgs_out.inflight_count = 0;
context->msgs_out.inflight_count12 = 0;
context->msgs_out.queued_bytes = 0;
context->msgs_out.queued_bytes12 = 0;
context->msgs_out.queued_count = 0;
context->msgs_out.queued_count12 = 0;
plugin_persist__handle_client_msg_clear(context, mosq_md_out);
db__messages_delete_outgoing(context);
}
return MOSQ_ERR_SUCCESS;

@ -735,6 +735,8 @@ int db__message_release_incoming(struct mosquitto *context, uint16_t mid);
int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos, bool persist);
void db__message_dequeue_first(struct mosquitto *context, struct mosquitto_msg_data *msg_data);
int db__messages_delete(struct mosquitto *context, bool force_free);
int db__messages_delete_incoming(struct mosquitto *context);
int db__messages_delete_outgoing(struct mosquitto *context);
int db__messages_easy_queue(struct mosquitto *context, const char *topic, uint8_t qos, uint32_t payloadlen, const void *payload, int retain, uint32_t message_expiry_interval, mosquitto_property **properties);
int db__message_store(const struct mosquitto *source, struct mosquitto_base_msg *base_msg, uint32_t message_expiry_interval, dbid_t store_id, enum mosquitto_msg_origin origin);
int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_base_msg **base_msg);

@ -533,13 +533,13 @@ int mosquitto_persist_client_update(struct mosquitto_evt_persist_client *client)
if(client == NULL){
return MOSQ_ERR_INVAL;
}
if(client->plugin_client_id == NULL){
if(client->client_id == NULL){
rc = MOSQ_ERR_INVAL;
goto error;
}
context = NULL;
HASH_FIND(hh_id, db.contexts_by_id, client->plugin_client_id, strlen(client->plugin_client_id), context);
HASH_FIND(hh_id, db.contexts_by_id, client->client_id, strlen(client->client_id), context);
if(context == NULL){
rc = MOSQ_ERR_NOT_FOUND;
goto error;
@ -633,8 +633,10 @@ int mosquitto_persist_client_msg_add(struct mosquitto_evt_persist_client_msg *cl
}
return db__message_insert_outgoing(context, client_msg->cmsg_id, client_msg->mid, client_msg->qos, client_msg->retain,
base_msg, client_msg->subscription_identifier, false, false);
}else{
}else if(client_msg->direction == mosq_md_in){
return db__message_insert_incoming(context, client_msg->cmsg_id, base_msg, false);
}else{
return MOSQ_ERR_INVAL;
}
return MOSQ_ERR_SUCCESS;
}
@ -653,8 +655,10 @@ int mosquitto_persist_client_msg_delete(struct mosquitto_evt_persist_client_msg
if(client_msg->direction == mosq_md_out){
return db__message_delete_outgoing(context, client_msg->mid, client_msg->state, client_msg->qos);
}else{
}else if(client_msg->direction == mosq_md_in){
return db__message_remove_incoming(context, client_msg->mid);
}else{
return MOSQ_ERR_INVAL;
}
return MOSQ_ERR_SUCCESS;
}
@ -673,8 +677,30 @@ int mosquitto_persist_client_msg_update(struct mosquitto_evt_persist_client_msg
if(client_msg->direction == mosq_md_out){
db__message_update_outgoing(context, client_msg->mid, client_msg->state, client_msg->qos, false);
}else{
}else if(client_msg->direction == mosq_md_in){
// FIXME db__message_update_incoming(context, client_msg->mid, client_msg->state, client_msg->qos, false);
}else{
return MOSQ_ERR_INVAL;
}
return MOSQ_ERR_SUCCESS;
}
int mosquitto_persist_client_msg_clear(struct mosquitto_evt_persist_client_msg *client_msg)
{
struct mosquitto *context;
if(client_msg == NULL || client_msg->client_id == NULL) return MOSQ_ERR_INVAL;
HASH_FIND(hh_id, db.contexts_by_id, client_msg->client_id, strlen(client_msg->client_id), context);
if(context == NULL){
return MOSQ_ERR_NOT_FOUND;
}
if(client_msg->direction == mosq_bmd_in || client_msg->direction == mosq_bmd_all){
db__messages_delete_incoming(context);
}else if(client_msg->direction == mosq_bmd_out || client_msg->direction == mosq_bmd_all){
db__messages_delete_outgoing(context);
}
return MOSQ_ERR_SUCCESS;
}

Loading…
Cancel
Save