-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrequest_cycle.py
More file actions
112 lines (93 loc) · 4.33 KB
/
request_cycle.py
File metadata and controls
112 lines (93 loc) · 4.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import os
import json
import logging
import asyncio
import aiohttp
from aliyun import aliyun_add_authentication, aliyun_beautify_response
from huawei import HuaweiIAMTokenManager, huawei_beautify_response
async def fetch(session, url, headers=None):
    """Perform a GET request and return the response body as text.

    Args:
        session: Object whose ``get(url, headers=...)`` returns an async
            context manager yielding the response (the callers in this
            file pass an ``aiohttp.ClientSession``).
        url: Address to request.
        headers: Optional headers forwarded unchanged to ``session.get``.

    Returns:
        Whatever awaiting ``response.text()`` produces.
    """
    pending_request = session.get(url, headers=headers)
    async with pending_request as reply:
        # Read the body while the response context is still open.
        body = await reply.text()
    return body
async def process_device(session, device, secret, huawei_token_manager, data_dict):
    """Fetch one device's latest record and store it in ``data_dict``.

    The request is prepared according to ``device['type']``:

    * ``'aliyun'`` - the URL is signed with ``aliyun_add_authentication``.
    * ``'huawei'`` - a token from ``huawei_token_manager.get_iam_token`` is
      sent in the ``X-Auth-Token`` header.

    The response text is passed to the matching ``*_beautify_response``
    helper, whose result is parsed as JSON; the first element is stored
    under ``data_dict[device['id']]``.

    Args:
        session: HTTP session handed through to ``fetch``.
        device: Mapping with ``'id'``, ``'type'`` and ``'url'`` keys.
        secret: Secret passed to ``aliyun_add_authentication``.
        huawei_token_manager: Object exposing ``get_iam_token`` plus the
            ``username``, ``password``, ``area`` and ``domain_name``
            attributes used as its arguments.
        data_dict: Dict mutated in place with the device's record.

    Returns:
        ``True`` when a record was stored; ``False`` for an unknown device
        type, a failed request, an unparsable response or an empty result.
    """
    device_id = device['id']
    device_type = device['type']
    original_url = device['url']

    if device_type == 'aliyun':
        beautify = aliyun_beautify_response
    elif device_type == 'huawei':
        beautify = huawei_beautify_response
    else:
        logging.error(f"Unknown device type: {device_type}")
        return False

    # Preparing the request (signing / token retrieval) can fail just like
    # the request itself, so both are guarded together: one failing device
    # must report False instead of raising out of the caller's gather().
    try:
        if device_type == 'aliyun':
            request_url = aliyun_add_authentication(original_url, secret)
            headers = None
        else:
            # NOTE(review): get_iam_token is called synchronously; if it
            # performs blocking network I/O it will stall the event loop
            # for every other device - confirm.
            token = huawei_token_manager.get_iam_token(
                username=huawei_token_manager.username,
                password=huawei_token_manager.password,
                area=huawei_token_manager.area,
                domain_name=huawei_token_manager.domain_name
            )
            request_url = original_url
            headers = {"X-Auth-Token": token}
        response_text = await fetch(session, request_url, headers=headers)
    except Exception as e:
        logging.error(f"Error: {e}")
        return False

    # A malformed payload is reported as a failed update rather than an
    # unhandled exception.
    try:
        records = json.loads(beautify(response_text))
    except Exception as e:
        logging.error(f"Failed to parse {device_type} response for device {device_id}: {e}")
        return False

    # Test the parsed value rather than the raw string so that "[ ]" or
    # "null" are treated the same as "[]".
    if not records:
        logging.warning(f"No data returned from {device_type}_beautify_response, skipping this update.")
        return False

    data_dict[device_id] = records[0]
    return True
def _merge_with_cache(cache_path, fresh_data):
    """Return the records stored at ``cache_path`` overlaid with ``fresh_data``.

    Entries found only in the file are kept; for ids present in both, the
    freshly fetched value wins. A missing file counts as an empty cache.
    """
    merged = {}
    if os.path.exists(cache_path):
        with open(cache_path, 'r', encoding='utf-8') as file:
            merged = json.load(file)
    merged.update(fresh_data)
    return merged


def _render_rows(format_values, data_dict):
    """Build the final rows by resolving ``{device_id:field}`` placeholders.

    A string cell containing both ``{`` and ``}`` is treated as a
    placeholder and replaced by ``data_dict[device_id][field]`` (``'N/A'``
    when missing). Every other cell, including non-string values, is
    copied unchanged under its own key.
    """
    final_data = []
    for row in format_values:
        new_row = {}
        for key, cell in row.items():
            is_placeholder = isinstance(cell, str) and '{' in cell and '}' in cell
            if not is_placeholder:
                new_row[key] = cell
                continue
            device_id, field = cell.strip('{}').split(':')
            # NOTE(review): the value is stored under the placeholder's
            # field name, not under the row's own key (the original code
            # rebound the loop variable ``key`` here). Output keys are kept
            # as they were because final.json consumers may rely on them -
            # confirm whether the row key was intended.
            new_row[field] = data_dict.get(device_id, {}).get(field, 'N/A')
        final_data.append(new_row)
    return final_data


async def main():
    """Poll all configured devices forever and regenerate the JSON outputs.

    ``settings.json`` is read once at start-up. Each round then:

    1. fetches every device in the ``device_list`` setting concurrently;
    2. skips the round entirely if any device failed;
    3. merges the fresh records into ``output.json``;
    4. renders the ``value`` rows of the ``request_cycle`` setting into
       ``final.json``.
    """
    with open('settings.json', 'r', encoding='utf-8') as file:
        config = json.load(file)

    device_list_config = next(item for item in config if item["setting"] == "device_list")
    request_cycle_config = next(item for item in config if item["setting"] == "request_cycle")

    secret = request_cycle_config["access-key-secret"]
    huawei_token_manager = HuaweiIAMTokenManager()
    huawei_token_manager.username = request_cycle_config["huawei_iam_username"]
    huawei_token_manager.password = request_cycle_config["huawei_iam_password"]
    huawei_token_manager.domain_name = request_cycle_config["huawei_iam_domain"]
    huawei_token_manager.area = request_cycle_config["huawei_iam_area"]
    format_values = request_cycle_config["value"]

    output_file_path = os.path.expanduser('output.json')
    final_file_path = os.path.expanduser('final.json')

    # NOTE(review): the loop has no delay between rounds, and a failed round
    # retries immediately - confirm whether a polling interval is intended.
    while True:
        # Read the device list.
        device_list = device_list_config.get('value', [])
        data_dict = {}

        async with aiohttp.ClientSession() as session:
            tasks = [
                process_device(session, device, secret, huawei_token_manager, data_dict)
                for device in device_list
            ]
            results = await asyncio.gather(*tasks)

        # One failed device invalidates the whole round.
        if not all(results):
            continue

        # Merge with the records already in output.json. Fresh data must
        # override the stored values; merging the other way round would
        # freeze every device at its first recorded value.
        data_dict = _merge_with_cache(output_file_path, data_dict)

        # Write output.json.
        with open(output_file_path, 'w', encoding='utf-8') as file:
            json.dump(data_dict, file, ensure_ascii=False, indent=4)

        # Prepare and write final.json.
        final_data = _render_rows(format_values, data_dict)
        with open(final_file_path, 'w', encoding='utf-8') as outfile:
            json.dump(final_data, outfile, ensure_ascii=False, indent=4)
# Script entry point: start the polling coroutine on a fresh event loop.
if __name__ == "__main__":
    polling_coroutine = main()
    asyncio.run(polling_coroutine)