From 1f56afccd86e6c874c481cebdf3fe607b78dd8ed Mon Sep 17 00:00:00 2001 From: Radek Date: Thu, 22 May 2025 09:58:43 +0100 Subject: [PATCH] add concurrency --- get_power_room_n_customer.py | 82 ++++++++++++++++++++++++++---------- 1 file changed, 59 insertions(+), 23 deletions(-) diff --git a/get_power_room_n_customer.py b/get_power_room_n_customer.py index ade1fee..841efdd 100644 --- a/get_power_room_n_customer.py +++ b/get_power_room_n_customer.py @@ -1,3 +1,4 @@ +import concurrent.futures import requests from collections import defaultdict import argparse @@ -149,13 +150,17 @@ def get_sensor_ids(device_id, sensor_group, debug=False): else: raise Exception(f"Failed to fetch sensors for device {device_id}: {response.status_code}") -def get_sensor_value(device_id, sensor_id, sensor_group, debug=False): +def fetch_sensor_data(device_id, sensor_id, sensor_group, debug=False): """Fetch the current value for a given sensor ID and sensor group.""" - response = requests.get(f'http://{LIBRENMS_IP}/api/v0/devices/{device_id}/health/{sensor_group}/{sensor_id}', headers=HEADERS, verify=False) + response = requests.get( + f'http://{LIBRENMS_IP}/api/v0/devices/{device_id}/health/{sensor_group}/{sensor_id}', + headers=HEADERS, + verify=False + ) if response.status_code == 200: sensor_data = response.json() if debug: - print(f"Sensor data for device {device_id}, sensor {sensor_id}: {sensor_data}") # Debugging statement + print(f"Sensor data for device {device_id}, sensor {sensor_id}: {sensor_data}") sensor_value = sensor_data['graphs'][0].get('sensor_current', 0) sensor_desc = sensor_data['graphs'][0].get('sensor_descr', '') @@ -167,7 +172,22 @@ def get_sensor_value(device_id, sensor_id, sensor_group, debug=False): else: raise Exception(f"Failed to fetch sensor value for sensor {sensor_id}: {response.status_code}") -def main(debug=False, update_db=True): +def fetch_device_data(device_id, debug=False): + """Fetch data for a given device ID.""" + response = requests.get( + f'http://{LIBRENMS_IP}/api/v0/devices/{device_id}', + headers=HEADERS, + verify=False + ) + if response.status_code == 200: + data = response.json().get('devices', [{}])[0] + if debug: + print(f"Data for device {device_id}: {data}") + return data + else: + raise Exception(f"Failed to fetch data for device {device_id}: {response.status_code}") + +def main(debug=False, update_db=True, concurrency_level=5): try: device_ids = get_device_ids(debug) total_current = 0 @@ -178,31 +198,45 @@ def main(debug=False, update_db=True): # Create a SQLite database connection if update_db is True if update_db: - db_file = 'power_data_room_n_customer.db' + db_file = 'power_data.db' conn = create_db_connection(db_file) create_tables(conn) else: conn = None - for device_id in device_ids: - room_number = get_device_location(device_id, debug) - customer_name = get_customer_name(device_id, debug) + with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency_level) as executor: + # Submit tasks to the executor for each device + future_to_device = { + executor.submit(fetch_device_data, device_id, debug): device_id for device_id in device_ids + } - # Fetch and sum current values - current_sensor_ids = get_sensor_ids(device_id, 'device_current', debug) - for sensor_id in current_sensor_ids: - sensor_value = get_sensor_value(device_id, sensor_id, 'device_current', debug) - total_current += sensor_value - room_current[room_number] += sensor_value - customer_data[customer_name]['current'] += sensor_value + # Retrieve results as they complete + for future in concurrent.futures.as_completed(future_to_device): + device_id = future_to_device[future] + try: + data = future.result() + # Process the data as needed + room_number = data.get('location', 'Unknown').split(',')[-1].strip() + customer_name = data.get('hostname', 'Unknown').split('-')[0] - # Fetch and sum power values - power_sensor_ids = get_sensor_ids(device_id, 'device_power', debug) - for sensor_id in power_sensor_ids: - sensor_value = get_sensor_value(device_id, sensor_id, 'device_power', debug) - total_power_watts += sensor_value - room_power[room_number] += sensor_value - customer_data[customer_name]['power_watts'] += sensor_value + # Fetch and sum current values + current_sensor_ids = get_sensor_ids(device_id, 'device_current', debug) + for sensor_id in current_sensor_ids: + sensor_value = fetch_sensor_data(device_id, sensor_id, 'device_current', debug) + total_current += sensor_value + room_current[room_number] += sensor_value + customer_data[customer_name]['current'] += sensor_value + + # Fetch and sum power values + power_sensor_ids = get_sensor_ids(device_id, 'device_power', debug) + for sensor_id in power_sensor_ids: + sensor_value = fetch_sensor_data(device_id, sensor_id, 'device_power', debug) + total_power_watts += sensor_value + room_power[room_number] += sensor_value + customer_data[customer_name]['power_watts'] += sensor_value + + except Exception as e: + print(f"Error fetching data for device {device_id}: {e}") total_power_kw = total_power_watts / 1000 # Convert watts to kilowatts print(f"\n\nTotal Current: {round(total_current, 3)} A") @@ -241,6 +275,7 @@ if __name__ == '__main__': parser.add_argument('--debug', action='store_true', help='Enable debug output') parser.add_argument('--runs', type=int, default=None, help='Limit the number of runs') parser.add_argument('--no-db-update', action='store_true', help='Disable updates to the database') + parser.add_argument('--concurrency-level', type=int, default=5, help='Set the concurrency level for API requests') args = parser.parse_args() runs = 0 @@ -248,7 +283,8 @@ if __name__ == '__main__': if args.runs is not None and runs >= args.runs: break - main(debug=args.debug, update_db=not args.no_db_update) + main(debug=args.debug, update_db=not args.no_db_update, concurrency_level=args.concurrency_level) runs += 1 if args.runs is None: time.sleep(240) # Wait for 5 minutes before the next run +