db_dump: Update for version 5 file format

Closes #1320. Thanks to Christoph Krey.
pull/1430/head
Roger A. Light 6 years ago
parent 13ac1080a0
commit c269e6f019

@ -1,16 +1,70 @@
include ../../config.mk
CFLAGS_FINAL=${CFLAGS} -I.. -I../../lib -I../.. -I../deps
CFLAGS_FINAL=${CFLAGS} -I.. -I../../ -I../../lib -I../.. -I../deps -DWITH_BROKER -DWITH_PERSISTENCE
OBJS = \
db_dump.o \
print.o \
\
packet_datatypes.o \
packet_mosq.o \
persist_read.o \
persist_read_v234.o \
persist_read_v5.o \
property_mosq.o \
send_disconnect.o \
stubs.o \
time_mosq.o \
utf8_mosq.o
.PHONY: all clean reallyclean
all : mosquitto_db_dump
mosquitto_db_dump : db_dump.o
mosquitto_db_dump : ${OBJS}
${CROSS_COMPILE}${CC} $^ -o $@ ${LDFLAGS} ${LIBS}
db_dump.o : db_dump.c ../persist.h
db_dump.o : db_dump.c db_dump.h ../persist.h
${CROSS_COMPILE}${CC} $(CFLAGS_FINAL) -c $< -o $@
print.o : print.c db_dump.h ../persist.h
${CROSS_COMPILE}${CC} $(CFLAGS_FINAL) -c $< -o $@
net_mosq.o : ../../lib/net_mosq.c ../../lib/net_mosq.h
${CROSS_COMPILE}${CC} $(CFLAGS_FINAL) -c $< -o $@
packet_datatypes.o : ../../lib/packet_datatypes.c ../../lib/packet_mosq.h
${CROSS_COMPILE}${CC} $(CFLAGS_FINAL) -c $< -o $@
packet_mosq.o : ../../lib/packet_mosq.c ../../lib/packet_mosq.h
${CROSS_COMPILE}${CC} $(CFLAGS_FINAL) -c $< -o $@
persist_read.o : ../persist_read.c ../persist.h ../mosquitto_broker_internal.h
${CROSS_COMPILE}${CC} $(CFLAGS_FINAL) -c $< -o $@
persist_read_v234.o : ../persist_read_v234.c ../persist.h ../mosquitto_broker_internal.h
${CROSS_COMPILE}${CC} $(CFLAGS_FINAL) -c $< -o $@
persist_read_v5.o : ../persist_read_v5.c ../persist.h ../mosquitto_broker_internal.h
${CROSS_COMPILE}${CC} $(CFLAGS_FINAL) -c $< -o $@
property_mosq.o : ../../lib/property_mosq.c ../../lib/property_mosq.h
${CROSS_COMPILE}${CC} $(CFLAGS_FINAL) -c $< -o $@
read_handle.o : ../../src/read_handle.c
${CROSS_COMPILE}${CC} $(CFLAGS_FINAL) -c $< -o $@
stubs.o : stubs.c
${CROSS_COMPILE}${CC} $(CFLAGS_FINAL) -c $< -o $@
send_disconnect.o : ../../lib/send_disconnect.c
${CROSS_COMPILE}${CC} $(CFLAGS_FINAL) -c $< -o $@
time_mosq.o : ../../lib/time_mosq.c
${CROSS_COMPILE}${CC} $(CFLAGS_FINAL) -c $< -o $@
utf8_mosq.o : ../../lib/utf8_mosq.c
${CROSS_COMPILE}${CC} $(CFLAGS_FINAL) -c $< -o $@
clean :
clean :
-rm -f *.o mosquitto_db_dump

