diff --git a/include/mosquitto_broker.h b/include/mosquitto_broker.h index 6b949229..bb52907a 100644 --- a/include/mosquitto_broker.h +++ b/include/mosquitto_broker.h @@ -634,6 +634,8 @@ mosq_EXPORT int mosquitto_broker_publish_copy( */ mosq_EXPORT void mosquitto_complete_basic_auth(const char* client_id, int result); +int mosquitto_broker_node_id_set(uint16_t id); + #ifdef __cplusplus } #endif diff --git a/src/database.c b/src/database.c index 2afcbb4c..d5b0dd1e 100644 --- a/src/database.c +++ b/src/database.c @@ -186,8 +186,6 @@ int db__open(struct mosquitto__config *config) if(!config) return MOSQ_ERR_INVAL; - util__random_bytes(&db.last_db_id, sizeof(db.last_db_id)); - db.contexts_by_id = NULL; db.contexts_by_sock = NULL; db.contexts_for_free = NULL; @@ -763,6 +761,54 @@ int db__messages_easy_queue(struct mosquitto *context, const char *topic, uint8_ return sub__messages_queue(source_id, stored->topic, stored->qos, stored->retain, &stored); } + +#define MOSQ_UUID_EPOCH 1637168273 + +/* db__new_msg_id() attempts to generate a new unique id on the broker, or a + * number of brokers. It uses the 10-bit node ID, which can be set by plugins + * to allow different brokers to share the same plugin persistence database + * without overlapping one another. + * + * The message ID is a 64-bit unsigned integer arranged as follows: + * + * 10-bit ID 31-bit seconds 23-bit fractional seconds + * iiiiiiiiiisssssssssssssssssssssssssssssssnnnnnnnnnnnnnnnnnnnnnnn + * + * 10-bit ID gives a total of 1024 brokers can produce unique values (complete overkill) + * 31-bit seconds gives a roll over date of 68 years after MOSQ_UUID_EPOCH - 2089. + * This roll over date would affect messages that have been queued waiting + * for a client to receive them, or retained messages only. If either of + * those remains for 68 years unchanged, then there will potentially be a + * collision. Ideally we need to ensure, however, that the message id is + * continually increasing for sorting purposes. + * 23-bit fractional seconds gives a resolution of 120ns, or 8.4 million + * messages per second per broker. + */ +uint64_t db__new_msg_id(void) +{ + struct timespec ts; + uint64_t id; + uint64_t tmp; + + id = db.node_id_shifted; /* Top 10-bits */ + + clock_gettime(CLOCK_REALTIME, &ts); + + tmp = (ts.tv_sec - MOSQ_UUID_EPOCH) & 0x7FFFFFFF; + id = id | (tmp << 23); /* Seconds, 31-bits (68 years) */ + + tmp = (ts.tv_nsec & 0x7FFFFF80); /* top 23-bits of the bottom 30 bits (1 billion ns), ~100 ns resolution */ + id = id | (tmp >> 7); + + while(id <= db.last_db_id){ + id++; + } + db.last_db_id = id; + + return id; +} + + /* This function requires topic to be allocated on the heap. Once called, it owns topic and will free it on error. Likewise payload and properties. */ int db__message_store(const struct mosquitto *source, struct mosquitto_msg_store *stored, uint32_t message_expiry_interval, dbid_t store_id, enum mosquitto_msg_origin origin) { @@ -802,7 +848,7 @@ int db__message_store(const struct mosquitto *source, struct mosquitto_msg_store db.msg_store_bytes += stored->payloadlen; if(!store_id){ - stored->db_id = ++db.last_db_id; + stored->db_id = db__new_msg_id(); }else{ stored->db_id = store_id; } diff --git a/src/linker-macosx.syms b/src/linker-macosx.syms index d4e9f356..8113c373 100644 --- a/src/linker-macosx.syms +++ b/src/linker-macosx.syms @@ -1,3 +1,4 @@ +_mosquitto_broker_node_id_set _mosquitto_broker_publish _mosquitto_broker_publish_copy _mosquitto_callback_register diff --git a/src/linker.syms b/src/linker.syms index b14e1fac..d5d91866 100644 --- a/src/linker.syms +++ b/src/linker.syms @@ -1,4 +1,5 @@ { + mosquitto_broker_node_id_set; mosquitto_broker_publish; mosquitto_broker_publish_copy; mosquitto_callback_register; diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index 31c33456..50b987b3 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -464,6 +464,7 @@ struct mosquitto_message_v5{ struct mosquitto_db{ dbid_t last_db_id; + uint64_t node_id_shifted; struct mosquitto__subhier *subs; struct mosquitto__retainhier *retains; struct mosquitto *contexts_by_id; @@ -478,6 +479,7 @@ struct mosquitto_db{ struct mosquitto_msg_store *msg_store; time_t now_s; /* Monotonic clock, where possible */ time_t now_real_s; /* Read clock, for measuring session/message expiry */ + uint64_t node_id; /* for unique db ids */ int next_event_ms; /* for mux timeout */ int msg_store_count; unsigned long msg_store_bytes; @@ -727,6 +729,7 @@ int db__message_write_queued_out(struct mosquitto *context); int db__message_write_queued_in(struct mosquitto *context); void db__msg_add_to_inflight_stats(struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *msg); void db__msg_add_to_queued_stats(struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *msg); +uint64_t db__new_msg_id(void); /* ============================================================ * Subscription functions diff --git a/src/plugin_public.c b/src/plugin_public.c index 13fce019..bbbaeef6 100644 --- a/src/plugin_public.c +++ b/src/plugin_public.c @@ -409,3 +409,14 @@ void mosquitto_complete_basic_auth(const char *client_id, int result) } } } + +int mosquitto_broker_node_id_set(uint16_t id) +{ + if(id > 1023){ + return MOSQ_ERR_INVAL; + }else{ + db.node_id = id; + db.node_id_shifted = ((uint64_t)id) << 54; + return MOSQ_ERR_SUCCESS; + } +}