Fix slow websockets performance when sending large messages.

Closes #1390. Thanks to aalibasic.
pull/1600/head
Roger A. Light 6 years ago
parent 701c1a9069
commit 0a8d5d6db0

@ -2,6 +2,7 @@ Broker:
- Fix v5 DISCONNECT packets with remaining length == 2 being treated as a - Fix v5 DISCONNECT packets with remaining length == 2 being treated as a
protocol error. Closes #1367. protocol error. Closes #1367.
- Fix support for libwebsockets 3.x. - Fix support for libwebsockets 3.x.
- Fix slow websockets performance when sending large messages. Closes #1390.
Build: Build:
- Fix missing function warnings on NetBSD. - Fix missing function warnings on NetBSD.

@ -47,6 +47,12 @@ POSSIBILITY OF SUCH DAMAGE.
# include <sys/socket.h> # include <sys/socket.h>
#endif #endif
/* Be careful if changing these, if TX is not bigger than SERV then there can
* be very large write performance penalties.
*/
#define WS_SERV_BUF_SIZE 4096
#define WS_TX_BUF_SIZE (WS_SERV_BUF_SIZE*2)
extern struct mosquitto_db int_db; extern struct mosquitto_db int_db;
#if defined(LWS_LIBRARY_VERSION_NUMBER) #if defined(LWS_LIBRARY_VERSION_NUMBER)
@ -97,7 +103,7 @@ static struct libwebsocket_protocols protocols[] = {
#ifdef LWS_LIBRARY_VERSION_NUMBER #ifdef LWS_LIBRARY_VERSION_NUMBER
NULL, /* user v1.4 on */ NULL, /* user v1.4 on */
# if LWS_LIBRARY_VERSION_NUMBER >= 2003000 # if LWS_LIBRARY_VERSION_NUMBER >= 2003000
0 /* tx_packet_size v2.3.0 */ WS_TX_BUF_SIZE /* tx_packet_size v2.3.0 */
# endif # endif
#endif #endif
}, },
@ -115,7 +121,7 @@ static struct libwebsocket_protocols protocols[] = {
#ifdef LWS_LIBRARY_VERSION_NUMBER #ifdef LWS_LIBRARY_VERSION_NUMBER
NULL, /* user v1.4 on */ NULL, /* user v1.4 on */
# if LWS_LIBRARY_VERSION_NUMBER >= 2003000 # if LWS_LIBRARY_VERSION_NUMBER >= 2003000
0 /* tx_packet_size v2.3.0 */ WS_TX_BUF_SIZE /* tx_packet_size v2.3.0 */
# endif # endif
#endif #endif
}, },
@ -133,7 +139,7 @@ static struct libwebsocket_protocols protocols[] = {
#ifdef LWS_LIBRARY_VERSION_NUMBER #ifdef LWS_LIBRARY_VERSION_NUMBER
NULL, /* user v1.4 on */ NULL, /* user v1.4 on */
# if LWS_LIBRARY_VERSION_NUMBER >= 2003000 # if LWS_LIBRARY_VERSION_NUMBER >= 2003000
0 /* tx_packet_size v2.3.0 */ WS_TX_BUF_SIZE /* tx_packet_size v2.3.0 */
# endif # endif
#endif #endif
}, },
@ -180,6 +186,7 @@ static int callback_mqtt(struct libwebsocket_context *context,
struct mosquitto_db *db; struct mosquitto_db *db;
struct mosquitto *mosq = NULL; struct mosquitto *mosq = NULL;
struct mosquitto__packet *packet; struct mosquitto__packet *packet;
size_t txlen;
int count; int count;
const struct libwebsocket_protocols *p; const struct libwebsocket_protocols *p;
struct libws_mqtt_data *u = (struct libws_mqtt_data *)user; struct libws_mqtt_data *u = (struct libws_mqtt_data *)user;
@ -287,7 +294,12 @@ static int callback_mqtt(struct libwebsocket_context *context,
memmove(&packet->payload[LWS_SEND_BUFFER_PRE_PADDING], packet->payload, packet->packet_length); memmove(&packet->payload[LWS_SEND_BUFFER_PRE_PADDING], packet->payload, packet->packet_length);
packet->pos += LWS_SEND_BUFFER_PRE_PADDING; packet->pos += LWS_SEND_BUFFER_PRE_PADDING;
} }
count = libwebsocket_write(wsi, &packet->payload[packet->pos], packet->to_process, LWS_WRITE_BINARY); if(packet->to_process > WS_TX_BUF_SIZE){
txlen = WS_TX_BUF_SIZE;
}else{
txlen = packet->to_process;
}
count = libwebsocket_write(wsi, &packet->payload[packet->pos], txlen, LWS_WRITE_BINARY);
if(count < 0){ if(count < 0){
if (mosq->state == mosq_cs_disconnect_ws || mosq->state == mosq_cs_disconnecting){ if (mosq->state == mosq_cs_disconnect_ws || mosq->state == mosq_cs_disconnecting){
return -1; return -1;
@ -757,6 +769,7 @@ struct libwebsocket_context *mosq_websockets_init(struct mosquitto__listener *li
} }
info.user = user; info.user = user;
info.pt_serv_buf_size = WS_SERV_BUF_SIZE;
listener->ws_protocol = p; listener->ws_protocol = p;
lws_set_log_level(conf->websockets_log_level, log_wrap); lws_set_log_level(conf->websockets_log_level, log_wrap);

Loading…
Cancel
Save