import concurrent.futures import requests from collections import defaultdict import argparse import sqlite3 from datetime import datetime, timedelta import time # Configuration API_KEY = '' LIBRENMS_IP = '' HEADERS = {'X-Auth-Token': API_KEY} def create_db_connection(db_file): """Create a database connection to a SQLite database.""" conn = None try: conn = sqlite3.connect(db_file) return conn except sqlite3.Error as e: print(e) return conn def create_tables(conn): """Create tables for storing the data.""" try: cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS building_totals ( id INTEGER PRIMARY KEY AUTOINCREMENT, total_current REAL, total_power REAL, timestamp TEXT ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS room_breakdown ( id INTEGER PRIMARY KEY AUTOINCREMENT, room_number TEXT, current REAL, power REAL, timestamp TEXT ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS customer_breakdown ( id INTEGER PRIMARY KEY AUTOINCREMENT, customer_name TEXT, current REAL, power REAL, timestamp TEXT ) ''') conn.commit() except sqlite3.Error as e: print(e) def round_to_nearest_5_minutes(dt): """Round a datetime object to the nearest 5-minute interval.""" # Calculate the number of seconds since the last 5-minute interval seconds_since_last_interval = dt.minute % 5 * 60 + dt.second + dt.microsecond / 1e6 # Round to the nearest 5-minute interval if seconds_since_last_interval < 150: # 150 seconds = 2.5 minutes rounded_dt = dt - timedelta(seconds=seconds_since_last_interval) else: rounded_dt = dt + timedelta(seconds=300 - seconds_since_last_interval) # 300 seconds = 5 minutes # Set seconds and microseconds to zero rounded_dt = rounded_dt.replace(second=0, microsecond=0) return rounded_dt def insert_building_total(conn, total_current, total_power): """Insert building total data into the database.""" try: cursor = conn.cursor() rounded_timestamp = round_to_nearest_5_minutes(datetime.now()) cursor.execute(''' INSERT INTO building_totals (total_current, total_power, timestamp) VALUES (?, ?, ?) ''', (round(total_current, 3), round(total_power, 3), rounded_timestamp.isoformat())) conn.commit() except sqlite3.Error as e: print(e) def insert_room_breakdown(conn, room_number, current, power): """Insert room breakdown data into the database.""" try: cursor = conn.cursor() rounded_timestamp = round_to_nearest_5_minutes(datetime.now()) cursor.execute(''' INSERT INTO room_breakdown (room_number, current, power, timestamp) VALUES (?, ?, ?, ?) ''', (room_number, round(current, 3), round(power, 3), rounded_timestamp.isoformat())) conn.commit() except sqlite3.Error as e: print(e) def insert_customer_breakdown(conn, customer_name, current, power): """Insert customer breakdown data into the database.""" try: cursor = conn.cursor() rounded_timestamp = round_to_nearest_5_minutes(datetime.now()) cursor.execute(''' INSERT INTO customer_breakdown (customer_name, current, power, timestamp) VALUES (?, ?, ?, ?) ''', (customer_name, round(current, 3), round(power, 3), rounded_timestamp.isoformat())) conn.commit() except sqlite3.Error as e: print(e) def get_device_ids(debug=False): """Fetch all device IDs from the LibreNMS API.""" response = requests.get(f'http://{LIBRENMS_IP}/api/v0/devices', headers=HEADERS, verify=False) if response.status_code == 200: devices = response.json().get('devices', []) if debug: print(f"Devices: {devices}") # Debugging statement power_devices = [device['device_id'] for device in devices if device.get('type') == 'power'] return power_devices else: raise Exception(f"Failed to fetch devices: {response.status_code}") def get_device_location(device_id, debug=False): """Fetch the location for a given device ID.""" response = requests.get(f'http://{LIBRENMS_IP}/api/v0/devices/{device_id}', headers=HEADERS, verify=False) if response.status_code == 200: location = response.json().get('devices', [{}])[0].get('location', '') if debug: print(f"Location for device {device_id}: {location}") # Debugging statement # Extract room number from location room_number = location.split(',')[-1].strip() if ',' in location else 'Unknown' return room_number else: raise Exception(f"Failed to fetch location for device {device_id}: {response.status_code}") def get_customer_name(device_id, debug=False): """Fetch the customer name for a given device ID.""" response = requests.get(f'http://{LIBRENMS_IP}/api/v0/devices/{device_id}', headers=HEADERS, verify=False) if response.status_code == 200: hostname = response.json().get('devices', [{}])[0].get('hostname', '') if debug: print(f"Hostname for device {device_id}: {hostname}") customer_name = hostname.split('-')[0] if '-' in hostname else 'Unknown' return customer_name else: raise Exception(f"Failed to fetch hostname for device {device_id}: {response.status_code}") def get_sensor_ids(device_id, sensor_group, debug=False): """Fetch sensor IDs for a given device ID and sensor group.""" response = requests.get(f'http://{LIBRENMS_IP}/api/v0/devices/{device_id}/health/{sensor_group}', headers=HEADERS, verify=False) if response.status_code == 200: graphs = response.json().get('graphs', []) if debug: print(f"Graphs for device {device_id}: {graphs}") # Debugging statement # Filter sensors based on descriptions if sensor_group == 'device_current': relevant_sensors = [ sensor['sensor_id'] for sensor in graphs if sensor['desc'] in ["Input Phase 1.1", "Input Phase 1.2", "Input Phase 1.3", "Phase 1"] ] elif sensor_group == 'device_power': relevant_sensors = [ sensor['sensor_id'] for sensor in graphs if sensor['desc'] in ["Active power #1", "Total power", "Power, Total"] ] return relevant_sensors else: raise Exception(f"Failed to fetch sensors for device {device_id}: {response.status_code}") def fetch_sensor_data(device_id, sensor_id, sensor_group, debug=False): """Fetch the current value for a given sensor ID and sensor group.""" response = requests.get( f'http://{LIBRENMS_IP}/api/v0/devices/{device_id}/health/{sensor_group}/{sensor_id}', headers=HEADERS, verify=False ) if response.status_code == 200: sensor_data = response.json() if debug: print(f"Sensor data for device {device_id}, sensor {sensor_id}: {sensor_data}") sensor_value = sensor_data['graphs'][0].get('sensor_current', 0) sensor_desc = sensor_data['graphs'][0].get('sensor_descr', '') # Divide by 100 if the sensor is from an nLogic PDU and is a current sensor if sensor_group == 'device_current' and sensor_desc in ["Input Phase 1.1", "Input Phase 1.2", "Input Phase 1.3"]: sensor_value /= 100 return sensor_value else: raise Exception(f"Failed to fetch sensor value for sensor {sensor_id}: {response.status_code}") def fetch_device_data(device_id, debug=False): """Fetch data for a given device ID.""" response = requests.get( f'http://{LIBRENMS_IP}/api/v0/devices/{device_id}', headers=HEADERS, verify=False ) if response.status_code == 200: data = response.json().get('devices', [{}])[0] if debug: print(f"Data for device {device_id}: {data}") return data else: raise Exception(f"Failed to fetch data for device {device_id}: {response.status_code}") def main(debug=False, update_db=True, concurrency_level=5): try: device_ids = get_device_ids(debug) total_current = 0 total_power_watts = 0 room_current = defaultdict(float) room_power = defaultdict(float) customer_data = defaultdict(lambda: {'current': 0, 'power_watts': 0}) # Create a SQLite database connection if update_db is True if update_db: db_file = 'power_data.db' conn = create_db_connection(db_file) create_tables(conn) else: conn = None with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency_level) as executor: # Submit tasks to the executor for each device future_to_device = { executor.submit(fetch_device_data, device_id, debug): device_id for device_id in device_ids } # Retrieve results as they complete for future in concurrent.futures.as_completed(future_to_device): device_id = future_to_device[future] try: data = future.result() # Process the data as needed room_number = data.get('location', 'Unknown').split(',')[-1].strip() customer_name = data.get('hostname', 'Unknown').split('-')[0] # Fetch and sum current values current_sensor_ids = get_sensor_ids(device_id, 'device_current', debug) for sensor_id in current_sensor_ids: sensor_value = fetch_sensor_data(device_id, sensor_id, 'device_current', debug) total_current += sensor_value room_current[room_number] += sensor_value customer_data[customer_name]['current'] += sensor_value # Fetch and sum power values power_sensor_ids = get_sensor_ids(device_id, 'device_power', debug) for sensor_id in power_sensor_ids: sensor_value = fetch_sensor_data(device_id, sensor_id, 'device_power', debug) total_power_watts += sensor_value room_power[room_number] += sensor_value customer_data[customer_name]['power_watts'] += sensor_value except Exception as e: print(f"Error fetching data for device {device_id}: {e}") total_power_kw = total_power_watts / 1000 # Convert watts to kilowatts print(f"\n\nTotal Current: {round(total_current, 3)} A") print(f"Total Power: {round(total_power_kw, 3)} kW") # Insert building total data into the database if update_db is True if update_db: insert_building_total(conn, total_current, total_power_kw) print("\n\nBreakdown by Room:\n") for room, current in room_current.items(): power_kw = room_power[room] / 1000 # Convert watts to kilowatts print(f"Room {room}: Current = {round(current, 3)} A, Power = {round(power_kw, 3)} kW") # Insert room breakdown data into the database if update_db is True if update_db: insert_room_breakdown(conn, room, current, power_kw) print("\n\nCustomer Breakdown:\n") for customer_name, data in customer_data.items(): power_kw = data['power_watts'] / 1000 # Convert watts to kilowatts print(f"Customer {customer_name} - Current: {round(data['current'], 3)} A, Power: {round(power_kw, 3)} kW") # Insert customer breakdown data into the database if update_db is True if update_db: insert_customer_breakdown(conn, customer_name, data['current'], power_kw) # Close the database connection if it was opened if update_db: conn.close() except Exception as e: print(str(e)) if __name__ == '__main__': parser = argparse.ArgumentParser(description='Fetch and display power and current data from LibreNMS.') parser.add_argument('--debug', action='store_true', help='Enable debug output') parser.add_argument('--runs', type=int, default=None, help='Limit the number of runs') parser.add_argument('--no-db-update', action='store_true', help='Disable updates to the database') parser.add_argument('--concurrency-level', type=int, default=5, help='Set the concurrency level for API requests') args = parser.parse_args() runs = 0 while True: if args.runs is not None and runs >= args.runs: break main(debug=args.debug, update_db=not args.no_db_update, concurrency_level=args.concurrency_level) runs += 1 if args.runs is None: time.sleep(250) # Wait for 5 minutes before the next run