Process retain persist events only every 10 seconds.

pull/2485/head
Roger A. Light 4 years ago
parent 1bb16a68dc
commit b4a0255f1c

@ -206,6 +206,7 @@ int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listens
plugin__handle_tick();
session_expiry__check();
will_delay__check();
plugin_persist__process_retain_events(false);
rc = mux__handle(listensock, listensock_count);
if(rc) return rc;

@ -346,13 +346,6 @@ int main(int argc, char *argv[])
g_run = 1;
rc = mosquitto_main_loop(g_listensock, g_listensock_count);
db.shutdown = true;
log__printf(NULL, MOSQ_LOG_INFO, "mosquitto version %s terminating", VERSION);
#ifdef WITH_CJSON
broker_control__cleanup();
#endif
/* FIXME - this isn't quite right, all wills with will delay zero should be
* sent now, but those with positive will delay should be persisted and
@ -362,6 +355,16 @@ int main(int argc, char *argv[])
}
will_delay__send_all();
plugin_persist__process_retain_events(true);
/* Set to true only after persistence events have been processed */
db.shutdown = true;
log__printf(NULL, MOSQ_LOG_INFO, "mosquitto version %s terminating", VERSION);
#ifdef WITH_CJSON
broker_control__cleanup();
#endif
#ifdef WITH_PERSISTENCE
persist__backup(true);
#endif

@ -478,6 +478,11 @@ struct mosquitto__message_v5{
bool retain;
};
struct persist_retain_event{
UT_hash_handle hh;
struct mosquitto_base_msg *msg;
int event;
};
struct mosquitto_db{
dbid_t last_db_id;
@ -488,6 +493,7 @@ struct mosquitto_db{
struct mosquitto *contexts_by_sock;
struct mosquitto *contexts_by_id_delayed_auth;
struct mosquitto *contexts_for_free;
struct persist_retain_event *persist_retain_events;
#ifdef WITH_BRIDGE
struct mosquitto **bridges;
int bridge_count;
@ -873,6 +879,8 @@ void plugin_persist__handle_base_msg_add(struct mosquitto_base_msg *base_msg);
void plugin_persist__handle_base_msg_delete(struct mosquitto_base_msg *base_msg);
void plugin_persist__handle_retain_msg_set(struct mosquitto_base_msg *base_msg);
void plugin_persist__handle_retain_msg_delete(struct mosquitto_base_msg *base_msg);
void plugin_persist__process_retain_events(bool force);
void plugin_persist__queue_retain_event(struct mosquitto_base_msg *msg, int event);
/* ============================================================
* Property related functions

@ -349,6 +349,54 @@ void plugin_persist__handle_base_msg_delete(struct mosquitto_base_msg *msg)
}
void plugin_persist__process_retain_events(bool force)
{
struct persist_retain_event *evt, *evt_tmp;
static time_t last_run = 0;
if(db.now_real_s > last_run + 10 || force){
HASH_ITER(hh, db.persist_retain_events, evt, evt_tmp){
HASH_DELETE(hh, db.persist_retain_events, evt);
if(evt->event == MOSQ_EVT_PERSIST_RETAIN_MSG_SET){
plugin_persist__handle_base_msg_add(evt->msg);
plugin_persist__handle_retain_msg_set(evt->msg);
}else if(evt->event == MOSQ_EVT_PERSIST_RETAIN_MSG_DELETE){
plugin_persist__handle_retain_msg_delete(evt->msg);
}
db__msg_store_ref_dec(&evt->msg);
SAFE_FREE(evt);
}
last_run = db.now_real_s;
}
}
/* The retain event queue is used to store retain persistence events and periodically apply them.
* There can only be one event per topic queued at any one time, and this means that we
* can potentially remove a lot of set/delete persist events if a topic is
* being updated frequently. */
void plugin_persist__queue_retain_event(struct mosquitto_base_msg *msg, int event)
{
struct persist_retain_event *evt;
HASH_FIND(hh, db.persist_retain_events, msg->topic, strlen(msg->topic), evt);
if(evt){
db__msg_store_ref_dec(&evt->msg);
HASH_DELETE(hh, db.persist_retain_events, evt);
}else{
evt = mosquitto__calloc(1, sizeof(struct persist_retain_event));
if(!evt){
return;
}
}
evt->event = event;
evt->msg = msg;
db__msg_store_ref_inc(msg);
HASH_ADD_KEYPTR(hh, db.persist_retain_events, evt->msg->topic, strlen(evt->msg->topic), evt);
}
void plugin_persist__handle_retain_msg_set(struct mosquitto_base_msg *msg)
{
struct mosquitto_evt_persist_retain_msg event_data;

@ -682,6 +682,7 @@ int mosquitto_persist_base_msg_add(struct mosquitto_evt_persist_base_msg *msg)
uint32_t message_expiry_interval;
time_t message_expiry_interval_tt;
int i;
int rc;
memset(&context, 0, sizeof(context));
@ -725,7 +726,10 @@ int mosquitto_persist_base_msg_add(struct mosquitto_evt_persist_base_msg *msg)
}
}
}
return db__message_store(&context, base_msg, message_expiry_interval, msg->store_id, mosq_mo_broker);
rc = db__message_store(&context, base_msg, message_expiry_interval, msg->store_id, mosq_mo_broker);
free(context.id);
free(context.username);
return rc;
error:
mosquitto_property_free_all(&msg->plugin_properties);

