Store out_packet bytes rather than having to calculate it.

pull/2559/head
Roger A. Light 3 years ago
parent e3246f547c
commit 4099f8d1b6

@ -183,6 +183,7 @@ int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_st
packet__cleanup(&mosq->in_packet); packet__cleanup(&mosq->in_packet);
mosq->out_packet = NULL; mosq->out_packet = NULL;
mosq->out_packet_count = 0; mosq->out_packet_count = 0;
mosq->out_packet_bytes = 0;
mosq->last_msg_in = mosquitto_time(); mosq->last_msg_in = mosquitto_time();
mosq->next_msg_out = mosquitto_time() + mosq->keepalive; mosq->next_msg_out = mosquitto_time() + mosq->keepalive;
mosq->ping_t = 0; mosq->ping_t = 0;

@ -307,6 +307,7 @@ struct mosquitto {
uint16_t alias_max_l2r; uint16_t alias_max_l2r;
uint32_t will_delay_interval; uint32_t will_delay_interval;
int out_packet_count; int out_packet_count;
int64_t out_packet_bytes;
time_t will_delay_time; time_t will_delay_time;
#ifdef WITH_TLS #ifdef WITH_TLS
SSL *ssl; SSL *ssl;

@ -121,6 +121,7 @@ void packet__cleanup_all_no_locks(struct mosquitto *mosq)
mosquitto__FREE(packet); mosquitto__FREE(packet);
} }
mosq->out_packet_count = 0; mosq->out_packet_count = 0;
mosq->out_packet_bytes = 0;
mosq->out_packet_last = NULL; mosq->out_packet_last = NULL;
packet__cleanup(&mosq->in_packet); packet__cleanup(&mosq->in_packet);
@ -134,6 +135,21 @@ void packet__cleanup_all(struct mosquitto *mosq)
} }
static void packet__queue_append(struct mosquitto *mosq, struct mosquitto__packet *packet)
{
pthread_mutex_lock(&mosq->out_packet_mutex);
if(mosq->out_packet){
mosq->out_packet_last->next = packet;
}else{
mosq->out_packet = packet;
}
mosq->out_packet_last = packet;
mosq->out_packet_count++;
mosq->out_packet_bytes += packet->packet_length;
pthread_mutex_unlock(&mosq->out_packet_mutex);
}
int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet) int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet)
{ {
#ifndef WITH_BROKER #ifndef WITH_BROKER
@ -148,15 +164,7 @@ int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet)
packet->pos = WS_PACKET_OFFSET; packet->pos = WS_PACKET_OFFSET;
packet->to_process = packet->packet_length - WS_PACKET_OFFSET; packet->to_process = packet->packet_length - WS_PACKET_OFFSET;
pthread_mutex_lock(&mosq->out_packet_mutex); packet__queue_append(mosq, packet);
if(mosq->out_packet){
mosq->out_packet_last->next = packet;
}else{
mosq->out_packet = packet;
}
mosq->out_packet_last = packet;
mosq->out_packet_count++;
pthread_mutex_unlock(&mosq->out_packet_mutex);
lws_callback_on_writable(mosq->wsi); lws_callback_on_writable(mosq->wsi);
return MOSQ_ERR_SUCCESS; return MOSQ_ERR_SUCCESS;
@ -173,14 +181,7 @@ int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet)
packet->to_process = packet->packet_length - WS_PACKET_OFFSET; packet->to_process = packet->packet_length - WS_PACKET_OFFSET;
} }
pthread_mutex_lock(&mosq->out_packet_mutex); packet__queue_append(mosq, packet);
if(mosq->out_packet){
mosq->out_packet_last->next = packet;
}else{
mosq->out_packet = packet;
}
mosq->out_packet_last = packet;
pthread_mutex_unlock(&mosq->out_packet_mutex);
#ifdef WITH_BROKER #ifdef WITH_BROKER
return packet__write(mosq); return packet__write(mosq);
@ -225,11 +226,13 @@ struct mosquitto__packet *packet__get_next_out(struct mosquitto *mosq)
pthread_mutex_lock(&mosq->out_packet_mutex); pthread_mutex_lock(&mosq->out_packet_mutex);
if(mosq->out_packet){ if(mosq->out_packet){
mosq->out_packet_count--;
mosq->out_packet_bytes -= mosq->out_packet->packet_length;
mosq->out_packet = mosq->out_packet->next; mosq->out_packet = mosq->out_packet->next;
if(!mosq->out_packet){ if(!mosq->out_packet){
mosq->out_packet_last = NULL; mosq->out_packet_last = NULL;
} }
mosq->out_packet_count--;
packet = mosq->out_packet; packet = mosq->out_packet;
} }
pthread_mutex_unlock(&mosq->out_packet_mutex); pthread_mutex_unlock(&mosq->out_packet_mutex);

@ -848,6 +848,7 @@ static void bridge__packet_cleanup(struct mosquitto *context)
context->out_packet = NULL; context->out_packet = NULL;
context->out_packet_last = NULL; context->out_packet_last = NULL;
context->out_packet_count = 0; context->out_packet_count = 0;
context->out_packet_bytes = 0;
packet__cleanup(&(context->in_packet)); packet__cleanup(&(context->in_packet));
} }

@ -99,6 +99,7 @@ struct mosquitto *context__init(void)
packet__cleanup(&context->in_packet); packet__cleanup(&context->in_packet);
context->out_packet = NULL; context->out_packet = NULL;
context->out_packet_count = 0; context->out_packet_count = 0;
context->out_packet_bytes = 0;
context->address = NULL; context->address = NULL;
context->bridge = NULL; context->bridge = NULL;
@ -164,6 +165,7 @@ void context__cleanup(struct mosquitto *context, bool force_free)
mosquitto__FREE(packet); mosquitto__FREE(packet);
} }
context->out_packet_count = 0; context->out_packet_count = 0;
context->out_packet_bytes = 0;
#if defined(WITH_BROKER) && defined(__GLIBC__) && defined(WITH_ADNS) #if defined(WITH_BROKER) && defined(__GLIBC__) && defined(WITH_ADNS)
if(context->adns){ if(context->adns){
gai_cancel(context->adns); gai_cancel(context->adns);

@ -43,13 +43,7 @@ static void client_cost(FILE *fptr, struct mosquitto *context, int fn_index)
long tBytes; long tBytes;
pkt_count = 1; pkt_count = 1;
pkt_bytes = context->in_packet.packet_length; pkt_bytes = context->in_packet.packet_length + context->out_packet_bytes;
pkt_tmp = context->out_packet;
while(pkt_tmp){
pkt_count++;
pkt_bytes += pkt_tmp->packet_length;
pkt_tmp = pkt_tmp->next;
}
cmsg_count = context->msgs_in.inflight_count + context->msgs_in.queued_count; cmsg_count = context->msgs_in.inflight_count + context->msgs_in.queued_count;
cmsg_bytes = context->msgs_in.inflight_bytes + context->msgs_in.queued_bytes; cmsg_bytes = context->msgs_in.inflight_bytes + context->msgs_in.queued_bytes;

Loading…
Cancel
Save