|
|
@ -48,6 +48,7 @@ Contributors:
|
|
|
|
|
|
|
|
|
|
|
|
#include "mosquitto_broker_internal.h"
|
|
|
|
#include "mosquitto_broker_internal.h"
|
|
|
|
#include "memory_mosq.h"
|
|
|
|
#include "memory_mosq.h"
|
|
|
|
|
|
|
|
#include "mqtt_protocol.h"
|
|
|
|
#include "packet_mosq.h"
|
|
|
|
#include "packet_mosq.h"
|
|
|
|
#include "send_mosq.h"
|
|
|
|
#include "send_mosq.h"
|
|
|
|
#include "sys_tree.h"
|
|
|
|
#include "sys_tree.h"
|
|
|
@ -69,7 +70,7 @@ void lws__sul_callback(struct lws_sorted_usec_list *l)
|
|
|
|
static struct lws_sorted_usec_list sul;
|
|
|
|
static struct lws_sorted_usec_list sul;
|
|
|
|
#endif
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
|
|
static int single_publish(struct mosquitto *context, struct mosquitto_message_v5 *msg)
|
|
|
|
static int single_publish(struct mosquitto *context, struct mosquitto_message_v5 *msg, uint32_t message_expiry)
|
|
|
|
{
|
|
|
|
{
|
|
|
|
struct mosquitto_msg_store *stored;
|
|
|
|
struct mosquitto_msg_store *stored;
|
|
|
|
uint16_t mid;
|
|
|
|
uint16_t mid;
|
|
|
@ -95,7 +96,7 @@ static int single_publish(struct mosquitto *context, struct mosquitto_message_v5
|
|
|
|
msg->properties = NULL;
|
|
|
|
msg->properties = NULL;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if(db__message_store(context, stored, 0, 0, mosq_mo_broker)) return 1;
|
|
|
|
if(db__message_store(context, stored, message_expiry, 0, mosq_mo_broker)) return 1;
|
|
|
|
|
|
|
|
|
|
|
|
if(msg->qos){
|
|
|
|
if(msg->qos){
|
|
|
|
mid = mosquitto__mid_generate(context);
|
|
|
|
mid = mosquitto__mid_generate(context);
|
|
|
@ -106,20 +107,50 @@ static int single_publish(struct mosquitto *context, struct mosquitto_message_v5
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void read_message_expiry_interval(mosquitto_property **proplist, uint32_t *message_expiry)
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
mosquitto_property *p, *previous = NULL;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*message_expiry = 0;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if(!proplist) return;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
p = *proplist;
|
|
|
|
|
|
|
|
while(p){
|
|
|
|
|
|
|
|
if(p->identifier == MQTT_PROP_MESSAGE_EXPIRY_INTERVAL){
|
|
|
|
|
|
|
|
*message_expiry = p->value.i32;
|
|
|
|
|
|
|
|
if(p == *proplist){
|
|
|
|
|
|
|
|
*proplist = p->next;
|
|
|
|
|
|
|
|
}else{
|
|
|
|
|
|
|
|
previous->next = p->next;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
property__free(&p);
|
|
|
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
previous = p;
|
|
|
|
|
|
|
|
p = p->next;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void queue_plugin_msgs(void)
|
|
|
|
void queue_plugin_msgs(void)
|
|
|
|
{
|
|
|
|
{
|
|
|
|
struct mosquitto_message_v5 *msg, *tmp;
|
|
|
|
struct mosquitto_message_v5 *msg, *tmp;
|
|
|
|
struct mosquitto *context;
|
|
|
|
struct mosquitto *context;
|
|
|
|
|
|
|
|
uint32_t message_expiry;
|
|
|
|
|
|
|
|
|
|
|
|
DL_FOREACH_SAFE(db.plugin_msgs, msg, tmp){
|
|
|
|
DL_FOREACH_SAFE(db.plugin_msgs, msg, tmp){
|
|
|
|
DL_DELETE(db.plugin_msgs, msg);
|
|
|
|
DL_DELETE(db.plugin_msgs, msg);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
read_message_expiry_interval(&msg->properties, &message_expiry);
|
|
|
|
|
|
|
|
|
|
|
|
if(msg->clientid){
|
|
|
|
if(msg->clientid){
|
|
|
|
HASH_FIND(hh_id, db.contexts_by_id, msg->clientid, strlen(msg->clientid), context);
|
|
|
|
HASH_FIND(hh_id, db.contexts_by_id, msg->clientid, strlen(msg->clientid), context);
|
|
|
|
if(context){
|
|
|
|
if(context){
|
|
|
|
single_publish(context, msg);
|
|
|
|
single_publish(context, msg, message_expiry);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}else{
|
|
|
|
}else{
|
|
|
|
db__messages_easy_queue(NULL, msg->topic, (uint8_t)msg->qos, (uint32_t)msg->payloadlen, msg->payload, msg->retain, 0, &msg->properties);
|
|
|
|
db__messages_easy_queue(NULL, msg->topic, (uint8_t)msg->qos, (uint32_t)msg->payloadlen, msg->payload, msg->retain, message_expiry, &msg->properties);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
mosquitto__free(msg->topic);
|
|
|
|
mosquitto__free(msg->topic);
|
|
|
|
mosquitto__free(msg->payload);
|
|
|
|
mosquitto__free(msg->payload);
|
|
|
|