|
|
|
@ -58,25 +58,36 @@ static bool db__ready_for_flight(struct mosquitto *context, int qos)
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* For a given client context, are more messages allowed to be queued?
|
|
|
|
|
* It is assumed that inflight checks and queue_qos0 checks have already
|
|
|
|
|
* been made.
|
|
|
|
|
* @param context client of interest
|
|
|
|
|
* @param qos destination qos for the packet of interest
|
|
|
|
|
* @return true if queuing is allowed, false if should be dropped
|
|
|
|
|
*/
|
|
|
|
|
static bool db__ready_for_queue(struct mosquitto *context)
|
|
|
|
|
static bool db__ready_for_queue(struct mosquitto *context, int qos)
|
|
|
|
|
{
|
|
|
|
|
if(max_queued == 0 && max_queued_bytes == 0){
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
unsigned long source_bytes = context->msg_bytes12;
|
|
|
|
|
int source_count = context->msg_count12;
|
|
|
|
|
unsigned long adjust_bytes = max_inflight_bytes;
|
|
|
|
|
int adjust_count = max_inflight;
|
|
|
|
|
|
|
|
|
|
/* nothing in flight for offline clients */
|
|
|
|
|
if(context->sock == INVALID_SOCKET){
|
|
|
|
|
adjust_bytes = 0;
|
|
|
|
|
adjust_count = 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool valid_bytes = context->msg_bytes12 - adjust_bytes < max_queued_bytes;
|
|
|
|
|
bool valid_count = context->msg_count12 - adjust_count < max_queued;
|
|
|
|
|
if(qos == 0){
|
|
|
|
|
source_bytes = context->msg_bytes;
|
|
|
|
|
source_count = context->msg_count;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool valid_bytes = source_bytes - adjust_bytes < max_queued_bytes;
|
|
|
|
|
bool valid_count = source_count - adjust_count < max_queued;
|
|
|
|
|
|
|
|
|
|
if(max_queued_bytes == 0){
|
|
|
|
|
return valid_count;
|
|
|
|
@ -388,7 +399,7 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
|
|
|
|
|
return 1;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}else if(db__ready_for_queue(context)){
|
|
|
|
|
}else if(db__ready_for_queue(context, qos)){
|
|
|
|
|
state = mosq_ms_queued;
|
|
|
|
|
rc = 2;
|
|
|
|
|
}else{
|
|
|
|
|