MQTT Logger Documentation


Introduction

This project addresses common challenges encountered in data logging with the MQTT communication protocol. MQTT was introduced in 1999 by Andy Stanford-Clark and Arlen Nipper to monitor oil pipelines within SCADA industrial control systems. The goal was a lightweight, bandwidth-efficient protocol with low battery consumption, because the devices communicated over satellite links, which were extremely expensive at the time.

MQTT is built on the publish-subscribe pattern: devices (clients) publish messages to a central broker, which then distributes them to the other clients that have subscribed to the relevant topics. This decouples message senders from receivers, giving greater flexibility and scalability in data communication.
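To make the decoupling concrete, the following toy, in-memory broker is a sketch for illustration only. It is not an MQTT implementation and is not part of this project: there is no network, no QoS and no wildcard support, and the class name ToyBroker is hypothetical. Publishers only know a topic string; they never hold a reference to the subscribers.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List

# A subscriber callback receives (topic, payload), similar in spirit to an MQTT message
Callback = Callable[[str, bytes], None]


class ToyBroker:
    """In-memory illustration of publish-subscribe (hypothetical, not MQTT)."""

    def __init__(self) -> None:
        # Maps an exact topic string to the callbacks subscribed to it
        self._subscribers: DefaultDict[str, List[Callback]] = defaultdict(list)

    def subscribe(self, topic: str, callback: Callback) -> None:
        self._subscribers[topic].append(callback)

    def publish(self, topic: str, payload: bytes) -> int:
        """Deliver payload to every subscriber of topic; return the number of deliveries."""
        # .get() avoids creating an empty entry for topics nobody subscribed to
        callbacks = self._subscribers.get(topic, [])
        for callback in callbacks:
            callback(topic, payload)
        return len(callbacks)
```

A publisher calling publish("sensors/temp_1", b"25.3") does not need to know whether zero, one or many loggers are listening; a real broker plays exactly this intermediary role over the network.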

Any device capable of connecting to a network can act as an MQTT client, making the protocol highly suitable for Internet of Things (IoT) applications. Microcontrollers such as the ESP8266 or ESP32, single-board computers like the Raspberry Pi, and various types of sensors can all publish data to an MQTT broker or subscribe to receive messages from other devices.

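Moreover, an MQTT broker can handle messages on many topics simultaneously, and multiple clients can subscribe to the same topic or to different topics depending on their data requirements. Topic filters may also contain wildcards: + matches exactly one topic level and # matches all remaining levels, so a filter such as sensors/# receives every message published under sensors/. This makes MQTT a versatile choice for applications ranging from home automation to industrial monitoring systems.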
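The broker applies the wildcard rules itself, so the logger never has to. Still, a small sketch of the matching logic can help when choosing the "topics" entries in a configuration file. The function name topic_matches is hypothetical. The rules it follows come from the MQTT specification: # must be the last level and also matches the parent level, + matches exactly one level, and filters that start with a wildcard do not match topics beginning with $. If client-side matching is ever needed, paho-mqtt already provides topic_matches_sub.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if topic matches topic_filter under MQTT wildcard rules (sketch)."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    # Wildcards in the first level never match system topics such as "$SYS/..."
    if topic.startswith("$") and filter_levels[0] in ("+", "#"):
        return False

    for i, level in enumerate(filter_levels):
        if level == "#":
            # '#' is only valid as the last level; it matches the parent and all children
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False  # filter is longer than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # literal level mismatch ('+' accepts any single level)

    # Without '#', the filter must consume every level of the topic
    return len(filter_levels) == len(topic_levels)
```

For example, topic_matches("sensors/#", "sensors/temp_1") is True, while topic_matches("sensors/+", "sensors/pv/temp_1") is False because + covers only one level.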

MQTT has several advantages:


Coding and File Structure

The main working directory of this Python project is Mqtt_loggers. It contains three key subdirectories:

The core script, mqtt_logger.py, is located in the Mqtt_loggers directory and provides the foundational structure for the entire data logging system. It defines the main workflow and executes the essential functions either sequentially or in a continuous loop. Users should not modify this file unless they fully understand its internal logic.
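The internals of mqtt_logger.py are not reproduced here, so the following is only a sketch of the kind of workflow described above, under several assumptions. Incoming messages update a shared buffer of latest values, and a loop hands a snapshot of that buffer to a writer every time_step seconds. The class BufferedLogger, its methods, and the convention that the last topic level names the CSV column are all hypothetical.

```python
import threading
import time
from typing import Callable, Dict, Optional


class BufferedLogger:
    """Hypothetical buffer-and-flush loop; not the project's actual implementation."""

    def __init__(self, write_row: Callable[[Dict[str, str]], None], time_step: float) -> None:
        self._buffer: Dict[str, str] = {}
        self._lock = threading.Lock()  # message callbacks may run on a network thread
        self._write_row = write_row
        self._time_step = time_step

    def on_value(self, topic: str, payload: bytes) -> None:
        """Store the latest value; assumes the last topic level names the column."""
        key = topic.rsplit("/", 1)[-1]  # e.g. "sensors/temp_1" -> "temp_1"
        with self._lock:
            self._buffer[key] = payload.decode("utf-8").strip()

    def flush(self) -> None:
        """Pass a snapshot of the latest values to the writer (values are kept, not cleared)."""
        with self._lock:
            snapshot = dict(self._buffer)
        self._write_row(snapshot)

    def run(self, cycles: Optional[int] = None) -> None:
        """Sleep time_step seconds, then flush; loop forever when cycles is None."""
        done = 0
        while cycles is None or done < cycles:
            time.sleep(self._time_step)
            self.flush()
            done += 1
```

With paho-mqtt, an on_message(client, userdata, msg) callback could forward msg.topic and msg.payload to on_value, and write_row could wrap a device-specific function. Keeping the last value instead of clearing the buffer means a slow sensor still appears in every row; note that sleep-based timing drifts slightly over long runs.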


Configuration JSON File

The configs subdirectory contains JSON configuration files that define the communication and logging parameters for each MQTT device.

Example

{
  "mqtt": {
    "host": "192.168.1.14",
    "port": 1883,
    "topics": ["sensors/#"],
    "client_id": "esp32_logger",
    "device_name": "water_cooled_pv"
  },
  "logging": {
    "base_folder": "/mnt/data_storage/Mqtt_loggers/data_storage/water_cooled_pv",
    "log_retention_days": 7,
    "file_suffix": "water_cooled_pv",
    "time_step": 3,
    "header": [
      "timestamp",
      "temp_1",
      "temp_2",
      "temp_3",
      "temp_4",
      "temp_5",
      "temp_6",
      "temp_7",
      "temp_8",
      "temp_9",
      "temp_10",
      "water_level",
      "irradiance",
      "ssr_status"
    ]
  }
}
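A configuration like the one above can be read with the standard json module. The sketch below also derives the path of the daily CSV file from base_folder and file_suffix. The function names and the file naming scheme <YYYY-MM-DD>_<file_suffix>.csv are assumptions for illustration; the project's actual naming may differ.

```python
import json
import os
from datetime import date


def load_config(path: str) -> dict:
    """Read one device configuration file (e.g. from the configs directory)."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def daily_csv_path(logging_cfg: dict, day: date) -> str:
    """Build the CSV path for one day.

    Hypothetical naming scheme: <base_folder>/<YYYY-MM-DD>_<file_suffix>.csv
    """
    filename = f"{day.isoformat()}_{logging_cfg['file_suffix']}.csv"
    return os.path.join(logging_cfg["base_folder"], filename)
```

For the example configuration, daily_csv_path(cfg["logging"], date(2024, 5, 1)) would give /mnt/data_storage/Mqtt_loggers/data_storage/water_cooled_pv/2024-05-01_water_cooled_pv.csv under this assumed scheme.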

Parameter Explanation

MQTT Section:

Logging Section:
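Assuming log_retention_days controls how many days of daily CSV files are kept under base_folder (this is an interpretation, not something stated above), a cleanup pass could look like the following sketch. The function purge_old_csv_files is hypothetical and relies on the same assumed <YYYY-MM-DD>_<suffix>.csv naming; files without a date prefix are left untouched.

```python
import os
from datetime import date, datetime, timedelta
from typing import List


def purge_old_csv_files(base_folder: str, retention_days: int, today: date) -> List[str]:
    """Delete dated CSV files older than today - retention_days; return removed names."""
    cutoff = today - timedelta(days=retention_days)
    removed = []
    for name in os.listdir(base_folder):
        if not name.endswith(".csv"):
            continue
        try:
            # Assumes the hypothetical "<YYYY-MM-DD>_<suffix>.csv" naming scheme
            file_day = datetime.strptime(name[:10], "%Y-%m-%d").date()
        except ValueError:
            continue  # not a dated file; leave it alone
        if file_day < cutoff:
            os.remove(os.path.join(base_folder, name))
            removed.append(name)
    return sorted(removed)
```

Passing today explicitly, rather than calling date.today() inside the function, keeps the cleanup deterministic and easy to test.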


Utilities Folder

The utils directory includes:

common_utils.py provides helper functions used by the main script. validate_config.py checks the JSON configuration files for formatting and logical errors. device_specific_func.py is where users define custom functions for their own MQTT devices.
This modular approach allows users to log data from different devices by simply writing their own device-specific function without modifying the main logging structure.
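The checks performed by validate_config.py are not listed here, so the following is only a sketch of what "formatting or logical errors" could mean for the example configuration. It covers missing sections or keys, wrong value types, and a few plausible logical rules. The function name, the REQUIRED table and each rule are assumptions.

```python
from typing import Any, Dict, List

# Expected keys and types per section (hypothetical, based on the example config)
REQUIRED: Dict[str, Dict[str, Any]] = {
    "mqtt": {"host": str, "port": int, "topics": list, "client_id": str, "device_name": str},
    "logging": {"base_folder": str, "log_retention_days": int, "file_suffix": str,
                "time_step": (int, float), "header": list},
}


def validate_config(cfg: Dict[str, Any]) -> List[str]:
    """Return a list of human-readable problems; an empty list means the config looks valid."""
    errors: List[str] = []
    for section, fields in REQUIRED.items():
        block = cfg.get(section)
        if not isinstance(block, dict):
            errors.append(f"missing section '{section}'")
            continue
        for key, expected in fields.items():
            if key not in block:
                errors.append(f"{section}.{key} is missing")
            # bool is a subclass of int in Python, so reject it explicitly
            elif not isinstance(block[key], expected) or isinstance(block[key], bool):
                errors.append(f"{section}.{key} has the wrong type")
    if errors:
        return errors  # logical checks below assume the structure is correct

    mqtt, log = cfg["mqtt"], cfg["logging"]
    if not 1 <= mqtt["port"] <= 65535:
        errors.append("mqtt.port must be between 1 and 65535")
    if not mqtt["topics"]:
        errors.append("mqtt.topics must not be empty")
    if log["time_step"] <= 0:
        errors.append("logging.time_step must be positive")
    if log["log_retention_days"] < 1:
        errors.append("logging.log_retention_days must be at least 1")
    header = log["header"]
    if not header or header[0] != "timestamp":
        errors.append("logging.header must start with 'timestamp'")
    if len(set(header)) != len(header):
        errors.append("logging.header contains duplicate columns")
    return errors
```

Returning every problem at once, rather than raising on the first, lets a user fix a configuration file in a single pass.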

Example Device-Specific Function


import csv
import logging
import os
import time

logger = logging.getLogger(__name__)

def water_cooled_pv(data_buffer, csv_file, header):
    """
    Write buffered MQTT data into the daily CSV file.

    Args:
        data_buffer (dict): Latest key-value pairs from MQTT messages.
        csv_file (str): Path to the CSV file for the current day.
        header (list): Column headers (the first is usually 'timestamp').
    """
    try:
        if not data_buffer:
            logger.warning("[water_cooled_pv] No data to log: buffer empty.")
            return

        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        # Fill in columns in the same order as the header; missing keys become empty cells
        row = [timestamp] + [data_buffer.get(h, "") for h in header[1:]]

        # Write the header row only when the file is new or still empty
        write_header = not os.path.exists(csv_file) or os.path.getsize(csv_file) == 0

        # newline="" lets the csv module handle line endings and quote values containing commas
        with open(csv_file, "a", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            if write_header:
                writer.writerow(header)
            writer.writerow(row)

        logger.info("[water_cooled_pv] Logged %d values to CSV: %s", len(header) - 1, csv_file)

    except Exception:
        # Log the full traceback but keep the main logging loop alive
        logger.exception("[water_cooled_pv] Failed to write CSV: %s", csv_file)
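How mqtt_logger.py selects the device-specific function is not documented here. In the example configuration, device_name ("water_cooled_pv") matches the function name, so one plausible convention, which is an assumption rather than the confirmed mechanism, is a lookup by name with getattr. The helper resolve_device_func is hypothetical, and the demo module below merely stands in for utils/device_specific_func.py.

```python
import types
from typing import Callable


def resolve_device_func(module: types.ModuleType, device_name: str) -> Callable:
    """Return the function in module whose name equals device_name (hypothetical convention)."""
    func = getattr(module, device_name, None)
    if not callable(func):
        raise ValueError(f"{module.__name__} defines no function named {device_name!r}")
    return func


# Stand-in module for illustration; in the project this would be the utils module itself
demo = types.ModuleType("device_specific_func")
demo.water_cooled_pv = lambda data_buffer, csv_file, header: None

write = resolve_device_func(demo, "water_cooled_pv")
```

Under this convention, supporting a new device means adding one function to device_specific_func.py and setting device_name to its name, while failing loudly on a typo instead of silently logging nothing.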

Other GitHub Projects

Full project available on GitHub: https://github.com/hngjesse


Contact

📧 Email: hngjesse@gmail.com
🔗 GitHub: hngjesse