Plugin persistence interface.

pull/2438/head
Roger A. Light 4 years ago
parent cfe078cd32
commit 0f8733627a

@ -105,11 +105,12 @@ ssize_t net__write(struct mosquitto *mosq, const void *buf, size_t count)
return 0;
}
int retain__store(const char *topic, struct mosquitto_msg_store *stored, char **split_topics)
int retain__store(const char *topic, struct mosquitto_msg_store *stored, char **split_topics, bool persist)
{
UNUSED(topic);
UNUSED(stored);
UNUSED(split_topics);
UNUSED(persist);
return 0;
}

@ -185,6 +185,16 @@ struct mosquitto_message{
struct mosquitto;
typedef struct mqtt5__property mosquitto_property;
struct mosquitto_message_v5{
void *payload;
char *topic;
mosquitto_property *properties;
uint32_t payloadlen;
uint8_t qos;
bool retain;
uint8_t padding[2];
};
/*
* Topic: Threads
* libmosquitto provides thread safe operation, with the exception of

@ -39,6 +39,8 @@ extern "C" {
#include <stdint.h>
#include <time.h>
#include <mosquitto.h>
struct mosquitto;
typedef struct mqtt5__property mosquitto_property;
@ -48,6 +50,11 @@ enum mosquitto_protocol {
mp_websockets
};
enum mosquitto_broker_msg_direction {
mosq_bmd_in = 0,
mosq_bmd_out = 1
};
/* =========================================================================
*
* Section: Register callbacks.
@ -67,6 +74,22 @@ enum mosquitto_plugin_event {
MOSQ_EVT_TICK = 9,
MOSQ_EVT_DISCONNECT = 10,
MOSQ_EVT_CONNECT = 11,
MOSQ_EVT_PERSIST_RESTORE = 12,
MOSQ_EVT_PERSIST_CONFIG_ADD = 13,
MOSQ_EVT_PERSIST_MSG_ADD = 14,
MOSQ_EVT_PERSIST_MSG_REMOVE = 15,
MOSQ_EVT_PERSIST_MSG_LOAD = 16,
MOSQ_EVT_PERSIST_RETAIN_ADD = 17,
MOSQ_EVT_PERSIST_RETAIN_REMOVE = 18,
MOSQ_EVT_PERSIST_CLIENT_ADD = 19,
MOSQ_EVT_PERSIST_CLIENT_REMOVE = 20,
MOSQ_EVT_PERSIST_CLIENT_UPDATE = 21,
MOSQ_EVT_PERSIST_SUBSCRIPTION_ADD = 22,
MOSQ_EVT_PERSIST_SUBSCRIPTION_REMOVE = 23,
MOSQ_EVT_PERSIST_CLIENT_MSG_ADD = 24,
MOSQ_EVT_PERSIST_CLIENT_MSG_REMOVE = 25,
MOSQ_EVT_PERSIST_CLIENT_MSG_UPDATE = 26,
MOSQ_EVT_PERSIST_CLIENT_MSG_LOAD = 27,
};
/* Data for the MOSQ_EVT_RELOAD event */
@ -179,6 +202,95 @@ struct mosquitto_evt_disconnect {
void *future2[4];
};
/* Data for the MOSQ_EVT_PERSIST_RESTORE event */
/* NOTE: The persistence interface is currently marked as unstable, which means
* it may change in a future minor release. */
struct mosquitto_evt_persist_restore {
void *future[8];
};
/* Data for the MOSQ_EVT_PERSIST_CLIENT_ADD/_REMOVE/_UPDATE event */
/* NOTE: The persistence interface is currently marked as unstable, which means
* it may change in a future minor release. */
struct mosquitto_evt_persist_client {
const char *client_id;
const char *username;
const struct mosquitto_message_v5 *will;
time_t will_delay_time; /* update */
time_t session_expiry_time; /* update */
uint32_t will_delay_interval;
uint32_t session_expiry_interval;
uint32_t max_packet_size;
uint16_t listener_port;
uint8_t max_qos;
bool retain_available;
uint8_t padding[6];
void *future[8];
};
/* Data for the MOSQ_EVT_PERSIST_SUBSCRIPTION_ADD/_REMOVE event */
/* NOTE: The persistence interface is currently marked as unstable, which means
* it may change in a future minor release. */
struct mosquitto_evt_persist_subscription {
const char *client_id;
const char *topic;
uint32_t subscription_identifier;
uint8_t subscription_options;
uint8_t padding[3];
void *future[8];
};
/* Data for the MOSQ_EVT_PERSIST_CLIENT_MSG_ADD/_REMOVE/_UPDATE event */
/* NOTE: The persistence interface is currently marked as unstable, which means
* it may change in a future minor release. */
struct mosquitto_evt_persist_client_msg {
const char *client_id;
uint64_t cmsg_id;
uint64_t store_id;
uint32_t subscription_identifier;
uint16_t mid;
uint8_t qos;
bool retain;
bool dup; /* add, update */
uint8_t direction;
uint8_t state; /* add, update */
uint8_t padding[5];
void *future[8];
};
/* Data for the MOSQ_EVT_PERSIST_MSG_ADD/_REMOVE/_LOAD event */
/* NOTE: The persistence interface is currently marked as unstable, which means
* it may change in a future minor release. */
struct mosquitto_evt_persist_msg {
uint64_t store_id;
int64_t expiry_time;
const char *topic;
const void *payload;
const char *source_id;
const char *source_username;
const mosquitto_property *properties;
uint32_t payloadlen;
uint16_t source_mid;
uint16_t source_port;
uint8_t qos;
bool retain;
uint8_t padding[6];
void *future[8];
};
/* Data for the MOSQ_EVT_PERSIST_RETAIN/_REMOVE event */
/* NOTE: The persistence interface is currently marked as unstable, which means
* it may change in a future minor release. */
struct mosquitto_evt_persist_retain {
const char *topic;
uint64_t store_id;
void *future[8];
};
/* Callback definition */
typedef int (*MOSQ_FUNC_generic_callback)(int, void *, void *);
@ -648,6 +760,21 @@ mosq_EXPORT void mosquitto_complete_basic_auth(const char* client_id, int result
*/
mosq_EXPORT int mosquitto_broker_node_id_set(uint16_t id);
/* NOTE: The persistence interface is currently marked as unstable, which means
* it may change in a future minor release. */
int mosquitto_persist_client_add(const struct mosquitto_evt_persist_client *client);
int mosquitto_persist_client_update(const struct mosquitto_evt_persist_client *client);
int mosquitto_persist_client_remove(const char *client_id);
int mosquitto_persist_client_msg_add(const struct mosquitto_evt_persist_client_msg *client_msg);
int mosquitto_persist_client_msg_remove(const struct mosquitto_evt_persist_client_msg *client_msg);
int mosquitto_persist_client_msg_update(const struct mosquitto_evt_persist_client_msg *client_msg);
int mosquitto_persist_msg_add(const struct mosquitto_evt_persist_msg *msg);
int mosquitto_persist_msg_remove(uint64_t store_id);
int mosquitto_subscription_add(const char *client_id, const char *topic, uint8_t subscription_options, uint32_t subscription_identifier);
int mosquitto_subscription_remove(const char *client_id, const char *topic);
int mosquitto_persist_retain_add(struct mosquitto_evt_persist_retain *retain);
int mosquitto_persist_retain_remove(const char *topic);
#ifdef __cplusplus
}
#endif

@ -94,7 +94,7 @@ int handle__pubrec(struct mosquitto *mosq)
log__printf(NULL, MOSQ_LOG_DEBUG, "Received PUBREC from %s (Mid: %d)", SAFE_PRINT(mosq->id), mid);
if(reason_code < 0x80){
rc = db__message_update_outgoing(mosq, mid, mosq_ms_wait_for_pubcomp, 2);
rc = db__message_update_outgoing(mosq, mid, mosq_ms_wait_for_pubcomp, 2, true);
}else{
return db__message_delete_outgoing(mosq, mid, mosq_ms_wait_for_pubrec, 2);
}

@ -44,7 +44,7 @@ set (MOSQ_SRCS
persist_write_v5.c persist_write.c
persist.h
plugin_callbacks.c plugin_v5.c plugin_v4.c plugin_v3.c plugin_v2.c
plugin_init.c plugin_cleanup.c
plugin_init.c plugin_cleanup.c plugin_persist.c
plugin_acl_check.c plugin_basic_auth.c plugin_connect.c plugin_disconnect.c
plugin_extended_auth.c plugin_message.c plugin_psk_key.c plugin_public.c
plugin_tick.c

@ -68,6 +68,7 @@ OBJS= mosquitto.o \
plugin_extended_auth.o \
plugin_init.o \
plugin_message.o \
plugin_persist.o \
plugin_psk_key.o \
plugin_public.o \
plugin_tick.o \
@ -303,6 +304,9 @@ plugin_init.o : plugin_init.c ../include/mosquitto_plugin.h mosquitto_broker_int
plugin_message.o : plugin_message.c ../include/mosquitto_plugin.h mosquitto_broker_internal.h
${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@
plugin_persist.o : plugin_persist.c ../include/mosquitto_plugin.h mosquitto_broker_internal.h
${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@
plugin_psk_key.o : plugin_psk_key.c ../include/mosquitto_plugin.h mosquitto_broker_internal.h
${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@

@ -234,6 +234,7 @@ void context__disconnect(struct mosquitto *context)
context__send_will(context);
net__socket_close(context);
if(context->session_expiry_interval == 0){
plugin_persist__handle_client_remove(context);
/* Client session is due to be expired now */
#ifdef WITH_BRIDGE
if(context->bridge == NULL)

@ -275,13 +275,15 @@ void db__msg_store_free(struct mosquitto_msg_store *store)
mosquitto__free(store);
}
void db__msg_store_remove(struct mosquitto_msg_store *store)
void db__msg_store_remove(struct mosquitto_msg_store *store, bool notify)
{
if(store == NULL) return;
HASH_DELETE(hh, db.msg_store, store);
db.msg_store_count--;
db.msg_store_bytes -= store->payloadlen;
if(db.shutdown == false || notify == false){
plugin_persist__handle_msg_remove(store);
}
db__msg_store_free(store);
}
@ -291,7 +293,7 @@ void db__msg_store_clean(void)
struct mosquitto_msg_store *store, *store_tmp;
HASH_ITER(hh, db.msg_store, store, store_tmp){
db__msg_store_remove(store);
db__msg_store_remove(store, false);
}
}
@ -304,7 +306,7 @@ void db__msg_store_ref_dec(struct mosquitto_msg_store **store)
{
(*store)->ref_count--;
if((*store)->ref_count == 0){
db__msg_store_remove(*store);
db__msg_store_remove(*store, true);
*store = NULL;
}
}
@ -316,18 +318,20 @@ void db__msg_store_compact(void)
HASH_ITER(hh, db.msg_store, store, store_tmp){
if(store->ref_count < 1){
db__msg_store_remove(store);
db__msg_store_remove(store, true);
}
}
}
static void db__message_remove(struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *item)
static void db__message_remove(struct mosquitto *context, struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *item)
{
if(!msg_data || !item){
if(!context || !msg_data || !item){
return;
}
plugin_persist__handle_client_msg_remove(context, item);
DL_DELETE(msg_data->inflight, item);
if(item->store){
db__msg_remove_from_inflight_stats(msg_data, item);
@ -372,7 +376,7 @@ int db__message_delete_outgoing(struct mosquitto *context, uint16_t mid, enum mo
return MOSQ_ERR_PROTOCOL;
}
msg_index--;
db__message_remove(&context->msgs_out, tail);
db__message_remove(context, &context->msgs_out, tail);
break;
}
}
@ -394,6 +398,7 @@ int db__message_delete_outgoing(struct mosquitto *context, uint16_t mid, enum mo
tail->state = mosq_ms_publish_qos2;
break;
}
plugin_persist__handle_client_msg_update(context, tail);
db__message_dequeue_first(context, &context->msgs_out);
}
#ifdef WITH_PERSISTENCE
@ -405,7 +410,7 @@ int db__message_delete_outgoing(struct mosquitto *context, uint16_t mid, enum mo
/* Only for QoS 2 messages */
int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, struct mosquitto_msg_store *stored)
int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, struct mosquitto_msg_store *stored, bool persist)
{
struct mosquitto_client_msg *msg;
struct mosquitto_msg_data *msg_data;
@ -476,13 +481,18 @@ int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, str
db__msg_add_to_inflight_stats(msg_data, msg);
}
if(persist){
plugin_persist__handle_msg_add(msg->store);
plugin_persist__handle_client_msg_add(context, msg);
}
if(msg->store->qos > 0){
util__decrement_receive_quota(context);
}
return rc;
}
int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uint16_t mid, uint8_t qos, bool retain, struct mosquitto_msg_store *stored, uint32_t subscription_identifier, bool update)
int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uint16_t mid, uint8_t qos, bool retain, struct mosquitto_msg_store *stored, uint32_t subscription_identifier, bool update, bool persist)
{
struct mosquitto_client_msg *msg;
struct mosquitto_msg_data *msg_data;
@ -613,6 +623,11 @@ int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uin
db__msg_add_to_inflight_stats(msg_data, msg);
}
if(persist){
plugin_persist__handle_msg_add(msg->store);
plugin_persist__handle_client_msg_add(context, msg);
}
if(db.config->allow_duplicate_messages == false && retain == false){
/* Record which client ids this message has been sent to so we can avoid duplicates.
* Outgoing messages only.
@ -656,7 +671,7 @@ int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uin
return rc;
}
int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos)
int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos, bool persist)
{
struct mosquitto_client_msg *tail;
@ -666,6 +681,9 @@ int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mo
return MOSQ_ERR_PROTOCOL;
}
tail->state = state;
if(persist){
plugin_persist__handle_client_msg_update(context, tail);
}
return MOSQ_ERR_SUCCESS;
}
}
@ -933,6 +951,7 @@ static int db__message_reconnect_reset_outgoing(struct mosquitto *context)
}
break;
}
plugin_persist__handle_client_msg_update(context, msg);
}
/* Messages received when the client was disconnected are put
* in the mosq_ms_queued state. If we don't change them to the
@ -955,6 +974,7 @@ static int db__message_reconnect_reset_outgoing(struct mosquitto *context)
break;
}
db__message_dequeue_first(context, &context->msgs_out);
plugin_persist__handle_client_msg_update(context, msg);
}
}
@ -986,7 +1006,7 @@ static int db__message_reconnect_reset_incoming(struct mosquitto *context)
if(msg->qos != 2){
/* Anything <QoS 2 can be completely retried by the client at
* no harm. */
db__message_remove(&context->msgs_in, msg);
db__message_remove(context, &context->msgs_in, msg);
}else{
/* Message state can be preserved here because it should match
* whatever the client has got. */
@ -1014,6 +1034,7 @@ static int db__message_reconnect_reset_incoming(struct mosquitto *context)
break;
}
db__message_dequeue_first(context, &context->msgs_in);
plugin_persist__handle_client_msg_update(context, msg);
}
}
@ -1042,7 +1063,7 @@ int db__message_remove_incoming(struct mosquitto* context, uint16_t mid)
if(tail->store->qos != 2){
return MOSQ_ERR_PROTOCOL;
}
db__message_remove(&context->msgs_in, tail);
db__message_remove(context, &context->msgs_in, tail);
return MOSQ_ERR_SUCCESS;
}
}
@ -1078,12 +1099,12 @@ int db__message_release_incoming(struct mosquitto *context, uint16_t mid)
* keep resending it. That means we don't send it to other
* clients. */
if(topic == NULL){
db__message_remove(&context->msgs_in, tail);
db__message_remove(context, &context->msgs_in, tail);
deleted = true;
}else{
rc = sub__messages_queue(source_id, topic, 2, retain, &tail->store);
if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_NO_SUBSCRIBERS){
db__message_remove(&context->msgs_in, tail);
db__message_remove(context, &context->msgs_in, tail);
deleted = true;
}else{
return 1;
@ -1103,6 +1124,7 @@ int db__message_release_incoming(struct mosquitto *context, uint16_t mid)
send__pubrec(context, tail->mid, 0, NULL);
tail->state = mosq_ms_wait_for_pubrel;
db__message_dequeue_first(context, &context->msgs_in);
plugin_persist__handle_client_msg_update(context, tail);
}
}
if(deleted){
@ -1133,7 +1155,7 @@ static int db__message_write_inflight_out_single(struct mosquitto *context, stru
if(msg->direction == mosq_md_out && msg->qos > 0){
util__increment_send_quota(context);
}
db__message_remove(&context->msgs_out, msg);
db__message_remove(context, &context->msgs_out, msg);
return MOSQ_ERR_SUCCESS;
}else{
expiry_interval = (uint32_t)(msg->store->message_expiry_time - db.now_real_s);
@ -1153,7 +1175,7 @@ static int db__message_write_inflight_out_single(struct mosquitto *context, stru
case mosq_ms_publish_qos0:
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, subscription_id, store_props, expiry_interval);
if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_OVERSIZE_PACKET){
db__message_remove(&context->msgs_out, msg);
db__message_remove(context, &context->msgs_out, msg);
}else{
return rc;
}
@ -1165,7 +1187,7 @@ static int db__message_write_inflight_out_single(struct mosquitto *context, stru
msg->dup = 1; /* Any retry attempts are a duplicate. */
msg->state = mosq_ms_wait_for_puback;
}else if(rc == MOSQ_ERR_OVERSIZE_PACKET){
db__message_remove(&context->msgs_out, msg);
db__message_remove(context, &context->msgs_out, msg);
}else{
return rc;
}
@ -1177,7 +1199,7 @@ static int db__message_write_inflight_out_single(struct mosquitto *context, stru
msg->dup = 1; /* Any retry attempts are a duplicate. */
msg->state = mosq_ms_wait_for_pubrec;
}else if(rc == MOSQ_ERR_OVERSIZE_PACKET){
db__message_remove(&context->msgs_out, msg);
db__message_remove(context, &context->msgs_out, msg);
}else{
return rc;
}
@ -1288,7 +1310,9 @@ int db__message_write_queued_in(struct mosquitto *context)
rc = send__pubrec(context, tail->mid, 0, NULL);
if(!rc){
tail->state = mosq_ms_wait_for_pubrel;
plugin_persist__handle_client_msg_update(context, tail);
}else{
plugin_persist__handle_client_msg_update(context, tail);
return rc;
}
}
@ -1322,6 +1346,7 @@ int db__message_write_queued_out(struct mosquitto *context)
break;
}
db__message_dequeue_first(context, &context->msgs_out);
plugin_persist__handle_client_msg_update(context, tail);
}
return MOSQ_ERR_SUCCESS;
}

