Publish global out_packet values to $SYS

pull/2559/head
Roger A. Light 3 years ago
parent 4099f8d1b6
commit 29f49bf6ab

@ -46,6 +46,10 @@ Contributors:
# define G_BYTES_SENT_INC(A)
# define G_MSGS_SENT_INC(A)
# define G_PUB_MSGS_SENT_INC(A)
# define G_OUT_PACKET_COUNT_INC(A)
# define G_OUT_PACKET_COUNT_DEC(A)
# define G_OUT_PACKET_BYTES_INC(A)
# define G_OUT_PACKET_BYTES_DEC(A)
#endif
int packet__alloc(struct mosquitto__packet **packet, uint8_t command, uint32_t remaining_length)
@ -120,6 +124,8 @@ void packet__cleanup_all_no_locks(struct mosquitto *mosq)
mosquitto__FREE(packet);
}
G_OUT_PACKET_COUNT_DEC(mosq->out_packet_count);
G_OUT_PACKET_BYTES_DEC(mosq->out_packet_bytes);
mosq->out_packet_count = 0;
mosq->out_packet_bytes = 0;
mosq->out_packet_last = NULL;
@ -146,6 +152,8 @@ static void packet__queue_append(struct mosquitto *mosq, struct mosquitto__packe
mosq->out_packet_last = packet;
mosq->out_packet_count++;
mosq->out_packet_bytes += packet->packet_length;
G_OUT_PACKET_COUNT_INC(1);
G_OUT_PACKET_BYTES_INC(packet->packet_length);
pthread_mutex_unlock(&mosq->out_packet_mutex);
}
@ -228,6 +236,8 @@ struct mosquitto__packet *packet__get_next_out(struct mosquitto *mosq)
if(mosq->out_packet){
mosq->out_packet_count--;
mosq->out_packet_bytes -= mosq->out_packet->packet_length;
G_OUT_PACKET_COUNT_DEC(1);
G_OUT_PACKET_BYTES_DEC(mosq->out_packet->packet_length);
mosq->out_packet = mosq->out_packet->next;
if(!mosq->out_packet){

@ -556,6 +556,28 @@
<para>The total number of messages of any type sent since the broker started.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/packet/out/count</option></term>
<listitem>
<para>
The current number of packets queued for delivery across
all clients. A large and increasing value here may
indicate messages are being sent faster than the network
can handle.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/packet/out/bytes</option></term>
<listitem>
<para>
The current number of bytes in packets queued for
delivery across all clients. A large and increasing
value here may indicate messages are being sent faster
than the network can handle.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>$SYS/broker/publish/messages/dropped</option></term>
<listitem>

@ -49,6 +49,7 @@ Contributors:
#include "memory_mosq.h"
#include "packet_mosq.h"
#include "send_mosq.h"
#include "sys_tree.h"
#include "time_mosq.h"
#include "tls_mosq.h"
#include "util_mosq.h"
@ -847,6 +848,8 @@ static void bridge__packet_cleanup(struct mosquitto *context)
}
context->out_packet = NULL;
context->out_packet_last = NULL;
G_OUT_PACKET_COUNT_DEC(context->out_packet_count);
G_OUT_PACKET_BYTES_DEC(context->out_packet_bytes);
context->out_packet_count = 0;
context->out_packet_bytes = 0;

@ -29,6 +29,7 @@ Contributors:
#include "memory_mosq.h"
#include "packet_mosq.h"
#include "property_mosq.h"
#include "sys_tree.h"
#include "time_mosq.h"
#include "util_mosq.h"
#include "will_mosq.h"
@ -164,6 +165,8 @@ void context__cleanup(struct mosquitto *context, bool force_free)
context->out_packet = context->out_packet->next;
mosquitto__FREE(packet);
}
G_OUT_PACKET_COUNT_DEC(context->out_packet_count);
G_OUT_PACKET_BYTES_DEC(context->out_packet_bytes);
context->out_packet_count = 0;
context->out_packet_bytes = 0;
#if defined(WITH_BROKER) && defined(__GLIBC__) && defined(WITH_ADNS)

