Broker id setting

This allows different brokers to share e.g. the same sql database and still have unique stored message ids.
pull/2438/head
Roger A. Light 4 years ago
parent 068c432b6c
commit 6fbdd71306

@ -634,6 +634,8 @@ mosq_EXPORT int mosquitto_broker_publish_copy(
*/ */
mosq_EXPORT void mosquitto_complete_basic_auth(const char* client_id, int result); mosq_EXPORT void mosquitto_complete_basic_auth(const char* client_id, int result);
int mosquitto_broker_node_id_set(uint16_t id);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

@ -186,8 +186,6 @@ int db__open(struct mosquitto__config *config)
if(!config) return MOSQ_ERR_INVAL; if(!config) return MOSQ_ERR_INVAL;
util__random_bytes(&db.last_db_id, sizeof(db.last_db_id));
db.contexts_by_id = NULL; db.contexts_by_id = NULL;
db.contexts_by_sock = NULL; db.contexts_by_sock = NULL;
db.contexts_for_free = NULL; db.contexts_for_free = NULL;
@ -763,6 +761,54 @@ int db__messages_easy_queue(struct mosquitto *context, const char *topic, uint8_
return sub__messages_queue(source_id, stored->topic, stored->qos, stored->retain, &stored); return sub__messages_queue(source_id, stored->topic, stored->qos, stored->retain, &stored);
} }
#define MOSQ_UUID_EPOCH 1637168273
/* db__new_msg_id() attempts to generate a new unique id on the broker, or a
* number of brokers. It uses the 10-bit node ID, which can be set by plugins
* to allow different brokers to share the same plugin persistence database
* without overlapping one another.
*
* The message ID is a 64-bit unsigned integer arranged as follows:
*
* 10-bit ID 31-bit seconds 23-bit fractional seconds
* iiiiiiiiiisssssssssssssssssssssssssssssssnnnnnnnnnnnnnnnnnnnnnnn
*
* 10-bit ID gives a total of 1024 brokers can produce unique values (complete overkill)
* 31-bit seconds gives a roll over date of 68 years after MOSQ_UUID_EPOCH - 2089.
* This roll over date would affect messages that have been queued waiting
* for a client to receive them, or retained messages only. If either of
* those remains for 68 years unchanged, then there will potentially be a
* collision. Ideally we need to ensure, however, that the message id is
* continually increasing for sorting purposes.
* 23-bit fractional seconds gives a resolution of 120ns, or 8.4 million
* messages per second per broker.
*/
uint64_t db__new_msg_id(void)
{
struct timespec ts;
uint64_t id;
uint64_t tmp;
id = db.node_id_shifted; /* Top 10-bits */
clock_gettime(CLOCK_REALTIME, &ts);
tmp = (ts.tv_sec - MOSQ_UUID_EPOCH) & 0x7FFFFFFF;
id = id | (tmp << 23); /* Seconds, 31-bits (68 years) */
tmp = (ts.tv_nsec & 0x7FFFFF80); /* top 23-bits of the bottom 30 bits (1 billion ns), ~100 ns resolution */
id = id | (tmp >> 7);
while(id <= db.last_db_id){
id++;
}
db.last_db_id = id;
return id;
}
/* This function requires topic to be allocated on the heap. Once called, it owns topic and will free it on error. Likewise payload and properties. */ /* This function requires topic to be allocated on the heap. Once called, it owns topic and will free it on error. Likewise payload and properties. */
int db__message_store(const struct mosquitto *source, struct mosquitto_msg_store *stored, uint32_t message_expiry_interval, dbid_t store_id, enum mosquitto_msg_origin origin) int db__message_store(const struct mosquitto *source, struct mosquitto_msg_store *stored, uint32_t message_expiry_interval, dbid_t store_id, enum mosquitto_msg_origin origin)
{ {
@ -802,7 +848,7 @@ int db__message_store(const struct mosquitto *source, struct mosquitto_msg_store
db.msg_store_bytes += stored->payloadlen; db.msg_store_bytes += stored->payloadlen;
if(!store_id){ if(!store_id){
stored->db_id = ++db.last_db_id; stored->db_id = db__new_msg_id();
}else{ }else{
stored->db_id = store_id; stored->db_id = store_id;
} }

@ -1,3 +1,4 @@
_mosquitto_broker_node_id_set
_mosquitto_broker_publish _mosquitto_broker_publish
_mosquitto_broker_publish_copy _mosquitto_broker_publish_copy
_mosquitto_callback_register _mosquitto_callback_register

@ -1,4 +1,5 @@
{ {
mosquitto_broker_node_id_set;
mosquitto_broker_publish; mosquitto_broker_publish;
mosquitto_broker_publish_copy; mosquitto_broker_publish_copy;
mosquitto_callback_register; mosquitto_callback_register;

@ -464,6 +464,7 @@ struct mosquitto_message_v5{
struct mosquitto_db{ struct mosquitto_db{
dbid_t last_db_id; dbid_t last_db_id;
uint64_t node_id_shifted;
struct mosquitto__subhier *subs; struct mosquitto__subhier *subs;
struct mosquitto__retainhier *retains; struct mosquitto__retainhier *retains;
struct mosquitto *contexts_by_id; struct mosquitto *contexts_by_id;
@ -478,6 +479,7 @@ struct mosquitto_db{
struct mosquitto_msg_store *msg_store; struct mosquitto_msg_store *msg_store;
time_t now_s; /* Monotonic clock, where possible */ time_t now_s; /* Monotonic clock, where possible */
time_t now_real_s; /* Read clock, for measuring session/message expiry */ time_t now_real_s; /* Read clock, for measuring session/message expiry */
uint64_t node_id; /* for unique db ids */
int next_event_ms; /* for mux timeout */ int next_event_ms; /* for mux timeout */
int msg_store_count; int msg_store_count;
unsigned long msg_store_bytes; unsigned long msg_store_bytes;
@ -727,6 +729,7 @@ int db__message_write_queued_out(struct mosquitto *context);
int db__message_write_queued_in(struct mosquitto *context); int db__message_write_queued_in(struct mosquitto *context);
void db__msg_add_to_inflight_stats(struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *msg); void db__msg_add_to_inflight_stats(struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *msg);
void db__msg_add_to_queued_stats(struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *msg); void db__msg_add_to_queued_stats(struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *msg);
uint64_t db__new_msg_id(void);
/* ============================================================ /* ============================================================
* Subscription functions * Subscription functions

@ -409,3 +409,14 @@ void mosquitto_complete_basic_auth(const char *client_id, int result)
} }
} }
} }
int mosquitto_broker_node_id_set(uint16_t id)
{
if(id > 1023){
return MOSQ_ERR_INVAL;
}else{
db.node_id = id;
db.node_id_shifted = ((uint64_t)id) << 54;
return MOSQ_ERR_SUCCESS;
}
}

Loading…
Cancel
Save