|
|
|
@ -138,7 +138,7 @@ bool db__ready_for_queue(struct mosquitto *context, int qos, struct mosquitto_ms
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void db__msg_add_to_inflight_stats(struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *msg)
|
|
|
|
|
void db__msg_add_to_inflight_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *msg)
|
|
|
|
|
{
|
|
|
|
|
msg_data->inflight_count++;
|
|
|
|
|
msg_data->inflight_bytes += msg->base_msg->msg.payloadlen;
|
|
|
|
@ -148,7 +148,7 @@ void db__msg_add_to_inflight_stats(struct mosquitto_msg_data *msg_data, struct m
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void db__msg_remove_from_inflight_stats(struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *msg)
|
|
|
|
|
static void db__msg_remove_from_inflight_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *msg)
|
|
|
|
|
{
|
|
|
|
|
msg_data->inflight_count--;
|
|
|
|
|
msg_data->inflight_bytes -= msg->base_msg->msg.payloadlen;
|
|
|
|
@ -159,7 +159,7 @@ static void db__msg_remove_from_inflight_stats(struct mosquitto_msg_data *msg_da
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void db__msg_add_to_queued_stats(struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *msg)
|
|
|
|
|
void db__msg_add_to_queued_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *msg)
|
|
|
|
|
{
|
|
|
|
|
msg_data->queued_count++;
|
|
|
|
|
msg_data->queued_bytes += msg->base_msg->msg.payloadlen;
|
|
|
|
@ -169,7 +169,7 @@ void db__msg_add_to_queued_stats(struct mosquitto_msg_data *msg_data, struct mos
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void db__msg_remove_from_queued_stats(struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *msg)
|
|
|
|
|
static void db__msg_remove_from_queued_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *msg)
|
|
|
|
|
{
|
|
|
|
|
msg_data->queued_count--;
|
|
|
|
|
msg_data->queued_bytes -= msg->base_msg->msg.payloadlen;
|
|
|
|
@ -328,7 +328,7 @@ void db__msg_store_compact(void)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void db__message_remove_inflight(struct mosquitto *context, struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *item)
|
|
|
|
|
static void db__message_remove_inflight(struct mosquitto *context, struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *item)
|
|
|
|
|
{
|
|
|
|
|
if(!context || !msg_data || !item){
|
|
|
|
|
return;
|
|
|
|
@ -346,7 +346,7 @@ static void db__message_remove_inflight(struct mosquitto *context, struct mosqui
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void db__message_remove_queued(struct mosquitto *context, struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *item)
|
|
|
|
|
static void db__message_remove_queued(struct mosquitto *context, struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *item)
|
|
|
|
|
{
|
|
|
|
|
if(!context || !msg_data || !item){
|
|
|
|
|
return;
|
|
|
|
@ -366,7 +366,7 @@ static void db__message_remove_queued(struct mosquitto *context, struct mosquitt
|
|
|
|
|
|
|
|
|
|
void db__message_dequeue_first(struct mosquitto *context, struct mosquitto_msg_data *msg_data)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_client_msg *msg;
|
|
|
|
|
struct mosquitto__client_msg *msg;
|
|
|
|
|
|
|
|
|
|
UNUSED(context);
|
|
|
|
|
|
|
|
|
@ -384,7 +384,7 @@ void db__message_dequeue_first(struct mosquitto *context, struct mosquitto_msg_d
|
|
|
|
|
|
|
|
|
|
int db__message_delete_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state expect_state, int qos)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_client_msg *tail, *tmp;
|
|
|
|
|
struct mosquitto__client_msg *tail, *tmp;
|
|
|
|
|
bool deleted = false;
|
|
|
|
|
|
|
|
|
|
if(!context) return MOSQ_ERR_INVAL;
|
|
|
|
@ -446,7 +446,7 @@ int db__message_delete_outgoing(struct mosquitto *context, uint16_t mid, enum mo
|
|
|
|
|
/* Only for QoS 2 messages */
|
|
|
|
|
int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, struct mosquitto__base_msg *base_msg, bool persist)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_client_msg *msg;
|
|
|
|
|
struct mosquitto__client_msg *msg;
|
|
|
|
|
struct mosquitto_msg_data *msg_data;
|
|
|
|
|
enum mosquitto_msg_state state = mosq_ms_invalid;
|
|
|
|
|
int rc = 0;
|
|
|
|
@ -484,7 +484,7 @@ int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, str
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
msg = mosquitto__malloc(sizeof(struct mosquitto_client_msg));
|
|
|
|
|
msg = mosquitto__malloc(sizeof(struct mosquitto__client_msg));
|
|
|
|
|
if(!msg) return MOSQ_ERR_NOMEM;
|
|
|
|
|
msg->prev = NULL;
|
|
|
|
|
msg->next = NULL;
|
|
|
|
@ -528,7 +528,7 @@ int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, str
|
|
|
|
|
|
|
|
|
|
int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uint16_t mid, uint8_t qos, bool retain, struct mosquitto__base_msg *base_msg, uint32_t subscription_identifier, bool update, bool persist)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_client_msg *msg;
|
|
|
|
|
struct mosquitto__client_msg *msg;
|
|
|
|
|
struct mosquitto_msg_data *msg_data;
|
|
|
|
|
enum mosquitto_msg_state state = mosq_ms_invalid;
|
|
|
|
|
int rc = 0;
|
|
|
|
@ -626,7 +626,7 @@ int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uin
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
msg = mosquitto__malloc(sizeof(struct mosquitto_client_msg));
|
|
|
|
|
msg = mosquitto__malloc(sizeof(struct mosquitto__client_msg));
|
|
|
|
|
if(!msg) return MOSQ_ERR_NOMEM;
|
|
|
|
|
msg->prev = NULL;
|
|
|
|
|
msg->next = NULL;
|
|
|
|
@ -707,7 +707,7 @@ int db__message_insert_outgoing(struct mosquitto *context, uint64_t cmsg_id, uin
|
|
|
|
|
|
|
|
|
|
int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos, bool persist)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_client_msg *tail;
|
|
|
|
|
struct mosquitto__client_msg *tail;
|
|
|
|
|
|
|
|
|
|
DL_FOREACH(context->msgs_out.inflight, tail){
|
|
|
|
|
if(tail->mid == mid){
|
|
|
|
@ -725,9 +725,9 @@ int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mo
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void db__messages_delete_list(struct mosquitto_client_msg **head)
|
|
|
|
|
static void db__messages_delete_list(struct mosquitto__client_msg **head)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_client_msg *tail, *tmp;
|
|
|
|
|
struct mosquitto__client_msg *tail, *tmp;
|
|
|
|
|
|
|
|
|
|
DL_FOREACH_SAFE(*head, tail, tmp){
|
|
|
|
|
DL_DELETE(*head, tail);
|
|
|
|
@ -966,7 +966,7 @@ int db__message_store(const struct mosquitto *source, struct mosquitto__base_msg
|
|
|
|
|
|
|
|
|
|
int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto__base_msg **base_msg)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_client_msg *tail;
|
|
|
|
|
struct mosquitto__client_msg *tail;
|
|
|
|
|
|
|
|
|
|
if(!context) return MOSQ_ERR_INVAL;
|
|
|
|
|
|
|
|
|
@ -992,7 +992,7 @@ int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosqu
|
|
|
|
|
* retry, and to set incoming messages to expect an appropriate retry. */
|
|
|
|
|
static int db__message_reconnect_reset_outgoing(struct mosquitto *context)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_client_msg *msg, *tmp;
|
|
|
|
|
struct mosquitto__client_msg *msg, *tmp;
|
|
|
|
|
|
|
|
|
|
context->msgs_out.inflight_bytes = 0;
|
|
|
|
|
context->msgs_out.inflight_bytes12 = 0;
|
|
|
|
@ -1059,7 +1059,7 @@ static int db__message_reconnect_reset_outgoing(struct mosquitto *context)
|
|
|
|
|
/* Called on reconnect to set incoming messages to expect an appropriate retry. */
|
|
|
|
|
static int db__message_reconnect_reset_incoming(struct mosquitto *context)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_client_msg *msg, *tmp;
|
|
|
|
|
struct mosquitto__client_msg *msg, *tmp;
|
|
|
|
|
|
|
|
|
|
context->msgs_in.inflight_bytes = 0;
|
|
|
|
|
context->msgs_in.inflight_bytes12 = 0;
|
|
|
|
@ -1128,7 +1128,7 @@ int db__message_reconnect_reset(struct mosquitto *context)
|
|
|
|
|
|
|
|
|
|
int db__message_remove_incoming(struct mosquitto* context, uint16_t mid)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_client_msg *tail, *tmp;
|
|
|
|
|
struct mosquitto__client_msg *tail, *tmp;
|
|
|
|
|
|
|
|
|
|
if(!context) return MOSQ_ERR_INVAL;
|
|
|
|
|
|
|
|
|
@ -1148,7 +1148,7 @@ int db__message_remove_incoming(struct mosquitto* context, uint16_t mid)
|
|
|
|
|
|
|
|
|
|
int db__message_release_incoming(struct mosquitto *context, uint16_t mid)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_client_msg *tail, *tmp;
|
|
|
|
|
struct mosquitto__client_msg *tail, *tmp;
|
|
|
|
|
int retain;
|
|
|
|
|
char *topic;
|
|
|
|
|
char *source_id;
|
|
|
|
@ -1207,7 +1207,7 @@ int db__message_release_incoming(struct mosquitto *context, uint16_t mid)
|
|
|
|
|
|
|
|
|
|
void db__expire_all_messages(struct mosquitto *context)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_client_msg *msg, *tmp;
|
|
|
|
|
struct mosquitto__client_msg *msg, *tmp;
|
|
|
|
|
|
|
|
|
|
DL_FOREACH_SAFE(context->msgs_out.inflight, msg, tmp){
|
|
|
|
|
if(msg->base_msg->msg.expiry_time && db.now_real_s > msg->base_msg->msg.expiry_time){
|
|
|
|
@ -1238,7 +1238,7 @@ void db__expire_all_messages(struct mosquitto *context)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static int db__message_write_inflight_out_single(struct mosquitto *context, struct mosquitto_client_msg *msg)
|
|
|
|
|
static int db__message_write_inflight_out_single(struct mosquitto *context, struct mosquitto__client_msg *msg)
|
|
|
|
|
{
|
|
|
|
|
mosquitto_property *base_msg_props = NULL;
|
|
|
|
|
int rc;
|
|
|
|
@ -1337,7 +1337,7 @@ static int db__message_write_inflight_out_single(struct mosquitto *context, stru
|
|
|
|
|
|
|
|
|
|
int db__message_write_inflight_out_all(struct mosquitto *context)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_client_msg *tail, *tmp;
|
|
|
|
|
struct mosquitto__client_msg *tail, *tmp;
|
|
|
|
|
int rc;
|
|
|
|
|
|
|
|
|
|
if(context->state != mosq_cs_active || !net__is_connected(context)){
|
|
|
|
@ -1354,7 +1354,7 @@ int db__message_write_inflight_out_all(struct mosquitto *context)
|
|
|
|
|
|
|
|
|
|
int db__message_write_inflight_out_latest(struct mosquitto *context)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_client_msg *tail, *next;
|
|
|
|
|
struct mosquitto__client_msg *tail, *next;
|
|
|
|
|
int rc;
|
|
|
|
|
|
|
|
|
|
if(context->state != mosq_cs_active
|
|
|
|
@ -1399,7 +1399,7 @@ int db__message_write_inflight_out_latest(struct mosquitto *context)
|
|
|
|
|
|
|
|
|
|
int db__message_write_queued_in(struct mosquitto *context)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_client_msg *tail, *tmp;
|
|
|
|
|
struct mosquitto__client_msg *tail, *tmp;
|
|
|
|
|
int rc;
|
|
|
|
|
|
|
|
|
|
if(context->state != mosq_cs_active){
|
|
|
|
@ -1430,7 +1430,7 @@ int db__message_write_queued_in(struct mosquitto *context)
|
|
|
|
|
|
|
|
|
|
int db__message_write_queued_out(struct mosquitto *context)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_client_msg *tail, *tmp;
|
|
|
|
|
struct mosquitto__client_msg *tail, *tmp;
|
|
|
|
|
|
|
|
|
|
if(context->state != mosq_cs_active){
|
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
|