@ -185,6 +185,8 @@ int connect__on_authorised(struct mosquitto *context, void *auth_data_out, uint1
if(context->clean_start == true){
sub__clean_session(found_context);
found_context->session_expiry_interval = 0;
plugin_persist__handle_client_remove(found_context);
}
if((found_context->protocol == mosq_p_mqtt5 && found_context->session_expiry_interval == 0)
|| (found_context->protocol != mosq_p_mqtt5 && found_context->clean_start == true)
@ -335,6 +337,10 @@ int connect__on_authorised(struct mosquitto *context, void *auth_data_out, uint1
if(rc == MOSQ_ERR_SUCCESS){
plugin__handle_connect(context);
if(context->session_expiry_interval != 0){
plugin_persist__handle_client_add(context);
}
}
return rc;
error:

@ -355,7 +355,7 @@ int handle__publish(struct mosquitto *context)
break;
case 2:
if(dup == 0){
res = db__message_insert_incoming(context, 0, stored);
res = db__message_insert_incoming(context, 0, stored, true);
}else{
res = 0;
}

@ -212,6 +212,10 @@ int handle__subscribe(struct mosquitto *context)
}
log__printf(NULL, MOSQ_LOG_SUBSCRIBE, "%s %d %s", context->id, qos, sub);
if(context->session_expiry_interval > 0){
plugin_persist__handle_subscription_add(context, sub, subscription_options, subscription_identifier);
}
}
mosquitto__free(sub);

@ -130,6 +130,9 @@ int handle__unsubscribe(struct mosquitto *context)
log__printf(NULL, MOSQ_LOG_DEBUG, "\t%s", sub);
if(allowed){
rc = sub__remove(context, sub, db.subs, &reason);
if(context->session_expiry_interval > 0){
plugin_persist__handle_subscription_remove(context, sub);
}
}else{
rc = MOSQ_ERR_SUCCESS;
}

@ -20,6 +20,16 @@ _mosquitto_kick_client_by_clientid
_mosquitto_kick_client_by_username
_mosquitto_log_printf
_mosquitto_malloc
_mosquitto_persist_client_add
_mosquitto_persist_client_update
_mosquitto_persist_client_remove
_mosquitto_persist_client_msg_add
_mosquitto_persist_client_msg_remove
_mosquitto_persist_client_msg_update
_mosquitto_persist_msg_add
_mosquitto_persist_msg_remove
_mosquitto_persist_retain_add
_mosquitto_persist_retain_remove
_mosquitto_plugin_set_info
_mosquitto_property_add_binary
_mosquitto_property_add_byte
@ -36,6 +46,8 @@ _mosquitto_strdup
_mosquitto_sub_matches_acl
_mosquitto_sub_matches_acl_with_pattern
_mosquitto_sub_topic_check
_mosquitto_subscription_add
_mosquitto_subscription_remove
_mosquitto_topic_matches_sub
_mosquitto_topic_matches_sub_with_pattern
_mosquitto_validate_utf8

@ -21,6 +21,16 @@
mosquitto_kick_client_by_username;
mosquitto_log_printf;
mosquitto_malloc;
mosquitto_persist_client_msg_add;
mosquitto_persist_client_msg_remove;
mosquitto_persist_client_msg_update;
mosquitto_persist_client_add;
mosquitto_persist_client_update;
mosquitto_persist_client_remove;
mosquitto_persist_msg_add;
mosquitto_persist_msg_remove;
mosquitto_persist_retain_add;
mosquitto_persist_retain_remove;
mosquitto_plugin_set_info;
mosquitto_property_add_binary;
mosquitto_property_add_byte;
@ -37,6 +47,8 @@
mosquitto_sub_matches_acl;
mosquitto_sub_matches_acl_with_pattern;
mosquitto_sub_topic_check;
mosquitto_subscription_add;
mosquitto_subscription_remove;
mosquitto_topic_matches_sub;
mosquitto_topic_matches_sub_with_pattern;
mosquitto_validate_utf8;

@ -65,7 +65,7 @@ void lws__sul_callback(struct lws_sorted_usec_list *l)
static struct lws_sorted_usec_list sul;
#endif
static int single_publish(struct mosquitto *context, struct mosquitto_message_v5 *msg, uint32_t message_expiry)
static int single_publish(struct mosquitto *context, struct mosquitto__message_v5 *msg, uint32_t message_expiry)
{
struct mosquitto_msg_store *stored;
uint16_t mid;
@ -98,7 +98,7 @@ static int single_publish(struct mosquitto *context, struct mosquitto_message_v5
}else{
mid = 0;
}
return db__message_insert_outgoing(context, 0, mid, (uint8_t)msg->qos, 0, stored, 0, true);
return db__message_insert_outgoing(context, 0, mid, (uint8_t)msg->qos, 0, stored, 0, true, true);
}
@ -130,7 +130,7 @@ static void read_message_expiry_interval(mosquitto_property **proplist, uint32_t
static void queue_plugin_msgs(void)
{
struct mosquitto_message_v5 *msg, *tmp;
struct mosquitto__message_v5 *msg, *tmp;
struct mosquitto *context;
uint32_t message_expiry;

@ -305,6 +305,9 @@ int main(int argc, char *argv[])
rc = mosquitto_security_init(false);
if(rc) return rc;
plugin_persist__handle_restore();
db__msg_store_compact();
/* After loading persisted clients and ACLs, try to associate them,
* so persisted subscriptions can start storing messages */
HASH_ITER(hh_id, db.contexts_by_id, ctxt, ctxt_tmp){
@ -343,6 +346,7 @@ int main(int argc, char *argv[])
g_run = 1;
rc = mosquitto_main_loop(g_listensock, g_listensock_count);
db.shutdown = true;
log__printf(NULL, MOSQ_LOG_INFO, "mosquitto version %s terminating", VERSION);

@ -164,6 +164,20 @@ struct plugin__callbacks{
struct mosquitto__callback *message;
struct mosquitto__callback *psk_key;
struct mosquitto__callback *reload;
struct mosquitto__callback *persist_restore;
struct mosquitto__callback *persist_client_add;
struct mosquitto__callback *persist_client_remove;
struct mosquitto__callback *persist_client_update;
struct mosquitto__callback *persist_subscription_add;
struct mosquitto__callback *persist_subscription_remove;
struct mosquitto__callback *persist_client_msg_add;
struct mosquitto__callback *persist_client_msg_remove;
struct mosquitto__callback *persist_client_msg_update;
struct mosquitto__callback *persist_msg_add;
struct mosquitto__callback *persist_msg_remove;
struct mosquitto__callback *persist_msg_load;
struct mosquitto__callback *persist_retain_add;
struct mosquitto__callback *persist_retain_remove;
};
struct mosquitto__security_options {
@ -404,6 +418,7 @@ struct mosquitto_msg_store{
uint16_t source_mid;
uint8_t qos;
bool retain;
bool stored;
};
struct mosquitto_client_msg{
@ -450,8 +465,8 @@ struct mosquitto__acl_user{
};
struct mosquitto_message_v5{
struct mosquitto_message_v5 *next, *prev;
struct mosquitto__message_v5{
struct mosquitto__message_v5 *next, *prev;
char *topic;
void *payload;
mosquitto_property *properties;
@ -501,12 +516,13 @@ struct mosquitto_db{
#ifdef WITH_KQUEUE
int kqueuefd;
#endif
struct mosquitto_message_v5 *plugin_msgs;
struct mosquitto__message_v5 *plugin_msgs;
#ifdef WITH_TLS
char *tls_keylog; /* This can't be in the config struct because it is used
before the config is allocated. Config probably
shouldn't be separately allocated. */
#endif
bool shutdown;
};
enum mosquitto__bridge_direction{
@ -702,18 +718,18 @@ int persist__restore(void);
/* Return the number of in-flight messages in count. */
int db__message_count(int *count);
int db__message_delete_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state expect_state, int qos);
int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uint16_t mid, uint8_t qos, bool retain, struct mosquitto_msg_store *stored, uint32_t subscription_identifier, bool update);
int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, struct mosquitto_msg_store *stored);
int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uint16_t mid, uint8_t qos, bool retain, struct mosquitto_msg_store *stored, uint32_t subscription_identifier, bool update, bool persist);
int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, struct mosquitto_msg_store *stored, bool persist);
int db__message_remove_incoming(struct mosquitto* context, uint16_t mid);
int db__message_release_incoming(struct mosquitto *context, uint16_t mid);
int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos);
int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos, bool persist);
void db__message_dequeue_first(struct mosquitto *context, struct mosquitto_msg_data *msg_data);
int db__messages_delete(struct mosquitto *context, bool force_free);
int db__messages_easy_queue(struct mosquitto *context, const char *topic, uint8_t qos, uint32_t payloadlen, const void *payload, int retain, uint32_t message_expiry_interval, mosquitto_property **properties);
int db__message_store(const struct mosquitto *source, struct mosquitto_msg_store *stored, uint32_t message_expiry_interval, dbid_t store_id, enum mosquitto_msg_origin origin);
int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored);
void db__msg_store_add(struct mosquitto_msg_store *store);
void db__msg_store_remove(struct mosquitto_msg_store *store);
void db__msg_store_remove(struct mosquitto_msg_store *store, bool notify);
void db__msg_store_ref_inc(struct mosquitto_msg_store *store);
void db__msg_store_ref_dec(struct mosquitto_msg_store **store);
void db__msg_store_clean(void);
@ -840,6 +856,19 @@ int plugin__handle_message(struct mosquitto *context, struct mosquitto_msg_store
void LIB_ERROR(void);
void plugin__handle_tick(void);
int plugin__callback_unregister_all(mosquitto_plugin_id_t *identifier);
void plugin_persist__handle_restore(void);
void plugin_persist__handle_client_add(struct mosquitto *context);
void plugin_persist__handle_client_remove(struct mosquitto *context);
void plugin_persist__handle_client_update(struct mosquitto *context);
void plugin_persist__handle_subscription_add(struct mosquitto *context, const char *sub, uint8_t subscription_options, uint32_t subscription_identifier);
void plugin_persist__handle_subscription_remove(struct mosquitto *context, const char *sub);
void plugin_persist__handle_client_msg_add(struct mosquitto *context, const struct mosquitto_client_msg *cmsg);
void plugin_persist__handle_client_msg_remove(struct mosquitto *context, const struct mosquitto_client_msg *cmsg);
void plugin_persist__handle_client_msg_update(struct mosquitto *context, const struct mosquitto_client_msg *cmsg);
void plugin_persist__handle_msg_add(struct mosquitto_msg_store *msg);
void plugin_persist__handle_msg_remove(struct mosquitto_msg_store *msg);
void plugin_persist__handle_retain_add(struct mosquitto_msg_store *msg);
void plugin_persist__handle_retain_remove(struct mosquitto_msg_store *msg);
/* ============================================================
* Property related functions
@ -864,7 +893,7 @@ int property__process_disconnect(struct mosquitto *context, mosquitto_property *
int retain__init(void);
void retain__clean(struct mosquitto__retainhier **retainhier);
int retain__queue(struct mosquitto *context, const char *sub, uint8_t sub_qos, uint32_t subscription_identifier);
int retain__store(const char *topic, struct mosquitto_msg_store *stored, char **split_topics);
int retain__store(const char *topic, struct mosquitto_msg_store *stored, char **split_topics, bool persist);
/* ============================================================
* Security related functions

@ -306,6 +306,7 @@ static int persist__msg_store_chunk_restore(FILE *db_fptr, uint32_t length)
stored->retain = chunk.F.retain;
stored->properties = chunk.properties;
stored->payload = chunk.payload;
stored->source_listener = chunk.source.listener;
rc = db__message_store(&chunk.source, stored, message_expiry_interval,
chunk.F.store_id, mosq_mo_client);
@ -345,7 +346,7 @@ static int persist__retain_chunk_restore(FILE *db_fptr)
HASH_FIND(hh, db.msg_store, &chunk.F.store_id, sizeof(chunk.F.store_id), msg);
if(msg){
if(sub__topic_tokenise(msg->topic, &local_topic, &split_topics, NULL)) return 1;
retain__store(msg->topic, msg, split_topics);
retain__store(msg->topic, msg, split_topics, true);
mosquitto__free(local_topic);
mosquitto__free(split_topics);
}else{

@ -89,6 +89,34 @@ static struct mosquitto__callback **plugin__get_callback_base(struct mosquitto__
return &security_options->plugin_callbacks.disconnect;
case MOSQ_EVT_CONNECT:
return &security_options->plugin_callbacks.connect;
case MOSQ_EVT_PERSIST_RESTORE:
return &security_options->plugin_callbacks.persist_restore;
case MOSQ_EVT_PERSIST_CLIENT_ADD:
return &security_options->plugin_callbacks.persist_client_add;
case MOSQ_EVT_PERSIST_CLIENT_REMOVE:
return &security_options->plugin_callbacks.persist_client_remove;
case MOSQ_EVT_PERSIST_CLIENT_UPDATE:
return &security_options->plugin_callbacks.persist_client_update;
case MOSQ_EVT_PERSIST_SUBSCRIPTION_ADD:
return &security_options->plugin_callbacks.persist_subscription_add;
case MOSQ_EVT_PERSIST_SUBSCRIPTION_REMOVE:
return &security_options->plugin_callbacks.persist_subscription_remove;
case MOSQ_EVT_PERSIST_CLIENT_MSG_ADD:
return &security_options->plugin_callbacks.persist_client_msg_add;
case MOSQ_EVT_PERSIST_CLIENT_MSG_REMOVE:
return &security_options->plugin_callbacks.persist_client_msg_remove;
case MOSQ_EVT_PERSIST_CLIENT_MSG_UPDATE:
return &security_options->plugin_callbacks.persist_client_msg_update;
case MOSQ_EVT_PERSIST_MSG_ADD:
return &security_options->plugin_callbacks.persist_msg_add;
case MOSQ_EVT_PERSIST_MSG_REMOVE:
return &security_options->plugin_callbacks.persist_msg_remove;
case MOSQ_EVT_PERSIST_MSG_LOAD:
return &security_options->plugin_callbacks.persist_msg_load;
case MOSQ_EVT_PERSIST_RETAIN_ADD:
return &security_options->plugin_callbacks.persist_retain_add;
case MOSQ_EVT_PERSIST_RETAIN_REMOVE:
return &security_options->plugin_callbacks.persist_retain_remove;
default:
return NULL;
}

@ -0,0 +1,327 @@
/*
Copyright (c) 2021 Roger Light <roger@atchoo.org>
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License 2.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
https://www.eclipse.org/legal/epl-2.0/
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
Contributors:
Roger Light - initial implementation and documentation.
*/
#include "config.h"
#include "mosquitto_broker_internal.h"
#include "mosquitto_internal.h"
#include "mosquitto_broker.h"
#include "memory_mosq.h"
#include "mqtt_protocol.h"
#include "send_mosq.h"
#include "util_mosq.h"
#include "utlist.h"
#include "lib_load.h"
#include "will_mosq.h"
void plugin_persist__handle_restore(void)
{
struct mosquitto_evt_persist_restore event_data;
struct mosquitto__callback *cb_base;
struct mosquitto__security_options *opts;
opts = &db.config->security_options;
memset(&event_data, 0, sizeof(event_data));
DL_FOREACH(opts->plugin_callbacks.persist_restore, cb_base){
cb_base->cb(MOSQ_EVT_PERSIST_RESTORE, &event_data, cb_base->userdata);
}
}
void plugin_persist__handle_client_add(struct mosquitto *context)
{
struct mosquitto_evt_persist_client event_data;
struct mosquitto__callback *cb_base;
struct mosquitto__security_options *opts;
struct mosquitto_message_v5 will;
UNUSED(will); /* FIXME */
opts = &db.config->security_options;
memset(&event_data, 0, sizeof(event_data));
event_data.client_id = context->id;
event_data.username = context->username;
event_data.will_delay_time = context->will_delay_time;
event_data.session_expiry_time = context->session_expiry_time;
event_data.will_delay_interval = context->will_delay_interval;
event_data.session_expiry_interval = context->session_expiry_interval;
if(context->listener){
event_data.listener_port = context->listener->port;
}else{
event_data.listener_port = 0;
}
event_data.max_qos = context->max_qos;
event_data.retain_available = context->retain_available;
event_data.max_packet_size = context->maximum_packet_size;
DL_FOREACH(opts->plugin_callbacks.persist_client_add, cb_base){
cb_base->cb(MOSQ_EVT_PERSIST_CLIENT_ADD, &event_data, cb_base->userdata);
}
}
void plugin_persist__handle_client_update(struct mosquitto *context)
{
struct mosquitto_evt_persist_client event_data;
struct mosquitto__callback *cb_base;
struct mosquitto__security_options *opts;
struct mosquitto_message_v5 will;
UNUSED(will); /* FIXME */
opts = &db.config->security_options;
memset(&event_data, 0, sizeof(event_data));
event_data.client_id = context->id;
event_data.username = context->username;
event_data.will_delay_time = context->will_delay_time;
event_data.session_expiry_time = context->session_expiry_time;
event_data.will_delay_interval = context->will_delay_interval;
event_data.session_expiry_interval = context->session_expiry_interval;
if(context->listener){
event_data.listener_port = context->listener->port;
}else{
event_data.listener_port = 0;
}
event_data.max_qos = context->max_qos;
event_data.retain_available = context->retain_available;
event_data.max_packet_size = context->maximum_packet_size;
DL_FOREACH(opts->plugin_callbacks.persist_client_update, cb_base){
cb_base->cb(MOSQ_EVT_PERSIST_CLIENT_UPDATE, &event_data, cb_base->userdata);
}
}
void plugin_persist__handle_client_remove(struct mosquitto *context)
{
struct mosquitto_evt_persist_client event_data;
struct mosquitto__callback *cb_base;
struct mosquitto__security_options *opts;
if(context->session_expiry_interval > 0 || context->id == NULL || context->state == mosq_cs_duplicate){
return;
}
opts = &db.config->security_options;
memset(&event_data, 0, sizeof(event_data));
event_data.client_id = context->id;
DL_FOREACH(opts->plugin_callbacks.persist_client_remove, cb_base){
cb_base->cb(MOSQ_EVT_PERSIST_CLIENT_REMOVE, &event_data, cb_base->userdata);
}
}
void plugin_persist__handle_subscription_add(struct mosquitto *context, const char *sub, uint8_t subscription_options, uint32_t subscription_identifier)
{
struct mosquitto_evt_persist_subscription event_data;
struct mosquitto__callback *cb_base;
struct mosquitto__security_options *opts;
opts = &db.config->security_options;
memset(&event_data, 0, sizeof(event_data));
event_data.client_id = context->id;
event_data.topic = sub;
event_data.subscription_identifier = subscription_identifier;
event_data.subscription_options = subscription_options;
DL_FOREACH(opts->plugin_callbacks.persist_subscription_add, cb_base){
cb_base->cb(MOSQ_EVT_PERSIST_SUBSCRIPTION_ADD, &event_data, cb_base->userdata);
}
}
void plugin_persist__handle_subscription_remove(struct mosquitto *context, const char *sub)
{
struct mosquitto_evt_persist_subscription event_data;
struct mosquitto__callback *cb_base;
struct mosquitto__security_options *opts;
opts = &db.config->security_options;
memset(&event_data, 0, sizeof(event_data));
event_data.client_id = context->id;
event_data.topic = sub;
DL_FOREACH(opts->plugin_callbacks.persist_subscription_remove, cb_base){
cb_base->cb(MOSQ_EVT_PERSIST_SUBSCRIPTION_REMOVE, &event_data, cb_base->userdata);
}
}
void plugin_persist__handle_client_msg_add(struct mosquitto *context, const struct mosquitto_client_msg *cmsg)
{
struct mosquitto_evt_persist_client_msg event_data;
struct mosquitto__callback *cb_base;
struct mosquitto__security_options *opts;
if(context->session_expiry_interval == 0
|| (cmsg->qos == 0 && db.config->queue_qos0_messages == false)){
return;
}
opts = &db.config->security_options;
memset(&event_data, 0, sizeof(event_data));
event_data.client_id = context->id;
event_data.cmsg_id = cmsg->cmsg_id;
event_data.store_id = cmsg->store->db_id;
event_data.mid = cmsg->mid;
event_data.qos = cmsg->qos;
event_data.retain = cmsg->retain;
event_data.dup = cmsg->dup;
event_data.direction = cmsg->direction;
event_data.state = cmsg->state;
DL_FOREACH(opts->plugin_callbacks.persist_client_msg_add, cb_base){
cb_base->cb(MOSQ_EVT_PERSIST_CLIENT_MSG_ADD, &event_data, cb_base->userdata);
}
}
void plugin_persist__handle_client_msg_remove(struct mosquitto *context, const struct mosquitto_client_msg *cmsg)
{
struct mosquitto_evt_persist_client_msg event_data;
struct mosquitto__callback *cb_base;
struct mosquitto__security_options *opts;
opts = &db.config->security_options;
memset(&event_data, 0, sizeof(event_data));
event_data.client_id = context->id;
event_data.cmsg_id = cmsg->cmsg_id;
event_data.mid = cmsg->mid;
event_data.state = cmsg->state;
event_data.qos = cmsg->qos;
event_data.store_id = cmsg->store->db_id;
event_data.direction = cmsg->direction;
DL_FOREACH(opts->plugin_callbacks.persist_client_msg_remove, cb_base){
cb_base->cb(MOSQ_EVT_PERSIST_CLIENT_MSG_REMOVE, &event_data, cb_base->userdata);
}
}
void plugin_persist__handle_client_msg_update(struct mosquitto *context, const struct mosquitto_client_msg *cmsg)
{
struct mosquitto_evt_persist_client_msg event_data;
struct mosquitto__callback *cb_base;
struct mosquitto__security_options *opts;
opts = &db.config->security_options;
memset(&event_data, 0, sizeof(event_data));
event_data.client_id = context->id;
event_data.cmsg_id = cmsg->cmsg_id;
event_data.store_id = cmsg->store->db_id;
event_data.state = cmsg->state;
event_data.dup = cmsg->dup;
event_data.direction = cmsg->direction;
DL_FOREACH(opts->plugin_callbacks.persist_client_msg_update, cb_base){
cb_base->cb(MOSQ_EVT_PERSIST_CLIENT_MSG_UPDATE, &event_data, cb_base->userdata);
}
}
void plugin_persist__handle_msg_add(struct mosquitto_msg_store *msg)
{
struct mosquitto_evt_persist_msg event_data;
struct mosquitto__callback *cb_base;
struct mosquitto__security_options *opts;
if(msg->stored) return;
opts = &db.config->security_options;
memset(&event_data, 0, sizeof(event_data));
event_data.store_id = msg->db_id;
event_data.expiry_time = msg->message_expiry_time;
event_data.topic = msg->topic;
event_data.payload = msg->payload;
event_data.source_id = msg->source_id;
event_data.source_username = msg->source_username;
event_data.properties = msg->properties;
event_data.payloadlen = msg->payloadlen;
event_data.source_mid = msg->source_mid;
if(msg->source_listener){
event_data.source_port = msg->source_listener->port;
}else{
event_data.source_port = 0;
}
event_data.qos = msg->qos;
event_data.retain = msg->retain;
DL_FOREACH(opts->plugin_callbacks.persist_msg_add, cb_base){
cb_base->cb(MOSQ_EVT_PERSIST_MSG_ADD, &event_data, cb_base->userdata);
}
msg->stored = true;
}
void plugin_persist__handle_msg_remove(struct mosquitto_msg_store *msg)
{
struct mosquitto_evt_persist_msg event_data;
struct mosquitto__callback *cb_base;
struct mosquitto__security_options *opts;
if(msg->stored == false) return;
opts = &db.config->security_options;
memset(&event_data, 0, sizeof(event_data));
event_data.store_id = msg->db_id;
DL_FOREACH(opts->plugin_callbacks.persist_msg_remove, cb_base){
cb_base->cb(MOSQ_EVT_PERSIST_MSG_REMOVE, &event_data, cb_base->userdata);
}
msg->stored = false;
}
void plugin_persist__handle_retain_add(struct mosquitto_msg_store *msg)
{
struct mosquitto_evt_persist_retain event_data;
struct mosquitto__callback *cb_base;
struct mosquitto__security_options *opts;
opts = &db.config->security_options;
memset(&event_data, 0, sizeof(event_data));
event_data.store_id = msg->db_id;
event_data.topic = msg->topic;
DL_FOREACH(opts->plugin_callbacks.persist_retain_add, cb_base){
cb_base->cb(MOSQ_EVT_PERSIST_MSG_ADD, &event_data, cb_base->userdata);
}
}
void plugin_persist__handle_retain_remove(struct mosquitto_msg_store *msg)
{
struct mosquitto_evt_persist_retain event_data;
struct mosquitto__callback *cb_base;
struct mosquitto__security_options *opts;
opts = &db.config->security_options;
memset(&event_data, 0, sizeof(event_data));
event_data.topic = msg->topic;
DL_FOREACH(opts->plugin_callbacks.persist_retain_remove, cb_base){
cb_base->cb(MOSQ_EVT_PERSIST_MSG_REMOVE, &event_data, cb_base->userdata);
}
}

@ -23,6 +23,7 @@ Contributors:
#include "mqtt_protocol.h"
#include "send_mosq.h"
#include "util_mosq.h"
#include "will_mosq.h"
#include "utlist.h"
#include "will_mosq.h"
@ -189,7 +190,7 @@ int mosquitto_broker_publish(
bool retain,
mosquitto_property *properties)
{
struct mosquitto_message_v5 *msg;
struct mosquitto__message_v5 *msg;
if(topic == NULL
|| payloadlen < 0
@ -199,7 +200,7 @@ int mosquitto_broker_publish(
return MOSQ_ERR_INVAL;
}
msg = mosquitto__malloc(sizeof(struct mosquitto_message_v5));
msg = mosquitto__malloc(sizeof(struct mosquitto__message_v5));
if(msg == NULL) return MOSQ_ERR_NOMEM;
msg->next = NULL;
@ -385,6 +386,340 @@ int mosquitto_kick_client_by_username(const char *username, bool with_will)
}
int mosquitto_persist_client_add(const struct mosquitto_evt_persist_client *client)
{
struct mosquitto *context;
int i;
if(client == NULL || client->client_id == NULL) return MOSQ_ERR_INVAL;
context = NULL;
HASH_FIND(hh_id, db.contexts_by_id, client->client_id, strlen(client->client_id), context);
if(context){
return MOSQ_ERR_INVAL;
}
context = context__init();
if(!context) return MOSQ_ERR_NOMEM;
context->id = mosquitto__strdup(client->client_id);
if(client->username){
context->username = mosquitto__strdup(client->username);
if(!context->username){
mosquitto__free(context->id);
mosquitto__free(context);
return MOSQ_ERR_NOMEM;
}
}else{
context->username = NULL;
}
context->clean_start = false;
context->will_delay_time = client->will_delay_time;
context->session_expiry_time = client->session_expiry_time;
context->will_delay_interval = client->will_delay_interval;
context->session_expiry_interval = client->session_expiry_interval;
context->max_qos = client->max_qos;
context->maximum_packet_size = client->max_packet_size;
context->retain_available = client->retain_available;
/* in per_listener_settings mode, try to find the listener by persisted port */
if(db.config->per_listener_settings && client->listener_port > 0){
for(i=0; i < db.config->listener_count; i++){
if(db.config->listeners[i].port == client->listener_port){
context->listener = &db.config->listeners[i];
break;
}
}
}
HASH_ADD_KEYPTR(hh_id, db.contexts_by_id, context->id, strlen(context->id), context);
return MOSQ_ERR_SUCCESS;
}
int mosquitto_persist_client_update(const struct mosquitto_evt_persist_client *client)
{
struct mosquitto *context;
int i;
char *username;
if(client == NULL || client->client_id == NULL) return MOSQ_ERR_INVAL;
context = NULL;
HASH_FIND(hh_id, db.contexts_by_id, client->client_id, strlen(client->client_id), context);
if(context == NULL){
return MOSQ_ERR_INVAL;
}
if(client->username){
username = mosquitto__strdup(client->username);
if(!username){
return MOSQ_ERR_NOMEM;
}
mosquitto_free(context->username);
context->username = username;
}else{
mosquitto_free(context->username);
context->username = NULL;
}
context->clean_start = false;
context->will_delay_time = client->will_delay_time;
context->session_expiry_time = client->session_expiry_time;
context->will_delay_interval = client->will_delay_interval;
context->session_expiry_interval = client->session_expiry_interval;
context->max_qos = client->max_qos;
context->maximum_packet_size = client->max_packet_size;
context->retain_available = client->retain_available;
/* in per_listener_settings mode, try to find the listener by persisted port */
if(db.config->per_listener_settings && client->listener_port > 0){
for(i=0; i < db.config->listener_count; i++){
if(db.config->listeners[i].port == client->listener_port){
context->listener = &db.config->listeners[i];
break;
}
}
}
return MOSQ_ERR_SUCCESS;
}
int mosquitto_persist_client_remove(const char *client_id)
{
struct mosquitto *context;
if(client_id == NULL) return MOSQ_ERR_INVAL;
context = NULL;
HASH_FIND(hh_id, db.contexts_by_id, client_id, strlen(client_id), context);
if(context == NULL){
return MOSQ_ERR_SUCCESS;
}
session_expiry__remove(context);
will_delay__remove(context);
will__clear(context);
context->clean_start = true;
context->session_expiry_interval = 0;
mosquitto__set_state(context, mosq_cs_duplicate);
do_disconnect(context, MOSQ_ERR_SUCCESS);
return MOSQ_ERR_SUCCESS;
}
struct mosquitto_msg_store *find_store_msg(uint64_t store_id)
{
struct mosquitto_msg_store *stored;
HASH_FIND(hh, db.msg_store, &store_id, sizeof(store_id), stored);
return stored;
}
int mosquitto_persist_client_msg_add(const struct mosquitto_evt_persist_client_msg *client_msg)
{
struct mosquitto *context;
struct mosquitto_msg_store *stored;
if(client_msg == NULL || client_msg->client_id == NULL){
return MOSQ_ERR_INVAL;
}
HASH_FIND(hh_id, db.contexts_by_id, client_msg->client_id, strlen(client_msg->client_id), context);
if(context == NULL){
return MOSQ_ERR_NOT_FOUND;
}
stored = find_store_msg(client_msg->store_id);
if(stored == NULL){
return MOSQ_ERR_NOT_FOUND;
}
if(client_msg->direction == mosq_md_out){
if(client_msg->qos > 0){
context->last_mid = client_msg->mid;
}
return db__message_insert_outgoing(context, client_msg->cmsg_id, client_msg->mid, client_msg->qos, client_msg->retain,
stored, client_msg->subscription_identifier, false, false);
}else{
return db__message_insert_incoming(context, client_msg->cmsg_id, stored, false);
}
return MOSQ_ERR_SUCCESS;
}
int mosquitto_persist_client_msg_remove(const struct mosquitto_evt_persist_client_msg *client_msg)
{
struct mosquitto *context;
struct mosquitto_msg_store *stored;
if(client_msg == NULL || client_msg->client_id == NULL){
return MOSQ_ERR_INVAL;
}
HASH_FIND(hh_id, db.contexts_by_id, client_msg->client_id, strlen(client_msg->client_id), context);
if(context == NULL){
return MOSQ_ERR_NOT_FOUND;
}
stored = find_store_msg(client_msg->store_id);
if(stored == NULL){
return MOSQ_ERR_NOT_FOUND;
}
if(client_msg->direction == mosq_md_out){
if(client_msg->qos > 0){
context->last_mid = client_msg->mid;
}
return db__message_delete_outgoing(context, client_msg->mid, client_msg->state, client_msg->qos);
}else{
return db__message_remove_incoming(context, client_msg->mid);
}
return MOSQ_ERR_SUCCESS;
}
int mosquitto_persist_client_msg_update(const struct mosquitto_evt_persist_client_msg *client_msg)
{
struct mosquitto *context;
if(client_msg == NULL || client_msg->client_id == NULL){
return MOSQ_ERR_INVAL;
}
HASH_FIND(hh_id, db.contexts_by_id, client_msg->client_id, strlen(client_msg->client_id), context);
if(context == NULL){
return MOSQ_ERR_NOT_FOUND;
}
if(client_msg->direction == mosq_md_out){
db__message_update_outgoing(context, client_msg->mid, client_msg->state, client_msg->qos, false);
}
return MOSQ_ERR_SUCCESS;
}
int mosquitto_subscription_add(const char *client_id, const char *topic, uint8_t subscription_options, uint32_t subscription_identifier)
{
struct mosquitto *context;
if(client_id == NULL || topic == NULL || client_id[0] == '\0' || topic[0] == '\0'){
return MOSQ_ERR_INVAL;
}
HASH_FIND(hh_id, db.contexts_by_id, client_id, strlen(client_id), context);
if(context){
return sub__add(context, topic, subscription_options&0x03, subscription_identifier, subscription_options, &db.subs);
}else{
return MOSQ_ERR_INVAL;
}
}
int mosquitto_subscription_remove(const char *client_id, const char *topic)
{
struct mosquitto *context;
uint8_t reason;
if(client_id == NULL || topic == NULL || client_id[0] == '\0' || topic[0] == '\0'){
return MOSQ_ERR_INVAL;
}
HASH_FIND(hh_id, db.contexts_by_id, client_id, strlen(client_id), context);
if(context){
return sub__remove(context, topic, db.subs, &reason);
}else{
return MOSQ_ERR_INVAL;
}
}
int mosquitto_persist_msg_add(const struct mosquitto_evt_persist_msg *msg)
{
struct mosquitto context;
struct mosquitto_msg_store *stored;
uint32_t message_expiry_interval;
time_t message_expiry_interval_tt;
int i;
memset(&context, 0, sizeof(context));
memset(&stored, 0, sizeof(stored));
if(msg->source_id){
context.id = mosquitto__strdup(msg->source_id);
if(!context.id) return MOSQ_ERR_NOMEM;
}
if(msg->source_username){
context.username = mosquitto__strdup(msg->source_username);
if(!context.username) return MOSQ_ERR_NOMEM;
}
if(msg->expiry_time == 0){
message_expiry_interval = 0;
}else if(msg->expiry_time <= db.now_real_s){
message_expiry_interval = 1;
}else{
message_expiry_interval_tt = msg->expiry_time - db.now_real_s;
if(message_expiry_interval_tt > UINT32_MAX){
message_expiry_interval = UINT32_MAX;
}else{
message_expiry_interval = (uint32_t)message_expiry_interval_tt;
}
}
stored = mosquitto_calloc(1, sizeof(struct mosquitto_msg_store));
if(stored == NULL){
goto error;
}
stored->payloadlen = msg->payloadlen;
stored->source_mid = msg->source_mid;
stored->qos = msg->qos;
stored->retain = msg->retain;
stored->payload = mosquitto_malloc(stored->payloadlen+1);
if(stored->payload == NULL){
goto error;
}
memcpy(stored->payload, msg->payload, stored->payloadlen);
((uint8_t *)stored->payload)[stored->payloadlen] = 0; /* Always zero terminate */
stored->topic = mosquitto_strdup(msg->topic);
if(stored->topic == NULL){
goto error;
}
stored->properties = NULL; /* FIXME */
if(msg->source_port){
for(i=0; i<db.config->listener_count; i++){
if(db.config->listeners[i].port == msg->source_port){
stored->source_listener = &db.config->listeners[i];
break;
}
}
}
return db__message_store(&context, stored, message_expiry_interval, msg->store_id, mosq_mo_broker);
error:
if(stored){
mosquitto_property_free_all(&stored->properties);
mosquitto_free(stored->topic);
mosquitto_free(stored->payload);
mosquitto_free(stored);
}
return MOSQ_ERR_NOMEM;
}
int mosquitto_persist_msg_remove(uint64_t store_id)
{
struct mosquitto_msg_store *stored;
stored = find_store_msg(store_id);
db__msg_store_remove(stored, false);
return MOSQ_ERR_SUCCESS;
}
void mosquitto_complete_basic_auth(const char *client_id, int result)
{
struct mosquitto *context;

@ -71,8 +71,54 @@ int retain__init(void)
return MOSQ_ERR_SUCCESS;
}
int mosquitto_persist_retain_add(struct mosquitto_evt_persist_retain *msg)
{
struct mosquitto_msg_store *stored;
int rc = MOSQ_ERR_UNKNOWN;
char **split_topics = NULL;
char *local_topic = NULL;
if(msg == NULL || msg->topic == NULL) return MOSQ_ERR_INVAL;
HASH_FIND(hh, db.msg_store, &msg->store_id, sizeof(msg->store_id), stored);
if(stored){
if(sub__topic_tokenise(msg->topic, &local_topic, &split_topics, NULL)) return MOSQ_ERR_NOMEM;
rc = retain__store(msg->topic, stored, split_topics, false);
mosquitto__free(split_topics);
mosquitto__free(local_topic);
}
return rc;
}
int retain__store(const char *topic, struct mosquitto_msg_store *stored, char **split_topics)
int mosquitto_persist_retain_remove(const char *topic)
{
struct mosquitto_msg_store stored;
int rc = MOSQ_ERR_UNKNOWN;
char **split_topics = NULL;
char *local_topic = NULL;
if(topic == NULL) return MOSQ_ERR_INVAL;
memset(&stored, 0, sizeof(stored));
stored.ref_count = 10; /* Ensure this isn't freed */
if(sub__topic_tokenise(topic, &local_topic, &split_topics, NULL)) return MOSQ_ERR_NOMEM;
/* With stored->payloadlen == 0, this means the message will be removed */
rc = retain__store(topic, &stored, split_topics, false);
mosquitto__free(split_topics);
mosquitto__free(local_topic);
return rc;
}
int retain__store(const char *topic, struct mosquitto_msg_store *stored, char **split_topics, bool persist)
{
struct mosquitto__retainhier *retainhier;
struct mosquitto__retainhier *branch;
@ -111,19 +157,29 @@ int retain__store(const char *topic, struct mosquitto_msg_store *stored, char **
#endif
if(retainhier->retained){
if(persist){
plugin_persist__handle_retain_remove(retainhier->retained);
}
db__msg_store_ref_dec(&retainhier->retained);
#ifdef WITH_SYS_TREE
db.retained_count--;
#endif
if(stored->payloadlen == 0){
retainhier->retained = NULL;
}
}
if(stored->payloadlen){
retainhier->retained = stored;
db__msg_store_ref_inc(retainhier->retained);
if(retainhier->retained->topic[0] != '$'){
if(persist){
plugin_persist__handle_msg_add(retainhier->retained);
plugin_persist__handle_retain_add(retainhier->retained);
}
}
#ifdef WITH_SYS_TREE
db.retained_count++;
#endif
}else{
retainhier->retained = NULL;
}
return MOSQ_ERR_SUCCESS;
@ -138,6 +194,7 @@ static int retain__process(struct mosquitto__retainhier *branch, struct mosquitt
struct mosquitto_msg_store *retained;
if(branch->retained->message_expiry_time > 0 && db.now_real_s >= branch->retained->message_expiry_time){
plugin_persist__handle_retain_remove(branch->retained);
db__msg_store_ref_dec(&branch->retained);
branch->retained = NULL;
#ifdef WITH_SYS_TREE
@ -188,7 +245,7 @@ static int retain__process(struct mosquitto__retainhier *branch, struct mosquitt
}else{
mid = 0;
}
return db__message_insert_outgoing(context, 0, mid, qos, true, retained, subscription_identifier, false);
return db__message_insert_outgoing(context, 0, mid, qos, true, retained, subscription_identifier, false, true);
}

@ -78,6 +78,8 @@ int session_expiry__add(struct mosquitto *context)
DL_INSERT_INORDER(expiry_list, item, session_expiry__cmp);
plugin_persist__handle_client_update(context);
return MOSQ_ERR_SUCCESS;
}
@ -146,6 +148,7 @@ void session_expiry__check(void)
context->will_delay_interval = 0;
will_delay__remove(context);
context__send_will(context);
plugin_persist__handle_client_remove(context);
context__add_to_disused(context);
}else{
timeout = (item->context->session_expiry_time - db.now_real_s + 1) * 1000;

@ -93,7 +93,7 @@ static int subs__send(struct mosquitto__subleaf *leaf, const char *topic, uint8_
}else{
client_retain = false;
}
if(db__message_insert_outgoing(leaf->context, 0, mid, msg_qos, client_retain, stored, leaf->identifier, true) == 1){
if(db__message_insert_outgoing(leaf->context, 0, mid, msg_qos, client_retain, stored, leaf->identifier, true, true) == 1){
return 1;
}
}else{
@ -661,7 +661,7 @@ int sub__messages_queue(const char *source_id, const char *topic, uint8_t qos, i
}
if(retain){
rc2 = retain__store(topic, *stored, split_topics);
rc2 = retain__store(topic, *stored, split_topics, true);
if(rc2) rc = rc2;
}

@ -54,6 +54,7 @@ int will_delay__add(struct mosquitto *context)
DL_INSERT_INORDER(delay_list, item, will_delay__cmp);
loop__update_next_event(item->context->will_delay_interval*1000);
plugin_persist__handle_client_update(context);
return MOSQ_ERR_SUCCESS;
}

@ -0,0 +1,78 @@
#!/usr/bin/env python3
# Connect a client, start a QoS 2 flow, disconnect, restore, carry on with the
# QoS 2 flow. Is it received?
from mosq_test_helper import *
import persist_help
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
persist_help.write_config(conf_file, port)
rc = 1
persist_help.init(port)
keepalive = 10
client_id = "persist-client-msg-in-v3-1-1"
proto_ver = 4
helper_id = "persist-client-msg-in-v3-1-1-helper"
topic = "client-msg-in/2"
qos = 2
connect_packet = mosq_test.gen_connect(client_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=False)
connack_packet1 = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
connack_packet2 = mosq_test.gen_connack(rc=0, flags=1, proto_ver=proto_ver)
mid = 1
publish_packet = mosq_test.gen_publish(topic=topic, qos=qos, payload="message", mid=mid, proto_ver=proto_ver)
pubrec_packet = mosq_test.gen_pubrec(mid=mid, proto_ver=proto_ver)
pubrel_packet = mosq_test.gen_pubrel(mid=mid, proto_ver=proto_ver)
pubcomp_packet = mosq_test.gen_pubcomp(mid=mid, proto_ver=proto_ver)
connect_packet_helper = mosq_test.gen_connect(helper_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=True)
subscribe_packet = mosq_test.gen_subscribe(mid, topic, qos=qos, proto_ver=proto_ver)
suback_packet = mosq_test.gen_suback(mid=mid, qos=qos, proto_ver=proto_ver)
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
con = None
try:
# Connect client, start flow, disconnect
sock = mosq_test.do_client_connect(connect_packet, connack_packet1, timeout=5, port=port)
mosq_test.do_send_receive(sock, publish_packet, pubrec_packet, "pubrec send")
sock.close()
# Kill broker
broker.terminate()
broker.wait()
# Restart broker
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
# Connect helper and subscribe
helper = mosq_test.do_client_connect(connect_packet_helper, connack_packet1, timeout=5, port=port)
mosq_test.do_send_receive(helper, subscribe_packet, suback_packet, "suback helper")
# Complete the flow
sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port)
mosq_test.do_send_receive(sock, pubrel_packet, pubcomp_packet, "pubrel send")
mosq_test.do_receive_send(helper, publish_packet, pubrec_packet, "pubrec receive")
mosq_test.do_receive_send(helper, pubrel_packet, pubcomp_packet, "pubcomp receive")
rc = 0
finally:
if broker is not None:
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
os.remove(conf_file)
rc += persist_help.cleanup(port)
if rc:
print(stde.decode('utf-8'))
exit(rc)

@ -0,0 +1,102 @@
#!/usr/bin/env python3
# Connect a client, add a subscription, disconnect, send a message with a
# different client, restore, reconnect, check it is received.
from mosq_test_helper import *
import persist_help
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
persist_help.write_config(conf_file, port)
rc = 1
persist_help.init(port)
keepalive = 10
client_id = "persist-client-msg-v3-1-1"
proto_ver = 4
helper_id = "persist-client-msg-v3-1-1-helper"
topic0 = "client-msg/0"
topic1 = "client-msg/1"
topic2 = "client-msg/2"
connect_packet = mosq_test.gen_connect(client_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=False)
connack_packet1 = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
connack_packet2 = mosq_test.gen_connack(rc=0, flags=1, proto_ver=proto_ver)
mid = 1
subscribe_packet0 = mosq_test.gen_subscribe(mid, topic0, qos=0, proto_ver=proto_ver)
suback_packet0 = mosq_test.gen_suback(mid=mid, qos=0, proto_ver=proto_ver)
subscribe_packet1 = mosq_test.gen_subscribe(mid, topic1, qos=1, proto_ver=proto_ver)
suback_packet1 = mosq_test.gen_suback(mid=mid, qos=1, proto_ver=proto_ver)
subscribe_packet2 = mosq_test.gen_subscribe(mid, topic2, qos=2, proto_ver=proto_ver)
suback_packet2 = mosq_test.gen_suback(mid=mid, qos=2, proto_ver=proto_ver)
connect_packet_helper = mosq_test.gen_connect(helper_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=True)
publish_packet0 = mosq_test.gen_publish(topic=topic0, qos=0, payload="message", proto_ver=proto_ver)
mid = 1
publish_packet1 = mosq_test.gen_publish(topic=topic1, qos=1, payload="message", mid=mid, proto_ver=proto_ver)
puback_packet = mosq_test.gen_puback(mid=mid, proto_ver=proto_ver)
mid = 2
publish_packet2 = mosq_test.gen_publish(topic=topic2, qos=2, payload="message", mid=mid, proto_ver=proto_ver)
pubrec_packet = mosq_test.gen_pubrec(mid=mid, proto_ver=proto_ver)
pubrel_packet = mosq_test.gen_pubrel(mid=mid, proto_ver=proto_ver)
pubcomp_packet = mosq_test.gen_pubcomp(mid=mid, proto_ver=proto_ver)
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
con = None
try:
# Connect client, subscribe, disconnect
sock = mosq_test.do_client_connect(connect_packet, connack_packet1, timeout=5, port=port)
mosq_test.do_send_receive(sock, subscribe_packet0, suback_packet0, "suback 0")
mosq_test.do_send_receive(sock, subscribe_packet1, suback_packet1, "suback 1")
mosq_test.do_send_receive(sock, subscribe_packet2, suback_packet2, "suback 2")
sock.close()
# Connect helper and publish
helper = mosq_test.do_client_connect(connect_packet_helper, connack_packet1, timeout=5, port=port)
helper.send(publish_packet0)
mosq_test.do_send_receive(helper, publish_packet1, puback_packet, "puback helper")
mosq_test.do_send_receive(helper, publish_packet2, pubrec_packet, "pubrec helper")
mosq_test.do_send_receive(helper, pubrel_packet, pubcomp_packet, "pubcomp helper")
helper.close()
# Kill broker
broker.terminate()
broker.wait()
# Restart broker
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
# Connect client again, it should have a session
sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port)
# Does the client get the messages
mosq_test.do_receive_send(sock, publish_packet1, puback_packet, "publish 1")
mosq_test.do_receive_send(sock, publish_packet2, pubrec_packet, "publish 2")
mosq_test.do_receive_send(sock, pubrel_packet, pubcomp_packet, "pubrel 2")
sock.close()
# Connect client again, it should have a session
sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port)
# If there are messages, the ping will fail
mosq_test.do_ping(sock)
rc = 0
finally:
if broker is not None:
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
os.remove(conf_file)
rc += persist_help.cleanup(port)
if rc:
print(stde.decode('utf-8'))
exit(rc)

@ -0,0 +1,82 @@
#!/usr/bin/env python3
# Connect a client, check it is restored, clear the client, check it is not there.
from mosq_test_helper import *
import persist_help
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
persist_help.write_config(conf_file, port)
rc = 1
persist_help.init(port)
keepalive = 10
client_id = "persist-client-v3-1-1"
proto_ver = 4
connect_packet = mosq_test.gen_connect(client_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=False)
connack_packet1 = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
connack_packet2 = mosq_test.gen_connack(rc=0, flags=1, proto_ver=proto_ver)
connect_packet_clean = mosq_test.gen_connect(client_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=True)
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
con = None
try:
# Connect client
sock = mosq_test.do_client_connect(connect_packet, connack_packet1, timeout=5, port=port, connack_error="connack 1")
mosq_test.do_ping(sock)
sock.close()
# Kill broker
broker.terminate()
broker.wait()
# Restart broker
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
# Connect client again, it should have a session
sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port, connack_error="connack 2")
mosq_test.do_ping(sock)
sock.close()
# Clear the client
sock = mosq_test.do_client_connect(connect_packet_clean, connack_packet1, timeout=5, port=port, connack_error="connack 3")
mosq_test.do_ping(sock)
sock.close()
# Connect client, it should not have a session
sock = mosq_test.do_client_connect(connect_packet_clean, connack_packet1, timeout=5, port=port, connack_error="connack 4")
mosq_test.do_ping(sock)
sock.close()
# Kill broker
broker.terminate()
broker.wait()
# Restart broker
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
# Connect client, it should not have a session
sock = mosq_test.do_client_connect(connect_packet_clean, connack_packet1, timeout=5, port=port, connack_error="connack 5")
mosq_test.do_ping(sock)
sock.close()
rc = 0
finally:
if broker is not None:
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
os.remove(conf_file)
rc += persist_help.cleanup(port)
if rc:
print(stde.decode('utf-8'))
exit(rc)

@ -0,0 +1,89 @@
#!/usr/bin/env python3
# Publish a retained messages, check they are restored
from mosq_test_helper import *
import persist_help
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
persist_help.write_config(conf_file, port)
rc = 1
persist_help.init(port)
keepalive = 10
topic1 = "test/retain1"
topic2 = "test/retain2"
topic3 = "test/retain3"
source_id = "persist-retain-v3-1-1"
qos = 0
payload2 = "retained message 2"
payload3 = "retained message 3"
proto_ver = 4
connect_packet = mosq_test.gen_connect(source_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=True)
connack_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
publish1_packet = mosq_test.gen_publish(topic1, qos=qos, payload="retained message 1", retain=True, proto_ver=proto_ver)
publish2_packet = mosq_test.gen_publish(topic2, qos=qos, payload=payload2, retain=False, proto_ver=proto_ver)
publish3_packet = mosq_test.gen_publish(topic3, qos=qos, payload=payload3, retain=True, proto_ver=proto_ver)
mid = 1
subscribe_packet = mosq_test.gen_subscribe(mid, "#", 0, proto_ver=4)
suback_packet = mosq_test.gen_suback(mid, qos=0, proto_ver=4)
mid = 2
unsubscribe_packet = mosq_test.gen_unsubscribe(mid, "#", proto_ver=4)
unsuback_packet = mosq_test.gen_unsuback(mid, proto_ver=4)
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
try:
# Connect client
sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=5, port=port)
# Check no retained messages exist
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
# Ping will fail if a PUBLISH is received
mosq_test.do_ping(sock)
# Unsubscribe, so we don't receive the messages
mosq_test.do_send_receive(sock, unsubscribe_packet, unsuback_packet, "unsuback")
# Send some retained messages
sock.send(publish1_packet)
mosq_test.do_ping(sock)
sock.send(publish2_packet) # Not retained
mosq_test.do_ping(sock)
sock.send(publish3_packet)
mosq_test.do_ping(sock)
sock.close()
# Kill broker
broker.terminate()
broker.wait()
# Restart broker
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
# Connect client
sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=5, port=port)
# Subscribe
mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
# Check retained messages exist
mosq_test.receive_unordered(sock, publish1_packet, publish3_packet, "publish 1 / 3")
mosq_test.do_ping(sock)
rc = 0
finally:
if broker is not None:
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
os.remove(conf_file)
rc += persist_help.cleanup(port)
if rc:
print(stde.decode('utf-8'))
exit(rc)

