|
|
|
@ -339,7 +339,7 @@ void db__msg_store_compact(void)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void db__message_remove(struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *item)
|
|
|
|
|
static void db__message_remove_from_inflight(struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *item)
|
|
|
|
|
{
|
|
|
|
|
if(!msg_data || !item){
|
|
|
|
|
return;
|
|
|
|
@ -390,7 +390,7 @@ int db__message_delete_outgoing(struct mosquitto *context, uint16_t mid, enum mo
|
|
|
|
|
return MOSQ_ERR_PROTOCOL;
|
|
|
|
|
}
|
|
|
|
|
msg_index--;
|
|
|
|
|
db__message_remove(&context->msgs_out, tail);
|
|
|
|
|
db__message_remove_from_inflight(&context->msgs_out, tail);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -894,7 +894,7 @@ static int db__message_reconnect_reset_incoming(struct mosquitto *context)
|
|
|
|
|
if(msg->qos != 2){
|
|
|
|
|
/* Anything <QoS 2 can be completely retried by the client at
|
|
|
|
|
* no harm. */
|
|
|
|
|
db__message_remove(&context->msgs_in, msg);
|
|
|
|
|
db__message_remove_from_inflight(&context->msgs_in, msg);
|
|
|
|
|
}else{
|
|
|
|
|
/* Message state can be preserved here because it should match
|
|
|
|
|
* whatever the client has got. */
|
|
|
|
@ -950,7 +950,7 @@ int db__message_remove_incoming(struct mosquitto* context, uint16_t mid)
|
|
|
|
|
if(tail->store->qos != 2){
|
|
|
|
|
return MOSQ_ERR_PROTOCOL;
|
|
|
|
|
}
|
|
|
|
|
db__message_remove(&context->msgs_in, tail);
|
|
|
|
|
db__message_remove_from_inflight(&context->msgs_in, tail);
|
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -986,12 +986,12 @@ int db__message_release_incoming(struct mosquitto *context, uint16_t mid)
|
|
|
|
|
* keep resending it. That means we don't send it to other
|
|
|
|
|
* clients. */
|
|
|
|
|
if(topic == NULL){
|
|
|
|
|
db__message_remove(&context->msgs_in, tail);
|
|
|
|
|
db__message_remove_from_inflight(&context->msgs_in, tail);
|
|
|
|
|
deleted = true;
|
|
|
|
|
}else{
|
|
|
|
|
rc = sub__messages_queue(source_id, topic, 2, retain, &tail->store);
|
|
|
|
|
if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_NO_SUBSCRIBERS){
|
|
|
|
|
db__message_remove(&context->msgs_in, tail);
|
|
|
|
|
db__message_remove_from_inflight(&context->msgs_in, tail);
|
|
|
|
|
deleted = true;
|
|
|
|
|
}else{
|
|
|
|
|
return 1;
|
|
|
|
@ -1041,7 +1041,7 @@ static int db__message_write_inflight_out_single(struct mosquitto *context, stru
|
|
|
|
|
if(msg->direction == mosq_md_out && msg->qos > 0){
|
|
|
|
|
util__increment_send_quota(context);
|
|
|
|
|
}
|
|
|
|
|
db__message_remove(&context->msgs_out, msg);
|
|
|
|
|
db__message_remove_from_inflight(&context->msgs_out, msg);
|
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
|
}else{
|
|
|
|
|
expiry_interval = (uint32_t)(msg->store->message_expiry_time - db.now_real_s);
|
|
|
|
@ -1061,7 +1061,7 @@ static int db__message_write_inflight_out_single(struct mosquitto *context, stru
|
|
|
|
|
case mosq_ms_publish_qos0:
|
|
|
|
|
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval);
|
|
|
|
|
if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_OVERSIZE_PACKET){
|
|
|
|
|
db__message_remove(&context->msgs_out, msg);
|
|
|
|
|
db__message_remove_from_inflight(&context->msgs_out, msg);
|
|
|
|
|
}else{
|
|
|
|
|
return rc;
|
|
|
|
|
}
|
|
|
|
@ -1074,7 +1074,7 @@ static int db__message_write_inflight_out_single(struct mosquitto *context, stru
|
|
|
|
|
msg->dup = 1; /* Any retry attempts are a duplicate. */
|
|
|
|
|
msg->state = mosq_ms_wait_for_puback;
|
|
|
|
|
}else if(rc == MOSQ_ERR_OVERSIZE_PACKET){
|
|
|
|
|
db__message_remove(&context->msgs_out, msg);
|
|
|
|
|
db__message_remove_from_inflight(&context->msgs_out, msg);
|
|
|
|
|
}else{
|
|
|
|
|
return rc;
|
|
|
|
|
}
|
|
|
|
@ -1087,7 +1087,7 @@ static int db__message_write_inflight_out_single(struct mosquitto *context, stru
|
|
|
|
|
msg->dup = 1; /* Any retry attempts are a duplicate. */
|
|
|
|
|
msg->state = mosq_ms_wait_for_pubrec;
|
|
|
|
|
}else if(rc == MOSQ_ERR_OVERSIZE_PACKET){
|
|
|
|
|
db__message_remove(&context->msgs_out, msg);
|
|
|
|
|
db__message_remove_from_inflight(&context->msgs_out, msg);
|
|
|
|
|
}else{
|
|
|
|
|
return rc;
|
|
|
|
|
}
|
|
|
|
|