Simplify $SYS metric generation.

pull/2743/merge
Roger A. Light 3 years ago
parent 105a652fbf
commit a791532c12

@ -42,14 +42,8 @@ Contributors:
# include "sys_tree.h"
# include "send_mosq.h"
#else
# define G_BYTES_RECEIVED_INC(A)
# define G_BYTES_SENT_INC(A)
# define G_MSGS_SENT_INC(A)
# define G_PUB_MSGS_SENT_INC(A)
# define G_OUT_PACKET_COUNT_INC(A)
# define G_OUT_PACKET_COUNT_DEC(A)
# define G_OUT_PACKET_BYTES_INC(A)
# define G_OUT_PACKET_BYTES_DEC(A)
# define metrics__int_inc(stat, val)
# define metrics__int_dec(stat, val)
#endif
int packet__alloc(struct mosquitto__packet **packet, uint8_t command, uint32_t remaining_length)
@ -124,8 +118,8 @@ void packet__cleanup_all_no_locks(struct mosquitto *mosq)
mosquitto__FREE(packet);
}
G_OUT_PACKET_COUNT_DEC(mosq->out_packet_count);
G_OUT_PACKET_BYTES_DEC(mosq->out_packet_bytes);
metrics__int_dec(mosq_gauge_out_packets, mosq->out_packet_count);
metrics__int_dec(mosq_gauge_out_packet_bytes, mosq->out_packet_bytes);
mosq->out_packet_count = 0;
mosq->out_packet_bytes = 0;
mosq->out_packet_last = NULL;
@ -152,8 +146,8 @@ static void packet__queue_append(struct mosquitto *mosq, struct mosquitto__packe
mosq->out_packet_last = packet;
mosq->out_packet_count++;
mosq->out_packet_bytes += packet->packet_length;
G_OUT_PACKET_COUNT_INC(1);
G_OUT_PACKET_BYTES_INC(packet->packet_length);
metrics__int_inc(mosq_gauge_out_packets, 1);
metrics__int_inc(mosq_gauge_out_packet_bytes, packet->packet_length);
pthread_mutex_unlock(&mosq->out_packet_mutex);
}
@ -236,8 +230,8 @@ struct mosquitto__packet *packet__get_next_out(struct mosquitto *mosq)
if(mosq->out_packet){
mosq->out_packet_count--;
mosq->out_packet_bytes -= mosq->out_packet->packet_length;
G_OUT_PACKET_COUNT_DEC(1);
G_OUT_PACKET_BYTES_DEC(mosq->out_packet->packet_length);
metrics__int_dec(mosq_gauge_out_packets, 1);
metrics__int_dec(mosq_gauge_out_packet_bytes, mosq->out_packet->packet_length);
mosq->out_packet = mosq->out_packet->next;
if(!mosq->out_packet){
@ -283,7 +277,7 @@ int packet__write(struct mosquitto *mosq)
while(packet->to_process > 0){
write_length = net__write(mosq, &(packet->payload[packet->pos]), packet->to_process);
if(write_length > 0){
G_BYTES_SENT_INC(write_length);
metrics__int_inc(mosq_counter_bytes_sent, write_length);
packet->to_process -= (uint32_t)write_length;
packet->pos += (uint32_t)write_length;
}else{
@ -309,17 +303,14 @@ int packet__write(struct mosquitto *mosq)
}
}
G_MSGS_SENT_INC(1);
metrics__int_inc(mosq_counter_messages_sent, 1);
if(((packet->command)&0xF6) == CMD_PUBLISH){
G_PUB_MSGS_SENT_INC(1);
#ifndef WITH_BROKER
callback__on_publish(mosq, packet->mid, 0, NULL);
}else if(((packet->command)&0xF0) == CMD_DISCONNECT){
do_client_disconnect(mosq, MOSQ_ERR_SUCCESS, NULL);
return MOSQ_ERR_SUCCESS;
#endif
}else if(((packet->command)&0xF0) == CMD_PUBLISH){
G_PUB_MSGS_SENT_INC(1);
}
next_packet = packet__get_next_out(mosq);
@ -390,7 +381,7 @@ int packet__read(struct mosquitto *mosq)
if(read_length == 1){
mosq->in_packet.command = byte;
#ifdef WITH_BROKER
G_BYTES_RECEIVED_INC(1);
metrics__int_inc(mosq_counter_bytes_received, 1);
/* Clients must send CONNECT as their first command. */
if(!(mosq->bridge) && state == mosq_cs_connected && (byte&0xF0) != CMD_CONNECT){
return MOSQ_ERR_PROTOCOL;
@ -438,7 +429,7 @@ int packet__read(struct mosquitto *mosq)
return MOSQ_ERR_MALFORMED_PACKET;
}
G_BYTES_RECEIVED_INC(1);
metrics__int_inc(mosq_counter_bytes_received, 1);
mosq->in_packet.remaining_length += (byte & 127) * mosq->in_packet.remaining_mult;
mosq->in_packet.remaining_mult *= 128;
}else{
@ -518,7 +509,7 @@ int packet__read(struct mosquitto *mosq)
while(mosq->in_packet.to_process>0){
read_length = local__read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);
if(read_length > 0){
G_BYTES_RECEIVED_INC(read_length);
metrics__int_inc(mosq_counter_bytes_received, read_length);
mosq->in_packet.to_process -= (uint32_t)read_length;
mosq->in_packet.pos += (uint32_t)read_length;
}else{
@ -557,10 +548,7 @@ int packet__read(struct mosquitto *mosq)
/* All data for this packet is read. */
mosq->in_packet.pos = 0;
#ifdef WITH_BROKER
G_MSGS_RECEIVED_INC(1);
if(((mosq->in_packet.command)&0xF0) == CMD_PUBLISH){
G_PUB_MSGS_RECEIVED_INC(1);
}
metrics__int_inc(mosq_counter_messages_received, 1);
#endif
rc = handle__packet(mosq);

@ -22,6 +22,7 @@ Contributors:
#ifdef WITH_BROKER
# include "mosquitto_broker_internal.h"
# include "sys_tree.h"
#endif
#include "mosquitto.h"

@ -24,6 +24,7 @@ Contributors:
#ifdef WITH_BROKER
# include "mosquitto_broker_internal.h"
# include "sys_tree.h"
#endif
#include "mosquitto.h"
@ -172,4 +173,3 @@ int send__simple_command(struct mosquitto *mosq, uint8_t command)
return packet__queue(mosq, packet);
}

@ -25,7 +25,7 @@ Contributors:
# include "mosquitto_broker_internal.h"
# include "sys_tree.h"
#else
# define G_PUB_BYTES_SENT_INC(A)
# define metrics__int_inc(stat, val)
#endif
#include "alias_mosq.h"
@ -156,7 +156,7 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3
mapped_topic = topic_temp;
}
log__printf(mosq, MOSQ_LOG_DEBUG, "Sending PUBLISH to %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", SAFE_PRINT(mosq->id), dup, qos, retain, mid, mapped_topic, (long)payloadlen);
G_PUB_BYTES_SENT_INC(payloadlen);
metrics__int_inc(mosq_counter_pub_bytes_sent, payloadlen);
rc = send__real_publish(mosq, mid, mapped_topic, payloadlen, payload, qos, retain, dup, subscription_identifier, store_props, expiry_interval);
mosquitto__FREE(mapped_topic);
return rc;
@ -166,7 +166,7 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3
}
#endif
log__printf(mosq, MOSQ_LOG_DEBUG, "Sending PUBLISH to %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", SAFE_PRINT(mosq->id), dup, qos, retain, mid, topic, (long)payloadlen);
G_PUB_BYTES_SENT_INC(payloadlen);
metrics__int_inc(mosq_counter_pub_bytes_sent, payloadlen);
#else
log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBLISH (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", SAFE_PRINT(mosq->id), dup, qos, retain, mid, topic, (long)payloadlen);
#endif
@ -301,6 +301,10 @@ int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic,
#endif
}
#if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
metrics__int_inc(mosq_counter_mqtt_publish_sent, 1);
#endif
/* Payload */
if(payloadlen && payload){
packet__write_bytes(packet, payload, payloadlen);

@ -93,4 +93,3 @@ int send__subscribe(struct mosquitto *mosq, int *mid, int topic_count, char *con
return packet__queue(mosq, packet);
}

@ -419,9 +419,8 @@
<varlistentry>
<term><option>$SYS/broker/clients/total</option></term>
<listitem>
<para>The total number of active and inactive clients
currently connected and registered on the
broker.</para>
<para>The total number of connected and disconnected client sessions
currently registered on the broker.</para>
</listitem>
</varlistentry>
<varlistentry>
@ -437,7 +436,17 @@
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/heap/current size</option></term>
<term><option>$SYS/broker/connections/socket/count</option></term>
<listitem>
<para>
The total number of socket connections that have been
made to the broker, whether or not the MQTT connections
were ultimately successful.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/heap/current</option></term>
<listitem>
<para>The current size of the heap memory in use by
mosquitto. Note that this topic may be unavailable
@ -445,7 +454,7 @@
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/heap/maximum size</option></term>
<term><option>$SYS/broker/heap/maximum</option></term>
<listitem>
<para>The largest amount of heap memory used by
mosquitto. Note that this topic may be unavailable
@ -550,13 +559,6 @@
or 15 minutes.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/messages/inflight</option></term>
<listitem>
<para>The number of messages with QoS>0 that are awaiting
acknowledgments.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/messages/received</option></term>
<listitem>
@ -569,6 +571,153 @@
<para>The total number of messages of any type sent since the broker started.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/mqtt/connect/received</option></term>
<listitem>
<para>The total number of MQTT CONNECT messages received since the broker started.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/mqtt/connack/sent</option></term>
<listitem>
<para>The total number of MQTT CONNACK messages sent since the broker started.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/mqtt/publish/dropped</option></term>
<term><option>$SYS/broker/publish/messages/dropped</option></term>
<listitem>
<para>
The total number of MQTT PUBLISH messages that have been
dropped due to inflight/queuing limits. See the
max_inflight_messages and max_queued_messages options
in <citerefentry><refentrytitle><link xlink:href="mosquitto-conf-5.html">mosquitto.conf</link></refentrytitle><manvolnum>5</manvolnum></citerefentry>
for more information.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/mqtt/publish/received</option></term>
<term><option>$SYS/broker/publish/messages/received</option></term>
<listitem>
<para>The total number of MQTT PUBLISH messages received since the broker started.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/mqtt/publish/sent</option></term>
<term><option>$SYS/broker/publish/messages/sent</option></term>
<listitem>
<para>The total number of MQTT PUBLISH messages sent since the broker started.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/mqtt/puback/received</option></term>
<listitem>
<para>The total number of MQTT PUBACK messages received since the broker started.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/mqtt/puback/sent</option></term>
<listitem>
<para>The total number of MQTT PUBACK messages sent since the broker started.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/mqtt/pubrec/received</option></term>
<listitem>
<para>The total number of MQTT PUBREC messages received since the broker started.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/mqtt/pubrec/sent</option></term>
<listitem>
<para>The total number of MQTT PUBREC messages sent since the broker started.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/mqtt/pubrel/received</option></term>
<listitem>
<para>The total number of MQTT PUBREL messages received since the broker started.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/mqtt/pubrel/sent</option></term>
<listitem>
<para>The total number of MQTT PUBREL messages sent since the broker started.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/mqtt/pubcomp/received</option></term>
<listitem>
<para>The total number of MQTT PUBCOMP messages received since the broker started.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/mqtt/pubcomp/sent</option></term>
<listitem>
<para>The total number of MQTT PUBCOMP messages sent since the broker started.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/mqtt/subscribe/received</option></term>
<listitem>
<para>The total number of MQTT SUBSCRIBE messages received since the broker started.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/mqtt/suback/sent</option></term>
<listitem>
<para>The total number of MQTT SUBACK messages sent since the broker started.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/mqtt/unsubscribe/received</option></term>
<listitem>
<para>The total number of MQTT UNSUBSCRIBE messages received since the broker started.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/mqtt/unsuback/sent</option></term>
<listitem>
<para>The total number of MQTT UNSUBACK messages sent since the broker started.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/mqtt/pingreq/received</option></term>
<listitem>
<para>The total number of MQTT PINGREQ messages received since the broker started.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/mqtt/pingresp/sent</option></term>
<listitem>
<para>The total number of MQTT PINGRESP messages sent since the broker started.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/mqtt/disconnect/received</option></term>
<listitem>
<para>The total number of MQTT DISCONNECT messages received since the broker started.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/mqtt/disconnect/sent</option></term>
<listitem>
<para>The total number of MQTT DISCONNECT messages sent since the broker started.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/mqtt/auth/received</option></term>
<listitem>
<para>The total number of MQTT AUTH messages received since the broker started.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/mqtt/auth/sent</option></term>
<listitem>
<para>The total number of MQTT AUTH messages sent since the broker started.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/packet/out/count</option></term>
<listitem>
@ -592,26 +741,15 @@
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/publish/messages/dropped</option></term>
<listitem>
<para>The total number of publish messages that have been
dropped due to inflight/queuing limits. See the
max_inflight_messages and max_queued_messages options
in
<citerefentry><refentrytitle><link xlink:href="mosquitto-conf-5.html">mosquitto.conf</link></refentrytitle><manvolnum>5</manvolnum></citerefentry>
for more information.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/publish/messages/received</option></term>
<term><option>$SYS/broker/publish/bytes/received</option></term>
<listitem>
<para>The total number of PUBLISH messages received since the broker started.</para>
<para>The total number of PUBLISH payload bytes received since the broker started.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/publish/messages/sent</option></term>
<term><option>$SYS/broker/publish/bytes/sent</option></term>
<listitem>
<para>The total number of PUBLISH messages sent since the broker started.</para>
<para>The total number of PUBLISH payload bytes sent since the broker started.</para>
</listitem>
</varlistentry>
<varlistentry>
@ -637,6 +775,12 @@
and messages queued for durable clients.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/shared_subscriptions/count</option></term>
<listitem>
<para>The total number of shared subscriptions active on the broker.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/subscriptions/count</option></term>
<listitem>

@ -849,8 +849,8 @@ static void bridge__packet_cleanup(struct mosquitto *context)
}
context->out_packet = NULL;
context->out_packet_last = NULL;
G_OUT_PACKET_COUNT_DEC(context->out_packet_count);
G_OUT_PACKET_BYTES_DEC(context->out_packet_bytes);
metrics__int_dec(mosq_gauge_out_packets, context->out_packet_count);
metrics__int_dec(mosq_gauge_out_packet_bytes, context->out_packet_bytes);
context->out_packet_count = 0;
context->out_packet_bytes = 0;

@ -165,8 +165,8 @@ void context__cleanup(struct mosquitto *context, bool force_free)
context->out_packet = context->out_packet->next;
mosquitto__FREE(packet);
}
G_OUT_PACKET_COUNT_DEC(context->out_packet_count);
G_OUT_PACKET_BYTES_DEC(context->out_packet_bytes);
metrics__int_dec(mosq_gauge_out_packets, context->out_packet_count);
metrics__int_dec(mosq_gauge_out_packet_bytes, context->out_packet_bytes);
context->out_packet_count = 0;
context->out_packet_bytes = 0;
#if defined(WITH_BROKER) && defined(__GLIBC__) && defined(WITH_ADNS)

