|
|
|
@ -208,9 +208,6 @@ void db__msg_store_deref(struct mosquitto_db *db, struct mosquitto_msg_store **s
|
|
|
|
|
|
|
|
|
|
static void db__message_remove(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto_client_msg **msg, struct mosquitto_client_msg *last)
|
|
|
|
|
{
|
|
|
|
|
int i;
|
|
|
|
|
struct mosquitto_client_msg *tail;
|
|
|
|
|
|
|
|
|
|
if(!context || !msg || !(*msg)){
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
@ -221,12 +218,12 @@ static void db__message_remove(struct mosquitto_db *db, struct mosquitto *contex
|
|
|
|
|
if(last){
|
|
|
|
|
last->next = (*msg)->next;
|
|
|
|
|
if(!last->next){
|
|
|
|
|
context->last_msg = last;
|
|
|
|
|
context->last_inflight_msg = last;
|
|
|
|
|
}
|
|
|
|
|
}else{
|
|
|
|
|
context->msgs = (*msg)->next;
|
|
|
|
|
if(!context->msgs){
|
|
|
|
|
context->last_msg = NULL;
|
|
|
|
|
context->inflight_msgs = (*msg)->next;
|
|
|
|
|
if(!context->inflight_msgs){
|
|
|
|
|
context->last_inflight_msg = NULL;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
context->msg_count--;
|
|
|
|
@ -237,74 +234,71 @@ static void db__message_remove(struct mosquitto_db *db, struct mosquitto *contex
|
|
|
|
|
if(last){
|
|
|
|
|
*msg = last->next;
|
|
|
|
|
}else{
|
|
|
|
|
*msg = context->msgs;
|
|
|
|
|
*msg = context->inflight_msgs;
|
|
|
|
|
}
|
|
|
|
|
tail = context->msgs;
|
|
|
|
|
i = 0;
|
|
|
|
|
while(tail && tail->state == mosq_ms_queued && i<max_inflight){
|
|
|
|
|
if(tail->direction == mosq_md_out){
|
|
|
|
|
switch(tail->qos){
|
|
|
|
|
case 0:
|
|
|
|
|
tail->state = mosq_ms_publish_qos0;
|
|
|
|
|
break;
|
|
|
|
|
case 1:
|
|
|
|
|
tail->state = mosq_ms_publish_qos1;
|
|
|
|
|
break;
|
|
|
|
|
case 2:
|
|
|
|
|
tail->state = mosq_ms_publish_qos2;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}else{
|
|
|
|
|
if(tail->qos == 2){
|
|
|
|
|
tail->state = mosq_ms_send_pubrec;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tail = tail->next;
|
|
|
|
|
void db__message_dequeue_first(struct mosquitto *context)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_client_msg *msg;
|
|
|
|
|
|
|
|
|
|
msg = context->queued_msgs;
|
|
|
|
|
context->queued_msgs = msg->next;
|
|
|
|
|
if (context->last_queued_msg == msg){
|
|
|
|
|
context->last_queued_msg = NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (context->last_inflight_msg){
|
|
|
|
|
context->last_inflight_msg->next = msg;
|
|
|
|
|
context->last_inflight_msg = msg;
|
|
|
|
|
}else{
|
|
|
|
|
context->inflight_msgs = msg;
|
|
|
|
|
context->last_inflight_msg = msg;
|
|
|
|
|
}
|
|
|
|
|
msg->next = NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_client_msg *tail, *last = NULL;
|
|
|
|
|
int msg_index = 0;
|
|
|
|
|
bool deleted = false;
|
|
|
|
|
|
|
|
|
|
if(!context) return MOSQ_ERR_INVAL;
|
|
|
|
|
|
|
|
|
|
tail = context->msgs;
|
|
|
|
|
tail = context->inflight_msgs;
|
|
|
|
|
while(tail){
|
|
|
|
|
msg_index++;
|
|
|
|
|
if(tail->state == mosq_ms_queued && msg_index <= max_inflight){
|
|
|
|
|
tail->timestamp = mosquitto_time();
|
|
|
|
|
if(tail->direction == mosq_md_out){
|
|
|
|
|
switch(tail->qos){
|
|
|
|
|
case 0:
|
|
|
|
|
tail->state = mosq_ms_publish_qos0;
|
|
|
|
|
break;
|
|
|
|
|
case 1:
|
|
|
|
|
tail->state = mosq_ms_publish_qos1;
|
|
|
|
|
break;
|
|
|
|
|
case 2:
|
|
|
|
|
tail->state = mosq_ms_publish_qos2;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}else{
|
|
|
|
|
if(tail->qos == 2){
|
|
|
|
|
tail->state = mosq_ms_wait_for_pubrel;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if(tail->mid == mid && tail->direction == dir){
|
|
|
|
|
msg_index--;
|
|
|
|
|
db__message_remove(db, context, &tail, last);
|
|
|
|
|
deleted = true;
|
|
|
|
|
}else{
|
|
|
|
|
last = tail;
|
|
|
|
|
tail = tail->next;
|
|
|
|
|
}
|
|
|
|
|
if(msg_index > max_inflight && deleted){
|
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
while (context->queued_msgs && (max_inflight == 0 || msg_index < max_inflight)){
|
|
|
|
|
msg_index++;
|
|
|
|
|
tail = context->queued_msgs;
|
|
|
|
|
tail->timestamp = mosquitto_time();
|
|
|
|
|
if(tail->direction == mosq_md_out){
|
|
|
|
|
switch(tail->qos){
|
|
|
|
|
case 0:
|
|
|
|
|
tail->state = mosq_ms_publish_qos0;
|
|
|
|
|
break;
|
|
|
|
|
case 1:
|
|
|
|
|
tail->state = mosq_ms_publish_qos1;
|
|
|
|
|
break;
|
|
|
|
|
case 2:
|
|
|
|
|
tail->state = mosq_ms_publish_qos2;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
db__message_dequeue_first(context);
|
|
|
|
|
}else{
|
|
|
|
|
if(tail->qos == 2){
|
|
|
|
|
send__pubrec(context, tail->mid);
|
|
|
|
|
tail->state = mosq_ms_wait_for_pubrel;
|
|
|
|
|
db__message_dequeue_first(context);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -314,6 +308,7 @@ int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint1
|
|
|
|
|
int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_client_msg *msg;
|
|
|
|
|
struct mosquitto_client_msg **msgs, **last_msg;
|
|
|
|
|
enum mosquitto_msg_state state = mosq_ms_invalid;
|
|
|
|
|
int rc = 0;
|
|
|
|
|
int i;
|
|
|
|
@ -422,12 +417,20 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
|
|
|
|
|
msg->dup = false;
|
|
|
|
|
msg->qos = qos;
|
|
|
|
|
msg->retain = retain;
|
|
|
|
|
if(context->last_msg){
|
|
|
|
|
context->last_msg->next = msg;
|
|
|
|
|
context->last_msg = msg;
|
|
|
|
|
|
|
|
|
|
if (state == mosq_ms_queued){
|
|
|
|
|
msgs = &(context->queued_msgs);
|
|
|
|
|
last_msg = &(context->last_queued_msg);
|
|
|
|
|
}else{
|
|
|
|
|
msgs = &(context->inflight_msgs);
|
|
|
|
|
last_msg = &(context->last_inflight_msg);
|
|
|
|
|
}
|
|
|
|
|
if(*last_msg){
|
|
|
|
|
(*last_msg)->next = msg;
|
|
|
|
|
(*last_msg) = msg;
|
|
|
|
|
}else{
|
|
|
|
|
context->msgs = msg;
|
|
|
|
|
context->last_msg = msg;
|
|
|
|
|
*msgs = msg;
|
|
|
|
|
*last_msg = msg;
|
|
|
|
|
}
|
|
|
|
|
context->msg_count++;
|
|
|
|
|
if(qos > 0){
|
|
|
|
@ -478,7 +481,7 @@ int db__message_update(struct mosquitto *context, uint16_t mid, enum mosquitto_m
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_client_msg *tail;
|
|
|
|
|
|
|
|
|
|
tail = context->msgs;
|
|
|
|
|
tail = context->inflight_msgs;
|
|
|
|
|
while(tail){
|
|
|
|
|
if(tail->mid == mid && tail->direction == dir){
|
|
|
|
|
tail->state = state;
|
|
|
|
@ -496,15 +499,25 @@ int db__messages_delete(struct mosquitto_db *db, struct mosquitto *context)
|
|
|
|
|
|
|
|
|
|
if(!context) return MOSQ_ERR_INVAL;
|
|
|
|
|
|
|
|
|
|
tail = context->msgs;
|
|
|
|
|
tail = context->inflight_msgs;
|
|
|
|
|
while(tail){
|
|
|
|
|
db__msg_store_deref(db, &tail->store);
|
|
|
|
|
next = tail->next;
|
|
|
|
|
mosquitto__free(tail);
|
|
|
|
|
tail = next;
|
|
|
|
|
}
|
|
|
|
|
context->inflight_msgs = NULL;
|
|
|
|
|
context->last_inflight_msg = NULL;
|
|
|
|
|
|
|
|
|
|
tail = context->queued_msgs;
|
|
|
|
|
while(tail){
|
|
|
|
|
db__msg_store_deref(db, &tail->store);
|
|
|
|
|
next = tail->next;
|
|
|
|
|
mosquitto__free(tail);
|
|
|
|
|
tail = next;
|
|
|
|
|
}
|
|
|
|
|
context->msgs = NULL;
|
|
|
|
|
context->last_msg = NULL;
|
|
|
|
|
context->queued_msgs = NULL;
|
|
|
|
|
context->last_queued_msg = NULL;
|
|
|
|
|
context->msg_count = 0;
|
|
|
|
|
context->msg_count12 = 0;
|
|
|
|
|
|
|
|
|
@ -616,7 +629,7 @@ int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosqu
|
|
|
|
|
if(!context) return MOSQ_ERR_INVAL;
|
|
|
|
|
|
|
|
|
|
*stored = NULL;
|
|
|
|
|
tail = context->msgs;
|
|
|
|
|
tail = context->inflight_msgs;
|
|
|
|
|
while(tail){
|
|
|
|
|
if(tail->store->source_mid == mid && tail->direction == mosq_md_in){
|
|
|
|
|
*stored = tail->store;
|
|
|
|
@ -636,11 +649,11 @@ int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *conte
|
|
|
|
|
struct mosquitto_client_msg *prev = NULL;
|
|
|
|
|
int count;
|
|
|
|
|
|
|
|
|
|
msg = context->msgs;
|
|
|
|
|
msg = context->inflight_msgs;
|
|
|
|
|
context->msg_count = 0;
|
|
|
|
|
context->msg_count12 = 0;
|
|
|
|
|
while(msg){
|
|
|
|
|
context->last_msg = msg;
|
|
|
|
|
context->last_inflight_msg = msg;
|
|
|
|
|
|
|
|
|
|
context->msg_count++;
|
|
|
|
|
if(msg->qos > 0){
|
|
|
|
@ -648,22 +661,20 @@ int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *conte
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if(msg->direction == mosq_md_out){
|
|
|
|
|
if(msg->state != mosq_ms_queued){
|
|
|
|
|
switch(msg->qos){
|
|
|
|
|
case 0:
|
|
|
|
|
msg->state = mosq_ms_publish_qos0;
|
|
|
|
|
break;
|
|
|
|
|
case 1:
|
|
|
|
|
msg->state = mosq_ms_publish_qos1;
|
|
|
|
|
break;
|
|
|
|
|
case 2:
|
|
|
|
|
if(msg->state == mosq_ms_wait_for_pubcomp){
|
|
|
|
|
msg->state = mosq_ms_resend_pubrel;
|
|
|
|
|
}else{
|
|
|
|
|
msg->state = mosq_ms_publish_qos2;
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
switch(msg->qos){
|
|
|
|
|
case 0:
|
|
|
|
|
msg->state = mosq_ms_publish_qos0;
|
|
|
|
|
break;
|
|
|
|
|
case 1:
|
|
|
|
|
msg->state = mosq_ms_publish_qos1;
|
|
|
|
|
break;
|
|
|
|
|
case 2:
|
|
|
|
|
if(msg->state == mosq_ms_wait_for_pubcomp){
|
|
|
|
|
msg->state = mosq_ms_resend_pubrel;
|
|
|
|
|
}else{
|
|
|
|
|
msg->state = mosq_ms_publish_qos2;
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}else{
|
|
|
|
|
if(msg->qos != 2){
|
|
|
|
@ -684,11 +695,18 @@ int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *conte
|
|
|
|
|
* get sent until the client next receives a message - and they
|
|
|
|
|
* will be sent out of order.
|
|
|
|
|
*/
|
|
|
|
|
if(context->msgs){
|
|
|
|
|
count = 0;
|
|
|
|
|
msg = context->msgs;
|
|
|
|
|
while(msg && (max_inflight == 0 || count < max_inflight)){
|
|
|
|
|
if(msg->state == mosq_ms_queued){
|
|
|
|
|
if(context->queued_msgs){
|
|
|
|
|
count = context->msg_count;
|
|
|
|
|
msg = context->queued_msgs;
|
|
|
|
|
while(msg){
|
|
|
|
|
context->last_queued_msg = msg;
|
|
|
|
|
|
|
|
|
|
count++;
|
|
|
|
|
context->msg_count++;
|
|
|
|
|
if(msg->qos > 0){
|
|
|
|
|
context->msg_count12++;
|
|
|
|
|
}
|
|
|
|
|
if (max_inflight == 0 || count <= max_inflight){
|
|
|
|
|
switch(msg->qos){
|
|
|
|
|
case 0:
|
|
|
|
|
msg->state = mosq_ms_publish_qos0;
|
|
|
|
@ -700,9 +718,11 @@ int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *conte
|
|
|
|
|
msg->state = mosq_ms_publish_qos2;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
db__message_dequeue_first(context);
|
|
|
|
|
msg = context->queued_msgs;
|
|
|
|
|
} else {
|
|
|
|
|
msg = msg->next;
|
|
|
|
|
}
|
|
|
|
|
msg = msg->next;
|
|
|
|
|
count++;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -722,30 +742,9 @@ int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint
|
|
|
|
|
|
|
|
|
|
if(!context) return MOSQ_ERR_INVAL;
|
|
|
|
|
|
|
|
|
|
tail = context->msgs;
|
|
|
|
|
tail = context->inflight_msgs;
|
|
|
|
|
while(tail){
|
|
|
|
|
msg_index++;
|
|
|
|
|
if(tail->state == mosq_ms_queued && msg_index <= max_inflight){
|
|
|
|
|
tail->timestamp = mosquitto_time();
|
|
|
|
|
if(tail->direction == mosq_md_out){
|
|
|
|
|
switch(tail->qos){
|
|
|
|
|
case 0:
|
|
|
|
|
tail->state = mosq_ms_publish_qos0;
|
|
|
|
|
break;
|
|
|
|
|
case 1:
|
|
|
|
|
tail->state = mosq_ms_publish_qos1;
|
|
|
|
|
break;
|
|
|
|
|
case 2:
|
|
|
|
|
tail->state = mosq_ms_publish_qos2;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}else{
|
|
|
|
|
if(tail->qos == 2){
|
|
|
|
|
send__pubrec(context, tail->mid);
|
|
|
|
|
tail->state = mosq_ms_wait_for_pubrel;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if(tail->mid == mid && tail->direction == dir){
|
|
|
|
|
qos = tail->store->qos;
|
|
|
|
|
topic = tail->store->topic;
|
|
|
|
@ -766,8 +765,31 @@ int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint
|
|
|
|
|
last = tail;
|
|
|
|
|
tail = tail->next;
|
|
|
|
|
}
|
|
|
|
|
if(msg_index > max_inflight && deleted){
|
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
while(context->queued_msgs && (max_inflight == 0 || msg_index < max_inflight)){
|
|
|
|
|
msg_index++;
|
|
|
|
|
tail = context->queued_msgs;
|
|
|
|
|
tail->timestamp = mosquitto_time();
|
|
|
|
|
if(tail->direction == mosq_md_out){
|
|
|
|
|
switch(tail->qos){
|
|
|
|
|
case 0:
|
|
|
|
|
tail->state = mosq_ms_publish_qos0;
|
|
|
|
|
break;
|
|
|
|
|
case 1:
|
|
|
|
|
tail->state = mosq_ms_publish_qos1;
|
|
|
|
|
break;
|
|
|
|
|
case 2:
|
|
|
|
|
tail->state = mosq_ms_publish_qos2;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
db__message_dequeue_first(context);
|
|
|
|
|
}else{
|
|
|
|
|
if(tail->qos == 2){
|
|
|
|
|
send__pubrec(context, tail->mid);
|
|
|
|
|
tail->state = mosq_ms_wait_for_pubrel;
|
|
|
|
|
db__message_dequeue_first(context);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if(deleted){
|
|
|
|
@ -799,103 +821,119 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
|
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tail = context->msgs;
|
|
|
|
|
tail = context->inflight_msgs;
|
|
|
|
|
while(tail){
|
|
|
|
|
if(tail->direction == mosq_md_in){
|
|
|
|
|
msg_count++;
|
|
|
|
|
}
|
|
|
|
|
if(tail->state != mosq_ms_queued){
|
|
|
|
|
mid = tail->mid;
|
|
|
|
|
retries = tail->dup;
|
|
|
|
|
retain = tail->retain;
|
|
|
|
|
topic = tail->store->topic;
|
|
|
|
|
qos = tail->qos;
|
|
|
|
|
payloadlen = tail->store->payloadlen;
|
|
|
|
|
payload = UHPA_ACCESS_PAYLOAD(tail->store);
|
|
|
|
|
|
|
|
|
|
switch(tail->state){
|
|
|
|
|
case mosq_ms_publish_qos0:
|
|
|
|
|
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries);
|
|
|
|
|
if(!rc){
|
|
|
|
|
db__message_remove(db, context, &tail, last);
|
|
|
|
|
}else{
|
|
|
|
|
return rc;
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|
msg_count++;
|
|
|
|
|
mid = tail->mid;
|
|
|
|
|
retries = tail->dup;
|
|
|
|
|
retain = tail->retain;
|
|
|
|
|
topic = tail->store->topic;
|
|
|
|
|
qos = tail->qos;
|
|
|
|
|
payloadlen = tail->store->payloadlen;
|
|
|
|
|
payload = UHPA_ACCESS_PAYLOAD(tail->store);
|
|
|
|
|
|
|
|
|
|
switch(tail->state){
|
|
|
|
|
case mosq_ms_publish_qos0:
|
|
|
|
|
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries);
|
|
|
|
|
if(!rc){
|
|
|
|
|
db__message_remove(db, context, &tail, last);
|
|
|
|
|
}else{
|
|
|
|
|
return rc;
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
case mosq_ms_publish_qos1:
|
|
|
|
|
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries);
|
|
|
|
|
if(!rc){
|
|
|
|
|
tail->timestamp = mosquitto_time();
|
|
|
|
|
tail->dup = 1; /* Any retry attempts are a duplicate. */
|
|
|
|
|
tail->state = mosq_ms_wait_for_puback;
|
|
|
|
|
}else{
|
|
|
|
|
return rc;
|
|
|
|
|
}
|
|
|
|
|
last = tail;
|
|
|
|
|
tail = tail->next;
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
case mosq_ms_publish_qos2:
|
|
|
|
|
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries);
|
|
|
|
|
if(!rc){
|
|
|
|
|
tail->timestamp = mosquitto_time();
|
|
|
|
|
tail->dup = 1; /* Any retry attempts are a duplicate. */
|
|
|
|
|
tail->state = mosq_ms_wait_for_pubrec;
|
|
|
|
|
}else{
|
|
|
|
|
return rc;
|
|
|
|
|
}
|
|
|
|
|
last = tail;
|
|
|
|
|
tail = tail->next;
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
case mosq_ms_publish_qos1:
|
|
|
|
|
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries);
|
|
|
|
|
if(!rc){
|
|
|
|
|
tail->timestamp = mosquitto_time();
|
|
|
|
|
tail->dup = 1; /* Any retry attempts are a duplicate. */
|
|
|
|
|
tail->state = mosq_ms_wait_for_puback;
|
|
|
|
|
}else{
|
|
|
|
|
return rc;
|
|
|
|
|
}
|
|
|
|
|
last = tail;
|
|
|
|
|
tail = tail->next;
|
|
|
|
|
break;
|
|
|
|
|
case mosq_ms_send_pubrec:
|
|
|
|
|
rc = send__pubrec(context, mid);
|
|
|
|
|
if(!rc){
|
|
|
|
|
tail->state = mosq_ms_wait_for_pubrel;
|
|
|
|
|
}else{
|
|
|
|
|
return rc;
|
|
|
|
|
}
|
|
|
|
|
last = tail;
|
|
|
|
|
tail = tail->next;
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
case mosq_ms_publish_qos2:
|
|
|
|
|
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries);
|
|
|
|
|
if(!rc){
|
|
|
|
|
tail->timestamp = mosquitto_time();
|
|
|
|
|
tail->dup = 1; /* Any retry attempts are a duplicate. */
|
|
|
|
|
tail->state = mosq_ms_wait_for_pubrec;
|
|
|
|
|
}else{
|
|
|
|
|
return rc;
|
|
|
|
|
}
|
|
|
|
|
last = tail;
|
|
|
|
|
tail = tail->next;
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
case mosq_ms_send_pubrec:
|
|
|
|
|
rc = send__pubrec(context, mid);
|
|
|
|
|
if(!rc){
|
|
|
|
|
tail->state = mosq_ms_wait_for_pubrel;
|
|
|
|
|
}else{
|
|
|
|
|
return rc;
|
|
|
|
|
}
|
|
|
|
|
last = tail;
|
|
|
|
|
tail = tail->next;
|
|
|
|
|
break;
|
|
|
|
|
case mosq_ms_resend_pubrel:
|
|
|
|
|
rc = send__pubrel(context, mid);
|
|
|
|
|
if(!rc){
|
|
|
|
|
tail->state = mosq_ms_wait_for_pubcomp;
|
|
|
|
|
}else{
|
|
|
|
|
return rc;
|
|
|
|
|
}
|
|
|
|
|
last = tail;
|
|
|
|
|
tail = tail->next;
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
case mosq_ms_resend_pubrel:
|
|
|
|
|
rc = send__pubrel(context, mid);
|
|
|
|
|
if(!rc){
|
|
|
|
|
tail->state = mosq_ms_wait_for_pubcomp;
|
|
|
|
|
}else{
|
|
|
|
|
return rc;
|
|
|
|
|
}
|
|
|
|
|
last = tail;
|
|
|
|
|
tail = tail->next;
|
|
|
|
|
break;
|
|
|
|
|
case mosq_ms_resend_pubcomp:
|
|
|
|
|
rc = send__pubcomp(context, mid);
|
|
|
|
|
if(!rc){
|
|
|
|
|
tail->state = mosq_ms_wait_for_pubrel;
|
|
|
|
|
}else{
|
|
|
|
|
return rc;
|
|
|
|
|
}
|
|
|
|
|
last = tail;
|
|
|
|
|
tail = tail->next;
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
case mosq_ms_resend_pubcomp:
|
|
|
|
|
rc = send__pubcomp(context, mid);
|
|
|
|
|
if(!rc){
|
|
|
|
|
tail->state = mosq_ms_wait_for_pubrel;
|
|
|
|
|
}else{
|
|
|
|
|
return rc;
|
|
|
|
|
}
|
|
|
|
|
last = tail;
|
|
|
|
|
tail = tail->next;
|
|
|
|
|
break;
|
|
|
|
|
default:
|
|
|
|
|
last = tail;
|
|
|
|
|
tail = tail->next;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
default:
|
|
|
|
|
last = tail;
|
|
|
|
|
tail = tail->next;
|
|
|
|
|
while(context->queued_msgs && (max_inflight == 0 || msg_count < max_inflight)){
|
|
|
|
|
msg_count++;
|
|
|
|
|
tail = context->queued_msgs;
|
|
|
|
|
if(tail->direction == mosq_md_out){
|
|
|
|
|
switch(tail->qos){
|
|
|
|
|
case 0:
|
|
|
|
|
tail->state = mosq_ms_publish_qos0;
|
|
|
|
|
break;
|
|
|
|
|
case 1:
|
|
|
|
|
tail->state = mosq_ms_publish_qos1;
|
|
|
|
|
break;
|
|
|
|
|
case 2:
|
|
|
|
|
tail->state = mosq_ms_publish_qos2;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
db__message_dequeue_first(context);
|
|
|
|
|
}else{
|
|
|
|
|
/* state == mosq_ms_queued */
|
|
|
|
|
if(tail->direction == mosq_md_in && (max_inflight == 0 || msg_count < max_inflight)){
|
|
|
|
|
if(tail->qos == 2){
|
|
|
|
|
tail->state = mosq_ms_send_pubrec;
|
|
|
|
|
if(tail->qos == 2){
|
|
|
|
|
tail->state = mosq_ms_send_pubrec;
|
|
|
|
|
db__message_dequeue_first(context);
|
|
|
|
|
rc = send__pubrec(context, tail->mid);
|
|
|
|
|
if(!rc){
|
|
|
|
|
tail->state = mosq_ms_wait_for_pubrel;
|
|
|
|
|
}else{
|
|
|
|
|
return rc;
|
|
|
|
|
}
|
|
|
|
|
}else{
|
|
|
|
|
last = tail;
|
|
|
|
|
tail = tail->next;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|