From 67c1d4453e7574b2f3f9caf86fd4d1c8b72220da Mon Sep 17 00:00:00 2001 From: Roger Light Date: Tue, 8 Jan 2019 12:27:19 +0000 Subject: [PATCH] Receive maximum support for clients. --- lib/connect.c | 3 ++- lib/handle_publish.c | 9 +++++++++ lib/mosquitto.c | 1 + lib/mosquitto.h | 11 +++++++++++ lib/mosquitto_internal.h | 2 ++ lib/options.c | 7 +++++++ lib/send_connect.c | 27 +++++++++++++++++++++------ lib/send_mosq.c | 7 +++++++ lib/util_mosq.c | 9 +++++++++ lib/util_mosq.h | 1 + test/mosq_test.py | 2 ++ 11 files changed, 72 insertions(+), 7 deletions(-) diff --git a/lib/connect.c b/lib/connect.c index 5beed13a..a08f7902 100644 --- a/lib/connect.c +++ b/lib/connect.c @@ -68,6 +68,7 @@ static int mosquitto__connect_init(struct mosquitto *mosq, const char *host, int } mosq->keepalive = keepalive; + mosq->receive_quota = mosq->receive_maximum; if(mosq->sockpairR != INVALID_SOCKET){ COMPAT_CLOSE(mosq->sockpairR); @@ -191,7 +192,7 @@ static int mosquitto__reconnect(struct mosquitto *mosq, bool blocking, const mos mosq->ping_t = 0; packet__cleanup(&mosq->in_packet); - + pthread_mutex_lock(&mosq->current_out_packet_mutex); pthread_mutex_lock(&mosq->out_packet_mutex); diff --git a/lib/handle_publish.c b/lib/handle_publish.c index ed93703e..acebd90a 100644 --- a/lib/handle_publish.c +++ b/lib/handle_publish.c @@ -62,6 +62,15 @@ int handle__publish(struct mosquitto *mosq) } if(message->msg.qos > 0){ + if(mosq->protocol == mosq_p_mqtt5){ + if(mosq->receive_quota == 0){ + message__cleanup(&message); + /* FIXME - should send a DISCONNECT here */ + return MOSQ_ERR_PROTOCOL; + } + mosq->receive_quota--; + } + rc = packet__read_uint16(&mosq->in_packet, &mid); if(rc){ message__cleanup(&message); diff --git a/lib/mosquitto.c b/lib/mosquitto.c index 92c4e9ea..b07c69f8 100644 --- a/lib/mosquitto.c +++ b/lib/mosquitto.c @@ -147,6 +147,7 @@ int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_st mosq->out_messages = NULL; mosq->out_messages_last = NULL; mosq->max_inflight_messages = 20; + mosq->receive_maximum = 20; mosq->will = NULL; mosq->on_connect = NULL; mosq->on_publish = NULL; diff --git a/lib/mosquitto.h b/lib/mosquitto.h index 742ef599..cadb2ee0 100644 --- a/lib/mosquitto.h +++ b/lib/mosquitto.h @@ -99,6 +99,7 @@ enum mosq_opt_t { MOSQ_OPT_PROTOCOL_VERSION = 1, MOSQ_OPT_SSL_CTX = 2, MOSQ_OPT_SSL_CTX_WITH_DEFAULTS = 3, + MOSQ_OPT_RECEIVE_MAXIMUM = 4, }; /* MQTT specification restricts client ids to a maximum of 23 characters */ @@ -1364,6 +1365,16 @@ libmosq_EXPORT int mosquitto_opts_set(struct mosquitto *mosq, enum mosq_opt_t op * MQTT_PROTOCOL_V311, or MQTT_PROTOCOL_V5. Must be set before the * client connects. Defaults to MQTT_PROTOCOL_V311. * + * MOSQ_OPT_RECEIVE_MAXIMUM + * Value can be set between 1 and 65535 inclusive, and represents + * the maximum number of incoming QoS 1 and QoS 2 messages that this + * client wants to process at once. Defaults to 20. This option is + * not valid for MQTT v3.1 or v3.1.1 clients. + * Note that if the MQTT_PROP_RECEIVE_MAXIMUM property is in the + * proplist passed to mosquitto_connect_v5(), then that property + * will override this option. Using this option is the recommended + * method however. + * * MOSQ_OPT_SSL_CTX_WITH_DEFAULTS * If value is set to a non zero value, then the user specified * SSL_CTX passed in using MOSQ_OPT_SSL_CTX will have the default diff --git a/lib/mosquitto_internal.h b/lib/mosquitto_internal.h index 5eff5d06..f62cf3d3 100644 --- a/lib/mosquitto_internal.h +++ b/lib/mosquitto_internal.h @@ -283,6 +283,8 @@ struct mosquitto { ares_channel achan; # endif #endif + int receive_quota; + int receive_maximum; int max_inflight_messages; #ifdef WITH_BROKER diff --git a/lib/options.c b/lib/options.c index c3208050..053a1e4b 100644 --- a/lib/options.c +++ b/lib/options.c @@ -315,6 +315,13 @@ int mosquitto_int_option(struct mosquitto *mosq, enum mosq_opt_t option, int val } break; + case MOSQ_OPT_RECEIVE_MAXIMUM: + if(value < 0 || value > 65535){ + return MOSQ_ERR_INVAL; + } + mosq->receive_maximum = value; + break; + case MOSQ_OPT_SSL_CTX_WITH_DEFAULTS: #if defined(WITH_TLS) && OPENSSL_VERSION_NUMBER >= 0x10100000L if(value){ diff --git a/lib/send_connect.c b/lib/send_connect.c index 99778634..d0bd3cd3 100644 --- a/lib/send_connect.c +++ b/lib/send_connect.c @@ -41,7 +41,9 @@ int send__connect(struct mosquitto *mosq, uint16_t keepalive, bool clean_session uint8_t version; char *clientid, *username, *password; int headerlen; - int proplen, varbytes; + int proplen = 0, will_proplen, varbytes; + mosquitto_property *local_props = NULL; + uint16_t receive_maximum; assert(mosq); @@ -64,9 +66,20 @@ int send__connect(struct mosquitto *mosq, uint16_t keepalive, bool clean_session #endif if(mosq->protocol == mosq_p_mqtt5){ + /* Generate properties from options */ + if(!mosquitto_property_read_int16(properties, MQTT_PROP_RECEIVE_MAXIMUM, &receive_maximum, false)){ + rc = mosquitto_property_add_int16(&local_props, MQTT_PROP_RECEIVE_MAXIMUM, mosq->receive_maximum); + if(rc) return rc; + }else{ + mosq->receive_maximum = receive_maximum; + mosq->receive_quota = receive_maximum; + } + version = MQTT_PROTOCOL_V5; headerlen = 10; - proplen = property__get_length_all(properties); + proplen = 0; + proplen += property__get_length_all(properties); + proplen += property__get_length_all(local_props); varbytes = packet__varint_bytes(proplen); headerlen += proplen + varbytes; }else if(mosq->protocol == mosq_p_mqtt311){ @@ -93,9 +106,9 @@ int send__connect(struct mosquitto *mosq, uint16_t keepalive, bool clean_session payloadlen += 2+strlen(mosq->will->msg.topic) + 2+mosq->will->msg.payloadlen; if(mosq->protocol == mosq_p_mqtt5){ - proplen = property__get_length_all(mosq->will->properties); - varbytes = packet__varint_bytes(proplen); - payloadlen += proplen + varbytes; + will_proplen = property__get_length_all(mosq->will->properties); + varbytes = packet__varint_bytes(will_proplen); + payloadlen += will_proplen + varbytes; } } if(username){ @@ -141,7 +154,9 @@ int send__connect(struct mosquitto *mosq, uint16_t keepalive, bool clean_session if(mosq->protocol == mosq_p_mqtt5){ /* Write properties */ - property__write_all(packet, properties, true); + packet__write_varint(packet, proplen); + property__write_all(packet, properties, false); + property__write_all(packet, local_props, false); } /* Payload */ diff --git a/lib/send_mosq.c b/lib/send_mosq.c index 8b54b996..5366fd1e 100644 --- a/lib/send_mosq.c +++ b/lib/send_mosq.c @@ -72,6 +72,7 @@ int send__puback(struct mosquitto *mosq, uint16_t mid) #else if(mosq) log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBACK (Mid: %d)", mosq->id, mid); #endif + util__increment_receive_quota(mosq); /* We don't use Reason String or User Property yet. */ return send__command_with_mid(mosq, CMD_PUBACK, mid, false, 0, NULL); } @@ -83,6 +84,7 @@ int send__pubcomp(struct mosquitto *mosq, uint16_t mid) #else if(mosq) log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBCOMP (Mid: %d)", mosq->id, mid); #endif + util__increment_receive_quota(mosq); /* We don't use Reason String or User Property yet. */ return send__command_with_mid(mosq, CMD_PUBCOMP, mid, false, 0, NULL); } @@ -95,6 +97,11 @@ int send__pubrec(struct mosquitto *mosq, uint16_t mid) #else if(mosq) log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBREC (Mid: %d)", mosq->id, mid); #endif + /* FIXME - if rc >= 0x80 quota needs incrementing + if(rc >= 0x80){ + util__increment_receive_quota(mosq); + } + */ /* We don't use Reason String or User Property yet. */ return send__command_with_mid(mosq, CMD_PUBREC, mid, false, 0, NULL); } diff --git a/lib/util_mosq.c b/lib/util_mosq.c index 07ba6afb..1683a2e9 100644 --- a/lib/util_mosq.c +++ b/lib/util_mosq.c @@ -252,3 +252,12 @@ FILE *mosquitto__fopen(const char *path, const char *mode, bool restrict_read) } #endif } + +void util__increment_receive_quota(struct mosquitto *mosq) +{ + if(mosq->protocol == mosq_p_mqtt5){ + if(mosq->receive_quota < mosq->receive_maximum){ + mosq->receive_quota++; + } + } +} diff --git a/lib/util_mosq.h b/lib/util_mosq.h index d94661e7..62a14f2b 100644 --- a/lib/util_mosq.h +++ b/lib/util_mosq.h @@ -37,4 +37,5 @@ FILE *mosquitto__fopen(const char *path, const char *mode, bool restrict_read); int mosquitto__hex2bin(const char *hex, unsigned char *bin, int bin_max_len); #endif +void util__increment_receive_quota(struct mosquitto *mosq); #endif diff --git a/test/mosq_test.py b/test/mosq_test.py index abea7af5..cfa182f5 100644 --- a/test/mosq_test.py +++ b/test/mosq_test.py @@ -320,6 +320,8 @@ def gen_connect(client_id, clean_session=True, keepalive=60, username=None, pass connect_flags = connect_flags | 0x02 if proto_ver == 5: + properties += mqtt5_props.gen_uint16_prop(mqtt5_props.PROP_RECEIVE_MAXIMUM, 20) + properties = mqtt5_props.prop_finalise(properties) if properties == "": properties = struct.pack("B", 0) remaining_length += len(properties)