diff --git a/src/loop.c b/src/loop.c index 67b41e77..51f0acc1 100644 --- a/src/loop.c +++ b/src/loop.c @@ -206,6 +206,7 @@ int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listens plugin__handle_tick(); session_expiry__check(); will_delay__check(); + plugin_persist__process_retain_events(false); rc = mux__handle(listensock, listensock_count); if(rc) return rc; diff --git a/src/mosquitto.c b/src/mosquitto.c index a5586051..19ac12f1 100644 --- a/src/mosquitto.c +++ b/src/mosquitto.c @@ -346,13 +346,6 @@ int main(int argc, char *argv[]) g_run = 1; rc = mosquitto_main_loop(g_listensock, g_listensock_count); - db.shutdown = true; - - log__printf(NULL, MOSQ_LOG_INFO, "mosquitto version %s terminating", VERSION); - -#ifdef WITH_CJSON - broker_control__cleanup(); -#endif /* FIXME - this isn't quite right, all wills with will delay zero should be * sent now, but those with positive will delay should be persisted and @@ -362,6 +355,16 @@ int main(int argc, char *argv[]) } will_delay__send_all(); + plugin_persist__process_retain_events(true); + + /* Set to true only after persistence events have been processed */ + db.shutdown = true; + log__printf(NULL, MOSQ_LOG_INFO, "mosquitto version %s terminating", VERSION); + +#ifdef WITH_CJSON + broker_control__cleanup(); +#endif + #ifdef WITH_PERSISTENCE persist__backup(true); #endif diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index 5274f305..0b931524 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -478,6 +478,11 @@ struct mosquitto__message_v5{ bool retain; }; +struct persist_retain_event{ + UT_hash_handle hh; + struct mosquitto_base_msg *msg; + int event; +}; struct mosquitto_db{ dbid_t last_db_id; @@ -488,6 +493,7 @@ struct mosquitto_db{ struct mosquitto *contexts_by_sock; struct mosquitto *contexts_by_id_delayed_auth; struct mosquitto *contexts_for_free; + struct persist_retain_event *persist_retain_events; #ifdef WITH_BRIDGE struct mosquitto **bridges; int bridge_count; @@ -873,6 +879,8 @@ void plugin_persist__handle_base_msg_add(struct mosquitto_base_msg *base_msg); void plugin_persist__handle_base_msg_delete(struct mosquitto_base_msg *base_msg); void plugin_persist__handle_retain_msg_set(struct mosquitto_base_msg *base_msg); void plugin_persist__handle_retain_msg_delete(struct mosquitto_base_msg *base_msg); +void plugin_persist__process_retain_events(bool force); +void plugin_persist__queue_retain_event(struct mosquitto_base_msg *msg, int event); /* ============================================================ * Property related functions diff --git a/src/plugin_persist.c b/src/plugin_persist.c index c055db0e..046aeb42 100644 --- a/src/plugin_persist.c +++ b/src/plugin_persist.c @@ -349,6 +349,54 @@ void plugin_persist__handle_base_msg_delete(struct mosquitto_base_msg *msg) } +void plugin_persist__process_retain_events(bool force) +{ + struct persist_retain_event *evt, *evt_tmp; + static time_t last_run = 0; + + if(db.now_real_s > last_run + 10 || force){ + HASH_ITER(hh, db.persist_retain_events, evt, evt_tmp){ + HASH_DELETE(hh, db.persist_retain_events, evt); + if(evt->event == MOSQ_EVT_PERSIST_RETAIN_MSG_SET){ + plugin_persist__handle_base_msg_add(evt->msg); + plugin_persist__handle_retain_msg_set(evt->msg); + }else if(evt->event == MOSQ_EVT_PERSIST_RETAIN_MSG_DELETE){ + plugin_persist__handle_retain_msg_delete(evt->msg); + } + db__msg_store_ref_dec(&evt->msg); + SAFE_FREE(evt); + } + last_run = db.now_real_s; + } +} + + +/* The retain event queue is used to store retain persistence events and periodically apply them. + * There can only be one event per topic queued at any one time, and this means that we + * can potentially remove a lot of set/delete persist events if a topic is + * being updated frequently. */ +void plugin_persist__queue_retain_event(struct mosquitto_base_msg *msg, int event) +{ + struct persist_retain_event *evt; + + HASH_FIND(hh, db.persist_retain_events, msg->topic, strlen(msg->topic), evt); + if(evt){ + db__msg_store_ref_dec(&evt->msg); + HASH_DELETE(hh, db.persist_retain_events, evt); + }else{ + evt = mosquitto__calloc(1, sizeof(struct persist_retain_event)); + if(!evt){ + return; + } + } + + evt->event = event; + evt->msg = msg; + db__msg_store_ref_inc(msg); + HASH_ADD_KEYPTR(hh, db.persist_retain_events, evt->msg->topic, strlen(evt->msg->topic), evt); +} + + void plugin_persist__handle_retain_msg_set(struct mosquitto_base_msg *msg) { struct mosquitto_evt_persist_retain_msg event_data; diff --git a/src/plugin_public.c b/src/plugin_public.c index bd67f2dd..4287efd6 100644 --- a/src/plugin_public.c +++ b/src/plugin_public.c @@ -682,6 +682,7 @@ int mosquitto_persist_base_msg_add(struct mosquitto_evt_persist_base_msg *msg) uint32_t message_expiry_interval; time_t message_expiry_interval_tt; int i; + int rc; memset(&context, 0, sizeof(context)); @@ -725,7 +726,10 @@ int mosquitto_persist_base_msg_add(struct mosquitto_evt_persist_base_msg *msg) } } } - return db__message_store(&context, base_msg, message_expiry_interval, msg->store_id, mosq_mo_broker); + rc = db__message_store(&context, base_msg, message_expiry_interval, msg->store_id, mosq_mo_broker); + free(context.id); + free(context.username); + return rc; error: mosquitto_property_free_all(&msg->plugin_properties); diff --git a/src/retain.c b/src/retain.c index 1e2db194..cccc2aea 100644 --- a/src/retain.c +++ b/src/retain.c @@ -159,7 +159,7 @@ int retain__store(const char *topic, struct mosquitto_base_msg *base_msg, char * if(retainhier->retained){ if(persist && retainhier->retained->topic[0] != '$' && base_msg->payloadlen == 0){ /* Only delete if another retained message isn't replacing this one */ - plugin_persist__handle_retain_msg_delete(retainhier->retained); + plugin_persist__queue_retain_event(retainhier->retained, MOSQ_EVT_PERSIST_RETAIN_MSG_DELETE); } db__msg_store_ref_dec(&retainhier->retained); #ifdef WITH_SYS_TREE @@ -173,8 +173,7 @@ int retain__store(const char *topic, struct mosquitto_base_msg *base_msg, char * retainhier->retained = base_msg; db__msg_store_ref_inc(retainhier->retained); if(persist && retainhier->retained->topic[0] != '$'){ - plugin_persist__handle_base_msg_add(retainhier->retained); - plugin_persist__handle_retain_msg_set(retainhier->retained); + plugin_persist__queue_retain_event(retainhier->retained, MOSQ_EVT_PERSIST_RETAIN_MSG_SET); } #ifdef WITH_SYS_TREE db.retained_count++; diff --git a/test/unit/persist_read_stubs.c b/test/unit/persist_read_stubs.c index 25ffc869..4f1da80e 100644 --- a/test/unit/persist_read_stubs.c +++ b/test/unit/persist_read_stubs.c @@ -239,3 +239,14 @@ void plugin_persist__handle_base_msg_add(struct mosquitto_base_msg *msg) { UNUSED(msg); } + +void plugin_persist__process_retain_events(bool force) +{ + UNUSED(force); +} + +void plugin_persist__queue_retain_event(struct mosquitto_base_msg *msg, int event) +{ + UNUSED(msg); + UNUSED(event); +} diff --git a/test/unit/persist_write_stubs.c b/test/unit/persist_write_stubs.c index cdb6aa49..56100e22 100644 --- a/test/unit/persist_write_stubs.c +++ b/test/unit/persist_write_stubs.c @@ -211,3 +211,14 @@ void plugin_persist__handle_subscription_delete(struct mosquitto *context, const UNUSED(context); UNUSED(sub); } + +void plugin_persist__process_retain_events(bool force) +{ + UNUSED(force); +} + +void plugin_persist__queue_retain_event(struct mosquitto_base_msg *msg, int event) +{ + UNUSED(msg); + UNUSED(event); +} diff --git a/test/unit/stubs.c b/test/unit/stubs.c index 660dfe0f..5229d491 100644 --- a/test/unit/stubs.c +++ b/test/unit/stubs.c @@ -104,3 +104,14 @@ void plugin_persist__handle_retain_remove(struct mosquitto_base_msg *msg) { UNUSED(msg); } + +void plugin_persist__process_retain_events(bool force) +{ + UNUSED(force); +} + +void plugin_persist__queue_retain_event(struct mosquitto_base_msg *msg, int event) +{ + UNUSED(msg); + UNUSED(event); +}