diff --git a/lib/packet_mosq.c b/lib/packet_mosq.c index 41764f5e..d5ef8792 100644 --- a/lib/packet_mosq.c +++ b/lib/packet_mosq.c @@ -46,6 +46,10 @@ Contributors: # define G_BYTES_SENT_INC(A) # define G_MSGS_SENT_INC(A) # define G_PUB_MSGS_SENT_INC(A) +# define G_OUT_PACKET_COUNT_INC(A) +# define G_OUT_PACKET_COUNT_DEC(A) +# define G_OUT_PACKET_BYTES_INC(A) +# define G_OUT_PACKET_BYTES_DEC(A) #endif int packet__alloc(struct mosquitto__packet **packet, uint8_t command, uint32_t remaining_length) @@ -120,6 +124,8 @@ void packet__cleanup_all_no_locks(struct mosquitto *mosq) mosquitto__FREE(packet); } + G_OUT_PACKET_COUNT_DEC(mosq->out_packet_count); + G_OUT_PACKET_BYTES_DEC(mosq->out_packet_bytes); mosq->out_packet_count = 0; mosq->out_packet_bytes = 0; mosq->out_packet_last = NULL; @@ -146,6 +152,8 @@ static void packet__queue_append(struct mosquitto *mosq, struct mosquitto__packe mosq->out_packet_last = packet; mosq->out_packet_count++; mosq->out_packet_bytes += packet->packet_length; + G_OUT_PACKET_COUNT_INC(1); + G_OUT_PACKET_BYTES_INC(packet->packet_length); pthread_mutex_unlock(&mosq->out_packet_mutex); } @@ -228,6 +236,8 @@ struct mosquitto__packet *packet__get_next_out(struct mosquitto *mosq) if(mosq->out_packet){ mosq->out_packet_count--; mosq->out_packet_bytes -= mosq->out_packet->packet_length; + G_OUT_PACKET_COUNT_DEC(1); + G_OUT_PACKET_BYTES_DEC(mosq->out_packet->packet_length); mosq->out_packet = mosq->out_packet->next; if(!mosq->out_packet){ diff --git a/man/mosquitto.8.xml b/man/mosquitto.8.xml index 7d358a79..3696c17a 100644 --- a/man/mosquitto.8.xml +++ b/man/mosquitto.8.xml @@ -556,6 +556,28 @@ The total number of messages of any type sent since the broker started. + + + + + The current number of packets queued for delivery across + all clients. A large and increasing value here may + indicate messages are being sent faster than the network + can handle. + + + + + + + + The current number of bytes in packets queued for + delivery across all clients. A large and increasing + value here may indicate messages are being sent faster + than the network can handle. + + + diff --git a/src/bridge.c b/src/bridge.c index 9bb2ff1d..7370cc9d 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -49,6 +49,7 @@ Contributors: #include "memory_mosq.h" #include "packet_mosq.h" #include "send_mosq.h" +#include "sys_tree.h" #include "time_mosq.h" #include "tls_mosq.h" #include "util_mosq.h" @@ -847,6 +848,8 @@ static void bridge__packet_cleanup(struct mosquitto *context) } context->out_packet = NULL; context->out_packet_last = NULL; + G_OUT_PACKET_COUNT_DEC(context->out_packet_count); + G_OUT_PACKET_BYTES_DEC(context->out_packet_bytes); context->out_packet_count = 0; context->out_packet_bytes = 0; diff --git a/src/context.c b/src/context.c index e078acc3..1aad869b 100644 --- a/src/context.c +++ b/src/context.c @@ -29,6 +29,7 @@ Contributors: #include "memory_mosq.h" #include "packet_mosq.h" #include "property_mosq.h" +#include "sys_tree.h" #include "time_mosq.h" #include "util_mosq.h" #include "will_mosq.h" @@ -164,6 +165,8 @@ void context__cleanup(struct mosquitto *context, bool force_free) context->out_packet = context->out_packet->next; mosquitto__FREE(packet); } + G_OUT_PACKET_COUNT_DEC(context->out_packet_count); + G_OUT_PACKET_BYTES_DEC(context->out_packet_bytes); context->out_packet_count = 0; context->out_packet_bytes = 0; #if defined(WITH_BROKER) && defined(__GLIBC__) && defined(WITH_ADNS) diff --git a/src/sys_tree.c b/src/sys_tree.c index bd03787f..314c80b5 100644 --- a/src/sys_tree.c +++ b/src/sys_tree.c @@ -37,11 +37,13 @@ uint64_t g_bytes_received = 0; uint64_t g_bytes_sent = 0; uint64_t g_pub_bytes_received = 0; uint64_t g_pub_bytes_sent = 0; +int64_t g_out_packet_bytes = 0; unsigned long g_msgs_received = 0; unsigned long g_msgs_sent = 0; unsigned long g_pub_msgs_received = 0; unsigned long g_pub_msgs_sent = 0; unsigned long g_msgs_dropped = 0; +long g_out_packet_count = 0; unsigned int g_clients_expired = 0; unsigned int g_socket_connections = 0; unsigned int g_connection_count = 0; @@ -179,6 +181,8 @@ void sys_tree__update(void) static int subscription_count = INT_MAX; static int shared_subscription_count = INT_MAX; static int retained_count = INT_MAX; + static long out_packet_count = LONG_MAX; + static long long out_packet_bytes = LLONG_MAX; static double msgs_received_load1 = 0; static double msgs_received_load5 = 0; @@ -391,6 +395,18 @@ void sys_tree__update(void) db__messages_easy_queue(NULL, "$SYS/broker/publish/bytes/sent", SYS_TREE_QOS, len, buf, 1, 60, NULL); } + if(out_packet_count != g_out_packet_count){ + out_packet_count = g_out_packet_count; + len = (uint32_t)snprintf(buf, BUFLEN, "%llu", out_packet_count); + db__messages_easy_queue(NULL, "$SYS/broker/packet/out/count", SYS_TREE_QOS, len, buf, 1, 60, NULL); + } + + if(out_packet_bytes != g_out_packet_bytes){ + out_packet_bytes = g_out_packet_bytes; + len = (uint32_t)snprintf(buf, BUFLEN, "%llu", out_packet_bytes); + db__messages_easy_queue(NULL, "$SYS/broker/packet/out/bytes", SYS_TREE_QOS, len, buf, 1, 60, NULL); + } + last_update = db.now_s; } } diff --git a/src/sys_tree.h b/src/sys_tree.h index d0d91200..ed5e7a21 100644 --- a/src/sys_tree.h +++ b/src/sys_tree.h @@ -24,11 +24,13 @@ extern uint64_t g_bytes_received; extern uint64_t g_bytes_sent; extern uint64_t g_pub_bytes_received; extern uint64_t g_pub_bytes_sent; +extern int64_t g_out_packet_bytes; extern unsigned long g_msgs_received; extern unsigned long g_msgs_sent; extern unsigned long g_pub_msgs_received; extern unsigned long g_pub_msgs_sent; extern unsigned long g_msgs_dropped; +extern long g_out_packet_count; extern unsigned int g_clients_expired; extern unsigned int g_socket_connections; extern unsigned int g_connection_count; @@ -45,6 +47,10 @@ extern unsigned int g_connection_count; #define G_CLIENTS_EXPIRED_INC() (g_clients_expired++) #define G_SOCKET_CONNECTIONS_INC() (g_socket_connections++) #define G_CONNECTION_COUNT_INC() (g_connection_count++) +#define G_OUT_PACKET_COUNT_INC(A) (g_out_packet_count+=(A)) +#define G_OUT_PACKET_COUNT_DEC(A) (g_out_packet_count-=(A)) +#define G_OUT_PACKET_BYTES_INC(A) (g_out_packet_bytes+=(A)) +#define G_OUT_PACKET_BYTES_DEC(A) (g_out_packet_bytes-=(A)) #else @@ -60,6 +66,10 @@ extern unsigned int g_connection_count; #define G_CLIENTS_EXPIRED_INC() #define G_SOCKET_CONNECTIONS_INC() #define G_CONNECTION_COUNT_INC() +#define G_OUT_PACKET_COUNT_INC(A) +#define G_OUT_PACKET_COUNT_DEC(A) +#define G_OUT_PACKET_BYTES_INC(A) +#define G_OUT_PACKET_BYTES_DEC(A) #endif