From d2f598b6f62990a7491873692bd9f9bdcc228477 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Fri, 11 Feb 2022 23:38:50 +0000 Subject: [PATCH] persist-sqlite: commit transactions once every 5 seconds --- plugins/persist-sqlite/CMakeLists.txt | 1 + plugins/persist-sqlite/Makefile | 6 +++- plugins/persist-sqlite/client_msgs.c | 4 +++ plugins/persist-sqlite/clients.c | 4 +++ plugins/persist-sqlite/init.c | 2 ++ plugins/persist-sqlite/msg_store.c | 2 ++ plugins/persist-sqlite/persist_sqlite.h | 4 +++ plugins/persist-sqlite/plugin.c | 3 ++ plugins/persist-sqlite/retains.c | 2 ++ plugins/persist-sqlite/subscriptions.c | 2 ++ plugins/persist-sqlite/tick.c | 43 +++++++++++++++++++++++++ 11 files changed, 72 insertions(+), 1 deletion(-) create mode 100644 plugins/persist-sqlite/tick.c diff --git a/plugins/persist-sqlite/CMakeLists.txt b/plugins/persist-sqlite/CMakeLists.txt index 287abddf..a1579658 100644 --- a/plugins/persist-sqlite/CMakeLists.txt +++ b/plugins/persist-sqlite/CMakeLists.txt @@ -21,6 +21,7 @@ if(SQLITE3_FOUND AND CJSON_FOUND) restore.c retains.c subscriptions.c + tick.c ) target_include_directories(mosquitto_persist_sqlite PRIVATE diff --git a/plugins/persist-sqlite/Makefile b/plugins/persist-sqlite/Makefile index 893f516a..df0ab157 100644 --- a/plugins/persist-sqlite/Makefile +++ b/plugins/persist-sqlite/Makefile @@ -14,7 +14,8 @@ OBJS= \ plugin.o \ restore.o \ retains.o \ - subscriptions.o + subscriptions.o \ + tick.o ifeq ($(WITH_SQLITE),yes) ALL_DEPS:= binary @@ -52,6 +53,9 @@ retains.o : retains.c persist_sqlite.h subscriptions.o : subscriptions.c persist_sqlite.h ${CROSS_COMPILE}${CC} $(LOCAL_CPPFLAGS) $(PLUGIN_CPPFLAGS) $(PLUGIN_CFLAGS) -c $< -o $@ +tick.o : tick.c persist_sqlite.h + ${CROSS_COMPILE}${CC} $(LOCAL_CPPFLAGS) $(PLUGIN_CPPFLAGS) $(PLUGIN_CFLAGS) -c $< -o $@ + reallyclean : clean clean: -rm -f *.o ${PLUGIN_NAME}.so *.gcda *.gcno diff --git a/plugins/persist-sqlite/client_msgs.c b/plugins/persist-sqlite/client_msgs.c index b2e5e38c..9cf75086 100644 --- a/plugins/persist-sqlite/client_msgs.c +++ b/plugins/persist-sqlite/client_msgs.c @@ -42,6 +42,7 @@ int persist_sqlite__client_msg_add_cb(int event, void *event_data, void *userdat && sqlite3_bind_int(ms->client_msg_add_stmt, 8, ed->state) == SQLITE_OK ){ + ms->event_count++; rc = sqlite3_step(ms->client_msg_add_stmt); if(rc == SQLITE_DONE){ rc = MOSQ_ERR_SUCCESS; @@ -68,6 +69,7 @@ int persist_sqlite__client_msg_remove_cb(int event, void *event_data, void *user && sqlite3_bind_int(ms->client_msg_remove_stmt, 3, ed->direction) == SQLITE_OK ){ + ms->event_count++; rc = sqlite3_step(ms->client_msg_remove_stmt); if(rc == SQLITE_DONE){ rc = MOSQ_ERR_SUCCESS; @@ -95,6 +97,7 @@ int persist_sqlite__client_msg_update_cb(int event, void *event_data, void *user && sqlite3_bind_int64(ms->client_msg_update_stmt, 4, (int64_t)ed->store_id) == SQLITE_OK ){ + ms->event_count++; rc = sqlite3_step(ms->client_msg_update_stmt); if(rc == SQLITE_DONE){ rc = MOSQ_ERR_SUCCESS; @@ -120,6 +123,7 @@ int persist_sqlite__client_msg_clear_cb(int event, void *event_data, void *userd && sqlite3_bind_int64(ms->client_msg_clear_stmt, 2, ed->direction) == SQLITE_OK ){ + ms->event_count++; rc = sqlite3_step(ms->client_msg_clear_stmt); if(rc == SQLITE_DONE){ rc = MOSQ_ERR_SUCCESS; diff --git a/plugins/persist-sqlite/clients.c b/plugins/persist-sqlite/clients.c index 235851ed..22b3c95f 100644 --- a/plugins/persist-sqlite/clients.c +++ b/plugins/persist-sqlite/clients.c @@ -55,6 +55,7 @@ int persist_sqlite__client_add_cb(int event, void *event_data, void *userdata) && sqlite3_bind_int(ms->client_add_stmt, 11, (int)ed->will_delay_interval) == SQLITE_OK ){ + ms->event_count++; rc = sqlite3_step(ms->client_add_stmt); if(rc == SQLITE_DONE){ rc = MOSQ_ERR_SUCCESS; @@ -79,12 +80,14 @@ int persist_sqlite__client_remove_cb(int event, void *event_data, void *userdata if(sqlite3_bind_text(ms->subscription_clear_stmt, 1, ed->client_id, (int)strlen(ed->client_id), SQLITE_STATIC) == SQLITE_OK){ + ms->event_count++; sqlite3_step(ms->subscription_clear_stmt); sqlite3_reset(ms->subscription_clear_stmt); } if(sqlite3_bind_text(ms->client_remove_stmt, 1, ed->client_id, (int)strlen(ed->client_id), SQLITE_STATIC) == SQLITE_OK){ + ms->event_count++; rc = sqlite3_step(ms->client_remove_stmt); if(rc == SQLITE_DONE){ rc = MOSQ_ERR_SUCCESS; @@ -112,6 +115,7 @@ int persist_sqlite__client_update_cb(int event, void *event_data, void *userdata (int)strlen(ed->client_id), SQLITE_STATIC) == SQLITE_OK ){ + ms->event_count++; rc = sqlite3_step(ms->client_update_stmt); if(rc == SQLITE_DONE){ rc = MOSQ_ERR_SUCCESS; diff --git a/plugins/persist-sqlite/init.c b/plugins/persist-sqlite/init.c index 82c9d97c..dd891019 100644 --- a/plugins/persist-sqlite/init.c +++ b/plugins/persist-sqlite/init.c @@ -278,6 +278,7 @@ int persist_sqlite__init(struct mosquitto_sqlite *ms) rc = prepare_statements(ms); if(rc) return rc; + sqlite3_exec(ms->db, "BEGIN;", NULL, NULL, NULL); return MOSQ_ERR_SUCCESS; fail: mosquitto_log_printf(MOSQ_LOG_ERR, "Sqlite persistence: Error opening database: %s", sqlite3_errstr(rc)); @@ -303,6 +304,7 @@ void persist_sqlite__cleanup(struct mosquitto_sqlite *ms) sqlite3_finalize(ms->retain_remove_stmt); if(ms->db){ + sqlite3_exec(ms->db, "END;", NULL, NULL, NULL); sqlite3_close(ms->db); ms->db = NULL; } diff --git a/plugins/persist-sqlite/msg_store.c b/plugins/persist-sqlite/msg_store.c index 4a282729..e618a1f9 100644 --- a/plugins/persist-sqlite/msg_store.c +++ b/plugins/persist-sqlite/msg_store.c @@ -200,6 +200,7 @@ int persist_sqlite__msg_add_cb(int event, void *event_data, void *userdata) } if(rc == 0){ + ms->event_count++; rc = sqlite3_step(ms->msg_add_stmt); if(rc == SQLITE_DONE){ rc = MOSQ_ERR_SUCCESS; @@ -222,6 +223,7 @@ int persist_sqlite__msg_remove_cb(int event, void *event_data, void *userdata) UNUSED(event); if(sqlite3_bind_int64(ms->msg_remove_stmt, 1, (int64_t)ed->store_id) == SQLITE_OK){ + ms->event_count++; rc = sqlite3_step(ms->msg_remove_stmt); if(rc == SQLITE_DONE){ rc = MOSQ_ERR_SUCCESS; diff --git a/plugins/persist-sqlite/persist_sqlite.h b/plugins/persist-sqlite/persist_sqlite.h index a484d786..0a924b39 100644 --- a/plugins/persist-sqlite/persist_sqlite.h +++ b/plugins/persist-sqlite/persist_sqlite.h @@ -20,6 +20,7 @@ Contributors: #define PERSIST_SQLITE_H #include +#include #ifndef UNUSED # define UNUSED(A) (void)(A) @@ -43,7 +44,9 @@ struct mosquitto_sqlite { sqlite3_stmt *msg_load_stmt; sqlite3_stmt *retain_add_stmt; sqlite3_stmt *retain_remove_stmt; + time_t last_transaction; int synchronous; + int event_count; }; int persist_sqlite__init(struct mosquitto_sqlite *ms); @@ -65,4 +68,5 @@ int persist_sqlite__retain_add_cb(int event, void *event_data, void *userdata); int persist_sqlite__retain_remove_cb(int event, void *event_data, void *userdata); int persist_sqlite__subscription_add_cb(int event, void *event_data, void *userdata); int persist_sqlite__subscription_remove_cb(int event, void *event_data, void *userdata); +int persist_sqlite__tick_cb(int event, void *event_data, void *userdata); #endif diff --git a/plugins/persist-sqlite/plugin.c b/plugins/persist-sqlite/plugin.c index 7b88b6fa..319fbf99 100644 --- a/plugins/persist-sqlite/plugin.c +++ b/plugins/persist-sqlite/plugin.c @@ -118,6 +118,8 @@ int mosquitto_plugin_init(mosquitto_plugin_id_t *identifier, void **user_data, s if(rc) goto fail; rc = mosquitto_callback_register(plg_id, MOSQ_EVT_PERSIST_CLIENT_MSG_CLEAR, persist_sqlite__client_msg_clear_cb, NULL, &plg_data); if(rc) goto fail; + rc = mosquitto_callback_register(plg_id, MOSQ_EVT_TICK, persist_sqlite__tick_cb, NULL, &plg_data); + if(rc) goto fail; return MOSQ_ERR_SUCCESS; fail: @@ -153,6 +155,7 @@ int mosquitto_plugin_cleanup(void *user_data, struct mosquitto_opt *options, int mosquitto_callback_unregister(plg_id, MOSQ_EVT_PERSIST_CLIENT_MSG_DELETE, persist_sqlite__client_msg_remove_cb, NULL); mosquitto_callback_unregister(plg_id, MOSQ_EVT_PERSIST_CLIENT_MSG_UPDATE, persist_sqlite__client_msg_update_cb, NULL); mosquitto_callback_unregister(plg_id, MOSQ_EVT_PERSIST_CLIENT_MSG_CLEAR, persist_sqlite__client_msg_clear_cb, NULL); + mosquitto_callback_unregister(plg_id, MOSQ_EVT_TICK, persist_sqlite__tick_cb, NULL); } mosquitto_free(plg_data.db_file); diff --git a/plugins/persist-sqlite/retains.c b/plugins/persist-sqlite/retains.c index 90fdf307..c1bc04ac 100644 --- a/plugins/persist-sqlite/retains.c +++ b/plugins/persist-sqlite/retains.c @@ -35,6 +35,7 @@ int persist_sqlite__retain_add_cb(int event, void *event_data, void *userdata) && sqlite3_bind_int64(ms->retain_add_stmt, 2, (int64_t)ed->store_id) == SQLITE_OK ){ + ms->event_count++; rc = sqlite3_step(ms->retain_add_stmt); if(rc == SQLITE_DONE){ rc = MOSQ_ERR_SUCCESS; @@ -58,6 +59,7 @@ int persist_sqlite__retain_remove_cb(int event, void *event_data, void *userdata if(sqlite3_bind_text(ms->retain_remove_stmt, 1, ed->topic, (int)strlen(ed->topic), SQLITE_STATIC) == SQLITE_OK){ + ms->event_count++; rc = sqlite3_step(ms->retain_remove_stmt); if(rc == SQLITE_DONE){ rc = MOSQ_ERR_SUCCESS; diff --git a/plugins/persist-sqlite/subscriptions.c b/plugins/persist-sqlite/subscriptions.c index 8422461f..9d6bc552 100644 --- a/plugins/persist-sqlite/subscriptions.c +++ b/plugins/persist-sqlite/subscriptions.c @@ -43,6 +43,7 @@ int persist_sqlite__subscription_add_cb(int event, void *event_data, void *userd if(sqlite3_bind_int(ms->subscription_add_stmt, 4, (int)ed->subscription_identifier) == SQLITE_OK){ + ms->event_count++; rc = sqlite3_step(ms->subscription_add_stmt); if(rc == SQLITE_DONE){ rc = MOSQ_ERR_SUCCESS; @@ -72,6 +73,7 @@ int persist_sqlite__subscription_remove_cb(int event, void *event_data, void *us if(sqlite3_bind_text(ms->subscription_remove_stmt, 2, ed->topic, (int)strlen(ed->topic), SQLITE_STATIC) == SQLITE_OK){ + ms->event_count++; rc = sqlite3_step(ms->subscription_remove_stmt); if(rc == SQLITE_DONE){ rc = MOSQ_ERR_SUCCESS; diff --git a/plugins/persist-sqlite/tick.c b/plugins/persist-sqlite/tick.c new file mode 100644 index 00000000..179b608c --- /dev/null +++ b/plugins/persist-sqlite/tick.c @@ -0,0 +1,43 @@ +/* +Copyright (c) 2021 Roger Light + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License 2.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + https://www.eclipse.org/legal/epl-2.0/ +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + +Contributors: + Roger Light - initial implementation and documentation. +*/ + +#include +#include +#include + +#include "mqtt_protocol.h" +#include "mosquitto.h" +#include "mosquitto_broker.h" +#include "persist_sqlite.h" + +int persist_sqlite__tick_cb(int event, void *event_data, void *userdata) +{ + struct mosquitto_evt_tick *ed = event_data; + struct mosquitto_sqlite *ms = userdata; + + UNUSED(event); + + if(ed->now_s > ms->last_transaction + 5 && ms->event_count > 0){ + ms->last_transaction = ed->now_s; + ms->event_count = 0; + sqlite3_exec(ms->db, "END;", NULL, NULL, NULL); + sqlite3_exec(ms->db, "BEGIN;", NULL, NULL, NULL); + } + + return MOSQ_ERR_SUCCESS; +}