From c269e6f019d2a395c2137f4d0819ff57e5cf804d Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Thu, 1 Aug 2019 15:49:07 +0100 Subject: [PATCH] db_dump: Update for version 5 file format Closes #1320. Thanks to Christoph Krey. --- src/db_dump/Makefile | 62 ++++- src/db_dump/db_dump.c | 565 ++++++++++++++++-------------------------- src/db_dump/db_dump.h | 26 ++ src/db_dump/print.c | 210 ++++++++++++++++ src/db_dump/stubs.c | 73 ++++++ src/persist.h | 2 + src/persist_read.c | 2 +- 7 files changed, 586 insertions(+), 354 deletions(-) create mode 100644 src/db_dump/db_dump.h create mode 100644 src/db_dump/print.c create mode 100644 src/db_dump/stubs.c diff --git a/src/db_dump/Makefile b/src/db_dump/Makefile index 202af870..1f9977bb 100644 --- a/src/db_dump/Makefile +++ b/src/db_dump/Makefile @@ -1,16 +1,70 @@ include ../../config.mk -CFLAGS_FINAL=${CFLAGS} -I.. -I../../lib -I../.. -I../deps +CFLAGS_FINAL=${CFLAGS} -I.. -I../../ -I../../lib -I../.. -I../deps -DWITH_BROKER -DWITH_PERSISTENCE + +OBJS = \ + db_dump.o \ + print.o \ + \ + packet_datatypes.o \ + packet_mosq.o \ + persist_read.o \ + persist_read_v234.o \ + persist_read_v5.o \ + property_mosq.o \ + send_disconnect.o \ + stubs.o \ + time_mosq.o \ + utf8_mosq.o .PHONY: all clean reallyclean all : mosquitto_db_dump -mosquitto_db_dump : db_dump.o +mosquitto_db_dump : ${OBJS} ${CROSS_COMPILE}${CC} $^ -o $@ ${LDFLAGS} ${LIBS} -db_dump.o : db_dump.c ../persist.h +db_dump.o : db_dump.c db_dump.h ../persist.h + ${CROSS_COMPILE}${CC} $(CFLAGS_FINAL) -c $< -o $@ + +print.o : print.c db_dump.h ../persist.h + ${CROSS_COMPILE}${CC} $(CFLAGS_FINAL) -c $< -o $@ + +net_mosq.o : ../../lib/net_mosq.c ../../lib/net_mosq.h + ${CROSS_COMPILE}${CC} $(CFLAGS_FINAL) -c $< -o $@ + +packet_datatypes.o : ../../lib/packet_datatypes.c ../../lib/packet_mosq.h + ${CROSS_COMPILE}${CC} $(CFLAGS_FINAL) -c $< -o $@ + +packet_mosq.o : ../../lib/packet_mosq.c ../../lib/packet_mosq.h + ${CROSS_COMPILE}${CC} $(CFLAGS_FINAL) -c $< -o $@ + +persist_read.o : ../persist_read.c ../persist.h ../mosquitto_broker_internal.h + ${CROSS_COMPILE}${CC} $(CFLAGS_FINAL) -c $< -o $@ + +persist_read_v234.o : ../persist_read_v234.c ../persist.h ../mosquitto_broker_internal.h + ${CROSS_COMPILE}${CC} $(CFLAGS_FINAL) -c $< -o $@ + +persist_read_v5.o : ../persist_read_v5.c ../persist.h ../mosquitto_broker_internal.h + ${CROSS_COMPILE}${CC} $(CFLAGS_FINAL) -c $< -o $@ + +property_mosq.o : ../../lib/property_mosq.c ../../lib/property_mosq.h + ${CROSS_COMPILE}${CC} $(CFLAGS_FINAL) -c $< -o $@ + +read_handle.o : ../../src/read_handle.c + ${CROSS_COMPILE}${CC} $(CFLAGS_FINAL) -c $< -o $@ + +stubs.o : stubs.c + ${CROSS_COMPILE}${CC} $(CFLAGS_FINAL) -c $< -o $@ + +send_disconnect.o : ../../lib/send_disconnect.c + ${CROSS_COMPILE}${CC} $(CFLAGS_FINAL) -c $< -o $@ + +time_mosq.o : ../../lib/time_mosq.c + ${CROSS_COMPILE}${CC} $(CFLAGS_FINAL) -c $< -o $@ + +utf8_mosq.o : ../../lib/utf8_mosq.c ${CROSS_COMPILE}${CC} $(CFLAGS_FINAL) -c $< -o $@ -clean : +clean : -rm -f *.o mosquitto_db_dump diff --git a/src/db_dump/db_dump.c b/src/db_dump/db_dump.c index 618a6784..5928017e 100644 --- a/src/db_dump/db_dump.c +++ b/src/db_dump/db_dump.c @@ -1,5 +1,5 @@ /* -Copyright (c) 2010-2012 Roger Light +Copyright (c) 2010-2019 Roger Light All rights reserved. This program and the accompanying materials are made available under the terms of the Eclipse Public License v1.0 @@ -35,9 +35,9 @@ Contributors: #define _mosquitto_free(A) free((A)) #include -const unsigned char magic[15] = {0x00, 0xB5, 0x00, 'm','o','s','q','u','i','t','t','o',' ','d','b'}; +#include "db_dump.h" -struct client_chunk +struct client_data { UT_hash_handle hh_id; char *id; @@ -54,432 +54,340 @@ struct msg_store_chunk uint32_t length; }; -struct db_sub -{ - char *client_id; - char *topic; - uint8_t qos; -}; - -struct db_client -{ - char *client_id; - uint16_t last_mid; - time_t disconnect_t; -}; - -struct db_client_msg -{ - char *client_id; - uint8_t qos, retain, direction, state, dup; - dbid_t store_id; - uint16_t mid; -}; - -struct db_msg -{ - dbid_t store_id; - uint32_t payloadlen; - uint16_t source_mid, mid; - uint8_t qos, retain; - uint8_t *payload; - char *source_id; - char *source_username; - char *topic; - uint16_t source_port; -}; - -static uint32_t db_version; +extern uint32_t db_version; static int stats = 0; static int client_stats = 0; static int do_print = 1; -struct client_chunk *clients_by_id = NULL; +/* Counts */ +static long cfg_count = 0; +static long client_count = 0; +static long client_msg_count = 0; +static long msg_store_count = 0; +static long retain_count = 0; +static long sub_count = 0; +/* ====== */ + + +struct client_data *clients_by_id = NULL; struct msg_store_chunk *msgs_by_id = NULL; -static void -free__db_sub(struct db_sub *sub) -{ - if (sub->client_id) { - free(sub->client_id); - } - if (sub->topic) { - free(sub->topic); - } -} -static void -free__db_client(struct db_client *client) +static void free__sub(struct P_sub *chunk) { - if (client->client_id) { - free(client->client_id); - } + free(chunk->client_id); + free(chunk->topic); } -static void -free__db_client_msg(struct db_client_msg *msg) +static void free__client(struct P_client *chunk) { - if (msg->client_id) { - free(msg->client_id); - } + free(chunk->client_id); } -static void -free__db_msg(struct db_msg *msg) -{ - if (msg->source_id) { - free(msg->source_id); - } - if (msg->topic) { - free(msg->topic); - } - if (msg->payload) { - free(msg->payload); - } -} -static void -print_db_client(struct db_client *client, int length) +static void free__client_msg(struct P_client_msg *chunk) { - printf("DB_CHUNK_CLIENT:\n"); - printf("\tLength: %d\n", length); - printf("\tClient ID: %s\n", client->client_id); - printf("\tLast MID: %d\n", client->last_mid); - printf("\tDisconnect time: %ld\n", client->disconnect_t); + free(chunk->client_id); + mosquitto_property_free_all(&chunk->properties); } -static void -print_db_client_msg(struct db_client_msg *msg, int length) -{ - printf("DB_CHUNK_CLIENT_MSG:\n"); - printf("\tLength: %d\n", length); - printf("\tClient ID: %s\n", msg->client_id); - printf("\tStore ID: %" PRIu64 "\n", msg->store_id); - printf("\tMID: %d\n", msg->mid); - printf("\tQoS: %d\n", msg->qos); - printf("\tRetain: %d\n", msg->retain); - printf("\tDirection: %d\n", msg->direction); - printf("\tState: %d\n", msg->state); - printf("\tDup: %d\n", msg->dup); -} -static void -print_db_sub(struct db_sub *sub, int length) +static void free__msg_store(struct P_msg_store *chunk) { - printf("DB_CHUNK_SUB:\n"); - printf("\tLength: %d\n", length); - printf("\tClient ID: %s\n", sub->client_id); - printf("\tTopic: %s\n", sub->topic); - printf("\tQoS: %d\n", sub->qos); + //free(chunk->source_id); + free(chunk->topic); + UHPA_FREE(chunk->payload, chunk->F.payloadlen); + mosquitto_property_free_all(&chunk->properties); } -static void -print_db_msg(struct db_msg *msg, int length) + +static int dump__cfg_chunk_process(struct mosquitto_db *db, FILE *db_fd, uint32_t length) { - printf("DB_CHUNK_MSG_STORE:\n"); - printf("\tLength: %d\n", length); - printf("\tStore ID: %" PRIu64 "\n", msg->store_id); - printf("\tSource ID: %s\n", msg->source_id); - printf("\tSource Username: %s\n", msg->source_username); - printf("\tSource Port: %d\n", msg->source_port); - printf("\tSource MID: %d\n", msg->source_mid); - printf("\tMID: %d\n", msg->mid); - printf("\tTopic: %s\n", msg->topic); - printf("\tQoS: %d\n", msg->qos); - printf("\tRetain: %d\n", msg->retain); - printf("\tPayload Length: %d\n", msg->payloadlen); - - bool binary = false; - for(int i=0; ipayloadlen; i++){ - if(msg->payload[i] == 0) binary = true; - } - if(binary == false && msg->payloadlen<256){ - printf("\tPayload: %s\n", msg->payload); - } -} + struct PF_cfg chunk; + int rc; + cfg_count++; -int persist__read_string(FILE *db_fptr, char **str) -{ - uint16_t i16temp; - uint16_t slen; - char *s = NULL; + memset(&chunk, 0, sizeof(struct PF_cfg)); - if(fread(&i16temp, 1, sizeof(uint16_t), db_fptr) != sizeof(uint16_t)){ - return MOSQ_ERR_INVAL; + if(db_version == 5){ + rc = persist__chunk_cfg_read_v5(db_fd, &chunk); + }else{ + rc = persist__chunk_cfg_read_v234(db_fd, &chunk); } - - slen = ntohs(i16temp); - if(slen){ - s = mosquitto__malloc(slen+1); - if(!s){ - fclose(db_fptr); - fprintf(stderr, "Error: Out of memory.\n"); - return MOSQ_ERR_NOMEM; - } - if(fread(s, 1, slen, db_fptr) != slen){ - mosquitto__free(s); - fprintf(stderr, "Error: %s.\n", strerror(errno)); - return MOSQ_ERR_INVAL; - } - s[slen] = '\0'; + if(rc){ + fprintf(stderr, "Error: Corrupt persistent database."); + fclose(db_fd); + return rc; + } + + if(do_print) printf("DB_CHUNK_CFG:\n"); + if(do_print) printf("\tLength: %d\n", length); + if(do_print) printf("\tShutdown: %d\n", chunk.shutdown); + if(do_print) printf("\tDB ID size: %d\n", chunk.dbid_size); + if(chunk.dbid_size != sizeof(dbid_t)){ + fprintf(stderr, "Error: Incompatible database configuration (dbid size is %d bytes, expected %ld)", + chunk.dbid_size, sizeof(dbid_t)); + fclose(db_fd); + return 1; } + if(do_print) printf("\tLast DB ID: %" PRIu64 "\n", chunk.last_db_id); - *str = s; - return MOSQ_ERR_SUCCESS; + return 0; } -static int db__client_chunk_restore(struct mosquitto_db *db, FILE *db_fd, struct db_client *client) +static int dump__client_chunk_process(struct mosquitto_db *db, FILE *db_fd, uint32_t length) { - uint16_t i16temp; + struct P_client chunk; int rc = 0; - struct client_chunk *cc; + struct client_data *cc; + + client_count++; - rc = persist__read_string(db_fd, &client->client_id); + memset(&chunk, 0, sizeof(struct P_client)); + + if(db_version == 5){ + rc = persist__chunk_client_read_v5(db_fd, &chunk); + }else{ + rc = persist__chunk_client_read_v234(db_fd, &chunk, db_version); + } if(rc){ fprintf(stderr, "Error: Corrupt persistent database."); fclose(db_fd); return rc; } - read_e(db_fd, &i16temp, sizeof(uint16_t)); - client->last_mid = ntohs(i16temp); - - if(db_version == 2){ - client->disconnect_t = time(NULL); - }else{ - read_e(db_fd, &client->disconnect_t, sizeof(time_t)); - } - if(client_stats){ - cc = calloc(1, sizeof(struct client_chunk)); + cc = calloc(1, sizeof(struct client_data)); if(!cc){ - errno = ENOMEM; - goto error; + fprintf(stderr, "Error: Out of memory.\n"); + fclose(db_fd); + free(chunk.client_id); + return 1; } - cc->id = strdup(client->client_id); + cc->id = strdup(chunk.client_id); HASH_ADD_KEYPTR(hh_id, clients_by_id, cc->id, strlen(cc->id), cc); } - return rc; -error: - fprintf(stderr, "Error: %s.", strerror(errno)); - if(db_fd) fclose(db_fd); - free(client->client_id); - return 1; + if(do_print) { + print__client(&chunk, length); + } + free__client(&chunk); + + return 0; } -static int db__client_msg_chunk_restore(struct mosquitto_db *db, FILE *db_fd, uint32_t length, struct db_client_msg *msg) + +static int dump__client_msg_chunk_process(struct mosquitto_db *db, FILE *db_fd, uint32_t length) { - dbid_t i64temp; - uint16_t i16temp; - struct client_chunk *cc; + struct P_client_msg chunk; + struct client_data *cc; struct msg_store_chunk *msc; int rc; - rc = persist__read_string(db_fd, &msg->client_id); + client_msg_count++; + + memset(&chunk, 0, sizeof(struct P_client_msg)); + if(db_version == 5){ + rc = persist__chunk_client_msg_read_v5(db_fd, &chunk, length); + }else{ + rc = persist__chunk_client_msg_read_v234(db_fd, &chunk); + } if(rc){ fprintf(stderr, "Error: Corrupt persistent database."); fclose(db_fd); return rc; } - read_e(db_fd, &i64temp, sizeof(dbid_t)); - msg->store_id = i64temp; - - read_e(db_fd, &i16temp, sizeof(uint16_t)); - msg->mid = ntohs(i16temp); - - read_e(db_fd, &msg->qos, sizeof(uint8_t)); - read_e(db_fd, &msg->retain, sizeof(uint8_t)); - read_e(db_fd, &msg->direction, sizeof(uint8_t)); - read_e(db_fd, &msg->state, sizeof(uint8_t)); - read_e(db_fd, &msg->dup, sizeof(uint8_t)); - if(client_stats){ - HASH_FIND(hh_id, clients_by_id, msg->client_id, strlen(msg->client_id), cc); + HASH_FIND(hh_id, clients_by_id, chunk.client_id, strlen(chunk.client_id), cc); if(cc){ + printf("FOUND %s\n", chunk.client_id); cc->messages++; cc->message_size += length; - HASH_FIND(hh, msgs_by_id, &msg->store_id, sizeof(dbid_t), msc); + HASH_FIND(hh, msgs_by_id, &chunk.F.store_id, sizeof(dbid_t), msc); if(msc){ cc->message_size += msc->length; } } } + if(do_print) { + print__client_msg(&chunk, length); + } + free__client_msg(&chunk); return 0; -error: - fprintf(stderr, "Error: %s.", strerror(errno)); - if(db_fd) fclose(db_fd); - free(msg->client_id); - return 1; } -static int db__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fd, uint32_t length, struct db_msg *msg) + +static int dump__msg_store_chunk_process(struct mosquitto_db *db, FILE *db_fptr, uint32_t length) { - dbid_t i64temp; - uint32_t i32temp; - uint16_t i16temp; + struct P_msg_store chunk; + struct mosquitto_msg_store *stored = NULL; + struct mosquitto_msg_store_load *load; + int64_t message_expiry_interval64; + uint32_t message_expiry_interval; int rc = 0; struct msg_store_chunk *mcs; - read_e(db_fd, &i64temp, sizeof(dbid_t)); - msg->store_id = i64temp; + msg_store_count++; - rc = persist__read_string(db_fd, &msg->source_id); - if(rc){ - fprintf(stderr, "Error: Corrupt persistent database."); - fclose(db_fd); - return rc; - } - if(db_version == 4){ - rc = persist__read_string(db_fd, &msg->source_username); - if(rc){ - fprintf(stderr, "Error: %s.", strerror(errno)); - fclose(db_fd); - free(msg->source_id); - free(msg->topic); - free(msg->payload); - free(msg->source_id); - return 1; - } - read_e(db_fd, &i16temp, sizeof(uint16_t)); - msg->source_port = ntohs(i16temp); + memset(&chunk, 0, sizeof(struct P_msg_store)); + if(db_version == 5){ + rc = persist__chunk_msg_store_read_v5(db_fptr, &chunk, length); + }else{ + rc = persist__chunk_msg_store_read_v234(db_fptr, &chunk, db_version); } - - - read_e(db_fd, &i16temp, sizeof(uint16_t)); - msg->source_mid = ntohs(i16temp); - - read_e(db_fd, &i16temp, sizeof(uint16_t)); - msg->mid = ntohs(i16temp); - - rc = persist__read_string(db_fd, &msg->topic); if(rc){ fprintf(stderr, "Error: Corrupt persistent database."); - fclose(db_fd); + fclose(db_fptr); return rc; } - read_e(db_fd, &msg->qos, sizeof(uint8_t)); - read_e(db_fd, &msg->retain, sizeof(uint8_t)); - - read_e(db_fd, &i32temp, sizeof(uint32_t)); - msg->payloadlen = ntohl(i32temp); + load = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store_load)); + if(!load){ + fclose(db_fptr); + mosquitto__free(chunk.source.id); + mosquitto__free(chunk.source.username); + mosquitto__free(chunk.topic); + UHPA_FREE(chunk.payload, chunk.F.payloadlen); + log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); + return MOSQ_ERR_NOMEM; + } - if(msg->payloadlen){ - msg->payload = malloc(msg->payloadlen+1); - if(!msg->payload){ - fclose(db_fd); - free(msg->source_id); - free(msg->topic); - fprintf(stderr, "Error: Out of memory."); - return 1; - } - memset(msg->payload, 0, msg->payloadlen+1); - if(fread(msg->payload, 1, msg->payloadlen, db_fd) != msg->payloadlen){ - fprintf(stderr, "Error: %s.", strerror(errno)); - fclose(db_fd); - free(msg->source_id); - free(msg->topic); - free(msg->payload); - return 1; + if(chunk.F.expiry_time > 0){ + message_expiry_interval64 = chunk.F.expiry_time - time(NULL); + if(message_expiry_interval64 < 0 || message_expiry_interval64 > UINT32_MAX){ + message_expiry_interval = 0; + }else{ + message_expiry_interval = (uint32_t)message_expiry_interval64; } + }else{ + message_expiry_interval = 0; + } + + rc = db__message_store(db, &chunk.source, chunk.F.source_mid, + chunk.topic, chunk.F.qos, chunk.F.payloadlen, + &chunk.payload, chunk.F.retain, &stored, message_expiry_interval, + chunk.properties, chunk.F.store_id, mosq_mo_client); + + mosquitto__free(chunk.source.id); + mosquitto__free(chunk.source.username); + chunk.source.id = NULL; + chunk.source.username = NULL; + + if(rc == MOSQ_ERR_SUCCESS){ + /* + printf("stored:%p\n", stored); + stored->source_listener = chunk.source.listener; + load->db_id = stored->db_id; + load->store = stored; + */ + + HASH_ADD(hh, db->msg_store_load, db_id, sizeof(dbid_t), load); + }else{ + mosquitto__free(load); + fclose(db_fptr); + return rc; } if(client_stats){ mcs = calloc(1, sizeof(struct msg_store_chunk)); if(!mcs){ errno = ENOMEM; - goto error; + return 1; } - mcs->store_id = msg->store_id; + mcs->store_id = chunk.F.store_id; mcs->length = length; + printf("ADD msgs %" PRIu64 "\n", mcs->store_id); HASH_ADD(hh, msgs_by_id, store_id, sizeof(dbid_t), mcs); } - return rc; -error: - fprintf(stderr, "Error: %s.", strerror(errno)); - if(db_fd) fclose(db_fd); - free(msg->source_id); - free(msg->topic); - return 1; + if(do_print){ + print__msg_store(&chunk, length); + } + free__msg_store(&chunk); + + return 0; } -static int db__retain_chunk_restore(struct mosquitto_db *db, FILE *db_fd) + +static int dump__retain_chunk_process(struct mosquitto_db *db, FILE *db_fd, uint32_t length) { - dbid_t i64temp, store_id; + struct P_retain chunk; + int rc; - if(fread(&i64temp, sizeof(dbid_t), 1, db_fd) != 1){ - fprintf(stderr, "Error: %s.", strerror(errno)); + retain_count++; + if(do_print) printf("DB_CHUNK_RETAIN:\n"); + if(do_print) printf("\tLength: %d\n", length); + + if(db_version == 5){ + rc = persist__chunk_retain_read_v5(db_fd, &chunk); + }else{ + rc = persist__chunk_retain_read_v234(db_fd, &chunk); + } + if(rc){ fclose(db_fd); - return 1; + return rc; } - store_id = i64temp; - if(do_print) printf("\tStore ID: %" PRIu64 "\n", store_id); + + if(do_print) printf("\tStore ID: %" PRIu64 "\n", chunk.F.store_id); return 0; } -static int db__sub_chunk_restore(struct mosquitto_db *db, FILE *db_fd, uint32_t length, struct db_sub *sub) + +static int dump__sub_chunk_process(struct mosquitto_db *db, FILE *db_fd, uint32_t length) { int rc = 0; - struct client_chunk *cc; + struct P_sub chunk; + struct client_data *cc; - rc = persist__read_string(db_fd, &sub->client_id); - if(rc){ - fprintf(stderr, "Error: Corrupt persistent database."); - fclose(db_fd); - return rc; - } + sub_count++; - rc = persist__read_string(db_fd, &sub->topic); + memset(&chunk, 0, sizeof(struct P_sub)); + if(db_version == 5){ + rc = persist__chunk_sub_read_v5(db_fd, &chunk); + }else{ + rc = persist__chunk_sub_read_v234(db_fd, &chunk); + } if(rc){ fprintf(stderr, "Error: Corrupt persistent database."); fclose(db_fd); return rc; } - read_e(db_fd, &sub->qos, sizeof(uint8_t)); - if(client_stats){ - HASH_FIND(hh_id, clients_by_id, sub->client_id, strlen(sub->client_id), cc); + HASH_FIND(hh_id, clients_by_id, chunk.client_id, strlen(chunk.client_id), cc); if(cc){ + printf("FOUND S %s\n", chunk.client_id); cc->subscriptions++; cc->subscription_size += length; } } - return rc; -error: - fprintf(stderr, "Error: %s.", strerror(errno)); - if(db_fd >= 0) fclose(db_fd); - return 1; + if(do_print) { + print__sub(&chunk, length); + } + free__sub(&chunk); + + return 0; } + int main(int argc, char *argv[]) { FILE *fd; char header[15]; int rc = 0; uint32_t crc; - dbid_t i64temp; - uint32_t i32temp, length; - uint16_t i16temp, chunk; - uint8_t i8temp; - ssize_t rlen; + uint32_t i32temp; + int length; + int chunk; struct mosquitto_db db; char *filename; - long cfg_count = 0; - long msg_store_count = 0; - long client_msg_count = 0; - long retain_count = 0; - long sub_count = 0; - long client_count = 0; - struct client_chunk *cc, *cc_tmp; + struct client_data *cc, *cc_tmp; if(argc == 2){ filename = argv[1]; @@ -508,74 +416,34 @@ int main(int argc, char *argv[]) db_version = ntohl(i32temp); if(do_print) printf("DB version: %d\n", db_version); - while(rlen = fread(&i16temp, sizeof(uint16_t), 1, fd), rlen == 1){ - chunk = ntohs(i16temp); - read_e(fd, &i32temp, sizeof(uint32_t)); - length = ntohl(i32temp); + if(db_version > MOSQ_DB_VERSION){ + if(do_print) printf("Warning: mosquitto_db_dump does not support this DB version, continuing but expecting errors.\n"); + } + + while(persist__chunk_header_read(fd, &chunk, &length) == MOSQ_ERR_SUCCESS){ switch(chunk){ case DB_CHUNK_CFG: - cfg_count++; - if(do_print) printf("DB_CHUNK_CFG:\n"); - if(do_print) printf("\tLength: %d\n", length); - read_e(fd, &i8temp, sizeof(uint8_t)); // shutdown - if(do_print) printf("\tShutdown: %d\n", i8temp); - read_e(fd, &i8temp, sizeof(uint8_t)); // sizeof(dbid_t) - if(do_print) printf("\tDB ID size: %d\n", i8temp); - if(i8temp != sizeof(dbid_t)){ - fprintf(stderr, "Error: Incompatible database configuration (dbid size is %d bytes, expected %ld)", - i8temp, sizeof(dbid_t)); - fclose(fd); - return 1; - } - read_e(fd, &i64temp, sizeof(dbid_t)); - if(do_print) printf("\tLast DB ID: %ld\n", (long)i64temp); + if(dump__cfg_chunk_process(&db, fd, length)) return 1; break; case DB_CHUNK_MSG_STORE: - msg_store_count++; - struct db_msg msg = {0}; - if(db__msg_store_chunk_restore(&db, fd, length, &msg)) return 1; - if(do_print) { - print_db_msg(&msg, length); - } - free__db_msg(&msg); + if(dump__msg_store_chunk_process(&db, fd, length)) return 1; break; case DB_CHUNK_CLIENT_MSG: - client_msg_count++; - struct db_client_msg cmsg = {0}; - if(db__client_msg_chunk_restore(&db, fd, length, &cmsg)) return 1; - if(do_print) { - print_db_client_msg(&cmsg, length); - } - free__db_client_msg(&cmsg); + if(dump__client_msg_chunk_process(&db, fd, length)) return 1; break; case DB_CHUNK_RETAIN: - retain_count++; - if(do_print) printf("DB_CHUNK_RETAIN:\n"); - if(do_print) printf("\tLength: %d\n", length); - if(db__retain_chunk_restore(&db, fd)) return 1; + if(dump__retain_chunk_process(&db, fd, length)) return 1; break; case DB_CHUNK_SUB: - sub_count++; - struct db_sub sub = {0}; - if(db__sub_chunk_restore(&db, fd, length, &sub)) return 1; - if(do_print) { - print_db_sub(&sub, length); - } - free__db_sub(&sub); + if(dump__sub_chunk_process(&db, fd, length)) return 1; break; case DB_CHUNK_CLIENT: - client_count++; - struct db_client client = {0}; - if(db__client_chunk_restore(&db, fd, &client)) return 1; - if(do_print) { - print_db_client(&client, length); - } - free__db_client(&client); + if(dump__client_chunk_process(&db, fd, length)) return 1; break; default: @@ -584,7 +452,6 @@ int main(int argc, char *argv[]) break; } } - if(rlen < 0) goto error; }else{ fprintf(stderr, "Error: Unrecognised file format."); rc = 1; diff --git a/src/db_dump/db_dump.h b/src/db_dump/db_dump.h new file mode 100644 index 00000000..8b233606 --- /dev/null +++ b/src/db_dump/db_dump.h @@ -0,0 +1,26 @@ +#ifndef DB_DUMP_H +#define DB_DUMP_H +/* +Copyright (c) 2010-2019 Roger Light + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License v1.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + http://www.eclipse.org/legal/epl-v10.html +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +Contributors: + Roger Light - initial implementation and documentation. +*/ + +#include + +void print__client(struct P_client *chunk, int length); +void print__client_msg(struct P_client_msg *chunk, int length); +void print__msg_store(struct P_msg_store *chunk, int length); +void print__sub(struct P_sub *chunk, int length); + +#endif diff --git a/src/db_dump/print.c b/src/db_dump/print.c new file mode 100644 index 00000000..0830c532 --- /dev/null +++ b/src/db_dump/print.c @@ -0,0 +1,210 @@ +/* +Copyright (c) 2010-2019 Roger Light + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License v1.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + http://www.eclipse.org/legal/epl-v10.html +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +Contributors: + Roger Light - initial implementation and documentation. +*/ + +#include +#include + +#include +#include +#include +#include +#include + + +static void print__properties(mosquitto_property *properties) +{ + int i; + + if(properties == NULL) return; + + printf("\tProperties:\n"); + + while(properties){ + switch(properties->identifier){ + case MQTT_PROP_PAYLOAD_FORMAT_INDICATOR: + printf("\t\tPayload format indicator: %d\n", properties->value.i8); + break; + case MQTT_PROP_REQUEST_PROBLEM_INFORMATION: + printf("\t\tRequest problem information: %d\n", properties->value.i8); + break; + case MQTT_PROP_REQUEST_RESPONSE_INFORMATION: + printf("\t\tRequest response information: %d\n", properties->value.i8); + break; + case MQTT_PROP_MAXIMUM_QOS: + printf("\t\tMaximum QoS: %d\n", properties->value.i8); + break; + case MQTT_PROP_RETAIN_AVAILABLE: + printf("\t\tRetain available: %d\n", properties->value.i8); + break; + case MQTT_PROP_WILDCARD_SUB_AVAILABLE: + printf("\t\tWildcard sub available: %d\n", properties->value.i8); + break; + case MQTT_PROP_SUBSCRIPTION_ID_AVAILABLE: + printf("\t\tSubscription ID available: %d\n", properties->value.i8); + break; + case MQTT_PROP_SHARED_SUB_AVAILABLE: + printf("\t\tShared subscription available: %d\n", properties->value.i8); + break; + + case MQTT_PROP_SERVER_KEEP_ALIVE: + printf("\t\tServer keep alive: %d\n", properties->value.i16); + break; + case MQTT_PROP_RECEIVE_MAXIMUM: + printf("\t\tReceive maximum: %d\n", properties->value.i16); + break; + case MQTT_PROP_TOPIC_ALIAS_MAXIMUM: + printf("\t\tTopic alias maximum: %d\n", properties->value.i16); + break; + case MQTT_PROP_TOPIC_ALIAS: + printf("\t\tTopic alias: %d\n", properties->value.i16); + break; + + case MQTT_PROP_MESSAGE_EXPIRY_INTERVAL: + printf("\t\tMessage expiry interval: %d\n", properties->value.i32); + break; + case MQTT_PROP_SESSION_EXPIRY_INTERVAL: + printf("\t\tSession expiry interval: %d\n", properties->value.i32); + break; + case MQTT_PROP_WILL_DELAY_INTERVAL: + printf("\t\tWill delay interval: %d\n", properties->value.i32); + break; + case MQTT_PROP_MAXIMUM_PACKET_SIZE: + printf("\t\tMaximum packet size: %d\n", properties->value.i32); + break; + + case MQTT_PROP_SUBSCRIPTION_IDENTIFIER: + printf("\t\tSubscription identifier: %d\n", properties->value.varint); + break; + + case MQTT_PROP_CONTENT_TYPE: + printf("\t\tContent type: %s\n", properties->value.s.v); + break; + case MQTT_PROP_RESPONSE_TOPIC: + printf("\t\tResponse topic: %s\n", properties->value.s.v); + break; + case MQTT_PROP_ASSIGNED_CLIENT_IDENTIFIER: + printf("\t\tAssigned client identifier: %s\n", properties->value.s.v); + break; + case MQTT_PROP_AUTHENTICATION_METHOD: + printf("\t\tAuthentication method: %s\n", properties->value.s.v); + break; + case MQTT_PROP_RESPONSE_INFORMATION: + printf("\t\tResponse information: %s\n", properties->value.s.v); + break; + case MQTT_PROP_SERVER_REFERENCE: + printf("\t\tServer reference: %s\n", properties->value.s.v); + break; + case MQTT_PROP_REASON_STRING: + printf("\t\tReason string: %s\n", properties->value.s.v); + break; + + case MQTT_PROP_AUTHENTICATION_DATA: + printf("\t\tAuthentication data: "); + for(i=0; ivalue.bin.len; i++){ + printf("%02X", properties->value.bin.v[i]); + } + printf("\n"); + break; + case MQTT_PROP_CORRELATION_DATA: + printf("\t\tCorrelation data: "); + for(i=0; ivalue.bin.len; i++){ + printf("%02X", properties->value.bin.v[i]); + } + printf("\n"); + break; + + case MQTT_PROP_USER_PROPERTY: + printf("\t\tUser property: %s , %s\n", properties->name.v, properties->value.s.v); + break; + + default: + printf("\t\tInvalid property type: %d\n", properties->identifier); + break; + } + + properties = properties->next; + } +} + + +void print__client(struct P_client *chunk, int length) +{ + printf("DB_CHUNK_CLIENT:\n"); + printf("\tLength: %d\n", length); + printf("\tClient ID: %s\n", chunk->client_id); + printf("\tLast MID: %d\n", chunk->F.last_mid); + printf("\tSession expiry time: %" PRIu64 "\n", chunk->F.session_expiry_time); + printf("\tSession expiry interval: %u\n", chunk->F.session_expiry_interval); +} + + +void print__client_msg(struct P_client_msg *chunk, int length) +{ + printf("DB_CHUNK_CLIENT_MSG:\n"); + printf("\tLength: %d\n", length); + printf("\tClient ID: %s\n", chunk->client_id); + printf("\tStore ID: %" PRIu64 "\n", chunk->F.store_id); + printf("\tMID: %d\n", chunk->F.mid); + printf("\tQoS: %d\n", chunk->F.qos); + printf("\tRetain: %d\n", (chunk->F.retain_dup&0xF0)>>4); + printf("\tDirection: %d\n", chunk->F.direction); + printf("\tState: %d\n", chunk->F.state); + printf("\tDup: %d\n", chunk->F.retain_dup&0x0F); + print__properties(chunk->properties); +} + + +void print__msg_store(struct P_msg_store *chunk, int length) +{ + printf("DB_CHUNK_MSG_STORE:\n"); + printf("\tLength: %d\n", length); + printf("\tStore ID: %" PRIu64 "\n", chunk->F.store_id); + //printf("\tSource ID: %s\n", chunk->source_id); + //printf("\tSource Username: %s\n", chunk->source_username); + printf("\tSource Port: %d\n", chunk->F.source_port); + printf("\tSource MID: %d\n", chunk->F.source_mid); + printf("\tTopic: %s\n", chunk->topic); + printf("\tQoS: %d\n", chunk->F.qos); + printf("\tRetain: %d\n", chunk->F.retain); + printf("\tPayload Length: %d\n", chunk->F.payloadlen); + printf("\tExpiry Time: %" PRIu64 "\n", chunk->F.expiry_time); + + bool binary = false; + uint8_t *payload; + + payload = UHPA_ACCESS(chunk->payload, chunk->F.payloadlen); + for(int i=0; iF.payloadlen; i++){ + if(payload[i] == 0) binary = true; + } + if(binary == false && chunk->F.payloadlen<256){ + printf("\tPayload: %s\n", payload); + } + print__properties(chunk->properties); +} + + +void print__sub(struct P_sub *chunk, int length) +{ + printf("DB_CHUNK_SUB:\n"); + printf("\tLength: %d\n", length); + printf("\tClient ID: %s\n", chunk->client_id); + printf("\tTopic: %s\n", chunk->topic); + printf("\tQoS: %d\n", chunk->F.qos); + printf("\tSubscription ID: %d\n", chunk->F.identifier); + printf("\tOptions: 0x%02X\n", chunk->F.options); +} + + diff --git a/src/db_dump/stubs.c b/src/db_dump/stubs.c new file mode 100644 index 00000000..929e4aeb --- /dev/null +++ b/src/db_dump/stubs.c @@ -0,0 +1,73 @@ +#include +#include + +#include "mosquitto_broker_internal.h" +#include "mosquitto_internal.h" + +struct mosquitto *context__init(struct mosquitto_db *db, mosq_sock_t sock) +{ + return NULL; +} + +int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, uint32_t message_expiry_interval, mosquitto_property *properties, dbid_t store_id, enum mosquitto_msg_origin origin) +{ + return 0; +} + + +int handle__packet(struct mosquitto_db *db, struct mosquitto *context) +{ + return 0; +} + +int log__printf(struct mosquitto *mosq, int level, const char *fmt, ...) +{ + return 0; +} + + +void *mosquitto__calloc(size_t nmemb, size_t len) +{ + return calloc(nmemb, len); +} + +void mosquitto__free(void *p) +{ + free(p); +} + +FILE *mosquitto__fopen(const char *path, const char *mode, bool restrict_read) +{ + return NULL; +} + +void *mosquitto__malloc(size_t len) +{ + return malloc(len); +} + +char *mosquitto__strdup(const char *s) +{ + return strdup(s); +} + +ssize_t net__read(struct mosquitto *mosq, void *buf, size_t count) +{ + return 0; +} + +ssize_t net__write(struct mosquitto *mosq, void *buf, size_t count) +{ + return 0; +} + +int sub__add(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int qos, uint32_t identifier, int options, struct mosquitto__subhier **root) +{ + return 0; +} + +int sub__messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store **stored) +{ + return 0; +} + diff --git a/src/persist.h b/src/persist.h index 9e2eedd8..db743424 100644 --- a/src/persist.h +++ b/src/persist.h @@ -131,6 +131,8 @@ struct P_retain{ int persist__read_string_len(FILE *db_fptr, char **str, uint16_t len); int persist__read_string(FILE *db_fptr, char **str); +int persist__chunk_header_read(FILE *db_fptr, int *chunk, int *length); + int persist__chunk_header_read_v234(FILE *db_fptr, int *chunk, int *length); int persist__chunk_cfg_read_v234(FILE *db_fptr, struct PF_cfg *chunk); int persist__chunk_client_read_v234(FILE *db_fptr, struct P_client *chunk, int db_version); diff --git a/src/persist_read.c b/src/persist_read.c index 1ae2f7a6..55a9e84b 100644 --- a/src/persist_read.c +++ b/src/persist_read.c @@ -36,7 +36,7 @@ Contributors: #include "time_mosq.h" #include "util_mosq.h" -static uint32_t db_version; +uint32_t db_version; const unsigned char magic[15] = {0x00, 0xB5, 0x00, 'm','o','s','q','u','i','t','t','o',' ','d','b'};