@ -37,11 +37,13 @@ uint64_t g_bytes_received = 0;
uint64_t g_bytes_sent = 0;
uint64_t g_pub_bytes_received = 0;
uint64_t g_pub_bytes_sent = 0;
int64_t g_out_packet_bytes = 0;
unsigned long g_msgs_received = 0;
unsigned long g_msgs_sent = 0;
unsigned long g_pub_msgs_received = 0;
unsigned long g_pub_msgs_sent = 0;
unsigned long g_msgs_dropped = 0;
long g_out_packet_count = 0;
unsigned int g_clients_expired = 0;
unsigned int g_socket_connections = 0;
unsigned int g_connection_count = 0;
@ -179,6 +181,8 @@ void sys_tree__update(void)
static int subscription_count = INT_MAX;
static int shared_subscription_count = INT_MAX;
static int retained_count = INT_MAX;
static long out_packet_count = LONG_MAX;
static long long out_packet_bytes = LLONG_MAX;
static double msgs_received_load1 = 0;
static double msgs_received_load5 = 0;
@ -391,6 +395,18 @@ void sys_tree__update(void)
db__messages_easy_queue(NULL, "$SYS/broker/publish/bytes/sent", SYS_TREE_QOS, len, buf, 1, 60, NULL);
}
if(out_packet_count != g_out_packet_count){
out_packet_count = g_out_packet_count;
len = (uint32_t)snprintf(buf, BUFLEN, "%llu", out_packet_count);
db__messages_easy_queue(NULL, "$SYS/broker/packet/out/count", SYS_TREE_QOS, len, buf, 1, 60, NULL);
}
if(out_packet_bytes != g_out_packet_bytes){
out_packet_bytes = g_out_packet_bytes;
len = (uint32_t)snprintf(buf, BUFLEN, "%llu", out_packet_bytes);
db__messages_easy_queue(NULL, "$SYS/broker/packet/out/bytes", SYS_TREE_QOS, len, buf, 1, 60, NULL);
}
last_update = db.now_s;
}
}

@ -24,11 +24,13 @@ extern uint64_t g_bytes_received;
extern uint64_t g_bytes_sent;
extern uint64_t g_pub_bytes_received;
extern uint64_t g_pub_bytes_sent;
extern int64_t g_out_packet_bytes;
extern unsigned long g_msgs_received;
extern unsigned long g_msgs_sent;
extern unsigned long g_pub_msgs_received;
extern unsigned long g_pub_msgs_sent;
extern unsigned long g_msgs_dropped;
extern long g_out_packet_count;
extern unsigned int g_clients_expired;
extern unsigned int g_socket_connections;
extern unsigned int g_connection_count;
@ -45,6 +47,10 @@ extern unsigned int g_connection_count;
#define G_CLIENTS_EXPIRED_INC() (g_clients_expired++)
#define G_SOCKET_CONNECTIONS_INC() (g_socket_connections++)
#define G_CONNECTION_COUNT_INC() (g_connection_count++)
#define G_OUT_PACKET_COUNT_INC(A) (g_out_packet_count+=(A))
#define G_OUT_PACKET_COUNT_DEC(A) (g_out_packet_count-=(A))
#define G_OUT_PACKET_BYTES_INC(A) (g_out_packet_bytes+=(A))
#define G_OUT_PACKET_BYTES_DEC(A) (g_out_packet_bytes-=(A))
#else
@ -60,6 +66,10 @@ extern unsigned int g_connection_count;
#define G_CLIENTS_EXPIRED_INC()
#define G_SOCKET_CONNECTIONS_INC()
#define G_CONNECTION_COUNT_INC()
#define G_OUT_PACKET_COUNT_INC(A)
#define G_OUT_PACKET_COUNT_DEC(A)
#define G_OUT_PACKET_BYTES_INC(A)
#define G_OUT_PACKET_BYTES_DEC(A)
#endif

Loading…
Cancel
Save