diff --git a/lib/packet_mosq.c b/lib/packet_mosq.c index 4d6f51f0..305da9fb 100644 --- a/lib/packet_mosq.c +++ b/lib/packet_mosq.c @@ -19,14 +19,6 @@ Contributors: #ifdef WITH_BROKER # include "mosquitto_broker.h" -# ifdef WITH_SYS_TREE - extern uint64_t g_bytes_received; - extern uint64_t g_bytes_sent; - extern unsigned long g_msgs_received; - extern unsigned long g_msgs_sent; - extern unsigned long g_pub_msgs_received; - extern unsigned long g_pub_msgs_sent; -# endif # ifdef WITH_WEBSOCKETS # include # endif @@ -39,6 +31,14 @@ Contributors: #include "net_mosq.h" #include "packet_mosq.h" #include "read_handle.h" +#ifdef WITH_BROKER +# include "sys_tree.h" +#else +# define G_BYTES_RECEIVED_INC(A) +# define G_BYTES_SENT_INC(A) +# define G_MSGS_SENT_INC(A) +# define G_PUB_MSGS_SENT_INC(A) +#endif void mosquitto__packet_cleanup(struct mosquitto__packet *packet) { @@ -239,9 +239,7 @@ int mosquitto__packet_write(struct mosquitto *mosq) while(packet->to_process > 0){ write_length = mosquitto__net_write(mosq, &(packet->payload[packet->pos]), packet->to_process); if(write_length > 0){ -#if defined(WITH_BROKER) && defined(WITH_SYS_TREE) - g_bytes_sent += write_length; -#endif + G_BYTES_SENT_INC(write_length); packet->to_process -= write_length; packet->pos += write_length; }else{ @@ -263,15 +261,10 @@ int mosquitto__packet_write(struct mosquitto *mosq) } } -#ifdef WITH_BROKER -# ifdef WITH_SYS_TREE - g_msgs_sent++; - if(((packet->command)&0xF6) == PUBLISH){ - g_pub_msgs_sent++; - } -# endif -#else + G_MSGS_SENT_INC(1); if(((packet->command)&0xF6) == PUBLISH){ + G_PUB_MSGS_SENT_INC(1); +#ifndef WITH_BROKER pthread_mutex_lock(&mosq->callback_mutex); if(mosq->on_publish){ /* This is a QoS=0 message */ @@ -315,8 +308,8 @@ int mosquitto__packet_write(struct mosquitto *mosq) pthread_mutex_unlock(&mosq->callback_mutex); pthread_mutex_unlock(&mosq->current_out_packet_mutex); return MOSQ_ERR_SUCCESS; - } #endif + } /* Free data and reset values */ pthread_mutex_lock(&mosq->out_packet_mutex); @@ -376,9 +369,7 @@ int mosquitto__packet_read(struct mosquitto *mosq) if(read_length == 1){ mosq->in_packet.command = byte; #ifdef WITH_BROKER -# ifdef WITH_SYS_TREE - g_bytes_received++; -# endif + G_BYTES_RECEIVED_INC(1); /* Clients must send CONNECT as their first command. */ if(!(mosq->bridge) && mosq->state == mosq_cs_new && (byte&0xF0) != CONNECT) return MOSQ_ERR_PROTOCOL; #endif @@ -418,9 +409,7 @@ int mosquitto__packet_read(struct mosquitto *mosq) */ if(mosq->in_packet.remaining_count < -4) return MOSQ_ERR_PROTOCOL; -#if defined(WITH_BROKER) && defined(WITH_SYS_TREE) - g_bytes_received++; -#endif + G_BYTES_RECEIVED_INC(1); mosq->in_packet.remaining_length += (byte & 127) * mosq->in_packet.remaining_mult; mosq->in_packet.remaining_mult *= 128; }else{ @@ -453,9 +442,7 @@ int mosquitto__packet_read(struct mosquitto *mosq) while(mosq->in_packet.to_process>0){ read_length = mosquitto__net_read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process); if(read_length > 0){ -#if defined(WITH_BROKER) && defined(WITH_SYS_TREE) - g_bytes_received += read_length; -#endif + G_BYTES_RECEIVED_INC(read_length); mosq->in_packet.to_process -= read_length; mosq->in_packet.pos += read_length; }else{ @@ -488,12 +475,10 @@ int mosquitto__packet_read(struct mosquitto *mosq) /* All data for this packet is read. */ mosq->in_packet.pos = 0; #ifdef WITH_BROKER -# ifdef WITH_SYS_TREE - g_msgs_received++; + G_MSGS_RECEIVED_INC(1); if(((mosq->in_packet.command)&0xF5) == PUBLISH){ - g_pub_msgs_received++; + G_PUB_MSGS_RECEIVED_INC(1); } -# endif rc = mqtt3_packet_handle(db, mosq); #else rc = mosquitto__packet_handle(mosq); diff --git a/lib/send_mosq.c b/lib/send_mosq.c index 2c17797d..82d7ff67 100644 --- a/lib/send_mosq.c +++ b/lib/send_mosq.c @@ -30,10 +30,10 @@ Contributors: #include "util_mosq.h" #ifdef WITH_BROKER -#include "mosquitto_broker.h" -# ifdef WITH_SYS_TREE -extern uint64_t g_pub_bytes_sent; -# endif +# include "mosquitto_broker.h" +# include "sys_tree.h" +#else +# define G_PUB_BYTES_SENT_INC(A) #endif int mosquitto__send_pingreq(struct mosquitto *mosq) @@ -155,9 +155,7 @@ int mosquitto__send_publish(struct mosquitto *mosq, uint16_t mid, const char *to mapped_topic = topic_temp; } mosquitto__log_printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBLISH to %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", mosq->id, dup, qos, retain, mid, mapped_topic, (long)payloadlen); -#ifdef WITH_SYS_TREE - g_pub_bytes_sent += payloadlen; -#endif + G_PUB_BYTES_SENT_INC(payloadlen); rc = mosquitto__send_real_publish(mosq, mid, mapped_topic, payloadlen, payload, qos, retain, dup); mosquitto__free(mapped_topic); return rc; @@ -167,9 +165,7 @@ int mosquitto__send_publish(struct mosquitto *mosq, uint16_t mid, const char *to } #endif mosquitto__log_printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBLISH to %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", mosq->id, dup, qos, retain, mid, topic, (long)payloadlen); -# ifdef WITH_SYS_TREE - g_pub_bytes_sent += payloadlen; -# endif + G_PUB_BYTES_SENT_INC(payloadlen); #else mosquitto__log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBLISH (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", mosq->id, dup, qos, retain, mid, topic, (long)payloadlen); #endif diff --git a/src/database.c b/src/database.c index 5343005e..86fd7000 100644 --- a/src/database.c +++ b/src/database.c @@ -22,13 +22,11 @@ Contributors: #include "mosquitto_broker.h" #include "memory_mosq.h" #include "send_mosq.h" +#include "sys_tree.h" #include "time_mosq.h" static int max_inflight = 20; static int max_queued = 100; -#ifdef WITH_SYS_TREE -extern unsigned long g_msgs_dropped; -#endif int mqtt3_db_open(struct mqtt3_config *config, struct mosquitto_db *db) { @@ -359,16 +357,12 @@ int mqtt3_db_message_insert(struct mosquitto_db *db, struct mosquitto *context, "Outgoing messages are being dropped for client %s.", context->id); } -#ifdef WITH_SYS_TREE - g_msgs_dropped++; -#endif + G_MSGS_DROPPED_INC(); return 2; } }else{ if(max_queued > 0 && context->msg_count12 >= max_queued){ -#ifdef WITH_SYS_TREE - g_msgs_dropped++; -#endif + G_MSGS_DROPPED_INC(); if(context->is_dropping == false){ context->is_dropping = true; mosquitto__log_printf(NULL, MOSQ_LOG_NOTICE, diff --git a/src/loop.c b/src/loop.c index ea90675d..66d8644f 100644 --- a/src/loop.c +++ b/src/loop.c @@ -44,6 +44,7 @@ Contributors: #include "memory_mosq.h" #include "packet_mosq.h" #include "send_mosq.h" +#include "sys_tree.h" #include "time_mosq.h" #include "util_mosq.h" @@ -53,9 +54,6 @@ extern bool flag_db_backup; #endif extern bool flag_tree_print; extern int run; -#ifdef WITH_SYS_TREE -extern int g_clients_expired; -#endif static void loop_handle_errors(struct mosquitto_db *db, struct pollfd *pollfds); static void loop_handle_reads_writes(struct mosquitto_db *db, struct pollfd *pollfds); @@ -296,9 +294,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock id = ""; } mosquitto__log_printf(NULL, MOSQ_LOG_NOTICE, "Expiring persistent client %s due to timeout.", id); -#ifdef WITH_SYS_TREE - g_clients_expired++; -#endif + G_CLIENTS_EXPIRED_INC(); context->clean_session = true; context->state = mosq_cs_expiring; do_disconnect(db, context); diff --git a/src/net.c b/src/net.c index a3542770..c6303fc8 100644 --- a/src/net.c +++ b/src/net.c @@ -59,9 +59,7 @@ static int tls_ex_index_context = -1; static int tls_ex_index_listener = -1; #endif -#ifdef WITH_SYS_TREE -extern unsigned int g_socket_connections; -#endif +#include "sys_tree.h" int mqtt3_socket_accept(struct mosquitto_db *db, int listensock) { @@ -83,9 +81,7 @@ int mqtt3_socket_accept(struct mosquitto_db *db, int listensock) new_sock = accept(listensock, NULL, 0); if(new_sock == INVALID_SOCKET) return -1; -#ifdef WITH_SYS_TREE - g_socket_connections++; -#endif + G_SOCKET_CONNECTIONS_INC(); if(mosquitto__socket_nonblock(new_sock)){ return INVALID_SOCKET; diff --git a/src/read_handle.c b/src/read_handle.c index 59a5e546..4fb34cf0 100644 --- a/src/read_handle.c +++ b/src/read_handle.c @@ -26,12 +26,9 @@ Contributors: #include "packet_mosq.h" #include "read_handle.h" #include "send_mosq.h" +#include "sys_tree.h" #include "util_mosq.h" -#ifdef WITH_SYS_TREE -extern uint64_t g_pub_bytes_received; -#endif - int mqtt3_packet_handle(struct mosquitto_db *db, struct mosquitto *context) { if(!context) return MOSQ_ERR_INVAL; @@ -170,9 +167,7 @@ int mqtt3_handle_publish(struct mosquitto_db *db, struct mosquitto *context) } payloadlen = context->in_packet.remaining_length - context->in_packet.pos; -#ifdef WITH_SYS_TREE - g_pub_bytes_received += payloadlen; -#endif + G_PUB_BYTES_RECEIVED_INC(payloadlen); if(context->listener && context->listener->mount_point){ len = strlen(context->listener->mount_point) + strlen(topic) + 1; topic_mount = mosquitto__malloc(len+1); diff --git a/src/read_handle_server.c b/src/read_handle_server.c index 922b409d..67b0ed40 100644 --- a/src/read_handle_server.c +++ b/src/read_handle_server.c @@ -17,13 +17,14 @@ Contributors: #include #include -#include "config.h> +#include "config.h" #include "mosquitto_broker.h" #include "mqtt3_protocol.h" #include "memory_mosq.h" #include "packet_mosq.h" #include "send_mosq.h" +#include "sys_tree.h" #include "time_mosq.h" #include "tls_mosq.h" #include "util_mosq.h" @@ -36,10 +37,6 @@ Contributors: #include #endif -#ifdef WITH_SYS_TREE -extern unsigned int g_connection_count; -#endif - static char *client_id_gen(struct mosquitto_db *db) { char *client_id; @@ -101,9 +98,7 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context) X509_NAME_ENTRY *name_entry; #endif -#ifdef WITH_SYS_TREE - g_connection_count++; -#endif + G_CONNECTION_COUNT_INC(); /* Don't accept multiple CONNECT commands. */ if(context->state != mosq_cs_new){ diff --git a/src/websockets.c b/src/websockets.c index c5620f12..f68bc0e5 100644 --- a/src/websockets.c +++ b/src/websockets.c @@ -34,19 +34,12 @@ POSSIBILITY OF SUCH DAMAGE. #include "mosquitto_broker.h" #include "mqtt3_protocol.h" #include "memory_mosq.h" +#include "sys_tree.h" #include #include #include -#ifdef WITH_SYS_TREE -extern uint64_t g_bytes_received; -extern uint64_t g_bytes_sent; -extern unsigned long g_msgs_received; -extern unsigned long g_msgs_sent; -extern unsigned long g_pub_msgs_received; -extern unsigned long g_pub_msgs_sent; -#endif extern struct mosquitto_db int_db; static int callback_mqtt(struct libwebsocket_context *context, @@ -258,9 +251,7 @@ static int callback_mqtt(struct libwebsocket_context *context, mosq = u->mosq; pos = 0; buf = (uint8_t *)in; -#ifdef WITH_SYS_TREE - g_bytes_received += len; -#endif + G_BYTES_RECEIVED_INC(len); while(pos < len){ if(!mosq->in_packet.command){ mosq->in_packet.command = buf[pos]; @@ -315,10 +306,11 @@ static int callback_mqtt(struct libwebsocket_context *context, /* All data for this packet is read. */ mosq->in_packet.pos = 0; + #ifdef WITH_SYS_TREE - g_msgs_received++; + G_MSGS_RECEIVED_INC(); if(((mosq->in_packet.command)&0xF5) == PUBLISH){ - g_pub_msgs_received++; + G_PUB_MSGS_RECEIVED_INC(); } #endif rc = mqtt3_packet_handle(db, mosq);