@ -0,0 +1,129 @@
#!/usr/bin/env python3
# Connect a client, add a subscription, disconnect, restore, reconnect, send a
# message with a different client, check it is received.
from mosq_test_helper import *
import persist_help
port = mosq_test.get_port()
conf_file = os.path.basename(__file__).replace('.py', '.conf')
persist_help.write_config(conf_file, port)
rc = 1
persist_help.init(port)
keepalive = 10
client_id = "persist-subscription-v3-1-1"
proto_ver = 4
helper_id = "persist-subscription-v3-1-1-helper"
topic0 = "subscription/0"
topic1 = "subscription/1"
topic2 = "subscription/2"
connect_packet = mosq_test.gen_connect(client_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=False)
connack_packet1 = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
connack_packet2 = mosq_test.gen_connack(rc=0, flags=1, proto_ver=proto_ver)
mid = 1
subscribe_packet0 = mosq_test.gen_subscribe(mid, topic0, qos=0, proto_ver=proto_ver)
suback_packet0 = mosq_test.gen_suback(mid=mid, qos=0, proto_ver=proto_ver)
subscribe_packet1 = mosq_test.gen_subscribe(mid, topic1, qos=1, proto_ver=proto_ver)
suback_packet1 = mosq_test.gen_suback(mid=mid, qos=1, proto_ver=proto_ver)
subscribe_packet2 = mosq_test.gen_subscribe(mid, topic2, qos=2, proto_ver=proto_ver)
suback_packet2 = mosq_test.gen_suback(mid=mid, qos=2, proto_ver=proto_ver)
unsubscribe_packet2 = mosq_test.gen_unsubscribe(mid, topic2, proto_ver=proto_ver)
unsuback_packet2 = mosq_test.gen_unsuback(mid=mid, proto_ver=proto_ver)
connect_packet_helper = mosq_test.gen_connect(helper_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=True)
publish_packet0 = mosq_test.gen_publish(topic=topic0, qos=0, payload="message", proto_ver=proto_ver)
mid = 1
publish_packet1 = mosq_test.gen_publish(topic=topic1, qos=1, payload="message", mid=mid, proto_ver=proto_ver)
puback_packet = mosq_test.gen_puback(mid=mid, proto_ver=proto_ver)
mid = 2
publish_packet2 = mosq_test.gen_publish(topic=topic2, qos=2, payload="message", mid=mid, proto_ver=proto_ver)
pubrec_packet = mosq_test.gen_pubrec(mid=mid, proto_ver=proto_ver)
pubrel_packet = mosq_test.gen_pubrel(mid=mid, proto_ver=proto_ver)
pubcomp_packet = mosq_test.gen_pubcomp(mid=mid, proto_ver=proto_ver)
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
con = None
try:
# Connect client
sock = mosq_test.do_client_connect(connect_packet, connack_packet1, timeout=5, port=port)
mosq_test.do_send_receive(sock, subscribe_packet0, suback_packet0, "suback 0")
mosq_test.do_send_receive(sock, subscribe_packet1, suback_packet1, "suback 1")
mosq_test.do_send_receive(sock, subscribe_packet2, suback_packet2, "suback 2")
sock.close()
# Kill broker
broker.terminate()
broker.wait()
# Restart broker
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
# Connect client again, it should have a session
sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port)
mosq_test.do_ping(sock)
# Connect helper and publish
helper = mosq_test.do_client_connect(connect_packet_helper, connack_packet1, timeout=5, port=port)
helper.send(publish_packet0)
mosq_test.do_send_receive(helper, publish_packet1, puback_packet, "puback helper")
mosq_test.do_send_receive(helper, publish_packet2, pubrec_packet, "pubrec helper")
mosq_test.do_send_receive(helper, pubrel_packet, pubcomp_packet, "pubcomp helper")
helper.close()
# Does the client get the messages
mosq_test.expect_packet(sock, "publish 0", publish_packet0)
mosq_test.do_receive_send(sock, publish_packet1, puback_packet, "publish 1")
mosq_test.do_receive_send(sock, publish_packet2, pubrec_packet, "publish 2")
mosq_test.do_receive_send(sock, pubrel_packet, pubcomp_packet, "pubrel 2")
# Unsubscribe
mosq_test.do_send_receive(sock, unsubscribe_packet2, unsuback_packet2, "unsuback 2")
sock.close()
# Kill broker
broker.terminate()
broker.wait()
# Restart broker
broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
# Connect client again, it should have a session
sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port)
mosq_test.do_ping(sock)
# Connect helper and publish
helper = mosq_test.do_client_connect(connect_packet_helper, connack_packet1, timeout=5, port=port)
helper.send(publish_packet0)
mosq_test.do_send_receive(helper, publish_packet1, puback_packet, "puback helper")
mosq_test.do_send_receive(helper, publish_packet2, pubrec_packet, "pubrec helper")
mosq_test.do_send_receive(helper, pubrel_packet, pubcomp_packet, "pubcomp helper")
helper.close()
# Does the client get the messages
mosq_test.expect_packet(sock, "publish 0", publish_packet0)
mosq_test.do_receive_send(sock, publish_packet1, puback_packet, "publish 1")
mosq_test.do_ping(sock)
rc = 0
finally:
if broker is not None:
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
os.remove(conf_file)
rc += persist_help.cleanup(port)
if rc:
print(stde.decode('utf-8'))
exit(rc)