@ -1,5 +1,5 @@
/*
Copyright (c) 2010-2012 Roger Light <roger@atchoo.org>
Copyright (c) 2010-2019 Roger Light <roger@atchoo.org>
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
@ -35,9 +35,9 @@ Contributors:
#define _mosquitto_free(A) free((A))
#include <uthash.h>
const unsigned char magic[15] = {0x00, 0xB5, 0x00, 'm','o','s','q','u','i','t','t','o',' ','d','b'};
#include "db_dump.h"
struct client_chunk
struct client_data
{
UT_hash_handle hh_id;
char *id;
@ -54,432 +54,340 @@ struct msg_store_chunk
uint32_t length;
};
struct db_sub
{
char *client_id;
char *topic;
uint8_t qos;
};
struct db_client
{
char *client_id;
uint16_t last_mid;
time_t disconnect_t;
};
struct db_client_msg
{
char *client_id;
uint8_t qos, retain, direction, state, dup;
dbid_t store_id;
uint16_t mid;
};
struct db_msg
{
dbid_t store_id;
uint32_t payloadlen;
uint16_t source_mid, mid;
uint8_t qos, retain;
uint8_t *payload;
char *source_id;
char *source_username;
char *topic;
uint16_t source_port;
};
static uint32_t db_version;
extern uint32_t db_version;
static int stats = 0;
static int client_stats = 0;
static int do_print = 1;
struct client_chunk *clients_by_id = NULL;
/* Counts */
static long cfg_count = 0;
static long client_count = 0;
static long client_msg_count = 0;
static long msg_store_count = 0;
static long retain_count = 0;
static long sub_count = 0;
/* ====== */
struct client_data *clients_by_id = NULL;
struct msg_store_chunk *msgs_by_id = NULL;
static void
free__db_sub(struct db_sub *sub)
{
if (sub->client_id) {
free(sub->client_id);
}
if (sub->topic) {
free(sub->topic);
}
}
static void
free__db_client(struct db_client *client)
static void free__sub(struct P_sub *chunk)
{
if (client->client_id) {
free(client->client_id);
}
free(chunk->client_id);
free(chunk->topic);
}
static void
free__db_client_msg(struct db_client_msg *msg)
static void free__client(struct P_client *chunk)
{
if (msg->client_id) {
free(msg->client_id);
}
free(chunk->client_id);
}
static void
free__db_msg(struct db_msg *msg)
{
if (msg->source_id) {
free(msg->source_id);
}
if (msg->topic) {
free(msg->topic);
}
if (msg->payload) {
free(msg->payload);
}
}
static void
print_db_client(struct db_client *client, int length)
static void free__client_msg(struct P_client_msg *chunk)
{
printf("DB_CHUNK_CLIENT:\n");
printf("\tLength: %d\n", length);
printf("\tClient ID: %s\n", client->client_id);
printf("\tLast MID: %d\n", client->last_mid);
printf("\tDisconnect time: %ld\n", client->disconnect_t);
free(chunk->client_id);
mosquitto_property_free_all(&chunk->properties);
}
static void
print_db_client_msg(struct db_client_msg *msg, int length)
{
printf("DB_CHUNK_CLIENT_MSG:\n");
printf("\tLength: %d\n", length);
printf("\tClient ID: %s\n", msg->client_id);
printf("\tStore ID: %" PRIu64 "\n", msg->store_id);
printf("\tMID: %d\n", msg->mid);
printf("\tQoS: %d\n", msg->qos);
printf("\tRetain: %d\n", msg->retain);
printf("\tDirection: %d\n", msg->direction);
printf("\tState: %d\n", msg->state);
printf("\tDup: %d\n", msg->dup);
}
static void
print_db_sub(struct db_sub *sub, int length)
static void free__msg_store(struct P_msg_store *chunk)
{
printf("DB_CHUNK_SUB:\n");
printf("\tLength: %d\n", length);
printf("\tClient ID: %s\n", sub->client_id);
printf("\tTopic: %s\n", sub->topic);
printf("\tQoS: %d\n", sub->qos);
//free(chunk->source_id);
free(chunk->topic);
UHPA_FREE(chunk->payload, chunk->F.payloadlen);
mosquitto_property_free_all(&chunk->properties);
}
static void
print_db_msg(struct db_msg *msg, int length)
static int dump__cfg_chunk_process(struct mosquitto_db *db, FILE *db_fd, uint32_t length)
{
printf("DB_CHUNK_MSG_STORE:\n");
printf("\tLength: %d\n", length);
printf("\tStore ID: %" PRIu64 "\n", msg->store_id);
printf("\tSource ID: %s\n", msg->source_id);
printf("\tSource Username: %s\n", msg->source_username);
printf("\tSource Port: %d\n", msg->source_port);
printf("\tSource MID: %d\n", msg->source_mid);
printf("\tMID: %d\n", msg->mid);
printf("\tTopic: %s\n", msg->topic);
printf("\tQoS: %d\n", msg->qos);
printf("\tRetain: %d\n", msg->retain);
printf("\tPayload Length: %d\n", msg->payloadlen);
bool binary = false;
for(int i=0; i<msg->payloadlen; i++){
if(msg->payload[i] == 0) binary = true;
}
if(binary == false && msg->payloadlen<256){
printf("\tPayload: %s\n", msg->payload);
}
}
struct PF_cfg chunk;
int rc;
cfg_count++;
int persist__read_string(FILE *db_fptr, char **str)
{
uint16_t i16temp;
uint16_t slen;
char *s = NULL;
memset(&chunk, 0, sizeof(struct PF_cfg));
if(fread(&i16temp, 1, sizeof(uint16_t), db_fptr) != sizeof(uint16_t)){
return MOSQ_ERR_INVAL;
if(db_version == 5){
rc = persist__chunk_cfg_read_v5(db_fd, &chunk);
}else{
rc = persist__chunk_cfg_read_v234(db_fd, &chunk);
}
slen = ntohs(i16temp);
if(slen){
s = mosquitto__malloc(slen+1);
if(!s){
fclose(db_fptr);
fprintf(stderr, "Error: Out of memory.\n");
return MOSQ_ERR_NOMEM;
}
if(fread(s, 1, slen, db_fptr) != slen){
mosquitto__free(s);
fprintf(stderr, "Error: %s.\n", strerror(errno));
return MOSQ_ERR_INVAL;
}
s[slen] = '\0';
if(rc){
fprintf(stderr, "Error: Corrupt persistent database.");
fclose(db_fd);
return rc;
}
if(do_print) printf("DB_CHUNK_CFG:\n");
if(do_print) printf("\tLength: %d\n", length);
if(do_print) printf("\tShutdown: %d\n", chunk.shutdown);
if(do_print) printf("\tDB ID size: %d\n", chunk.dbid_size);
if(chunk.dbid_size != sizeof(dbid_t)){
fprintf(stderr, "Error: Incompatible database configuration (dbid size is %d bytes, expected %ld)",
chunk.dbid_size, sizeof(dbid_t));
fclose(db_fd);
return 1;
}
if(do_print) printf("\tLast DB ID: %" PRIu64 "\n", chunk.last_db_id);
*str = s;
return MOSQ_ERR_SUCCESS;
return 0;
}
static int db__client_chunk_restore(struct mosquitto_db *db, FILE *db_fd, struct db_client *client)
static int dump__client_chunk_process(struct mosquitto_db *db, FILE *db_fd, uint32_t length)
{
uint16_t i16temp;
struct P_client chunk;
int rc = 0;
struct client_chunk *cc;
struct client_data *cc;
client_count++;
rc = persist__read_string(db_fd, &client->client_id);
memset(&chunk, 0, sizeof(struct P_client));
if(db_version == 5){
rc = persist__chunk_client_read_v5(db_fd, &chunk);
}else{
rc = persist__chunk_client_read_v234(db_fd, &chunk, db_version);
}
if(rc){
fprintf(stderr, "Error: Corrupt persistent database.");
fclose(db_fd);
return rc;
}
read_e(db_fd, &i16temp, sizeof(uint16_t));
client->last_mid = ntohs(i16temp);
if(db_version == 2){
client->disconnect_t = time(NULL);
}else{
read_e(db_fd, &client->disconnect_t, sizeof(time_t));
}
if(client_stats){
cc = calloc(1, sizeof(struct client_chunk));
cc = calloc(1, sizeof(struct client_data));
if(!cc){
errno = ENOMEM;
goto error;
fprintf(stderr, "Error: Out of memory.\n");
fclose(db_fd);
free(chunk.client_id);
return 1;
}
cc->id = strdup(client->client_id);
cc->id = strdup(chunk.client_id);
HASH_ADD_KEYPTR(hh_id, clients_by_id, cc->id, strlen(cc->id), cc);
}
return rc;
error:
fprintf(stderr, "Error: %s.", strerror(errno));
if(db_fd) fclose(db_fd);
free(client->client_id);
return 1;
if(do_print) {
print__client(&chunk, length);
}
free__client(&chunk);
return 0;
}
static int db__client_msg_chunk_restore(struct mosquitto_db *db, FILE *db_fd, uint32_t length, struct db_client_msg *msg)
static int dump__client_msg_chunk_process(struct mosquitto_db *db, FILE *db_fd, uint32_t length)
{
dbid_t i64temp;
uint16_t i16temp;
struct client_chunk *cc;
struct P_client_msg chunk;
struct client_data *cc;
struct msg_store_chunk *msc;
int rc;
rc = persist__read_string(db_fd, &msg->client_id);
client_msg_count++;
memset(&chunk, 0, sizeof(struct P_client_msg));
if(db_version == 5){
rc = persist__chunk_client_msg_read_v5(db_fd, &chunk, length);
}else{
rc = persist__chunk_client_msg_read_v234(db_fd, &chunk);
}
if(rc){
fprintf(stderr, "Error: Corrupt persistent database.");
fclose(db_fd);
return rc;
}
read_e(db_fd, &i64temp, sizeof(dbid_t));
msg->store_id = i64temp;
read_e(db_fd, &i16temp, sizeof(uint16_t));
msg->mid = ntohs(i16temp);
read_e(db_fd, &msg->qos, sizeof(uint8_t));
read_e(db_fd, &msg->retain, sizeof(uint8_t));
read_e(db_fd, &msg->direction, sizeof(uint8_t));
read_e(db_fd, &msg->state, sizeof(uint8_t));
read_e(db_fd, &msg->dup, sizeof(uint8_t));
if(client_stats){
HASH_FIND(hh_id, clients_by_id, msg->client_id, strlen(msg->client_id), cc);
HASH_FIND(hh_id, clients_by_id, chunk.client_id, strlen(chunk.client_id), cc);
if(cc){
printf("FOUND %s\n", chunk.client_id);
cc->messages++;
cc->message_size += length;
HASH_FIND(hh, msgs_by_id, &msg->store_id, sizeof(dbid_t), msc);
HASH_FIND(hh, msgs_by_id, &chunk.F.store_id, sizeof(dbid_t), msc);
if(msc){
cc->message_size += msc->length;
}
}
}
if(do_print) {
print__client_msg(&chunk, length);
}
free__client_msg(&chunk);
return 0;
error:
fprintf(stderr, "Error: %s.", strerror(errno));
if(db_fd) fclose(db_fd);
free(msg->client_id);
return 1;
}
static int db__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fd, uint32_t length, struct db_msg *msg)
static int dump__msg_store_chunk_process(struct mosquitto_db *db, FILE *db_fptr, uint32_t length)
{
dbid_t i64temp;
uint32_t i32temp;
uint16_t i16temp;
struct P_msg_store chunk;
struct mosquitto_msg_store *stored = NULL;
struct mosquitto_msg_store_load *load;
int64_t message_expiry_interval64;
uint32_t message_expiry_interval;
int rc = 0;
struct msg_store_chunk *mcs;
read_e(db_fd, &i64temp, sizeof(dbid_t));
msg->store_id = i64temp;
msg_store_count++;
rc = persist__read_string(db_fd, &msg->source_id);
if(rc){
fprintf(stderr, "Error: Corrupt persistent database.");
fclose(db_fd);
return rc;
}
if(db_version == 4){
rc = persist__read_string(db_fd, &msg->source_username);
if(rc){
fprintf(stderr, "Error: %s.", strerror(errno));
fclose(db_fd);
free(msg->source_id);
free(msg->topic);
free(msg->payload);
free(msg->source_id);
return 1;
}
read_e(db_fd, &i16temp, sizeof(uint16_t));
msg->source_port = ntohs(i16temp);
memset(&chunk, 0, sizeof(struct P_msg_store));
if(db_version == 5){
rc = persist__chunk_msg_store_read_v5(db_fptr, &chunk, length);
}else{
rc = persist__chunk_msg_store_read_v234(db_fptr, &chunk, db_version);
}
read_e(db_fd, &i16temp, sizeof(uint16_t));
msg->source_mid = ntohs(i16temp);
read_e(db_fd, &i16temp, sizeof(uint16_t));
msg->mid = ntohs(i16temp);
rc = persist__read_string(db_fd, &msg->topic);
if(rc){
fprintf(stderr, "Error: Corrupt persistent database.");
fclose(db_fd);
fclose(db_fptr);
return rc;
}
read_e(db_fd, &msg->qos, sizeof(uint8_t));
read_e(db_fd, &msg->retain, sizeof(uint8_t));
read_e(db_fd, &i32temp, sizeof(uint32_t));
msg->payloadlen = ntohl(i32temp);
load = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store_load));
if(!load){
fclose(db_fptr);
mosquitto__free(chunk.source.id);
mosquitto__free(chunk.source.username);
mosquitto__free(chunk.topic);
UHPA_FREE(chunk.payload, chunk.F.payloadlen);
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
if(msg->payloadlen){
msg->payload = malloc(msg->payloadlen+1);
if(!msg->payload){
fclose(db_fd);
free(msg->source_id);
free(msg->topic);
fprintf(stderr, "Error: Out of memory.");
return 1;
}
memset(msg->payload, 0, msg->payloadlen+1);
if(fread(msg->payload, 1, msg->payloadlen, db_fd) != msg->payloadlen){
fprintf(stderr, "Error: %s.", strerror(errno));
fclose(db_fd);
free(msg->source_id);
free(msg->topic);
free(msg->payload);
return 1;
if(chunk.F.expiry_time > 0){
message_expiry_interval64 = chunk.F.expiry_time - time(NULL);
if(message_expiry_interval64 < 0 || message_expiry_interval64 > UINT32_MAX){
message_expiry_interval = 0;
}else{
message_expiry_interval = (uint32_t)message_expiry_interval64;
}
}else{
message_expiry_interval = 0;
}
rc = db__message_store(db, &chunk.source, chunk.F.source_mid,
chunk.topic, chunk.F.qos, chunk.F.payloadlen,
&chunk.payload, chunk.F.retain, &stored, message_expiry_interval,
chunk.properties, chunk.F.store_id, mosq_mo_client);
mosquitto__free(chunk.source.id);
mosquitto__free(chunk.source.username);
chunk.source.id = NULL;
chunk.source.username = NULL;
if(rc == MOSQ_ERR_SUCCESS){
/*
printf("stored:%p\n", stored);
stored->source_listener = chunk.source.listener;
load->db_id = stored->db_id;
load->store = stored;
*/
HASH_ADD(hh, db->msg_store_load, db_id, sizeof(dbid_t), load);
}else{
mosquitto__free(load);
fclose(db_fptr);
return rc;
}
if(client_stats){
mcs = calloc(1, sizeof(struct msg_store_chunk));
if(!mcs){
errno = ENOMEM;
goto error;
return 1;
}
mcs->store_id = msg->store_id;
mcs->store_id = chunk.F.store_id;
mcs->length = length;
printf("ADD msgs %" PRIu64 "\n", mcs->store_id);
HASH_ADD(hh, msgs_by_id, store_id, sizeof(dbid_t), mcs);
}
return rc;
error:
fprintf(stderr, "Error: %s.", strerror(errno));
if(db_fd) fclose(db_fd);
free(msg->source_id);
free(msg->topic);
return 1;
if(do_print){
print__msg_store(&chunk, length);
}
free__msg_store(&chunk);
return 0;
}
static int db__retain_chunk_restore(struct mosquitto_db *db, FILE *db_fd)
static int dump__retain_chunk_process(struct mosquitto_db *db, FILE *db_fd, uint32_t length)
{
dbid_t i64temp, store_id;
struct P_retain chunk;
int rc;
if(fread(&i64temp, sizeof(dbid_t), 1, db_fd) != 1){
fprintf(stderr, "Error: %s.", strerror(errno));
retain_count++;
if(do_print) printf("DB_CHUNK_RETAIN:\n");
if(do_print) printf("\tLength: %d\n", length);
if(db_version == 5){
rc = persist__chunk_retain_read_v5(db_fd, &chunk);
}else{
rc = persist__chunk_retain_read_v234(db_fd, &chunk);
}
if(rc){
fclose(db_fd);
return 1;
return rc;
}
store_id = i64temp;
if(do_print) printf("\tStore ID: %" PRIu64 "\n", store_id);
if(do_print) printf("\tStore ID: %" PRIu64 "\n", chunk.F.store_id);
return 0;
}
static int db__sub_chunk_restore(struct mosquitto_db *db, FILE *db_fd, uint32_t length, struct db_sub *sub)
static int dump__sub_chunk_process(struct mosquitto_db *db, FILE *db_fd, uint32_t length)
{
int rc = 0;
struct client_chunk *cc;
struct P_sub chunk;
struct client_data *cc;
rc = persist__read_string(db_fd, &sub->client_id);
if(rc){
fprintf(stderr, "Error: Corrupt persistent database.");
fclose(db_fd);
return rc;
}
sub_count++;
rc = persist__read_string(db_fd, &sub->topic);
memset(&chunk, 0, sizeof(struct P_sub));
if(db_version == 5){
rc = persist__chunk_sub_read_v5(db_fd, &chunk);
}else{
rc = persist__chunk_sub_read_v234(db_fd, &chunk);
}
if(rc){
fprintf(stderr, "Error: Corrupt persistent database.");
fclose(db_fd);
return rc;
}
read_e(db_fd, &sub->qos, sizeof(uint8_t));
if(client_stats){
HASH_FIND(hh_id, clients_by_id, sub->client_id, strlen(sub->client_id), cc);
HASH_FIND(hh_id, clients_by_id, chunk.client_id, strlen(chunk.client_id), cc);
if(cc){
printf("FOUND S %s\n", chunk.client_id);
cc->subscriptions++;
cc->subscription_size += length;
}
}
return rc;
error:
fprintf(stderr, "Error: %s.", strerror(errno));
if(db_fd >= 0) fclose(db_fd);
return 1;
if(do_print) {
print__sub(&chunk, length);
}
free__sub(&chunk);
return 0;
}
int main(int argc, char *argv[])
{
FILE *fd;
char header[15];
int rc = 0;
uint32_t crc;
dbid_t i64temp;
uint32_t i32temp, length;
uint16_t i16temp, chunk;
uint8_t i8temp;
ssize_t rlen;
uint32_t i32temp;
int length;
int chunk;
struct mosquitto_db db;
char *filename;
long cfg_count = 0;
long msg_store_count = 0;
long client_msg_count = 0;
long retain_count = 0;
long sub_count = 0;
long client_count = 0;
struct client_chunk *cc, *cc_tmp;
struct client_data *cc, *cc_tmp;
if(argc == 2){
filename = argv[1];
@ -508,74 +416,34 @@ int main(int argc, char *argv[])
db_version = ntohl(i32temp);
if(do_print) printf("DB version: %d\n", db_version);
while(rlen = fread(&i16temp, sizeof(uint16_t), 1, fd), rlen == 1){
chunk = ntohs(i16temp);
read_e(fd, &i32temp, sizeof(uint32_t));
length = ntohl(i32temp);
if(db_version > MOSQ_DB_VERSION){
if(do_print) printf("Warning: mosquitto_db_dump does not support this DB version, continuing but expecting errors.\n");
}
while(persist__chunk_header_read(fd, &chunk, &length) == MOSQ_ERR_SUCCESS){
switch(chunk){
case DB_CHUNK_CFG:
cfg_count++;
if(do_print) printf("DB_CHUNK_CFG:\n");
if(do_print) printf("\tLength: %d\n", length);
read_e(fd, &i8temp, sizeof(uint8_t)); // shutdown
if(do_print) printf("\tShutdown: %d\n", i8temp);
read_e(fd, &i8temp, sizeof(uint8_t)); // sizeof(dbid_t)
if(do_print) printf("\tDB ID size: %d\n", i8temp);
if(i8temp != sizeof(dbid_t)){
fprintf(stderr, "Error: Incompatible database configuration (dbid size is %d bytes, expected %ld)",
i8temp, sizeof(dbid_t));
fclose(fd);
return 1;
}
read_e(fd, &i64temp, sizeof(dbid_t));
if(do_print) printf("\tLast DB ID: %ld\n", (long)i64temp);
if(dump__cfg_chunk_process(&db, fd, length)) return 1;
break;
case DB_CHUNK_MSG_STORE:
msg_store_count++;
struct db_msg msg = {0};
if(db__msg_store_chunk_restore(&db, fd, length, &msg)) return 1;
if(do_print) {
print_db_msg(&msg, length);
}
free__db_msg(&msg);
if(dump__msg_store_chunk_process(&db, fd, length)) return 1;
break;
case DB_CHUNK_CLIENT_MSG:
client_msg_count++;
struct db_client_msg cmsg = {0};
if(db__client_msg_chunk_restore(&db, fd, length, &cmsg)) return 1;
if(do_print) {
print_db_client_msg(&cmsg, length);
}
free__db_client_msg(&cmsg);
if(dump__client_msg_chunk_process(&db, fd, length)) return 1;
break;
case DB_CHUNK_RETAIN:
retain_count++;
if(do_print) printf("DB_CHUNK_RETAIN:\n");
if(do_print) printf("\tLength: %d\n", length);
if(db__retain_chunk_restore(&db, fd)) return 1;
if(dump__retain_chunk_process(&db, fd, length)) return 1;
break;
case DB_CHUNK_SUB:
sub_count++;
struct db_sub sub = {0};
if(db__sub_chunk_restore(&db, fd, length, &sub)) return 1;
if(do_print) {
print_db_sub(&sub, length);
}
free__db_sub(&sub);
if(dump__sub_chunk_process(&db, fd, length)) return 1;
break;
case DB_CHUNK_CLIENT:
client_count++;
struct db_client client = {0};
if(db__client_chunk_restore(&db, fd, &client)) return 1;
if(do_print) {
print_db_client(&client, length);
}
free__db_client(&client);
if(dump__client_chunk_process(&db, fd, length)) return 1;
break;
default:
@ -584,7 +452,6 @@ int main(int argc, char *argv[])
break;
}
}
if(rlen < 0) goto error;
}else{
fprintf(stderr, "Error: Unrecognised file format.");
rc = 1;

@ -0,0 +1,26 @@
#ifndef DB_DUMP_H
#define DB_DUMP_H
/*
Copyright (c) 2010-2019 Roger Light <roger@atchoo.org>
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
Contributors:
Roger Light - initial implementation and documentation.
*/
#include <persist.h>
void print__client(struct P_client *chunk, int length);
void print__client_msg(struct P_client_msg *chunk, int length);
void print__msg_store(struct P_msg_store *chunk, int length);
void print__sub(struct P_sub *chunk, int length);
#endif

@ -0,0 +1,210 @@
/*
Copyright (c) 2010-2019 Roger Light <roger@atchoo.org>
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
Contributors:
Roger Light - initial implementation and documentation.
*/
#include <inttypes.h>
#include <stdio.h>
#include <mosquitto_broker_internal.h>
#include <memory_mosq.h>
#include <mqtt_protocol.h>
#include <persist.h>
#include <property_mosq.h>
static void print__properties(mosquitto_property *properties)
{
int i;
if(properties == NULL) return;
printf("\tProperties:\n");
while(properties){
switch(properties->identifier){
case MQTT_PROP_PAYLOAD_FORMAT_INDICATOR:
printf("\t\tPayload format indicator: %d\n", properties->value.i8);
break;
case MQTT_PROP_REQUEST_PROBLEM_INFORMATION:
printf("\t\tRequest problem information: %d\n", properties->value.i8);
break;
case MQTT_PROP_REQUEST_RESPONSE_INFORMATION:
printf("\t\tRequest response information: %d\n", properties->value.i8);
break;
case MQTT_PROP_MAXIMUM_QOS:
printf("\t\tMaximum QoS: %d\n", properties->value.i8);
break;
case MQTT_PROP_RETAIN_AVAILABLE:
printf("\t\tRetain available: %d\n", properties->value.i8);
break;
case MQTT_PROP_WILDCARD_SUB_AVAILABLE:
printf("\t\tWildcard sub available: %d\n", properties->value.i8);
break;
case MQTT_PROP_SUBSCRIPTION_ID_AVAILABLE:
printf("\t\tSubscription ID available: %d\n", properties->value.i8);
break;
case MQTT_PROP_SHARED_SUB_AVAILABLE:
printf("\t\tShared subscription available: %d\n", properties->value.i8);
break;
case MQTT_PROP_SERVER_KEEP_ALIVE:
printf("\t\tServer keep alive: %d\n", properties->value.i16);
break;
case MQTT_PROP_RECEIVE_MAXIMUM:
printf("\t\tReceive maximum: %d\n", properties->value.i16);
break;
case MQTT_PROP_TOPIC_ALIAS_MAXIMUM:
printf("\t\tTopic alias maximum: %d\n", properties->value.i16);
break;
case MQTT_PROP_TOPIC_ALIAS:
printf("\t\tTopic alias: %d\n", properties->value.i16);
break;
case MQTT_PROP_MESSAGE_EXPIRY_INTERVAL:
printf("\t\tMessage expiry interval: %d\n", properties->value.i32);
break;
case MQTT_PROP_SESSION_EXPIRY_INTERVAL:
printf("\t\tSession expiry interval: %d\n", properties->value.i32);
break;
case MQTT_PROP_WILL_DELAY_INTERVAL:
printf("\t\tWill delay interval: %d\n", properties->value.i32);
break;
case MQTT_PROP_MAXIMUM_PACKET_SIZE:
printf("\t\tMaximum packet size: %d\n", properties->value.i32);
break;
case MQTT_PROP_SUBSCRIPTION_IDENTIFIER:
printf("\t\tSubscription identifier: %d\n", properties->value.varint);
break;
case MQTT_PROP_CONTENT_TYPE:
printf("\t\tContent type: %s\n", properties->value.s.v);
break;
case MQTT_PROP_RESPONSE_TOPIC:
printf("\t\tResponse topic: %s\n", properties->value.s.v);
break;
case MQTT_PROP_ASSIGNED_CLIENT_IDENTIFIER:
printf("\t\tAssigned client identifier: %s\n", properties->value.s.v);
break;
case MQTT_PROP_AUTHENTICATION_METHOD:
printf("\t\tAuthentication method: %s\n", properties->value.s.v);
break;
case MQTT_PROP_RESPONSE_INFORMATION:
printf("\t\tResponse information: %s\n", properties->value.s.v);
break;
case MQTT_PROP_SERVER_REFERENCE:
printf("\t\tServer reference: %s\n", properties->value.s.v);
break;
case MQTT_PROP_REASON_STRING:
printf("\t\tReason string: %s\n", properties->value.s.v);
break;
case MQTT_PROP_AUTHENTICATION_DATA:
printf("\t\tAuthentication data: ");
for(i=0; i<properties->value.bin.len; i++){
printf("%02X", properties->value.bin.v[i]);
}
printf("\n");
break;
case MQTT_PROP_CORRELATION_DATA:
printf("\t\tCorrelation data: ");
for(i=0; i<properties->value.bin.len; i++){
printf("%02X", properties->value.bin.v[i]);
}
printf("\n");
break;
case MQTT_PROP_USER_PROPERTY:
printf("\t\tUser property: %s , %s\n", properties->name.v, properties->value.s.v);
break;
default:
printf("\t\tInvalid property type: %d\n", properties->identifier);
break;
}
properties = properties->next;
}
}
void print__client(struct P_client *chunk, int length)
{
printf("DB_CHUNK_CLIENT:\n");
printf("\tLength: %d\n", length);
printf("\tClient ID: %s\n", chunk->client_id);
printf("\tLast MID: %d\n", chunk->F.last_mid);
printf("\tSession expiry time: %" PRIu64 "\n", chunk->F.session_expiry_time);
printf("\tSession expiry interval: %u\n", chunk->F.session_expiry_interval);
}
void print__client_msg(struct P_client_msg *chunk, int length)
{
printf("DB_CHUNK_CLIENT_MSG:\n");
printf("\tLength: %d\n", length);
printf("\tClient ID: %s\n", chunk->client_id);
printf("\tStore ID: %" PRIu64 "\n", chunk->F.store_id);
printf("\tMID: %d\n", chunk->F.mid);
printf("\tQoS: %d\n", chunk->F.qos);
printf("\tRetain: %d\n", (chunk->F.retain_dup&0xF0)>>4);
printf("\tDirection: %d\n", chunk->F.direction);
printf("\tState: %d\n", chunk->F.state);
printf("\tDup: %d\n", chunk->F.retain_dup&0x0F);
print__properties(chunk->properties);
}
void print__msg_store(struct P_msg_store *chunk, int length)
{
printf("DB_CHUNK_MSG_STORE:\n");
printf("\tLength: %d\n", length);
printf("\tStore ID: %" PRIu64 "\n", chunk->F.store_id);
//printf("\tSource ID: %s\n", chunk->source_id);
//printf("\tSource Username: %s\n", chunk->source_username);
printf("\tSource Port: %d\n", chunk->F.source_port);
printf("\tSource MID: %d\n", chunk->F.source_mid);
printf("\tTopic: %s\n", chunk->topic);
printf("\tQoS: %d\n", chunk->F.qos);
printf("\tRetain: %d\n", chunk->F.retain);
printf("\tPayload Length: %d\n", chunk->F.payloadlen);
printf("\tExpiry Time: %" PRIu64 "\n", chunk->F.expiry_time);
bool binary = false;
uint8_t *payload;
payload = UHPA_ACCESS(chunk->payload, chunk->F.payloadlen);
for(int i=0; i<chunk->F.payloadlen; i++){
if(payload[i] == 0) binary = true;
}
if(binary == false && chunk->F.payloadlen<256){
printf("\tPayload: %s\n", payload);
}
print__properties(chunk->properties);
}
void print__sub(struct P_sub *chunk, int length)
{
printf("DB_CHUNK_SUB:\n");
printf("\tLength: %d\n", length);
printf("\tClient ID: %s\n", chunk->client_id);
printf("\tTopic: %s\n", chunk->topic);
printf("\tQoS: %d\n", chunk->F.qos);
printf("\tSubscription ID: %d\n", chunk->F.identifier);
printf("\tOptions: 0x%02X\n", chunk->F.options);
}

@ -0,0 +1,73 @@
#include <stdlib.h>
#include <string.h>
#include "mosquitto_broker_internal.h"
#include "mosquitto_internal.h"
struct mosquitto *context__init(struct mosquitto_db *db, mosq_sock_t sock)
{
return NULL;
}
int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, uint32_t message_expiry_interval, mosquitto_property *properties, dbid_t store_id, enum mosquitto_msg_origin origin)
{
return 0;
}
int handle__packet(struct mosquitto_db *db, struct mosquitto *context)
{
return 0;
}
int log__printf(struct mosquitto *mosq, int level, const char *fmt, ...)
{
return 0;
}
void *mosquitto__calloc(size_t nmemb, size_t len)
{
return calloc(nmemb, len);
}
void mosquitto__free(void *p)
{
free(p);
}
FILE *mosquitto__fopen(const char *path, const char *mode, bool restrict_read)
{
return NULL;
}
void *mosquitto__malloc(size_t len)
{
return malloc(len);
}
char *mosquitto__strdup(const char *s)
{
return strdup(s);
}
ssize_t net__read(struct mosquitto *mosq, void *buf, size_t count)
{
return 0;
}
ssize_t net__write(struct mosquitto *mosq, void *buf, size_t count)
{
return 0;
}
int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, uint32_t identifier, int options, struct mosquitto__subhier **root)
{
return 0;
}
int sub__messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store **stored)
{
return 0;
}

@ -131,6 +131,8 @@ struct P_retain{
int persist__read_string_len(FILE *db_fptr, char **str, uint16_t len);
int persist__read_string(FILE *db_fptr, char **str);
int persist__chunk_header_read(FILE *db_fptr, int *chunk, int *length);
int persist__chunk_header_read_v234(FILE *db_fptr, int *chunk, int *length);
int persist__chunk_cfg_read_v234(FILE *db_fptr, struct PF_cfg *chunk);
int persist__chunk_client_read_v234(FILE *db_fptr, struct P_client *chunk, int db_version);

@ -36,7 +36,7 @@ Contributors:
#include "time_mosq.h"
#include "util_mosq.h"
static uint32_t db_version;
uint32_t db_version;
const unsigned char magic[15] = {0x00, 0xB5, 0x00, 'm','o','s','q','u','i','t','t','o',' ','d','b'};

Loading…
Cancel
Save