|
|
|
@ -25,7 +25,7 @@ Contributors:
|
|
|
|
|
#include "send_mosq.h"
|
|
|
|
|
#include "time_mosq.h"
|
|
|
|
|
|
|
|
|
|
void mosquitto__message_cleanup(struct mosquitto_message_all **message)
|
|
|
|
|
void message__cleanup(struct mosquitto_message_all **message)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_message_all *msg;
|
|
|
|
|
|
|
|
|
@ -38,7 +38,7 @@ void mosquitto__message_cleanup(struct mosquitto_message_all **message)
|
|
|
|
|
mosquitto__free(msg);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void mosquitto__message_cleanup_all(struct mosquitto *mosq)
|
|
|
|
|
void message__cleanup_all(struct mosquitto *mosq)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_message_all *tmp;
|
|
|
|
|
|
|
|
|
@ -46,12 +46,12 @@ void mosquitto__message_cleanup_all(struct mosquitto *mosq)
|
|
|
|
|
|
|
|
|
|
while(mosq->in_messages){
|
|
|
|
|
tmp = mosq->in_messages->next;
|
|
|
|
|
mosquitto__message_cleanup(&mosq->in_messages);
|
|
|
|
|
message__cleanup(&mosq->in_messages);
|
|
|
|
|
mosq->in_messages = tmp;
|
|
|
|
|
}
|
|
|
|
|
while(mosq->out_messages){
|
|
|
|
|
tmp = mosq->out_messages->next;
|
|
|
|
|
mosquitto__message_cleanup(&mosq->out_messages);
|
|
|
|
|
message__cleanup(&mosq->out_messages);
|
|
|
|
|
mosq->out_messages = tmp;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -80,15 +80,15 @@ int mosquitto_message_copy(struct mosquitto_message *dst, const struct mosquitto
|
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int mosquitto__message_delete(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir)
|
|
|
|
|
int message__delete(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_message_all *message;
|
|
|
|
|
int rc;
|
|
|
|
|
assert(mosq);
|
|
|
|
|
|
|
|
|
|
rc = mosquitto__message_remove(mosq, mid, dir, &message);
|
|
|
|
|
rc = message__remove(mosq, mid, dir, &message);
|
|
|
|
|
if(rc == MOSQ_ERR_SUCCESS){
|
|
|
|
|
mosquitto__message_cleanup(&message);
|
|
|
|
|
message__cleanup(&message);
|
|
|
|
|
}
|
|
|
|
|
return rc;
|
|
|
|
|
}
|
|
|
|
@ -106,7 +106,7 @@ void mosquitto_message_free(struct mosquitto_message **message)
|
|
|
|
|
mosquitto__free(msg);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void mosquitto__message_queue(struct mosquitto *mosq, struct mosquitto_message_all *message, enum mosquitto_msg_direction dir)
|
|
|
|
|
void message__queue(struct mosquitto *mosq, struct mosquitto_message_all *message, enum mosquitto_msg_direction dir)
|
|
|
|
|
{
|
|
|
|
|
/* mosq->*_message_mutex should be locked before entering this function */
|
|
|
|
|
assert(mosq);
|
|
|
|
@ -136,7 +136,7 @@ void mosquitto__message_queue(struct mosquitto *mosq, struct mosquitto_message_a
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void mosquitto__messages_reconnect_reset(struct mosquitto *mosq)
|
|
|
|
|
void message__reconnect_reset(struct mosquitto *mosq)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_message_all *message;
|
|
|
|
|
struct mosquitto_message_all *prev = NULL;
|
|
|
|
@ -151,11 +151,11 @@ void mosquitto__messages_reconnect_reset(struct mosquitto *mosq)
|
|
|
|
|
if(message->msg.qos != 2){
|
|
|
|
|
if(prev){
|
|
|
|
|
prev->next = message->next;
|
|
|
|
|
mosquitto__message_cleanup(&message);
|
|
|
|
|
message__cleanup(&message);
|
|
|
|
|
message = prev;
|
|
|
|
|
}else{
|
|
|
|
|
mosq->in_messages = message->next;
|
|
|
|
|
mosquitto__message_cleanup(&message);
|
|
|
|
|
message__cleanup(&message);
|
|
|
|
|
message = mosq->in_messages;
|
|
|
|
|
}
|
|
|
|
|
}else{
|
|
|
|
@ -196,7 +196,7 @@ void mosquitto__messages_reconnect_reset(struct mosquitto *mosq)
|
|
|
|
|
pthread_mutex_unlock(&mosq->out_message_mutex);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int mosquitto__message_remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, struct mosquitto_message_all **message)
|
|
|
|
|
int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, struct mosquitto_message_all **message)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_message_all *cur, *prev = NULL;
|
|
|
|
|
bool found = false;
|
|
|
|
@ -294,9 +294,9 @@ int mosquitto__message_remove(struct mosquitto *mosq, uint16_t mid, enum mosquit
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#ifdef WITH_THREADING
|
|
|
|
|
void mosquitto__message_retry_check_actual(struct mosquitto *mosq, struct mosquitto_message_all *messages, pthread_mutex_t *mutex)
|
|
|
|
|
void message__retry_check_actual(struct mosquitto *mosq, struct mosquitto_message_all *messages, pthread_mutex_t *mutex)
|
|
|
|
|
#else
|
|
|
|
|
void mosquitto__message_retry_check_actual(struct mosquitto *mosq, struct mosquitto_message_all *messages)
|
|
|
|
|
void message__retry_check_actual(struct mosquitto *mosq, struct mosquitto_message_all *messages)
|
|
|
|
|
#endif
|
|
|
|
|
{
|
|
|
|
|
time_t now = mosquitto_time();
|
|
|
|
@ -336,14 +336,14 @@ void mosquitto__message_retry_check_actual(struct mosquitto *mosq, struct mosqui
|
|
|
|
|
#endif
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void mosquitto__message_retry_check(struct mosquitto *mosq)
|
|
|
|
|
void message__retry_check(struct mosquitto *mosq)
|
|
|
|
|
{
|
|
|
|
|
#ifdef WITH_THREADING
|
|
|
|
|
mosquitto__message_retry_check_actual(mosq, mosq->out_messages, &mosq->out_message_mutex);
|
|
|
|
|
mosquitto__message_retry_check_actual(mosq, mosq->in_messages, &mosq->in_message_mutex);
|
|
|
|
|
message__retry_check_actual(mosq, mosq->out_messages, &mosq->out_message_mutex);
|
|
|
|
|
message__retry_check_actual(mosq, mosq->in_messages, &mosq->in_message_mutex);
|
|
|
|
|
#else
|
|
|
|
|
mosquitto__message_retry_check_actual(mosq, mosq->out_messages);
|
|
|
|
|
mosquitto__message_retry_check_actual(mosq, mosq->in_messages);
|
|
|
|
|
message__retry_check_actual(mosq, mosq->out_messages);
|
|
|
|
|
message__retry_check_actual(mosq, mosq->in_messages);
|
|
|
|
|
#endif
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -353,7 +353,7 @@ void mosquitto_message_retry_set(struct mosquitto *mosq, unsigned int message_re
|
|
|
|
|
if(mosq) mosq->message_retry = message_retry;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int mosquitto__message_out_update(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_state state)
|
|
|
|
|
int message__out_update(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_state state)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_message_all *message;
|
|
|
|
|
assert(mosq);
|
|
|
|
|