@ -250,3 +250,10 @@ ifeq ($(WITH_CJSON),yes)
./14-dynsec-role-invalid.py
endif
endif
15 :
./15-persist-client-v3-1-1.py
./15-persist-subscription-v3-1-1.py
./15-persist-retain-v3-1-1.py
./15-persist-client-msg-in-v3-1-1.py
./15-persist-client-msg-out-v3-1-1.py

@ -0,0 +1,13 @@
# This can be rewritten to easily support persistence plugins with all of
# the 15-persist-* tests.
def init(port):
pass
def cleanup(port):
return 0
def write_config(filename, port):
with open(filename, 'w') as f:
#f.write("plugin ..\n")
pass

@ -211,6 +211,12 @@ tests = [
(1, './14-dynsec-plugin-invalid.py'),
(1, './14-dynsec-role.py'),
(1, './14-dynsec-role-invalid.py'),
#(1, './15-persist-client-msg-in-v3-1-1.py'),
#(1, './15-persist-client-msg-out-v3-1-1.py'),
#(1, './15-persist-client-v3-1-1.py'),
#(1, './15-persist-retain-v3-1-1.py'),
#(1, './15-persist-subscription-v3-1-1.py'),
]
ptest.run_tests(tests)

@ -165,16 +165,17 @@ int sub__add(struct mosquitto *context, const char *sub, uint8_t qos, uint32_t i
return MOSQ_ERR_SUCCESS;
}
int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, struct mosquitto_msg_store *msg)
int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, struct mosquitto_msg_store *msg, bool persist)
{
UNUSED(context);
UNUSED(cmsg_id);
UNUSED(msg);
UNUSED(persist);
return MOSQ_ERR_SUCCESS;
}
int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uint16_t mid, uint8_t qos, bool retain, struct mosquitto_msg_store *stored, uint32_t subscription_identifier, bool update)
int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uint16_t mid, uint8_t qos, bool retain, struct mosquitto_msg_store *stored, uint32_t subscription_identifier, bool update, bool persist)
{
UNUSED(context);
UNUSED(cmsg_id);
@ -184,6 +185,7 @@ int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uin
UNUSED(stored);
UNUSED(subscription_identifier);
UNUSED(update);
UNUSED(persist);
return MOSQ_ERR_SUCCESS;
}
@ -224,3 +226,16 @@ void context__add_to_by_id(struct mosquitto *context)
HASH_ADD_KEYPTR(hh_id, db.contexts_by_id, context->id, strlen(context->id), context);
}
}
void plugin_persist__handle_retain_add(struct mosquitto_msg_store *msg)
{
UNUSED(msg);
}
void plugin_persist__handle_retain_remove(struct mosquitto_msg_store *msg)
{
UNUSED(msg);
}
void plugin_persist__handle_msg_add(struct mosquitto_msg_store *msg)
{
UNUSED(msg);
}

