diff --git a/lib/packet_mosq.c b/lib/packet_mosq.c
index fa044941..720e1ee3 100644
--- a/lib/packet_mosq.c
+++ b/lib/packet_mosq.c
@@ -42,14 +42,8 @@ Contributors:
# include "sys_tree.h"
# include "send_mosq.h"
#else
-# define G_BYTES_RECEIVED_INC(A)
-# define G_BYTES_SENT_INC(A)
-# define G_MSGS_SENT_INC(A)
-# define G_PUB_MSGS_SENT_INC(A)
-# define G_OUT_PACKET_COUNT_INC(A)
-# define G_OUT_PACKET_COUNT_DEC(A)
-# define G_OUT_PACKET_BYTES_INC(A)
-# define G_OUT_PACKET_BYTES_DEC(A)
+# define metrics__int_inc(stat, val)
+# define metrics__int_dec(stat, val)
#endif
int packet__alloc(struct mosquitto__packet **packet, uint8_t command, uint32_t remaining_length)
@@ -124,8 +118,8 @@ void packet__cleanup_all_no_locks(struct mosquitto *mosq)
mosquitto__FREE(packet);
}
- G_OUT_PACKET_COUNT_DEC(mosq->out_packet_count);
- G_OUT_PACKET_BYTES_DEC(mosq->out_packet_bytes);
+ metrics__int_dec(mosq_gauge_out_packets, mosq->out_packet_count);
+ metrics__int_dec(mosq_gauge_out_packet_bytes, mosq->out_packet_bytes);
mosq->out_packet_count = 0;
mosq->out_packet_bytes = 0;
mosq->out_packet_last = NULL;
@@ -152,8 +146,8 @@ static void packet__queue_append(struct mosquitto *mosq, struct mosquitto__packe
mosq->out_packet_last = packet;
mosq->out_packet_count++;
mosq->out_packet_bytes += packet->packet_length;
- G_OUT_PACKET_COUNT_INC(1);
- G_OUT_PACKET_BYTES_INC(packet->packet_length);
+ metrics__int_inc(mosq_gauge_out_packets, 1);
+ metrics__int_inc(mosq_gauge_out_packet_bytes, packet->packet_length);
pthread_mutex_unlock(&mosq->out_packet_mutex);
}
@@ -236,8 +230,8 @@ struct mosquitto__packet *packet__get_next_out(struct mosquitto *mosq)
if(mosq->out_packet){
mosq->out_packet_count--;
mosq->out_packet_bytes -= mosq->out_packet->packet_length;
- G_OUT_PACKET_COUNT_DEC(1);
- G_OUT_PACKET_BYTES_DEC(mosq->out_packet->packet_length);
+ metrics__int_dec(mosq_gauge_out_packets, 1);
+ metrics__int_dec(mosq_gauge_out_packet_bytes, mosq->out_packet->packet_length);
mosq->out_packet = mosq->out_packet->next;
if(!mosq->out_packet){
@@ -283,7 +277,7 @@ int packet__write(struct mosquitto *mosq)
while(packet->to_process > 0){
write_length = net__write(mosq, &(packet->payload[packet->pos]), packet->to_process);
if(write_length > 0){
- G_BYTES_SENT_INC(write_length);
+ metrics__int_inc(mosq_counter_bytes_sent, write_length);
packet->to_process -= (uint32_t)write_length;
packet->pos += (uint32_t)write_length;
}else{
@@ -309,17 +303,14 @@ int packet__write(struct mosquitto *mosq)
}
}
- G_MSGS_SENT_INC(1);
+ metrics__int_inc(mosq_counter_messages_sent, 1);
if(((packet->command)&0xF6) == CMD_PUBLISH){
- G_PUB_MSGS_SENT_INC(1);
#ifndef WITH_BROKER
callback__on_publish(mosq, packet->mid, 0, NULL);
}else if(((packet->command)&0xF0) == CMD_DISCONNECT){
do_client_disconnect(mosq, MOSQ_ERR_SUCCESS, NULL);
return MOSQ_ERR_SUCCESS;
#endif
- }else if(((packet->command)&0xF0) == CMD_PUBLISH){
- G_PUB_MSGS_SENT_INC(1);
}
next_packet = packet__get_next_out(mosq);
@@ -390,7 +381,7 @@ int packet__read(struct mosquitto *mosq)
if(read_length == 1){
mosq->in_packet.command = byte;
#ifdef WITH_BROKER
- G_BYTES_RECEIVED_INC(1);
+ metrics__int_inc(mosq_counter_bytes_received, 1);
/* Clients must send CONNECT as their first command. */
if(!(mosq->bridge) && state == mosq_cs_connected && (byte&0xF0) != CMD_CONNECT){
return MOSQ_ERR_PROTOCOL;
@@ -438,7 +429,7 @@ int packet__read(struct mosquitto *mosq)
return MOSQ_ERR_MALFORMED_PACKET;
}
- G_BYTES_RECEIVED_INC(1);
+ metrics__int_inc(mosq_counter_bytes_received, 1);
mosq->in_packet.remaining_length += (byte & 127) * mosq->in_packet.remaining_mult;
mosq->in_packet.remaining_mult *= 128;
}else{
@@ -518,7 +509,7 @@ int packet__read(struct mosquitto *mosq)
while(mosq->in_packet.to_process>0){
read_length = local__read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);
if(read_length > 0){
- G_BYTES_RECEIVED_INC(read_length);
+ metrics__int_inc(mosq_counter_bytes_received, read_length);
mosq->in_packet.to_process -= (uint32_t)read_length;
mosq->in_packet.pos += (uint32_t)read_length;
}else{
@@ -557,10 +548,7 @@ int packet__read(struct mosquitto *mosq)
/* All data for this packet is read. */
mosq->in_packet.pos = 0;
#ifdef WITH_BROKER
- G_MSGS_RECEIVED_INC(1);
- if(((mosq->in_packet.command)&0xF0) == CMD_PUBLISH){
- G_PUB_MSGS_RECEIVED_INC(1);
- }
+ metrics__int_inc(mosq_counter_messages_received, 1);
#endif
rc = handle__packet(mosq);
diff --git a/lib/send_disconnect.c b/lib/send_disconnect.c
index 7dc7acd8..6f291316 100644
--- a/lib/send_disconnect.c
+++ b/lib/send_disconnect.c
@@ -22,6 +22,7 @@ Contributors:
#ifdef WITH_BROKER
# include "mosquitto_broker_internal.h"
+# include "sys_tree.h"
#endif
#include "mosquitto.h"
diff --git a/lib/send_mosq.c b/lib/send_mosq.c
index 8c3bfc5b..9eeac172 100644
--- a/lib/send_mosq.c
+++ b/lib/send_mosq.c
@@ -24,6 +24,7 @@ Contributors:
#ifdef WITH_BROKER
# include "mosquitto_broker_internal.h"
+# include "sys_tree.h"
#endif
#include "mosquitto.h"
@@ -172,4 +173,3 @@ int send__simple_command(struct mosquitto *mosq, uint8_t command)
return packet__queue(mosq, packet);
}
-
diff --git a/lib/send_publish.c b/lib/send_publish.c
index 960993fe..f11cfdeb 100644
--- a/lib/send_publish.c
+++ b/lib/send_publish.c
@@ -25,7 +25,7 @@ Contributors:
# include "mosquitto_broker_internal.h"
# include "sys_tree.h"
#else
-# define G_PUB_BYTES_SENT_INC(A)
+# define metrics__int_inc(stat, val)
#endif
#include "alias_mosq.h"
@@ -156,7 +156,7 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3
mapped_topic = topic_temp;
}
log__printf(mosq, MOSQ_LOG_DEBUG, "Sending PUBLISH to %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", SAFE_PRINT(mosq->id), dup, qos, retain, mid, mapped_topic, (long)payloadlen);
- G_PUB_BYTES_SENT_INC(payloadlen);
+ metrics__int_inc(mosq_counter_pub_bytes_sent, payloadlen);
rc = send__real_publish(mosq, mid, mapped_topic, payloadlen, payload, qos, retain, dup, subscription_identifier, store_props, expiry_interval);
mosquitto__FREE(mapped_topic);
return rc;
@@ -166,7 +166,7 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3
}
#endif
log__printf(mosq, MOSQ_LOG_DEBUG, "Sending PUBLISH to %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", SAFE_PRINT(mosq->id), dup, qos, retain, mid, topic, (long)payloadlen);
- G_PUB_BYTES_SENT_INC(payloadlen);
+ metrics__int_inc(mosq_counter_pub_bytes_sent, payloadlen);
#else
log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBLISH (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", SAFE_PRINT(mosq->id), dup, qos, retain, mid, topic, (long)payloadlen);
#endif
@@ -301,6 +301,10 @@ int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic,
#endif
}
+#if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
+ metrics__int_inc(mosq_counter_mqtt_publish_sent, 1);
+#endif
+
/* Payload */
if(payloadlen && payload){
packet__write_bytes(packet, payload, payloadlen);
diff --git a/lib/send_subscribe.c b/lib/send_subscribe.c
index c612da0a..004efd9b 100644
--- a/lib/send_subscribe.c
+++ b/lib/send_subscribe.c
@@ -93,4 +93,3 @@ int send__subscribe(struct mosquitto *mosq, int *mid, int topic_count, char *con
return packet__queue(mosq, packet);
}
-
diff --git a/man/mosquitto.8.xml b/man/mosquitto.8.xml
index 2d96c9cf..9e25953b 100644
--- a/man/mosquitto.8.xml
+++ b/man/mosquitto.8.xml
@@ -419,9 +419,8 @@
- The total number of active and inactive clients
- currently connected and registered on the
- broker.
+ The total number of connected and disconnected client sessions
+ currently registered on the broker.
@@ -437,7 +436,17 @@
-
+
+
+
+ The total number of socket connections that have been
+ made to the broker, whether or not the MQTT connections
+ were ultimately successful.
+
+
+
+
+
The current size of the heap memory in use by
mosquitto. Note that this topic may be unavailable
@@ -445,7 +454,7 @@
-
+
The largest amount of heap memory used by
mosquitto. Note that this topic may be unavailable
@@ -550,13 +559,6 @@
or 15 minutes.
-
-
-
- The number of messages with QoS>0 that are awaiting
- acknowledgments.
-
-
@@ -569,6 +571,153 @@
The total number of messages of any type sent since the broker started.
+
+
+
+ The total number of MQTT CONNECT messages received since the broker started.
+
+
+
+
+
+ The total number of MQTT CONNACK messages sent since the broker started.
+
+
+
+
+
+
+
+ The total number of MQTT PUBLISH messages that have been
+ dropped due to inflight/queuing limits. See the
+ max_inflight_messages and max_queued_messages options
+ in mosquitto.conf5
+ for more information.
+
+
+
+
+
+
+
+ The total number of MQTT PUBLISH messages received since the broker started.
+
+
+
+
+
+
+ The total number of MQTT PUBLISH messages sent since the broker started.
+
+
+
+
+
+ The total number of MQTT PUBACK messages received since the broker started.
+
+
+
+
+
+ The total number of MQTT PUBACK messages sent since the broker started.
+
+
+
+
+
+ The total number of MQTT PUBREC messages received since the broker started.
+
+
+
+
+
+ The total number of MQTT PUBREC messages sent since the broker started.
+
+
+
+
+
+ The total number of MQTT PUBREL messages received since the broker started.
+
+
+
+
+
+ The total number of MQTT PUBREL messages sent since the broker started.
+
+
+
+
+
+ The total number of MQTT PUBCOMP messages received since the broker started.
+
+
+
+
+
+ The total number of MQTT PUBCOMP messages sent since the broker started.
+
+
+
+
+
+ The total number of MQTT SUBSCRIBE messages received since the broker started.
+
+
+
+
+
+ The total number of MQTT SUBACK messages sent since the broker started.
+
+
+
+
+
+ The total number of MQTT UNSUBSCRIBE messages received since the broker started.
+
+
+
+
+
+ The total number of MQTT UNSUBACK messages sent since the broker started.
+
+
+
+
+
+ The total number of MQTT PINGREQ messages received since the broker started.
+
+
+
+
+
+ The total number of MQTT PINGRESP messages sent since the broker started.
+
+
+
+
+
+ The total number of MQTT DISCONNECT messages received since the broker started.
+
+
+
+
+
+ The total number of MQTT DISCONNECT messages sent since the broker started.
+
+
+
+
+
+ The total number of MQTT AUTH messages received since the broker started.
+
+
+
+
+
+ The total number of MQTT AUTH messages sent since the broker started.
+
+
@@ -592,26 +741,15 @@
-
-
- The total number of publish messages that have been
- dropped due to inflight/queuing limits. See the
- max_inflight_messages and max_queued_messages options
- in
- mosquitto.conf5
- for more information.
-
-
-
-
+
- The total number of PUBLISH messages received since the broker started.
+ The total number of PUBLISH payload bytes received since the broker started.
-
+
- The total number of PUBLISH messages sent since the broker started.
+ The total number of PUBLISH payload bytes sent since the broker started.
@@ -637,6 +775,12 @@
and messages queued for durable clients.
+
+
+
+ The total number of shared subscriptions active on the broker.
+
+
diff --git a/src/bridge.c b/src/bridge.c
index 092507eb..75013d7b 100644
--- a/src/bridge.c
+++ b/src/bridge.c
@@ -849,8 +849,8 @@ static void bridge__packet_cleanup(struct mosquitto *context)
}
context->out_packet = NULL;
context->out_packet_last = NULL;
- G_OUT_PACKET_COUNT_DEC(context->out_packet_count);
- G_OUT_PACKET_BYTES_DEC(context->out_packet_bytes);
+ metrics__int_dec(mosq_gauge_out_packets, context->out_packet_count);
+ metrics__int_dec(mosq_gauge_out_packet_bytes, context->out_packet_bytes);
context->out_packet_count = 0;
context->out_packet_bytes = 0;
diff --git a/src/context.c b/src/context.c
index d784d538..5d002431 100644
--- a/src/context.c
+++ b/src/context.c
@@ -165,8 +165,8 @@ void context__cleanup(struct mosquitto *context, bool force_free)
context->out_packet = context->out_packet->next;
mosquitto__FREE(packet);
}
- G_OUT_PACKET_COUNT_DEC(context->out_packet_count);
- G_OUT_PACKET_BYTES_DEC(context->out_packet_bytes);
+ metrics__int_dec(mosq_gauge_out_packets, context->out_packet_count);
+ metrics__int_dec(mosq_gauge_out_packet_bytes, context->out_packet_bytes);
context->out_packet_count = 0;
context->out_packet_bytes = 0;
#if defined(WITH_BROKER) && defined(__GLIBC__) && defined(WITH_ADNS)
diff --git a/src/database.c b/src/database.c
index e1a443f0..d3d77151 100644
--- a/src/database.c
+++ b/src/database.c
@@ -470,7 +470,7 @@ int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, str
"Outgoing messages are being dropped for client %s.",
context->id);
}
- G_MSGS_DROPPED_INC();
+ metrics__int_inc(mosq_counter_mqtt_publish_dropped, 1);
context->stats.messages_dropped++;
return 2;
@@ -601,14 +601,14 @@ int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uin
"Outgoing messages are being dropped for client %s.",
context->id);
}
- G_MSGS_DROPPED_INC();
+ metrics__int_inc(mosq_counter_mqtt_publish_dropped, 1);
return 2;
}
}else{
if (db__ready_for_queue(context, qos, msg_data)){
state = mosq_ms_queued;
}else{
- G_MSGS_DROPPED_INC();
+ metrics__int_inc(mosq_counter_mqtt_publish_dropped, 1);
if(context->is_dropping == false){
context->is_dropping = true;
log__printf(NULL, MOSQ_LOG_NOTICE,
diff --git a/src/handle_connect.c b/src/handle_connect.c
index 49530472..04237fdf 100644
--- a/src/handle_connect.c
+++ b/src/handle_connect.c
@@ -593,8 +593,6 @@ int handle__connect(struct mosquitto *context)
uint16_t auth_data_out_len = 0;
bool allow_zero_length_clientid;
- G_CONNECTION_COUNT_INC();
-
if(!context->listener){
return MOSQ_ERR_INVAL;
}
diff --git a/src/handle_publish.c b/src/handle_publish.c
index 739eec01..d1d40a40 100644
--- a/src/handle_publish.c
+++ b/src/handle_publish.c
@@ -215,7 +215,7 @@ int handle__publish(struct mosquitto *context)
}
base_msg->data.payloadlen = context->in_packet.remaining_length - context->in_packet.pos;
- G_PUB_BYTES_RECEIVED_INC(base_msg->data.payloadlen);
+ metrics__int_inc(mosq_counter_pub_bytes_received, base_msg->data.payloadlen);
if(context->listener && context->listener->mount_point){
len = strlen(context->listener->mount_point) + strlen(base_msg->data.topic) + 1;
topic_mount = mosquitto__malloc(len+1);
diff --git a/src/loop.c b/src/loop.c
index 4e6c5a57..95afb994 100644
--- a/src/loop.c
+++ b/src/loop.c
@@ -194,7 +194,7 @@ int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listens
db.next_event_ms = 86400000;
#ifdef WITH_SYS_TREE
if(db.config->sys_interval > 0){
- sys_tree__update();
+ sys_tree__update(false);
}
#endif
diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h
index 525d390f..c4b9fcad 100644
--- a/src/mosquitto_broker_internal.h
+++ b/src/mosquitto_broker_internal.h
@@ -731,7 +731,7 @@ int db__message_reconnect_reset(struct mosquitto *context);
bool db__ready_for_flight(struct mosquitto *context, enum mosquitto_msg_direction dir, int qos);
bool db__ready_for_queue(struct mosquitto *context, int qos, struct mosquitto_msg_data *msg_data);
void sys_tree__init(void);
-void sys_tree__update(void);
+void sys_tree__update(bool force);
int db__message_write_inflight_out_all(struct mosquitto *context);
int db__message_write_inflight_out_latest(struct mosquitto *context);
int db__message_write_queued_out(struct mosquitto *context);
diff --git a/src/net.c b/src/net.c
index 097fd7e9..cf4a93f5 100644
--- a/src/net.c
+++ b/src/net.c
@@ -147,7 +147,7 @@ struct mosquitto *net__socket_accept(struct mosquitto__listener_sock *listensock
return NULL;
}
- G_SOCKET_CONNECTIONS_INC();
+ metrics__int_inc(mosq_counter_socket_connections, 1);
if(net__socket_nonblock(&new_sock)){
return NULL;
diff --git a/src/read_handle.c b/src/read_handle.c
index cd2001b9..82280c57 100644
--- a/src/read_handle.c
+++ b/src/read_handle.c
@@ -52,6 +52,7 @@ int handle__packet(struct mosquitto *context)
rc = handle__pubackcomp(context, "PUBCOMP");
break;
case CMD_PUBLISH:
+ metrics__int_inc(mosq_counter_mqtt_publish_received, 1);
rc = handle__publish(context);
break;
case CMD_PUBREC:
@@ -61,6 +62,7 @@ int handle__packet(struct mosquitto *context)
rc = handle__pubrel(context);
break;
case CMD_CONNECT:
+ metrics__int_inc(mosq_counter_mqtt_connect_received, 1);
return handle__connect(context);
case CMD_DISCONNECT:
rc = handle__disconnect(context);
diff --git a/src/send_auth.c b/src/send_auth.c
index 620210d3..bec5814a 100644
--- a/src/send_auth.c
+++ b/src/send_auth.c
@@ -23,6 +23,7 @@ Contributors:
#include "memory_mosq.h"
#include "packet_mosq.h"
#include "property_mosq.h"
+#include "sys_tree.h"
#include "util_mosq.h"
int send__auth(struct mosquitto *context, uint8_t reason_code, const void *auth_data, uint16_t auth_data_len)
diff --git a/src/send_connack.c b/src/send_connack.c
index e6e482fd..d3a48a16 100644
--- a/src/send_connack.c
+++ b/src/send_connack.c
@@ -23,6 +23,7 @@ Contributors:
#include "memory_mosq.h"
#include "packet_mosq.h"
#include "property_mosq.h"
+#include "sys_tree.h"
#include "util_mosq.h"
int send__connack(struct mosquitto *context, uint8_t ack, uint8_t reason_code, const mosquitto_property *properties)
diff --git a/src/send_suback.c b/src/send_suback.c
index c68c4c57..213a901d 100644
--- a/src/send_suback.c
+++ b/src/send_suback.c
@@ -23,6 +23,7 @@ Contributors:
#include "memory_mosq.h"
#include "packet_mosq.h"
#include "property_mosq.h"
+#include "sys_tree.h"
#include "util_mosq.h"
diff --git a/src/send_unsuback.c b/src/send_unsuback.c
index 2e60d2e9..c8365eca 100644
--- a/src/send_unsuback.c
+++ b/src/send_unsuback.c
@@ -25,6 +25,7 @@ Contributors:
#include "memory_mosq.h"
#include "packet_mosq.h"
#include "property_mosq.h"
+#include "sys_tree.h"
int send__unsuback(struct mosquitto *mosq, uint16_t mid, int reason_code_count, uint8_t *reason_codes, const mosquitto_property *properties)
diff --git a/src/session_expiry.c b/src/session_expiry.c
index 73faab32..138a0b2b 100644
--- a/src/session_expiry.c
+++ b/src/session_expiry.c
@@ -175,7 +175,7 @@ void session_expiry__check(void)
if(context->id){
log__printf(NULL, MOSQ_LOG_NOTICE, "Expiring client %s due to timeout.", context->id);
}
- G_CLIENTS_EXPIRED_INC();
+ metrics__int_inc(mosq_counter_clients_expired, 1);
/* Session has now expired, so clear interval */
context->session_expiry_interval = 0;
diff --git a/src/sys_tree.c b/src/sys_tree.c
index 560151bb..72d6dbe4 100644
--- a/src/sys_tree.c
+++ b/src/sys_tree.c
@@ -34,25 +34,90 @@ Contributors:
#define SYS_TREE_QOS 2
-uint64_t g_bytes_received = 0;
-uint64_t g_bytes_sent = 0;
-uint64_t g_pub_bytes_received = 0;
-uint64_t g_pub_bytes_sent = 0;
-int64_t g_out_packet_bytes = 0;
-unsigned long g_msgs_received = 0;
-unsigned long g_msgs_sent = 0;
-unsigned long g_pub_msgs_received = 0;
-unsigned long g_pub_msgs_sent = 0;
-unsigned long g_msgs_dropped = 0;
-long g_out_packet_count = 0;
-unsigned int g_clients_expired = 0;
-unsigned int g_socket_connections = 0;
-unsigned int g_connection_count = 0;
+#define METRIC_LOAD_1MIN 1
+#define METRIC_LOAD_5MIN 2
+#define METRIC_LOAD_15MIN 3
+
+struct metric{
+ int64_t current;
+ int64_t next;
+ const char *topic, *topic_alias;
+ bool is_max;
+};
+
+struct metric_load{
+ double current;
+ const char *topic;
+ int load_ref;
+ uint8_t interval;
+};
+
+struct metric metrics[mosq_metric_max] = {
+ { 1, 0, "$SYS/broker/clients/total", NULL, false }, /* mosq_gauge_clients_total */
+ { 1, 0, "$SYS/broker/clients/maximum", NULL, true }, /* metric_clients_maximum */
+ { 1, 0, "$SYS/broker/clients/disconnected", "$SYS/broker/clients/inactive", false }, /* mosq_gauge_clients_disconnected */
+ { 1, 0, "$SYS/broker/clients/connected", "$SYS/broker/clients/active", false }, /* mosq_gauge_clients_connected */
+ { 1, 0, "$SYS/broker/clients/expired", NULL, false }, /* mosq_counter_clients_expired */
+ { 1, 0, "$SYS/broker/messages/stored", "$SYS/broker/store/messages/count", false }, /* mosq_gauge_message_store_count */
+ { 1, 0, "$SYS/broker/store/messages/bytes", NULL, false }, /* mosq_gauge_message_store_bytes */
+ { 1, 0, "$SYS/broker/subscriptions/count", NULL, false }, /* mosq_gauge_subscription_count */
+ { 1, 0, "$SYS/broker/shared_subscriptions/count", NULL, false }, /* mosq_gauge_shared_subscription_count */
+ { 1, 0, "$SYS/broker/retained messages/count", NULL, false }, /* mosq_gauge_retained_message_count */
+#ifdef REAL_WITH_MEMORY_TRACKING
+ { 1, 0, "$SYS/broker/heap/current", NULL, false }, /* mosq_gauge_heap_current */
+ { 1, 0, "$SYS/broker/heap/maximum", NULL, true }, /* mosq_gauge_heap_maximum */
+#else
+ { 1, 0, NULL, NULL, NULL, 0 }, /* mosq_gauge_heap_current */
+ { 1, 0, NULL, NULL, NULL, 0 }, /* mosq_gauge_heap_maximum */
+#endif
+ { 1, 0, "$SYS/broker/messages/received", NULL, false }, /* mosq_counter_messages_received */
+ { 1, 0, "$SYS/broker/messages/sent", NULL, false }, /* mosq_counter_messages_sent */
+ { 1, 0, "$SYS/broker/bytes/received", NULL, false }, /* mosq_counter_bytes_received */
+ { 1, 0, "$SYS/broker/bytes/sent", NULL, false }, /* mosq_counter_bytes_sent */
+ { 1, 0, "$SYS/broker/publish/bytes/received", NULL, false }, /* mosq_counter_pub_bytes_received */
+ { 1, 0, "$SYS/broker/publish/bytes/sent", NULL, false }, /* mosq_counter_pub_bytes_sent */
+ { 1, 0, "$SYS/broker/packet/out/count", NULL, false }, /* mosq_gauge_out_packet_count */
+ { 1, 0, "$SYS/broker/packet/out/bytes", NULL, false }, /* mosq_gauge_out_packet_bytes */
+ { 1, 0, "$SYS/broker/connections/socket/count", NULL, false }, /* mosq_counter_socket_connections */
+ { 1, 0, NULL, NULL, false }, /* mosq_counter_mqtt_connect_received */
+ { 1, 0, "$SYS/broker/publish/messages/dropped", NULL, false }, /* mosq_counter_mqtt_publish_dropped */
+ { 1, 0, "$SYS/broker/publish/messages/received", NULL, false }, /* mosq_counter_mqtt_publish_received */
+ { 1, 0, "$SYS/broker/publish/messages/sent", NULL, false }, /* mosq_counter_mqtt_publish_sent */
+};
+
+struct metric_load metric_loads[mosq_metric_load_max] = {
+ { 0.0, "$SYS/broker/load/messages/received/1min", mosq_counter_messages_received, METRIC_LOAD_1MIN }, /* metric_load_messages_received_1min */
+ { 0.0, "$SYS/broker/load/messages/received/5min", mosq_counter_messages_received, METRIC_LOAD_5MIN }, /* metric_load_messages_received_5min */
+ { 0.0, "$SYS/broker/load/messages/received/15min", mosq_counter_messages_received, METRIC_LOAD_15MIN }, /* metric_load_messages_received_15min */
+ { 0.0, "$SYS/broker/load/messages/sent/1min", mosq_counter_messages_sent, METRIC_LOAD_1MIN }, /* metric_load_messages_sent_1min */
+ { 0.0, "$SYS/broker/load/messages/sent/5min", mosq_counter_messages_sent, METRIC_LOAD_5MIN }, /* metric_load_messages_sent_5min */
+ { 0.0, "$SYS/broker/load/messages/sent/15min", mosq_counter_messages_sent, METRIC_LOAD_15MIN }, /* metric_load_messages_sent_15min */
+ { 0.0, "$SYS/broker/load/publish/dropped/1min", mosq_counter_mqtt_publish_dropped, METRIC_LOAD_1MIN }, /* metric_load_pub_messages_dropped_1min */
+ { 0.0, "$SYS/broker/load/publish/dropped/5min", mosq_counter_mqtt_publish_dropped, METRIC_LOAD_5MIN }, /* metric_load_pub_messages_dropped_5min */
+ { 0.0, "$SYS/broker/load/publish/dropped/15min", mosq_counter_mqtt_publish_dropped, METRIC_LOAD_15MIN }, /* metric_load_pub_messages_dropped_15min */
+ { 0.0, "$SYS/broker/load/publish/received/1min", mosq_counter_mqtt_publish_received, METRIC_LOAD_1MIN }, /* metric_load_pub_messages_received_1min */
+ { 0.0, "$SYS/broker/load/publish/received/5min", mosq_counter_mqtt_publish_received, METRIC_LOAD_5MIN }, /* metric_load_pub_messages_received_5min */
+ { 0.0, "$SYS/broker/load/publish/received/15min", mosq_counter_mqtt_publish_received, METRIC_LOAD_15MIN }, /* metric_load_pub_messages_received_15min */
+ { 0.0, "$SYS/broker/load/publish/sent/1min", mosq_counter_mqtt_publish_sent, METRIC_LOAD_1MIN }, /* metric_load_pub_messages_sent_1min */
+ { 0.0, "$SYS/broker/load/publish/sent/5min", mosq_counter_mqtt_publish_sent, METRIC_LOAD_5MIN }, /* metric_load_pub_messages_sent_5min */
+ { 0.0, "$SYS/broker/load/publish/sent/15min", mosq_counter_mqtt_publish_sent, METRIC_LOAD_15MIN }, /* metric_load_pub_messages_sent_15min */
+ { 0.0, "$SYS/broker/load/bytes/received/1min", mosq_counter_bytes_received, METRIC_LOAD_1MIN }, /* metric_load_bytes_received_1min */
+ { 0.0, "$SYS/broker/load/bytes/received/5min", mosq_counter_bytes_received, METRIC_LOAD_5MIN }, /* metric_load_bytes_received_5min */
+ { 0.0, "$SYS/broker/load/bytes/received/15min", mosq_counter_bytes_received, METRIC_LOAD_15MIN }, /* metric_load_bytes_received_15min */
+ { 0.0, "$SYS/broker/load/bytes/sent/1min", mosq_counter_bytes_sent, METRIC_LOAD_1MIN }, /* metric_load_bytes_sent_1min */
+ { 0.0, "$SYS/broker/load/bytes/sent/5min", mosq_counter_bytes_sent, METRIC_LOAD_5MIN }, /* metric_load_bytes_sent_5min */
+ { 0.0, "$SYS/broker/load/bytes/sent/15min", mosq_counter_bytes_sent, METRIC_LOAD_15MIN }, /* metric_load_bytes_sent_15min */
+ { 0.0, "$SYS/broker/load/sockets/1min", mosq_counter_socket_connections, METRIC_LOAD_1MIN }, /* metric_load_socket_connections_1min */
+ { 0.0, "$SYS/broker/load/sockets/5min", mosq_counter_socket_connections, METRIC_LOAD_5MIN }, /* metric_load_socket_connections_5min */
+ { 0.0, "$SYS/broker/load/sockets/15min", mosq_counter_socket_connections, METRIC_LOAD_15MIN }, /* metric_load_socket_connections_15min */
+ { 0.0, "$SYS/broker/load/connections/1min", mosq_counter_mqtt_connect_received, METRIC_LOAD_1MIN }, /* metric_load_connections_1min */
+ { 0.0, "$SYS/broker/load/connections/5min", mosq_counter_mqtt_connect_received, METRIC_LOAD_5MIN }, /* metric_load_connections_5min */
+ { 0.0, "$SYS/broker/load/connections/15min", mosq_counter_mqtt_connect_received, METRIC_LOAD_15MIN }, /* metric_load_connections_15min */
+};
static time_t start_time = 0;
static time_t last_update = 0;
-
void sys_tree__init(void)
{
char buf[64];
@@ -68,156 +133,50 @@ void sys_tree__init(void)
start_time = mosquitto_time();
last_update = start_time;
+
+ sys_tree__update(true);
}
-static void sys_tree__update_clients(char *buf)
+void metrics__int_inc(enum mosq_metric_type m, int64_t value)
{
- static unsigned int client_count = UINT_MAX;
- static unsigned int clients_expired = UINT_MAX;
- static unsigned int client_max = 0;
- static unsigned int disconnected_count = UINT_MAX;
- static unsigned int connected_count = UINT_MAX;
- uint32_t len;
-
- unsigned int count_total, count_by_sock;
-
- count_total = HASH_CNT(hh_id, db.contexts_by_id);
- count_by_sock = HASH_CNT(hh_sock, db.contexts_by_sock);
-
- if(client_count != count_total){
- client_count = count_total;
- len = (uint32_t)snprintf(buf, BUFLEN, "%d", client_count);
- db__messages_easy_queue(NULL, "$SYS/broker/clients/total", SYS_TREE_QOS, len, buf, 1, 0, NULL);
-
- if(client_count > client_max){
- client_max = client_count;
- len = (uint32_t)snprintf(buf, BUFLEN, "%d", client_max);
- db__messages_easy_queue(NULL, "$SYS/broker/clients/maximum", SYS_TREE_QOS, len, buf, 1, 0, NULL);
- }
- }
-
- if(disconnected_count != count_total-count_by_sock){
- disconnected_count = count_total-count_by_sock;
- len = (uint32_t)snprintf(buf, BUFLEN, "%d", disconnected_count);
- db__messages_easy_queue(NULL, "$SYS/broker/clients/inactive", SYS_TREE_QOS, len, buf, 1, 0, NULL);
- db__messages_easy_queue(NULL, "$SYS/broker/clients/disconnected", SYS_TREE_QOS, len, buf, 1, 0, NULL);
- }
- if(connected_count != count_by_sock){
- connected_count = count_by_sock;
- len = (uint32_t)snprintf(buf, BUFLEN, "%d", connected_count);
- db__messages_easy_queue(NULL, "$SYS/broker/clients/active", SYS_TREE_QOS, len, buf, 1, 0, NULL);
- db__messages_easy_queue(NULL, "$SYS/broker/clients/connected", SYS_TREE_QOS, len, buf, 1, 0, NULL);
- }
- if(g_clients_expired != clients_expired){
- clients_expired = g_clients_expired;
- len = (uint32_t)snprintf(buf, BUFLEN, "%d", clients_expired);
- db__messages_easy_queue(NULL, "$SYS/broker/clients/expired", SYS_TREE_QOS, len, buf, 1, 0, NULL);
+ if(m < mosq_metric_max){
+ metrics[m].next += value;
}
}
-#ifdef REAL_WITH_MEMORY_TRACKING
-static void sys_tree__update_memory(char *buf)
+void metrics__int_dec(enum mosq_metric_type m, int64_t value)
{
- static unsigned long current_heap = ULONG_MAX;
- static unsigned long max_heap = ULONG_MAX;
- unsigned long value_ul;
- uint32_t len;
-
- value_ul = mosquitto__memory_used();
- if(current_heap != value_ul){
- current_heap = value_ul;
- len = (uint32_t)snprintf(buf, BUFLEN, "%lu", current_heap);
- db__messages_easy_queue(NULL, "$SYS/broker/heap/current", SYS_TREE_QOS, len, buf, 1, 0, NULL);
- }
- value_ul =mosquitto__max_memory_used();
- if(max_heap != value_ul){
- max_heap = value_ul;
- len = (uint32_t)snprintf(buf, BUFLEN, "%lu", max_heap);
- db__messages_easy_queue(NULL, "$SYS/broker/heap/maximum", SYS_TREE_QOS, len, buf, 1, 0, NULL);
+ if(m < mosq_metric_max){
+ metrics[m].next -= value;
}
}
-#endif
-static void calc_load(char *buf, const char *topic, double exponent, double interval, double *current)
+static void calc_load(char *buf, double exponent, double i_mult, struct metric_load *m)
{
double new_value;
uint32_t len;
+ double interval;
- new_value = interval + exponent*((*current) - interval);
- if(fabs(new_value - (*current)) >= 0.01){
+ interval = (double)(metrics[m->load_ref].next - metrics[m->load_ref].current)*i_mult;
+ new_value = interval + exponent*(m->current - interval);
+ if(fabs(new_value - (m->current)) >= 0.01){
len = (uint32_t)snprintf(buf, BUFLEN, "%.2f", new_value);
- db__messages_easy_queue(NULL, topic, SYS_TREE_QOS, len, buf, 1, 0, NULL);
+ db__messages_easy_queue(NULL, m->topic, SYS_TREE_QOS, len, buf, 1, 0, NULL);
}
- (*current) = new_value;
+ m->current = new_value;
}
+
/* Send messages for the $SYS hierarchy if the last update is longer than
* 'interval' seconds ago.
* 'interval' is the amount of seconds between updates. If 0, then no periodic
* messages are sent for the $SYS hierarchy.
* 'start_time' is the result of time() that the broker was started at.
*/
-void sys_tree__update(void)
+void sys_tree__update(bool force)
{
time_t uptime;
char buf[BUFLEN];
-
- static int msg_store_count = INT_MAX;
- static unsigned long msg_store_bytes = ULONG_MAX;
- static unsigned long msgs_received = ULONG_MAX;
- static unsigned long msgs_sent = ULONG_MAX;
- static unsigned long publish_dropped = ULONG_MAX;
- static unsigned long pub_msgs_received = ULONG_MAX;
- static unsigned long pub_msgs_sent = ULONG_MAX;
- static unsigned long long bytes_received = ULLONG_MAX;
- static unsigned long long bytes_sent = ULLONG_MAX;
- static unsigned long long pub_bytes_received = ULLONG_MAX;
- static unsigned long long pub_bytes_sent = ULLONG_MAX;
- static int subscription_count = INT_MAX;
- static int shared_subscription_count = INT_MAX;
- static int retained_count = INT_MAX;
- static long out_packet_count = LONG_MAX;
- static long long out_packet_bytes = LLONG_MAX;
-
- static double msgs_received_load1 = 0;
- static double msgs_received_load5 = 0;
- static double msgs_received_load15 = 0;
- static double msgs_sent_load1 = 0;
- static double msgs_sent_load5 = 0;
- static double msgs_sent_load15 = 0;
- static double publish_dropped_load1 = 0;
- static double publish_dropped_load5 = 0;
- static double publish_dropped_load15 = 0;
- double msgs_received_interval, msgs_sent_interval, publish_dropped_interval;
-
- static double publish_received_load1 = 0;
- static double publish_received_load5 = 0;
- static double publish_received_load15 = 0;
- static double publish_sent_load1 = 0;
- static double publish_sent_load5 = 0;
- static double publish_sent_load15 = 0;
- double publish_received_interval, publish_sent_interval;
-
- static double bytes_received_load1 = 0;
- static double bytes_received_load5 = 0;
- static double bytes_received_load15 = 0;
- static double bytes_sent_load1 = 0;
- static double bytes_sent_load5 = 0;
- static double bytes_sent_load15 = 0;
- double bytes_received_interval, bytes_sent_interval;
-
- static double socket_load1 = 0;
- static double socket_load5 = 0;
- static double socket_load15 = 0;
- double socket_interval;
-
- static double connection_load1 = 0;
- static double connection_load5 = 0;
- static double connection_load15 = 0;
- double connection_interval;
-
- double exponent;
- double i_mult;
uint32_t len;
time_t next_event;
static time_t last_update_real = 0;
@@ -231,170 +190,59 @@ void sys_tree__update(void)
}
if(db.config->sys_interval
- && db.now_real_s % db.config->sys_interval == 0 && last_update_real != db.now_real_s){
+ && ((db.now_real_s % db.config->sys_interval == 0 && last_update_real != db.now_real_s) || force)){
uptime = db.now_s - start_time;
len = (uint32_t)snprintf(buf, BUFLEN, "%" PRIu64 " seconds", (uint64_t)uptime);
db__messages_easy_queue(NULL, "$SYS/broker/uptime", SYS_TREE_QOS, len, buf, 1, 0, NULL);
- sys_tree__update_clients(buf);
- if(db.now_s > last_update){
- i_mult = 60.0/(double)(db.now_s-last_update);
-
- msgs_received_interval = (double)(g_msgs_received - msgs_received)*i_mult;
- msgs_sent_interval = (double)(g_msgs_sent - msgs_sent)*i_mult;
- publish_dropped_interval = (double)(g_msgs_dropped - publish_dropped)*i_mult;
-
- publish_received_interval = (double)(g_pub_msgs_received - pub_msgs_received)*i_mult;
- publish_sent_interval = (double)(g_pub_msgs_sent - pub_msgs_sent)*i_mult;
-
- bytes_received_interval = (double)(g_bytes_received - bytes_received)*i_mult;
- bytes_sent_interval = (double)(g_bytes_sent - bytes_sent)*i_mult;
-
- socket_interval = g_socket_connections*i_mult;
- g_socket_connections = 0;
- connection_interval = g_connection_count*i_mult;
- g_connection_count = 0;
-
- /* 1 minute load */
- exponent = exp(-1.0*(double)(db.now_s-last_update)/60.0);
-
- calc_load(buf, "$SYS/broker/load/messages/received/1min", exponent, msgs_received_interval, &msgs_received_load1);
- calc_load(buf, "$SYS/broker/load/messages/sent/1min", exponent, msgs_sent_interval, &msgs_sent_load1);
- calc_load(buf, "$SYS/broker/load/publish/dropped/1min", exponent, publish_dropped_interval, &publish_dropped_load1);
- calc_load(buf, "$SYS/broker/load/publish/received/1min", exponent, publish_received_interval, &publish_received_load1);
- calc_load(buf, "$SYS/broker/load/publish/sent/1min", exponent, publish_sent_interval, &publish_sent_load1);
- calc_load(buf, "$SYS/broker/load/bytes/received/1min", exponent, bytes_received_interval, &bytes_received_load1);
- calc_load(buf, "$SYS/broker/load/bytes/sent/1min", exponent, bytes_sent_interval, &bytes_sent_load1);
- calc_load(buf, "$SYS/broker/load/sockets/1min", exponent, socket_interval, &socket_load1);
- calc_load(buf, "$SYS/broker/load/connections/1min", exponent, connection_interval, &connection_load1);
-
- /* 5 minute load */
- exponent = exp(-1.0*(double)(db.now_s-last_update)/300.0);
-
- calc_load(buf, "$SYS/broker/load/messages/received/5min", exponent, msgs_received_interval, &msgs_received_load5);
- calc_load(buf, "$SYS/broker/load/messages/sent/5min", exponent, msgs_sent_interval, &msgs_sent_load5);
- calc_load(buf, "$SYS/broker/load/publish/dropped/5min", exponent, publish_dropped_interval, &publish_dropped_load5);
- calc_load(buf, "$SYS/broker/load/publish/received/5min", exponent, publish_received_interval, &publish_received_load5);
- calc_load(buf, "$SYS/broker/load/publish/sent/5min", exponent, publish_sent_interval, &publish_sent_load5);
- calc_load(buf, "$SYS/broker/load/bytes/received/5min", exponent, bytes_received_interval, &bytes_received_load5);
- calc_load(buf, "$SYS/broker/load/bytes/sent/5min", exponent, bytes_sent_interval, &bytes_sent_load5);
- calc_load(buf, "$SYS/broker/load/sockets/5min", exponent, socket_interval, &socket_load5);
- calc_load(buf, "$SYS/broker/load/connections/5min", exponent, connection_interval, &connection_load5);
-
- /* 15 minute load */
- exponent = exp(-1.0*(double)(db.now_s-last_update)/900.0);
-
- calc_load(buf, "$SYS/broker/load/messages/received/15min", exponent, msgs_received_interval, &msgs_received_load15);
- calc_load(buf, "$SYS/broker/load/messages/sent/15min", exponent, msgs_sent_interval, &msgs_sent_load15);
- calc_load(buf, "$SYS/broker/load/publish/dropped/15min", exponent, publish_dropped_interval, &publish_dropped_load15);
- calc_load(buf, "$SYS/broker/load/publish/received/15min", exponent, publish_received_interval, &publish_received_load15);
- calc_load(buf, "$SYS/broker/load/publish/sent/15min", exponent, publish_sent_interval, &publish_sent_load15);
- calc_load(buf, "$SYS/broker/load/bytes/received/15min", exponent, bytes_received_interval, &bytes_received_load15);
- calc_load(buf, "$SYS/broker/load/bytes/sent/15min", exponent, bytes_sent_interval, &bytes_sent_load15);
- calc_load(buf, "$SYS/broker/load/sockets/15min", exponent, socket_interval, &socket_load15);
- calc_load(buf, "$SYS/broker/load/connections/15min", exponent, connection_interval, &connection_load15);
- }
-
- if(db.msg_store_count != msg_store_count){
- msg_store_count = db.msg_store_count;
- len = (uint32_t)snprintf(buf, BUFLEN, "%d", msg_store_count);
- db__messages_easy_queue(NULL, "$SYS/broker/messages/stored", SYS_TREE_QOS, len, buf, 1, 0, NULL);
- db__messages_easy_queue(NULL, "$SYS/broker/store/messages/count", SYS_TREE_QOS, len, buf, 1, 0, NULL);
- }
-
- if (db.msg_store_bytes != msg_store_bytes){
- msg_store_bytes = db.msg_store_bytes;
- len = (uint32_t)snprintf(buf, BUFLEN, "%lu", msg_store_bytes);
- db__messages_easy_queue(NULL, "$SYS/broker/store/messages/bytes", SYS_TREE_QOS, len, buf, 1, 0, NULL);
- }
-
- if(db.subscription_count != subscription_count){
- subscription_count = db.subscription_count;
- len = (uint32_t)snprintf(buf, BUFLEN, "%d", subscription_count);
- db__messages_easy_queue(NULL, "$SYS/broker/subscriptions/count", SYS_TREE_QOS, len, buf, 1, 0, NULL);
- }
-
- if(db.shared_subscription_count != shared_subscription_count){
- shared_subscription_count = db.shared_subscription_count;
- len = (uint32_t)snprintf(buf, BUFLEN, "%d", shared_subscription_count);
- db__messages_easy_queue(NULL, "$SYS/broker/shared_subscriptions/count", SYS_TREE_QOS, len, buf, 1, 0, NULL);
- }
-
- if(db.retained_count != retained_count){
- retained_count = db.retained_count;
- len = (uint32_t)snprintf(buf, BUFLEN, "%d", retained_count);
- db__messages_easy_queue(NULL, "$SYS/broker/retained messages/count", SYS_TREE_QOS, len, buf, 1, 0, NULL);
- }
-
+ /* Update metrics values where not otherwise updated */
+ metrics[mosq_gauge_message_store_count].next = db.msg_store_count;
+ metrics[mosq_gauge_message_store_bytes].next = (int64_t)db.msg_store_bytes;
+ metrics[mosq_gauge_subscriptions].next = db.subscription_count;
+ metrics[mosq_gauge_shared_subscriptions].next = db.shared_subscription_count;
+ metrics[mosq_gauge_retained_messages].next = db.retained_count;
#ifdef REAL_WITH_MEMORY_TRACKING
- sys_tree__update_memory(buf);
+ metrics[mosq_gauge_heap_current].next = (int64_t)mosquitto__memory_used();
+ metrics[mosq_counter_heap_maximum].next = (int64_t)mosquitto__max_memory_used();
#endif
+ metrics[mosq_gauge_clients_total].next = HASH_CNT(hh_id, db.contexts_by_id);
+ metrics[mosq_counter_clients_maximum].next = HASH_CNT(hh_id, db.contexts_by_id);
+ metrics[mosq_gauge_clients_connected].next = HASH_CNT(hh_sock, db.contexts_by_sock);
+ metrics[mosq_gauge_clients_disconnected].next = HASH_CNT(hh_id, db.contexts_by_id) - HASH_CNT(hh_sock, db.contexts_by_sock);
- if(msgs_received != g_msgs_received){
- msgs_received = g_msgs_received;
- len = (uint32_t)snprintf(buf, BUFLEN, "%lu", msgs_received);
- db__messages_easy_queue(NULL, "$SYS/broker/messages/received", SYS_TREE_QOS, len, buf, 1, 0, NULL);
- }
-
- if(msgs_sent != g_msgs_sent){
- msgs_sent = g_msgs_sent;
- len = (uint32_t)snprintf(buf, BUFLEN, "%lu", msgs_sent);
- db__messages_easy_queue(NULL, "$SYS/broker/messages/sent", SYS_TREE_QOS, len, buf, 1, 0, NULL);
- }
-
- if(publish_dropped != g_msgs_dropped){
- publish_dropped = g_msgs_dropped;
- len = (uint32_t)snprintf(buf, BUFLEN, "%lu", publish_dropped);
- db__messages_easy_queue(NULL, "$SYS/broker/publish/messages/dropped", SYS_TREE_QOS, len, buf, 1, 0, NULL);
- }
-
- if(pub_msgs_received != g_pub_msgs_received){
- pub_msgs_received = g_pub_msgs_received;
- len = (uint32_t)snprintf(buf, BUFLEN, "%lu", pub_msgs_received);
- db__messages_easy_queue(NULL, "$SYS/broker/publish/messages/received", SYS_TREE_QOS, len, buf, 1, 0, NULL);
- }
-
- if(pub_msgs_sent != g_pub_msgs_sent){
- pub_msgs_sent = g_pub_msgs_sent;
- len = (uint32_t)snprintf(buf, BUFLEN, "%lu", pub_msgs_sent);
- db__messages_easy_queue(NULL, "$SYS/broker/publish/messages/sent", SYS_TREE_QOS, len, buf, 1, 0, NULL);
- }
-
- if(bytes_received != g_bytes_received){
- bytes_received = g_bytes_received;
- len = (uint32_t)snprintf(buf, BUFLEN, "%llu", bytes_received);
- db__messages_easy_queue(NULL, "$SYS/broker/bytes/received", SYS_TREE_QOS, len, buf, 1, 0, NULL);
- }
-
- if(bytes_sent != g_bytes_sent){
- bytes_sent = g_bytes_sent;
- len = (uint32_t)snprintf(buf, BUFLEN, "%llu", bytes_sent);
- db__messages_easy_queue(NULL, "$SYS/broker/bytes/sent", SYS_TREE_QOS, len, buf, 1, 0, NULL);
- }
-
- if(pub_bytes_received != g_pub_bytes_received){
- pub_bytes_received = g_pub_bytes_received;
- len = (uint32_t)snprintf(buf, BUFLEN, "%llu", pub_bytes_received);
- db__messages_easy_queue(NULL, "$SYS/broker/publish/bytes/received", SYS_TREE_QOS, len, buf, 1, 0, NULL);
- }
-
- if(pub_bytes_sent != g_pub_bytes_sent){
- pub_bytes_sent = g_pub_bytes_sent;
- len = (uint32_t)snprintf(buf, BUFLEN, "%llu", pub_bytes_sent);
- db__messages_easy_queue(NULL, "$SYS/broker/publish/bytes/sent", SYS_TREE_QOS, len, buf, 1, 0, NULL);
- }
-
- if(out_packet_count != g_out_packet_count){
- out_packet_count = g_out_packet_count;
- len = (uint32_t)snprintf(buf, BUFLEN, "%lu", out_packet_count);
- db__messages_easy_queue(NULL, "$SYS/broker/packet/out/count", SYS_TREE_QOS, len, buf, 1, 0, NULL);
- }
-
- if(out_packet_bytes != g_out_packet_bytes){
- out_packet_bytes = g_out_packet_bytes;
- len = (uint32_t)snprintf(buf, BUFLEN, "%llu", out_packet_bytes);
- db__messages_easy_queue(NULL, "$SYS/broker/packet/out/bytes", SYS_TREE_QOS, len, buf, 1, 0, NULL);
+ /* Handle loads first, because they reference other metrics and need next != current */
+ if(db.now_s > last_update){
+ double i_mult = 60.0/(double)(db.now_s-last_update);
+
+ double exponent_1min = exp(-1.0*(double)(db.now_s-last_update)/60.0);
+ double exponent_5min = exp(-1.0*(double)(db.now_s-last_update)/300.0);
+ double exponent_15min = exp(-1.0*(double)(db.now_s-last_update)/900.0);
+
+ for(int i=0; i metrics[i].current) ||
+ (!metrics[i].is_max && metrics[i].next != metrics[i].current)){
+
+ metrics[i].current = metrics[i].next;
+ len = (uint32_t)snprintf(buf, BUFLEN, "%lu", metrics[i].current);
+ if(metrics[i].topic){
+ db__messages_easy_queue(NULL, metrics[i].topic, SYS_TREE_QOS, len, buf, 1, 0, NULL);
+ }
+ if(metrics[i].topic_alias){
+ db__messages_easy_queue(NULL, metrics[i].topic_alias, SYS_TREE_QOS, len, buf, 1, 0, NULL);
+ }
+ }
}
last_update = db.now_s;
diff --git a/src/sys_tree.h b/src/sys_tree.h
index ed5e7a21..de5ec5f2 100644
--- a/src/sys_tree.h
+++ b/src/sys_tree.h
@@ -20,56 +20,77 @@ Contributors:
#define SYS_TREE_H
#if defined(WITH_SYS_TREE) && defined(WITH_BROKER)
-extern uint64_t g_bytes_received;
-extern uint64_t g_bytes_sent;
-extern uint64_t g_pub_bytes_received;
-extern uint64_t g_pub_bytes_sent;
-extern int64_t g_out_packet_bytes;
-extern unsigned long g_msgs_received;
-extern unsigned long g_msgs_sent;
-extern unsigned long g_pub_msgs_received;
-extern unsigned long g_pub_msgs_sent;
-extern unsigned long g_msgs_dropped;
-extern long g_out_packet_count;
-extern unsigned int g_clients_expired;
-extern unsigned int g_socket_connections;
-extern unsigned int g_connection_count;
-#define G_BYTES_RECEIVED_INC(A) (g_bytes_received+=(uint64_t)(A))
-#define G_BYTES_SENT_INC(A) (g_bytes_sent+=(uint64_t)(A))
-#define G_PUB_BYTES_RECEIVED_INC(A) (g_pub_bytes_received+=(A))
-#define G_PUB_BYTES_SENT_INC(A) (g_pub_bytes_sent+=(A))
-#define G_MSGS_RECEIVED_INC(A) (g_msgs_received+=(A))
-#define G_MSGS_SENT_INC(A) (g_msgs_sent+=(A))
-#define G_PUB_MSGS_RECEIVED_INC(A) (g_pub_msgs_received+=(A))
-#define G_PUB_MSGS_SENT_INC(A) (g_pub_msgs_sent+=(A))
-#define G_MSGS_DROPPED_INC() (g_msgs_dropped++)
-#define G_CLIENTS_EXPIRED_INC() (g_clients_expired++)
-#define G_SOCKET_CONNECTIONS_INC() (g_socket_connections++)
-#define G_CONNECTION_COUNT_INC() (g_connection_count++)
-#define G_OUT_PACKET_COUNT_INC(A) (g_out_packet_count+=(A))
-#define G_OUT_PACKET_COUNT_DEC(A) (g_out_packet_count-=(A))
-#define G_OUT_PACKET_BYTES_INC(A) (g_out_packet_bytes+=(A))
-#define G_OUT_PACKET_BYTES_DEC(A) (g_out_packet_bytes-=(A))
+/* This ordering *must* match the metrics array in sys_tree.c. */
+enum mosq_metric_type{
+ mosq_gauge_clients_total = 0,
+ mosq_counter_clients_maximum = 1,
+ mosq_gauge_clients_disconnected = 2,
+ mosq_gauge_clients_connected = 3,
+ mosq_counter_clients_expired = 4,
+ mosq_gauge_message_store_count = 5,
+ mosq_gauge_message_store_bytes = 6,
+ mosq_gauge_subscriptions = 7,
+ mosq_gauge_shared_subscriptions = 8,
+ mosq_gauge_retained_messages = 9,
+ mosq_gauge_heap_current = 10,
+ mosq_counter_heap_maximum = 11,
+ mosq_counter_messages_received = 12,
+ mosq_counter_messages_sent = 13,
+ mosq_counter_bytes_received = 14,
+ mosq_counter_bytes_sent = 15,
+ mosq_counter_pub_bytes_received = 16,
+ mosq_counter_pub_bytes_sent = 17,
+ mosq_gauge_out_packets = 18,
+ mosq_gauge_out_packet_bytes = 19,
+ mosq_counter_socket_connections = 20,
+ mosq_counter_mqtt_connect_received = 21,
+ mosq_counter_mqtt_publish_dropped = 22,
+ mosq_counter_mqtt_publish_received = 23,
+ mosq_counter_mqtt_publish_sent = 24,
-#else
+ mosq_metric_max,
+};
+
+/* This ordering *must* match the metrics_load array in sys_tree.c. */
+enum mosq_metric_load_type{
+ mosq_load_messages_received_1min = 0,
+ mosq_load_messages_received_5min = 1,
+ mosq_load_messages_received_15min = 2,
+ mosq_load_messages_sent_1min = 3,
+ mosq_load_messages_sent_5min = 4,
+ mosq_load_messages_sent_15min = 5,
+ mosq_load_pub_messages_dropped_1min = 6,
+ mosq_load_pub_messages_dropped_5min = 7,
+ mosq_load_pub_messages_dropped_15min = 8,
+ mosq_load_pub_messages_received_1min = 9,
+ mosq_load_pub_messages_received_5min = 10,
+ mosq_load_pub_messages_received_15min = 11,
+ mosq_load_pub_messages_sent_1min = 12,
+ mosq_load_pub_messages_sent_5min = 13,
+ mosq_load_pub_messages_sent_15min = 14,
+ mosq_load_bytes_received_1min = 15,
+ mosq_load_bytes_received_5min = 16,
+ mosq_load_bytes_received_15min = 17,
+ mosq_load_bytes_sent_1min = 18,
+ mosq_load_bytes_sent_5min = 19,
+ mosq_load_bytes_sent_15min = 20,
+ mosq_load_sockets_1min = 21,
+ mosq_load_sockets_5min = 22,
+ mosq_load_sockets_15min = 23,
+ mosq_load_connections_1min = 24,
+ mosq_load_connections_5min = 25,
+ mosq_load_connections_15min = 26,
+
+ mosq_metric_load_max,
+};
-#define G_BYTES_RECEIVED_INC(A)
-#define G_BYTES_SENT_INC(A)
-#define G_PUB_BYTES_RECEIVED_INC(A)
-#define G_PUB_BYTES_SENT_INC(A)
-#define G_MSGS_RECEIVED_INC(A)
-#define G_MSGS_SENT_INC(A)
-#define G_PUB_MSGS_RECEIVED_INC(A)
-#define G_PUB_MSGS_SENT_INC(A)
-#define G_MSGS_DROPPED_INC()
-#define G_CLIENTS_EXPIRED_INC()
-#define G_SOCKET_CONNECTIONS_INC()
-#define G_CONNECTION_COUNT_INC()
-#define G_OUT_PACKET_COUNT_INC(A)
-#define G_OUT_PACKET_COUNT_DEC(A)
-#define G_OUT_PACKET_BYTES_INC(A)
-#define G_OUT_PACKET_BYTES_DEC(A)
+void metrics__int_inc(enum mosq_metric_type m, int64_t value);
+void metrics__int_dec(enum mosq_metric_type m, int64_t value);
+
+#else
+# define metrics__int_inc(A, B)
+# define metrics__int_dec(A, B)
#endif
diff --git a/src/websockets.c b/src/websockets.c
index 20e3f345..5558fec5 100644
--- a/src/websockets.c
+++ b/src/websockets.c
@@ -236,7 +236,7 @@ static int callback_mqtt(
}
ucount = (unsigned int)count;
#ifdef WITH_SYS_TREE
- g_bytes_sent += ucount;
+ metrics__int_inc(mosq_counter_bytes_sent, ucount);
#endif
packet->to_process -= ucount;
packet->pos += ucount;
@@ -251,9 +251,9 @@ static int callback_mqtt(
}
#ifdef WITH_SYS_TREE
- g_msgs_sent++;
+ metrics__int_inc(mosq_counter_messages_sent, 1);
if(((packet->command)&0xF0) == CMD_PUBLISH){
- g_pub_msgs_sent++;
+ metrics__int_inc(mosq_counter_mqtt_publish_sent, 1);
}
#endif
@@ -280,7 +280,7 @@ static int callback_mqtt(
mosq = u->mosq;
pos = 0;
buf = (uint8_t *)in;
- G_BYTES_RECEIVED_INC(len);
+ metrics__int_inc(mosq_counter_bytes_received, (int64_t)len);
while(pos < len){
if(!mosq->in_packet.command){
mosq->in_packet.command = buf[pos];
@@ -336,10 +336,7 @@ static int callback_mqtt(
mosq->in_packet.pos = 0;
#ifdef WITH_SYS_TREE
- G_MSGS_RECEIVED_INC(1);
- if(((mosq->in_packet.command)&0xF0) == CMD_PUBLISH){
- G_PUB_MSGS_RECEIVED_INC(1);
- }
+ metrics__int_inc(mosq_counter_messages_received, 1);
#endif
rc = handle__packet(mosq);
diff --git a/test/unit/persist_write_stubs.c b/test/unit/persist_write_stubs.c
index 07237e3c..3a2dff84 100644
--- a/test/unit/persist_write_stubs.c
+++ b/test/unit/persist_write_stubs.c
@@ -7,21 +7,7 @@
#include
#include
#include
-
-uint64_t g_bytes_received;
-uint64_t g_bytes_sent;
-uint64_t g_pub_bytes_received;
-uint64_t g_pub_bytes_sent;
-int64_t g_out_packet_bytes;
-unsigned long g_msgs_received;
-unsigned long g_msgs_sent;
-unsigned long g_pub_msgs_received;
-unsigned long g_pub_msgs_sent;
-unsigned long g_msgs_dropped;
-long g_out_packet_count;
-unsigned int g_clients_expired;
-unsigned int g_socket_connections;
-unsigned int g_connection_count;
+#include
extern uint64_t last_retained;
extern char *last_sub;
@@ -292,3 +278,12 @@ int send__disconnect(struct mosquitto *mosq, uint8_t reason_code, const mosquitt
UNUSED(properties);
return 0;
}
+
+void metrics__int_inc(enum mosq_metric_type m, int64_t value)
+{
+ UNUSED(m); UNUSED(value);
+}
+void metrics__int_dec(enum mosq_metric_type m, int64_t value)
+{
+ UNUSED(m); UNUSED(value);
+}
diff --git a/test/unit/subs_stubs.c b/test/unit/subs_stubs.c
index 1a274b72..a3280cc5 100644
--- a/test/unit/subs_stubs.c
+++ b/test/unit/subs_stubs.c
@@ -9,21 +9,7 @@
#include
#include
#include
-
-uint64_t g_bytes_received;
-uint64_t g_bytes_sent;
-uint64_t g_pub_bytes_received;
-uint64_t g_pub_bytes_sent;
-int64_t g_out_packet_bytes;
-unsigned long g_msgs_received;
-unsigned long g_msgs_sent;
-unsigned long g_pub_msgs_received;
-unsigned long g_pub_msgs_sent;
-unsigned long g_msgs_dropped;
-long g_out_packet_count;
-unsigned int g_clients_expired;
-unsigned int g_socket_connections;
-unsigned int g_connection_count;
+#include
int log__printf(struct mosquitto *mosq, unsigned int priority, const char *fmt, ...)
{
@@ -233,3 +219,12 @@ int session_expiry__add_from_persistence(struct mosquitto *context, time_t expir
UNUSED(expiry_time);
return 0;
}
+
+void metrics__int_inc(enum mosq_metric_type m, int64_t value)
+{
+ UNUSED(m); UNUSED(value);
+}
+void metrics__int_dec(enum mosq_metric_type m, int64_t value)
+{
+ UNUSED(m); UNUSED(value);
+}