From a791532c12b576942edc43beefdd24c9b7c9409b Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Fri, 10 Mar 2023 23:33:20 +0000 Subject: [PATCH] Simplify $SYS metric generation. --- lib/packet_mosq.c | 40 +-- lib/send_disconnect.c | 1 + lib/send_mosq.c | 2 +- lib/send_publish.c | 10 +- lib/send_subscribe.c | 1 - man/mosquitto.8.xml | 198 +++++++++++++-- src/bridge.c | 4 +- src/context.c | 4 +- src/database.c | 6 +- src/handle_connect.c | 2 - src/handle_publish.c | 2 +- src/loop.c | 2 +- src/mosquitto_broker_internal.h | 2 +- src/net.c | 2 +- src/read_handle.c | 2 + src/send_auth.c | 1 + src/send_connack.c | 1 + src/send_suback.c | 1 + src/send_unsuback.c | 1 + src/session_expiry.c | 2 +- src/sys_tree.c | 436 +++++++++++--------------------- src/sys_tree.h | 115 +++++---- src/websockets.c | 13 +- test/unit/persist_write_stubs.c | 25 +- test/unit/subs_stubs.c | 25 +- 25 files changed, 447 insertions(+), 451 deletions(-) diff --git a/lib/packet_mosq.c b/lib/packet_mosq.c index fa044941..720e1ee3 100644 --- a/lib/packet_mosq.c +++ b/lib/packet_mosq.c @@ -42,14 +42,8 @@ Contributors: # include "sys_tree.h" # include "send_mosq.h" #else -# define G_BYTES_RECEIVED_INC(A) -# define G_BYTES_SENT_INC(A) -# define G_MSGS_SENT_INC(A) -# define G_PUB_MSGS_SENT_INC(A) -# define G_OUT_PACKET_COUNT_INC(A) -# define G_OUT_PACKET_COUNT_DEC(A) -# define G_OUT_PACKET_BYTES_INC(A) -# define G_OUT_PACKET_BYTES_DEC(A) +# define metrics__int_inc(stat, val) +# define metrics__int_dec(stat, val) #endif int packet__alloc(struct mosquitto__packet **packet, uint8_t command, uint32_t remaining_length) @@ -124,8 +118,8 @@ void packet__cleanup_all_no_locks(struct mosquitto *mosq) mosquitto__FREE(packet); } - G_OUT_PACKET_COUNT_DEC(mosq->out_packet_count); - G_OUT_PACKET_BYTES_DEC(mosq->out_packet_bytes); + metrics__int_dec(mosq_gauge_out_packets, mosq->out_packet_count); + metrics__int_dec(mosq_gauge_out_packet_bytes, mosq->out_packet_bytes); mosq->out_packet_count = 0; mosq->out_packet_bytes = 0; mosq->out_packet_last = NULL; @@ -152,8 +146,8 @@ static void packet__queue_append(struct mosquitto *mosq, struct mosquitto__packe mosq->out_packet_last = packet; mosq->out_packet_count++; mosq->out_packet_bytes += packet->packet_length; - G_OUT_PACKET_COUNT_INC(1); - G_OUT_PACKET_BYTES_INC(packet->packet_length); + metrics__int_inc(mosq_gauge_out_packets, 1); + metrics__int_inc(mosq_gauge_out_packet_bytes, packet->packet_length); pthread_mutex_unlock(&mosq->out_packet_mutex); } @@ -236,8 +230,8 @@ struct mosquitto__packet *packet__get_next_out(struct mosquitto *mosq) if(mosq->out_packet){ mosq->out_packet_count--; mosq->out_packet_bytes -= mosq->out_packet->packet_length; - G_OUT_PACKET_COUNT_DEC(1); - G_OUT_PACKET_BYTES_DEC(mosq->out_packet->packet_length); + metrics__int_dec(mosq_gauge_out_packets, 1); + metrics__int_dec(mosq_gauge_out_packet_bytes, mosq->out_packet->packet_length); mosq->out_packet = mosq->out_packet->next; if(!mosq->out_packet){ @@ -283,7 +277,7 @@ int packet__write(struct mosquitto *mosq) while(packet->to_process > 0){ write_length = net__write(mosq, &(packet->payload[packet->pos]), packet->to_process); if(write_length > 0){ - G_BYTES_SENT_INC(write_length); + metrics__int_inc(mosq_counter_bytes_sent, write_length); packet->to_process -= (uint32_t)write_length; packet->pos += (uint32_t)write_length; }else{ @@ -309,17 +303,14 @@ int packet__write(struct mosquitto *mosq) } } - G_MSGS_SENT_INC(1); + metrics__int_inc(mosq_counter_messages_sent, 1); if(((packet->command)&0xF6) == CMD_PUBLISH){ - G_PUB_MSGS_SENT_INC(1); #ifndef WITH_BROKER callback__on_publish(mosq, packet->mid, 0, NULL); }else if(((packet->command)&0xF0) == CMD_DISCONNECT){ do_client_disconnect(mosq, MOSQ_ERR_SUCCESS, NULL); return MOSQ_ERR_SUCCESS; #endif - }else if(((packet->command)&0xF0) == CMD_PUBLISH){ - G_PUB_MSGS_SENT_INC(1); } next_packet = packet__get_next_out(mosq); @@ -390,7 +381,7 @@ int packet__read(struct mosquitto *mosq) if(read_length == 1){ mosq->in_packet.command = byte; #ifdef WITH_BROKER - G_BYTES_RECEIVED_INC(1); + metrics__int_inc(mosq_counter_bytes_received, 1); /* Clients must send CONNECT as their first command. */ if(!(mosq->bridge) && state == mosq_cs_connected && (byte&0xF0) != CMD_CONNECT){ return MOSQ_ERR_PROTOCOL; @@ -438,7 +429,7 @@ int packet__read(struct mosquitto *mosq) return MOSQ_ERR_MALFORMED_PACKET; } - G_BYTES_RECEIVED_INC(1); + metrics__int_inc(mosq_counter_bytes_received, 1); mosq->in_packet.remaining_length += (byte & 127) * mosq->in_packet.remaining_mult; mosq->in_packet.remaining_mult *= 128; }else{ @@ -518,7 +509,7 @@ int packet__read(struct mosquitto *mosq) while(mosq->in_packet.to_process>0){ read_length = local__read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process); if(read_length > 0){ - G_BYTES_RECEIVED_INC(read_length); + metrics__int_inc(mosq_counter_bytes_received, read_length); mosq->in_packet.to_process -= (uint32_t)read_length; mosq->in_packet.pos += (uint32_t)read_length; }else{ @@ -557,10 +548,7 @@ int packet__read(struct mosquitto *mosq) /* All data for this packet is read. */ mosq->in_packet.pos = 0; #ifdef WITH_BROKER - G_MSGS_RECEIVED_INC(1); - if(((mosq->in_packet.command)&0xF0) == CMD_PUBLISH){ - G_PUB_MSGS_RECEIVED_INC(1); - } + metrics__int_inc(mosq_counter_messages_received, 1); #endif rc = handle__packet(mosq); diff --git a/lib/send_disconnect.c b/lib/send_disconnect.c index 7dc7acd8..6f291316 100644 --- a/lib/send_disconnect.c +++ b/lib/send_disconnect.c @@ -22,6 +22,7 @@ Contributors: #ifdef WITH_BROKER # include "mosquitto_broker_internal.h" +# include "sys_tree.h" #endif #include "mosquitto.h" diff --git a/lib/send_mosq.c b/lib/send_mosq.c index 8c3bfc5b..9eeac172 100644 --- a/lib/send_mosq.c +++ b/lib/send_mosq.c @@ -24,6 +24,7 @@ Contributors: #ifdef WITH_BROKER # include "mosquitto_broker_internal.h" +# include "sys_tree.h" #endif #include "mosquitto.h" @@ -172,4 +173,3 @@ int send__simple_command(struct mosquitto *mosq, uint8_t command) return packet__queue(mosq, packet); } - diff --git a/lib/send_publish.c b/lib/send_publish.c index 960993fe..f11cfdeb 100644 --- a/lib/send_publish.c +++ b/lib/send_publish.c @@ -25,7 +25,7 @@ Contributors: # include "mosquitto_broker_internal.h" # include "sys_tree.h" #else -# define G_PUB_BYTES_SENT_INC(A) +# define metrics__int_inc(stat, val) #endif #include "alias_mosq.h" @@ -156,7 +156,7 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3 mapped_topic = topic_temp; } log__printf(mosq, MOSQ_LOG_DEBUG, "Sending PUBLISH to %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", SAFE_PRINT(mosq->id), dup, qos, retain, mid, mapped_topic, (long)payloadlen); - G_PUB_BYTES_SENT_INC(payloadlen); + metrics__int_inc(mosq_counter_pub_bytes_sent, payloadlen); rc = send__real_publish(mosq, mid, mapped_topic, payloadlen, payload, qos, retain, dup, subscription_identifier, store_props, expiry_interval); mosquitto__FREE(mapped_topic); return rc; @@ -166,7 +166,7 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3 } #endif log__printf(mosq, MOSQ_LOG_DEBUG, "Sending PUBLISH to %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", SAFE_PRINT(mosq->id), dup, qos, retain, mid, topic, (long)payloadlen); - G_PUB_BYTES_SENT_INC(payloadlen); + metrics__int_inc(mosq_counter_pub_bytes_sent, payloadlen); #else log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBLISH (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", SAFE_PRINT(mosq->id), dup, qos, retain, mid, topic, (long)payloadlen); #endif @@ -301,6 +301,10 @@ int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, #endif } +#if defined(WITH_BROKER) && defined(WITH_SYS_TREE) + metrics__int_inc(mosq_counter_mqtt_publish_sent, 1); +#endif + /* Payload */ if(payloadlen && payload){ packet__write_bytes(packet, payload, payloadlen); diff --git a/lib/send_subscribe.c b/lib/send_subscribe.c index c612da0a..004efd9b 100644 --- a/lib/send_subscribe.c +++ b/lib/send_subscribe.c @@ -93,4 +93,3 @@ int send__subscribe(struct mosquitto *mosq, int *mid, int topic_count, char *con return packet__queue(mosq, packet); } - diff --git a/man/mosquitto.8.xml b/man/mosquitto.8.xml index 2d96c9cf..9e25953b 100644 --- a/man/mosquitto.8.xml +++ b/man/mosquitto.8.xml @@ -419,9 +419,8 @@ - The total number of active and inactive clients - currently connected and registered on the - broker. + The total number of connected and disconnected client sessions + currently registered on the broker. @@ -437,7 +436,17 @@ - + + + + The total number of socket connections that have been + made to the broker, whether or not the MQTT connections + were ultimately successful. + + + + + The current size of the heap memory in use by mosquitto. Note that this topic may be unavailable @@ -445,7 +454,7 @@ - + The largest amount of heap memory used by mosquitto. Note that this topic may be unavailable @@ -550,13 +559,6 @@ or 15 minutes. - - - - The number of messages with QoS>0 that are awaiting - acknowledgments. - - @@ -569,6 +571,153 @@ The total number of messages of any type sent since the broker started. + + + + The total number of MQTT CONNECT messages received since the broker started. + + + + + + The total number of MQTT CONNACK messages sent since the broker started. + + + + + + + + The total number of MQTT PUBLISH messages that have been + dropped due to inflight/queuing limits. See the + max_inflight_messages and max_queued_messages options + in mosquitto.conf5 + for more information. + + + + + + + + The total number of MQTT PUBLISH messages received since the broker started. + + + + + + + The total number of MQTT PUBLISH messages sent since the broker started. + + + + + + The total number of MQTT PUBACK messages received since the broker started. + + + + + + The total number of MQTT PUBACK messages sent since the broker started. + + + + + + The total number of MQTT PUBREC messages received since the broker started. + + + + + + The total number of MQTT PUBREC messages sent since the broker started. + + + + + + The total number of MQTT PUBREL messages received since the broker started. + + + + + + The total number of MQTT PUBREL messages sent since the broker started. + + + + + + The total number of MQTT PUBCOMP messages received since the broker started. + + + + + + The total number of MQTT PUBCOMP messages sent since the broker started. + + + + + + The total number of MQTT SUBSCRIBE messages received since the broker started. + + + + + + The total number of MQTT SUBACK messages sent since the broker started. + + + + + + The total number of MQTT UNSUBSCRIBE messages received since the broker started. + + + + + + The total number of MQTT UNSUBACK messages sent since the broker started. + + + + + + The total number of MQTT PINGREQ messages received since the broker started. + + + + + + The total number of MQTT PINGRESP messages sent since the broker started. + + + + + + The total number of MQTT DISCONNECT messages received since the broker started. + + + + + + The total number of MQTT DISCONNECT messages sent since the broker started. + + + + + + The total number of MQTT AUTH messages received since the broker started. + + + + + + The total number of MQTT AUTH messages sent since the broker started. + + @@ -592,26 +741,15 @@ - - - The total number of publish messages that have been - dropped due to inflight/queuing limits. See the - max_inflight_messages and max_queued_messages options - in - mosquitto.conf5 - for more information. - - - - + - The total number of PUBLISH messages received since the broker started. + The total number of PUBLISH payload bytes received since the broker started. - + - The total number of PUBLISH messages sent since the broker started. + The total number of PUBLISH payload bytes sent since the broker started. @@ -637,6 +775,12 @@ and messages queued for durable clients. + + + + The total number of shared subscriptions active on the broker. + + diff --git a/src/bridge.c b/src/bridge.c index 092507eb..75013d7b 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -849,8 +849,8 @@ static void bridge__packet_cleanup(struct mosquitto *context) } context->out_packet = NULL; context->out_packet_last = NULL; - G_OUT_PACKET_COUNT_DEC(context->out_packet_count); - G_OUT_PACKET_BYTES_DEC(context->out_packet_bytes); + metrics__int_dec(mosq_gauge_out_packets, context->out_packet_count); + metrics__int_dec(mosq_gauge_out_packet_bytes, context->out_packet_bytes); context->out_packet_count = 0; context->out_packet_bytes = 0; diff --git a/src/context.c b/src/context.c index d784d538..5d002431 100644 --- a/src/context.c +++ b/src/context.c @@ -165,8 +165,8 @@ void context__cleanup(struct mosquitto *context, bool force_free) context->out_packet = context->out_packet->next; mosquitto__FREE(packet); } - G_OUT_PACKET_COUNT_DEC(context->out_packet_count); - G_OUT_PACKET_BYTES_DEC(context->out_packet_bytes); + metrics__int_dec(mosq_gauge_out_packets, context->out_packet_count); + metrics__int_dec(mosq_gauge_out_packet_bytes, context->out_packet_bytes); context->out_packet_count = 0; context->out_packet_bytes = 0; #if defined(WITH_BROKER) && defined(__GLIBC__) && defined(WITH_ADNS) diff --git a/src/database.c b/src/database.c index e1a443f0..d3d77151 100644 --- a/src/database.c +++ b/src/database.c @@ -470,7 +470,7 @@ int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, str "Outgoing messages are being dropped for client %s.", context->id); } - G_MSGS_DROPPED_INC(); + metrics__int_inc(mosq_counter_mqtt_publish_dropped, 1); context->stats.messages_dropped++; return 2; @@ -601,14 +601,14 @@ int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uin "Outgoing messages are being dropped for client %s.", context->id); } - G_MSGS_DROPPED_INC(); + metrics__int_inc(mosq_counter_mqtt_publish_dropped, 1); return 2; } }else{ if (db__ready_for_queue(context, qos, msg_data)){ state = mosq_ms_queued; }else{ - G_MSGS_DROPPED_INC(); + metrics__int_inc(mosq_counter_mqtt_publish_dropped, 1); if(context->is_dropping == false){ context->is_dropping = true; log__printf(NULL, MOSQ_LOG_NOTICE, diff --git a/src/handle_connect.c b/src/handle_connect.c index 49530472..04237fdf 100644 --- a/src/handle_connect.c +++ b/src/handle_connect.c @@ -593,8 +593,6 @@ int handle__connect(struct mosquitto *context) uint16_t auth_data_out_len = 0; bool allow_zero_length_clientid; - G_CONNECTION_COUNT_INC(); - if(!context->listener){ return MOSQ_ERR_INVAL; } diff --git a/src/handle_publish.c b/src/handle_publish.c index 739eec01..d1d40a40 100644 --- a/src/handle_publish.c +++ b/src/handle_publish.c @@ -215,7 +215,7 @@ int handle__publish(struct mosquitto *context) } base_msg->data.payloadlen = context->in_packet.remaining_length - context->in_packet.pos; - G_PUB_BYTES_RECEIVED_INC(base_msg->data.payloadlen); + metrics__int_inc(mosq_counter_pub_bytes_received, base_msg->data.payloadlen); if(context->listener && context->listener->mount_point){ len = strlen(context->listener->mount_point) + strlen(base_msg->data.topic) + 1; topic_mount = mosquitto__malloc(len+1); diff --git a/src/loop.c b/src/loop.c index 4e6c5a57..95afb994 100644 --- a/src/loop.c +++ b/src/loop.c @@ -194,7 +194,7 @@ int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listens db.next_event_ms = 86400000; #ifdef WITH_SYS_TREE if(db.config->sys_interval > 0){ - sys_tree__update(); + sys_tree__update(false); } #endif diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index 525d390f..c4b9fcad 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -731,7 +731,7 @@ int db__message_reconnect_reset(struct mosquitto *context); bool db__ready_for_flight(struct mosquitto *context, enum mosquitto_msg_direction dir, int qos); bool db__ready_for_queue(struct mosquitto *context, int qos, struct mosquitto_msg_data *msg_data); void sys_tree__init(void); -void sys_tree__update(void); +void sys_tree__update(bool force); int db__message_write_inflight_out_all(struct mosquitto *context); int db__message_write_inflight_out_latest(struct mosquitto *context); int db__message_write_queued_out(struct mosquitto *context); diff --git a/src/net.c b/src/net.c index 097fd7e9..cf4a93f5 100644 --- a/src/net.c +++ b/src/net.c @@ -147,7 +147,7 @@ struct mosquitto *net__socket_accept(struct mosquitto__listener_sock *listensock return NULL; } - G_SOCKET_CONNECTIONS_INC(); + metrics__int_inc(mosq_counter_socket_connections, 1); if(net__socket_nonblock(&new_sock)){ return NULL; diff --git a/src/read_handle.c b/src/read_handle.c index cd2001b9..82280c57 100644 --- a/src/read_handle.c +++ b/src/read_handle.c @@ -52,6 +52,7 @@ int handle__packet(struct mosquitto *context) rc = handle__pubackcomp(context, "PUBCOMP"); break; case CMD_PUBLISH: + metrics__int_inc(mosq_counter_mqtt_publish_received, 1); rc = handle__publish(context); break; case CMD_PUBREC: @@ -61,6 +62,7 @@ int handle__packet(struct mosquitto *context) rc = handle__pubrel(context); break; case CMD_CONNECT: + metrics__int_inc(mosq_counter_mqtt_connect_received, 1); return handle__connect(context); case CMD_DISCONNECT: rc = handle__disconnect(context); diff --git a/src/send_auth.c b/src/send_auth.c index 620210d3..bec5814a 100644 --- a/src/send_auth.c +++ b/src/send_auth.c @@ -23,6 +23,7 @@ Contributors: #include "memory_mosq.h" #include "packet_mosq.h" #include "property_mosq.h" +#include "sys_tree.h" #include "util_mosq.h" int send__auth(struct mosquitto *context, uint8_t reason_code, const void *auth_data, uint16_t auth_data_len) diff --git a/src/send_connack.c b/src/send_connack.c index e6e482fd..d3a48a16 100644 --- a/src/send_connack.c +++ b/src/send_connack.c @@ -23,6 +23,7 @@ Contributors: #include "memory_mosq.h" #include "packet_mosq.h" #include "property_mosq.h" +#include "sys_tree.h" #include "util_mosq.h" int send__connack(struct mosquitto *context, uint8_t ack, uint8_t reason_code, const mosquitto_property *properties) diff --git a/src/send_suback.c b/src/send_suback.c index c68c4c57..213a901d 100644 --- a/src/send_suback.c +++ b/src/send_suback.c @@ -23,6 +23,7 @@ Contributors: #include "memory_mosq.h" #include "packet_mosq.h" #include "property_mosq.h" +#include "sys_tree.h" #include "util_mosq.h" diff --git a/src/send_unsuback.c b/src/send_unsuback.c index 2e60d2e9..c8365eca 100644 --- a/src/send_unsuback.c +++ b/src/send_unsuback.c @@ -25,6 +25,7 @@ Contributors: #include "memory_mosq.h" #include "packet_mosq.h" #include "property_mosq.h" +#include "sys_tree.h" int send__unsuback(struct mosquitto *mosq, uint16_t mid, int reason_code_count, uint8_t *reason_codes, const mosquitto_property *properties) diff --git a/src/session_expiry.c b/src/session_expiry.c index 73faab32..138a0b2b 100644 --- a/src/session_expiry.c +++ b/src/session_expiry.c @@ -175,7 +175,7 @@ void session_expiry__check(void) if(context->id){ log__printf(NULL, MOSQ_LOG_NOTICE, "Expiring client %s due to timeout.", context->id); } - G_CLIENTS_EXPIRED_INC(); + metrics__int_inc(mosq_counter_clients_expired, 1); /* Session has now expired, so clear interval */ context->session_expiry_interval = 0; diff --git a/src/sys_tree.c b/src/sys_tree.c index 560151bb..72d6dbe4 100644 --- a/src/sys_tree.c +++ b/src/sys_tree.c @@ -34,25 +34,90 @@ Contributors: #define SYS_TREE_QOS 2 -uint64_t g_bytes_received = 0; -uint64_t g_bytes_sent = 0; -uint64_t g_pub_bytes_received = 0; -uint64_t g_pub_bytes_sent = 0; -int64_t g_out_packet_bytes = 0; -unsigned long g_msgs_received = 0; -unsigned long g_msgs_sent = 0; -unsigned long g_pub_msgs_received = 0; -unsigned long g_pub_msgs_sent = 0; -unsigned long g_msgs_dropped = 0; -long g_out_packet_count = 0; -unsigned int g_clients_expired = 0; -unsigned int g_socket_connections = 0; -unsigned int g_connection_count = 0; +#define METRIC_LOAD_1MIN 1 +#define METRIC_LOAD_5MIN 2 +#define METRIC_LOAD_15MIN 3 + +struct metric{ + int64_t current; + int64_t next; + const char *topic, *topic_alias; + bool is_max; +}; + +struct metric_load{ + double current; + const char *topic; + int load_ref; + uint8_t interval; +}; + +struct metric metrics[mosq_metric_max] = { + { 1, 0, "$SYS/broker/clients/total", NULL, false }, /* mosq_gauge_clients_total */ + { 1, 0, "$SYS/broker/clients/maximum", NULL, true }, /* metric_clients_maximum */ + { 1, 0, "$SYS/broker/clients/disconnected", "$SYS/broker/clients/inactive", false }, /* mosq_gauge_clients_disconnected */ + { 1, 0, "$SYS/broker/clients/connected", "$SYS/broker/clients/active", false }, /* mosq_gauge_clients_connected */ + { 1, 0, "$SYS/broker/clients/expired", NULL, false }, /* mosq_counter_clients_expired */ + { 1, 0, "$SYS/broker/messages/stored", "$SYS/broker/store/messages/count", false }, /* mosq_gauge_message_store_count */ + { 1, 0, "$SYS/broker/store/messages/bytes", NULL, false }, /* mosq_gauge_message_store_bytes */ + { 1, 0, "$SYS/broker/subscriptions/count", NULL, false }, /* mosq_gauge_subscription_count */ + { 1, 0, "$SYS/broker/shared_subscriptions/count", NULL, false }, /* mosq_gauge_shared_subscription_count */ + { 1, 0, "$SYS/broker/retained messages/count", NULL, false }, /* mosq_gauge_retained_message_count */ +#ifdef REAL_WITH_MEMORY_TRACKING + { 1, 0, "$SYS/broker/heap/current", NULL, false }, /* mosq_gauge_heap_current */ + { 1, 0, "$SYS/broker/heap/maximum", NULL, true }, /* mosq_gauge_heap_maximum */ +#else + { 1, 0, NULL, NULL, NULL, 0 }, /* mosq_gauge_heap_current */ + { 1, 0, NULL, NULL, NULL, 0 }, /* mosq_gauge_heap_maximum */ +#endif + { 1, 0, "$SYS/broker/messages/received", NULL, false }, /* mosq_counter_messages_received */ + { 1, 0, "$SYS/broker/messages/sent", NULL, false }, /* mosq_counter_messages_sent */ + { 1, 0, "$SYS/broker/bytes/received", NULL, false }, /* mosq_counter_bytes_received */ + { 1, 0, "$SYS/broker/bytes/sent", NULL, false }, /* mosq_counter_bytes_sent */ + { 1, 0, "$SYS/broker/publish/bytes/received", NULL, false }, /* mosq_counter_pub_bytes_received */ + { 1, 0, "$SYS/broker/publish/bytes/sent", NULL, false }, /* mosq_counter_pub_bytes_sent */ + { 1, 0, "$SYS/broker/packet/out/count", NULL, false }, /* mosq_gauge_out_packet_count */ + { 1, 0, "$SYS/broker/packet/out/bytes", NULL, false }, /* mosq_gauge_out_packet_bytes */ + { 1, 0, "$SYS/broker/connections/socket/count", NULL, false }, /* mosq_counter_socket_connections */ + { 1, 0, NULL, NULL, false }, /* mosq_counter_mqtt_connect_received */ + { 1, 0, "$SYS/broker/publish/messages/dropped", NULL, false }, /* mosq_counter_mqtt_publish_dropped */ + { 1, 0, "$SYS/broker/publish/messages/received", NULL, false }, /* mosq_counter_mqtt_publish_received */ + { 1, 0, "$SYS/broker/publish/messages/sent", NULL, false }, /* mosq_counter_mqtt_publish_sent */ +}; + +struct metric_load metric_loads[mosq_metric_load_max] = { + { 0.0, "$SYS/broker/load/messages/received/1min", mosq_counter_messages_received, METRIC_LOAD_1MIN }, /* metric_load_messages_received_1min */ + { 0.0, "$SYS/broker/load/messages/received/5min", mosq_counter_messages_received, METRIC_LOAD_5MIN }, /* metric_load_messages_received_5min */ + { 0.0, "$SYS/broker/load/messages/received/15min", mosq_counter_messages_received, METRIC_LOAD_15MIN }, /* metric_load_messages_received_15min */ + { 0.0, "$SYS/broker/load/messages/sent/1min", mosq_counter_messages_sent, METRIC_LOAD_1MIN }, /* metric_load_messages_sent_1min */ + { 0.0, "$SYS/broker/load/messages/sent/5min", mosq_counter_messages_sent, METRIC_LOAD_5MIN }, /* metric_load_messages_sent_5min */ + { 0.0, "$SYS/broker/load/messages/sent/15min", mosq_counter_messages_sent, METRIC_LOAD_15MIN }, /* metric_load_messages_sent_15min */ + { 0.0, "$SYS/broker/load/publish/dropped/1min", mosq_counter_mqtt_publish_dropped, METRIC_LOAD_1MIN }, /* metric_load_pub_messages_dropped_1min */ + { 0.0, "$SYS/broker/load/publish/dropped/5min", mosq_counter_mqtt_publish_dropped, METRIC_LOAD_5MIN }, /* metric_load_pub_messages_dropped_5min */ + { 0.0, "$SYS/broker/load/publish/dropped/15min", mosq_counter_mqtt_publish_dropped, METRIC_LOAD_15MIN }, /* metric_load_pub_messages_dropped_15min */ + { 0.0, "$SYS/broker/load/publish/received/1min", mosq_counter_mqtt_publish_received, METRIC_LOAD_1MIN }, /* metric_load_pub_messages_received_1min */ + { 0.0, "$SYS/broker/load/publish/received/5min", mosq_counter_mqtt_publish_received, METRIC_LOAD_5MIN }, /* metric_load_pub_messages_received_5min */ + { 0.0, "$SYS/broker/load/publish/received/15min", mosq_counter_mqtt_publish_received, METRIC_LOAD_15MIN }, /* metric_load_pub_messages_received_15min */ + { 0.0, "$SYS/broker/load/publish/sent/1min", mosq_counter_mqtt_publish_sent, METRIC_LOAD_1MIN }, /* metric_load_pub_messages_sent_1min */ + { 0.0, "$SYS/broker/load/publish/sent/5min", mosq_counter_mqtt_publish_sent, METRIC_LOAD_5MIN }, /* metric_load_pub_messages_sent_5min */ + { 0.0, "$SYS/broker/load/publish/sent/15min", mosq_counter_mqtt_publish_sent, METRIC_LOAD_15MIN }, /* metric_load_pub_messages_sent_15min */ + { 0.0, "$SYS/broker/load/bytes/received/1min", mosq_counter_bytes_received, METRIC_LOAD_1MIN }, /* metric_load_bytes_received_1min */ + { 0.0, "$SYS/broker/load/bytes/received/5min", mosq_counter_bytes_received, METRIC_LOAD_5MIN }, /* metric_load_bytes_received_5min */ + { 0.0, "$SYS/broker/load/bytes/received/15min", mosq_counter_bytes_received, METRIC_LOAD_15MIN }, /* metric_load_bytes_received_15min */ + { 0.0, "$SYS/broker/load/bytes/sent/1min", mosq_counter_bytes_sent, METRIC_LOAD_1MIN }, /* metric_load_bytes_sent_1min */ + { 0.0, "$SYS/broker/load/bytes/sent/5min", mosq_counter_bytes_sent, METRIC_LOAD_5MIN }, /* metric_load_bytes_sent_5min */ + { 0.0, "$SYS/broker/load/bytes/sent/15min", mosq_counter_bytes_sent, METRIC_LOAD_15MIN }, /* metric_load_bytes_sent_15min */ + { 0.0, "$SYS/broker/load/sockets/1min", mosq_counter_socket_connections, METRIC_LOAD_1MIN }, /* metric_load_socket_connections_1min */ + { 0.0, "$SYS/broker/load/sockets/5min", mosq_counter_socket_connections, METRIC_LOAD_5MIN }, /* metric_load_socket_connections_5min */ + { 0.0, "$SYS/broker/load/sockets/15min", mosq_counter_socket_connections, METRIC_LOAD_15MIN }, /* metric_load_socket_connections_15min */ + { 0.0, "$SYS/broker/load/connections/1min", mosq_counter_mqtt_connect_received, METRIC_LOAD_1MIN }, /* metric_load_connections_1min */ + { 0.0, "$SYS/broker/load/connections/5min", mosq_counter_mqtt_connect_received, METRIC_LOAD_5MIN }, /* metric_load_connections_5min */ + { 0.0, "$SYS/broker/load/connections/15min", mosq_counter_mqtt_connect_received, METRIC_LOAD_15MIN }, /* metric_load_connections_15min */ +}; static time_t start_time = 0; static time_t last_update = 0; - void sys_tree__init(void) { char buf[64]; @@ -68,156 +133,50 @@ void sys_tree__init(void) start_time = mosquitto_time(); last_update = start_time; + + sys_tree__update(true); } -static void sys_tree__update_clients(char *buf) +void metrics__int_inc(enum mosq_metric_type m, int64_t value) { - static unsigned int client_count = UINT_MAX; - static unsigned int clients_expired = UINT_MAX; - static unsigned int client_max = 0; - static unsigned int disconnected_count = UINT_MAX; - static unsigned int connected_count = UINT_MAX; - uint32_t len; - - unsigned int count_total, count_by_sock; - - count_total = HASH_CNT(hh_id, db.contexts_by_id); - count_by_sock = HASH_CNT(hh_sock, db.contexts_by_sock); - - if(client_count != count_total){ - client_count = count_total; - len = (uint32_t)snprintf(buf, BUFLEN, "%d", client_count); - db__messages_easy_queue(NULL, "$SYS/broker/clients/total", SYS_TREE_QOS, len, buf, 1, 0, NULL); - - if(client_count > client_max){ - client_max = client_count; - len = (uint32_t)snprintf(buf, BUFLEN, "%d", client_max); - db__messages_easy_queue(NULL, "$SYS/broker/clients/maximum", SYS_TREE_QOS, len, buf, 1, 0, NULL); - } - } - - if(disconnected_count != count_total-count_by_sock){ - disconnected_count = count_total-count_by_sock; - len = (uint32_t)snprintf(buf, BUFLEN, "%d", disconnected_count); - db__messages_easy_queue(NULL, "$SYS/broker/clients/inactive", SYS_TREE_QOS, len, buf, 1, 0, NULL); - db__messages_easy_queue(NULL, "$SYS/broker/clients/disconnected", SYS_TREE_QOS, len, buf, 1, 0, NULL); - } - if(connected_count != count_by_sock){ - connected_count = count_by_sock; - len = (uint32_t)snprintf(buf, BUFLEN, "%d", connected_count); - db__messages_easy_queue(NULL, "$SYS/broker/clients/active", SYS_TREE_QOS, len, buf, 1, 0, NULL); - db__messages_easy_queue(NULL, "$SYS/broker/clients/connected", SYS_TREE_QOS, len, buf, 1, 0, NULL); - } - if(g_clients_expired != clients_expired){ - clients_expired = g_clients_expired; - len = (uint32_t)snprintf(buf, BUFLEN, "%d", clients_expired); - db__messages_easy_queue(NULL, "$SYS/broker/clients/expired", SYS_TREE_QOS, len, buf, 1, 0, NULL); + if(m < mosq_metric_max){ + metrics[m].next += value; } } -#ifdef REAL_WITH_MEMORY_TRACKING -static void sys_tree__update_memory(char *buf) +void metrics__int_dec(enum mosq_metric_type m, int64_t value) { - static unsigned long current_heap = ULONG_MAX; - static unsigned long max_heap = ULONG_MAX; - unsigned long value_ul; - uint32_t len; - - value_ul = mosquitto__memory_used(); - if(current_heap != value_ul){ - current_heap = value_ul; - len = (uint32_t)snprintf(buf, BUFLEN, "%lu", current_heap); - db__messages_easy_queue(NULL, "$SYS/broker/heap/current", SYS_TREE_QOS, len, buf, 1, 0, NULL); - } - value_ul =mosquitto__max_memory_used(); - if(max_heap != value_ul){ - max_heap = value_ul; - len = (uint32_t)snprintf(buf, BUFLEN, "%lu", max_heap); - db__messages_easy_queue(NULL, "$SYS/broker/heap/maximum", SYS_TREE_QOS, len, buf, 1, 0, NULL); + if(m < mosq_metric_max){ + metrics[m].next -= value; } } -#endif -static void calc_load(char *buf, const char *topic, double exponent, double interval, double *current) +static void calc_load(char *buf, double exponent, double i_mult, struct metric_load *m) { double new_value; uint32_t len; + double interval; - new_value = interval + exponent*((*current) - interval); - if(fabs(new_value - (*current)) >= 0.01){ + interval = (double)(metrics[m->load_ref].next - metrics[m->load_ref].current)*i_mult; + new_value = interval + exponent*(m->current - interval); + if(fabs(new_value - (m->current)) >= 0.01){ len = (uint32_t)snprintf(buf, BUFLEN, "%.2f", new_value); - db__messages_easy_queue(NULL, topic, SYS_TREE_QOS, len, buf, 1, 0, NULL); + db__messages_easy_queue(NULL, m->topic, SYS_TREE_QOS, len, buf, 1, 0, NULL); } - (*current) = new_value; + m->current = new_value; } + /* Send messages for the $SYS hierarchy if the last update is longer than * 'interval' seconds ago. * 'interval' is the amount of seconds between updates. If 0, then no periodic * messages are sent for the $SYS hierarchy. * 'start_time' is the result of time() that the broker was started at. */ -void sys_tree__update(void) +void sys_tree__update(bool force) { time_t uptime; char buf[BUFLEN]; - - static int msg_store_count = INT_MAX; - static unsigned long msg_store_bytes = ULONG_MAX; - static unsigned long msgs_received = ULONG_MAX; - static unsigned long msgs_sent = ULONG_MAX; - static unsigned long publish_dropped = ULONG_MAX; - static unsigned long pub_msgs_received = ULONG_MAX; - static unsigned long pub_msgs_sent = ULONG_MAX; - static unsigned long long bytes_received = ULLONG_MAX; - static unsigned long long bytes_sent = ULLONG_MAX; - static unsigned long long pub_bytes_received = ULLONG_MAX; - static unsigned long long pub_bytes_sent = ULLONG_MAX; - static int subscription_count = INT_MAX; - static int shared_subscription_count = INT_MAX; - static int retained_count = INT_MAX; - static long out_packet_count = LONG_MAX; - static long long out_packet_bytes = LLONG_MAX; - - static double msgs_received_load1 = 0; - static double msgs_received_load5 = 0; - static double msgs_received_load15 = 0; - static double msgs_sent_load1 = 0; - static double msgs_sent_load5 = 0; - static double msgs_sent_load15 = 0; - static double publish_dropped_load1 = 0; - static double publish_dropped_load5 = 0; - static double publish_dropped_load15 = 0; - double msgs_received_interval, msgs_sent_interval, publish_dropped_interval; - - static double publish_received_load1 = 0; - static double publish_received_load5 = 0; - static double publish_received_load15 = 0; - static double publish_sent_load1 = 0; - static double publish_sent_load5 = 0; - static double publish_sent_load15 = 0; - double publish_received_interval, publish_sent_interval; - - static double bytes_received_load1 = 0; - static double bytes_received_load5 = 0; - static double bytes_received_load15 = 0; - static double bytes_sent_load1 = 0; - static double bytes_sent_load5 = 0; - static double bytes_sent_load15 = 0; - double bytes_received_interval, bytes_sent_interval; - - static double socket_load1 = 0; - static double socket_load5 = 0; - static double socket_load15 = 0; - double socket_interval; - - static double connection_load1 = 0; - static double connection_load5 = 0; - static double connection_load15 = 0; - double connection_interval; - - double exponent; - double i_mult; uint32_t len; time_t next_event; static time_t last_update_real = 0; @@ -231,170 +190,59 @@ void sys_tree__update(void) } if(db.config->sys_interval - && db.now_real_s % db.config->sys_interval == 0 && last_update_real != db.now_real_s){ + && ((db.now_real_s % db.config->sys_interval == 0 && last_update_real != db.now_real_s) || force)){ uptime = db.now_s - start_time; len = (uint32_t)snprintf(buf, BUFLEN, "%" PRIu64 " seconds", (uint64_t)uptime); db__messages_easy_queue(NULL, "$SYS/broker/uptime", SYS_TREE_QOS, len, buf, 1, 0, NULL); - sys_tree__update_clients(buf); - if(db.now_s > last_update){ - i_mult = 60.0/(double)(db.now_s-last_update); - - msgs_received_interval = (double)(g_msgs_received - msgs_received)*i_mult; - msgs_sent_interval = (double)(g_msgs_sent - msgs_sent)*i_mult; - publish_dropped_interval = (double)(g_msgs_dropped - publish_dropped)*i_mult; - - publish_received_interval = (double)(g_pub_msgs_received - pub_msgs_received)*i_mult; - publish_sent_interval = (double)(g_pub_msgs_sent - pub_msgs_sent)*i_mult; - - bytes_received_interval = (double)(g_bytes_received - bytes_received)*i_mult; - bytes_sent_interval = (double)(g_bytes_sent - bytes_sent)*i_mult; - - socket_interval = g_socket_connections*i_mult; - g_socket_connections = 0; - connection_interval = g_connection_count*i_mult; - g_connection_count = 0; - - /* 1 minute load */ - exponent = exp(-1.0*(double)(db.now_s-last_update)/60.0); - - calc_load(buf, "$SYS/broker/load/messages/received/1min", exponent, msgs_received_interval, &msgs_received_load1); - calc_load(buf, "$SYS/broker/load/messages/sent/1min", exponent, msgs_sent_interval, &msgs_sent_load1); - calc_load(buf, "$SYS/broker/load/publish/dropped/1min", exponent, publish_dropped_interval, &publish_dropped_load1); - calc_load(buf, "$SYS/broker/load/publish/received/1min", exponent, publish_received_interval, &publish_received_load1); - calc_load(buf, "$SYS/broker/load/publish/sent/1min", exponent, publish_sent_interval, &publish_sent_load1); - calc_load(buf, "$SYS/broker/load/bytes/received/1min", exponent, bytes_received_interval, &bytes_received_load1); - calc_load(buf, "$SYS/broker/load/bytes/sent/1min", exponent, bytes_sent_interval, &bytes_sent_load1); - calc_load(buf, "$SYS/broker/load/sockets/1min", exponent, socket_interval, &socket_load1); - calc_load(buf, "$SYS/broker/load/connections/1min", exponent, connection_interval, &connection_load1); - - /* 5 minute load */ - exponent = exp(-1.0*(double)(db.now_s-last_update)/300.0); - - calc_load(buf, "$SYS/broker/load/messages/received/5min", exponent, msgs_received_interval, &msgs_received_load5); - calc_load(buf, "$SYS/broker/load/messages/sent/5min", exponent, msgs_sent_interval, &msgs_sent_load5); - calc_load(buf, "$SYS/broker/load/publish/dropped/5min", exponent, publish_dropped_interval, &publish_dropped_load5); - calc_load(buf, "$SYS/broker/load/publish/received/5min", exponent, publish_received_interval, &publish_received_load5); - calc_load(buf, "$SYS/broker/load/publish/sent/5min", exponent, publish_sent_interval, &publish_sent_load5); - calc_load(buf, "$SYS/broker/load/bytes/received/5min", exponent, bytes_received_interval, &bytes_received_load5); - calc_load(buf, "$SYS/broker/load/bytes/sent/5min", exponent, bytes_sent_interval, &bytes_sent_load5); - calc_load(buf, "$SYS/broker/load/sockets/5min", exponent, socket_interval, &socket_load5); - calc_load(buf, "$SYS/broker/load/connections/5min", exponent, connection_interval, &connection_load5); - - /* 15 minute load */ - exponent = exp(-1.0*(double)(db.now_s-last_update)/900.0); - - calc_load(buf, "$SYS/broker/load/messages/received/15min", exponent, msgs_received_interval, &msgs_received_load15); - calc_load(buf, "$SYS/broker/load/messages/sent/15min", exponent, msgs_sent_interval, &msgs_sent_load15); - calc_load(buf, "$SYS/broker/load/publish/dropped/15min", exponent, publish_dropped_interval, &publish_dropped_load15); - calc_load(buf, "$SYS/broker/load/publish/received/15min", exponent, publish_received_interval, &publish_received_load15); - calc_load(buf, "$SYS/broker/load/publish/sent/15min", exponent, publish_sent_interval, &publish_sent_load15); - calc_load(buf, "$SYS/broker/load/bytes/received/15min", exponent, bytes_received_interval, &bytes_received_load15); - calc_load(buf, "$SYS/broker/load/bytes/sent/15min", exponent, bytes_sent_interval, &bytes_sent_load15); - calc_load(buf, "$SYS/broker/load/sockets/15min", exponent, socket_interval, &socket_load15); - calc_load(buf, "$SYS/broker/load/connections/15min", exponent, connection_interval, &connection_load15); - } - - if(db.msg_store_count != msg_store_count){ - msg_store_count = db.msg_store_count; - len = (uint32_t)snprintf(buf, BUFLEN, "%d", msg_store_count); - db__messages_easy_queue(NULL, "$SYS/broker/messages/stored", SYS_TREE_QOS, len, buf, 1, 0, NULL); - db__messages_easy_queue(NULL, "$SYS/broker/store/messages/count", SYS_TREE_QOS, len, buf, 1, 0, NULL); - } - - if (db.msg_store_bytes != msg_store_bytes){ - msg_store_bytes = db.msg_store_bytes; - len = (uint32_t)snprintf(buf, BUFLEN, "%lu", msg_store_bytes); - db__messages_easy_queue(NULL, "$SYS/broker/store/messages/bytes", SYS_TREE_QOS, len, buf, 1, 0, NULL); - } - - if(db.subscription_count != subscription_count){ - subscription_count = db.subscription_count; - len = (uint32_t)snprintf(buf, BUFLEN, "%d", subscription_count); - db__messages_easy_queue(NULL, "$SYS/broker/subscriptions/count", SYS_TREE_QOS, len, buf, 1, 0, NULL); - } - - if(db.shared_subscription_count != shared_subscription_count){ - shared_subscription_count = db.shared_subscription_count; - len = (uint32_t)snprintf(buf, BUFLEN, "%d", shared_subscription_count); - db__messages_easy_queue(NULL, "$SYS/broker/shared_subscriptions/count", SYS_TREE_QOS, len, buf, 1, 0, NULL); - } - - if(db.retained_count != retained_count){ - retained_count = db.retained_count; - len = (uint32_t)snprintf(buf, BUFLEN, "%d", retained_count); - db__messages_easy_queue(NULL, "$SYS/broker/retained messages/count", SYS_TREE_QOS, len, buf, 1, 0, NULL); - } - + /* Update metrics values where not otherwise updated */ + metrics[mosq_gauge_message_store_count].next = db.msg_store_count; + metrics[mosq_gauge_message_store_bytes].next = (int64_t)db.msg_store_bytes; + metrics[mosq_gauge_subscriptions].next = db.subscription_count; + metrics[mosq_gauge_shared_subscriptions].next = db.shared_subscription_count; + metrics[mosq_gauge_retained_messages].next = db.retained_count; #ifdef REAL_WITH_MEMORY_TRACKING - sys_tree__update_memory(buf); + metrics[mosq_gauge_heap_current].next = (int64_t)mosquitto__memory_used(); + metrics[mosq_counter_heap_maximum].next = (int64_t)mosquitto__max_memory_used(); #endif + metrics[mosq_gauge_clients_total].next = HASH_CNT(hh_id, db.contexts_by_id); + metrics[mosq_counter_clients_maximum].next = HASH_CNT(hh_id, db.contexts_by_id); + metrics[mosq_gauge_clients_connected].next = HASH_CNT(hh_sock, db.contexts_by_sock); + metrics[mosq_gauge_clients_disconnected].next = HASH_CNT(hh_id, db.contexts_by_id) - HASH_CNT(hh_sock, db.contexts_by_sock); - if(msgs_received != g_msgs_received){ - msgs_received = g_msgs_received; - len = (uint32_t)snprintf(buf, BUFLEN, "%lu", msgs_received); - db__messages_easy_queue(NULL, "$SYS/broker/messages/received", SYS_TREE_QOS, len, buf, 1, 0, NULL); - } - - if(msgs_sent != g_msgs_sent){ - msgs_sent = g_msgs_sent; - len = (uint32_t)snprintf(buf, BUFLEN, "%lu", msgs_sent); - db__messages_easy_queue(NULL, "$SYS/broker/messages/sent", SYS_TREE_QOS, len, buf, 1, 0, NULL); - } - - if(publish_dropped != g_msgs_dropped){ - publish_dropped = g_msgs_dropped; - len = (uint32_t)snprintf(buf, BUFLEN, "%lu", publish_dropped); - db__messages_easy_queue(NULL, "$SYS/broker/publish/messages/dropped", SYS_TREE_QOS, len, buf, 1, 0, NULL); - } - - if(pub_msgs_received != g_pub_msgs_received){ - pub_msgs_received = g_pub_msgs_received; - len = (uint32_t)snprintf(buf, BUFLEN, "%lu", pub_msgs_received); - db__messages_easy_queue(NULL, "$SYS/broker/publish/messages/received", SYS_TREE_QOS, len, buf, 1, 0, NULL); - } - - if(pub_msgs_sent != g_pub_msgs_sent){ - pub_msgs_sent = g_pub_msgs_sent; - len = (uint32_t)snprintf(buf, BUFLEN, "%lu", pub_msgs_sent); - db__messages_easy_queue(NULL, "$SYS/broker/publish/messages/sent", SYS_TREE_QOS, len, buf, 1, 0, NULL); - } - - if(bytes_received != g_bytes_received){ - bytes_received = g_bytes_received; - len = (uint32_t)snprintf(buf, BUFLEN, "%llu", bytes_received); - db__messages_easy_queue(NULL, "$SYS/broker/bytes/received", SYS_TREE_QOS, len, buf, 1, 0, NULL); - } - - if(bytes_sent != g_bytes_sent){ - bytes_sent = g_bytes_sent; - len = (uint32_t)snprintf(buf, BUFLEN, "%llu", bytes_sent); - db__messages_easy_queue(NULL, "$SYS/broker/bytes/sent", SYS_TREE_QOS, len, buf, 1, 0, NULL); - } - - if(pub_bytes_received != g_pub_bytes_received){ - pub_bytes_received = g_pub_bytes_received; - len = (uint32_t)snprintf(buf, BUFLEN, "%llu", pub_bytes_received); - db__messages_easy_queue(NULL, "$SYS/broker/publish/bytes/received", SYS_TREE_QOS, len, buf, 1, 0, NULL); - } - - if(pub_bytes_sent != g_pub_bytes_sent){ - pub_bytes_sent = g_pub_bytes_sent; - len = (uint32_t)snprintf(buf, BUFLEN, "%llu", pub_bytes_sent); - db__messages_easy_queue(NULL, "$SYS/broker/publish/bytes/sent", SYS_TREE_QOS, len, buf, 1, 0, NULL); - } - - if(out_packet_count != g_out_packet_count){ - out_packet_count = g_out_packet_count; - len = (uint32_t)snprintf(buf, BUFLEN, "%lu", out_packet_count); - db__messages_easy_queue(NULL, "$SYS/broker/packet/out/count", SYS_TREE_QOS, len, buf, 1, 0, NULL); - } - - if(out_packet_bytes != g_out_packet_bytes){ - out_packet_bytes = g_out_packet_bytes; - len = (uint32_t)snprintf(buf, BUFLEN, "%llu", out_packet_bytes); - db__messages_easy_queue(NULL, "$SYS/broker/packet/out/bytes", SYS_TREE_QOS, len, buf, 1, 0, NULL); + /* Handle loads first, because they reference other metrics and need next != current */ + if(db.now_s > last_update){ + double i_mult = 60.0/(double)(db.now_s-last_update); + + double exponent_1min = exp(-1.0*(double)(db.now_s-last_update)/60.0); + double exponent_5min = exp(-1.0*(double)(db.now_s-last_update)/300.0); + double exponent_15min = exp(-1.0*(double)(db.now_s-last_update)/900.0); + + for(int i=0; i metrics[i].current) || + (!metrics[i].is_max && metrics[i].next != metrics[i].current)){ + + metrics[i].current = metrics[i].next; + len = (uint32_t)snprintf(buf, BUFLEN, "%lu", metrics[i].current); + if(metrics[i].topic){ + db__messages_easy_queue(NULL, metrics[i].topic, SYS_TREE_QOS, len, buf, 1, 0, NULL); + } + if(metrics[i].topic_alias){ + db__messages_easy_queue(NULL, metrics[i].topic_alias, SYS_TREE_QOS, len, buf, 1, 0, NULL); + } + } } last_update = db.now_s; diff --git a/src/sys_tree.h b/src/sys_tree.h index ed5e7a21..de5ec5f2 100644 --- a/src/sys_tree.h +++ b/src/sys_tree.h @@ -20,56 +20,77 @@ Contributors: #define SYS_TREE_H #if defined(WITH_SYS_TREE) && defined(WITH_BROKER) -extern uint64_t g_bytes_received; -extern uint64_t g_bytes_sent; -extern uint64_t g_pub_bytes_received; -extern uint64_t g_pub_bytes_sent; -extern int64_t g_out_packet_bytes; -extern unsigned long g_msgs_received; -extern unsigned long g_msgs_sent; -extern unsigned long g_pub_msgs_received; -extern unsigned long g_pub_msgs_sent; -extern unsigned long g_msgs_dropped; -extern long g_out_packet_count; -extern unsigned int g_clients_expired; -extern unsigned int g_socket_connections; -extern unsigned int g_connection_count; -#define G_BYTES_RECEIVED_INC(A) (g_bytes_received+=(uint64_t)(A)) -#define G_BYTES_SENT_INC(A) (g_bytes_sent+=(uint64_t)(A)) -#define G_PUB_BYTES_RECEIVED_INC(A) (g_pub_bytes_received+=(A)) -#define G_PUB_BYTES_SENT_INC(A) (g_pub_bytes_sent+=(A)) -#define G_MSGS_RECEIVED_INC(A) (g_msgs_received+=(A)) -#define G_MSGS_SENT_INC(A) (g_msgs_sent+=(A)) -#define G_PUB_MSGS_RECEIVED_INC(A) (g_pub_msgs_received+=(A)) -#define G_PUB_MSGS_SENT_INC(A) (g_pub_msgs_sent+=(A)) -#define G_MSGS_DROPPED_INC() (g_msgs_dropped++) -#define G_CLIENTS_EXPIRED_INC() (g_clients_expired++) -#define G_SOCKET_CONNECTIONS_INC() (g_socket_connections++) -#define G_CONNECTION_COUNT_INC() (g_connection_count++) -#define G_OUT_PACKET_COUNT_INC(A) (g_out_packet_count+=(A)) -#define G_OUT_PACKET_COUNT_DEC(A) (g_out_packet_count-=(A)) -#define G_OUT_PACKET_BYTES_INC(A) (g_out_packet_bytes+=(A)) -#define G_OUT_PACKET_BYTES_DEC(A) (g_out_packet_bytes-=(A)) +/* This ordering *must* match the metrics array in sys_tree.c. */ +enum mosq_metric_type{ + mosq_gauge_clients_total = 0, + mosq_counter_clients_maximum = 1, + mosq_gauge_clients_disconnected = 2, + mosq_gauge_clients_connected = 3, + mosq_counter_clients_expired = 4, + mosq_gauge_message_store_count = 5, + mosq_gauge_message_store_bytes = 6, + mosq_gauge_subscriptions = 7, + mosq_gauge_shared_subscriptions = 8, + mosq_gauge_retained_messages = 9, + mosq_gauge_heap_current = 10, + mosq_counter_heap_maximum = 11, + mosq_counter_messages_received = 12, + mosq_counter_messages_sent = 13, + mosq_counter_bytes_received = 14, + mosq_counter_bytes_sent = 15, + mosq_counter_pub_bytes_received = 16, + mosq_counter_pub_bytes_sent = 17, + mosq_gauge_out_packets = 18, + mosq_gauge_out_packet_bytes = 19, + mosq_counter_socket_connections = 20, + mosq_counter_mqtt_connect_received = 21, + mosq_counter_mqtt_publish_dropped = 22, + mosq_counter_mqtt_publish_received = 23, + mosq_counter_mqtt_publish_sent = 24, -#else + mosq_metric_max, +}; + +/* This ordering *must* match the metrics_load array in sys_tree.c. */ +enum mosq_metric_load_type{ + mosq_load_messages_received_1min = 0, + mosq_load_messages_received_5min = 1, + mosq_load_messages_received_15min = 2, + mosq_load_messages_sent_1min = 3, + mosq_load_messages_sent_5min = 4, + mosq_load_messages_sent_15min = 5, + mosq_load_pub_messages_dropped_1min = 6, + mosq_load_pub_messages_dropped_5min = 7, + mosq_load_pub_messages_dropped_15min = 8, + mosq_load_pub_messages_received_1min = 9, + mosq_load_pub_messages_received_5min = 10, + mosq_load_pub_messages_received_15min = 11, + mosq_load_pub_messages_sent_1min = 12, + mosq_load_pub_messages_sent_5min = 13, + mosq_load_pub_messages_sent_15min = 14, + mosq_load_bytes_received_1min = 15, + mosq_load_bytes_received_5min = 16, + mosq_load_bytes_received_15min = 17, + mosq_load_bytes_sent_1min = 18, + mosq_load_bytes_sent_5min = 19, + mosq_load_bytes_sent_15min = 20, + mosq_load_sockets_1min = 21, + mosq_load_sockets_5min = 22, + mosq_load_sockets_15min = 23, + mosq_load_connections_1min = 24, + mosq_load_connections_5min = 25, + mosq_load_connections_15min = 26, + + mosq_metric_load_max, +}; -#define G_BYTES_RECEIVED_INC(A) -#define G_BYTES_SENT_INC(A) -#define G_PUB_BYTES_RECEIVED_INC(A) -#define G_PUB_BYTES_SENT_INC(A) -#define G_MSGS_RECEIVED_INC(A) -#define G_MSGS_SENT_INC(A) -#define G_PUB_MSGS_RECEIVED_INC(A) -#define G_PUB_MSGS_SENT_INC(A) -#define G_MSGS_DROPPED_INC() -#define G_CLIENTS_EXPIRED_INC() -#define G_SOCKET_CONNECTIONS_INC() -#define G_CONNECTION_COUNT_INC() -#define G_OUT_PACKET_COUNT_INC(A) -#define G_OUT_PACKET_COUNT_DEC(A) -#define G_OUT_PACKET_BYTES_INC(A) -#define G_OUT_PACKET_BYTES_DEC(A) +void metrics__int_inc(enum mosq_metric_type m, int64_t value); +void metrics__int_dec(enum mosq_metric_type m, int64_t value); + +#else +# define metrics__int_inc(A, B) +# define metrics__int_dec(A, B) #endif diff --git a/src/websockets.c b/src/websockets.c index 20e3f345..5558fec5 100644 --- a/src/websockets.c +++ b/src/websockets.c @@ -236,7 +236,7 @@ static int callback_mqtt( } ucount = (unsigned int)count; #ifdef WITH_SYS_TREE - g_bytes_sent += ucount; + metrics__int_inc(mosq_counter_bytes_sent, ucount); #endif packet->to_process -= ucount; packet->pos += ucount; @@ -251,9 +251,9 @@ static int callback_mqtt( } #ifdef WITH_SYS_TREE - g_msgs_sent++; + metrics__int_inc(mosq_counter_messages_sent, 1); if(((packet->command)&0xF0) == CMD_PUBLISH){ - g_pub_msgs_sent++; + metrics__int_inc(mosq_counter_mqtt_publish_sent, 1); } #endif @@ -280,7 +280,7 @@ static int callback_mqtt( mosq = u->mosq; pos = 0; buf = (uint8_t *)in; - G_BYTES_RECEIVED_INC(len); + metrics__int_inc(mosq_counter_bytes_received, (int64_t)len); while(pos < len){ if(!mosq->in_packet.command){ mosq->in_packet.command = buf[pos]; @@ -336,10 +336,7 @@ static int callback_mqtt( mosq->in_packet.pos = 0; #ifdef WITH_SYS_TREE - G_MSGS_RECEIVED_INC(1); - if(((mosq->in_packet.command)&0xF0) == CMD_PUBLISH){ - G_PUB_MSGS_RECEIVED_INC(1); - } + metrics__int_inc(mosq_counter_messages_received, 1); #endif rc = handle__packet(mosq); diff --git a/test/unit/persist_write_stubs.c b/test/unit/persist_write_stubs.c index 07237e3c..3a2dff84 100644 --- a/test/unit/persist_write_stubs.c +++ b/test/unit/persist_write_stubs.c @@ -7,21 +7,7 @@ #include #include #include - -uint64_t g_bytes_received; -uint64_t g_bytes_sent; -uint64_t g_pub_bytes_received; -uint64_t g_pub_bytes_sent; -int64_t g_out_packet_bytes; -unsigned long g_msgs_received; -unsigned long g_msgs_sent; -unsigned long g_pub_msgs_received; -unsigned long g_pub_msgs_sent; -unsigned long g_msgs_dropped; -long g_out_packet_count; -unsigned int g_clients_expired; -unsigned int g_socket_connections; -unsigned int g_connection_count; +#include extern uint64_t last_retained; extern char *last_sub; @@ -292,3 +278,12 @@ int send__disconnect(struct mosquitto *mosq, uint8_t reason_code, const mosquitt UNUSED(properties); return 0; } + +void metrics__int_inc(enum mosq_metric_type m, int64_t value) +{ + UNUSED(m); UNUSED(value); +} +void metrics__int_dec(enum mosq_metric_type m, int64_t value) +{ + UNUSED(m); UNUSED(value); +} diff --git a/test/unit/subs_stubs.c b/test/unit/subs_stubs.c index 1a274b72..a3280cc5 100644 --- a/test/unit/subs_stubs.c +++ b/test/unit/subs_stubs.c @@ -9,21 +9,7 @@ #include #include #include - -uint64_t g_bytes_received; -uint64_t g_bytes_sent; -uint64_t g_pub_bytes_received; -uint64_t g_pub_bytes_sent; -int64_t g_out_packet_bytes; -unsigned long g_msgs_received; -unsigned long g_msgs_sent; -unsigned long g_pub_msgs_received; -unsigned long g_pub_msgs_sent; -unsigned long g_msgs_dropped; -long g_out_packet_count; -unsigned int g_clients_expired; -unsigned int g_socket_connections; -unsigned int g_connection_count; +#include int log__printf(struct mosquitto *mosq, unsigned int priority, const char *fmt, ...) { @@ -233,3 +219,12 @@ int session_expiry__add_from_persistence(struct mosquitto *context, time_t expir UNUSED(expiry_time); return 0; } + +void metrics__int_inc(enum mosq_metric_type m, int64_t value) +{ + UNUSED(m); UNUSED(value); +} +void metrics__int_dec(enum mosq_metric_type m, int64_t value) +{ + UNUSED(m); UNUSED(value); +}