diff --git a/lib/mosquitto.c b/lib/mosquitto.c index b97221f6..1a227f7e 100644 --- a/lib/mosquitto.c +++ b/lib/mosquitto.c @@ -169,7 +169,7 @@ int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_se } } mosq->in_packet.payload = NULL; - mosquitto__packet_cleanup(&mosq->in_packet); + packet__cleanup(&mosq->in_packet); mosq->out_packet = NULL; mosq->current_out_packet = NULL; mosq->last_msg_in = mosquitto_time(); @@ -357,11 +357,11 @@ void mosquitto__destroy(struct mosquitto *mosq) mosq->out_packet = mosq->out_packet->next; } - mosquitto__packet_cleanup(packet); + packet__cleanup(packet); mosquitto__free(packet); } - mosquitto__packet_cleanup(&mosq->in_packet); + packet__cleanup(&mosq->in_packet); if(mosq->sockpairR != INVALID_SOCKET){ COMPAT_CLOSE(mosq->sockpairR); mosq->sockpairR = INVALID_SOCKET; @@ -482,7 +482,7 @@ static int mosquitto__reconnect(struct mosquitto *mosq, bool blocking) mosq->ping_t = 0; - mosquitto__packet_cleanup(&mosq->in_packet); + packet__cleanup(&mosq->in_packet); pthread_mutex_lock(&mosq->current_out_packet_mutex); pthread_mutex_lock(&mosq->out_packet_mutex); @@ -500,7 +500,7 @@ static int mosquitto__reconnect(struct mosquitto *mosq, bool blocking) mosq->out_packet = mosq->out_packet->next; } - mosquitto__packet_cleanup(packet); + packet__cleanup(packet); mosquitto__free(packet); } pthread_mutex_unlock(&mosq->out_packet_mutex); @@ -1134,7 +1134,7 @@ int mosquitto_loop_read(struct mosquitto *mosq, int max_packets) }else #endif { - rc = mosquitto__packet_read(mosq); + rc = packet__read(mosq); } if(rc || errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){ return mosquitto__loop_rc_handle(mosq, rc); @@ -1162,7 +1162,7 @@ int mosquitto_loop_write(struct mosquitto *mosq, int max_packets) * have QoS > 0. We should try to deal with that many in this loop in order * to keep up. */ for(i=0; iremaining_length; + packet->payload = NULL; + packet->remaining_count = 0; + do{ + byte = remaining_length % 128; + remaining_length = remaining_length / 128; + /* If there are more digits to encode, set the top bit of this digit */ + if(remaining_length > 0){ + byte = byte | 0x80; + } + remaining_bytes[packet->remaining_count] = byte; + packet->remaining_count++; + }while(remaining_length > 0 && packet->remaining_count < 5); + if(packet->remaining_count == 5) return MOSQ_ERR_PAYLOAD_SIZE; + packet->packet_length = packet->remaining_length + 1 + packet->remaining_count; +#ifdef WITH_WEBSOCKETS + packet->payload = mosquitto__malloc(sizeof(uint8_t)*packet->packet_length + LWS_SEND_BUFFER_PRE_PADDING + LWS_SEND_BUFFER_POST_PADDING); +#else + packet->payload = mosquitto__malloc(sizeof(uint8_t)*packet->packet_length); +#endif + if(!packet->payload) return MOSQ_ERR_NOMEM; + + packet->payload[0] = packet->command; + for(i=0; iremaining_count; i++){ + packet->payload[i+1] = remaining_bytes[i]; + } + packet->pos = 1 + packet->remaining_count; + + return MOSQ_ERR_SUCCESS; +} + +void packet__cleanup(struct mosquitto__packet *packet) { if(!packet) return; @@ -55,7 +94,7 @@ void mosquitto__packet_cleanup(struct mosquitto__packet *packet) packet->pos = 0; } -int mosquitto__packet_queue(struct mosquitto *mosq, struct mosquitto__packet *packet) +int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet) { #ifndef WITH_BROKER char sockpair_data = 0; @@ -81,10 +120,10 @@ int mosquitto__packet_queue(struct mosquitto *mosq, struct mosquitto__packet *pa libwebsocket_callback_on_writable(mosq->ws_context, mosq->wsi); return 0; }else{ - return mosquitto__packet_write(mosq); + return packet__write(mosq); } # else - return mosquitto__packet_write(mosq); + return packet__write(mosq); # endif #else @@ -100,7 +139,7 @@ int mosquitto__packet_queue(struct mosquitto *mosq, struct mosquitto__packet *pa } if(mosq->in_callback == false && mosq->threaded == false){ - return mosquitto__packet_write(mosq); + return packet__write(mosq); }else{ return MOSQ_ERR_SUCCESS; } @@ -108,7 +147,7 @@ int mosquitto__packet_queue(struct mosquitto *mosq, struct mosquitto__packet *pa } -int mosquitto__read_byte(struct mosquitto__packet *packet, uint8_t *byte) +int packet__read_byte(struct mosquitto__packet *packet, uint8_t *byte) { assert(packet); if(packet->pos+1 > packet->remaining_length) return MOSQ_ERR_PROTOCOL; @@ -120,7 +159,7 @@ int mosquitto__read_byte(struct mosquitto__packet *packet, uint8_t *byte) } -void mosquitto__write_byte(struct mosquitto__packet *packet, uint8_t byte) +void packet__write_byte(struct mosquitto__packet *packet, uint8_t byte) { assert(packet); assert(packet->pos+1 <= packet->packet_length); @@ -130,7 +169,7 @@ void mosquitto__write_byte(struct mosquitto__packet *packet, uint8_t byte) } -int mosquitto__read_bytes(struct mosquitto__packet *packet, void *bytes, uint32_t count) +int packet__read_bytes(struct mosquitto__packet *packet, void *bytes, uint32_t count) { assert(packet); if(packet->pos+count > packet->remaining_length) return MOSQ_ERR_PROTOCOL; @@ -142,7 +181,7 @@ int mosquitto__read_bytes(struct mosquitto__packet *packet, void *bytes, uint32_ } -void mosquitto__write_bytes(struct mosquitto__packet *packet, const void *bytes, uint32_t count) +void packet__write_bytes(struct mosquitto__packet *packet, const void *bytes, uint32_t count) { assert(packet); assert(packet->pos+count <= packet->packet_length); @@ -152,13 +191,13 @@ void mosquitto__write_bytes(struct mosquitto__packet *packet, const void *bytes, } -int mosquitto__read_string(struct mosquitto__packet *packet, char **str) +int packet__read_string(struct mosquitto__packet *packet, char **str) { uint16_t len; int rc; assert(packet); - rc = mosquitto__read_uint16(packet, &len); + rc = packet__read_uint16(packet, &len); if(rc) return rc; if(packet->pos+len > packet->remaining_length) return MOSQ_ERR_PROTOCOL; @@ -176,15 +215,15 @@ int mosquitto__read_string(struct mosquitto__packet *packet, char **str) } -void mosquitto__write_string(struct mosquitto__packet *packet, const char *str, uint16_t length) +void packet__write_string(struct mosquitto__packet *packet, const char *str, uint16_t length) { assert(packet); - mosquitto__write_uint16(packet, length); - mosquitto__write_bytes(packet, str, length); + packet__write_uint16(packet, length); + packet__write_bytes(packet, str, length); } -int mosquitto__read_uint16(struct mosquitto__packet *packet, uint16_t *word) +int packet__read_uint16(struct mosquitto__packet *packet, uint16_t *word) { uint8_t msb, lsb; @@ -202,14 +241,14 @@ int mosquitto__read_uint16(struct mosquitto__packet *packet, uint16_t *word) } -void mosquitto__write_uint16(struct mosquitto__packet *packet, uint16_t word) +void packet__write_uint16(struct mosquitto__packet *packet, uint16_t word) { - mosquitto__write_byte(packet, MOSQ_MSB(word)); - mosquitto__write_byte(packet, MOSQ_LSB(word)); + packet__write_byte(packet, MOSQ_MSB(word)); + packet__write_byte(packet, MOSQ_LSB(word)); } -int mosquitto__packet_write(struct mosquitto *mosq) +int packet__write(struct mosquitto *mosq) { ssize_t write_length; struct mosquitto__packet *packet; @@ -291,7 +330,7 @@ int mosquitto__packet_write(struct mosquitto *mosq) } pthread_mutex_unlock(&mosq->out_packet_mutex); - mosquitto__packet_cleanup(packet); + packet__cleanup(packet); mosquitto__free(packet); pthread_mutex_lock(&mosq->msgtime_mutex); @@ -322,7 +361,7 @@ int mosquitto__packet_write(struct mosquitto *mosq) } pthread_mutex_unlock(&mosq->out_packet_mutex); - mosquitto__packet_cleanup(packet); + packet__cleanup(packet); mosquitto__free(packet); pthread_mutex_lock(&mosq->msgtime_mutex); @@ -335,9 +374,9 @@ int mosquitto__packet_write(struct mosquitto *mosq) #ifdef WITH_BROKER -int mosquitto__packet_read(struct mosquitto_db *db, struct mosquitto *mosq) +int packet__read(struct mosquitto_db *db, struct mosquitto *mosq) #else -int mosquitto__packet_read(struct mosquitto *mosq) +int packet__read(struct mosquitto *mosq) #endif { uint8_t byte; @@ -485,7 +524,7 @@ int mosquitto__packet_read(struct mosquitto *mosq) #endif /* Free data and reset values */ - mosquitto__packet_cleanup(&mosq->in_packet); + packet__cleanup(&mosq->in_packet); pthread_mutex_lock(&mosq->msgtime_mutex); mosq->last_msg_in = mosquitto_time(); diff --git a/lib/packet_mosq.h b/lib/packet_mosq.h index e9001e05..2ff3b73f 100644 --- a/lib/packet_mosq.h +++ b/lib/packet_mosq.h @@ -23,24 +23,25 @@ Contributors: struct mosquitto_db; #endif -void mosquitto__packet_cleanup(struct mosquitto__packet *packet); -int mosquitto__packet_queue(struct mosquitto *mosq, struct mosquitto__packet *packet); +int packet__alloc(struct mosquitto__packet *packet); +void packet__cleanup(struct mosquitto__packet *packet); +int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet); -int mosquitto__read_byte(struct mosquitto__packet *packet, uint8_t *byte); -int mosquitto__read_bytes(struct mosquitto__packet *packet, void *bytes, uint32_t count); -int mosquitto__read_string(struct mosquitto__packet *packet, char **str); -int mosquitto__read_uint16(struct mosquitto__packet *packet, uint16_t *word); +int packet__read_byte(struct mosquitto__packet *packet, uint8_t *byte); +int packet__read_bytes(struct mosquitto__packet *packet, void *bytes, uint32_t count); +int packet__read_string(struct mosquitto__packet *packet, char **str); +int packet__read_uint16(struct mosquitto__packet *packet, uint16_t *word); -void mosquitto__write_byte(struct mosquitto__packet *packet, uint8_t byte); -void mosquitto__write_bytes(struct mosquitto__packet *packet, const void *bytes, uint32_t count); -void mosquitto__write_string(struct mosquitto__packet *packet, const char *str, uint16_t length); -void mosquitto__write_uint16(struct mosquitto__packet *packet, uint16_t word); +void packet__write_byte(struct mosquitto__packet *packet, uint8_t byte); +void packet__write_bytes(struct mosquitto__packet *packet, const void *bytes, uint32_t count); +void packet__write_string(struct mosquitto__packet *packet, const char *str, uint16_t length); +void packet__write_uint16(struct mosquitto__packet *packet, uint16_t word); -int mosquitto__packet_write(struct mosquitto *mosq); +int packet__write(struct mosquitto *mosq); #ifdef WITH_BROKER -int mosquitto__packet_read(struct mosquitto_db *db, struct mosquitto *mosq); +int packet__read(struct mosquitto_db *db, struct mosquitto *mosq); #else -int mosquitto__packet_read(struct mosquitto *mosq); +int packet__read(struct mosquitto *mosq); #endif #endif diff --git a/lib/read_handle.c b/lib/read_handle.c index bf51d84f..e0d22d6f 100644 --- a/lib/read_handle.c +++ b/lib/read_handle.c @@ -80,7 +80,7 @@ int mosquitto__handle_publish(struct mosquitto *mosq) message->msg.qos = (header & 0x06)>>1; message->msg.retain = (header & 0x01); - rc = mosquitto__read_string(&mosq->in_packet, &message->msg.topic); + rc = packet__read_string(&mosq->in_packet, &message->msg.topic); if(rc){ mosquitto__message_cleanup(&message); return rc; @@ -91,7 +91,7 @@ int mosquitto__handle_publish(struct mosquitto *mosq) } if(message->msg.qos > 0){ - rc = mosquitto__read_uint16(&mosq->in_packet, &mid); + rc = packet__read_uint16(&mosq->in_packet, &mid); if(rc){ mosquitto__message_cleanup(&message); return rc; @@ -106,7 +106,7 @@ int mosquitto__handle_publish(struct mosquitto *mosq) mosquitto__message_cleanup(&message); return MOSQ_ERR_NOMEM; } - rc = mosquitto__read_bytes(&mosq->in_packet, message->msg.payload, message->msg.payloadlen); + rc = packet__read_bytes(&mosq->in_packet, message->msg.payload, message->msg.payloadlen); if(rc){ mosquitto__message_cleanup(&message); return rc; diff --git a/lib/read_handle_client.c b/lib/read_handle_client.c index a0abbc5a..42f9ef98 100644 --- a/lib/read_handle_client.c +++ b/lib/read_handle_client.c @@ -31,9 +31,9 @@ int mosquitto__handle_connack(struct mosquitto *mosq) assert(mosq); mosquitto__log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s received CONNACK", mosq->id); - rc = mosquitto__read_byte(&mosq->in_packet, &byte); // Reserved byte, not used + rc = packet__read_byte(&mosq->in_packet, &byte); // Reserved byte, not used if(rc) return rc; - rc = mosquitto__read_byte(&mosq->in_packet, &result); + rc = packet__read_byte(&mosq->in_packet, &result); if(rc) return rc; pthread_mutex_lock(&mosq->callback_mutex); if(mosq->on_connect){ diff --git a/lib/read_handle_shared.c b/lib/read_handle_shared.c index 47811ae3..64e13541 100644 --- a/lib/read_handle_shared.c +++ b/lib/read_handle_shared.c @@ -65,7 +65,7 @@ int mosquitto__handle_pubackcomp(struct mosquitto *mosq, const char *type) int rc; assert(mosq); - rc = mosquitto__read_uint16(&mosq->in_packet, &mid); + rc = packet__read_uint16(&mosq->in_packet, &mid); if(rc) return rc; #ifdef WITH_BROKER mosquitto__log_printf(NULL, MOSQ_LOG_DEBUG, "Received %s from %s (Mid: %d)", type, mosq->id, mid); @@ -98,7 +98,7 @@ int mosquitto__handle_pubrec(struct mosquitto *mosq) int rc; assert(mosq); - rc = mosquitto__read_uint16(&mosq->in_packet, &mid); + rc = packet__read_uint16(&mosq->in_packet, &mid); if(rc) return rc; #ifdef WITH_BROKER mosquitto__log_printf(NULL, MOSQ_LOG_DEBUG, "Received PUBREC from %s (Mid: %d)", mosq->id, mid); @@ -130,7 +130,7 @@ int mosquitto__handle_pubrel(struct mosquitto_db *db, struct mosquitto *mosq) return MOSQ_ERR_PROTOCOL; } } - rc = mosquitto__read_uint16(&mosq->in_packet, &mid); + rc = packet__read_uint16(&mosq->in_packet, &mid); if(rc) return rc; #ifdef WITH_BROKER mosquitto__log_printf(NULL, MOSQ_LOG_DEBUG, "Received PUBREL from %s (Mid: %d)", mosq->id, mid); @@ -176,14 +176,14 @@ int mosquitto__handle_suback(struct mosquitto *mosq) #else mosquitto__log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s received SUBACK", mosq->id); #endif - rc = mosquitto__read_uint16(&mosq->in_packet, &mid); + rc = packet__read_uint16(&mosq->in_packet, &mid); if(rc) return rc; qos_count = mosq->in_packet.remaining_length - mosq->in_packet.pos; granted_qos = mosquitto__malloc(qos_count*sizeof(int)); if(!granted_qos) return MOSQ_ERR_NOMEM; while(mosq->in_packet.pos < mosq->in_packet.remaining_length){ - rc = mosquitto__read_byte(&mosq->in_packet, &qos); + rc = packet__read_byte(&mosq->in_packet, &qos); if(rc){ mosquitto__free(granted_qos); return rc; @@ -216,7 +216,7 @@ int mosquitto__handle_unsuback(struct mosquitto *mosq) #else mosquitto__log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s received UNSUBACK", mosq->id); #endif - rc = mosquitto__read_uint16(&mosq->in_packet, &mid); + rc = packet__read_uint16(&mosq->in_packet, &mid); if(rc) return rc; #ifndef WITH_BROKER pthread_mutex_lock(&mosq->callback_mutex); diff --git a/lib/send_client_mosq.c b/lib/send_client_mosq.c index a93e7c85..61b685db 100644 --- a/lib/send_client_mosq.c +++ b/lib/send_client_mosq.c @@ -89,7 +89,7 @@ int mosquitto__send_connect(struct mosquitto *mosq, uint16_t keepalive, bool cle packet->command = CONNECT; packet->remaining_length = headerlen+payloadlen; - rc = mosquitto__packet_alloc(packet); + rc = packet__alloc(packet); if(rc){ mosquitto__free(packet); return rc; @@ -97,9 +97,9 @@ int mosquitto__send_connect(struct mosquitto *mosq, uint16_t keepalive, bool cle /* Variable header */ if(version == MQTT_PROTOCOL_V31){ - mosquitto__write_string(packet, PROTOCOL_NAME_v31, strlen(PROTOCOL_NAME_v31)); + packet__write_string(packet, PROTOCOL_NAME_v31, strlen(PROTOCOL_NAME_v31)); }else if(version == MQTT_PROTOCOL_V311){ - mosquitto__write_string(packet, PROTOCOL_NAME_v311, strlen(PROTOCOL_NAME_v311)); + packet__write_string(packet, PROTOCOL_NAME_v311, strlen(PROTOCOL_NAME_v311)); }else{ mosquitto__free(packet); return MOSQ_ERR_INVAL; @@ -110,7 +110,7 @@ int mosquitto__send_connect(struct mosquitto *mosq, uint16_t keepalive, bool cle }else{ } #endif - mosquitto__write_byte(packet, version); + packet__write_byte(packet, version); byte = (clean_session&0x1)<<1; if(will){ byte = byte | ((mosq->will->retain&0x1)<<5) | ((mosq->will->qos&0x3)<<3) | ((will&0x1)<<2); @@ -121,19 +121,19 @@ int mosquitto__send_connect(struct mosquitto *mosq, uint16_t keepalive, bool cle byte = byte | 0x1<<6; } } - mosquitto__write_byte(packet, byte); - mosquitto__write_uint16(packet, keepalive); + packet__write_byte(packet, byte); + packet__write_uint16(packet, keepalive); /* Payload */ - mosquitto__write_string(packet, clientid, strlen(clientid)); + packet__write_string(packet, clientid, strlen(clientid)); if(will){ - mosquitto__write_string(packet, mosq->will->topic, strlen(mosq->will->topic)); - mosquitto__write_string(packet, (const char *)mosq->will->payload, mosq->will->payloadlen); + packet__write_string(packet, mosq->will->topic, strlen(mosq->will->topic)); + packet__write_string(packet, (const char *)mosq->will->payload, mosq->will->payloadlen); } if(username){ - mosquitto__write_string(packet, username, strlen(username)); + packet__write_string(packet, username, strlen(username)); if(password){ - mosquitto__write_string(packet, password, strlen(password)); + packet__write_string(packet, password, strlen(password)); } } @@ -145,7 +145,7 @@ int mosquitto__send_connect(struct mosquitto *mosq, uint16_t keepalive, bool cle #else mosquitto__log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending CONNECT", clientid); #endif - return mosquitto__packet_queue(mosq, packet); + return packet__queue(mosq, packet); } int mosquitto__send_disconnect(struct mosquitto *mosq) @@ -179,7 +179,7 @@ int mosquitto__send_subscribe(struct mosquitto *mosq, int *mid, const char *topi packet->command = SUBSCRIBE | (1<<1); packet->remaining_length = packetlen; - rc = mosquitto__packet_alloc(packet); + rc = packet__alloc(packet); if(rc){ mosquitto__free(packet); return rc; @@ -188,11 +188,11 @@ int mosquitto__send_subscribe(struct mosquitto *mosq, int *mid, const char *topi /* Variable header */ local_mid = mosquitto__mid_generate(mosq); if(mid) *mid = (int)local_mid; - mosquitto__write_uint16(packet, local_mid); + packet__write_uint16(packet, local_mid); /* Payload */ - mosquitto__write_string(packet, topic, strlen(topic)); - mosquitto__write_byte(packet, topic_qos); + packet__write_string(packet, topic, strlen(topic)); + packet__write_byte(packet, topic_qos); #ifdef WITH_BROKER # ifdef WITH_BRIDGE @@ -202,7 +202,7 @@ int mosquitto__send_subscribe(struct mosquitto *mosq, int *mid, const char *topi mosquitto__log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending SUBSCRIBE (Mid: %d, Topic: %s, QoS: %d)", mosq->id, local_mid, topic, topic_qos); #endif - return mosquitto__packet_queue(mosq, packet); + return packet__queue(mosq, packet); } @@ -224,7 +224,7 @@ int mosquitto__send_unsubscribe(struct mosquitto *mosq, int *mid, const char *to packet->command = UNSUBSCRIBE | (1<<1); packet->remaining_length = packetlen; - rc = mosquitto__packet_alloc(packet); + rc = packet__alloc(packet); if(rc){ mosquitto__free(packet); return rc; @@ -233,10 +233,10 @@ int mosquitto__send_unsubscribe(struct mosquitto *mosq, int *mid, const char *to /* Variable header */ local_mid = mosquitto__mid_generate(mosq); if(mid) *mid = (int)local_mid; - mosquitto__write_uint16(packet, local_mid); + packet__write_uint16(packet, local_mid); /* Payload */ - mosquitto__write_string(packet, topic, strlen(topic)); + packet__write_string(packet, topic, strlen(topic)); #ifdef WITH_BROKER # ifdef WITH_BRIDGE @@ -245,6 +245,6 @@ int mosquitto__send_unsubscribe(struct mosquitto *mosq, int *mid, const char *to #else mosquitto__log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending UNSUBSCRIBE (Mid: %d, Topic: %s)", mosq->id, local_mid, topic); #endif - return mosquitto__packet_queue(mosq, packet); + return packet__queue(mosq, packet); } diff --git a/lib/send_mosq.c b/lib/send_mosq.c index 82d7ff67..8f2a091d 100644 --- a/lib/send_mosq.c +++ b/lib/send_mosq.c @@ -208,7 +208,7 @@ int mosquitto__send_command_with_mid(struct mosquitto *mosq, uint8_t command, ui packet->command |= 8; } packet->remaining_length = 2; - rc = mosquitto__packet_alloc(packet); + rc = packet__alloc(packet); if(rc){ mosquitto__free(packet); return rc; @@ -217,7 +217,7 @@ int mosquitto__send_command_with_mid(struct mosquitto *mosq, uint8_t command, ui packet->payload[packet->pos+0] = MOSQ_MSB(mid); packet->payload[packet->pos+1] = MOSQ_LSB(mid); - return mosquitto__packet_queue(mosq, packet); + return packet__queue(mosq, packet); } /* For DISCONNECT, PINGREQ and PINGRESP */ @@ -233,13 +233,13 @@ int mosquitto__send_simple_command(struct mosquitto *mosq, uint8_t command) packet->command = command; packet->remaining_length = 0; - rc = mosquitto__packet_alloc(packet); + rc = packet__alloc(packet); if(rc){ mosquitto__free(packet); return rc; } - return mosquitto__packet_queue(mosq, packet); + return packet__queue(mosq, packet); } int mosquitto__send_real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup) @@ -259,21 +259,21 @@ int mosquitto__send_real_publish(struct mosquitto *mosq, uint16_t mid, const cha packet->mid = mid; packet->command = PUBLISH | ((dup&0x1)<<3) | (qos<<1) | retain; packet->remaining_length = packetlen; - rc = mosquitto__packet_alloc(packet); + rc = packet__alloc(packet); if(rc){ mosquitto__free(packet); return rc; } /* Variable header (topic string) */ - mosquitto__write_string(packet, topic, strlen(topic)); + packet__write_string(packet, topic, strlen(topic)); if(qos > 0){ - mosquitto__write_uint16(packet, mid); + packet__write_uint16(packet, mid); } /* Payload */ if(payloadlen){ - mosquitto__write_bytes(packet, payload, payloadlen); + packet__write_bytes(packet, payload, payloadlen); } - return mosquitto__packet_queue(mosq, packet); + return packet__queue(mosq, packet); } diff --git a/lib/socks_mosq.c b/lib/socks_mosq.c index 930acdf0..b2c9233a 100644 --- a/lib/socks_mosq.c +++ b/lib/socks_mosq.c @@ -130,7 +130,7 @@ int mosquitto__socks5_send(struct mosquitto *mosq) return MOSQ_ERR_NOMEM; } - return mosquitto__packet_queue(mosq, packet); + return packet__queue(mosq, packet); }else if(mosq->state == mosq_cs_socks5_auth_ok){ packet = mosquitto__calloc(1, sizeof(struct mosquitto__packet)); if(!packet) return MOSQ_ERR_NOMEM; @@ -163,7 +163,7 @@ int mosquitto__socks5_send(struct mosquitto *mosq) return MOSQ_ERR_NOMEM; } - return mosquitto__packet_queue(mosq, packet); + return packet__queue(mosq, packet); }else if(mosq->state == mosq_cs_socks5_send_userpass){ packet = mosquitto__calloc(1, sizeof(struct mosquitto__packet)); if(!packet) return MOSQ_ERR_NOMEM; @@ -194,7 +194,7 @@ int mosquitto__socks5_send(struct mosquitto *mosq) return MOSQ_ERR_NOMEM; } - return mosquitto__packet_queue(mosq, packet); + return packet__queue(mosq, packet); } return MOSQ_ERR_SUCCESS; } @@ -218,7 +218,7 @@ int mosquitto__socks5_read(struct mosquitto *mosq) if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){ return MOSQ_ERR_SUCCESS; }else{ - mosquitto__packet_cleanup(&mosq->in_packet); + packet__cleanup(&mosq->in_packet); switch(errno){ case 0: return MOSQ_ERR_PROXY; @@ -231,20 +231,20 @@ int mosquitto__socks5_read(struct mosquitto *mosq) } } if(mosq->in_packet.payload[0] != 5){ - mosquitto__packet_cleanup(&mosq->in_packet); + packet__cleanup(&mosq->in_packet); return MOSQ_ERR_PROXY; } switch(mosq->in_packet.payload[1]){ case SOCKS_AUTH_NONE: - mosquitto__packet_cleanup(&mosq->in_packet); + packet__cleanup(&mosq->in_packet); mosq->state = mosq_cs_socks5_auth_ok; return mosquitto__socks5_send(mosq); case SOCKS_AUTH_USERPASS: - mosquitto__packet_cleanup(&mosq->in_packet); + packet__cleanup(&mosq->in_packet); mosq->state = mosq_cs_socks5_send_userpass; return mosquitto__socks5_send(mosq); default: - mosquitto__packet_cleanup(&mosq->in_packet); + packet__cleanup(&mosq->in_packet); return MOSQ_ERR_AUTH; } }else if(mosq->state == mosq_cs_socks5_userpass_reply){ @@ -260,7 +260,7 @@ int mosquitto__socks5_read(struct mosquitto *mosq) if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){ return MOSQ_ERR_SUCCESS; }else{ - mosquitto__packet_cleanup(&mosq->in_packet); + packet__cleanup(&mosq->in_packet); switch(errno){ case 0: return MOSQ_ERR_PROXY; @@ -273,16 +273,16 @@ int mosquitto__socks5_read(struct mosquitto *mosq) } } if(mosq->in_packet.payload[0] != 1){ - mosquitto__packet_cleanup(&mosq->in_packet); + packet__cleanup(&mosq->in_packet); return MOSQ_ERR_PROXY; } if(mosq->in_packet.payload[1] == 0){ - mosquitto__packet_cleanup(&mosq->in_packet); + packet__cleanup(&mosq->in_packet); mosq->state = mosq_cs_socks5_auth_ok; return mosquitto__socks5_send(mosq); }else{ i = mosq->in_packet.payload[1]; - mosquitto__packet_cleanup(&mosq->in_packet); + packet__cleanup(&mosq->in_packet); switch(i){ case SOCKS_REPLY_CONNECTION_NOT_ALLOWED: return MOSQ_ERR_AUTH; @@ -316,7 +316,7 @@ int mosquitto__socks5_read(struct mosquitto *mosq) if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){ return MOSQ_ERR_SUCCESS; }else{ - mosquitto__packet_cleanup(&mosq->in_packet); + packet__cleanup(&mosq->in_packet); switch(errno){ case 0: return MOSQ_ERR_PROXY; @@ -343,21 +343,21 @@ int mosquitto__socks5_read(struct mosquitto *mosq) mosq->in_packet.packet_length += mosq->in_packet.payload[4]; } }else{ - mosquitto__packet_cleanup(&mosq->in_packet); + packet__cleanup(&mosq->in_packet); return MOSQ_ERR_PROTOCOL; } payload = mosquitto__realloc(mosq->in_packet.payload, mosq->in_packet.packet_length); if(payload){ mosq->in_packet.payload = payload; }else{ - mosquitto__packet_cleanup(&mosq->in_packet); + packet__cleanup(&mosq->in_packet); return MOSQ_ERR_NOMEM; } payload = mosquitto__realloc(mosq->in_packet.payload, mosq->in_packet.packet_length); if(payload){ mosq->in_packet.payload = payload; }else{ - mosquitto__packet_cleanup(&mosq->in_packet); + packet__cleanup(&mosq->in_packet); return MOSQ_ERR_NOMEM; } return MOSQ_ERR_SUCCESS; @@ -365,17 +365,17 @@ int mosquitto__socks5_read(struct mosquitto *mosq) /* Entire packet is now read. */ if(mosq->in_packet.payload[0] != 5){ - mosquitto__packet_cleanup(&mosq->in_packet); + packet__cleanup(&mosq->in_packet); return MOSQ_ERR_PROXY; } if(mosq->in_packet.payload[1] == 0){ /* Auth passed */ - mosquitto__packet_cleanup(&mosq->in_packet); + packet__cleanup(&mosq->in_packet); mosq->state = mosq_cs_new; return mosquitto__send_connect(mosq, mosq->keepalive, mosq->clean_session); }else{ i = mosq->in_packet.payload[1]; - mosquitto__packet_cleanup(&mosq->in_packet); + packet__cleanup(&mosq->in_packet); mosq->state = mosq_cs_socks5_new; switch(i){ case SOCKS_REPLY_CONNECTION_NOT_ALLOWED: @@ -397,7 +397,7 @@ int mosquitto__socks5_read(struct mosquitto *mosq) } } }else{ - return mosquitto__packet_read(mosq); + return packet__read(mosq); } return MOSQ_ERR_SUCCESS; } diff --git a/lib/util_mosq.c b/lib/util_mosq.c index dfd51a81..716f5630 100644 --- a/lib/util_mosq.c +++ b/lib/util_mosq.c @@ -38,45 +38,6 @@ Contributors: #include #endif -int mosquitto__packet_alloc(struct mosquitto__packet *packet) -{ - uint8_t remaining_bytes[5], byte; - uint32_t remaining_length; - int i; - - assert(packet); - - remaining_length = packet->remaining_length; - packet->payload = NULL; - packet->remaining_count = 0; - do{ - byte = remaining_length % 128; - remaining_length = remaining_length / 128; - /* If there are more digits to encode, set the top bit of this digit */ - if(remaining_length > 0){ - byte = byte | 0x80; - } - remaining_bytes[packet->remaining_count] = byte; - packet->remaining_count++; - }while(remaining_length > 0 && packet->remaining_count < 5); - if(packet->remaining_count == 5) return MOSQ_ERR_PAYLOAD_SIZE; - packet->packet_length = packet->remaining_length + 1 + packet->remaining_count; -#ifdef WITH_WEBSOCKETS - packet->payload = mosquitto__malloc(sizeof(uint8_t)*packet->packet_length + LWS_SEND_BUFFER_PRE_PADDING + LWS_SEND_BUFFER_POST_PADDING); -#else - packet->payload = mosquitto__malloc(sizeof(uint8_t)*packet->packet_length); -#endif - if(!packet->payload) return MOSQ_ERR_NOMEM; - - packet->payload[0] = packet->command; - for(i=0; iremaining_count; i++){ - packet->payload[i+1] = remaining_bytes[i]; - } - packet->pos = 1 + packet->remaining_count; - - return MOSQ_ERR_SUCCESS; -} - #ifdef WITH_BROKER void mosquitto__check_keepalive(struct mosquitto_db *db, struct mosquitto *mosq) #else diff --git a/lib/util_mosq.h b/lib/util_mosq.h index 046e3bac..cfb5c1ca 100644 --- a/lib/util_mosq.h +++ b/lib/util_mosq.h @@ -25,7 +25,6 @@ Contributors: # include "mosquitto_broker.h" #endif -int mosquitto__packet_alloc(struct mosquitto__packet *packet); #ifdef WITH_BROKER void mosquitto__check_keepalive(struct mosquitto_db *db, struct mosquitto *mosq); #else diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 672a5b16..e386960d 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -14,7 +14,7 @@ set (MOSQ_SRCS mosquitto_broker.h net.c ../lib/net_mosq.c ../lib/net_mosq.h - ../lib/packet.c ../lib/packet.h + ../lib/packet_mosq.c ../lib/packet_mosq.h persist.c persist.h read_handle.c read_handle_client.c read_handle_server.c ../lib/read_handle_shared.c ../lib/read_handle.h @@ -23,7 +23,7 @@ set (MOSQ_SRCS ../lib/send_client_mosq.c ../lib/send_mosq.h ../lib/send_mosq.c ../lib/send_mosq.h send_server.c - sys_tree.c + sys_tree.c sys_tree.h ../lib/time_mosq.c ../lib/tls_mosq.c ../lib/util_mosq.c ../lib/util_mosq.h diff --git a/src/bridge.c b/src/bridge.c index 4bc265db..6e36a777 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -252,12 +252,12 @@ void mqtt3_bridge_packet_cleanup(struct mosquitto *context) if(!context) return; if(context->current_out_packet){ - mosquitto__packet_cleanup(context->current_out_packet); + packet__cleanup(context->current_out_packet); mosquitto__free(context->current_out_packet); context->current_out_packet = NULL; } while(context->out_packet){ - mosquitto__packet_cleanup(context->out_packet); + packet__cleanup(context->out_packet); packet = context->out_packet; context->out_packet = context->out_packet->next; mosquitto__free(packet); @@ -265,7 +265,7 @@ void mqtt3_bridge_packet_cleanup(struct mosquitto *context) context->out_packet = NULL; context->out_packet_last = NULL; - mosquitto__packet_cleanup(&(context->in_packet)); + packet__cleanup(&(context->in_packet)); } #endif diff --git a/src/context.c b/src/context.c index 7bc06d47..7f9582d2 100644 --- a/src/context.c +++ b/src/context.c @@ -54,7 +54,7 @@ struct mosquitto *mqtt3_context_init(struct mosquitto_db *db, int sock) context->is_bridge = false; context->in_packet.payload = NULL; - mosquitto__packet_cleanup(&context->in_packet); + packet__cleanup(&context->in_packet); context->out_packet = NULL; context->current_out_packet = NULL; @@ -152,14 +152,14 @@ void mqtt3_context_cleanup(struct mosquitto_db *db, struct mosquitto *context, b mosquitto__free(context->id); context->id = NULL; } - mosquitto__packet_cleanup(&(context->in_packet)); + packet__cleanup(&(context->in_packet)); if(context->current_out_packet){ - mosquitto__packet_cleanup(context->current_out_packet); + packet__cleanup(context->current_out_packet); mosquitto__free(context->current_out_packet); context->current_out_packet = NULL; } while(context->out_packet){ - mosquitto__packet_cleanup(context->out_packet); + packet__cleanup(context->out_packet); packet = context->out_packet; context->out_packet = context->out_packet->next; mosquitto__free(packet); diff --git a/src/loop.c b/src/loop.c index 66d8644f..6ea3e8e6 100644 --- a/src/loop.c +++ b/src/loop.c @@ -481,7 +481,7 @@ static void loop_handle_reads_writes(struct mosquitto_db *db, struct pollfd *pol continue; } } - if(mosquitto__packet_write(context)){ + if(packet__write(context)){ do_disconnect(db, context); continue; } @@ -499,7 +499,7 @@ static void loop_handle_reads_writes(struct mosquitto_db *db, struct pollfd *pol #else if(pollfds[context->pollfd_index].revents & POLLIN){ #endif - if(mosquitto__packet_read(db, context)){ + if(packet__read(db, context)){ do_disconnect(db, context); continue; } diff --git a/src/read_handle.c b/src/read_handle.c index 4fb34cf0..b45c6028 100644 --- a/src/read_handle.c +++ b/src/read_handle.c @@ -99,7 +99,7 @@ int mqtt3_handle_publish(struct mosquitto_db *db, struct mosquitto *context) } retain = (header & 0x01); - if(mosquitto__read_string(&context->in_packet, &topic)) return 1; + if(packet__read_string(&context->in_packet, &topic)) return 1; if(strlen(topic) == 0){ /* Invalid publish topic, disconnect client. */ mosquitto__free(topic); @@ -160,7 +160,7 @@ int mqtt3_handle_publish(struct mosquitto_db *db, struct mosquitto *context) } if(qos > 0){ - if(mosquitto__read_uint16(&context->in_packet, &mid)){ + if(packet__read_uint16(&context->in_packet, &mid)){ mosquitto__free(topic); return 1; } @@ -192,7 +192,7 @@ int mqtt3_handle_publish(struct mosquitto_db *db, struct mosquitto *context) mosquitto__free(topic); return 1; } - if(mosquitto__read_bytes(&context->in_packet, payload, payloadlen)){ + if(packet__read_bytes(&context->in_packet, payload, payloadlen)){ mosquitto__free(topic); mosquitto__free(payload); return 1; diff --git a/src/read_handle_client.c b/src/read_handle_client.c index 30697352..1284ed66 100644 --- a/src/read_handle_client.c +++ b/src/read_handle_client.c @@ -38,8 +38,8 @@ int mqtt3_handle_connack(struct mosquitto_db *db, struct mosquitto *context) return MOSQ_ERR_INVAL; } mosquitto__log_printf(NULL, MOSQ_LOG_DEBUG, "Received CONNACK on connection %s.", context->id); - if(mosquitto__read_byte(&context->in_packet, &byte)) return 1; // Reserved byte, not used - if(mosquitto__read_byte(&context->in_packet, &rc)) return 1; + if(packet__read_byte(&context->in_packet, &byte)) return 1; // Reserved byte, not used + if(packet__read_byte(&context->in_packet, &rc)) return 1; switch(rc){ case CONNACK_ACCEPTED: if(context->bridge){ diff --git a/src/read_handle_server.c b/src/read_handle_server.c index 67b0ed40..94d9e3a0 100644 --- a/src/read_handle_server.c +++ b/src/read_handle_server.c @@ -106,7 +106,7 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context) goto handle_connect_error; } - if(mosquitto__read_string(&context->in_packet, &protocol_name)){ + if(packet__read_string(&context->in_packet, &protocol_name)){ rc = 1; goto handle_connect_error; return 1; @@ -116,7 +116,7 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context) goto handle_connect_error; return 3; } - if(mosquitto__read_byte(&context->in_packet, &protocol_version)){ + if(packet__read_byte(&context->in_packet, &protocol_version)){ rc = 1; goto handle_connect_error; return 1; @@ -162,7 +162,7 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context) } mosquitto__free(protocol_name); - if(mosquitto__read_byte(&context->in_packet, &connect_flags)){ + if(packet__read_byte(&context->in_packet, &connect_flags)){ rc = 1; goto handle_connect_error; } @@ -179,12 +179,12 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context) password_flag = connect_flags & 0x40; username_flag = connect_flags & 0x80; - if(mosquitto__read_uint16(&context->in_packet, &(context->keepalive))){ + if(packet__read_uint16(&context->in_packet, &(context->keepalive))){ rc = 1; goto handle_connect_error; } - if(mosquitto__read_string(&context->in_packet, &client_id)){ + if(packet__read_string(&context->in_packet, &client_id)){ rc = 1; goto handle_connect_error; } @@ -228,7 +228,7 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context) rc = MOSQ_ERR_NOMEM; goto handle_connect_error; } - if(mosquitto__read_string(&context->in_packet, &will_topic)){ + if(packet__read_string(&context->in_packet, &will_topic)){ rc = 1; goto handle_connect_error; } @@ -241,7 +241,7 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context) goto handle_connect_error; } - if(mosquitto__read_uint16(&context->in_packet, &will_payloadlen)){ + if(packet__read_uint16(&context->in_packet, &will_payloadlen)){ rc = 1; goto handle_connect_error; } @@ -252,7 +252,7 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context) goto handle_connect_error; } - rc = mosquitto__read_bytes(&context->in_packet, will_payload, will_payloadlen); + rc = packet__read_bytes(&context->in_packet, will_payload, will_payloadlen); if(rc){ rc = 1; goto handle_connect_error; @@ -268,10 +268,10 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context) } if(username_flag){ - rc = mosquitto__read_string(&context->in_packet, &username); + rc = packet__read_string(&context->in_packet, &username); if(rc == MOSQ_ERR_SUCCESS){ if(password_flag){ - rc = mosquitto__read_string(&context->in_packet, &password); + rc = packet__read_string(&context->in_packet, &password); if(rc == MOSQ_ERR_NOMEM){ rc = MOSQ_ERR_NOMEM; goto handle_connect_error; @@ -608,11 +608,11 @@ int mqtt3_handle_subscribe(struct mosquitto_db *db, struct mosquitto *context) return MOSQ_ERR_PROTOCOL; } } - if(mosquitto__read_uint16(&context->in_packet, &mid)) return 1; + if(packet__read_uint16(&context->in_packet, &mid)) return 1; while(context->in_packet.pos < context->in_packet.remaining_length){ sub = NULL; - if(mosquitto__read_string(&context->in_packet, &sub)){ + if(packet__read_string(&context->in_packet, &sub)){ if(payload) mosquitto__free(payload); return 1; } @@ -633,7 +633,7 @@ int mqtt3_handle_subscribe(struct mosquitto_db *db, struct mosquitto *context) return 1; } - if(mosquitto__read_byte(&context->in_packet, &qos)){ + if(packet__read_byte(&context->in_packet, &qos)){ mosquitto__free(sub); if(payload) mosquitto__free(payload); return 1; @@ -743,11 +743,11 @@ int mqtt3_handle_unsubscribe(struct mosquitto_db *db, struct mosquitto *context) return MOSQ_ERR_PROTOCOL; } } - if(mosquitto__read_uint16(&context->in_packet, &mid)) return 1; + if(packet__read_uint16(&context->in_packet, &mid)) return 1; while(context->in_packet.pos < context->in_packet.remaining_length){ sub = NULL; - if(mosquitto__read_string(&context->in_packet, &sub)){ + if(packet__read_string(&context->in_packet, &sub)){ return 1; } diff --git a/src/send_server.c b/src/send_server.c index 01dcb76b..29c8f70e 100644 --- a/src/send_server.c +++ b/src/send_server.c @@ -40,7 +40,7 @@ int mosquitto__send_connack(struct mosquitto *context, int ack, int result) packet->command = CONNACK; packet->remaining_length = 2; - rc = mosquitto__packet_alloc(packet); + rc = packet__alloc(packet); if(rc){ mosquitto__free(packet); return rc; @@ -48,7 +48,7 @@ int mosquitto__send_connack(struct mosquitto *context, int ack, int result) packet->payload[packet->pos+0] = ack; packet->payload[packet->pos+1] = result; - return mosquitto__packet_queue(context, packet); + return packet__queue(context, packet); } int mosquitto__send_suback(struct mosquitto *context, uint16_t mid, uint32_t payloadlen, const void *payload) @@ -63,15 +63,15 @@ int mosquitto__send_suback(struct mosquitto *context, uint16_t mid, uint32_t pay packet->command = SUBACK; packet->remaining_length = 2+payloadlen; - rc = mosquitto__packet_alloc(packet); + rc = packet__alloc(packet); if(rc){ mosquitto__free(packet); return rc; } - mosquitto__write_uint16(packet, mid); + packet__write_uint16(packet, mid); if(payloadlen){ - mosquitto__write_bytes(packet, payload, payloadlen); + packet__write_bytes(packet, payload, payloadlen); } - return mosquitto__packet_queue(context, packet); + return packet__queue(context, packet); } diff --git a/src/sys_tree.h b/src/sys_tree.h new file mode 100644 index 00000000..132ed584 --- /dev/null +++ b/src/sys_tree.h @@ -0,0 +1,64 @@ +/* +Copyright (c) 2015 Roger Light + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License v1.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + http://www.eclipse.org/legal/epl-v10.html +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +Contributors: + Roger Light - initial implementation and documentation. +*/ + +#ifndef SYS_TREE_H +#define SYS_TREE_H + +#if defined(WITH_SYS_TREE) && defined(WITH_BROKER) +extern uint64_t g_bytes_received; +extern uint64_t g_bytes_sent; +extern uint64_t g_pub_bytes_received; +extern uint64_t g_pub_bytes_sent; +extern unsigned long g_msgs_received; +extern unsigned long g_msgs_sent; +extern unsigned long g_pub_msgs_received; +extern unsigned long g_pub_msgs_sent; +extern unsigned long g_msgs_dropped; +extern int g_clients_expired; +extern unsigned int g_socket_connections; +extern unsigned int g_connection_count; + +#define G_BYTES_RECEIVED_INC(A) (g_bytes_received+=(A)) +#define G_BYTES_SENT_INC(A) (g_bytes_sent+=(A)) +#define G_PUB_BYTES_RECEIVED_INC(A) (g_pub_bytes_received+=(A)) +#define G_PUB_BYTES_SENT_INC(A) (g_pub_bytes_sent+=(A)) +#define G_MSGS_RECEIVED_INC(A) (g_msgs_received+=(A)) +#define G_MSGS_SENT_INC(A) (g_msgs_sent+=(A)) +#define G_PUB_MSGS_RECEIVED_INC(A) (g_pub_msgs_received+=(A)) +#define G_PUB_MSGS_SENT_INC(A) (g_pub_msgs_sent+=(A)) +#define G_MSGS_DROPPED_INC(A) (g_msgs_dropped++) +#define G_CLIENTS_EXPIRED_INC(A) (g_clients_expired++) +#define G_SOCKET_CONNECTIONS_INC(A) (g_socket_connections++) +#define G_CONNECTION_COUNT_INC(A) (g_connection_count++) + +#else + +#define G_BYTES_RECEIVED_INC(A) +#define G_BYTES_SENT_INC(A) +#define G_PUB_BYTES_RECEIVED_INC(A) +#define G_PUB_BYTES_SENT_INC(A) +#define G_MSGS_RECEIVED_INC(A) +#define G_MSGS_SENT_INC(A) +#define G_PUB_MSGS_RECEIVED_INC(A) +#define G_PUB_MSGS_SENT_INC(A) +#define G_MSGS_DROPPED_INC(A) +#define G_CLIENTS_EXPIRED_INC(A) +#define G_SOCKET_CONNECTIONS_INC(A) +#define G_CONNECTION_COUNT_INC(A) + +#endif + +#endif diff --git a/src/websockets.c b/src/websockets.c index f68bc0e5..03e7ab6b 100644 --- a/src/websockets.c +++ b/src/websockets.c @@ -34,6 +34,7 @@ POSSIBILITY OF SUCH DAMAGE. #include "mosquitto_broker.h" #include "mqtt3_protocol.h" #include "memory_mosq.h" +#include "packet_mosq.h" #include "sys_tree.h" #include @@ -230,7 +231,7 @@ static int callback_mqtt(struct libwebsocket_context *context, } } - mosquitto__packet_cleanup(packet); + packet__cleanup(packet); mosquitto__free(packet); mosq->last_msg_out = mosquitto_time(); @@ -308,15 +309,15 @@ static int callback_mqtt(struct libwebsocket_context *context, mosq->in_packet.pos = 0; #ifdef WITH_SYS_TREE - G_MSGS_RECEIVED_INC(); + G_MSGS_RECEIVED_INC(1); if(((mosq->in_packet.command)&0xF5) == PUBLISH){ - G_PUB_MSGS_RECEIVED_INC(); + G_PUB_MSGS_RECEIVED_INC(1); } #endif rc = mqtt3_packet_handle(db, mosq); /* Free data and reset values */ - mosquitto__packet_cleanup(&mosq->in_packet); + packet__cleanup(&mosq->in_packet); mosq->last_msg_in = mosquitto_time(); @@ -514,7 +515,7 @@ static void log_wrap(int level, const char *line) mosquitto__log_printf(NULL, MOSQ_LOG_WEBSOCKETS, "%s", l); } -struct libwebsocket_context *mosq_websockets_init(struct _mqtt3_listener *listener, int log_level) +struct libwebsocket_context *mosq_websockets_init(struct mqtt3__listener *listener, int log_level) { struct lws_context_creation_info info; struct libwebsocket_protocols *p;