Other GitHub Projects
Full project available on GitHub: https://github.com/hngjesse
This project addresses common challenges encountered in data logging over the MQTT communication protocol. MQTT was introduced in 1999 by Andy Stanford-Clark and Arlen Nipper for monitoring oil pipelines within SCADA industrial control systems. The goal was a protocol that is bandwidth-efficient, lightweight, and frugal with battery power, because the devices were connected via satellite links, which were extremely expensive at the time.
MQTT is built on the publish-subscribe pattern: devices (clients) publish messages to a central broker, which then distributes those messages to every client that has subscribed to the matching topics. This decouples the message sender from the receiver, allowing for greater flexibility and scalability in data communication.
Any device capable of connecting to a network can act as an MQTT client, making the protocol highly suitable for Internet of Things (IoT) applications. Microcontrollers such as the ESP8266 or ESP32, single-board computers like the Raspberry Pi, and various types of sensors can all publish data to an MQTT broker or subscribe to receive messages from other devices.
Moreover, an MQTT broker can handle many topics simultaneously, and multiple clients can subscribe to the same topic or to different topics depending on their data requirements. This makes MQTT a versatile choice for applications ranging from home automation to industrial monitoring systems.
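To make the pattern concrete, here is a minimal, generic sketch using the paho-mqtt Python client. It is not taken from this project; the broker address and client IDs are examples, and note that paho-mqtt 2.x additionally requires a callback_api_version argument when constructing Client:

import time
import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish

BROKER = "192.168.1.14"  # example broker address

def on_message(client, userdata, msg):
    # Called by the network loop for every message on a subscribed topic
    print(f"{msg.topic}: {msg.payload.decode()}")

# Subscriber: receives everything published under the sensors/ topic tree
sub = mqtt.Client(client_id="example_subscriber")
sub.on_message = on_message
sub.connect(BROKER, 1883)
sub.subscribe("sensors/#")
sub.loop_start()  # handle network traffic in a background thread

# Publisher: any other client can send data to the same broker and topics
publish.single("sensors/temp_1", "23.5", hostname=BROKER, port=1883)

time.sleep(1)  # give the subscriber a moment to receive the message
sub.loop_stop()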
MQTT has several advantages:
- Lightweight and bandwidth-efficient, with low power demands, which suits battery-powered and constrained devices.
- Publish-subscribe architecture that decouples data producers from consumers.
- Scales from a single sensor to many clients sharing one broker, with flexible topic-based routing.
- Runs on virtually any network-connected device, from microcontrollers to single-board computers.
The main working directory of this Python project is Mqtt_loggers. It contains three key subdirectories (a sketch of the full layout follows this list):
- configs
- data_storage
- utils
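Based on the descriptions in this section, the top-level layout is roughly the following; the JSON file name is hypothetical, and the per-device folder under data_storage comes from the example configuration shown later:

Mqtt_loggers/
├── mqtt_logger.py              (core script)
├── configs/
│   └── water_cooled_pv.json    (one JSON configuration per device; name assumed)
├── data_storage/
│   └── water_cooled_pv/        (daily CSV files, per the example configuration)
└── utils/
    ├── __init__.py
    ├── common_utils.py
    ├── validate_config.py
    └── device_specific_func.py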
The core script, mqtt_logger.py, is located in the Mqtt_loggers directory.
This script provides the foundational structure for the entire data logging system. It defines the main
workflow and executes essential functions sequentially or in a continuous loop.
Users are advised not to modify this file unless they fully understand its internal logic.
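For orientation, the workflow described above can be pictured with the following heavily simplified sketch. The function names, the call sequence, and the CSV file name here are illustrative assumptions and do not reproduce the actual contents of mqtt_logger.py:

import json
import time

def load_config(path):
    # Parse one JSON configuration file from the configs directory (assumed helper).
    with open(path) as f:
        return json.load(f)

def run_forever(config_path, device_func, data_buffer):
    # data_buffer is assumed to be filled elsewhere by MQTT message callbacks.
    cfg = load_config(config_path)
    step = cfg["logging"]["time_step"]
    header = cfg["logging"]["header"]
    csv_file = cfg["logging"]["base_folder"] + "/example.csv"  # illustrative file name
    while True:
        # Hand the latest buffered values to the device-specific function,
        # then wait for the configured logging interval.
        device_func(data_buffer, csv_file, header)
        time.sleep(step)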
The configs subdirectory contains JSON configuration files that define the communication and logging parameters for each MQTT device. An example configuration file looks like this:
{
  "mqtt": {
    "host": "192.168.1.14",
    "port": 1883,
    "topics": ["sensors/#"],
    "client_id": "esp32_logger",
    "device_name": "water_cooled_pv"
  },
  "logging": {
    "base_folder": "/mnt/data_storage/Mqtt_loggers/data_storage/water_cooled_pv",
    "log_retention_days": 7,
    "file_suffix": "water_cooled_pv",
    "time_step": 3,
    "header": [
      "timestamp",
      "temp_1",
      "temp_2",
      "temp_3",
      "temp_4",
      "temp_5",
      "temp_6",
      "temp_7",
      "temp_8",
      "temp_9",
      "temp_10",
      "water_level",
      "irradiance",
      "ssr_status"
    ]
  }
}
MQTT Section:
- host: Local IP address of the MQTT broker.
- port: Network port of the broker service, usually 1883 for unencrypted MQTT.
- topics: List of MQTT topics to subscribe to.
- client_id: Unique identifier for the MQTT client.
- device_name: Name of the device for logging purposes.

Logging Section:
- base_folder: Directory where log files are stored.
- log_retention_days: Number of days log files are kept.
- file_suffix: Label appended to filenames.
- header: Defines the CSV column structure.
- time_step: Logging interval in seconds.
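To show how these fields map onto Python code, here is a minimal sketch using only the standard library; the configuration file name is hypothetical:

import json

# Hypothetical name for one of the JSON files in the configs directory
with open("configs/water_cooled_pv.json") as f:
    cfg = json.load(f)

mqtt_cfg = cfg["mqtt"]
log_cfg = cfg["logging"]

print(mqtt_cfg["host"], mqtt_cfg["port"])        # broker address and port
print(mqtt_cfg["topics"])                        # topics the logger subscribes to
print(log_cfg["time_step"], "second interval")   # how often a CSV row is written
print(log_cfg["header"][:3])                     # first few CSV column names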
The utils directory includes:
- __init__.py
- common_utils.py
- validate_config.py
- device_specific_func.py
common_utils.py provides helper functions used by the main script.
validate_config.py checks the JSON configuration files for formatting or logical errors.
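The exact checks are internal to the project, but a validator of this kind typically looks something like the following sketch; the required-key sets and the timestamp check are assumptions based on the example configuration above, not the project's actual rules:

import json

REQUIRED_MQTT_KEYS = {"host", "port", "topics", "client_id", "device_name"}
REQUIRED_LOGGING_KEYS = {"base_folder", "log_retention_days", "file_suffix", "time_step", "header"}

def check_config(path):
    """Return a list of problems found in a config file; an empty list means it passed."""
    try:
        with open(path) as f:
            cfg = json.load(f)
    except json.JSONDecodeError as exc:
        return [f"invalid JSON: {exc}"]

    problems = []
    missing_mqtt = REQUIRED_MQTT_KEYS - set(cfg.get("mqtt", {}))
    missing_logging = REQUIRED_LOGGING_KEYS - set(cfg.get("logging", {}))
    if missing_mqtt:
        problems.append(f"mqtt section is missing keys: {sorted(missing_mqtt)}")
    if missing_logging:
        problems.append(f"logging section is missing keys: {sorted(missing_logging)}")

    # Logical check: the CSV header should start with the timestamp column
    header = cfg.get("logging", {}).get("header", [])
    if header and header[0] != "timestamp":
        problems.append("header should start with 'timestamp'")
    return problems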
The device_specific_func.py file is where users can define custom functions for their MQTT devices. This modular approach lets users log data from different devices by simply writing their own device-specific function, without modifying the main logging structure. The example below is the device-specific function for the water_cooled_pv device described in the configuration above:
import logging
import time

logger = logging.getLogger(__name__)


def water_cooled_pv(data_buffer, csv_file, header):
    """
    Write buffered MQTT data into the daily CSV file.

    Args:
        data_buffer (dict): Latest key-value pairs from MQTT messages.
        csv_file (str): Path to the CSV file for the current day.
        header (list): Column headers (first is usually 'timestamp').
    """
    try:
        if not data_buffer:
            logger.warning("[water_cooled_pv] No data to log — buffer empty.")
            return

        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        row = [timestamp]

        # Fill in columns in the same order as the header
        for h in header[1:]:
            row.append(data_buffer.get(h, ""))

        with open(csv_file, "a") as f:
            f.write(",".join(map(str, row)) + "\n")

        logger.info(f"[water_cooled_pv] Logged {len(header) - 1} values to CSV: {csv_file}")

    except Exception as e:
        logger.error(f"[water_cooled_pv] Failed to write CSV: {e}")
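For a quick stand-alone check of the function above, it can be called directly with made-up data; the buffer contents, column list, and file name below are purely illustrative:

# Illustrative buffer as it might look after a few MQTT messages have arrived
sample_buffer = {"temp_1": 24.1, "temp_2": 24.3, "water_level": 0.82, "ssr_status": 1}
columns = ["timestamp", "temp_1", "temp_2", "water_level", "ssr_status"]

water_cooled_pv(sample_buffer, "water_cooled_pv_test.csv", columns)
# Columns missing from the buffer are written as empty fields, as handled above.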
📧 Email: hngjesse@gmail.com
🔗 GitHub: hngjesse