From 8a03b5ad5c880601d312b3d5cf0643ed08ae5c27 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Sun, 2 May 2021 23:15:07 +0100 Subject: [PATCH] Function for checking if a context is connected. --- apps/db_dump/stubs.c | 6 ++++++ lib/actions.c | 4 ++-- lib/connect.c | 4 ++-- lib/loop.c | 15 ++++++++------- lib/mosquitto.c | 2 +- lib/net_mosq.c | 12 +++++++++++- lib/net_mosq.h | 1 + lib/packet_mosq.c | 4 ++-- lib/send_publish.c | 6 +----- lib/util_mosq.c | 4 ++-- src/bridge.c | 6 ++---- src/database.c | 12 ++++++------ src/handle_connect.c | 2 +- src/keepalive.c | 2 +- test/unit/persist_read_stubs.c | 6 ++++++ test/unit/persist_write_stubs.c | 6 ++++++ test/unit/stubs.c | 6 ++++++ test/unit/subs_stubs.c | 6 ++++++ 18 files changed, 70 insertions(+), 34 deletions(-) diff --git a/apps/db_dump/stubs.c b/apps/db_dump/stubs.c index 42a14291..f9cfff9d 100644 --- a/apps/db_dump/stubs.c +++ b/apps/db_dump/stubs.c @@ -72,6 +72,12 @@ int mux__remove_out(struct mosquitto *mosq) return 0; } +bool net__is_connected(struct mosquitto *mosq) +{ + UNUSED(mosq); + return false; +} + ssize_t net__read(struct mosquitto *mosq, void *buf, size_t count) { UNUSED(mosq); diff --git a/lib/actions.c b/lib/actions.c index db66f8d5..dacc5642 100644 --- a/lib/actions.c +++ b/lib/actions.c @@ -188,7 +188,7 @@ int mosquitto_subscribe_multiple(struct mosquitto *mosq, int *mid, int sub_count if(mosq->protocol != mosq_p_mqtt5 && properties) return MOSQ_ERR_NOT_SUPPORTED; if(qos < 0 || qos > 2) return MOSQ_ERR_INVAL; if((options & 0x30) == 0x30 || (options & 0xC0) != 0) return MOSQ_ERR_INVAL; - if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN; + if(!net__is_connected(mosq)) return MOSQ_ERR_NO_CONN; if(properties){ if(properties->client_generated){ @@ -245,7 +245,7 @@ int mosquitto_unsubscribe_multiple(struct mosquitto *mosq, int *mid, int sub_cou if(!mosq) return MOSQ_ERR_INVAL; if(mosq->protocol != mosq_p_mqtt5 && properties) return MOSQ_ERR_NOT_SUPPORTED; - if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN; + if(!net__is_connected(mosq)) return MOSQ_ERR_NO_CONN; if(properties){ if(properties->client_generated){ diff --git a/lib/connect.c b/lib/connect.c index af4743cf..d5eb4860 100644 --- a/lib/connect.c +++ b/lib/connect.c @@ -192,7 +192,7 @@ static int mosquitto__reconnect(struct mosquitto *mosq, bool blocking) message__reconnect_reset(mosq, false); - if(mosq->sock != INVALID_SOCKET){ + if(net__is_connected(mosq)){ net__socket_close(mosq); //close socket } @@ -258,7 +258,7 @@ int mosquitto_disconnect_v5(struct mosquitto *mosq, int reason_code, const mosqu } mosquitto__set_state(mosq, mosq_cs_disconnected); - if(mosq->sock == INVALID_SOCKET){ + if(!net__is_connected(mosq)){ return MOSQ_ERR_NO_CONN; }else{ return send__disconnect(mosq, (uint8_t)reason_code, outgoing_properties); diff --git a/lib/loop.c b/lib/loop.c index 05c46234..b07e2cb1 100644 --- a/lib/loop.c +++ b/lib/loop.c @@ -61,7 +61,7 @@ int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets) FD_ZERO(&readfds); FD_ZERO(&writefds); - if(mosq->sock != INVALID_SOCKET){ + if(net__is_connected(mosq)){ maxfd = mosq->sock; FD_SET(mosq->sock, &readfds); pthread_mutex_lock(&mosq->current_out_packet_mutex); @@ -147,7 +147,7 @@ int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets) return MOSQ_ERR_ERRNO; } }else{ - if(mosq->sock != INVALID_SOCKET){ + if(net__is_connected(mosq)){ if(FD_ISSET(mosq->sock, &readfds)){ rc = mosquitto_loop_read(mosq, max_packets); if(rc || mosq->sock == INVALID_SOCKET){ @@ -164,10 +164,12 @@ int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets) /* Fake write possible, to stimulate output write even though * we didn't ask for it, because at that point the publish or * other command wasn't present. */ - if(mosq->sock != INVALID_SOCKET) + if(net__is_connected(mosq)){ FD_SET(mosq->sock, &writefds); + } } - if(mosq->sock != INVALID_SOCKET && FD_ISSET(mosq->sock, &writefds)){ + + if(net__is_connected(mosq) && FD_ISSET(mosq->sock, &writefds)){ #ifdef WITH_TLS if(mosq->want_connect){ rc = net__socket_connect_tls(mosq); @@ -176,7 +178,7 @@ int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets) #endif { rc = mosquitto_loop_write(mosq, max_packets); - if(rc || mosq->sock == INVALID_SOCKET){ + if(rc || !net__is_connected(mosq)){ return rc; } } @@ -333,7 +335,7 @@ int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets) int mosquitto_loop_misc(struct mosquitto *mosq) { if(!mosq) return MOSQ_ERR_INVAL; - if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN; + if(!net__is_connected(mosq)) return MOSQ_ERR_NO_CONN; return mosquitto__check_keepalive(mosq); } @@ -410,4 +412,3 @@ int mosquitto_loop_write(struct mosquitto *mosq, int max_packets) } return rc; } - diff --git a/lib/mosquitto.c b/lib/mosquitto.c index 1b69102c..be98c3bb 100644 --- a/lib/mosquitto.c +++ b/lib/mosquitto.c @@ -254,7 +254,7 @@ void mosquitto__destroy(struct mosquitto *mosq) pthread_mutex_destroy(&mosq->mid_mutex); } #endif - if(mosq->sock != INVALID_SOCKET){ + if(net__is_connected(mosq)){ net__socket_close(mosq); } message__cleanup_all(mosq); diff --git a/lib/net_mosq.c b/lib/net_mosq.c index 16d0e8c9..a18d8d67 100644 --- a/lib/net_mosq.c +++ b/lib/net_mosq.c @@ -198,6 +198,16 @@ void net__init_tls(void) } #endif +bool net__is_connected(struct mosquitto *mosq) +{ +#if defined(WITH_BROKER) && defined(WITH_WEBSOCKETS) + return mosq->sock != INVALID_SOCKET || mosq->wsi != NULL; +#else + return mosq->sock != INVALID_SOCKET; +#endif +} + + /* Close a socket associated with a context and set it to -1. * Returns 1 on failure (context is NULL) * Returns 0 on success. @@ -235,7 +245,7 @@ int net__socket_close(struct mosquitto *mosq) }else #endif { - if(mosq->sock != INVALID_SOCKET){ + if(net__is_connected(mosq)){ #ifdef WITH_BROKER HASH_FIND(hh_sock, db.contexts_by_sock, &mosq->sock, sizeof(mosq->sock), mosq_found); if(mosq_found){ diff --git a/lib/net_mosq.h b/lib/net_mosq.h index 30342179..021b2bff 100644 --- a/lib/net_mosq.h +++ b/lib/net_mosq.h @@ -67,6 +67,7 @@ int net__try_connect_step2(struct mosquitto *mosq, uint16_t port, mosq_sock_t *s int net__socket_connect_step3(struct mosquitto *mosq, const char *host); int net__socket_nonblock(mosq_sock_t *sock); int net__socketpair(mosq_sock_t *sp1, mosq_sock_t *sp2); +bool net__is_connected(struct mosquitto *mosq); ssize_t net__read(struct mosquitto *mosq, void *buf, size_t count); ssize_t net__write(struct mosquitto *mosq, const void *buf, size_t count); diff --git a/lib/packet_mosq.c b/lib/packet_mosq.c index 1e564678..78783a52 100644 --- a/lib/packet_mosq.c +++ b/lib/packet_mosq.c @@ -214,7 +214,7 @@ int packet__write(struct mosquitto *mosq) enum mosquitto_client_state state; if(!mosq) return MOSQ_ERR_INVAL; - if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN; + if(!net__is_connected(mosq)) return MOSQ_ERR_NO_CONN; pthread_mutex_lock(&mosq->current_out_packet_mutex); pthread_mutex_lock(&mosq->out_packet_mutex); @@ -332,7 +332,7 @@ int packet__read(struct mosquitto *mosq) if(!mosq){ return MOSQ_ERR_INVAL; } - if(mosq->sock == INVALID_SOCKET){ + if(!net__is_connected(mosq)){ return MOSQ_ERR_NO_CONN; } diff --git a/lib/send_publish.c b/lib/send_publish.c index 1957fc09..4ba871f7 100644 --- a/lib/send_publish.c +++ b/lib/send_publish.c @@ -55,11 +55,7 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3 #endif assert(mosq); -#if defined(WITH_BROKER) && defined(WITH_WEBSOCKETS) - if(mosq->sock == INVALID_SOCKET && !mosq->wsi) return MOSQ_ERR_NO_CONN; -#else - if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN; -#endif + if(!net__is_connected(mosq)) return MOSQ_ERR_NO_CONN; if(!mosq->retain_available){ retain = false; diff --git a/lib/util_mosq.c b/lib/util_mosq.c index e3a21e1c..27d0625e 100644 --- a/lib/util_mosq.c +++ b/lib/util_mosq.c @@ -81,7 +81,7 @@ int mosquitto__check_keepalive(struct mosquitto *mosq) #if defined(WITH_BROKER) && defined(WITH_BRIDGE) /* Check if a lazy bridge should be timed out due to idle. */ if(mosq->bridge && mosq->bridge->start_type == bst_lazy - && mosq->sock != INVALID_SOCKET + && net__is_connected(mosq) && now - mosq->next_msg_out - mosq->keepalive >= mosq->bridge->idle_timeout){ log__printf(NULL, MOSQ_LOG_NOTICE, "Bridge connection %s has exceeded idle timeout, disconnecting.", mosq->id); @@ -93,7 +93,7 @@ int mosquitto__check_keepalive(struct mosquitto *mosq) next_msg_out = mosq->next_msg_out; last_msg_in = mosq->last_msg_in; pthread_mutex_unlock(&mosq->msgtime_mutex); - if(mosq->keepalive && mosq->sock != INVALID_SOCKET && + if(mosq->keepalive && net__is_connected(mosq) && (now >= next_msg_out || now - last_msg_in >= mosq->keepalive)){ state = mosquitto__get_state(mosq); diff --git a/src/bridge.c b/src/bridge.c index 4825850a..18e0cee0 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -888,7 +888,7 @@ void bridge_check(void) context = db.bridges[i]; - if(context->sock != INVALID_SOCKET){ + if(net__is_connected(context)){ mosquitto__check_keepalive(context); bridge_check_pending(context); @@ -935,9 +935,7 @@ void bridge_check(void) } } - - - if(context->sock == INVALID_SOCKET){ + if(!net__is_connected(context)){ if(reload_if_needed(context)) continue; /* Want to try to restart the bridge connection */ diff --git a/src/database.c b/src/database.c index 9ac0ac63..5d3932ba 100644 --- a/src/database.c +++ b/src/database.c @@ -108,7 +108,7 @@ bool db__ready_for_queue(struct mosquitto *context, int qos, struct mosquitto_ms adjust_count = msg_data->inflight_maximum; /* nothing in flight for offline clients */ - if(context->sock == INVALID_SOCKET){ + if(!net__is_connected(context)){ adjust_bytes = 0; adjust_count = 0; } @@ -409,7 +409,7 @@ int db__message_insert(struct mosquitto *context, uint16_t mid, enum mosquitto_m } } } - if(context->sock == INVALID_SOCKET){ + if(!net__is_connected(context)){ /* Client is not connected only queue messages with QoS>0. */ if(qos == 0 && !db.config->queue_qos0_messages){ if(!context->bridge){ @@ -428,7 +428,7 @@ int db__message_insert(struct mosquitto *context, uint16_t mid, enum mosquitto_m } } - if(context->sock != INVALID_SOCKET){ + if(net__is_connected(context)){ if(db__ready_for_flight(msg_data, qos)){ if(dir == mosq_md_out){ switch(qos){ @@ -541,7 +541,7 @@ int db__message_insert(struct mosquitto *context, uint16_t mid, enum mosquitto_m } #ifdef WITH_BRIDGE if(context->bridge && context->bridge->start_type == bst_lazy - && context->sock == INVALID_SOCKET + && !net__is_connected(context) && context->msgs_out.msg_count >= context->bridge->threshold){ context->bridge->lazy_reconnect = true; @@ -1077,7 +1077,7 @@ int db__message_write_inflight_out_all(struct mosquitto *context) struct mosquitto_client_msg *tail, *tmp; int rc; - if(context->state != mosq_cs_active || context->sock == INVALID_SOCKET){ + if(context->state != mosq_cs_active || !net__is_connected(context)){ return MOSQ_ERR_SUCCESS; } @@ -1095,7 +1095,7 @@ int db__message_write_inflight_out_latest(struct mosquitto *context) int rc; if(context->state != mosq_cs_active - || context->sock == INVALID_SOCKET + || !net__is_connected(context) || context->msgs_out.inflight == NULL){ return MOSQ_ERR_SUCCESS; diff --git a/src/handle_connect.c b/src/handle_connect.c index 28c6f399..ecd7ade8 100644 --- a/src/handle_connect.c +++ b/src/handle_connect.c @@ -118,7 +118,7 @@ int connect__on_authorised(struct mosquitto *context, void *auth_data_out, uint1 HASH_FIND(hh_id, db.contexts_by_id, context->id, strlen(context->id), found_context); if(found_context){ /* Found a matching client */ - if(found_context->sock == INVALID_SOCKET){ + if(!net__is_connected(found_context)){ /* Client is reconnecting after a disconnect */ /* FIXME - does anything need to be done here? */ }else{ diff --git a/src/keepalive.c b/src/keepalive.c index 89d5c50b..2e280e93 100644 --- a/src/keepalive.c +++ b/src/keepalive.c @@ -42,7 +42,7 @@ void keepalive__check(void) /* FIXME - this needs replacing with something more efficient */ HASH_ITER(hh_sock, db.contexts_by_sock, context, ctxt_tmp){ - if(context->sock != INVALID_SOCKET){ + if(net__is_connected(context)){ /* Local bridges never time out in this fashion. */ if(!(context->keepalive) || context->bridge diff --git a/test/unit/persist_read_stubs.c b/test/unit/persist_read_stubs.c index b0b30d7b..b58be660 100644 --- a/test/unit/persist_read_stubs.c +++ b/test/unit/persist_read_stubs.c @@ -114,6 +114,12 @@ time_t mosquitto_time(void) return 123; } +bool net__is_connected(struct mosquitto *mosq) +{ + UNUSED(mosq); + return false; +} + int net__socket_close(struct mosquitto *mosq) { UNUSED(mosq); diff --git a/test/unit/persist_write_stubs.c b/test/unit/persist_write_stubs.c index b2cae03c..9778ac6d 100644 --- a/test/unit/persist_write_stubs.c +++ b/test/unit/persist_write_stubs.c @@ -34,6 +34,12 @@ time_t mosquitto_time(void) return 123; } +bool net__is_connected(struct mosquitto *mosq) +{ + UNUSED(mosq); + return false; +} + int net__socket_close(struct mosquitto *mosq) { UNUSED(mosq); diff --git a/test/unit/stubs.c b/test/unit/stubs.c index 432c118e..3b7c8317 100644 --- a/test/unit/stubs.c +++ b/test/unit/stubs.c @@ -21,6 +21,12 @@ time_t mosquitto_time(void) return 123; } +bool net__is_connected(struct mosquitto *mosq) +{ + UNUSED(mosq); + return false; +} + int net__socket_close(struct mosquitto_db *db, struct mosquitto *mosq) { UNUSED(db); diff --git a/test/unit/subs_stubs.c b/test/unit/subs_stubs.c index 0815e4a9..d85a8cca 100644 --- a/test/unit/subs_stubs.c +++ b/test/unit/subs_stubs.c @@ -72,6 +72,12 @@ int acl__find_acls(struct mosquitto *context) #endif +bool net__is_connected(struct mosquitto *mosq) +{ + UNUSED(mosq); + return false; +} + int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, uint8_t qos, bool retain, bool dup, const mosquitto_property *cmsg_props, const mosquitto_property *store_props, uint32_t expiry_interval) { UNUSED(mosq);