112 lines
3.2 KiB
Python
112 lines
3.2 KiB
Python
import sqlite3
|
|
import requests
|
|
import json
|
|
|
|
|
|
def load_kvstore():
|
|
con = sqlite3.connect("centralauth.db")
|
|
cur = con.cursor()
|
|
res=cur.execute("select key,value from kvstore")
|
|
data=res.fetchall()
|
|
kvstore={}
|
|
for key,value in data:
|
|
kvstore[key]=value
|
|
con.close()
|
|
return(kvstore)
|
|
|
|
|
|
def save_kvstore(kvstore):
|
|
con = sqlite3.connect("centralauth.db")
|
|
cur = con.cursor()
|
|
for key in kvstore:
|
|
sql=f"INSERT INTO kvstore (key,value) values('{key}','{kvstore[key]}') ON CONFLICT DO update set value=excluded.value"
|
|
cur.execute(sql)
|
|
con.commit()
|
|
con.close()
|
|
return(kvstore)
|
|
|
|
def get_new_access_token(kvstore):
|
|
# step 1
|
|
url=f"{kvstore['base_url']}/oauth2/authorize/central/api/login?client_id={kvstore['client_id']}"
|
|
payload = json.dumps({
|
|
'username':kvstore['central_username'],
|
|
'password':kvstore['central_password']
|
|
})
|
|
headers = {
|
|
'Content-Type': 'application/json'
|
|
}
|
|
ses = requests.Session()
|
|
response=ses.post(url,headers=headers,data=payload)
|
|
print (response)
|
|
|
|
# step 2
|
|
payload = json.dumps({'customer_id':kvstore['customer_id']})
|
|
|
|
url = f"{kvstore['base_url']}/oauth2/authorize/central/api/?client_id={kvstore['client_id']}&response_type=code&scope=all"
|
|
|
|
response=ses.post(url,headers=headers,data=payload)
|
|
|
|
response_data=json.loads(response.text)
|
|
|
|
# step 3
|
|
|
|
payload = json.dumps({
|
|
"grant_type": "authorization_code",
|
|
'code':response_data['auth_code'],
|
|
'client_id':kvstore['client_id'],
|
|
'client_secret':kvstore['client_secret'],
|
|
})
|
|
|
|
url = f"{kvstore['base_url']}/oauth2/token"
|
|
|
|
response=ses.post(url,headers=headers,data=payload)
|
|
|
|
response_data=json.loads(response.text)
|
|
|
|
kvstore['refresh_token']=response_data['refresh_token']
|
|
kvstore['access_token']=response_data['access_token']
|
|
|
|
save_kvstore(kvstore)
|
|
return(kvstore)
|
|
|
|
def refresh_access_token(kvstore):
|
|
headers = {
|
|
'Content-Type': 'application/json'
|
|
}
|
|
|
|
url=f"{kvstore['base_url']}/oauth2/token?client_id={kvstore['client_id']}&client_secret={kvstore['client_secret']}&grant_type=refresh_token&refresh_token={kvstore['refresh_token']}"
|
|
ses = requests.Session()
|
|
response=ses.post(url,headers=headers)
|
|
response_data=json.loads(response.text)
|
|
kvstore['refresh_token']=response_data['refresh_token']
|
|
kvstore['access_token']=response_data['access_token']
|
|
save_kvstore(kvstore)
|
|
return(kvstore)
|
|
|
|
def get_centralauth():
|
|
kvstore=load_kvstore()
|
|
|
|
if 'access_token' not in kvstore:
|
|
kvstore=get_new_access_token(kvstore)
|
|
|
|
if 'access_token' in kvstore:
|
|
kvstore=refresh_access_token(kvstore)
|
|
return({'access_token':kvstore['access_token'],'base_url':kvstore['base_url']})
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
centralauth=get_centralauth()
|
|
|
|
headers = {
|
|
'Accept': 'application/json',
|
|
'Authorization': f'Bearer {centralauth['access_token']}',
|
|
}
|
|
|
|
url = f"{centralauth['base_url']}/configuration/v1/devices/SG3AL5K03S/configuration"
|
|
|
|
url = f"{centralauth['base_url']}/configuration/v1/devices/SG3AKMY253/configuration"
|
|
|
|
response=requests.get(url,headers=headers)
|
|
|
|
print(response.text) |