add concurrency
This commit is contained in:
@@ -1,3 +1,4 @@
|
|||||||
|
import concurrent.futures
|
||||||
import requests
|
import requests
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
import argparse
|
import argparse
|
||||||
@@ -149,13 +150,17 @@ def get_sensor_ids(device_id, sensor_group, debug=False):
|
|||||||
else:
|
else:
|
||||||
raise Exception(f"Failed to fetch sensors for device {device_id}: {response.status_code}")
|
raise Exception(f"Failed to fetch sensors for device {device_id}: {response.status_code}")
|
||||||
|
|
||||||
def get_sensor_value(device_id, sensor_id, sensor_group, debug=False):
|
def fetch_sensor_data(device_id, sensor_id, sensor_group, debug=False):
|
||||||
"""Fetch the current value for a given sensor ID and sensor group."""
|
"""Fetch the current value for a given sensor ID and sensor group."""
|
||||||
response = requests.get(f'http://{LIBRENMS_IP}/api/v0/devices/{device_id}/health/{sensor_group}/{sensor_id}', headers=HEADERS, verify=False)
|
response = requests.get(
|
||||||
|
f'http://{LIBRENMS_IP}/api/v0/devices/{device_id}/health/{sensor_group}/{sensor_id}',
|
||||||
|
headers=HEADERS,
|
||||||
|
verify=False
|
||||||
|
)
|
||||||
if response.status_code == 200:
|
if response.status_code == 200:
|
||||||
sensor_data = response.json()
|
sensor_data = response.json()
|
||||||
if debug:
|
if debug:
|
||||||
print(f"Sensor data for device {device_id}, sensor {sensor_id}: {sensor_data}") # Debugging statement
|
print(f"Sensor data for device {device_id}, sensor {sensor_id}: {sensor_data}")
|
||||||
sensor_value = sensor_data['graphs'][0].get('sensor_current', 0)
|
sensor_value = sensor_data['graphs'][0].get('sensor_current', 0)
|
||||||
sensor_desc = sensor_data['graphs'][0].get('sensor_descr', '')
|
sensor_desc = sensor_data['graphs'][0].get('sensor_descr', '')
|
||||||
|
|
||||||
@@ -167,7 +172,22 @@ def get_sensor_value(device_id, sensor_id, sensor_group, debug=False):
|
|||||||
else:
|
else:
|
||||||
raise Exception(f"Failed to fetch sensor value for sensor {sensor_id}: {response.status_code}")
|
raise Exception(f"Failed to fetch sensor value for sensor {sensor_id}: {response.status_code}")
|
||||||
|
|
||||||
def main(debug=False, update_db=True):
|
def fetch_device_data(device_id, debug=False):
|
||||||
|
"""Fetch data for a given device ID."""
|
||||||
|
response = requests.get(
|
||||||
|
f'http://{LIBRENMS_IP}/api/v0/devices/{device_id}',
|
||||||
|
headers=HEADERS,
|
||||||
|
verify=False
|
||||||
|
)
|
||||||
|
if response.status_code == 200:
|
||||||
|
data = response.json().get('devices', [{}])[0]
|
||||||
|
if debug:
|
||||||
|
print(f"Data for device {device_id}: {data}")
|
||||||
|
return data
|
||||||
|
else:
|
||||||
|
raise Exception(f"Failed to fetch data for device {device_id}: {response.status_code}")
|
||||||
|
|
||||||
|
def main(debug=False, update_db=True, concurrency_level=5):
|
||||||
try:
|
try:
|
||||||
device_ids = get_device_ids(debug)
|
device_ids = get_device_ids(debug)
|
||||||
total_current = 0
|
total_current = 0
|
||||||
@@ -178,31 +198,45 @@ def main(debug=False, update_db=True):
|
|||||||
|
|
||||||
# Create a SQLite database connection if update_db is True
|
# Create a SQLite database connection if update_db is True
|
||||||
if update_db:
|
if update_db:
|
||||||
db_file = 'power_data_room_n_customer.db'
|
db_file = 'power_data.db'
|
||||||
conn = create_db_connection(db_file)
|
conn = create_db_connection(db_file)
|
||||||
create_tables(conn)
|
create_tables(conn)
|
||||||
else:
|
else:
|
||||||
conn = None
|
conn = None
|
||||||
|
|
||||||
for device_id in device_ids:
|
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency_level) as executor:
|
||||||
room_number = get_device_location(device_id, debug)
|
# Submit tasks to the executor for each device
|
||||||
customer_name = get_customer_name(device_id, debug)
|
future_to_device = {
|
||||||
|
executor.submit(fetch_device_data, device_id, debug): device_id for device_id in device_ids
|
||||||
|
}
|
||||||
|
|
||||||
# Fetch and sum current values
|
# Retrieve results as they complete
|
||||||
current_sensor_ids = get_sensor_ids(device_id, 'device_current', debug)
|
for future in concurrent.futures.as_completed(future_to_device):
|
||||||
for sensor_id in current_sensor_ids:
|
device_id = future_to_device[future]
|
||||||
sensor_value = get_sensor_value(device_id, sensor_id, 'device_current', debug)
|
try:
|
||||||
total_current += sensor_value
|
data = future.result()
|
||||||
room_current[room_number] += sensor_value
|
# Process the data as needed
|
||||||
customer_data[customer_name]['current'] += sensor_value
|
room_number = data.get('location', 'Unknown').split(',')[-1].strip()
|
||||||
|
customer_name = data.get('hostname', 'Unknown').split('-')[0]
|
||||||
|
|
||||||
# Fetch and sum power values
|
# Fetch and sum current values
|
||||||
power_sensor_ids = get_sensor_ids(device_id, 'device_power', debug)
|
current_sensor_ids = get_sensor_ids(device_id, 'device_current', debug)
|
||||||
for sensor_id in power_sensor_ids:
|
for sensor_id in current_sensor_ids:
|
||||||
sensor_value = get_sensor_value(device_id, sensor_id, 'device_power', debug)
|
sensor_value = fetch_sensor_data(device_id, sensor_id, 'device_current', debug)
|
||||||
total_power_watts += sensor_value
|
total_current += sensor_value
|
||||||
room_power[room_number] += sensor_value
|
room_current[room_number] += sensor_value
|
||||||
customer_data[customer_name]['power_watts'] += sensor_value
|
customer_data[customer_name]['current'] += sensor_value
|
||||||
|
|
||||||
|
# Fetch and sum power values
|
||||||
|
power_sensor_ids = get_sensor_ids(device_id, 'device_power', debug)
|
||||||
|
for sensor_id in power_sensor_ids:
|
||||||
|
sensor_value = fetch_sensor_data(device_id, sensor_id, 'device_power', debug)
|
||||||
|
total_power_watts += sensor_value
|
||||||
|
room_power[room_number] += sensor_value
|
||||||
|
customer_data[customer_name]['power_watts'] += sensor_value
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error fetching data for device {device_id}: {e}")
|
||||||
|
|
||||||
total_power_kw = total_power_watts / 1000 # Convert watts to kilowatts
|
total_power_kw = total_power_watts / 1000 # Convert watts to kilowatts
|
||||||
print(f"\n\nTotal Current: {round(total_current, 3)} A")
|
print(f"\n\nTotal Current: {round(total_current, 3)} A")
|
||||||
@@ -241,6 +275,7 @@ if __name__ == '__main__':
|
|||||||
parser.add_argument('--debug', action='store_true', help='Enable debug output')
|
parser.add_argument('--debug', action='store_true', help='Enable debug output')
|
||||||
parser.add_argument('--runs', type=int, default=None, help='Limit the number of runs')
|
parser.add_argument('--runs', type=int, default=None, help='Limit the number of runs')
|
||||||
parser.add_argument('--no-db-update', action='store_true', help='Disable updates to the database')
|
parser.add_argument('--no-db-update', action='store_true', help='Disable updates to the database')
|
||||||
|
parser.add_argument('--concurrency-level', type=int, default=5, help='Set the concurrency level for API requests')
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
runs = 0
|
runs = 0
|
||||||
@@ -248,7 +283,8 @@ if __name__ == '__main__':
|
|||||||
if args.runs is not None and runs >= args.runs:
|
if args.runs is not None and runs >= args.runs:
|
||||||
break
|
break
|
||||||
|
|
||||||
main(debug=args.debug, update_db=not args.no_db_update)
|
main(debug=args.debug, update_db=not args.no_db_update, concurrency_level=args.concurrency_level)
|
||||||
runs += 1
|
runs += 1
|
||||||
if args.runs is None:
|
if args.runs is None:
|
||||||
time.sleep(240) # Wait for 5 minutes before the next run
|
time.sleep(240) # Wait for 5 minutes before the next run
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user