blob: fc5c6a76efcc883f359b5257a6c2fd79bbfd62e6 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
from truenas_api_client import Client
import requests
import dotenv
import json
import os
env_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".env")
if os.path.exists(env_path):
dotenv.load_dotenv(dotenv_path = env_path)
class TrueNASAPIClient:
def __init__(self, host, api_key):
self.base_url = base_url = "http://%s/api/v2.0" % host
self.headers = {
"Authorization": "Bearer " + api_key
}
def base_get(self, endpoint, payload = None):
if payload is None:
payload = {}
if not endpoint.startswith("/"):
endpoint = "/" + endpoint
req = requests.get(self.base_url + endpoint, headers = self.headers, data = payload)
if not req.status_code == 200:
raise ConnectionError("API call failed (%d): '%s'" % (req.status_code, req.content.decode()))
return req.json()
def get_websocket_connections(self):
return self.base_get("/core/sessions")
def get_replication_naming_schemas(self):
return self.base_get("/replication/list_naming_schemas")
def get_jobs(self):
return self.base_get("/core/get_jobs")
def get_replication_jobs(self):
return [i for i in self.get_jobs() if i["method"] == "replication.run"]
def get_running_replication_jobs(self):
return [i for i in self.get_jobs() if i["method"] == "replication.run" and i["progress"]["percent"] != 100 and not i["state"] == "FAILED"]
def get_running_jobs(self):
return [i for i in self.get_jobs() if i["progress"]["percent"] != 100]
def get_replication_tasks(self):
return self.base_get("/replication")
if __name__ == "__main__":
truenas = TrueNASAPIClient(host = os.environ["SLAVE_HOST"], api_key = os.environ["SLAVE_KEY"])
print(json.dumps(truenas.get_replication_tasks(), indent = 4))
|