@ -170,3 +170,39 @@ void context__add_to_by_id(struct mosquitto *context)
HASH_ADD_KEYPTR(hh_id, db.contexts_by_id, context->id, strlen(context->id), context);
}
}
void plugin_persist__handle_client_msg_add(struct mosquitto *context, const struct mosquitto_client_msg *cmsg)
{
UNUSED(context);
UNUSED(cmsg);
}
void plugin_persist__handle_client_msg_remove(struct mosquitto *context, const struct mosquitto_client_msg *cmsg)
{
UNUSED(context);
UNUSED(cmsg);
}
void plugin_persist__handle_client_msg_update(struct mosquitto *context, const struct mosquitto_client_msg *cmsg)
{
UNUSED(context);
UNUSED(cmsg);
}
void plugin_persist__handle_client_msg_clear(struct mosquitto *context, uint8_t direction)
{
UNUSED(context);
UNUSED(direction);
}
void plugin_persist__handle_msg_add(struct mosquitto_msg_store *msg)
{
UNUSED(msg);
}
void plugin_persist__handle_msg_remove(struct mosquitto_msg_store *msg)
{
UNUSED(msg);
}
void plugin_persist__handle_retain_add(struct mosquitto_msg_store *msg)
{
UNUSED(msg);
}
void plugin_persist__handle_retain_remove(struct mosquitto_msg_store *msg)
{
UNUSED(msg);
}

