Function for checking if a context is connected.

pull/2215/head
Roger A. Light 4 years ago
parent ef7662ca8e
commit 8a03b5ad5c

@ -72,6 +72,12 @@ int mux__remove_out(struct mosquitto *mosq)
return 0;
}
bool net__is_connected(struct mosquitto *mosq)
{
UNUSED(mosq);
return false;
}
ssize_t net__read(struct mosquitto *mosq, void *buf, size_t count)
{
UNUSED(mosq);

@ -188,7 +188,7 @@ int mosquitto_subscribe_multiple(struct mosquitto *mosq, int *mid, int sub_count
if(mosq->protocol != mosq_p_mqtt5 && properties) return MOSQ_ERR_NOT_SUPPORTED;
if(qos < 0 || qos > 2) return MOSQ_ERR_INVAL;
if((options & 0x30) == 0x30 || (options & 0xC0) != 0) return MOSQ_ERR_INVAL;
if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
if(!net__is_connected(mosq)) return MOSQ_ERR_NO_CONN;
if(properties){
if(properties->client_generated){
@ -245,7 +245,7 @@ int mosquitto_unsubscribe_multiple(struct mosquitto *mosq, int *mid, int sub_cou
if(!mosq) return MOSQ_ERR_INVAL;
if(mosq->protocol != mosq_p_mqtt5 && properties) return MOSQ_ERR_NOT_SUPPORTED;
if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
if(!net__is_connected(mosq)) return MOSQ_ERR_NO_CONN;
if(properties){
if(properties->client_generated){

@ -192,7 +192,7 @@ static int mosquitto__reconnect(struct mosquitto *mosq, bool blocking)
message__reconnect_reset(mosq, false);
if(mosq->sock != INVALID_SOCKET){
if(net__is_connected(mosq)){
net__socket_close(mosq); //close socket
}
@ -258,7 +258,7 @@ int mosquitto_disconnect_v5(struct mosquitto *mosq, int reason_code, const mosqu
}
mosquitto__set_state(mosq, mosq_cs_disconnected);
if(mosq->sock == INVALID_SOCKET){
if(!net__is_connected(mosq)){
return MOSQ_ERR_NO_CONN;
}else{
return send__disconnect(mosq, (uint8_t)reason_code, outgoing_properties);

@ -61,7 +61,7 @@ int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets)
FD_ZERO(&readfds);
FD_ZERO(&writefds);
if(mosq->sock != INVALID_SOCKET){
if(net__is_connected(mosq)){
maxfd = mosq->sock;
FD_SET(mosq->sock, &readfds);
pthread_mutex_lock(&mosq->current_out_packet_mutex);
@ -147,7 +147,7 @@ int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets)
return MOSQ_ERR_ERRNO;
}
}else{
if(mosq->sock != INVALID_SOCKET){
if(net__is_connected(mosq)){
if(FD_ISSET(mosq->sock, &readfds)){
rc = mosquitto_loop_read(mosq, max_packets);
if(rc || mosq->sock == INVALID_SOCKET){
@ -164,10 +164,12 @@ int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets)
/* Fake write possible, to stimulate output write even though
* we didn't ask for it, because at that point the publish or
* other command wasn't present. */
if(mosq->sock != INVALID_SOCKET)
if(net__is_connected(mosq)){
FD_SET(mosq->sock, &writefds);
}
}
if(mosq->sock != INVALID_SOCKET && FD_ISSET(mosq->sock, &writefds)){
if(net__is_connected(mosq) && FD_ISSET(mosq->sock, &writefds)){
#ifdef WITH_TLS
if(mosq->want_connect){
rc = net__socket_connect_tls(mosq);
@ -176,7 +178,7 @@ int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets)
#endif
{
rc = mosquitto_loop_write(mosq, max_packets);
if(rc || mosq->sock == INVALID_SOCKET){
if(rc || !net__is_connected(mosq)){
return rc;
}
}
@ -333,7 +335,7 @@ int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets)
int mosquitto_loop_misc(struct mosquitto *mosq)
{
if(!mosq) return MOSQ_ERR_INVAL;
if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
if(!net__is_connected(mosq)) return MOSQ_ERR_NO_CONN;
return mosquitto__check_keepalive(mosq);
}
@ -410,4 +412,3 @@ int mosquitto_loop_write(struct mosquitto *mosq, int max_packets)
}
return rc;
}

@ -254,7 +254,7 @@ void mosquitto__destroy(struct mosquitto *mosq)
pthread_mutex_destroy(&mosq->mid_mutex);
}
#endif
if(mosq->sock != INVALID_SOCKET){
if(net__is_connected(mosq)){
net__socket_close(mosq);
}
message__cleanup_all(mosq);

@ -198,6 +198,16 @@ void net__init_tls(void)
}
#endif
bool net__is_connected(struct mosquitto *mosq)
{
#if defined(WITH_BROKER) && defined(WITH_WEBSOCKETS)
return mosq->sock != INVALID_SOCKET || mosq->wsi != NULL;
#else
return mosq->sock != INVALID_SOCKET;
#endif
}
/* Close a socket associated with a context and set it to -1.
* Returns 1 on failure (context is NULL)
* Returns 0 on success.
@ -235,7 +245,7 @@ int net__socket_close(struct mosquitto *mosq)
}else
#endif
{
if(mosq->sock != INVALID_SOCKET){
if(net__is_connected(mosq)){
#ifdef WITH_BROKER
HASH_FIND(hh_sock, db.contexts_by_sock, &mosq->sock, sizeof(mosq->sock), mosq_found);
if(mosq_found){

@ -67,6 +67,7 @@ int net__try_connect_step2(struct mosquitto *mosq, uint16_t port, mosq_sock_t *s
int net__socket_connect_step3(struct mosquitto *mosq, const char *host);
int net__socket_nonblock(mosq_sock_t *sock);
int net__socketpair(mosq_sock_t *sp1, mosq_sock_t *sp2);
bool net__is_connected(struct mosquitto *mosq);
ssize_t net__read(struct mosquitto *mosq, void *buf, size_t count);
ssize_t net__write(struct mosquitto *mosq, const void *buf, size_t count);

@ -214,7 +214,7 @@ int packet__write(struct mosquitto *mosq)
enum mosquitto_client_state state;
if(!mosq) return MOSQ_ERR_INVAL;
if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
if(!net__is_connected(mosq)) return MOSQ_ERR_NO_CONN;
pthread_mutex_lock(&mosq->current_out_packet_mutex);
pthread_mutex_lock(&mosq->out_packet_mutex);
@ -332,7 +332,7 @@ int packet__read(struct mosquitto *mosq)
if(!mosq){
return MOSQ_ERR_INVAL;
}
if(mosq->sock == INVALID_SOCKET){
if(!net__is_connected(mosq)){
return MOSQ_ERR_NO_CONN;
}

@ -55,11 +55,7 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3
#endif
assert(mosq);
#if defined(WITH_BROKER) && defined(WITH_WEBSOCKETS)
if(mosq->sock == INVALID_SOCKET && !mosq->wsi) return MOSQ_ERR_NO_CONN;
#else
if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
#endif
if(!net__is_connected(mosq)) return MOSQ_ERR_NO_CONN;
if(!mosq->retain_available){
retain = false;

@ -81,7 +81,7 @@ int mosquitto__check_keepalive(struct mosquitto *mosq)
#if defined(WITH_BROKER) && defined(WITH_BRIDGE)
/* Check if a lazy bridge should be timed out due to idle. */
if(mosq->bridge && mosq->bridge->start_type == bst_lazy
&& mosq->sock != INVALID_SOCKET
&& net__is_connected(mosq)
&& now - mosq->next_msg_out - mosq->keepalive >= mosq->bridge->idle_timeout){
log__printf(NULL, MOSQ_LOG_NOTICE, "Bridge connection %s has exceeded idle timeout, disconnecting.", mosq->id);
@ -93,7 +93,7 @@ int mosquitto__check_keepalive(struct mosquitto *mosq)
next_msg_out = mosq->next_msg_out;
last_msg_in = mosq->last_msg_in;
pthread_mutex_unlock(&mosq->msgtime_mutex);
if(mosq->keepalive && mosq->sock != INVALID_SOCKET &&
if(mosq->keepalive && net__is_connected(mosq) &&
(now >= next_msg_out || now - last_msg_in >= mosq->keepalive)){
state = mosquitto__get_state(mosq);

@ -888,7 +888,7 @@ void bridge_check(void)
context = db.bridges[i];
if(context->sock != INVALID_SOCKET){
if(net__is_connected(context)){
mosquitto__check_keepalive(context);
bridge_check_pending(context);
@ -935,9 +935,7 @@ void bridge_check(void)
}
}
if(context->sock == INVALID_SOCKET){
if(!net__is_connected(context)){
if(reload_if_needed(context)) continue;
/* Want to try to restart the bridge connection */

@ -108,7 +108,7 @@ bool db__ready_for_queue(struct mosquitto *context, int qos, struct mosquitto_ms
adjust_count = msg_data->inflight_maximum;
/* nothing in flight for offline clients */
if(context->sock == INVALID_SOCKET){
if(!net__is_connected(context)){
adjust_bytes = 0;
adjust_count = 0;
}
@ -409,7 +409,7 @@ int db__message_insert(struct mosquitto *context, uint16_t mid, enum mosquitto_m
}
}
}
if(context->sock == INVALID_SOCKET){
if(!net__is_connected(context)){
/* Client is not connected only queue messages with QoS>0. */
if(qos == 0 && !db.config->queue_qos0_messages){
if(!context->bridge){
@ -428,7 +428,7 @@ int db__message_insert(struct mosquitto *context, uint16_t mid, enum mosquitto_m
}
}
if(context->sock != INVALID_SOCKET){
if(net__is_connected(context)){
if(db__ready_for_flight(msg_data, qos)){
if(dir == mosq_md_out){
switch(qos){
@ -541,7 +541,7 @@ int db__message_insert(struct mosquitto *context, uint16_t mid, enum mosquitto_m
}
#ifdef WITH_BRIDGE
if(context->bridge && context->bridge->start_type == bst_lazy
&& context->sock == INVALID_SOCKET
&& !net__is_connected(context)
&& context->msgs_out.msg_count >= context->bridge->threshold){
context->bridge->lazy_reconnect = true;
@ -1077,7 +1077,7 @@ int db__message_write_inflight_out_all(struct mosquitto *context)
struct mosquitto_client_msg *tail, *tmp;
int rc;
if(context->state != mosq_cs_active || context->sock == INVALID_SOCKET){
if(context->state != mosq_cs_active || !net__is_connected(context)){
return MOSQ_ERR_SUCCESS;
}
@ -1095,7 +1095,7 @@ int db__message_write_inflight_out_latest(struct mosquitto *context)
int rc;
if(context->state != mosq_cs_active
|| context->sock == INVALID_SOCKET
|| !net__is_connected(context)
|| context->msgs_out.inflight == NULL){
return MOSQ_ERR_SUCCESS;

@ -118,7 +118,7 @@ int connect__on_authorised(struct mosquitto *context, void *auth_data_out, uint1
HASH_FIND(hh_id, db.contexts_by_id, context->id, strlen(context->id), found_context);
if(found_context){
/* Found a matching client */
if(found_context->sock == INVALID_SOCKET){
if(!net__is_connected(found_context)){
/* Client is reconnecting after a disconnect */
/* FIXME - does anything need to be done here? */
}else{

@ -42,7 +42,7 @@ void keepalive__check(void)
/* FIXME - this needs replacing with something more efficient */
HASH_ITER(hh_sock, db.contexts_by_sock, context, ctxt_tmp){
if(context->sock != INVALID_SOCKET){
if(net__is_connected(context)){
/* Local bridges never time out in this fashion. */
if(!(context->keepalive)
|| context->bridge

@ -114,6 +114,12 @@ time_t mosquitto_time(void)
return 123;
}
bool net__is_connected(struct mosquitto *mosq)
{
UNUSED(mosq);
return false;
}
int net__socket_close(struct mosquitto *mosq)
{
UNUSED(mosq);

@ -34,6 +34,12 @@ time_t mosquitto_time(void)
return 123;
}
bool net__is_connected(struct mosquitto *mosq)
{
UNUSED(mosq);
return false;
}
int net__socket_close(struct mosquitto *mosq)
{
UNUSED(mosq);

@ -21,6 +21,12 @@ time_t mosquitto_time(void)
return 123;
}
bool net__is_connected(struct mosquitto *mosq)
{
UNUSED(mosq);
return false;
}
int net__socket_close(struct mosquitto_db *db, struct mosquitto *mosq)
{
UNUSED(db);

@ -72,6 +72,12 @@ int acl__find_acls(struct mosquitto *context)
#endif
bool net__is_connected(struct mosquitto *mosq)
{
UNUSED(mosq);
return false;
}
int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, uint8_t qos, bool retain, bool dup, const mosquitto_property *cmsg_props, const mosquitto_property *store_props, uint32_t expiry_interval)
{
UNUSED(mosq);

Loading…
Cancel
Save