diff --git a/apps/db_dump/stubs.c b/apps/db_dump/stubs.c index 50a3f9d4..a1f8663d 100644 --- a/apps/db_dump/stubs.c +++ b/apps/db_dump/stubs.c @@ -105,11 +105,12 @@ ssize_t net__write(struct mosquitto *mosq, const void *buf, size_t count) return 0; } -int retain__store(const char *topic, struct mosquitto_msg_store *stored, char **split_topics) +int retain__store(const char *topic, struct mosquitto_msg_store *stored, char **split_topics, bool persist) { UNUSED(topic); UNUSED(stored); UNUSED(split_topics); + UNUSED(persist); return 0; } diff --git a/include/mosquitto.h b/include/mosquitto.h index bfb9bf17..6c785bbb 100644 --- a/include/mosquitto.h +++ b/include/mosquitto.h @@ -185,6 +185,16 @@ struct mosquitto_message{ struct mosquitto; typedef struct mqtt5__property mosquitto_property; +struct mosquitto_message_v5{ + void *payload; + char *topic; + mosquitto_property *properties; + uint32_t payloadlen; + uint8_t qos; + bool retain; + uint8_t padding[2]; +}; + /* * Topic: Threads * libmosquitto provides thread safe operation, with the exception of diff --git a/include/mosquitto_broker.h b/include/mosquitto_broker.h index 06c13d80..ce1d8163 100644 --- a/include/mosquitto_broker.h +++ b/include/mosquitto_broker.h @@ -39,6 +39,8 @@ extern "C" { #include #include +#include + struct mosquitto; typedef struct mqtt5__property mosquitto_property; @@ -48,6 +50,11 @@ enum mosquitto_protocol { mp_websockets }; +enum mosquitto_broker_msg_direction { + mosq_bmd_in = 0, + mosq_bmd_out = 1 +}; + /* ========================================================================= * * Section: Register callbacks. @@ -67,6 +74,22 @@ enum mosquitto_plugin_event { MOSQ_EVT_TICK = 9, MOSQ_EVT_DISCONNECT = 10, MOSQ_EVT_CONNECT = 11, + MOSQ_EVT_PERSIST_RESTORE = 12, + MOSQ_EVT_PERSIST_CONFIG_ADD = 13, + MOSQ_EVT_PERSIST_MSG_ADD = 14, + MOSQ_EVT_PERSIST_MSG_REMOVE = 15, + MOSQ_EVT_PERSIST_MSG_LOAD = 16, + MOSQ_EVT_PERSIST_RETAIN_ADD = 17, + MOSQ_EVT_PERSIST_RETAIN_REMOVE = 18, + MOSQ_EVT_PERSIST_CLIENT_ADD = 19, + MOSQ_EVT_PERSIST_CLIENT_REMOVE = 20, + MOSQ_EVT_PERSIST_CLIENT_UPDATE = 21, + MOSQ_EVT_PERSIST_SUBSCRIPTION_ADD = 22, + MOSQ_EVT_PERSIST_SUBSCRIPTION_REMOVE = 23, + MOSQ_EVT_PERSIST_CLIENT_MSG_ADD = 24, + MOSQ_EVT_PERSIST_CLIENT_MSG_REMOVE = 25, + MOSQ_EVT_PERSIST_CLIENT_MSG_UPDATE = 26, + MOSQ_EVT_PERSIST_CLIENT_MSG_LOAD = 27, }; /* Data for the MOSQ_EVT_RELOAD event */ @@ -179,6 +202,95 @@ struct mosquitto_evt_disconnect { void *future2[4]; }; +/* Data for the MOSQ_EVT_PERSIST_RESTORE event */ +/* NOTE: The persistence interface is currently marked as unstable, which means + * it may change in a future minor release. */ +struct mosquitto_evt_persist_restore { + void *future[8]; +}; + +/* Data for the MOSQ_EVT_PERSIST_CLIENT_ADD/_REMOVE/_UPDATE event */ +/* NOTE: The persistence interface is currently marked as unstable, which means + * it may change in a future minor release. */ +struct mosquitto_evt_persist_client { + const char *client_id; + const char *username; + const struct mosquitto_message_v5 *will; + time_t will_delay_time; /* update */ + time_t session_expiry_time; /* update */ + uint32_t will_delay_interval; + uint32_t session_expiry_interval; + uint32_t max_packet_size; + uint16_t listener_port; + uint8_t max_qos; + bool retain_available; + uint8_t padding[6]; + void *future[8]; +}; + + +/* Data for the MOSQ_EVT_PERSIST_SUBSCRIPTION_ADD/_REMOVE event */ +/* NOTE: The persistence interface is currently marked as unstable, which means + * it may change in a future minor release. */ +struct mosquitto_evt_persist_subscription { + const char *client_id; + const char *topic; + uint32_t subscription_identifier; + uint8_t subscription_options; + uint8_t padding[3]; + void *future[8]; +}; + + +/* Data for the MOSQ_EVT_PERSIST_CLIENT_MSG_ADD/_REMOVE/_UPDATE event */ +/* NOTE: The persistence interface is currently marked as unstable, which means + * it may change in a future minor release. */ +struct mosquitto_evt_persist_client_msg { + const char *client_id; + uint64_t cmsg_id; + uint64_t store_id; + uint32_t subscription_identifier; + uint16_t mid; + uint8_t qos; + bool retain; + bool dup; /* add, update */ + uint8_t direction; + uint8_t state; /* add, update */ + uint8_t padding[5]; + void *future[8]; +}; + + +/* Data for the MOSQ_EVT_PERSIST_MSG_ADD/_REMOVE/_LOAD event */ +/* NOTE: The persistence interface is currently marked as unstable, which means + * it may change in a future minor release. */ +struct mosquitto_evt_persist_msg { + uint64_t store_id; + int64_t expiry_time; + const char *topic; + const void *payload; + const char *source_id; + const char *source_username; + const mosquitto_property *properties; + uint32_t payloadlen; + uint16_t source_mid; + uint16_t source_port; + uint8_t qos; + bool retain; + uint8_t padding[6]; + void *future[8]; +}; + + +/* Data for the MOSQ_EVT_PERSIST_RETAIN/_REMOVE event */ +/* NOTE: The persistence interface is currently marked as unstable, which means + * it may change in a future minor release. */ +struct mosquitto_evt_persist_retain { + const char *topic; + uint64_t store_id; + void *future[8]; +}; + /* Callback definition */ typedef int (*MOSQ_FUNC_generic_callback)(int, void *, void *); @@ -648,6 +760,21 @@ mosq_EXPORT void mosquitto_complete_basic_auth(const char* client_id, int result */ mosq_EXPORT int mosquitto_broker_node_id_set(uint16_t id); +/* NOTE: The persistence interface is currently marked as unstable, which means + * it may change in a future minor release. */ +int mosquitto_persist_client_add(const struct mosquitto_evt_persist_client *client); +int mosquitto_persist_client_update(const struct mosquitto_evt_persist_client *client); +int mosquitto_persist_client_remove(const char *client_id); +int mosquitto_persist_client_msg_add(const struct mosquitto_evt_persist_client_msg *client_msg); +int mosquitto_persist_client_msg_remove(const struct mosquitto_evt_persist_client_msg *client_msg); +int mosquitto_persist_client_msg_update(const struct mosquitto_evt_persist_client_msg *client_msg); +int mosquitto_persist_msg_add(const struct mosquitto_evt_persist_msg *msg); +int mosquitto_persist_msg_remove(uint64_t store_id); +int mosquitto_subscription_add(const char *client_id, const char *topic, uint8_t subscription_options, uint32_t subscription_identifier); +int mosquitto_subscription_remove(const char *client_id, const char *topic); +int mosquitto_persist_retain_add(struct mosquitto_evt_persist_retain *retain); +int mosquitto_persist_retain_remove(const char *topic); + #ifdef __cplusplus } #endif diff --git a/lib/handle_pubrec.c b/lib/handle_pubrec.c index 850b3ad4..d9f93ae3 100644 --- a/lib/handle_pubrec.c +++ b/lib/handle_pubrec.c @@ -94,7 +94,7 @@ int handle__pubrec(struct mosquitto *mosq) log__printf(NULL, MOSQ_LOG_DEBUG, "Received PUBREC from %s (Mid: %d)", SAFE_PRINT(mosq->id), mid); if(reason_code < 0x80){ - rc = db__message_update_outgoing(mosq, mid, mosq_ms_wait_for_pubcomp, 2); + rc = db__message_update_outgoing(mosq, mid, mosq_ms_wait_for_pubcomp, 2, true); }else{ return db__message_delete_outgoing(mosq, mid, mosq_ms_wait_for_pubrec, 2); } diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b5e1a79c..da4f26ca 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -44,7 +44,7 @@ set (MOSQ_SRCS persist_write_v5.c persist_write.c persist.h plugin_callbacks.c plugin_v5.c plugin_v4.c plugin_v3.c plugin_v2.c - plugin_init.c plugin_cleanup.c + plugin_init.c plugin_cleanup.c plugin_persist.c plugin_acl_check.c plugin_basic_auth.c plugin_connect.c plugin_disconnect.c plugin_extended_auth.c plugin_message.c plugin_psk_key.c plugin_public.c plugin_tick.c diff --git a/src/Makefile b/src/Makefile index 8166d1c5..88743169 100644 --- a/src/Makefile +++ b/src/Makefile @@ -68,6 +68,7 @@ OBJS= mosquitto.o \ plugin_extended_auth.o \ plugin_init.o \ plugin_message.o \ + plugin_persist.o \ plugin_psk_key.o \ plugin_public.o \ plugin_tick.o \ @@ -303,6 +304,9 @@ plugin_init.o : plugin_init.c ../include/mosquitto_plugin.h mosquitto_broker_int plugin_message.o : plugin_message.c ../include/mosquitto_plugin.h mosquitto_broker_internal.h ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ +plugin_persist.o : plugin_persist.c ../include/mosquitto_plugin.h mosquitto_broker_internal.h + ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ + plugin_psk_key.o : plugin_psk_key.c ../include/mosquitto_plugin.h mosquitto_broker_internal.h ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ diff --git a/src/context.c b/src/context.c index 9a2dc8f7..91a9dbf4 100644 --- a/src/context.c +++ b/src/context.c @@ -234,6 +234,7 @@ void context__disconnect(struct mosquitto *context) context__send_will(context); net__socket_close(context); if(context->session_expiry_interval == 0){ + plugin_persist__handle_client_remove(context); /* Client session is due to be expired now */ #ifdef WITH_BRIDGE if(context->bridge == NULL) diff --git a/src/database.c b/src/database.c index baed0207..d4286246 100644 --- a/src/database.c +++ b/src/database.c @@ -275,13 +275,15 @@ void db__msg_store_free(struct mosquitto_msg_store *store) mosquitto__free(store); } -void db__msg_store_remove(struct mosquitto_msg_store *store) +void db__msg_store_remove(struct mosquitto_msg_store *store, bool notify) { if(store == NULL) return; HASH_DELETE(hh, db.msg_store, store); db.msg_store_count--; db.msg_store_bytes -= store->payloadlen; - + if(db.shutdown == false || notify == false){ + plugin_persist__handle_msg_remove(store); + } db__msg_store_free(store); } @@ -291,7 +293,7 @@ void db__msg_store_clean(void) struct mosquitto_msg_store *store, *store_tmp; HASH_ITER(hh, db.msg_store, store, store_tmp){ - db__msg_store_remove(store); + db__msg_store_remove(store, false); } } @@ -304,7 +306,7 @@ void db__msg_store_ref_dec(struct mosquitto_msg_store **store) { (*store)->ref_count--; if((*store)->ref_count == 0){ - db__msg_store_remove(*store); + db__msg_store_remove(*store, true); *store = NULL; } } @@ -316,18 +318,20 @@ void db__msg_store_compact(void) HASH_ITER(hh, db.msg_store, store, store_tmp){ if(store->ref_count < 1){ - db__msg_store_remove(store); + db__msg_store_remove(store, true); } } } -static void db__message_remove(struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *item) +static void db__message_remove(struct mosquitto *context, struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *item) { - if(!msg_data || !item){ + if(!context || !msg_data || !item){ return; } + plugin_persist__handle_client_msg_remove(context, item); + DL_DELETE(msg_data->inflight, item); if(item->store){ db__msg_remove_from_inflight_stats(msg_data, item); @@ -372,7 +376,7 @@ int db__message_delete_outgoing(struct mosquitto *context, uint16_t mid, enum mo return MOSQ_ERR_PROTOCOL; } msg_index--; - db__message_remove(&context->msgs_out, tail); + db__message_remove(context, &context->msgs_out, tail); break; } } @@ -394,6 +398,7 @@ int db__message_delete_outgoing(struct mosquitto *context, uint16_t mid, enum mo tail->state = mosq_ms_publish_qos2; break; } + plugin_persist__handle_client_msg_update(context, tail); db__message_dequeue_first(context, &context->msgs_out); } #ifdef WITH_PERSISTENCE @@ -405,7 +410,7 @@ int db__message_delete_outgoing(struct mosquitto *context, uint16_t mid, enum mo /* Only for QoS 2 messages */ -int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, struct mosquitto_msg_store *stored) +int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, struct mosquitto_msg_store *stored, bool persist) { struct mosquitto_client_msg *msg; struct mosquitto_msg_data *msg_data; @@ -476,13 +481,18 @@ int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, str db__msg_add_to_inflight_stats(msg_data, msg); } + if(persist){ + plugin_persist__handle_msg_add(msg->store); + plugin_persist__handle_client_msg_add(context, msg); + } + if(msg->store->qos > 0){ util__decrement_receive_quota(context); } return rc; } -int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uint16_t mid, uint8_t qos, bool retain, struct mosquitto_msg_store *stored, uint32_t subscription_identifier, bool update) +int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uint16_t mid, uint8_t qos, bool retain, struct mosquitto_msg_store *stored, uint32_t subscription_identifier, bool update, bool persist) { struct mosquitto_client_msg *msg; struct mosquitto_msg_data *msg_data; @@ -613,6 +623,11 @@ int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uin db__msg_add_to_inflight_stats(msg_data, msg); } + if(persist){ + plugin_persist__handle_msg_add(msg->store); + plugin_persist__handle_client_msg_add(context, msg); + } + if(db.config->allow_duplicate_messages == false && retain == false){ /* Record which client ids this message has been sent to so we can avoid duplicates. * Outgoing messages only. @@ -656,7 +671,7 @@ int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uin return rc; } -int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos) +int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos, bool persist) { struct mosquitto_client_msg *tail; @@ -666,6 +681,9 @@ int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mo return MOSQ_ERR_PROTOCOL; } tail->state = state; + if(persist){ + plugin_persist__handle_client_msg_update(context, tail); + } return MOSQ_ERR_SUCCESS; } } @@ -933,6 +951,7 @@ static int db__message_reconnect_reset_outgoing(struct mosquitto *context) } break; } + plugin_persist__handle_client_msg_update(context, msg); } /* Messages received when the client was disconnected are put * in the mosq_ms_queued state. If we don't change them to the @@ -955,6 +974,7 @@ static int db__message_reconnect_reset_outgoing(struct mosquitto *context) break; } db__message_dequeue_first(context, &context->msgs_out); + plugin_persist__handle_client_msg_update(context, msg); } } @@ -986,7 +1006,7 @@ static int db__message_reconnect_reset_incoming(struct mosquitto *context) if(msg->qos != 2){ /* Anything msgs_in, msg); + db__message_remove(context, &context->msgs_in, msg); }else{ /* Message state can be preserved here because it should match * whatever the client has got. */ @@ -1014,6 +1034,7 @@ static int db__message_reconnect_reset_incoming(struct mosquitto *context) break; } db__message_dequeue_first(context, &context->msgs_in); + plugin_persist__handle_client_msg_update(context, msg); } } @@ -1042,7 +1063,7 @@ int db__message_remove_incoming(struct mosquitto* context, uint16_t mid) if(tail->store->qos != 2){ return MOSQ_ERR_PROTOCOL; } - db__message_remove(&context->msgs_in, tail); + db__message_remove(context, &context->msgs_in, tail); return MOSQ_ERR_SUCCESS; } } @@ -1078,12 +1099,12 @@ int db__message_release_incoming(struct mosquitto *context, uint16_t mid) * keep resending it. That means we don't send it to other * clients. */ if(topic == NULL){ - db__message_remove(&context->msgs_in, tail); + db__message_remove(context, &context->msgs_in, tail); deleted = true; }else{ rc = sub__messages_queue(source_id, topic, 2, retain, &tail->store); if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_NO_SUBSCRIBERS){ - db__message_remove(&context->msgs_in, tail); + db__message_remove(context, &context->msgs_in, tail); deleted = true; }else{ return 1; @@ -1103,6 +1124,7 @@ int db__message_release_incoming(struct mosquitto *context, uint16_t mid) send__pubrec(context, tail->mid, 0, NULL); tail->state = mosq_ms_wait_for_pubrel; db__message_dequeue_first(context, &context->msgs_in); + plugin_persist__handle_client_msg_update(context, tail); } } if(deleted){ @@ -1133,7 +1155,7 @@ static int db__message_write_inflight_out_single(struct mosquitto *context, stru if(msg->direction == mosq_md_out && msg->qos > 0){ util__increment_send_quota(context); } - db__message_remove(&context->msgs_out, msg); + db__message_remove(context, &context->msgs_out, msg); return MOSQ_ERR_SUCCESS; }else{ expiry_interval = (uint32_t)(msg->store->message_expiry_time - db.now_real_s); @@ -1153,7 +1175,7 @@ static int db__message_write_inflight_out_single(struct mosquitto *context, stru case mosq_ms_publish_qos0: rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, subscription_id, store_props, expiry_interval); if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_OVERSIZE_PACKET){ - db__message_remove(&context->msgs_out, msg); + db__message_remove(context, &context->msgs_out, msg); }else{ return rc; } @@ -1165,7 +1187,7 @@ static int db__message_write_inflight_out_single(struct mosquitto *context, stru msg->dup = 1; /* Any retry attempts are a duplicate. */ msg->state = mosq_ms_wait_for_puback; }else if(rc == MOSQ_ERR_OVERSIZE_PACKET){ - db__message_remove(&context->msgs_out, msg); + db__message_remove(context, &context->msgs_out, msg); }else{ return rc; } @@ -1177,7 +1199,7 @@ static int db__message_write_inflight_out_single(struct mosquitto *context, stru msg->dup = 1; /* Any retry attempts are a duplicate. */ msg->state = mosq_ms_wait_for_pubrec; }else if(rc == MOSQ_ERR_OVERSIZE_PACKET){ - db__message_remove(&context->msgs_out, msg); + db__message_remove(context, &context->msgs_out, msg); }else{ return rc; } @@ -1288,7 +1310,9 @@ int db__message_write_queued_in(struct mosquitto *context) rc = send__pubrec(context, tail->mid, 0, NULL); if(!rc){ tail->state = mosq_ms_wait_for_pubrel; + plugin_persist__handle_client_msg_update(context, tail); }else{ + plugin_persist__handle_client_msg_update(context, tail); return rc; } } @@ -1322,6 +1346,7 @@ int db__message_write_queued_out(struct mosquitto *context) break; } db__message_dequeue_first(context, &context->msgs_out); + plugin_persist__handle_client_msg_update(context, tail); } return MOSQ_ERR_SUCCESS; } diff --git a/src/handle_connect.c b/src/handle_connect.c index c61a2d42..57b21109 100644 --- a/src/handle_connect.c +++ b/src/handle_connect.c @@ -185,6 +185,8 @@ int connect__on_authorised(struct mosquitto *context, void *auth_data_out, uint1 if(context->clean_start == true){ sub__clean_session(found_context); + found_context->session_expiry_interval = 0; + plugin_persist__handle_client_remove(found_context); } if((found_context->protocol == mosq_p_mqtt5 && found_context->session_expiry_interval == 0) || (found_context->protocol != mosq_p_mqtt5 && found_context->clean_start == true) @@ -335,6 +337,10 @@ int connect__on_authorised(struct mosquitto *context, void *auth_data_out, uint1 if(rc == MOSQ_ERR_SUCCESS){ plugin__handle_connect(context); + + if(context->session_expiry_interval != 0){ + plugin_persist__handle_client_add(context); + } } return rc; error: diff --git a/src/handle_publish.c b/src/handle_publish.c index 8846a6f1..e001c884 100644 --- a/src/handle_publish.c +++ b/src/handle_publish.c @@ -355,7 +355,7 @@ int handle__publish(struct mosquitto *context) break; case 2: if(dup == 0){ - res = db__message_insert_incoming(context, 0, stored); + res = db__message_insert_incoming(context, 0, stored, true); }else{ res = 0; } diff --git a/src/handle_subscribe.c b/src/handle_subscribe.c index 5d772706..ea4e56db 100644 --- a/src/handle_subscribe.c +++ b/src/handle_subscribe.c @@ -212,6 +212,10 @@ int handle__subscribe(struct mosquitto *context) } log__printf(NULL, MOSQ_LOG_SUBSCRIBE, "%s %d %s", context->id, qos, sub); + + if(context->session_expiry_interval > 0){ + plugin_persist__handle_subscription_add(context, sub, subscription_options, subscription_identifier); + } } mosquitto__free(sub); diff --git a/src/handle_unsubscribe.c b/src/handle_unsubscribe.c index 13c884f7..3726d9cf 100644 --- a/src/handle_unsubscribe.c +++ b/src/handle_unsubscribe.c @@ -130,6 +130,9 @@ int handle__unsubscribe(struct mosquitto *context) log__printf(NULL, MOSQ_LOG_DEBUG, "\t%s", sub); if(allowed){ rc = sub__remove(context, sub, db.subs, &reason); + if(context->session_expiry_interval > 0){ + plugin_persist__handle_subscription_remove(context, sub); + } }else{ rc = MOSQ_ERR_SUCCESS; } diff --git a/src/linker-macosx.syms b/src/linker-macosx.syms index 8113c373..ad563f1e 100644 --- a/src/linker-macosx.syms +++ b/src/linker-macosx.syms @@ -20,6 +20,16 @@ _mosquitto_kick_client_by_clientid _mosquitto_kick_client_by_username _mosquitto_log_printf _mosquitto_malloc +_mosquitto_persist_client_add +_mosquitto_persist_client_update +_mosquitto_persist_client_remove +_mosquitto_persist_client_msg_add +_mosquitto_persist_client_msg_remove +_mosquitto_persist_client_msg_update +_mosquitto_persist_msg_add +_mosquitto_persist_msg_remove +_mosquitto_persist_retain_add +_mosquitto_persist_retain_remove _mosquitto_plugin_set_info _mosquitto_property_add_binary _mosquitto_property_add_byte @@ -36,6 +46,8 @@ _mosquitto_strdup _mosquitto_sub_matches_acl _mosquitto_sub_matches_acl_with_pattern _mosquitto_sub_topic_check +_mosquitto_subscription_add +_mosquitto_subscription_remove _mosquitto_topic_matches_sub _mosquitto_topic_matches_sub_with_pattern _mosquitto_validate_utf8 diff --git a/src/linker.syms b/src/linker.syms index d5d91866..9dd2dac3 100644 --- a/src/linker.syms +++ b/src/linker.syms @@ -21,6 +21,16 @@ mosquitto_kick_client_by_username; mosquitto_log_printf; mosquitto_malloc; + mosquitto_persist_client_msg_add; + mosquitto_persist_client_msg_remove; + mosquitto_persist_client_msg_update; + mosquitto_persist_client_add; + mosquitto_persist_client_update; + mosquitto_persist_client_remove; + mosquitto_persist_msg_add; + mosquitto_persist_msg_remove; + mosquitto_persist_retain_add; + mosquitto_persist_retain_remove; mosquitto_plugin_set_info; mosquitto_property_add_binary; mosquitto_property_add_byte; @@ -37,6 +47,8 @@ mosquitto_sub_matches_acl; mosquitto_sub_matches_acl_with_pattern; mosquitto_sub_topic_check; + mosquitto_subscription_add; + mosquitto_subscription_remove; mosquitto_topic_matches_sub; mosquitto_topic_matches_sub_with_pattern; mosquitto_validate_utf8; diff --git a/src/loop.c b/src/loop.c index 846443e2..5a2a30ea 100644 --- a/src/loop.c +++ b/src/loop.c @@ -65,7 +65,7 @@ void lws__sul_callback(struct lws_sorted_usec_list *l) static struct lws_sorted_usec_list sul; #endif -static int single_publish(struct mosquitto *context, struct mosquitto_message_v5 *msg, uint32_t message_expiry) +static int single_publish(struct mosquitto *context, struct mosquitto__message_v5 *msg, uint32_t message_expiry) { struct mosquitto_msg_store *stored; uint16_t mid; @@ -98,7 +98,7 @@ static int single_publish(struct mosquitto *context, struct mosquitto_message_v5 }else{ mid = 0; } - return db__message_insert_outgoing(context, 0, mid, (uint8_t)msg->qos, 0, stored, 0, true); + return db__message_insert_outgoing(context, 0, mid, (uint8_t)msg->qos, 0, stored, 0, true, true); } @@ -130,7 +130,7 @@ static void read_message_expiry_interval(mosquitto_property **proplist, uint32_t static void queue_plugin_msgs(void) { - struct mosquitto_message_v5 *msg, *tmp; + struct mosquitto__message_v5 *msg, *tmp; struct mosquitto *context; uint32_t message_expiry; diff --git a/src/mosquitto.c b/src/mosquitto.c index 0496ed8f..67083638 100644 --- a/src/mosquitto.c +++ b/src/mosquitto.c @@ -305,6 +305,9 @@ int main(int argc, char *argv[]) rc = mosquitto_security_init(false); if(rc) return rc; + plugin_persist__handle_restore(); + db__msg_store_compact(); + /* After loading persisted clients and ACLs, try to associate them, * so persisted subscriptions can start storing messages */ HASH_ITER(hh_id, db.contexts_by_id, ctxt, ctxt_tmp){ @@ -343,6 +346,7 @@ int main(int argc, char *argv[]) g_run = 1; rc = mosquitto_main_loop(g_listensock, g_listensock_count); + db.shutdown = true; log__printf(NULL, MOSQ_LOG_INFO, "mosquitto version %s terminating", VERSION); diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index 36cdd4c2..bed5e944 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -164,6 +164,20 @@ struct plugin__callbacks{ struct mosquitto__callback *message; struct mosquitto__callback *psk_key; struct mosquitto__callback *reload; + struct mosquitto__callback *persist_restore; + struct mosquitto__callback *persist_client_add; + struct mosquitto__callback *persist_client_remove; + struct mosquitto__callback *persist_client_update; + struct mosquitto__callback *persist_subscription_add; + struct mosquitto__callback *persist_subscription_remove; + struct mosquitto__callback *persist_client_msg_add; + struct mosquitto__callback *persist_client_msg_remove; + struct mosquitto__callback *persist_client_msg_update; + struct mosquitto__callback *persist_msg_add; + struct mosquitto__callback *persist_msg_remove; + struct mosquitto__callback *persist_msg_load; + struct mosquitto__callback *persist_retain_add; + struct mosquitto__callback *persist_retain_remove; }; struct mosquitto__security_options { @@ -404,6 +418,7 @@ struct mosquitto_msg_store{ uint16_t source_mid; uint8_t qos; bool retain; + bool stored; }; struct mosquitto_client_msg{ @@ -450,8 +465,8 @@ struct mosquitto__acl_user{ }; -struct mosquitto_message_v5{ - struct mosquitto_message_v5 *next, *prev; +struct mosquitto__message_v5{ + struct mosquitto__message_v5 *next, *prev; char *topic; void *payload; mosquitto_property *properties; @@ -501,12 +516,13 @@ struct mosquitto_db{ #ifdef WITH_KQUEUE int kqueuefd; #endif - struct mosquitto_message_v5 *plugin_msgs; + struct mosquitto__message_v5 *plugin_msgs; #ifdef WITH_TLS char *tls_keylog; /* This can't be in the config struct because it is used before the config is allocated. Config probably shouldn't be separately allocated. */ #endif + bool shutdown; }; enum mosquitto__bridge_direction{ @@ -702,18 +718,18 @@ int persist__restore(void); /* Return the number of in-flight messages in count. */ int db__message_count(int *count); int db__message_delete_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state expect_state, int qos); -int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uint16_t mid, uint8_t qos, bool retain, struct mosquitto_msg_store *stored, uint32_t subscription_identifier, bool update); -int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, struct mosquitto_msg_store *stored); +int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uint16_t mid, uint8_t qos, bool retain, struct mosquitto_msg_store *stored, uint32_t subscription_identifier, bool update, bool persist); +int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, struct mosquitto_msg_store *stored, bool persist); int db__message_remove_incoming(struct mosquitto* context, uint16_t mid); int db__message_release_incoming(struct mosquitto *context, uint16_t mid); -int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos); +int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos, bool persist); void db__message_dequeue_first(struct mosquitto *context, struct mosquitto_msg_data *msg_data); int db__messages_delete(struct mosquitto *context, bool force_free); int db__messages_easy_queue(struct mosquitto *context, const char *topic, uint8_t qos, uint32_t payloadlen, const void *payload, int retain, uint32_t message_expiry_interval, mosquitto_property **properties); int db__message_store(const struct mosquitto *source, struct mosquitto_msg_store *stored, uint32_t message_expiry_interval, dbid_t store_id, enum mosquitto_msg_origin origin); int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored); void db__msg_store_add(struct mosquitto_msg_store *store); -void db__msg_store_remove(struct mosquitto_msg_store *store); +void db__msg_store_remove(struct mosquitto_msg_store *store, bool notify); void db__msg_store_ref_inc(struct mosquitto_msg_store *store); void db__msg_store_ref_dec(struct mosquitto_msg_store **store); void db__msg_store_clean(void); @@ -840,6 +856,19 @@ int plugin__handle_message(struct mosquitto *context, struct mosquitto_msg_store void LIB_ERROR(void); void plugin__handle_tick(void); int plugin__callback_unregister_all(mosquitto_plugin_id_t *identifier); +void plugin_persist__handle_restore(void); +void plugin_persist__handle_client_add(struct mosquitto *context); +void plugin_persist__handle_client_remove(struct mosquitto *context); +void plugin_persist__handle_client_update(struct mosquitto *context); +void plugin_persist__handle_subscription_add(struct mosquitto *context, const char *sub, uint8_t subscription_options, uint32_t subscription_identifier); +void plugin_persist__handle_subscription_remove(struct mosquitto *context, const char *sub); +void plugin_persist__handle_client_msg_add(struct mosquitto *context, const struct mosquitto_client_msg *cmsg); +void plugin_persist__handle_client_msg_remove(struct mosquitto *context, const struct mosquitto_client_msg *cmsg); +void plugin_persist__handle_client_msg_update(struct mosquitto *context, const struct mosquitto_client_msg *cmsg); +void plugin_persist__handle_msg_add(struct mosquitto_msg_store *msg); +void plugin_persist__handle_msg_remove(struct mosquitto_msg_store *msg); +void plugin_persist__handle_retain_add(struct mosquitto_msg_store *msg); +void plugin_persist__handle_retain_remove(struct mosquitto_msg_store *msg); /* ============================================================ * Property related functions @@ -864,7 +893,7 @@ int property__process_disconnect(struct mosquitto *context, mosquitto_property * int retain__init(void); void retain__clean(struct mosquitto__retainhier **retainhier); int retain__queue(struct mosquitto *context, const char *sub, uint8_t sub_qos, uint32_t subscription_identifier); -int retain__store(const char *topic, struct mosquitto_msg_store *stored, char **split_topics); +int retain__store(const char *topic, struct mosquitto_msg_store *stored, char **split_topics, bool persist); /* ============================================================ * Security related functions diff --git a/src/persist_read.c b/src/persist_read.c index 9d0f98de..988902ff 100644 --- a/src/persist_read.c +++ b/src/persist_read.c @@ -306,6 +306,7 @@ static int persist__msg_store_chunk_restore(FILE *db_fptr, uint32_t length) stored->retain = chunk.F.retain; stored->properties = chunk.properties; stored->payload = chunk.payload; + stored->source_listener = chunk.source.listener; rc = db__message_store(&chunk.source, stored, message_expiry_interval, chunk.F.store_id, mosq_mo_client); @@ -345,7 +346,7 @@ static int persist__retain_chunk_restore(FILE *db_fptr) HASH_FIND(hh, db.msg_store, &chunk.F.store_id, sizeof(chunk.F.store_id), msg); if(msg){ if(sub__topic_tokenise(msg->topic, &local_topic, &split_topics, NULL)) return 1; - retain__store(msg->topic, msg, split_topics); + retain__store(msg->topic, msg, split_topics, true); mosquitto__free(local_topic); mosquitto__free(split_topics); }else{ diff --git a/src/plugin_callbacks.c b/src/plugin_callbacks.c index 9d9671d5..33783202 100644 --- a/src/plugin_callbacks.c +++ b/src/plugin_callbacks.c @@ -89,6 +89,34 @@ static struct mosquitto__callback **plugin__get_callback_base(struct mosquitto__ return &security_options->plugin_callbacks.disconnect; case MOSQ_EVT_CONNECT: return &security_options->plugin_callbacks.connect; + case MOSQ_EVT_PERSIST_RESTORE: + return &security_options->plugin_callbacks.persist_restore; + case MOSQ_EVT_PERSIST_CLIENT_ADD: + return &security_options->plugin_callbacks.persist_client_add; + case MOSQ_EVT_PERSIST_CLIENT_REMOVE: + return &security_options->plugin_callbacks.persist_client_remove; + case MOSQ_EVT_PERSIST_CLIENT_UPDATE: + return &security_options->plugin_callbacks.persist_client_update; + case MOSQ_EVT_PERSIST_SUBSCRIPTION_ADD: + return &security_options->plugin_callbacks.persist_subscription_add; + case MOSQ_EVT_PERSIST_SUBSCRIPTION_REMOVE: + return &security_options->plugin_callbacks.persist_subscription_remove; + case MOSQ_EVT_PERSIST_CLIENT_MSG_ADD: + return &security_options->plugin_callbacks.persist_client_msg_add; + case MOSQ_EVT_PERSIST_CLIENT_MSG_REMOVE: + return &security_options->plugin_callbacks.persist_client_msg_remove; + case MOSQ_EVT_PERSIST_CLIENT_MSG_UPDATE: + return &security_options->plugin_callbacks.persist_client_msg_update; + case MOSQ_EVT_PERSIST_MSG_ADD: + return &security_options->plugin_callbacks.persist_msg_add; + case MOSQ_EVT_PERSIST_MSG_REMOVE: + return &security_options->plugin_callbacks.persist_msg_remove; + case MOSQ_EVT_PERSIST_MSG_LOAD: + return &security_options->plugin_callbacks.persist_msg_load; + case MOSQ_EVT_PERSIST_RETAIN_ADD: + return &security_options->plugin_callbacks.persist_retain_add; + case MOSQ_EVT_PERSIST_RETAIN_REMOVE: + return &security_options->plugin_callbacks.persist_retain_remove; default: return NULL; } diff --git a/src/plugin_persist.c b/src/plugin_persist.c new file mode 100644 index 00000000..5a587f49 --- /dev/null +++ b/src/plugin_persist.c @@ -0,0 +1,327 @@ +/* +Copyright (c) 2021 Roger Light + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License 2.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + https://www.eclipse.org/legal/epl-2.0/ +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + +Contributors: + Roger Light - initial implementation and documentation. +*/ + +#include "config.h" + +#include "mosquitto_broker_internal.h" +#include "mosquitto_internal.h" +#include "mosquitto_broker.h" +#include "memory_mosq.h" +#include "mqtt_protocol.h" +#include "send_mosq.h" +#include "util_mosq.h" +#include "utlist.h" +#include "lib_load.h" +#include "will_mosq.h" + + +void plugin_persist__handle_restore(void) +{ + struct mosquitto_evt_persist_restore event_data; + struct mosquitto__callback *cb_base; + struct mosquitto__security_options *opts; + + opts = &db.config->security_options; + memset(&event_data, 0, sizeof(event_data)); + + DL_FOREACH(opts->plugin_callbacks.persist_restore, cb_base){ + cb_base->cb(MOSQ_EVT_PERSIST_RESTORE, &event_data, cb_base->userdata); + } +} + + +void plugin_persist__handle_client_add(struct mosquitto *context) +{ + struct mosquitto_evt_persist_client event_data; + struct mosquitto__callback *cb_base; + struct mosquitto__security_options *opts; + struct mosquitto_message_v5 will; + + UNUSED(will); /* FIXME */ + opts = &db.config->security_options; + memset(&event_data, 0, sizeof(event_data)); + event_data.client_id = context->id; + event_data.username = context->username; + event_data.will_delay_time = context->will_delay_time; + event_data.session_expiry_time = context->session_expiry_time; + event_data.will_delay_interval = context->will_delay_interval; + event_data.session_expiry_interval = context->session_expiry_interval; + if(context->listener){ + event_data.listener_port = context->listener->port; + }else{ + event_data.listener_port = 0; + } + event_data.max_qos = context->max_qos; + event_data.retain_available = context->retain_available; + event_data.max_packet_size = context->maximum_packet_size; + + DL_FOREACH(opts->plugin_callbacks.persist_client_add, cb_base){ + cb_base->cb(MOSQ_EVT_PERSIST_CLIENT_ADD, &event_data, cb_base->userdata); + } +} + + +void plugin_persist__handle_client_update(struct mosquitto *context) +{ + struct mosquitto_evt_persist_client event_data; + struct mosquitto__callback *cb_base; + struct mosquitto__security_options *opts; + struct mosquitto_message_v5 will; + + UNUSED(will); /* FIXME */ + opts = &db.config->security_options; + memset(&event_data, 0, sizeof(event_data)); + event_data.client_id = context->id; + event_data.username = context->username; + event_data.will_delay_time = context->will_delay_time; + event_data.session_expiry_time = context->session_expiry_time; + event_data.will_delay_interval = context->will_delay_interval; + event_data.session_expiry_interval = context->session_expiry_interval; + if(context->listener){ + event_data.listener_port = context->listener->port; + }else{ + event_data.listener_port = 0; + } + event_data.max_qos = context->max_qos; + event_data.retain_available = context->retain_available; + event_data.max_packet_size = context->maximum_packet_size; + + DL_FOREACH(opts->plugin_callbacks.persist_client_update, cb_base){ + cb_base->cb(MOSQ_EVT_PERSIST_CLIENT_UPDATE, &event_data, cb_base->userdata); + } +} + + +void plugin_persist__handle_client_remove(struct mosquitto *context) +{ + struct mosquitto_evt_persist_client event_data; + struct mosquitto__callback *cb_base; + struct mosquitto__security_options *opts; + + if(context->session_expiry_interval > 0 || context->id == NULL || context->state == mosq_cs_duplicate){ + return; + } + opts = &db.config->security_options; + memset(&event_data, 0, sizeof(event_data)); + event_data.client_id = context->id; + + DL_FOREACH(opts->plugin_callbacks.persist_client_remove, cb_base){ + cb_base->cb(MOSQ_EVT_PERSIST_CLIENT_REMOVE, &event_data, cb_base->userdata); + } +} + + +void plugin_persist__handle_subscription_add(struct mosquitto *context, const char *sub, uint8_t subscription_options, uint32_t subscription_identifier) +{ + struct mosquitto_evt_persist_subscription event_data; + struct mosquitto__callback *cb_base; + struct mosquitto__security_options *opts; + + opts = &db.config->security_options; + memset(&event_data, 0, sizeof(event_data)); + event_data.client_id = context->id; + event_data.topic = sub; + event_data.subscription_identifier = subscription_identifier; + event_data.subscription_options = subscription_options; + + DL_FOREACH(opts->plugin_callbacks.persist_subscription_add, cb_base){ + cb_base->cb(MOSQ_EVT_PERSIST_SUBSCRIPTION_ADD, &event_data, cb_base->userdata); + } +} + + +void plugin_persist__handle_subscription_remove(struct mosquitto *context, const char *sub) +{ + struct mosquitto_evt_persist_subscription event_data; + struct mosquitto__callback *cb_base; + struct mosquitto__security_options *opts; + + opts = &db.config->security_options; + memset(&event_data, 0, sizeof(event_data)); + event_data.client_id = context->id; + event_data.topic = sub; + + DL_FOREACH(opts->plugin_callbacks.persist_subscription_remove, cb_base){ + cb_base->cb(MOSQ_EVT_PERSIST_SUBSCRIPTION_REMOVE, &event_data, cb_base->userdata); + } +} + + +void plugin_persist__handle_client_msg_add(struct mosquitto *context, const struct mosquitto_client_msg *cmsg) +{ + struct mosquitto_evt_persist_client_msg event_data; + struct mosquitto__callback *cb_base; + struct mosquitto__security_options *opts; + + if(context->session_expiry_interval == 0 + || (cmsg->qos == 0 && db.config->queue_qos0_messages == false)){ + + return; + } + + opts = &db.config->security_options; + memset(&event_data, 0, sizeof(event_data)); + + event_data.client_id = context->id; + event_data.cmsg_id = cmsg->cmsg_id; + event_data.store_id = cmsg->store->db_id; + event_data.mid = cmsg->mid; + event_data.qos = cmsg->qos; + event_data.retain = cmsg->retain; + event_data.dup = cmsg->dup; + event_data.direction = cmsg->direction; + event_data.state = cmsg->state; + + DL_FOREACH(opts->plugin_callbacks.persist_client_msg_add, cb_base){ + cb_base->cb(MOSQ_EVT_PERSIST_CLIENT_MSG_ADD, &event_data, cb_base->userdata); + } +} + + +void plugin_persist__handle_client_msg_remove(struct mosquitto *context, const struct mosquitto_client_msg *cmsg) +{ + struct mosquitto_evt_persist_client_msg event_data; + struct mosquitto__callback *cb_base; + struct mosquitto__security_options *opts; + + opts = &db.config->security_options; + memset(&event_data, 0, sizeof(event_data)); + event_data.client_id = context->id; + event_data.cmsg_id = cmsg->cmsg_id; + event_data.mid = cmsg->mid; + event_data.state = cmsg->state; + event_data.qos = cmsg->qos; + event_data.store_id = cmsg->store->db_id; + event_data.direction = cmsg->direction; + + DL_FOREACH(opts->plugin_callbacks.persist_client_msg_remove, cb_base){ + cb_base->cb(MOSQ_EVT_PERSIST_CLIENT_MSG_REMOVE, &event_data, cb_base->userdata); + } +} + + +void plugin_persist__handle_client_msg_update(struct mosquitto *context, const struct mosquitto_client_msg *cmsg) +{ + struct mosquitto_evt_persist_client_msg event_data; + struct mosquitto__callback *cb_base; + struct mosquitto__security_options *opts; + + opts = &db.config->security_options; + memset(&event_data, 0, sizeof(event_data)); + + event_data.client_id = context->id; + event_data.cmsg_id = cmsg->cmsg_id; + event_data.store_id = cmsg->store->db_id; + event_data.state = cmsg->state; + event_data.dup = cmsg->dup; + event_data.direction = cmsg->direction; + + DL_FOREACH(opts->plugin_callbacks.persist_client_msg_update, cb_base){ + cb_base->cb(MOSQ_EVT_PERSIST_CLIENT_MSG_UPDATE, &event_data, cb_base->userdata); + } +} + + +void plugin_persist__handle_msg_add(struct mosquitto_msg_store *msg) +{ + struct mosquitto_evt_persist_msg event_data; + struct mosquitto__callback *cb_base; + struct mosquitto__security_options *opts; + + if(msg->stored) return; + + opts = &db.config->security_options; + memset(&event_data, 0, sizeof(event_data)); + + event_data.store_id = msg->db_id; + event_data.expiry_time = msg->message_expiry_time; + event_data.topic = msg->topic; + event_data.payload = msg->payload; + event_data.source_id = msg->source_id; + event_data.source_username = msg->source_username; + event_data.properties = msg->properties; + event_data.payloadlen = msg->payloadlen; + event_data.source_mid = msg->source_mid; + if(msg->source_listener){ + event_data.source_port = msg->source_listener->port; + }else{ + event_data.source_port = 0; + } + event_data.qos = msg->qos; + event_data.retain = msg->retain; + + DL_FOREACH(opts->plugin_callbacks.persist_msg_add, cb_base){ + cb_base->cb(MOSQ_EVT_PERSIST_MSG_ADD, &event_data, cb_base->userdata); + } + msg->stored = true; +} + + +void plugin_persist__handle_msg_remove(struct mosquitto_msg_store *msg) +{ + struct mosquitto_evt_persist_msg event_data; + struct mosquitto__callback *cb_base; + struct mosquitto__security_options *opts; + + if(msg->stored == false) return; + + opts = &db.config->security_options; + memset(&event_data, 0, sizeof(event_data)); + + event_data.store_id = msg->db_id; + + DL_FOREACH(opts->plugin_callbacks.persist_msg_remove, cb_base){ + cb_base->cb(MOSQ_EVT_PERSIST_MSG_REMOVE, &event_data, cb_base->userdata); + } + msg->stored = false; +} + + +void plugin_persist__handle_retain_add(struct mosquitto_msg_store *msg) +{ + struct mosquitto_evt_persist_retain event_data; + struct mosquitto__callback *cb_base; + struct mosquitto__security_options *opts; + + opts = &db.config->security_options; + memset(&event_data, 0, sizeof(event_data)); + + event_data.store_id = msg->db_id; + event_data.topic = msg->topic; + + DL_FOREACH(opts->plugin_callbacks.persist_retain_add, cb_base){ + cb_base->cb(MOSQ_EVT_PERSIST_MSG_ADD, &event_data, cb_base->userdata); + } +} + + +void plugin_persist__handle_retain_remove(struct mosquitto_msg_store *msg) +{ + struct mosquitto_evt_persist_retain event_data; + struct mosquitto__callback *cb_base; + struct mosquitto__security_options *opts; + + opts = &db.config->security_options; + memset(&event_data, 0, sizeof(event_data)); + + event_data.topic = msg->topic; + + DL_FOREACH(opts->plugin_callbacks.persist_retain_remove, cb_base){ + cb_base->cb(MOSQ_EVT_PERSIST_MSG_REMOVE, &event_data, cb_base->userdata); + } +} diff --git a/src/plugin_public.c b/src/plugin_public.c index bbbaeef6..73bb438a 100644 --- a/src/plugin_public.c +++ b/src/plugin_public.c @@ -23,6 +23,7 @@ Contributors: #include "mqtt_protocol.h" #include "send_mosq.h" #include "util_mosq.h" +#include "will_mosq.h" #include "utlist.h" #include "will_mosq.h" @@ -189,7 +190,7 @@ int mosquitto_broker_publish( bool retain, mosquitto_property *properties) { - struct mosquitto_message_v5 *msg; + struct mosquitto__message_v5 *msg; if(topic == NULL || payloadlen < 0 @@ -199,7 +200,7 @@ int mosquitto_broker_publish( return MOSQ_ERR_INVAL; } - msg = mosquitto__malloc(sizeof(struct mosquitto_message_v5)); + msg = mosquitto__malloc(sizeof(struct mosquitto__message_v5)); if(msg == NULL) return MOSQ_ERR_NOMEM; msg->next = NULL; @@ -385,6 +386,340 @@ int mosquitto_kick_client_by_username(const char *username, bool with_will) } +int mosquitto_persist_client_add(const struct mosquitto_evt_persist_client *client) +{ + struct mosquitto *context; + int i; + + if(client == NULL || client->client_id == NULL) return MOSQ_ERR_INVAL; + + context = NULL; + HASH_FIND(hh_id, db.contexts_by_id, client->client_id, strlen(client->client_id), context); + if(context){ + return MOSQ_ERR_INVAL; + } + + context = context__init(); + if(!context) return MOSQ_ERR_NOMEM; + + context->id = mosquitto__strdup(client->client_id); + if(client->username){ + context->username = mosquitto__strdup(client->username); + if(!context->username){ + mosquitto__free(context->id); + mosquitto__free(context); + return MOSQ_ERR_NOMEM; + } + }else{ + context->username = NULL; + } + context->clean_start = false; + context->will_delay_time = client->will_delay_time; + context->session_expiry_time = client->session_expiry_time; + context->will_delay_interval = client->will_delay_interval; + context->session_expiry_interval = client->session_expiry_interval; + context->max_qos = client->max_qos; + context->maximum_packet_size = client->max_packet_size; + context->retain_available = client->retain_available; + + /* in per_listener_settings mode, try to find the listener by persisted port */ + if(db.config->per_listener_settings && client->listener_port > 0){ + for(i=0; i < db.config->listener_count; i++){ + if(db.config->listeners[i].port == client->listener_port){ + context->listener = &db.config->listeners[i]; + break; + } + } + } + + HASH_ADD_KEYPTR(hh_id, db.contexts_by_id, context->id, strlen(context->id), context); + return MOSQ_ERR_SUCCESS; +} + + +int mosquitto_persist_client_update(const struct mosquitto_evt_persist_client *client) +{ + struct mosquitto *context; + int i; + char *username; + + if(client == NULL || client->client_id == NULL) return MOSQ_ERR_INVAL; + + context = NULL; + HASH_FIND(hh_id, db.contexts_by_id, client->client_id, strlen(client->client_id), context); + if(context == NULL){ + return MOSQ_ERR_INVAL; + } + + if(client->username){ + username = mosquitto__strdup(client->username); + if(!username){ + return MOSQ_ERR_NOMEM; + } + mosquitto_free(context->username); + context->username = username; + }else{ + mosquitto_free(context->username); + context->username = NULL; + } + context->clean_start = false; + context->will_delay_time = client->will_delay_time; + context->session_expiry_time = client->session_expiry_time; + context->will_delay_interval = client->will_delay_interval; + context->session_expiry_interval = client->session_expiry_interval; + context->max_qos = client->max_qos; + context->maximum_packet_size = client->max_packet_size; + context->retain_available = client->retain_available; + + /* in per_listener_settings mode, try to find the listener by persisted port */ + if(db.config->per_listener_settings && client->listener_port > 0){ + for(i=0; i < db.config->listener_count; i++){ + if(db.config->listeners[i].port == client->listener_port){ + context->listener = &db.config->listeners[i]; + break; + } + } + } + + return MOSQ_ERR_SUCCESS; +} + + +int mosquitto_persist_client_remove(const char *client_id) +{ + struct mosquitto *context; + + if(client_id == NULL) return MOSQ_ERR_INVAL; + + context = NULL; + HASH_FIND(hh_id, db.contexts_by_id, client_id, strlen(client_id), context); + if(context == NULL){ + return MOSQ_ERR_SUCCESS; + } + + session_expiry__remove(context); + will_delay__remove(context); + will__clear(context); + + context->clean_start = true; + context->session_expiry_interval = 0; + mosquitto__set_state(context, mosq_cs_duplicate); + do_disconnect(context, MOSQ_ERR_SUCCESS); + + return MOSQ_ERR_SUCCESS; +} + + +struct mosquitto_msg_store *find_store_msg(uint64_t store_id) +{ + struct mosquitto_msg_store *stored; + + HASH_FIND(hh, db.msg_store, &store_id, sizeof(store_id), stored); + return stored; +} + +int mosquitto_persist_client_msg_add(const struct mosquitto_evt_persist_client_msg *client_msg) +{ + struct mosquitto *context; + struct mosquitto_msg_store *stored; + + if(client_msg == NULL || client_msg->client_id == NULL){ + return MOSQ_ERR_INVAL; + } + HASH_FIND(hh_id, db.contexts_by_id, client_msg->client_id, strlen(client_msg->client_id), context); + if(context == NULL){ + return MOSQ_ERR_NOT_FOUND; + } + stored = find_store_msg(client_msg->store_id); + if(stored == NULL){ + return MOSQ_ERR_NOT_FOUND; + } + + if(client_msg->direction == mosq_md_out){ + if(client_msg->qos > 0){ + context->last_mid = client_msg->mid; + } + return db__message_insert_outgoing(context, client_msg->cmsg_id, client_msg->mid, client_msg->qos, client_msg->retain, + stored, client_msg->subscription_identifier, false, false); + }else{ + return db__message_insert_incoming(context, client_msg->cmsg_id, stored, false); + } + return MOSQ_ERR_SUCCESS; +} + + +int mosquitto_persist_client_msg_remove(const struct mosquitto_evt_persist_client_msg *client_msg) +{ + struct mosquitto *context; + struct mosquitto_msg_store *stored; + + if(client_msg == NULL || client_msg->client_id == NULL){ + return MOSQ_ERR_INVAL; + } + HASH_FIND(hh_id, db.contexts_by_id, client_msg->client_id, strlen(client_msg->client_id), context); + if(context == NULL){ + return MOSQ_ERR_NOT_FOUND; + } + stored = find_store_msg(client_msg->store_id); + if(stored == NULL){ + return MOSQ_ERR_NOT_FOUND; + } + + if(client_msg->direction == mosq_md_out){ + if(client_msg->qos > 0){ + context->last_mid = client_msg->mid; + } + return db__message_delete_outgoing(context, client_msg->mid, client_msg->state, client_msg->qos); + }else{ + return db__message_remove_incoming(context, client_msg->mid); + } + return MOSQ_ERR_SUCCESS; +} + + +int mosquitto_persist_client_msg_update(const struct mosquitto_evt_persist_client_msg *client_msg) +{ + struct mosquitto *context; + + if(client_msg == NULL || client_msg->client_id == NULL){ + return MOSQ_ERR_INVAL; + } + HASH_FIND(hh_id, db.contexts_by_id, client_msg->client_id, strlen(client_msg->client_id), context); + if(context == NULL){ + return MOSQ_ERR_NOT_FOUND; + } + + if(client_msg->direction == mosq_md_out){ + db__message_update_outgoing(context, client_msg->mid, client_msg->state, client_msg->qos, false); + } + return MOSQ_ERR_SUCCESS; +} + + +int mosquitto_subscription_add(const char *client_id, const char *topic, uint8_t subscription_options, uint32_t subscription_identifier) +{ + struct mosquitto *context; + + if(client_id == NULL || topic == NULL || client_id[0] == '\0' || topic[0] == '\0'){ + return MOSQ_ERR_INVAL; + } + + HASH_FIND(hh_id, db.contexts_by_id, client_id, strlen(client_id), context); + + if(context){ + return sub__add(context, topic, subscription_options&0x03, subscription_identifier, subscription_options, &db.subs); + }else{ + return MOSQ_ERR_INVAL; + } +} + + +int mosquitto_subscription_remove(const char *client_id, const char *topic) +{ + struct mosquitto *context; + uint8_t reason; + + if(client_id == NULL || topic == NULL || client_id[0] == '\0' || topic[0] == '\0'){ + return MOSQ_ERR_INVAL; + } + + HASH_FIND(hh_id, db.contexts_by_id, client_id, strlen(client_id), context); + + if(context){ + return sub__remove(context, topic, db.subs, &reason); + }else{ + return MOSQ_ERR_INVAL; + } +} + + +int mosquitto_persist_msg_add(const struct mosquitto_evt_persist_msg *msg) +{ + struct mosquitto context; + struct mosquitto_msg_store *stored; + uint32_t message_expiry_interval; + time_t message_expiry_interval_tt; + int i; + + memset(&context, 0, sizeof(context)); + memset(&stored, 0, sizeof(stored)); + + if(msg->source_id){ + context.id = mosquitto__strdup(msg->source_id); + if(!context.id) return MOSQ_ERR_NOMEM; + } + if(msg->source_username){ + context.username = mosquitto__strdup(msg->source_username); + if(!context.username) return MOSQ_ERR_NOMEM; + } + + if(msg->expiry_time == 0){ + message_expiry_interval = 0; + }else if(msg->expiry_time <= db.now_real_s){ + message_expiry_interval = 1; + }else{ + message_expiry_interval_tt = msg->expiry_time - db.now_real_s; + if(message_expiry_interval_tt > UINT32_MAX){ + message_expiry_interval = UINT32_MAX; + }else{ + message_expiry_interval = (uint32_t)message_expiry_interval_tt; + } + } + + stored = mosquitto_calloc(1, sizeof(struct mosquitto_msg_store)); + if(stored == NULL){ + goto error; + } + stored->payloadlen = msg->payloadlen; + stored->source_mid = msg->source_mid; + stored->qos = msg->qos; + stored->retain = msg->retain; + + stored->payload = mosquitto_malloc(stored->payloadlen+1); + if(stored->payload == NULL){ + goto error; + } + memcpy(stored->payload, msg->payload, stored->payloadlen); + ((uint8_t *)stored->payload)[stored->payloadlen] = 0; /* Always zero terminate */ + + stored->topic = mosquitto_strdup(msg->topic); + if(stored->topic == NULL){ + goto error; + } + stored->properties = NULL; /* FIXME */ + + if(msg->source_port){ + for(i=0; ilistener_count; i++){ + if(db.config->listeners[i].port == msg->source_port){ + stored->source_listener = &db.config->listeners[i]; + break; + } + } + } + return db__message_store(&context, stored, message_expiry_interval, msg->store_id, mosq_mo_broker); + +error: + if(stored){ + mosquitto_property_free_all(&stored->properties); + mosquitto_free(stored->topic); + mosquitto_free(stored->payload); + mosquitto_free(stored); + } + return MOSQ_ERR_NOMEM; +} + + +int mosquitto_persist_msg_remove(uint64_t store_id) +{ + struct mosquitto_msg_store *stored; + + stored = find_store_msg(store_id); + db__msg_store_remove(stored, false); + + return MOSQ_ERR_SUCCESS; +} + + void mosquitto_complete_basic_auth(const char *client_id, int result) { struct mosquitto *context; diff --git a/src/retain.c b/src/retain.c index d4e4235a..b5bd5172 100644 --- a/src/retain.c +++ b/src/retain.c @@ -71,8 +71,54 @@ int retain__init(void) return MOSQ_ERR_SUCCESS; } +int mosquitto_persist_retain_add(struct mosquitto_evt_persist_retain *msg) +{ + struct mosquitto_msg_store *stored; + int rc = MOSQ_ERR_UNKNOWN; + char **split_topics = NULL; + char *local_topic = NULL; + + if(msg == NULL || msg->topic == NULL) return MOSQ_ERR_INVAL; + + HASH_FIND(hh, db.msg_store, &msg->store_id, sizeof(msg->store_id), stored); + if(stored){ + if(sub__topic_tokenise(msg->topic, &local_topic, &split_topics, NULL)) return MOSQ_ERR_NOMEM; + + rc = retain__store(msg->topic, stored, split_topics, false); + mosquitto__free(split_topics); + mosquitto__free(local_topic); + } + + return rc; +} -int retain__store(const char *topic, struct mosquitto_msg_store *stored, char **split_topics) + + +int mosquitto_persist_retain_remove(const char *topic) +{ + struct mosquitto_msg_store stored; + int rc = MOSQ_ERR_UNKNOWN; + char **split_topics = NULL; + char *local_topic = NULL; + + if(topic == NULL) return MOSQ_ERR_INVAL; + + memset(&stored, 0, sizeof(stored)); + stored.ref_count = 10; /* Ensure this isn't freed */ + + if(sub__topic_tokenise(topic, &local_topic, &split_topics, NULL)) return MOSQ_ERR_NOMEM; + + /* With stored->payloadlen == 0, this means the message will be removed */ + rc = retain__store(topic, &stored, split_topics, false); + mosquitto__free(split_topics); + mosquitto__free(local_topic); + + return rc; +} + + + +int retain__store(const char *topic, struct mosquitto_msg_store *stored, char **split_topics, bool persist) { struct mosquitto__retainhier *retainhier; struct mosquitto__retainhier *branch; @@ -111,19 +157,29 @@ int retain__store(const char *topic, struct mosquitto_msg_store *stored, char ** #endif if(retainhier->retained){ + if(persist){ + plugin_persist__handle_retain_remove(retainhier->retained); + } db__msg_store_ref_dec(&retainhier->retained); #ifdef WITH_SYS_TREE db.retained_count--; #endif + if(stored->payloadlen == 0){ + retainhier->retained = NULL; + } } if(stored->payloadlen){ retainhier->retained = stored; db__msg_store_ref_inc(retainhier->retained); + if(retainhier->retained->topic[0] != '$'){ + if(persist){ + plugin_persist__handle_msg_add(retainhier->retained); + plugin_persist__handle_retain_add(retainhier->retained); + } + } #ifdef WITH_SYS_TREE db.retained_count++; #endif - }else{ - retainhier->retained = NULL; } return MOSQ_ERR_SUCCESS; @@ -138,6 +194,7 @@ static int retain__process(struct mosquitto__retainhier *branch, struct mosquitt struct mosquitto_msg_store *retained; if(branch->retained->message_expiry_time > 0 && db.now_real_s >= branch->retained->message_expiry_time){ + plugin_persist__handle_retain_remove(branch->retained); db__msg_store_ref_dec(&branch->retained); branch->retained = NULL; #ifdef WITH_SYS_TREE @@ -188,7 +245,7 @@ static int retain__process(struct mosquitto__retainhier *branch, struct mosquitt }else{ mid = 0; } - return db__message_insert_outgoing(context, 0, mid, qos, true, retained, subscription_identifier, false); + return db__message_insert_outgoing(context, 0, mid, qos, true, retained, subscription_identifier, false, true); } diff --git a/src/session_expiry.c b/src/session_expiry.c index 0bf9c36e..737d0f3e 100644 --- a/src/session_expiry.c +++ b/src/session_expiry.c @@ -78,6 +78,8 @@ int session_expiry__add(struct mosquitto *context) DL_INSERT_INORDER(expiry_list, item, session_expiry__cmp); + plugin_persist__handle_client_update(context); + return MOSQ_ERR_SUCCESS; } @@ -146,6 +148,7 @@ void session_expiry__check(void) context->will_delay_interval = 0; will_delay__remove(context); context__send_will(context); + plugin_persist__handle_client_remove(context); context__add_to_disused(context); }else{ timeout = (item->context->session_expiry_time - db.now_real_s + 1) * 1000; diff --git a/src/subs.c b/src/subs.c index a865c819..90489dc3 100644 --- a/src/subs.c +++ b/src/subs.c @@ -93,7 +93,7 @@ static int subs__send(struct mosquitto__subleaf *leaf, const char *topic, uint8_ }else{ client_retain = false; } - if(db__message_insert_outgoing(leaf->context, 0, mid, msg_qos, client_retain, stored, leaf->identifier, true) == 1){ + if(db__message_insert_outgoing(leaf->context, 0, mid, msg_qos, client_retain, stored, leaf->identifier, true, true) == 1){ return 1; } }else{ @@ -661,7 +661,7 @@ int sub__messages_queue(const char *source_id, const char *topic, uint8_t qos, i } if(retain){ - rc2 = retain__store(topic, *stored, split_topics); + rc2 = retain__store(topic, *stored, split_topics, true); if(rc2) rc = rc2; } diff --git a/src/will_delay.c b/src/will_delay.c index 55ca0ede..6569ab20 100644 --- a/src/will_delay.c +++ b/src/will_delay.c @@ -54,6 +54,7 @@ int will_delay__add(struct mosquitto *context) DL_INSERT_INORDER(delay_list, item, will_delay__cmp); loop__update_next_event(item->context->will_delay_interval*1000); + plugin_persist__handle_client_update(context); return MOSQ_ERR_SUCCESS; } diff --git a/test/broker/15-persist-client-msg-in-v3-1-1.py b/test/broker/15-persist-client-msg-in-v3-1-1.py new file mode 100755 index 00000000..e223a9f8 --- /dev/null +++ b/test/broker/15-persist-client-msg-in-v3-1-1.py @@ -0,0 +1,78 @@ +#!/usr/bin/env python3 + +# Connect a client, start a QoS 2 flow, disconnect, restore, carry on with the +# QoS 2 flow. Is it received? + +from mosq_test_helper import * +import persist_help + +port = mosq_test.get_port() +conf_file = os.path.basename(__file__).replace('.py', '.conf') +persist_help.write_config(conf_file, port) + +rc = 1 + +persist_help.init(port) + +keepalive = 10 +client_id = "persist-client-msg-in-v3-1-1" +proto_ver = 4 + +helper_id = "persist-client-msg-in-v3-1-1-helper" +topic = "client-msg-in/2" +qos = 2 + +connect_packet = mosq_test.gen_connect(client_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=False) +connack_packet1 = mosq_test.gen_connack(rc=0, proto_ver=proto_ver) +connack_packet2 = mosq_test.gen_connack(rc=0, flags=1, proto_ver=proto_ver) +mid = 1 +publish_packet = mosq_test.gen_publish(topic=topic, qos=qos, payload="message", mid=mid, proto_ver=proto_ver) +pubrec_packet = mosq_test.gen_pubrec(mid=mid, proto_ver=proto_ver) +pubrel_packet = mosq_test.gen_pubrel(mid=mid, proto_ver=proto_ver) +pubcomp_packet = mosq_test.gen_pubcomp(mid=mid, proto_ver=proto_ver) + +connect_packet_helper = mosq_test.gen_connect(helper_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=True) +subscribe_packet = mosq_test.gen_subscribe(mid, topic, qos=qos, proto_ver=proto_ver) +suback_packet = mosq_test.gen_suback(mid=mid, qos=qos, proto_ver=proto_ver) + +broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port) + +con = None +try: + # Connect client, start flow, disconnect + sock = mosq_test.do_client_connect(connect_packet, connack_packet1, timeout=5, port=port) + mosq_test.do_send_receive(sock, publish_packet, pubrec_packet, "pubrec send") + sock.close() + + # Kill broker + broker.terminate() + broker.wait() + + # Restart broker + broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port) + + # Connect helper and subscribe + helper = mosq_test.do_client_connect(connect_packet_helper, connack_packet1, timeout=5, port=port) + mosq_test.do_send_receive(helper, subscribe_packet, suback_packet, "suback helper") + + # Complete the flow + sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port) + mosq_test.do_send_receive(sock, pubrel_packet, pubcomp_packet, "pubrel send") + + mosq_test.do_receive_send(helper, publish_packet, pubrec_packet, "pubrec receive") + mosq_test.do_receive_send(helper, pubrel_packet, pubcomp_packet, "pubcomp receive") + + rc = 0 +finally: + if broker is not None: + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + os.remove(conf_file) + rc += persist_help.cleanup(port) + + if rc: + print(stde.decode('utf-8')) + + +exit(rc) diff --git a/test/broker/15-persist-client-msg-out-v3-1-1.py b/test/broker/15-persist-client-msg-out-v3-1-1.py new file mode 100755 index 00000000..4b43b6ae --- /dev/null +++ b/test/broker/15-persist-client-msg-out-v3-1-1.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3 + +# Connect a client, add a subscription, disconnect, send a message with a +# different client, restore, reconnect, check it is received. + +from mosq_test_helper import * +import persist_help + +port = mosq_test.get_port() +conf_file = os.path.basename(__file__).replace('.py', '.conf') +persist_help.write_config(conf_file, port) + +rc = 1 + +persist_help.init(port) + +keepalive = 10 +client_id = "persist-client-msg-v3-1-1" +proto_ver = 4 + +helper_id = "persist-client-msg-v3-1-1-helper" +topic0 = "client-msg/0" +topic1 = "client-msg/1" +topic2 = "client-msg/2" + +connect_packet = mosq_test.gen_connect(client_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=False) +connack_packet1 = mosq_test.gen_connack(rc=0, proto_ver=proto_ver) +connack_packet2 = mosq_test.gen_connack(rc=0, flags=1, proto_ver=proto_ver) +mid = 1 +subscribe_packet0 = mosq_test.gen_subscribe(mid, topic0, qos=0, proto_ver=proto_ver) +suback_packet0 = mosq_test.gen_suback(mid=mid, qos=0, proto_ver=proto_ver) +subscribe_packet1 = mosq_test.gen_subscribe(mid, topic1, qos=1, proto_ver=proto_ver) +suback_packet1 = mosq_test.gen_suback(mid=mid, qos=1, proto_ver=proto_ver) +subscribe_packet2 = mosq_test.gen_subscribe(mid, topic2, qos=2, proto_ver=proto_ver) +suback_packet2 = mosq_test.gen_suback(mid=mid, qos=2, proto_ver=proto_ver) + +connect_packet_helper = mosq_test.gen_connect(helper_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=True) +publish_packet0 = mosq_test.gen_publish(topic=topic0, qos=0, payload="message", proto_ver=proto_ver) +mid = 1 +publish_packet1 = mosq_test.gen_publish(topic=topic1, qos=1, payload="message", mid=mid, proto_ver=proto_ver) +puback_packet = mosq_test.gen_puback(mid=mid, proto_ver=proto_ver) +mid = 2 +publish_packet2 = mosq_test.gen_publish(topic=topic2, qos=2, payload="message", mid=mid, proto_ver=proto_ver) +pubrec_packet = mosq_test.gen_pubrec(mid=mid, proto_ver=proto_ver) +pubrel_packet = mosq_test.gen_pubrel(mid=mid, proto_ver=proto_ver) +pubcomp_packet = mosq_test.gen_pubcomp(mid=mid, proto_ver=proto_ver) + + +broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port) + +con = None +try: + # Connect client, subscribe, disconnect + sock = mosq_test.do_client_connect(connect_packet, connack_packet1, timeout=5, port=port) + mosq_test.do_send_receive(sock, subscribe_packet0, suback_packet0, "suback 0") + mosq_test.do_send_receive(sock, subscribe_packet1, suback_packet1, "suback 1") + mosq_test.do_send_receive(sock, subscribe_packet2, suback_packet2, "suback 2") + sock.close() + + # Connect helper and publish + helper = mosq_test.do_client_connect(connect_packet_helper, connack_packet1, timeout=5, port=port) + helper.send(publish_packet0) + mosq_test.do_send_receive(helper, publish_packet1, puback_packet, "puback helper") + mosq_test.do_send_receive(helper, publish_packet2, pubrec_packet, "pubrec helper") + mosq_test.do_send_receive(helper, pubrel_packet, pubcomp_packet, "pubcomp helper") + helper.close() + + # Kill broker + broker.terminate() + broker.wait() + + # Restart broker + broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port) + + # Connect client again, it should have a session + sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port) + + # Does the client get the messages + mosq_test.do_receive_send(sock, publish_packet1, puback_packet, "publish 1") + mosq_test.do_receive_send(sock, publish_packet2, pubrec_packet, "publish 2") + mosq_test.do_receive_send(sock, pubrel_packet, pubcomp_packet, "pubrel 2") + sock.close() + + # Connect client again, it should have a session + sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port) + # If there are messages, the ping will fail + mosq_test.do_ping(sock) + + rc = 0 +finally: + if broker is not None: + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + os.remove(conf_file) + rc += persist_help.cleanup(port) + + if rc: + print(stde.decode('utf-8')) + + +exit(rc) diff --git a/test/broker/15-persist-client-v3-1-1.py b/test/broker/15-persist-client-v3-1-1.py new file mode 100755 index 00000000..5a59ffa2 --- /dev/null +++ b/test/broker/15-persist-client-v3-1-1.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python3 + +# Connect a client, check it is restored, clear the client, check it is not there. + +from mosq_test_helper import * +import persist_help + +port = mosq_test.get_port() +conf_file = os.path.basename(__file__).replace('.py', '.conf') +persist_help.write_config(conf_file, port) + +rc = 1 + +persist_help.init(port) + +keepalive = 10 +client_id = "persist-client-v3-1-1" +proto_ver = 4 + +connect_packet = mosq_test.gen_connect(client_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=False) +connack_packet1 = mosq_test.gen_connack(rc=0, proto_ver=proto_ver) +connack_packet2 = mosq_test.gen_connack(rc=0, flags=1, proto_ver=proto_ver) + +connect_packet_clean = mosq_test.gen_connect(client_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=True) + +broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port) + +con = None +try: + # Connect client + sock = mosq_test.do_client_connect(connect_packet, connack_packet1, timeout=5, port=port, connack_error="connack 1") + mosq_test.do_ping(sock) + sock.close() + + # Kill broker + broker.terminate() + broker.wait() + + # Restart broker + broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port) + + # Connect client again, it should have a session + sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port, connack_error="connack 2") + mosq_test.do_ping(sock) + sock.close() + + # Clear the client + sock = mosq_test.do_client_connect(connect_packet_clean, connack_packet1, timeout=5, port=port, connack_error="connack 3") + mosq_test.do_ping(sock) + sock.close() + + # Connect client, it should not have a session + sock = mosq_test.do_client_connect(connect_packet_clean, connack_packet1, timeout=5, port=port, connack_error="connack 4") + mosq_test.do_ping(sock) + sock.close() + + # Kill broker + broker.terminate() + broker.wait() + + # Restart broker + broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port) + + # Connect client, it should not have a session + sock = mosq_test.do_client_connect(connect_packet_clean, connack_packet1, timeout=5, port=port, connack_error="connack 5") + mosq_test.do_ping(sock) + sock.close() + + rc = 0 +finally: + if broker is not None: + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + os.remove(conf_file) + rc += persist_help.cleanup(port) + + if rc: + print(stde.decode('utf-8')) + + +exit(rc) diff --git a/test/broker/15-persist-retain-v3-1-1.py b/test/broker/15-persist-retain-v3-1-1.py new file mode 100755 index 00000000..d679d7dc --- /dev/null +++ b/test/broker/15-persist-retain-v3-1-1.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python3 + +# Publish a retained messages, check they are restored + +from mosq_test_helper import * +import persist_help + +port = mosq_test.get_port() +conf_file = os.path.basename(__file__).replace('.py', '.conf') +persist_help.write_config(conf_file, port) + +rc = 1 + +persist_help.init(port) + +keepalive = 10 +topic1 = "test/retain1" +topic2 = "test/retain2" +topic3 = "test/retain3" +source_id = "persist-retain-v3-1-1" +qos = 0 +payload2 = "retained message 2" +payload3 = "retained message 3" +proto_ver = 4 +connect_packet = mosq_test.gen_connect(source_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=True) +connack_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver) + +publish1_packet = mosq_test.gen_publish(topic1, qos=qos, payload="retained message 1", retain=True, proto_ver=proto_ver) +publish2_packet = mosq_test.gen_publish(topic2, qos=qos, payload=payload2, retain=False, proto_ver=proto_ver) +publish3_packet = mosq_test.gen_publish(topic3, qos=qos, payload=payload3, retain=True, proto_ver=proto_ver) + +mid = 1 +subscribe_packet = mosq_test.gen_subscribe(mid, "#", 0, proto_ver=4) +suback_packet = mosq_test.gen_suback(mid, qos=0, proto_ver=4) + +mid = 2 +unsubscribe_packet = mosq_test.gen_unsubscribe(mid, "#", proto_ver=4) +unsuback_packet = mosq_test.gen_unsuback(mid, proto_ver=4) + +broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port) + +try: + # Connect client + sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=5, port=port) + # Check no retained messages exist + mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback") + # Ping will fail if a PUBLISH is received + mosq_test.do_ping(sock) + # Unsubscribe, so we don't receive the messages + mosq_test.do_send_receive(sock, unsubscribe_packet, unsuback_packet, "unsuback") + + # Send some retained messages + sock.send(publish1_packet) + mosq_test.do_ping(sock) + sock.send(publish2_packet) # Not retained + mosq_test.do_ping(sock) + sock.send(publish3_packet) + mosq_test.do_ping(sock) + sock.close() + + # Kill broker + broker.terminate() + broker.wait() + + # Restart broker + broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port) + + # Connect client + sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=5, port=port) + # Subscribe + mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback") + # Check retained messages exist + mosq_test.receive_unordered(sock, publish1_packet, publish3_packet, "publish 1 / 3") + mosq_test.do_ping(sock) + + rc = 0 +finally: + if broker is not None: + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + os.remove(conf_file) + rc += persist_help.cleanup(port) + + if rc: + print(stde.decode('utf-8')) + + +exit(rc) diff --git a/test/broker/15-persist-subscription-v3-1-1.py b/test/broker/15-persist-subscription-v3-1-1.py new file mode 100755 index 00000000..b5cb5d58 --- /dev/null +++ b/test/broker/15-persist-subscription-v3-1-1.py @@ -0,0 +1,129 @@ +#!/usr/bin/env python3 + +# Connect a client, add a subscription, disconnect, restore, reconnect, send a +# message with a different client, check it is received. + +from mosq_test_helper import * +import persist_help + +port = mosq_test.get_port() +conf_file = os.path.basename(__file__).replace('.py', '.conf') +persist_help.write_config(conf_file, port) + +rc = 1 + +persist_help.init(port) + +keepalive = 10 +client_id = "persist-subscription-v3-1-1" +proto_ver = 4 + +helper_id = "persist-subscription-v3-1-1-helper" +topic0 = "subscription/0" +topic1 = "subscription/1" +topic2 = "subscription/2" + +connect_packet = mosq_test.gen_connect(client_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=False) +connack_packet1 = mosq_test.gen_connack(rc=0, proto_ver=proto_ver) +connack_packet2 = mosq_test.gen_connack(rc=0, flags=1, proto_ver=proto_ver) +mid = 1 +subscribe_packet0 = mosq_test.gen_subscribe(mid, topic0, qos=0, proto_ver=proto_ver) +suback_packet0 = mosq_test.gen_suback(mid=mid, qos=0, proto_ver=proto_ver) +subscribe_packet1 = mosq_test.gen_subscribe(mid, topic1, qos=1, proto_ver=proto_ver) +suback_packet1 = mosq_test.gen_suback(mid=mid, qos=1, proto_ver=proto_ver) +subscribe_packet2 = mosq_test.gen_subscribe(mid, topic2, qos=2, proto_ver=proto_ver) +suback_packet2 = mosq_test.gen_suback(mid=mid, qos=2, proto_ver=proto_ver) + +unsubscribe_packet2 = mosq_test.gen_unsubscribe(mid, topic2, proto_ver=proto_ver) +unsuback_packet2 = mosq_test.gen_unsuback(mid=mid, proto_ver=proto_ver) + +connect_packet_helper = mosq_test.gen_connect(helper_id, keepalive=keepalive, proto_ver=proto_ver, clean_session=True) +publish_packet0 = mosq_test.gen_publish(topic=topic0, qos=0, payload="message", proto_ver=proto_ver) +mid = 1 +publish_packet1 = mosq_test.gen_publish(topic=topic1, qos=1, payload="message", mid=mid, proto_ver=proto_ver) +puback_packet = mosq_test.gen_puback(mid=mid, proto_ver=proto_ver) +mid = 2 +publish_packet2 = mosq_test.gen_publish(topic=topic2, qos=2, payload="message", mid=mid, proto_ver=proto_ver) +pubrec_packet = mosq_test.gen_pubrec(mid=mid, proto_ver=proto_ver) +pubrel_packet = mosq_test.gen_pubrel(mid=mid, proto_ver=proto_ver) +pubcomp_packet = mosq_test.gen_pubcomp(mid=mid, proto_ver=proto_ver) + + +broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port) + +con = None +try: + # Connect client + sock = mosq_test.do_client_connect(connect_packet, connack_packet1, timeout=5, port=port) + mosq_test.do_send_receive(sock, subscribe_packet0, suback_packet0, "suback 0") + mosq_test.do_send_receive(sock, subscribe_packet1, suback_packet1, "suback 1") + mosq_test.do_send_receive(sock, subscribe_packet2, suback_packet2, "suback 2") + sock.close() + + # Kill broker + broker.terminate() + broker.wait() + + # Restart broker + broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port) + + # Connect client again, it should have a session + sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port) + mosq_test.do_ping(sock) + + # Connect helper and publish + helper = mosq_test.do_client_connect(connect_packet_helper, connack_packet1, timeout=5, port=port) + helper.send(publish_packet0) + mosq_test.do_send_receive(helper, publish_packet1, puback_packet, "puback helper") + mosq_test.do_send_receive(helper, publish_packet2, pubrec_packet, "pubrec helper") + mosq_test.do_send_receive(helper, pubrel_packet, pubcomp_packet, "pubcomp helper") + helper.close() + + # Does the client get the messages + mosq_test.expect_packet(sock, "publish 0", publish_packet0) + mosq_test.do_receive_send(sock, publish_packet1, puback_packet, "publish 1") + mosq_test.do_receive_send(sock, publish_packet2, pubrec_packet, "publish 2") + mosq_test.do_receive_send(sock, pubrel_packet, pubcomp_packet, "pubrel 2") + + # Unsubscribe + mosq_test.do_send_receive(sock, unsubscribe_packet2, unsuback_packet2, "unsuback 2") + sock.close() + + # Kill broker + broker.terminate() + broker.wait() + + # Restart broker + broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port) + + # Connect client again, it should have a session + sock = mosq_test.do_client_connect(connect_packet, connack_packet2, timeout=5, port=port) + mosq_test.do_ping(sock) + + # Connect helper and publish + helper = mosq_test.do_client_connect(connect_packet_helper, connack_packet1, timeout=5, port=port) + helper.send(publish_packet0) + mosq_test.do_send_receive(helper, publish_packet1, puback_packet, "puback helper") + mosq_test.do_send_receive(helper, publish_packet2, pubrec_packet, "pubrec helper") + mosq_test.do_send_receive(helper, pubrel_packet, pubcomp_packet, "pubcomp helper") + helper.close() + + # Does the client get the messages + mosq_test.expect_packet(sock, "publish 0", publish_packet0) + mosq_test.do_receive_send(sock, publish_packet1, puback_packet, "publish 1") + mosq_test.do_ping(sock) + + rc = 0 +finally: + if broker is not None: + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + os.remove(conf_file) + rc += persist_help.cleanup(port) + + if rc: + print(stde.decode('utf-8')) + + +exit(rc) diff --git a/test/broker/Makefile b/test/broker/Makefile index 9d4dcee3..79cd0b3c 100644 --- a/test/broker/Makefile +++ b/test/broker/Makefile @@ -250,3 +250,10 @@ ifeq ($(WITH_CJSON),yes) ./14-dynsec-role-invalid.py endif endif + +15 : + ./15-persist-client-v3-1-1.py + ./15-persist-subscription-v3-1-1.py + ./15-persist-retain-v3-1-1.py + ./15-persist-client-msg-in-v3-1-1.py + ./15-persist-client-msg-out-v3-1-1.py \ No newline at end of file diff --git a/test/broker/persist_help.py b/test/broker/persist_help.py new file mode 100644 index 00000000..74db90ad --- /dev/null +++ b/test/broker/persist_help.py @@ -0,0 +1,13 @@ +# This can be rewritten to easily support persistence plugins with all of +# the 15-persist-* tests. + +def init(port): + pass + +def cleanup(port): + return 0 + +def write_config(filename, port): + with open(filename, 'w') as f: + #f.write("plugin ..\n") + pass diff --git a/test/broker/test.py b/test/broker/test.py index bad82981..952ebf76 100755 --- a/test/broker/test.py +++ b/test/broker/test.py @@ -211,6 +211,12 @@ tests = [ (1, './14-dynsec-plugin-invalid.py'), (1, './14-dynsec-role.py'), (1, './14-dynsec-role-invalid.py'), + + #(1, './15-persist-client-msg-in-v3-1-1.py'), + #(1, './15-persist-client-msg-out-v3-1-1.py'), + #(1, './15-persist-client-v3-1-1.py'), + #(1, './15-persist-retain-v3-1-1.py'), + #(1, './15-persist-subscription-v3-1-1.py'), ] ptest.run_tests(tests) diff --git a/test/unit/persist_read_stubs.c b/test/unit/persist_read_stubs.c index 1e683264..455f274f 100644 --- a/test/unit/persist_read_stubs.c +++ b/test/unit/persist_read_stubs.c @@ -165,16 +165,17 @@ int sub__add(struct mosquitto *context, const char *sub, uint8_t qos, uint32_t i return MOSQ_ERR_SUCCESS; } -int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, struct mosquitto_msg_store *msg) +int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, struct mosquitto_msg_store *msg, bool persist) { UNUSED(context); UNUSED(cmsg_id); UNUSED(msg); + UNUSED(persist); return MOSQ_ERR_SUCCESS; } -int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uint16_t mid, uint8_t qos, bool retain, struct mosquitto_msg_store *stored, uint32_t subscription_identifier, bool update) +int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uint16_t mid, uint8_t qos, bool retain, struct mosquitto_msg_store *stored, uint32_t subscription_identifier, bool update, bool persist) { UNUSED(context); UNUSED(cmsg_id); @@ -184,6 +185,7 @@ int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uin UNUSED(stored); UNUSED(subscription_identifier); UNUSED(update); + UNUSED(persist); return MOSQ_ERR_SUCCESS; } @@ -224,3 +226,16 @@ void context__add_to_by_id(struct mosquitto *context) HASH_ADD_KEYPTR(hh_id, db.contexts_by_id, context->id, strlen(context->id), context); } } + +void plugin_persist__handle_retain_add(struct mosquitto_msg_store *msg) +{ + UNUSED(msg); +} +void plugin_persist__handle_retain_remove(struct mosquitto_msg_store *msg) +{ + UNUSED(msg); +} +void plugin_persist__handle_msg_add(struct mosquitto_msg_store *msg) +{ + UNUSED(msg); +} diff --git a/test/unit/persist_write_stubs.c b/test/unit/persist_write_stubs.c index 74ddbfd8..cfc084de 100644 --- a/test/unit/persist_write_stubs.c +++ b/test/unit/persist_write_stubs.c @@ -170,3 +170,39 @@ void context__add_to_by_id(struct mosquitto *context) HASH_ADD_KEYPTR(hh_id, db.contexts_by_id, context->id, strlen(context->id), context); } } +void plugin_persist__handle_client_msg_add(struct mosquitto *context, const struct mosquitto_client_msg *cmsg) +{ + UNUSED(context); + UNUSED(cmsg); +} +void plugin_persist__handle_client_msg_remove(struct mosquitto *context, const struct mosquitto_client_msg *cmsg) +{ + UNUSED(context); + UNUSED(cmsg); +} +void plugin_persist__handle_client_msg_update(struct mosquitto *context, const struct mosquitto_client_msg *cmsg) +{ + UNUSED(context); + UNUSED(cmsg); +} +void plugin_persist__handle_client_msg_clear(struct mosquitto *context, uint8_t direction) +{ + UNUSED(context); + UNUSED(direction); +} +void plugin_persist__handle_msg_add(struct mosquitto_msg_store *msg) +{ + UNUSED(msg); +} +void plugin_persist__handle_msg_remove(struct mosquitto_msg_store *msg) +{ + UNUSED(msg); +} +void plugin_persist__handle_retain_add(struct mosquitto_msg_store *msg) +{ + UNUSED(msg); +} +void plugin_persist__handle_retain_remove(struct mosquitto_msg_store *msg) +{ + UNUSED(msg); +} diff --git a/test/unit/stubs.c b/test/unit/stubs.c index a9f60b95..3dcd5992 100644 --- a/test/unit/stubs.c +++ b/test/unit/stubs.c @@ -13,6 +13,10 @@ struct mosquitto_db{ }; +struct mosquitto_msg_store{ + +}; + int log__printf(struct mosquitto *mosq, unsigned int priority, const char *fmt, ...) { UNUSED(mosq); @@ -90,3 +94,13 @@ ssize_t net__write(struct mosquitto *mosq, const void *buf, size_t count) UNUSED(count); return 1; } + +void plugin_persist__handle_retain_add(struct mosquitto_msg_store *msg) +{ + UNUSED(msg); +} + +void plugin_persist__handle_retain_remove(struct mosquitto_msg_store *msg) +{ + UNUSED(msg); +} diff --git a/test/unit/subs_stubs.c b/test/unit/subs_stubs.c index 2012eb85..527c50ab 100644 --- a/test/unit/subs_stubs.c +++ b/test/unit/subs_stubs.c @@ -146,11 +146,12 @@ int retain__queue(struct mosquitto *context, const char *sub, uint8_t sub_qos, u return MOSQ_ERR_SUCCESS; } -int retain__store(const char *topic, struct mosquitto_msg_store *stored, char **split_topics) +int retain__store(const char *topic, struct mosquitto_msg_store *stored, char **split_topics, bool persist) { UNUSED(topic); UNUSED(stored); UNUSED(split_topics); + UNUSED(persist); return MOSQ_ERR_SUCCESS; } @@ -188,3 +189,40 @@ int util__random_bytes(void *bytes, int count) return MOSQ_ERR_SUCCESS; } + +void plugin_persist__handle_client_msg_add(struct mosquitto *context, const struct mosquitto_client_msg *cmsg) +{ + UNUSED(context); + UNUSED(cmsg); +} +void plugin_persist__handle_client_msg_remove(struct mosquitto *context, const struct mosquitto_client_msg *cmsg) +{ + UNUSED(context); + UNUSED(cmsg); +} +void plugin_persist__handle_client_msg_update(struct mosquitto *context, const struct mosquitto_client_msg *cmsg) +{ + UNUSED(context); + UNUSED(cmsg); +} +void plugin_persist__handle_client_msg_clear(struct mosquitto *context, uint8_t direction) +{ + UNUSED(context); + UNUSED(direction); +} +void plugin_persist__handle_msg_add(struct mosquitto_msg_store *msg) +{ + UNUSED(msg); +} +void plugin_persist__handle_msg_remove(struct mosquitto_msg_store *msg) +{ + UNUSED(msg); +} +void plugin_persist__handle_retain_add(struct mosquitto_msg_store *msg) +{ + UNUSED(msg); +} +void plugin_persist__handle_retain_remove(struct mosquitto_msg_store *msg) +{ + UNUSED(msg); +}