scsd-configs/code/centralauth.py

114 lines
3.4 KiB
Python

import sqlite3
import requests
import json
def load_kvstore():
con = sqlite3.connect("/home/johnp/scsd-configs/git/code/centralauth.db")
cur = con.cursor()
res=cur.execute("select key,value from kvstore")
data=res.fetchall()
kvstore={}
for key,value in data:
kvstore[key]=value
con.close()
return(kvstore)
def save_kvstore(kvstore):
con = sqlite3.connect("/home/johnp/scsd-configs/git/code/centralauth.db")
cur = con.cursor()
for key in kvstore:
sql=f"INSERT INTO kvstore (key,value) values('{key}','{kvstore[key]}') ON CONFLICT DO update set value=excluded.value"
cur.execute(sql)
con.commit()
con.close()
return(kvstore)
def get_new_access_token(kvstore):
# step 1
url=f"{kvstore['base_url']}/oauth2/authorize/central/api/login?client_id={kvstore['client_id']}"
payload = json.dumps({
'username':kvstore['central_username'],
'password':kvstore['central_password']
})
headers = {
'Content-Type': 'application/json'
}
ses = requests.Session()
response=ses.post(url,headers=headers,data=payload)
print (response)
# step 2
payload = json.dumps({'customer_id':kvstore['customer_id']})
url = f"{kvstore['base_url']}/oauth2/authorize/central/api/?client_id={kvstore['client_id']}&response_type=code&scope=all"
response=ses.post(url,headers=headers,data=payload)
response_data=json.loads(response.text)
# step 3
payload = json.dumps({
"grant_type": "authorization_code",
'code':response_data['auth_code'],
'client_id':kvstore['client_id'],
'client_secret':kvstore['client_secret'],
})
url = f"{kvstore['base_url']}/oauth2/token"
response=ses.post(url,headers=headers,data=payload)
response_data=json.loads(response.text)
kvstore['refresh_token']=response_data['refresh_token']
kvstore['access_token']=response_data['access_token']
save_kvstore(kvstore)
return(kvstore)
def refresh_access_token(kvstore):
headers = {
'Content-Type': 'application/json'
}
url=f"{kvstore['base_url']}/oauth2/token?client_id={kvstore['client_id']}&client_secret={kvstore['client_secret']}&grant_type=refresh_token&refresh_token={kvstore['refresh_token']}"
ses = requests.Session()
response=ses.post(url,headers=headers)
response_data=json.loads(response.text)
if response.status_code == 400 and ("error_description" in response_data and response_data["error_description"]=="Invalid refresh_token"):
return(get_new_access_token(kvstore))
kvstore['refresh_token']=response_data['refresh_token']
kvstore['access_token']=response_data['access_token']
save_kvstore(kvstore)
return(kvstore)
def get_centralauth():
kvstore=load_kvstore()
if 'access_token' not in kvstore:
kvstore=get_new_access_token(kvstore)
if 'access_token' in kvstore:
kvstore=refresh_access_token(kvstore)
return({'access_token':kvstore['access_token'],'base_url':kvstore['base_url']})
if __name__ == "__main__":
centralauth=get_centralauth()
headers = {
'Accept': 'application/json',
'Authorization': f'Bearer {centralauth['access_token']}',
}
url = f"{centralauth['base_url']}/configuration/v1/devices/SG3AL5K03S/configuration"
url = f"{centralauth['base_url']}/configuration/v1/devices/SG3AKMY253/configuration"
response=requests.get(url,headers=headers)
print(response.text)