@ -13,6 +13,10 @@ struct mosquitto_db{
};
struct mosquitto_msg_store{
};
int log__printf(struct mosquitto *mosq, unsigned int priority, const char *fmt, ...)
{
UNUSED(mosq);
@ -90,3 +94,13 @@ ssize_t net__write(struct mosquitto *mosq, const void *buf, size_t count)
UNUSED(count);
return 1;
}
void plugin_persist__handle_retain_add(struct mosquitto_msg_store *msg)
{
UNUSED(msg);
}
void plugin_persist__handle_retain_remove(struct mosquitto_msg_store *msg)
{
UNUSED(msg);
}

@ -146,11 +146,12 @@ int retain__queue(struct mosquitto *context, const char *sub, uint8_t sub_qos, u
return MOSQ_ERR_SUCCESS;
}
int retain__store(const char *topic, struct mosquitto_msg_store *stored, char **split_topics)
int retain__store(const char *topic, struct mosquitto_msg_store *stored, char **split_topics, bool persist)
{
UNUSED(topic);
UNUSED(stored);
UNUSED(split_topics);
UNUSED(persist);
return MOSQ_ERR_SUCCESS;
}
@ -188,3 +189,40 @@ int util__random_bytes(void *bytes, int count)
return MOSQ_ERR_SUCCESS;
}
void plugin_persist__handle_client_msg_add(struct mosquitto *context, const struct mosquitto_client_msg *cmsg)
{
UNUSED(context);
UNUSED(cmsg);
}
void plugin_persist__handle_client_msg_remove(struct mosquitto *context, const struct mosquitto_client_msg *cmsg)
{
UNUSED(context);
UNUSED(cmsg);
}
void plugin_persist__handle_client_msg_update(struct mosquitto *context, const struct mosquitto_client_msg *cmsg)
{
UNUSED(context);
UNUSED(cmsg);
}
void plugin_persist__handle_client_msg_clear(struct mosquitto *context, uint8_t direction)
{
UNUSED(context);
UNUSED(direction);
}
void plugin_persist__handle_msg_add(struct mosquitto_msg_store *msg)
{
UNUSED(msg);
}
void plugin_persist__handle_msg_remove(struct mosquitto_msg_store *msg)
{
UNUSED(msg);
}
void plugin_persist__handle_retain_add(struct mosquitto_msg_store *msg)
{
UNUSED(msg);
}
void plugin_persist__handle_retain_remove(struct mosquitto_msg_store *msg)
{
UNUSED(msg);
}

Loading…
Cancel
Save