@ -470,7 +470,7 @@ int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, str
"Outgoing messages are being dropped for client %s.",
context->id);
}
G_MSGS_DROPPED_INC();
metrics__int_inc(mosq_counter_mqtt_publish_dropped, 1);
context->stats.messages_dropped++;
return 2;
@ -601,14 +601,14 @@ int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uin
"Outgoing messages are being dropped for client %s.",
context->id);
}
G_MSGS_DROPPED_INC();
metrics__int_inc(mosq_counter_mqtt_publish_dropped, 1);
return 2;
}
}else{
if (db__ready_for_queue(context, qos, msg_data)){
state = mosq_ms_queued;
}else{
G_MSGS_DROPPED_INC();
metrics__int_inc(mosq_counter_mqtt_publish_dropped, 1);
if(context->is_dropping == false){
context->is_dropping = true;
log__printf(NULL, MOSQ_LOG_NOTICE,

@ -593,8 +593,6 @@ int handle__connect(struct mosquitto *context)
uint16_t auth_data_out_len = 0;
bool allow_zero_length_clientid;
G_CONNECTION_COUNT_INC();
if(!context->listener){
return MOSQ_ERR_INVAL;
}

@ -215,7 +215,7 @@ int handle__publish(struct mosquitto *context)
}
base_msg->data.payloadlen = context->in_packet.remaining_length - context->in_packet.pos;
G_PUB_BYTES_RECEIVED_INC(base_msg->data.payloadlen);
metrics__int_inc(mosq_counter_pub_bytes_received, base_msg->data.payloadlen);
if(context->listener && context->listener->mount_point){
len = strlen(context->listener->mount_point) + strlen(base_msg->data.topic) + 1;
topic_mount = mosquitto__malloc(len+1);

@ -194,7 +194,7 @@ int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listens
db.next_event_ms = 86400000;
#ifdef WITH_SYS_TREE
if(db.config->sys_interval > 0){
sys_tree__update();
sys_tree__update(false);
}
#endif

@ -731,7 +731,7 @@ int db__message_reconnect_reset(struct mosquitto *context);
bool db__ready_for_flight(struct mosquitto *context, enum mosquitto_msg_direction dir, int qos);
bool db__ready_for_queue(struct mosquitto *context, int qos, struct mosquitto_msg_data *msg_data);
void sys_tree__init(void);
void sys_tree__update(void);
void sys_tree__update(bool force);
int db__message_write_inflight_out_all(struct mosquitto *context);
int db__message_write_inflight_out_latest(struct mosquitto *context);
int db__message_write_queued_out(struct mosquitto *context);

@ -147,7 +147,7 @@ struct mosquitto *net__socket_accept(struct mosquitto__listener_sock *listensock
return NULL;
}
G_SOCKET_CONNECTIONS_INC();
metrics__int_inc(mosq_counter_socket_connections, 1);
if(net__socket_nonblock(&new_sock)){
return NULL;

@ -52,6 +52,7 @@ int handle__packet(struct mosquitto *context)
rc = handle__pubackcomp(context, "PUBCOMP");
break;
case CMD_PUBLISH:
metrics__int_inc(mosq_counter_mqtt_publish_received, 1);
rc = handle__publish(context);
break;
case CMD_PUBREC:
@ -61,6 +62,7 @@ int handle__packet(struct mosquitto *context)
rc = handle__pubrel(context);
break;
case CMD_CONNECT:
metrics__int_inc(mosq_counter_mqtt_connect_received, 1);
return handle__connect(context);
case CMD_DISCONNECT:
rc = handle__disconnect(context);

@ -23,6 +23,7 @@ Contributors:
#include "memory_mosq.h"
#include "packet_mosq.h"
#include "property_mosq.h"
#include "sys_tree.h"
#include "util_mosq.h"
int send__auth(struct mosquitto *context, uint8_t reason_code, const void *auth_data, uint16_t auth_data_len)

@ -23,6 +23,7 @@ Contributors:
#include "memory_mosq.h"
#include "packet_mosq.h"
#include "property_mosq.h"
#include "sys_tree.h"
#include "util_mosq.h"
int send__connack(struct mosquitto *context, uint8_t ack, uint8_t reason_code, const mosquitto_property *properties)

@ -23,6 +23,7 @@ Contributors:
#include "memory_mosq.h"
#include "packet_mosq.h"
#include "property_mosq.h"
#include "sys_tree.h"
#include "util_mosq.h"

@ -25,6 +25,7 @@ Contributors:
#include "memory_mosq.h"
#include "packet_mosq.h"
#include "property_mosq.h"
#include "sys_tree.h"
int send__unsuback(struct mosquitto *mosq, uint16_t mid, int reason_code_count, uint8_t *reason_codes, const mosquitto_property *properties)

@ -175,7 +175,7 @@ void session_expiry__check(void)
if(context->id){
log__printf(NULL, MOSQ_LOG_NOTICE, "Expiring client %s due to timeout.", context->id);
}
G_CLIENTS_EXPIRED_INC();
metrics__int_inc(mosq_counter_clients_expired, 1);
/* Session has now expired, so clear interval */
context->session_expiry_interval = 0;

@ -34,25 +34,90 @@ Contributors:
#define SYS_TREE_QOS 2
uint64_t g_bytes_received = 0;
uint64_t g_bytes_sent = 0;
uint64_t g_pub_bytes_received = 0;
uint64_t g_pub_bytes_sent = 0;
int64_t g_out_packet_bytes = 0;
unsigned long g_msgs_received = 0;
unsigned long g_msgs_sent = 0;
unsigned long g_pub_msgs_received = 0;
unsigned long g_pub_msgs_sent = 0;
unsigned long g_msgs_dropped = 0;
long g_out_packet_count = 0;
unsigned int g_clients_expired = 0;
unsigned int g_socket_connections = 0;
unsigned int g_connection_count = 0;
#define METRIC_LOAD_1MIN 1
#define METRIC_LOAD_5MIN 2
#define METRIC_LOAD_15MIN 3
struct metric{
int64_t current;
int64_t next;
const char *topic, *topic_alias;
bool is_max;
};
struct metric_load{
double current;
const char *topic;
int load_ref;
uint8_t interval;
};
struct metric metrics[mosq_metric_max] = {
{ 1, 0, "$SYS/broker/clients/total", NULL, false }, /* mosq_gauge_clients_total */
{ 1, 0, "$SYS/broker/clients/maximum", NULL, true }, /* metric_clients_maximum */
{ 1, 0, "$SYS/broker/clients/disconnected", "$SYS/broker/clients/inactive", false }, /* mosq_gauge_clients_disconnected */
{ 1, 0, "$SYS/broker/clients/connected", "$SYS/broker/clients/active", false }, /* mosq_gauge_clients_connected */
{ 1, 0, "$SYS/broker/clients/expired", NULL, false }, /* mosq_counter_clients_expired */
{ 1, 0, "$SYS/broker/messages/stored", "$SYS/broker/store/messages/count", false }, /* mosq_gauge_message_store_count */
{ 1, 0, "$SYS/broker/store/messages/bytes", NULL, false }, /* mosq_gauge_message_store_bytes */
{ 1, 0, "$SYS/broker/subscriptions/count", NULL, false }, /* mosq_gauge_subscription_count */
{ 1, 0, "$SYS/broker/shared_subscriptions/count", NULL, false }, /* mosq_gauge_shared_subscription_count */
{ 1, 0, "$SYS/broker/retained messages/count", NULL, false }, /* mosq_gauge_retained_message_count */
#ifdef REAL_WITH_MEMORY_TRACKING
{ 1, 0, "$SYS/broker/heap/current", NULL, false }, /* mosq_gauge_heap_current */
{ 1, 0, "$SYS/broker/heap/maximum", NULL, true }, /* mosq_gauge_heap_maximum */
#else
{ 1, 0, NULL, NULL, NULL, 0 }, /* mosq_gauge_heap_current */
{ 1, 0, NULL, NULL, NULL, 0 }, /* mosq_gauge_heap_maximum */
#endif
{ 1, 0, "$SYS/broker/messages/received", NULL, false }, /* mosq_counter_messages_received */
{ 1, 0, "$SYS/broker/messages/sent", NULL, false }, /* mosq_counter_messages_sent */
{ 1, 0, "$SYS/broker/bytes/received", NULL, false }, /* mosq_counter_bytes_received */
{ 1, 0, "$SYS/broker/bytes/sent", NULL, false }, /* mosq_counter_bytes_sent */
{ 1, 0, "$SYS/broker/publish/bytes/received", NULL, false }, /* mosq_counter_pub_bytes_received */
{ 1, 0, "$SYS/broker/publish/bytes/sent", NULL, false }, /* mosq_counter_pub_bytes_sent */
{ 1, 0, "$SYS/broker/packet/out/count", NULL, false }, /* mosq_gauge_out_packet_count */
{ 1, 0, "$SYS/broker/packet/out/bytes", NULL, false }, /* mosq_gauge_out_packet_bytes */
{ 1, 0, "$SYS/broker/connections/socket/count", NULL, false }, /* mosq_counter_socket_connections */
{ 1, 0, NULL, NULL, false }, /* mosq_counter_mqtt_connect_received */
{ 1, 0, "$SYS/broker/publish/messages/dropped", NULL, false }, /* mosq_counter_mqtt_publish_dropped */
{ 1, 0, "$SYS/broker/publish/messages/received", NULL, false }, /* mosq_counter_mqtt_publish_received */
{ 1, 0, "$SYS/broker/publish/messages/sent", NULL, false }, /* mosq_counter_mqtt_publish_sent */
};
struct metric_load metric_loads[mosq_metric_load_max] = {
{ 0.0, "$SYS/broker/load/messages/received/1min", mosq_counter_messages_received, METRIC_LOAD_1MIN }, /* metric_load_messages_received_1min */
{ 0.0, "$SYS/broker/load/messages/received/5min", mosq_counter_messages_received, METRIC_LOAD_5MIN }, /* metric_load_messages_received_5min */
{ 0.0, "$SYS/broker/load/messages/received/15min", mosq_counter_messages_received, METRIC_LOAD_15MIN }, /* metric_load_messages_received_15min */
{ 0.0, "$SYS/broker/load/messages/sent/1min", mosq_counter_messages_sent, METRIC_LOAD_1MIN }, /* metric_load_messages_sent_1min */
{ 0.0, "$SYS/broker/load/messages/sent/5min", mosq_counter_messages_sent, METRIC_LOAD_5MIN }, /* metric_load_messages_sent_5min */
{ 0.0, "$SYS/broker/load/messages/sent/15min", mosq_counter_messages_sent, METRIC_LOAD_15MIN }, /* metric_load_messages_sent_15min */
{ 0.0, "$SYS/broker/load/publish/dropped/1min", mosq_counter_mqtt_publish_dropped, METRIC_LOAD_1MIN }, /* metric_load_pub_messages_dropped_1min */
{ 0.0, "$SYS/broker/load/publish/dropped/5min", mosq_counter_mqtt_publish_dropped, METRIC_LOAD_5MIN }, /* metric_load_pub_messages_dropped_5min */
{ 0.0, "$SYS/broker/load/publish/dropped/15min", mosq_counter_mqtt_publish_dropped, METRIC_LOAD_15MIN }, /* metric_load_pub_messages_dropped_15min */
{ 0.0, "$SYS/broker/load/publish/received/1min", mosq_counter_mqtt_publish_received, METRIC_LOAD_1MIN }, /* metric_load_pub_messages_received_1min */
{ 0.0, "$SYS/broker/load/publish/received/5min", mosq_counter_mqtt_publish_received, METRIC_LOAD_5MIN }, /* metric_load_pub_messages_received_5min */
{ 0.0, "$SYS/broker/load/publish/received/15min", mosq_counter_mqtt_publish_received, METRIC_LOAD_15MIN }, /* metric_load_pub_messages_received_15min */
{ 0.0, "$SYS/broker/load/publish/sent/1min", mosq_counter_mqtt_publish_sent, METRIC_LOAD_1MIN }, /* metric_load_pub_messages_sent_1min */
{ 0.0, "$SYS/broker/load/publish/sent/5min", mosq_counter_mqtt_publish_sent, METRIC_LOAD_5MIN }, /* metric_load_pub_messages_sent_5min */
{ 0.0, "$SYS/broker/load/publish/sent/15min", mosq_counter_mqtt_publish_sent, METRIC_LOAD_15MIN }, /* metric_load_pub_messages_sent_15min */
{ 0.0, "$SYS/broker/load/bytes/received/1min", mosq_counter_bytes_received, METRIC_LOAD_1MIN }, /* metric_load_bytes_received_1min */
{ 0.0, "$SYS/broker/load/bytes/received/5min", mosq_counter_bytes_received, METRIC_LOAD_5MIN }, /* metric_load_bytes_received_5min */
{ 0.0, "$SYS/broker/load/bytes/received/15min", mosq_counter_bytes_received, METRIC_LOAD_15MIN }, /* metric_load_bytes_received_15min */
{ 0.0, "$SYS/broker/load/bytes/sent/1min", mosq_counter_bytes_sent, METRIC_LOAD_1MIN }, /* metric_load_bytes_sent_1min */
{ 0.0, "$SYS/broker/load/bytes/sent/5min", mosq_counter_bytes_sent, METRIC_LOAD_5MIN }, /* metric_load_bytes_sent_5min */
{ 0.0, "$SYS/broker/load/bytes/sent/15min", mosq_counter_bytes_sent, METRIC_LOAD_15MIN }, /* metric_load_bytes_sent_15min */
{ 0.0, "$SYS/broker/load/sockets/1min", mosq_counter_socket_connections, METRIC_LOAD_1MIN }, /* metric_load_socket_connections_1min */
{ 0.0, "$SYS/broker/load/sockets/5min", mosq_counter_socket_connections, METRIC_LOAD_5MIN }, /* metric_load_socket_connections_5min */
{ 0.0, "$SYS/broker/load/sockets/15min", mosq_counter_socket_connections, METRIC_LOAD_15MIN }, /* metric_load_socket_connections_15min */
{ 0.0, "$SYS/broker/load/connections/1min", mosq_counter_mqtt_connect_received, METRIC_LOAD_1MIN }, /* metric_load_connections_1min */
{ 0.0, "$SYS/broker/load/connections/5min", mosq_counter_mqtt_connect_received, METRIC_LOAD_5MIN }, /* metric_load_connections_5min */
{ 0.0, "$SYS/broker/load/connections/15min", mosq_counter_mqtt_connect_received, METRIC_LOAD_15MIN }, /* metric_load_connections_15min */
};
static time_t start_time = 0;
static time_t last_update = 0;
void sys_tree__init(void)
{
char buf[64];
@ -68,156 +133,50 @@ void sys_tree__init(void)
start_time = mosquitto_time();
last_update = start_time;
sys_tree__update(true);
}
static void sys_tree__update_clients(char *buf)
void metrics__int_inc(enum mosq_metric_type m, int64_t value)
{
static unsigned int client_count = UINT_MAX;
static unsigned int clients_expired = UINT_MAX;
static unsigned int client_max = 0;
static unsigned int disconnected_count = UINT_MAX;
static unsigned int connected_count = UINT_MAX;
uint32_t len;
unsigned int count_total, count_by_sock;
count_total = HASH_CNT(hh_id, db.contexts_by_id);
count_by_sock = HASH_CNT(hh_sock, db.contexts_by_sock);
if(client_count != count_total){
client_count = count_total;
len = (uint32_t)snprintf(buf, BUFLEN, "%d", client_count);
db__messages_easy_queue(NULL, "$SYS/broker/clients/total", SYS_TREE_QOS, len, buf, 1, 0, NULL);
if(client_count > client_max){
client_max = client_count;
len = (uint32_t)snprintf(buf, BUFLEN, "%d", client_max);
db__messages_easy_queue(NULL, "$SYS/broker/clients/maximum", SYS_TREE_QOS, len, buf, 1, 0, NULL);
}
}
if(disconnected_count != count_total-count_by_sock){
disconnected_count = count_total-count_by_sock;
len = (uint32_t)snprintf(buf, BUFLEN, "%d", disconnected_count);
db__messages_easy_queue(NULL, "$SYS/broker/clients/inactive", SYS_TREE_QOS, len, buf, 1, 0, NULL);
db__messages_easy_queue(NULL, "$SYS/broker/clients/disconnected", SYS_TREE_QOS, len, buf, 1, 0, NULL);
}
if(connected_count != count_by_sock){
connected_count = count_by_sock;
len = (uint32_t)snprintf(buf, BUFLEN, "%d", connected_count);
db__messages_easy_queue(NULL, "$SYS/broker/clients/active", SYS_TREE_QOS, len, buf, 1, 0, NULL);
db__messages_easy_queue(NULL, "$SYS/broker/clients/connected", SYS_TREE_QOS, len, buf, 1, 0, NULL);
}
if(g_clients_expired != clients_expired){
clients_expired = g_clients_expired;
len = (uint32_t)snprintf(buf, BUFLEN, "%d", clients_expired);
db__messages_easy_queue(NULL, "$SYS/broker/clients/expired", SYS_TREE_QOS, len, buf, 1, 0, NULL);
if(m < mosq_metric_max){
metrics[m].next += value;
}
}
#ifdef REAL_WITH_MEMORY_TRACKING
static void sys_tree__update_memory(char *buf)
void metrics__int_dec(enum mosq_metric_type m, int64_t value)
{
static unsigned long current_heap = ULONG_MAX;
static unsigned long max_heap = ULONG_MAX;
unsigned long value_ul;
uint32_t len;
value_ul = mosquitto__memory_used();
if(current_heap != value_ul){
current_heap = value_ul;
len = (uint32_t)snprintf(buf, BUFLEN, "%lu", current_heap);
db__messages_easy_queue(NULL, "$SYS/broker/heap/current", SYS_TREE_QOS, len, buf, 1, 0, NULL);
}
value_ul =mosquitto__max_memory_used();
if(max_heap != value_ul){
max_heap = value_ul;
len = (uint32_t)snprintf(buf, BUFLEN, "%lu", max_heap);
db__messages_easy_queue(NULL, "$SYS/broker/heap/maximum", SYS_TREE_QOS, len, buf, 1, 0, NULL);
if(m < mosq_metric_max){
metrics[m].next -= value;
}
}
#endif
static void calc_load(char *buf, const char *topic, double exponent, double interval, double *current)
static void calc_load(char *buf, double exponent, double i_mult, struct metric_load *m)
{
double new_value;
uint32_t len;
double interval;
new_value = interval + exponent*((*current) - interval);
if(fabs(new_value - (*current)) >= 0.01){
interval = (double)(metrics[m->load_ref].next - metrics[m->load_ref].current)*i_mult;
new_value = interval + exponent*(m->current - interval);
if(fabs(new_value - (m->current)) >= 0.01){
len = (uint32_t)snprintf(buf, BUFLEN, "%.2f", new_value);
db__messages_easy_queue(NULL, topic, SYS_TREE_QOS, len, buf, 1, 0, NULL);
db__messages_easy_queue(NULL, m->topic, SYS_TREE_QOS, len, buf, 1, 0, NULL);
}
(*current) = new_value;
m->current = new_value;
}
/* Send messages for the $SYS hierarchy if the last update is longer than
* 'interval' seconds ago.
* 'interval' is the amount of seconds between updates. If 0, then no periodic
* messages are sent for the $SYS hierarchy.
* 'start_time' is the result of time() that the broker was started at.
*/
void sys_tree__update(void)
void sys_tree__update(bool force)
{
time_t uptime;
char buf[BUFLEN];
static int msg_store_count = INT_MAX;
static unsigned long msg_store_bytes = ULONG_MAX;
static unsigned long msgs_received = ULONG_MAX;
static unsigned long msgs_sent = ULONG_MAX;
static unsigned long publish_dropped = ULONG_MAX;
static unsigned long pub_msgs_received = ULONG_MAX;
static unsigned long pub_msgs_sent = ULONG_MAX;
static unsigned long long bytes_received = ULLONG_MAX;
static unsigned long long bytes_sent = ULLONG_MAX;
static unsigned long long pub_bytes_received = ULLONG_MAX;
static unsigned long long pub_bytes_sent = ULLONG_MAX;
static int subscription_count = INT_MAX;
static int shared_subscription_count = INT_MAX;
static int retained_count = INT_MAX;
static long out_packet_count = LONG_MAX;
static long long out_packet_bytes = LLONG_MAX;
static double msgs_received_load1 = 0;
static double msgs_received_load5 = 0;
static double msgs_received_load15 = 0;
static double msgs_sent_load1 = 0;
static double msgs_sent_load5 = 0;
static double msgs_sent_load15 = 0;
static double publish_dropped_load1 = 0;
static double publish_dropped_load5 = 0;
static double publish_dropped_load15 = 0;
double msgs_received_interval, msgs_sent_interval, publish_dropped_interval;
static double publish_received_load1 = 0;
static double publish_received_load5 = 0;
static double publish_received_load15 = 0;
static double publish_sent_load1 = 0;
static double publish_sent_load5 = 0;
static double publish_sent_load15 = 0;
double publish_received_interval, publish_sent_interval;
static double bytes_received_load1 = 0;
static double bytes_received_load5 = 0;
static double bytes_received_load15 = 0;
static double bytes_sent_load1 = 0;
static double bytes_sent_load5 = 0;
static double bytes_sent_load15 = 0;
double bytes_received_interval, bytes_sent_interval;
static double socket_load1 = 0;
static double socket_load5 = 0;
static double socket_load15 = 0;
double socket_interval;
static double connection_load1 = 0;
static double connection_load5 = 0;
static double connection_load15 = 0;
double connection_interval;
double exponent;
double i_mult;
uint32_t len;
time_t next_event;
static time_t last_update_real = 0;
@ -231,170 +190,59 @@ void sys_tree__update(void)
}
if(db.config->sys_interval
&& db.now_real_s % db.config->sys_interval == 0 && last_update_real != db.now_real_s){
&& ((db.now_real_s % db.config->sys_interval == 0 && last_update_real != db.now_real_s) || force)){
uptime = db.now_s - start_time;
len = (uint32_t)snprintf(buf, BUFLEN, "%" PRIu64 " seconds", (uint64_t)uptime);
db__messages_easy_queue(NULL, "$SYS/broker/uptime", SYS_TREE_QOS, len, buf, 1, 0, NULL);
sys_tree__update_clients(buf);
if(db.now_s > last_update){
i_mult = 60.0/(double)(db.now_s-last_update);
msgs_received_interval = (double)(g_msgs_received - msgs_received)*i_mult;
msgs_sent_interval = (double)(g_msgs_sent - msgs_sent)*i_mult;
publish_dropped_interval = (double)(g_msgs_dropped - publish_dropped)*i_mult;
publish_received_interval = (double)(g_pub_msgs_received - pub_msgs_received)*i_mult;
publish_sent_interval = (double)(g_pub_msgs_sent - pub_msgs_sent)*i_mult;
bytes_received_interval = (double)(g_bytes_received - bytes_received)*i_mult;
bytes_sent_interval = (double)(g_bytes_sent - bytes_sent)*i_mult;
socket_interval = g_socket_connections*i_mult;
g_socket_connections = 0;
connection_interval = g_connection_count*i_mult;
g_connection_count = 0;
/* 1 minute load */
exponent = exp(-1.0*(double)(db.now_s-last_update)/60.0);
calc_load(buf, "$SYS/broker/load/messages/received/1min", exponent, msgs_received_interval, &msgs_received_load1);
calc_load(buf, "$SYS/broker/load/messages/sent/1min", exponent, msgs_sent_interval, &msgs_sent_load1);
calc_load(buf, "$SYS/broker/load/publish/dropped/1min", exponent, publish_dropped_interval, &publish_dropped_load1);
calc_load(buf, "$SYS/broker/load/publish/received/1min", exponent, publish_received_interval, &publish_received_load1);
calc_load(buf, "$SYS/broker/load/publish/sent/1min", exponent, publish_sent_interval, &publish_sent_load1);
calc_load(buf, "$SYS/broker/load/bytes/received/1min", exponent, bytes_received_interval, &bytes_received_load1);
calc_load(buf, "$SYS/broker/load/bytes/sent/1min", exponent, bytes_sent_interval, &bytes_sent_load1);
calc_load(buf, "$SYS/broker/load/sockets/1min", exponent, socket_interval, &socket_load1);
calc_load(buf, "$SYS/broker/load/connections/1min", exponent, connection_interval, &connection_load1);
/* 5 minute load */
exponent = exp(-1.0*(double)(db.now_s-last_update)/300.0);
calc_load(buf, "$SYS/broker/load/messages/received/5min", exponent, msgs_received_interval, &msgs_received_load5);
calc_load(buf, "$SYS/broker/load/messages/sent/5min", exponent, msgs_sent_interval, &msgs_sent_load5);
calc_load(buf, "$SYS/broker/load/publish/dropped/5min", exponent, publish_dropped_interval, &publish_dropped_load5);
calc_load(buf, "$SYS/broker/load/publish/received/5min", exponent, publish_received_interval, &publish_received_load5);
calc_load(buf, "$SYS/broker/load/publish/sent/5min", exponent, publish_sent_interval, &publish_sent_load5);
calc_load(buf, "$SYS/broker/load/bytes/received/5min", exponent, bytes_received_interval, &bytes_received_load5);
calc_load(buf, "$SYS/broker/load/bytes/sent/5min", exponent, bytes_sent_interval, &bytes_sent_load5);
calc_load(buf, "$SYS/broker/load/sockets/5min", exponent, socket_interval, &socket_load5);
calc_load(buf, "$SYS/broker/load/connections/5min", exponent, connection_interval, &connection_load5);
/* 15 minute load */
exponent = exp(-1.0*(double)(db.now_s-last_update)/900.0);
calc_load(buf, "$SYS/broker/load/messages/received/15min", exponent, msgs_received_interval, &msgs_received_load15);
calc_load(buf, "$SYS/broker/load/messages/sent/15min", exponent, msgs_sent_interval, &msgs_sent_load15);
calc_load(buf, "$SYS/broker/load/publish/dropped/15min", exponent, publish_dropped_interval, &publish_dropped_load15);
calc_load(buf, "$SYS/broker/load/publish/received/15min", exponent, publish_received_interval, &publish_received_load15);
calc_load(buf, "$SYS/broker/load/publish/sent/15min", exponent, publish_sent_interval, &publish_sent_load15);
calc_load(buf, "$SYS/broker/load/bytes/received/15min", exponent, bytes_received_interval, &bytes_received_load15);
calc_load(buf, "$SYS/broker/load/bytes/sent/15min", exponent, bytes_sent_interval, &bytes_sent_load15);
calc_load(buf, "$SYS/broker/load/sockets/15min", exponent, socket_interval, &socket_load15);
calc_load(buf, "$SYS/broker/load/connections/15min", exponent, connection_interval, &connection_load15);
}
if(db.msg_store_count != msg_store_count){
msg_store_count = db.msg_store_count;
len = (uint32_t)snprintf(buf, BUFLEN, "%d", msg_store_count);
db__messages_easy_queue(NULL, "$SYS/broker/messages/stored", SYS_TREE_QOS, len, buf, 1, 0, NULL);
db__messages_easy_queue(NULL, "$SYS/broker/store/messages/count", SYS_TREE_QOS, len, buf, 1, 0, NULL);
}
if (db.msg_store_bytes != msg_store_bytes){
msg_store_bytes = db.msg_store_bytes;
len = (uint32_t)snprintf(buf, BUFLEN, "%lu", msg_store_bytes);
db__messages_easy_queue(NULL, "$SYS/broker/store/messages/bytes", SYS_TREE_QOS, len, buf, 1, 0, NULL);
}
if(db.subscription_count != subscription_count){
subscription_count = db.subscription_count;
len = (uint32_t)snprintf(buf, BUFLEN, "%d", subscription_count);
db__messages_easy_queue(NULL, "$SYS/broker/subscriptions/count", SYS_TREE_QOS, len, buf, 1, 0, NULL);
}
if(db.shared_subscription_count != shared_subscription_count){
shared_subscription_count = db.shared_subscription_count;
len = (uint32_t)snprintf(buf, BUFLEN, "%d", shared_subscription_count);
db__messages_easy_queue(NULL, "$SYS/broker/shared_subscriptions/count", SYS_TREE_QOS, len, buf, 1, 0, NULL);
}
if(db.retained_count != retained_count){
retained_count = db.retained_count;
len = (uint32_t)snprintf(buf, BUFLEN, "%d", retained_count);
db__messages_easy_queue(NULL, "$SYS/broker/retained messages/count", SYS_TREE_QOS, len, buf, 1, 0, NULL);
}
/* Update metrics values where not otherwise updated */
metrics[mosq_gauge_message_store_count].next = db.msg_store_count;
metrics[mosq_gauge_message_store_bytes].next = (int64_t)db.msg_store_bytes;
metrics[mosq_gauge_subscriptions].next = db.subscription_count;
metrics[mosq_gauge_shared_subscriptions].next = db.shared_subscription_count;
metrics[mosq_gauge_retained_messages].next = db.retained_count;
#ifdef REAL_WITH_MEMORY_TRACKING
sys_tree__update_memory(buf);
metrics[mosq_gauge_heap_current].next = (int64_t)mosquitto__memory_used();
metrics[mosq_counter_heap_maximum].next = (int64_t)mosquitto__max_memory_used();
#endif
metrics[mosq_gauge_clients_total].next = HASH_CNT(hh_id, db.contexts_by_id);
metrics[mosq_counter_clients_maximum].next = HASH_CNT(hh_id, db.contexts_by_id);
metrics[mosq_gauge_clients_connected].next = HASH_CNT(hh_sock, db.contexts_by_sock);
metrics[mosq_gauge_clients_disconnected].next = HASH_CNT(hh_id, db.contexts_by_id) - HASH_CNT(hh_sock, db.contexts_by_sock);
if(msgs_received != g_msgs_received){
msgs_received = g_msgs_received;
len = (uint32_t)snprintf(buf, BUFLEN, "%lu", msgs_received);
db__messages_easy_queue(NULL, "$SYS/broker/messages/received", SYS_TREE_QOS, len, buf, 1, 0, NULL);
}
if(msgs_sent != g_msgs_sent){
msgs_sent = g_msgs_sent;
len = (uint32_t)snprintf(buf, BUFLEN, "%lu", msgs_sent);
db__messages_easy_queue(NULL, "$SYS/broker/messages/sent", SYS_TREE_QOS, len, buf, 1, 0, NULL);
}
if(publish_dropped != g_msgs_dropped){
publish_dropped = g_msgs_dropped;
len = (uint32_t)snprintf(buf, BUFLEN, "%lu", publish_dropped);
db__messages_easy_queue(NULL, "$SYS/broker/publish/messages/dropped", SYS_TREE_QOS, len, buf, 1, 0, NULL);
}
if(pub_msgs_received != g_pub_msgs_received){
pub_msgs_received = g_pub_msgs_received;
len = (uint32_t)snprintf(buf, BUFLEN, "%lu", pub_msgs_received);
db__messages_easy_queue(NULL, "$SYS/broker/publish/messages/received", SYS_TREE_QOS, len, buf, 1, 0, NULL);
}
if(pub_msgs_sent != g_pub_msgs_sent){
pub_msgs_sent = g_pub_msgs_sent;
len = (uint32_t)snprintf(buf, BUFLEN, "%lu", pub_msgs_sent);
db__messages_easy_queue(NULL, "$SYS/broker/publish/messages/sent", SYS_TREE_QOS, len, buf, 1, 0, NULL);
}
if(bytes_received != g_bytes_received){
bytes_received = g_bytes_received;
len = (uint32_t)snprintf(buf, BUFLEN, "%llu", bytes_received);
db__messages_easy_queue(NULL, "$SYS/broker/bytes/received", SYS_TREE_QOS, len, buf, 1, 0, NULL);
}
if(bytes_sent != g_bytes_sent){
bytes_sent = g_bytes_sent;
len = (uint32_t)snprintf(buf, BUFLEN, "%llu", bytes_sent);
db__messages_easy_queue(NULL, "$SYS/broker/bytes/sent", SYS_TREE_QOS, len, buf, 1, 0, NULL);
}
if(pub_bytes_received != g_pub_bytes_received){
pub_bytes_received = g_pub_bytes_received;
len = (uint32_t)snprintf(buf, BUFLEN, "%llu", pub_bytes_received);
db__messages_easy_queue(NULL, "$SYS/broker/publish/bytes/received", SYS_TREE_QOS, len, buf, 1, 0, NULL);
}
if(pub_bytes_sent != g_pub_bytes_sent){
pub_bytes_sent = g_pub_bytes_sent;
len = (uint32_t)snprintf(buf, BUFLEN, "%llu", pub_bytes_sent);
db__messages_easy_queue(NULL, "$SYS/broker/publish/bytes/sent", SYS_TREE_QOS, len, buf, 1, 0, NULL);
}
if(out_packet_count != g_out_packet_count){
out_packet_count = g_out_packet_count;
len = (uint32_t)snprintf(buf, BUFLEN, "%lu", out_packet_count);
db__messages_easy_queue(NULL, "$SYS/broker/packet/out/count", SYS_TREE_QOS, len, buf, 1, 0, NULL);
}
if(out_packet_bytes != g_out_packet_bytes){
out_packet_bytes = g_out_packet_bytes;
len = (uint32_t)snprintf(buf, BUFLEN, "%llu", out_packet_bytes);
db__messages_easy_queue(NULL, "$SYS/broker/packet/out/bytes", SYS_TREE_QOS, len, buf, 1, 0, NULL);
/* Handle loads first, because they reference other metrics and need next != current */
if(db.now_s > last_update){
double i_mult = 60.0/(double)(db.now_s-last_update);
double exponent_1min = exp(-1.0*(double)(db.now_s-last_update)/60.0);
double exponent_5min = exp(-1.0*(double)(db.now_s-last_update)/300.0);
double exponent_15min = exp(-1.0*(double)(db.now_s-last_update)/900.0);
for(int i=0; i<mosq_metric_load_max; i++){
if(metric_loads[i].interval == METRIC_LOAD_1MIN){
calc_load(buf, exponent_1min, i_mult, &metric_loads[i]);
}else if(metric_loads[i].interval == METRIC_LOAD_5MIN){
calc_load(buf, exponent_5min, i_mult, &metric_loads[i]);
}else{
calc_load(buf, exponent_15min, i_mult, &metric_loads[i]);
}
}
}
for(int i=0; i<mosq_metric_max; i++){
if((metrics[i].is_max && metrics[i].next > metrics[i].current) ||
(!metrics[i].is_max && metrics[i].next != metrics[i].current)){
metrics[i].current = metrics[i].next;
len = (uint32_t)snprintf(buf, BUFLEN, "%lu", metrics[i].current);
if(metrics[i].topic){
db__messages_easy_queue(NULL, metrics[i].topic, SYS_TREE_QOS, len, buf, 1, 0, NULL);
}
if(metrics[i].topic_alias){
db__messages_easy_queue(NULL, metrics[i].topic_alias, SYS_TREE_QOS, len, buf, 1, 0, NULL);
}
}
}
last_update = db.now_s;

@ -20,56 +20,77 @@ Contributors:
#define SYS_TREE_H
#if defined(WITH_SYS_TREE) && defined(WITH_BROKER)
extern uint64_t g_bytes_received;
extern uint64_t g_bytes_sent;
extern uint64_t g_pub_bytes_received;
extern uint64_t g_pub_bytes_sent;
extern int64_t g_out_packet_bytes;
extern unsigned long g_msgs_received;
extern unsigned long g_msgs_sent;
extern unsigned long g_pub_msgs_received;
extern unsigned long g_pub_msgs_sent;
extern unsigned long g_msgs_dropped;
extern long g_out_packet_count;
extern unsigned int g_clients_expired;
extern unsigned int g_socket_connections;
extern unsigned int g_connection_count;
#define G_BYTES_RECEIVED_INC(A) (g_bytes_received+=(uint64_t)(A))
#define G_BYTES_SENT_INC(A) (g_bytes_sent+=(uint64_t)(A))
#define G_PUB_BYTES_RECEIVED_INC(A) (g_pub_bytes_received+=(A))
#define G_PUB_BYTES_SENT_INC(A) (g_pub_bytes_sent+=(A))
#define G_MSGS_RECEIVED_INC(A) (g_msgs_received+=(A))
#define G_MSGS_SENT_INC(A) (g_msgs_sent+=(A))
#define G_PUB_MSGS_RECEIVED_INC(A) (g_pub_msgs_received+=(A))
#define G_PUB_MSGS_SENT_INC(A) (g_pub_msgs_sent+=(A))
#define G_MSGS_DROPPED_INC() (g_msgs_dropped++)
#define G_CLIENTS_EXPIRED_INC() (g_clients_expired++)
#define G_SOCKET_CONNECTIONS_INC() (g_socket_connections++)
#define G_CONNECTION_COUNT_INC() (g_connection_count++)
#define G_OUT_PACKET_COUNT_INC(A) (g_out_packet_count+=(A))
#define G_OUT_PACKET_COUNT_DEC(A) (g_out_packet_count-=(A))
#define G_OUT_PACKET_BYTES_INC(A) (g_out_packet_bytes+=(A))
#define G_OUT_PACKET_BYTES_DEC(A) (g_out_packet_bytes-=(A))
/* This ordering *must* match the metrics array in sys_tree.c. */
enum mosq_metric_type{
mosq_gauge_clients_total = 0,
mosq_counter_clients_maximum = 1,
mosq_gauge_clients_disconnected = 2,
mosq_gauge_clients_connected = 3,
mosq_counter_clients_expired = 4,
mosq_gauge_message_store_count = 5,
mosq_gauge_message_store_bytes = 6,
mosq_gauge_subscriptions = 7,
mosq_gauge_shared_subscriptions = 8,
mosq_gauge_retained_messages = 9,
mosq_gauge_heap_current = 10,
mosq_counter_heap_maximum = 11,
mosq_counter_messages_received = 12,
mosq_counter_messages_sent = 13,
mosq_counter_bytes_received = 14,
mosq_counter_bytes_sent = 15,
mosq_counter_pub_bytes_received = 16,
mosq_counter_pub_bytes_sent = 17,
mosq_gauge_out_packets = 18,
mosq_gauge_out_packet_bytes = 19,
mosq_counter_socket_connections = 20,
mosq_counter_mqtt_connect_received = 21,
mosq_counter_mqtt_publish_dropped = 22,
mosq_counter_mqtt_publish_received = 23,
mosq_counter_mqtt_publish_sent = 24,
#else
mosq_metric_max,
};
/* This ordering *must* match the metrics_load array in sys_tree.c. */
enum mosq_metric_load_type{
mosq_load_messages_received_1min = 0,
mosq_load_messages_received_5min = 1,
mosq_load_messages_received_15min = 2,
mosq_load_messages_sent_1min = 3,
mosq_load_messages_sent_5min = 4,
mosq_load_messages_sent_15min = 5,
mosq_load_pub_messages_dropped_1min = 6,
mosq_load_pub_messages_dropped_5min = 7,
mosq_load_pub_messages_dropped_15min = 8,
mosq_load_pub_messages_received_1min = 9,
mosq_load_pub_messages_received_5min = 10,
mosq_load_pub_messages_received_15min = 11,
mosq_load_pub_messages_sent_1min = 12,
mosq_load_pub_messages_sent_5min = 13,
mosq_load_pub_messages_sent_15min = 14,
mosq_load_bytes_received_1min = 15,
mosq_load_bytes_received_5min = 16,
mosq_load_bytes_received_15min = 17,
mosq_load_bytes_sent_1min = 18,
mosq_load_bytes_sent_5min = 19,
mosq_load_bytes_sent_15min = 20,
mosq_load_sockets_1min = 21,
mosq_load_sockets_5min = 22,
mosq_load_sockets_15min = 23,
mosq_load_connections_1min = 24,
mosq_load_connections_5min = 25,
mosq_load_connections_15min = 26,
mosq_metric_load_max,
};
#define G_BYTES_RECEIVED_INC(A)
#define G_BYTES_SENT_INC(A)
#define G_PUB_BYTES_RECEIVED_INC(A)
#define G_PUB_BYTES_SENT_INC(A)
#define G_MSGS_RECEIVED_INC(A)
#define G_MSGS_SENT_INC(A)
#define G_PUB_MSGS_RECEIVED_INC(A)
#define G_PUB_MSGS_SENT_INC(A)
#define G_MSGS_DROPPED_INC()
#define G_CLIENTS_EXPIRED_INC()
#define G_SOCKET_CONNECTIONS_INC()
#define G_CONNECTION_COUNT_INC()
#define G_OUT_PACKET_COUNT_INC(A)
#define G_OUT_PACKET_COUNT_DEC(A)
#define G_OUT_PACKET_BYTES_INC(A)
#define G_OUT_PACKET_BYTES_DEC(A)
void metrics__int_inc(enum mosq_metric_type m, int64_t value);
void metrics__int_dec(enum mosq_metric_type m, int64_t value);
#else
# define metrics__int_inc(A, B)
# define metrics__int_dec(A, B)
#endif

@ -236,7 +236,7 @@ static int callback_mqtt(
}
ucount = (unsigned int)count;
#ifdef WITH_SYS_TREE
g_bytes_sent += ucount;
metrics__int_inc(mosq_counter_bytes_sent, ucount);
#endif
packet->to_process -= ucount;
packet->pos += ucount;
@ -251,9 +251,9 @@ static int callback_mqtt(
}
#ifdef WITH_SYS_TREE
g_msgs_sent++;
metrics__int_inc(mosq_counter_messages_sent, 1);
if(((packet->command)&0xF0) == CMD_PUBLISH){
g_pub_msgs_sent++;
metrics__int_inc(mosq_counter_mqtt_publish_sent, 1);
}
#endif
@ -280,7 +280,7 @@ static int callback_mqtt(
mosq = u->mosq;
pos = 0;
buf = (uint8_t *)in;
G_BYTES_RECEIVED_INC(len);
metrics__int_inc(mosq_counter_bytes_received, (int64_t)len);
while(pos < len){
if(!mosq->in_packet.command){
mosq->in_packet.command = buf[pos];
@ -336,10 +336,7 @@ static int callback_mqtt(
mosq->in_packet.pos = 0;
#ifdef WITH_SYS_TREE
G_MSGS_RECEIVED_INC(1);
if(((mosq->in_packet.command)&0xF0) == CMD_PUBLISH){
G_PUB_MSGS_RECEIVED_INC(1);
}
metrics__int_inc(mosq_counter_messages_received, 1);
#endif
rc = handle__packet(mosq);

@ -7,21 +7,7 @@
#include <send_mosq.h>
#include <time_mosq.h>
#include <callbacks.h>
uint64_t g_bytes_received;
uint64_t g_bytes_sent;
uint64_t g_pub_bytes_received;
uint64_t g_pub_bytes_sent;
int64_t g_out_packet_bytes;
unsigned long g_msgs_received;
unsigned long g_msgs_sent;
unsigned long g_pub_msgs_received;
unsigned long g_pub_msgs_sent;
unsigned long g_msgs_dropped;
long g_out_packet_count;
unsigned int g_clients_expired;
unsigned int g_socket_connections;
unsigned int g_connection_count;
#include <sys_tree.h>
extern uint64_t last_retained;
extern char *last_sub;
@ -292,3 +278,12 @@ int send__disconnect(struct mosquitto *mosq, uint8_t reason_code, const mosquitt
UNUSED(properties);
return 0;
}
void metrics__int_inc(enum mosq_metric_type m, int64_t value)
{
UNUSED(m); UNUSED(value);
}
void metrics__int_dec(enum mosq_metric_type m, int64_t value)
{
UNUSED(m); UNUSED(value);
}

@ -9,21 +9,7 @@
#include <util_mosq.h>
#include <logging_mosq.h>
#include <persist.h>
uint64_t g_bytes_received;
uint64_t g_bytes_sent;
uint64_t g_pub_bytes_received;
uint64_t g_pub_bytes_sent;
int64_t g_out_packet_bytes;
unsigned long g_msgs_received;
unsigned long g_msgs_sent;
unsigned long g_pub_msgs_received;
unsigned long g_pub_msgs_sent;
unsigned long g_msgs_dropped;
long g_out_packet_count;
unsigned int g_clients_expired;
unsigned int g_socket_connections;
unsigned int g_connection_count;
#include <sys_tree.h>
int log__printf(struct mosquitto *mosq, unsigned int priority, const char *fmt, ...)
{
@ -233,3 +219,12 @@ int session_expiry__add_from_persistence(struct mosquitto *context, time_t expir
UNUSED(expiry_time);
return 0;
}
void metrics__int_inc(enum mosq_metric_type m, int64_t value)
{
UNUSED(m); UNUSED(value);
}
void metrics__int_dec(enum mosq_metric_type m, int64_t value)
{
UNUSED(m); UNUSED(value);
}

Loading…
Cancel
Save