Persistence interface documentation, plus better heap/non heap params

Be clear about whether the plugin or the broker owns `plugin_*` memory after function calls are made.
pull/2438/head
Roger A. Light 4 years ago
parent 0f9e5dc65d
commit ff41157c2c

@ -284,6 +284,7 @@ struct mosquitto_evt_persist_msg {
char *plugin_source_id;
char *plugin_source_username;
const mosquitto_property *properties;
mosquitto_property *plugin_properties;
uint32_t payloadlen;
uint16_t source_mid;
uint16_t source_port;
@ -773,19 +774,304 @@ mosq_EXPORT void mosquitto_complete_basic_auth(const char* client_id, int result
*/
mosq_EXPORT int mosquitto_broker_node_id_set(uint16_t id);
/* =================================================================
*
* Persistence interface
*
* ================================================================= */
/* NOTE: The persistence interface is currently marked as unstable, which means
* it may change in a future minor release. */
int mosquitto_persist_client_add(const struct mosquitto_evt_persist_client *client);
int mosquitto_persist_client_update(const struct mosquitto_evt_persist_client *client);
/* Function: mosquitto_persist_client_add
*
* Use to add a new client session, in particular when restoring on starting
* the broker.
*
* Parameters:
* client->plugin_client_id - the client id of the client to add
* This must be allocated on the heap and becomes the property of the
* broker immediately this call is made. Must not be NULL.
* client->plugin_username - the username for the client session, or NULL. Must
* be allocated on the heap and becomes the property of the broker
* immediately this call is made.
* client->plugin_auth_method - the MQTT v5 extended authentication method,
* or NULL. Must be allocated on the heap and becomes the property of
* the broker immediately this call is made.
* client->clean_start - the new MQTT clean start parameter
* client->will_delay_time - the actual will delay time for this client
* client->session_expiry_time - the actual session expiry time for this
* client
* client->will_delay_interval - the MQTT v5 will delay interval for this
* client
* client->max_qos - the MQTT v5 maximum QoS parameter for this client
* client->maximum_packet_size - the MQTT v5 maximum packet size parameter
* for this client
* client->retain_available - the MQTT v5 retain available parameter for this
* client
* client->listener_port - the listener port that this client last connected to
*
* All other members of struct mosquitto_evt_persist_client are unused.
*
* Returns:
* MOSQ_ERR_SUCCESS - on success
* MOSQ_ERR_INVAL - if client or client->plugin_client_id is NULL, or if a
* client with the same ID already exists.
* MOSQ_ERR_NOMEM - on out of memory
*/
int mosquitto_persist_client_add(struct mosquitto_evt_persist_client *client);
/* Function: mosquitto_persist_client_update
*
* Use to update client session parameters
*
* Parameters:
* client->plugin_client_id - the client id of the client to update
* The broker will *not* modify this string and it remains the
* property of the plugin.
* client->username - the new username for the client session, or NULL. Must
* be allocated on the heap and becomes the property of the broker
* immediately this call is made.
* client->clean_start - the new MQTT clean start parameter
* client->will_delay_time - the actual will delay time for this client
* client->session_expiry_time - the actual session expiry time for this
* client
* client->will_delay_interval - the MQTT v5 will delay interval for this
* client
* client->max_qos - the MQTT v5 maximum QoS parameter for this client
* client->maximum_packet_size - the MQTT v5 maximum packet size parameter
* for this client
* client->retain_available - the MQTT v5 retain available parameter for this
* client
* client->listener_port - the listener port that this client last connected to
*
* All other members of struct mosquitto_evt_persist_client are unused.
*
* Returns:
* MOSQ_ERR_SUCCESS - on success
* MOSQ_ERR_INVAL - if client or client->plugin_client_id is NULL
* MOSQ_ERR_NOT_FOUND - the client is not found
*/
int mosquitto_persist_client_update(struct mosquitto_evt_persist_client *client);
/* Function: mosquitto_persist_client_remove
*
* Use to remove client session for a client from the broker
*
* Parameters:
* client_id - the client id of the client to remove
*
* Returns:
* MOSQ_ERR_SUCCESS - on success
* MOSQ_ERR_INVAL - if client_id is NULL
* MOSQ_ERR_NOT_FOUND - the referenced client is not found
*/
int mosquitto_persist_client_remove(const char *client_id);
int mosquitto_persist_client_msg_add(const struct mosquitto_evt_persist_client_msg *client_msg);
int mosquitto_persist_client_msg_remove(const struct mosquitto_evt_persist_client_msg *client_msg);
int mosquitto_persist_client_msg_update(const struct mosquitto_evt_persist_client_msg *client_msg);
int mosquitto_persist_msg_add(const struct mosquitto_evt_persist_msg *msg);
/* Function: mosquitto_persist_client_msg_add
*
* Use to add a client message for a particular client.
*
* Parameters:
* client_msg->plugin_client_id - the client id of the client that the
* message belongs to. The broker will *not* modify this string and it
* remains the property of the plugin.
* client_msg->store_id - the store ID of the stored message that this client
* message references.
* client_msg->cmsg_id - the client message id of the new message
* client_msg->mid - the MQTT message id of the new message
* client_msg->qos - the MQTT QoS of the new message
* client_msg->direction - the direction of the new message from the perspective
* of the broker (mosq_md_in / mosq_md_out)
* client_msg->retain - the retain flag of the message
* client_msg->subscription_identifier - the MQTT v5 subscription identifier,
* for outgoing messages only.
*
* All other members of struct mosquitto_evt_persist_client_msg are unused.
*
* Returns:
* MOSQ_ERR_SUCCESS - on success
* MOSQ_ERR_INVAL - if client_msg or client_msg->plugin_client_id is NULL
* MOSQ_ERR_NOT_FOUND - the client or stored message is not found
*/
int mosquitto_persist_client_msg_add(struct mosquitto_evt_persist_client_msg *client_msg);
/* Function: mosquitto_persist_client_msg_remove
*
* Use to remove a client message for a particular client.
*
* Parameters:
* client_msg->plugin_client_id - the client id of the client that the
* message belongs to. The broker will *not* modify this string and it
* remains the property of the plugin.
* client_msg->cmsg_id - the client message id of the affected message
* client_msg->mid - the MQTT message id of the affected message
* client_msg->qos - the MQTT QoS of the affected message
* client_msg->direction - the direction of the message from the perspective
* of the broker (mosq_md_in / mosq_md_out)
*
* All other members of struct mosquitto_evt_persist_client_msg are unused.
*
* Returns:
* MOSQ_ERR_SUCCESS - on success
* MOSQ_ERR_INVAL - if client_msg or client_msg->plugin_client_id is NULL
* MOSQ_ERR_NOT_FOUND - the client is not found
*/
int mosquitto_persist_client_msg_remove(struct mosquitto_evt_persist_client_msg *client_msg);
/* Function: mosquitto_persist_client_msg_update
*
* Use to update the state of a client message for a particular client.
*
* Parameters:
* client_msg->plugin_client_id - the client id of the client that the
* message belongs to. The broker will *not* modify this string and it
* remains the property of the plugin.
* client_msg->cmsg_id - the client message id of the affected message
* client_msg->mid - the MQTT message id of the affected message
* client_msg->qos - the MQTT QoS of the affected message
* client_msg->direction - the direction of the message from the perspective
* of the broker (mosq_md_in / mosq_md_out)
* client_msg->state - the new state of the message
*
* All other members of struct mosquitto_evt_persist_client_msg are unused.
*
* Returns:
* MOSQ_ERR_SUCCESS - on success
* MOSQ_ERR_INVAL - if client_msg or client_msg->plugin_client_id is NULL
* MOSQ_ERR_NOT_FOUND - the client is not found
*/
int mosquitto_persist_client_msg_update(struct mosquitto_evt_persist_client_msg *client_msg);
/* Function: mosquitto_persist_msg_add
*
* Use to add a new stored message. Any client messages or retained message
* refering to this stored message must be added afterwards.
*
* Parameters:
* msg->store_id - the stored message ID
* msg->plugin_source_id - the client id of the client that the
* message originated with, or NULL.
* The broker will *not* modify this string and it remains the
* property of the plugin.
* msg->plugin_source_username - the username of the client that the
* message originated with, or NULL.
* The broker will *not* modify this string and it remains the
* property of the plugin.
* msg->topic - the message topic.
* Must be allocated on the heap and becomes the property of the
* broker immediately this call is made.
* msg->payload - the message payload.
* Must be allocated on the heap and becomes the property of the
* broker immediately this call is made.
* msg->payloadlen - the length of the payload, in bytes
* msg->expiry_time - the time at which the message expires, or 0.
* msg->properties - list of MQTT v5 message properties for this message.
* Must be allocated on the heap and becomes the property of the
* broker immediately this call is made.
* msg->retain - the message retain flag as delivered to the broker
* msg->qos - the message QoS as delivered to the broker
* msg->source_port - the network port number that the originating client was
* connected to, or 0.
*
* Returns:
* MOSQ_ERR_SUCCESS - on success
* MOSQ_ERR_NOMEM - on out of memory
*/
int mosquitto_persist_msg_add(struct mosquitto_evt_persist_msg *msg);
/* Function: mosquitto_persist_msg_remove
*
* Use to remove a stored message.
*
* Parameters:
* store_id - the stored message ID
*
* Returns:
* MOSQ_ERR_SUCCESS - on success
*/
int mosquitto_persist_msg_remove(uint64_t store_id);
/* Function: mosquitto_persist_subscription_add
*
* Use to add a new subscription for a client
*
* Parameters:
* client_id - the client id of the client the new subscription is for
* topic - the topic filter for the subscription
* subscription_options - the QoS and other flags for this subscription
* subscription_identifier - the MQTT v5 subscription id, or 0
*
* Returns:
* MOSQ_ERR_SUCCESS - on success
* MOSQ_ERR_INVAL - if client_id or topic are NULL, or are zero length
* MOSQ_ERR_NOT_FOUND - the referenced client was not found
* MOSQ_ERR_NOMEM - on out of memory
*/
int mosquitto_subscription_add(const char *client_id, const char *topic, uint8_t subscription_options, uint32_t subscription_identifier);
/* Function: mosquitto_persist_subscription_remove
*
* Use to remove a subscription for a client
*
* Parameters:
* client_id - the client id of the client the new subscription is for
* topic - the topic filter for the subscription
*
* Returns:
* MOSQ_ERR_SUCCESS - on success
* MOSQ_ERR_INVAL - if client_id or topic are NULL, or are zero length
* MOSQ_ERR_NOT_FOUND - the referenced client was not found
* MOSQ_ERR_NOMEM - on out of memory
*/
int mosquitto_subscription_remove(const char *client_id, const char *topic);
/* Function: mosquitto_persist_retain_add
*
* Use to add a retained message. It is not required to remove a retained
* message for an existing topic first.
*
* Parameters:
* msg->plugin_topic - the topic that the message references
* The broker will *not* modify this string and it remains the
* property of the plugin.
* msg->store_id - the store id of the stored message that is to be retained
*
* Returns:
* MOSQ_ERR_SUCCESS - on success
* MOSQ_ERR_INVAL - if msg or msg->plugin_topic are NULL
* MOSQ_ERR_NOT_FOUND - the referenced stored message was not found
* MOSQ_ERR_NOMEM - on out of memory
*/
int mosquitto_persist_retain_add(struct mosquitto_evt_persist_retain *retain);
/* Function: mosquitto_persist_retain_remove
*
* Use to remove a retained message.
*
* Parameters:
* msg->plugin_topic - the topic that the message references
* The broker will *not* modify this string and it remains the
* property of the plugin.
*
* Returns:
* MOSQ_ERR_SUCCESS - on success
* MOSQ_ERR_INVAL - if msg or msg->plugin_topic are NULL
* MOSQ_ERR_NOMEM - on out of memory
*/
int mosquitto_persist_retain_remove(const char *topic);
#ifdef __cplusplus

@ -386,44 +386,40 @@ int mosquitto_kick_client_by_username(const char *username, bool with_will)
}
int mosquitto_persist_client_add(const struct mosquitto_evt_persist_client *client)
int mosquitto_persist_client_add(struct mosquitto_evt_persist_client *client)
{
struct mosquitto *context;
int i;
int rc;
if(client == NULL || client->plugin_client_id == NULL) return MOSQ_ERR_INVAL;
if(client == NULL){
return MOSQ_ERR_INVAL;
}
if(client->plugin_client_id == NULL){
rc = MOSQ_ERR_INVAL;
goto error;
}
context = NULL;
HASH_FIND(hh_id, db.contexts_by_id, client->plugin_client_id, strlen(client->plugin_client_id), context);
if(context){
return MOSQ_ERR_INVAL;
rc = MOSQ_ERR_INVAL;
goto error;
}
context = context__init();
if(!context) return MOSQ_ERR_NOMEM;
context->id = mosquitto__strdup(client->plugin_client_id);
if(client->plugin_username){
context->username = mosquitto__strdup(client->plugin_username);
if(!context->username){
mosquitto__free(context->id);
mosquitto__free(context);
return MOSQ_ERR_NOMEM;
}
}else{
context->username = NULL;
}
if(client->auth_method){
context->auth_method = mosquitto__strdup(client->plugin_auth_method);
if(!context->auth_method){
mosquitto__free(context->username);
mosquitto__free(context->id);
mosquitto__free(context);
return MOSQ_ERR_NOMEM;
}
}else{
context->auth_method = NULL;
if(!context){
rc = MOSQ_ERR_NOMEM;
goto error;
}
context->id = client->plugin_client_id;
client->plugin_client_id = NULL;
context->username = client->plugin_username;
client->plugin_username = NULL;
context->auth_method = client->plugin_auth_method;
client->plugin_auth_method = NULL;
context->clean_start = false;
context->will_delay_time = client->will_delay_time;
context->session_expiry_time = client->session_expiry_time;
@ -446,34 +442,39 @@ int mosquitto_persist_client_add(const struct mosquitto_evt_persist_client *clie
context__add_to_by_id(context);
return MOSQ_ERR_SUCCESS;
error:
free(client->plugin_client_id);
free(client->plugin_username);
free(client->plugin_auth_method);
return rc;
}
int mosquitto_persist_client_update(const struct mosquitto_evt_persist_client *client)
int mosquitto_persist_client_update(struct mosquitto_evt_persist_client *client)
{
struct mosquitto *context;
int i;
char *username;
int rc;
if(client == NULL || client->plugin_client_id == NULL) return MOSQ_ERR_INVAL;
if(client == NULL){
return MOSQ_ERR_INVAL;
}
if(client->plugin_client_id == NULL){
rc = MOSQ_ERR_INVAL;
goto error;
}
context = NULL;
HASH_FIND(hh_id, db.contexts_by_id, client->plugin_client_id, strlen(client->plugin_client_id), context);
if(context == NULL){
return MOSQ_ERR_INVAL;
rc = MOSQ_ERR_NOT_FOUND;
goto error;
}
if(client->plugin_username){
username = mosquitto__strdup(client->plugin_username);
if(!username){
return MOSQ_ERR_NOMEM;
}
mosquitto_free(context->username);
context->username = username;
}else{
mosquitto_free(context->username);
context->username = NULL;
}
mosquitto_free(context->username);
context->username = client->plugin_username;
client->plugin_username = NULL;
context->clean_start = false;
context->will_delay_time = client->will_delay_time;
context->session_expiry_time = client->session_expiry_time;
@ -494,6 +495,9 @@ int mosquitto_persist_client_update(const struct mosquitto_evt_persist_client *c
}
return MOSQ_ERR_SUCCESS;
error:
free(client->plugin_username);
return rc;
}
@ -530,14 +534,13 @@ struct mosquitto_msg_store *find_store_msg(uint64_t store_id)
return stored;
}
int mosquitto_persist_client_msg_add(const struct mosquitto_evt_persist_client_msg *client_msg)
int mosquitto_persist_client_msg_add(struct mosquitto_evt_persist_client_msg *client_msg)
{
struct mosquitto *context;
struct mosquitto_msg_store *stored;
if(client_msg == NULL || client_msg->plugin_client_id == NULL){
return MOSQ_ERR_INVAL;
}
if(client_msg == NULL || client_msg->plugin_client_id == NULL) return MOSQ_ERR_INVAL;
HASH_FIND(hh_id, db.contexts_by_id, client_msg->plugin_client_id, strlen(client_msg->plugin_client_id), context);
if(context == NULL){
return MOSQ_ERR_NOT_FOUND;
@ -560,27 +563,18 @@ int mosquitto_persist_client_msg_add(const struct mosquitto_evt_persist_client_m
}
int mosquitto_persist_client_msg_remove(const struct mosquitto_evt_persist_client_msg *client_msg)
int mosquitto_persist_client_msg_remove(struct mosquitto_evt_persist_client_msg *client_msg)
{
struct mosquitto *context;
struct mosquitto_msg_store *stored;
if(client_msg == NULL || client_msg->plugin_client_id == NULL){
return MOSQ_ERR_INVAL;
}
if(client_msg == NULL || client_msg->plugin_client_id == NULL) return MOSQ_ERR_INVAL;
HASH_FIND(hh_id, db.contexts_by_id, client_msg->plugin_client_id, strlen(client_msg->plugin_client_id), context);
if(context == NULL){
return MOSQ_ERR_NOT_FOUND;
}
stored = find_store_msg(client_msg->store_id);
if(stored == NULL){
return MOSQ_ERR_NOT_FOUND;
}
if(client_msg->direction == mosq_md_out){
if(client_msg->qos > 0){
context->last_mid = client_msg->mid;
}
return db__message_delete_outgoing(context, client_msg->mid, client_msg->state, client_msg->qos);
}else{
return db__message_remove_incoming(context, client_msg->mid);
@ -589,13 +583,12 @@ int mosquitto_persist_client_msg_remove(const struct mosquitto_evt_persist_clien
}
int mosquitto_persist_client_msg_update(const struct mosquitto_evt_persist_client_msg *client_msg)
int mosquitto_persist_client_msg_update(struct mosquitto_evt_persist_client_msg *client_msg)
{
struct mosquitto *context;
if(client_msg == NULL || client_msg->plugin_client_id == NULL){
return MOSQ_ERR_INVAL;
}
if(client_msg == NULL || client_msg->plugin_client_id == NULL) return MOSQ_ERR_INVAL;
HASH_FIND(hh_id, db.contexts_by_id, client_msg->plugin_client_id, strlen(client_msg->plugin_client_id), context);
if(context == NULL){
return MOSQ_ERR_NOT_FOUND;
@ -603,6 +596,8 @@ int mosquitto_persist_client_msg_update(const struct mosquitto_evt_persist_clien
if(client_msg->direction == mosq_md_out){
db__message_update_outgoing(context, client_msg->mid, client_msg->state, client_msg->qos, false);
}else{
// FIXME db__message_update_incoming(context, client_msg->mid, client_msg->state, client_msg->qos, false);
}
return MOSQ_ERR_SUCCESS;
}
@ -621,7 +616,7 @@ int mosquitto_subscription_add(const char *client_id, const char *topic, uint8_t
if(context){
return sub__add(context, topic, subscription_options&0x03, subscription_identifier, subscription_options, &db.subs);
}else{
return MOSQ_ERR_INVAL;
return MOSQ_ERR_NOT_FOUND;
}
}
@ -640,12 +635,12 @@ int mosquitto_subscription_remove(const char *client_id, const char *topic)
if(context){
return sub__remove(context, topic, db.subs, &reason);
}else{
return MOSQ_ERR_INVAL;
return MOSQ_ERR_NOT_FOUND;
}
}
int mosquitto_persist_msg_add(const struct mosquitto_evt_persist_msg *msg)
int mosquitto_persist_msg_add(struct mosquitto_evt_persist_msg *msg)
{
struct mosquitto context;
struct mosquitto_msg_store *stored;
@ -654,16 +649,9 @@ int mosquitto_persist_msg_add(const struct mosquitto_evt_persist_msg *msg)
int i;
memset(&context, 0, sizeof(context));
memset(&stored, 0, sizeof(stored));
if(msg->plugin_source_id){
context.id = mosquitto__strdup(msg->plugin_source_id);
if(!context.id) return MOSQ_ERR_NOMEM;
}
if(msg->plugin_source_username){
context.username = mosquitto__strdup(msg->plugin_source_username);
if(!context.username) return MOSQ_ERR_NOMEM;
}
context.id = msg->plugin_source_id;
context.username = msg->plugin_source_username;
if(msg->expiry_time == 0){
message_expiry_interval = 0;
@ -687,18 +675,12 @@ int mosquitto_persist_msg_add(const struct mosquitto_evt_persist_msg *msg)
stored->qos = msg->qos;
stored->retain = msg->retain;
stored->payload = mosquitto_malloc(stored->payloadlen+1);
if(stored->payload == NULL){
goto error;
}
memcpy(stored->payload, msg->plugin_payload, stored->payloadlen);
((uint8_t *)stored->payload)[stored->payloadlen] = 0; /* Always zero terminate */
stored->topic = mosquitto_strdup(msg->plugin_topic);
if(stored->topic == NULL){
goto error;
}
stored->properties = NULL; /* FIXME */
stored->payload = msg->plugin_payload;
msg->plugin_payload = NULL;
stored->topic = msg->plugin_topic;
msg->plugin_topic = NULL;
stored->properties = msg->plugin_properties;
msg->plugin_properties = NULL;
if(msg->source_port){
for(i=0; i<db.config->listener_count; i++){
@ -711,12 +693,11 @@ int mosquitto_persist_msg_add(const struct mosquitto_evt_persist_msg *msg)
return db__message_store(&context, stored, message_expiry_interval, msg->store_id, mosq_mo_broker);
error:
if(stored){
mosquitto_property_free_all(&stored->properties);
mosquitto_free(stored->topic);
mosquitto_free(stored->payload);
mosquitto_free(stored);
}
mosquitto_property_free_all(&msg->plugin_properties);
mosquitto_free(msg->plugin_topic);
mosquitto_free(msg->plugin_payload);
mosquitto_free(stored);
return MOSQ_ERR_NOMEM;
}

@ -78,13 +78,13 @@ int mosquitto_persist_retain_add(struct mosquitto_evt_persist_retain *msg)
char **split_topics = NULL;
char *local_topic = NULL;
if(msg == NULL || msg->topic == NULL) return MOSQ_ERR_INVAL;
if(msg == NULL || msg->plugin_topic == NULL) return MOSQ_ERR_INVAL;
HASH_FIND(hh, db.msg_store, &msg->store_id, sizeof(msg->store_id), stored);
if(stored){
if(sub__topic_tokenise(msg->topic, &local_topic, &split_topics, NULL)) return MOSQ_ERR_NOMEM;
if(sub__topic_tokenise(msg->plugin_topic, &local_topic, &split_topics, NULL)) return MOSQ_ERR_NOMEM;
rc = retain__store(msg->topic, stored, split_topics, false);
rc = retain__store(msg->plugin_topic, stored, split_topics, false);
mosquitto__free(split_topics);
mosquitto__free(local_topic);
}

Loading…
Cancel
Save