@ -159,7 +159,7 @@ int retain__store(const char *topic, struct mosquitto_base_msg *base_msg, char *
if(retainhier->retained){
if(persist && retainhier->retained->topic[0] != '$' && base_msg->payloadlen == 0){
/* Only delete if another retained message isn't replacing this one */
plugin_persist__handle_retain_msg_delete(retainhier->retained);
plugin_persist__queue_retain_event(retainhier->retained, MOSQ_EVT_PERSIST_RETAIN_MSG_DELETE);
}
db__msg_store_ref_dec(&retainhier->retained);
#ifdef WITH_SYS_TREE
@ -173,8 +173,7 @@ int retain__store(const char *topic, struct mosquitto_base_msg *base_msg, char *
retainhier->retained = base_msg;
db__msg_store_ref_inc(retainhier->retained);
if(persist && retainhier->retained->topic[0] != '$'){
plugin_persist__handle_base_msg_add(retainhier->retained);
plugin_persist__handle_retain_msg_set(retainhier->retained);
plugin_persist__queue_retain_event(retainhier->retained, MOSQ_EVT_PERSIST_RETAIN_MSG_SET);
}
#ifdef WITH_SYS_TREE
db.retained_count++;

@ -239,3 +239,14 @@ void plugin_persist__handle_base_msg_add(struct mosquitto_base_msg *msg)
{
UNUSED(msg);
}
void plugin_persist__process_retain_events(bool force)
{
UNUSED(force);
}
void plugin_persist__queue_retain_event(struct mosquitto_base_msg *msg, int event)
{
UNUSED(msg);
UNUSED(event);
}

@ -211,3 +211,14 @@ void plugin_persist__handle_subscription_delete(struct mosquitto *context, const
UNUSED(context);
UNUSED(sub);
}
void plugin_persist__process_retain_events(bool force)
{
UNUSED(force);
}
void plugin_persist__queue_retain_event(struct mosquitto_base_msg *msg, int event)
{
UNUSED(msg);
UNUSED(event);
}

@ -104,3 +104,14 @@ void plugin_persist__handle_retain_remove(struct mosquitto_base_msg *msg)
{
UNUSED(msg);
}
void plugin_persist__process_retain_events(bool force)
{
UNUSED(force);
}
void plugin_persist__queue_retain_event(struct mosquitto_base_msg *msg, int event)
{
UNUSED(msg);
UNUSED(event);
}

Loading…
Cancel
Save