|
|
|
@ -18,6 +18,7 @@ Contributors:
|
|
|
|
|
|
|
|
|
|
#include <assert.h>
|
|
|
|
|
#include <stdio.h>
|
|
|
|
|
#include <utlist.h>
|
|
|
|
|
|
|
|
|
|
#include "mosquitto_broker_internal.h"
|
|
|
|
|
#include "memory_mosq.h"
|
|
|
|
@ -248,39 +249,25 @@ void db__msg_store_compact(struct mosquitto_db *db)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void db__message_remove(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto_client_msg **msg, struct mosquitto_client_msg *last)
|
|
|
|
|
static void db__message_remove(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto_client_msg **head, struct mosquitto_client_msg *item)
|
|
|
|
|
{
|
|
|
|
|
if(!context || !msg || !(*msg)){
|
|
|
|
|
if(!context || !head || !item){
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if((*msg)->store){
|
|
|
|
|
DL_DELETE(*head, item);
|
|
|
|
|
|
|
|
|
|
if(item->store){
|
|
|
|
|
context->msg_count--;
|
|
|
|
|
context->msg_bytes -= (*msg)->store->payloadlen;
|
|
|
|
|
if((*msg)->qos > 0){
|
|
|
|
|
context->msg_bytes -= item->store->payloadlen;
|
|
|
|
|
if(item->qos > 0){
|
|
|
|
|
context->msg_count12--;
|
|
|
|
|
context->msg_bytes12 -= (*msg)->store->payloadlen;
|
|
|
|
|
}
|
|
|
|
|
db__msg_store_deref(db, &(*msg)->store);
|
|
|
|
|
}
|
|
|
|
|
if(last){
|
|
|
|
|
last->next = (*msg)->next;
|
|
|
|
|
if(!last->next){
|
|
|
|
|
context->last_inflight_msg = last;
|
|
|
|
|
context->msg_bytes12 -= item->store->payloadlen;
|
|
|
|
|
}
|
|
|
|
|
}else{
|
|
|
|
|
context->inflight_msgs = (*msg)->next;
|
|
|
|
|
if(!context->inflight_msgs){
|
|
|
|
|
context->last_inflight_msg = NULL;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
mosquitto_property_free_all(&(*msg)->properties);
|
|
|
|
|
mosquitto__free(*msg);
|
|
|
|
|
if(last){
|
|
|
|
|
*msg = last->next;
|
|
|
|
|
}else{
|
|
|
|
|
*msg = context->inflight_msgs;
|
|
|
|
|
db__msg_store_deref(db, &item->store);
|
|
|
|
|
}
|
|
|
|
|
mosquitto_property_free_all(&item->properties);
|
|
|
|
|
mosquitto__free(item);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void db__message_dequeue_first(struct mosquitto *context)
|
|
|
|
@ -288,30 +275,18 @@ void db__message_dequeue_first(struct mosquitto *context)
|
|
|
|
|
struct mosquitto_client_msg *msg;
|
|
|
|
|
|
|
|
|
|
msg = context->queued_msgs;
|
|
|
|
|
context->queued_msgs = msg->next;
|
|
|
|
|
if (context->last_queued_msg == msg){
|
|
|
|
|
context->last_queued_msg = NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (context->last_inflight_msg){
|
|
|
|
|
context->last_inflight_msg->next = msg;
|
|
|
|
|
context->last_inflight_msg = msg;
|
|
|
|
|
}else{
|
|
|
|
|
context->inflight_msgs = msg;
|
|
|
|
|
context->last_inflight_msg = msg;
|
|
|
|
|
}
|
|
|
|
|
msg->next = NULL;
|
|
|
|
|
DL_DELETE(context->queued_msgs, msg);
|
|
|
|
|
DL_APPEND(context->inflight_msgs, msg);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, enum mosquitto_msg_state expect_state, int qos)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_client_msg *tail, *last = NULL;
|
|
|
|
|
struct mosquitto_client_msg *tail, *tmp;
|
|
|
|
|
int msg_index = 0;
|
|
|
|
|
|
|
|
|
|
if(!context) return MOSQ_ERR_INVAL;
|
|
|
|
|
|
|
|
|
|
tail = context->inflight_msgs;
|
|
|
|
|
while(tail){
|
|
|
|
|
DL_FOREACH_SAFE(context->inflight_msgs, tail, tmp){
|
|
|
|
|
msg_index++;
|
|
|
|
|
if(tail->mid == mid && tail->direction == dir){
|
|
|
|
|
if(tail->qos != qos){
|
|
|
|
@ -320,15 +295,16 @@ int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint1
|
|
|
|
|
return MOSQ_ERR_PROTOCOL;
|
|
|
|
|
}
|
|
|
|
|
msg_index--;
|
|
|
|
|
db__message_remove(db, context, &tail, last);
|
|
|
|
|
}else{
|
|
|
|
|
last = tail;
|
|
|
|
|
tail = tail->next;
|
|
|
|
|
db__message_remove(db, context, &context->inflight_msgs, tail);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
while (context->queued_msgs && (context->send_maximum == 0 || msg_index < context->send_maximum)){
|
|
|
|
|
|
|
|
|
|
DL_FOREACH_SAFE(context->queued_msgs, tail, tmp){
|
|
|
|
|
if(context->send_maximum != 0 && msg_index >= context->send_maximum){
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
msg_index++;
|
|
|
|
|
tail = context->queued_msgs;
|
|
|
|
|
tail->timestamp = mosquitto_time();
|
|
|
|
|
if(tail->direction == mosq_md_out){
|
|
|
|
|
switch(tail->qos){
|
|
|
|
@ -358,7 +334,6 @@ int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint1
|
|
|
|
|
int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored, mosquitto_property *properties)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_client_msg *msg;
|
|
|
|
|
struct mosquitto_client_msg **msgs, **last_msg;
|
|
|
|
|
enum mosquitto_msg_state state = mosq_ms_invalid;
|
|
|
|
|
int rc = 0;
|
|
|
|
|
int i;
|
|
|
|
@ -463,6 +438,7 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
|
|
|
|
|
|
|
|
|
|
msg = mosquitto__malloc(sizeof(struct mosquitto_client_msg));
|
|
|
|
|
if(!msg) return MOSQ_ERR_NOMEM;
|
|
|
|
|
msg->prev = NULL;
|
|
|
|
|
msg->next = NULL;
|
|
|
|
|
msg->store = stored;
|
|
|
|
|
msg->store->ref_count++;
|
|
|
|
@ -480,18 +456,9 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
|
|
|
|
|
msg->properties = properties;
|
|
|
|
|
|
|
|
|
|
if (state == mosq_ms_queued){
|
|
|
|
|
msgs = &(context->queued_msgs);
|
|
|
|
|
last_msg = &(context->last_queued_msg);
|
|
|
|
|
}else{
|
|
|
|
|
msgs = &(context->inflight_msgs);
|
|
|
|
|
last_msg = &(context->last_inflight_msg);
|
|
|
|
|
}
|
|
|
|
|
if(*last_msg){
|
|
|
|
|
(*last_msg)->next = msg;
|
|
|
|
|
(*last_msg) = msg;
|
|
|
|
|
DL_APPEND(context->queued_msgs, msg);
|
|
|
|
|
}else{
|
|
|
|
|
*msgs = msg;
|
|
|
|
|
*last_msg = msg;
|
|
|
|
|
DL_APPEND(context->inflight_msgs, msg);
|
|
|
|
|
}
|
|
|
|
|
context->msg_count++;
|
|
|
|
|
context->msg_bytes += msg->store->payloadlen;
|
|
|
|
@ -544,8 +511,7 @@ int db__message_update(struct mosquitto *context, uint16_t mid, enum mosquitto_m
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_client_msg *tail;
|
|
|
|
|
|
|
|
|
|
tail = context->inflight_msgs;
|
|
|
|
|
while(tail){
|
|
|
|
|
DL_FOREACH(context->inflight_msgs, tail){
|
|
|
|
|
if(tail->mid == mid && tail->direction == dir){
|
|
|
|
|
if(tail->qos != qos){
|
|
|
|
|
return MOSQ_ERR_PROTOCOL;
|
|
|
|
@ -561,31 +527,25 @@ int db__message_update(struct mosquitto *context, uint16_t mid, enum mosquitto_m
|
|
|
|
|
|
|
|
|
|
int db__messages_delete(struct mosquitto_db *db, struct mosquitto *context)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_client_msg *tail, *next;
|
|
|
|
|
struct mosquitto_client_msg *tail, *tmp;
|
|
|
|
|
|
|
|
|
|
if(!context) return MOSQ_ERR_INVAL;
|
|
|
|
|
|
|
|
|
|
tail = context->inflight_msgs;
|
|
|
|
|
while(tail){
|
|
|
|
|
DL_FOREACH_SAFE(context->inflight_msgs, tail, tmp){
|
|
|
|
|
DL_DELETE(context->inflight_msgs, tail);
|
|
|
|
|
db__msg_store_deref(db, &tail->store);
|
|
|
|
|
next = tail->next;
|
|
|
|
|
mosquitto_property_free_all(&tail->properties);
|
|
|
|
|
mosquitto__free(tail);
|
|
|
|
|
tail = next;
|
|
|
|
|
}
|
|
|
|
|
context->inflight_msgs = NULL;
|
|
|
|
|
context->last_inflight_msg = NULL;
|
|
|
|
|
|
|
|
|
|
tail = context->queued_msgs;
|
|
|
|
|
while(tail){
|
|
|
|
|
DL_FOREACH_SAFE(context->queued_msgs, tail, tmp){
|
|
|
|
|
DL_DELETE(context->queued_msgs, tail);
|
|
|
|
|
db__msg_store_deref(db, &tail->store);
|
|
|
|
|
next = tail->next;
|
|
|
|
|
mosquitto_property_free_all(&tail->properties);
|
|
|
|
|
mosquitto__free(tail);
|
|
|
|
|
tail = next;
|
|
|
|
|
}
|
|
|
|
|
context->queued_msgs = NULL;
|
|
|
|
|
context->last_queued_msg = NULL;
|
|
|
|
|
context->msg_bytes = 0;
|
|
|
|
|
context->msg_bytes12 = 0;
|
|
|
|
|
context->msg_count = 0;
|
|
|
|
@ -729,22 +689,18 @@ int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosqu
|
|
|
|
|
if(!context) return MOSQ_ERR_INVAL;
|
|
|
|
|
|
|
|
|
|
*stored = NULL;
|
|
|
|
|
tail = context->inflight_msgs;
|
|
|
|
|
while(tail){
|
|
|
|
|
DL_FOREACH(context->inflight_msgs, tail){
|
|
|
|
|
if(tail->store->source_mid == mid && tail->direction == mosq_md_in){
|
|
|
|
|
*stored = tail->store;
|
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
tail = tail->next;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tail = context->queued_msgs;
|
|
|
|
|
while(tail){
|
|
|
|
|
DL_FOREACH(context->queued_msgs, tail){
|
|
|
|
|
if(tail->store->source_mid == mid && tail->direction == mosq_md_in){
|
|
|
|
|
*stored = tail->store;
|
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
tail = tail->next;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return 1;
|
|
|
|
@ -754,17 +710,14 @@ int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosqu
|
|
|
|
|
* retry, and to set incoming messages to expect an appropriate retry. */
|
|
|
|
|
int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *context)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_client_msg *msg;
|
|
|
|
|
struct mosquitto_client_msg *prev = NULL;
|
|
|
|
|
struct mosquitto_client_msg *msg, *tmp;
|
|
|
|
|
|
|
|
|
|
msg = context->inflight_msgs;
|
|
|
|
|
context->msg_bytes = 0;
|
|
|
|
|
context->msg_bytes12 = 0;
|
|
|
|
|
context->msg_count = 0;
|
|
|
|
|
context->msg_count12 = 0;
|
|
|
|
|
while(msg){
|
|
|
|
|
context->last_inflight_msg = msg;
|
|
|
|
|
|
|
|
|
|
DL_FOREACH_SAFE(context->inflight_msgs, msg, tmp){
|
|
|
|
|
context->msg_count++;
|
|
|
|
|
context->msg_bytes += msg->store->payloadlen;
|
|
|
|
|
if(msg->qos > 0){
|
|
|
|
@ -792,14 +745,12 @@ int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *conte
|
|
|
|
|
if(msg->qos != 2){
|
|
|
|
|
/* Anything <QoS 2 can be completely retried by the client at
|
|
|
|
|
* no harm. */
|
|
|
|
|
db__message_remove(db, context, &msg, prev);
|
|
|
|
|
db__message_remove(db, context, &context->inflight_msgs, msg);
|
|
|
|
|
}else{
|
|
|
|
|
/* Message state can be preserved here because it should match
|
|
|
|
|
* whatever the client has got. */
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
prev = msg;
|
|
|
|
|
if(msg) msg = msg->next;
|
|
|
|
|
}
|
|
|
|
|
/* Messages received when the client was disconnected are put
|
|
|
|
|
* in the mosq_ms_queued state. If we don't change them to the
|
|
|
|
@ -807,34 +758,26 @@ int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *conte
|
|
|
|
|
* get sent until the client next receives a message - and they
|
|
|
|
|
* will be sent out of order.
|
|
|
|
|
*/
|
|
|
|
|
if(context->queued_msgs){
|
|
|
|
|
msg = context->queued_msgs;
|
|
|
|
|
while(msg){
|
|
|
|
|
context->last_queued_msg = msg;
|
|
|
|
|
|
|
|
|
|
context->msg_count++;
|
|
|
|
|
context->msg_bytes += msg->store->payloadlen;
|
|
|
|
|
if(msg->qos > 0){
|
|
|
|
|
context->msg_count12++;
|
|
|
|
|
context->msg_bytes12 += msg->store->payloadlen;
|
|
|
|
|
}
|
|
|
|
|
if (db__ready_for_flight(context, msg->qos)) {
|
|
|
|
|
switch(msg->qos){
|
|
|
|
|
case 0:
|
|
|
|
|
msg->state = mosq_ms_publish_qos0;
|
|
|
|
|
break;
|
|
|
|
|
case 1:
|
|
|
|
|
msg->state = mosq_ms_publish_qos1;
|
|
|
|
|
break;
|
|
|
|
|
case 2:
|
|
|
|
|
msg->state = mosq_ms_publish_qos2;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
db__message_dequeue_first(context);
|
|
|
|
|
msg = context->queued_msgs;
|
|
|
|
|
} else {
|
|
|
|
|
msg = msg->next;
|
|
|
|
|
DL_FOREACH_SAFE(context->queued_msgs, msg, tmp){
|
|
|
|
|
context->msg_count++;
|
|
|
|
|
context->msg_bytes += msg->store->payloadlen;
|
|
|
|
|
if(msg->qos > 0){
|
|
|
|
|
context->msg_count12++;
|
|
|
|
|
context->msg_bytes12 += msg->store->payloadlen;
|
|
|
|
|
}
|
|
|
|
|
if(db__ready_for_flight(context, msg->qos)) {
|
|
|
|
|
switch(msg->qos){
|
|
|
|
|
case 0:
|
|
|
|
|
msg->state = mosq_ms_publish_qos0;
|
|
|
|
|
break;
|
|
|
|
|
case 1:
|
|
|
|
|
msg->state = mosq_ms_publish_qos1;
|
|
|
|
|
break;
|
|
|
|
|
case 2:
|
|
|
|
|
msg->state = mosq_ms_publish_qos2;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
db__message_dequeue_first(context);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -844,7 +787,7 @@ int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *conte
|
|
|
|
|
|
|
|
|
|
int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_client_msg *tail, *last = NULL;
|
|
|
|
|
struct mosquitto_client_msg *tail, *tmp;
|
|
|
|
|
int retain;
|
|
|
|
|
char *topic;
|
|
|
|
|
char *source_id;
|
|
|
|
@ -853,8 +796,7 @@ int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint
|
|
|
|
|
|
|
|
|
|
if(!context) return MOSQ_ERR_INVAL;
|
|
|
|
|
|
|
|
|
|
tail = context->inflight_msgs;
|
|
|
|
|
while(tail){
|
|
|
|
|
DL_FOREACH_SAFE(context->inflight_msgs, tail, tmp){
|
|
|
|
|
msg_index++;
|
|
|
|
|
if(tail->mid == mid && tail->direction == dir){
|
|
|
|
|
if(tail->store->qos != 2){
|
|
|
|
@ -869,20 +811,20 @@ int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint
|
|
|
|
|
* keep resending it. That means we don't send it to other
|
|
|
|
|
* clients. */
|
|
|
|
|
if(!topic || !sub__messages_queue(db, source_id, topic, 2, retain, &tail->store)){
|
|
|
|
|
db__message_remove(db, context, &tail, last);
|
|
|
|
|
db__message_remove(db, context, &context->inflight_msgs, tail);
|
|
|
|
|
deleted = true;
|
|
|
|
|
}else{
|
|
|
|
|
return 1;
|
|
|
|
|
}
|
|
|
|
|
}else{
|
|
|
|
|
last = tail;
|
|
|
|
|
tail = tail->next;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
while(context->queued_msgs && (context->send_maximum == 0 || msg_index < context->send_maximum)){
|
|
|
|
|
DL_FOREACH_SAFE(context->queued_msgs, tail, tmp){
|
|
|
|
|
if(context->send_maximum != 0 && msg_index >= context->send_maximum){
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
msg_index++;
|
|
|
|
|
tail = context->queued_msgs;
|
|
|
|
|
tail->timestamp = mosquitto_time();
|
|
|
|
|
if(tail->direction == mosq_md_out){
|
|
|
|
|
switch(tail->qos){
|
|
|
|
@ -915,7 +857,7 @@ int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint
|
|
|
|
|
int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
|
|
|
|
|
{
|
|
|
|
|
int rc;
|
|
|
|
|
struct mosquitto_client_msg *tail, *last = NULL, *tmp;
|
|
|
|
|
struct mosquitto_client_msg *tail, *tmp;
|
|
|
|
|
uint16_t mid;
|
|
|
|
|
int retries;
|
|
|
|
|
int retain;
|
|
|
|
@ -937,8 +879,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
|
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tail = context->inflight_msgs;
|
|
|
|
|
while(tail){
|
|
|
|
|
DL_FOREACH_SAFE(context->inflight_msgs, tail, tmp){
|
|
|
|
|
msg_count++;
|
|
|
|
|
expiry_interval = 0;
|
|
|
|
|
if(tail->store->message_expiry_time){
|
|
|
|
@ -947,7 +888,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
|
|
|
|
|
}
|
|
|
|
|
if(now > tail->store->message_expiry_time){
|
|
|
|
|
/* Message is expired, must not send. */
|
|
|
|
|
db__message_remove(db, context, &tail, last);
|
|
|
|
|
db__message_remove(db, context, &context->inflight_msgs, tail);
|
|
|
|
|
continue;
|
|
|
|
|
}else{
|
|
|
|
|
expiry_interval = tail->store->message_expiry_time - now;
|
|
|
|
@ -967,7 +908,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
|
|
|
|
|
case mosq_ms_publish_qos0:
|
|
|
|
|
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval);
|
|
|
|
|
if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_OVERSIZE_PACKET){
|
|
|
|
|
db__message_remove(db, context, &tail, last);
|
|
|
|
|
db__message_remove(db, context, &context->inflight_msgs, tail);
|
|
|
|
|
}else{
|
|
|
|
|
return rc;
|
|
|
|
|
}
|
|
|
|
@ -979,13 +920,8 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
|
|
|
|
|
tail->timestamp = mosquitto_time();
|
|
|
|
|
tail->dup = 1; /* Any retry attempts are a duplicate. */
|
|
|
|
|
tail->state = mosq_ms_wait_for_puback;
|
|
|
|
|
|
|
|
|
|
last = tail;
|
|
|
|
|
tail = tail->next;
|
|
|
|
|
}else if(rc == MOSQ_ERR_OVERSIZE_PACKET){
|
|
|
|
|
tmp = tail->next;
|
|
|
|
|
db__message_remove(db, context, &tail, last);
|
|
|
|
|
tail = tmp;
|
|
|
|
|
db__message_remove(db, context, &context->inflight_msgs, tail);
|
|
|
|
|
}else{
|
|
|
|
|
return rc;
|
|
|
|
|
}
|
|
|
|
@ -997,13 +933,8 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
|
|
|
|
|
tail->timestamp = mosquitto_time();
|
|
|
|
|
tail->dup = 1; /* Any retry attempts are a duplicate. */
|
|
|
|
|
tail->state = mosq_ms_wait_for_pubrec;
|
|
|
|
|
|
|
|
|
|
last = tail;
|
|
|
|
|
tail = tail->next;
|
|
|
|
|
}else if(rc == MOSQ_ERR_OVERSIZE_PACKET){
|
|
|
|
|
tmp = tail->next;
|
|
|
|
|
db__message_remove(db, context, &tail, last);
|
|
|
|
|
tail = tmp;
|
|
|
|
|
db__message_remove(db, context, &context->inflight_msgs, tail);
|
|
|
|
|
}else{
|
|
|
|
|
return rc;
|
|
|
|
|
}
|
|
|
|
@ -1016,8 +947,6 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
|
|
|
|
|
}else{
|
|
|
|
|
return rc;
|
|
|
|
|
}
|
|
|
|
|
last = tail;
|
|
|
|
|
tail = tail->next;
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
case mosq_ms_resend_pubrel:
|
|
|
|
@ -1027,8 +956,6 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
|
|
|
|
|
}else{
|
|
|
|
|
return rc;
|
|
|
|
|
}
|
|
|
|
|
last = tail;
|
|
|
|
|
tail = tail->next;
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
case mosq_ms_resend_pubcomp:
|
|
|
|
@ -1038,20 +965,24 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
|
|
|
|
|
}else{
|
|
|
|
|
return rc;
|
|
|
|
|
}
|
|
|
|
|
last = tail;
|
|
|
|
|
tail = tail->next;
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
default:
|
|
|
|
|
last = tail;
|
|
|
|
|
tail = tail->next;
|
|
|
|
|
case mosq_ms_invalid:
|
|
|
|
|
case mosq_ms_wait_for_puback:
|
|
|
|
|
case mosq_ms_wait_for_pubrec:
|
|
|
|
|
case mosq_ms_wait_for_pubrel:
|
|
|
|
|
case mosq_ms_wait_for_pubcomp:
|
|
|
|
|
case mosq_ms_queued:
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
while(context->queued_msgs && (context->send_maximum == 0 || msg_count < context->send_maximum)){
|
|
|
|
|
DL_FOREACH_SAFE(context->queued_msgs, tail, tmp){
|
|
|
|
|
if(context->send_maximum != 0 && msg_count >= context->send_maximum){
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
msg_count++;
|
|
|
|
|
tail = context->queued_msgs;
|
|
|
|
|
if(tail->direction == mosq_md_out){
|
|
|
|
|
switch(tail->qos){
|
|
|
|
|
case 0:
|
|
|
|
|