diff --git a/lib/mosquitto.c b/lib/mosquitto.c index e24a6916..1fecc3c1 100644 --- a/lib/mosquitto.c +++ b/lib/mosquitto.c @@ -183,6 +183,7 @@ int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_st packet__cleanup(&mosq->in_packet); mosq->out_packet = NULL; mosq->out_packet_count = 0; + mosq->out_packet_bytes = 0; mosq->last_msg_in = mosquitto_time(); mosq->next_msg_out = mosquitto_time() + mosq->keepalive; mosq->ping_t = 0; diff --git a/lib/mosquitto_internal.h b/lib/mosquitto_internal.h index 7f0cdd86..5db31bc4 100644 --- a/lib/mosquitto_internal.h +++ b/lib/mosquitto_internal.h @@ -307,6 +307,7 @@ struct mosquitto { uint16_t alias_max_l2r; uint32_t will_delay_interval; int out_packet_count; + int64_t out_packet_bytes; time_t will_delay_time; #ifdef WITH_TLS SSL *ssl; diff --git a/lib/packet_mosq.c b/lib/packet_mosq.c index afbd0a16..41764f5e 100644 --- a/lib/packet_mosq.c +++ b/lib/packet_mosq.c @@ -121,6 +121,7 @@ void packet__cleanup_all_no_locks(struct mosquitto *mosq) mosquitto__FREE(packet); } mosq->out_packet_count = 0; + mosq->out_packet_bytes = 0; mosq->out_packet_last = NULL; packet__cleanup(&mosq->in_packet); @@ -134,6 +135,21 @@ void packet__cleanup_all(struct mosquitto *mosq) } +static void packet__queue_append(struct mosquitto *mosq, struct mosquitto__packet *packet) +{ + pthread_mutex_lock(&mosq->out_packet_mutex); + if(mosq->out_packet){ + mosq->out_packet_last->next = packet; + }else{ + mosq->out_packet = packet; + } + mosq->out_packet_last = packet; + mosq->out_packet_count++; + mosq->out_packet_bytes += packet->packet_length; + pthread_mutex_unlock(&mosq->out_packet_mutex); +} + + int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet) { #ifndef WITH_BROKER @@ -148,15 +164,7 @@ int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet) packet->pos = WS_PACKET_OFFSET; packet->to_process = packet->packet_length - WS_PACKET_OFFSET; - pthread_mutex_lock(&mosq->out_packet_mutex); - if(mosq->out_packet){ - mosq->out_packet_last->next = packet; - }else{ - mosq->out_packet = packet; - } - mosq->out_packet_last = packet; - mosq->out_packet_count++; - pthread_mutex_unlock(&mosq->out_packet_mutex); + packet__queue_append(mosq, packet); lws_callback_on_writable(mosq->wsi); return MOSQ_ERR_SUCCESS; @@ -173,14 +181,7 @@ int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet) packet->to_process = packet->packet_length - WS_PACKET_OFFSET; } - pthread_mutex_lock(&mosq->out_packet_mutex); - if(mosq->out_packet){ - mosq->out_packet_last->next = packet; - }else{ - mosq->out_packet = packet; - } - mosq->out_packet_last = packet; - pthread_mutex_unlock(&mosq->out_packet_mutex); + packet__queue_append(mosq, packet); #ifdef WITH_BROKER return packet__write(mosq); @@ -225,11 +226,13 @@ struct mosquitto__packet *packet__get_next_out(struct mosquitto *mosq) pthread_mutex_lock(&mosq->out_packet_mutex); if(mosq->out_packet){ + mosq->out_packet_count--; + mosq->out_packet_bytes -= mosq->out_packet->packet_length; + mosq->out_packet = mosq->out_packet->next; if(!mosq->out_packet){ mosq->out_packet_last = NULL; } - mosq->out_packet_count--; packet = mosq->out_packet; } pthread_mutex_unlock(&mosq->out_packet_mutex); diff --git a/src/bridge.c b/src/bridge.c index 793a15fc..9bb2ff1d 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -848,6 +848,7 @@ static void bridge__packet_cleanup(struct mosquitto *context) context->out_packet = NULL; context->out_packet_last = NULL; context->out_packet_count = 0; + context->out_packet_bytes = 0; packet__cleanup(&(context->in_packet)); } diff --git a/src/context.c b/src/context.c index 9d3a07ec..e078acc3 100644 --- a/src/context.c +++ b/src/context.c @@ -99,6 +99,7 @@ struct mosquitto *context__init(void) packet__cleanup(&context->in_packet); context->out_packet = NULL; context->out_packet_count = 0; + context->out_packet_bytes = 0; context->address = NULL; context->bridge = NULL; @@ -164,6 +165,7 @@ void context__cleanup(struct mosquitto *context, bool force_free) mosquitto__FREE(packet); } context->out_packet_count = 0; + context->out_packet_bytes = 0; #if defined(WITH_BROKER) && defined(__GLIBC__) && defined(WITH_ADNS) if(context->adns){ gai_cancel(context->adns); diff --git a/src/xtreport.c b/src/xtreport.c index a9093b06..67334abf 100644 --- a/src/xtreport.c +++ b/src/xtreport.c @@ -43,13 +43,7 @@ static void client_cost(FILE *fptr, struct mosquitto *context, int fn_index) long tBytes; pkt_count = 1; - pkt_bytes = context->in_packet.packet_length; - pkt_tmp = context->out_packet; - while(pkt_tmp){ - pkt_count++; - pkt_bytes += pkt_tmp->packet_length; - pkt_tmp = pkt_tmp->next; - } + pkt_bytes = context->in_packet.packet_length + context->out_packet_bytes; cmsg_count = context->msgs_in.inflight_count + context->msgs_in.queued_count; cmsg_bytes = context->msgs_in.inflight_bytes + context->msgs_in.queued_bytes;