|
|
|
@ -54,8 +54,6 @@ POSSIBILITY OF SUCH DAMAGE.
|
|
|
|
|
#define WS_SERV_BUF_SIZE 4096
|
|
|
|
|
#define WS_TX_BUF_SIZE (WS_SERV_BUF_SIZE*2)
|
|
|
|
|
|
|
|
|
|
extern struct mosquitto_db int_db;
|
|
|
|
|
|
|
|
|
|
#if defined(LWS_LIBRARY_VERSION_NUMBER)
|
|
|
|
|
static int callback_mqtt(
|
|
|
|
|
#else
|
|
|
|
@ -184,7 +182,6 @@ static int callback_mqtt(struct libwebsocket_context *context,
|
|
|
|
|
void *in,
|
|
|
|
|
size_t len)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_db *db;
|
|
|
|
|
struct mosquitto *mosq = NULL;
|
|
|
|
|
struct mosquitto__packet *packet;
|
|
|
|
|
size_t txlen;
|
|
|
|
@ -197,11 +194,9 @@ static int callback_mqtt(struct libwebsocket_context *context,
|
|
|
|
|
int rc;
|
|
|
|
|
uint8_t byte;
|
|
|
|
|
|
|
|
|
|
db = &int_db;
|
|
|
|
|
|
|
|
|
|
switch (reason) {
|
|
|
|
|
case LWS_CALLBACK_ESTABLISHED:
|
|
|
|
|
mosq = context__init(db, WEBSOCKET_CLIENT);
|
|
|
|
|
mosq = context__init(WEBSOCKET_CLIENT);
|
|
|
|
|
if(mosq){
|
|
|
|
|
p = libwebsockets_get_protocol(wsi);
|
|
|
|
|
mosq->listener = p->user;
|
|
|
|
@ -233,7 +228,7 @@ static int callback_mqtt(struct libwebsocket_context *context,
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
if(mosq->listener->max_connections > 0 && mosq->listener->client_count > mosq->listener->max_connections){
|
|
|
|
|
if(db->config->connection_messages == true){
|
|
|
|
|
if(db.config->connection_messages == true){
|
|
|
|
|
log__printf(NULL, MOSQ_LOG_NOTICE, "Client connection from %s denied: max_connections exceeded.", mosq->address);
|
|
|
|
|
}
|
|
|
|
|
mosquitto__free(mosq->address);
|
|
|
|
@ -242,8 +237,8 @@ static int callback_mqtt(struct libwebsocket_context *context,
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
mosq->sock = libwebsocket_get_socket_fd(wsi);
|
|
|
|
|
HASH_ADD(hh_sock, db->contexts_by_sock, sock, sizeof(mosq->sock), mosq);
|
|
|
|
|
mux__add_in(db, mosq);
|
|
|
|
|
HASH_ADD(hh_sock, db.contexts_by_sock, sock, sizeof(mosq->sock), mosq);
|
|
|
|
|
mux__add_in(mosq);
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
case LWS_CALLBACK_CLOSED:
|
|
|
|
@ -253,16 +248,16 @@ static int callback_mqtt(struct libwebsocket_context *context,
|
|
|
|
|
mosq = u->mosq;
|
|
|
|
|
if(mosq){
|
|
|
|
|
if(mosq->sock != INVALID_SOCKET){
|
|
|
|
|
HASH_DELETE(hh_sock, db->contexts_by_sock, mosq);
|
|
|
|
|
HASH_DELETE(hh_sock, db.contexts_by_sock, mosq);
|
|
|
|
|
mosq->sock = INVALID_SOCKET;
|
|
|
|
|
mosq->pollfd_index = -1;
|
|
|
|
|
mux__delete(db, mosq);
|
|
|
|
|
mux__delete(mosq);
|
|
|
|
|
}
|
|
|
|
|
mosq->wsi = NULL;
|
|
|
|
|
#ifdef WITH_TLS
|
|
|
|
|
mosq->ssl = NULL;
|
|
|
|
|
#endif
|
|
|
|
|
do_disconnect(db, mosq, MOSQ_ERR_CONN_LOST);
|
|
|
|
|
do_disconnect(mosq, MOSQ_ERR_CONN_LOST);
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
@ -275,9 +270,9 @@ static int callback_mqtt(struct libwebsocket_context *context,
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
rc = db__message_write_inflight_out_latest(db, mosq);
|
|
|
|
|
rc = db__message_write_inflight_out_latest(mosq);
|
|
|
|
|
if(rc) return -1;
|
|
|
|
|
rc = db__message_write_queued_out(db, mosq);
|
|
|
|
|
rc = db__message_write_queued_out(mosq);
|
|
|
|
|
if(rc) return -1;
|
|
|
|
|
|
|
|
|
|
if(mosq->out_packet && !mosq->current_out_packet){
|
|
|
|
@ -351,7 +346,7 @@ static int callback_mqtt(struct libwebsocket_context *context,
|
|
|
|
|
packet__cleanup(packet);
|
|
|
|
|
mosquitto__free(packet);
|
|
|
|
|
|
|
|
|
|
mosq->next_msg_out = db->now_s + mosq->keepalive;
|
|
|
|
|
mosq->next_msg_out = db.now_s + mosq->keepalive;
|
|
|
|
|
}
|
|
|
|
|
if (mosq->state == mosq_cs_disconnect_ws
|
|
|
|
|
|| mosq->state == mosq_cs_disconnecting
|
|
|
|
@ -432,7 +427,7 @@ static int callback_mqtt(struct libwebsocket_context *context,
|
|
|
|
|
G_PUB_MSGS_RECEIVED_INC(1);
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
rc = handle__packet(db, mosq);
|
|
|
|
|
rc = handle__packet(mosq);
|
|
|
|
|
|
|
|
|
|
/* Free data and reset values */
|
|
|
|
|
packet__cleanup(&mosq->in_packet);
|
|
|
|
@ -445,7 +440,7 @@ static int callback_mqtt(struct libwebsocket_context *context,
|
|
|
|
|
}
|
|
|
|
|
libwebsocket_callback_on_writable(mosq->ws_context, mosq->wsi);
|
|
|
|
|
} else if (rc) {
|
|
|
|
|
do_disconnect(db, mosq, MOSQ_ERR_CONN_LOST);
|
|
|
|
|
do_disconnect(mosq, MOSQ_ERR_CONN_LOST);
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -543,7 +538,6 @@ static int callback_http(struct libwebsocket_context *context,
|
|
|
|
|
char *filename_canonical;
|
|
|
|
|
unsigned char buf[4096];
|
|
|
|
|
struct stat filestat;
|
|
|
|
|
struct mosquitto_db *db = &int_db;
|
|
|
|
|
struct mosquitto *mosq;
|
|
|
|
|
struct lws_pollargs *pollargs = (struct lws_pollargs *)in;
|
|
|
|
|
|
|
|
|
@ -691,13 +685,13 @@ static int callback_http(struct libwebsocket_context *context,
|
|
|
|
|
case LWS_CALLBACK_ADD_POLL_FD:
|
|
|
|
|
case LWS_CALLBACK_DEL_POLL_FD:
|
|
|
|
|
case LWS_CALLBACK_CHANGE_MODE_POLL_FD:
|
|
|
|
|
HASH_FIND(hh_sock, db->contexts_by_sock, &pollargs->fd, sizeof(pollargs->fd), mosq);
|
|
|
|
|
HASH_FIND(hh_sock, db.contexts_by_sock, &pollargs->fd, sizeof(pollargs->fd), mosq);
|
|
|
|
|
if(mosq){
|
|
|
|
|
if(pollargs->events & POLLOUT){
|
|
|
|
|
mux__add_out(db, mosq);
|
|
|
|
|
mux__add_out(mosq);
|
|
|
|
|
mosq->ws_want_write = true;
|
|
|
|
|
}else{
|
|
|
|
|
mux__remove_out(db, mosq);
|
|
|
|
|
mux__remove_out(mosq);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|