"""Obtain and cache OAuth2 tokens for the Central API.

Credentials and tokens live in the ``kvstore`` table of a SQLite database.
``get_centralauth()`` is the entry point: it loads the store, obtains or
refreshes an access token, persists the result and returns the access token
together with the API base URL.
"""

import json
import sqlite3
from contextlib import closing

import requests

# Location of the SQLite database holding the ``kvstore`` table.
DB_PATH = "/home/johnp/scsd-configs/git/code/centralauth.db"

# Every token-related request is sent with a JSON content type.
JSON_HEADERS = {"Content-Type": "application/json"}

# Seconds to wait on any single HTTP request; without a timeout, requests
# blocks indefinitely when the server stops responding.
REQUEST_TIMEOUT = 30


def load_kvstore(db_path=DB_PATH):
    """Return every row of the ``kvstore`` table as a ``{key: value}`` dict.

    Args:
        db_path: Path of the SQLite database to read.

    Raises:
        sqlite3.Error: If the database or the ``kvstore`` table is unusable.
    """
    # closing() guarantees the connection is released even if the query fails.
    with closing(sqlite3.connect(db_path)) as con:
        rows = con.execute("select key,value from kvstore").fetchall()
    return dict(rows)


def save_kvstore(kvstore, db_path=DB_PATH):
    """Insert or update every entry of *kvstore* in the ``kvstore`` table.

    Args:
        kvstore: Mapping of keys to values to persist.
        db_path: Path of the SQLite database to write.

    Returns:
        The *kvstore* mapping that was passed in.

    Raises:
        sqlite3.Error: If the write fails; the transaction is rolled back.
    """
    sql = (
        "INSERT INTO kvstore (key,value) VALUES (?,?) "
        "ON CONFLICT DO UPDATE SET value=excluded.value"
    )
    # Bind parameters instead of interpolating into the SQL text, so quotes
    # inside a value cannot break or alter the statement. Values are bound as
    # str to store exactly the text the previous string-built SQL stored.
    rows = [(str(key), str(value)) for key, value in kvstore.items()]
    with closing(sqlite3.connect(db_path)) as con:
        with con:  # commits on success, rolls back on error
            con.executemany(sql, rows)
    return kvstore


def get_new_access_token(kvstore):
    """Run the three-step login flow and store the resulting tokens.

    Reads ``base_url``, ``client_id``, ``client_secret``, ``customer_id``,
    ``central_username`` and ``central_password`` from *kvstore*. On success,
    ``access_token`` and ``refresh_token`` are written into *kvstore* and the
    store is persisted with ``save_kvstore``.

    Args:
        kvstore: Mapping holding the credentials listed above.

    Returns:
        The updated *kvstore*.

    Raises:
        requests.HTTPError: If any of the three requests returns an error status.
        requests.RequestException: On connection failures or timeouts.
        KeyError: If a required key is missing from *kvstore* or a response.
    """
    base_url = kvstore["base_url"]
    client_id = kvstore["client_id"]

    # One session is shared by all steps, as in the original flow
    # (presumably the login response sets cookies the later steps rely on).
    with requests.Session() as ses:
        # Step 1: log in with username and password.
        response = ses.post(
            f"{base_url}/oauth2/authorize/central/api/login",
            params={"client_id": client_id},
            headers=JSON_HEADERS,
            data=json.dumps(
                {
                    "username": kvstore["central_username"],
                    "password": kvstore["central_password"],
                }
            ),
            timeout=REQUEST_TIMEOUT,
        )
        print(response)
        response.raise_for_status()

        # Step 2: request an authorization code for the customer.
        response = ses.post(
            f"{base_url}/oauth2/authorize/central/api/",
            params={
                "client_id": client_id,
                "response_type": "code",
                "scope": "all",
            },
            headers=JSON_HEADERS,
            data=json.dumps({"customer_id": kvstore["customer_id"]}),
            timeout=REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        auth_code = json.loads(response.text)["auth_code"]

        # Step 3: exchange the authorization code for tokens.
        response = ses.post(
            f"{base_url}/oauth2/token",
            headers=JSON_HEADERS,
            data=json.dumps(
                {
                    "grant_type": "authorization_code",
                    "code": auth_code,
                    "client_id": client_id,
                    "client_secret": kvstore["client_secret"],
                }
            ),
            timeout=REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        response_data = json.loads(response.text)

    kvstore["refresh_token"] = response_data["refresh_token"]
    kvstore["access_token"] = response_data["access_token"]
    save_kvstore(kvstore)
    return kvstore


def _is_invalid_refresh_token(response):
    """Return True if *response* is a 400 reporting "Invalid refresh_token"."""
    if response.status_code != 400:
        return False
    try:
        body = json.loads(response.text)
    except ValueError:
        # A non-JSON error body cannot carry the expected error description.
        return False
    return (
        isinstance(body, dict)
        and body.get("error_description") == "Invalid refresh_token"
    )


def refresh_access_token(kvstore):
    """Exchange the stored refresh token for a new token pair.

    If the server answers 400 with the error description
    ``"Invalid refresh_token"``, the full login flow in
    ``get_new_access_token`` is used instead. On success the new tokens are
    written into *kvstore* and persisted with ``save_kvstore``.

    Args:
        kvstore: Mapping holding ``base_url``, ``client_id``,
            ``client_secret`` and ``refresh_token``.

    Returns:
        The updated *kvstore*.

    Raises:
        requests.HTTPError: If the refresh request fails for any other reason.
        requests.RequestException: On connection failures or timeouts.
        KeyError: If a required key is missing from *kvstore* or the response.
    """
    # params= URL-encodes each value; the endpoint takes these in the query
    # string, exactly as the original hand-built URL sent them.
    response = requests.post(
        f"{kvstore['base_url']}/oauth2/token",
        params={
            "client_id": kvstore["client_id"],
            "client_secret": kvstore["client_secret"],
            "grant_type": "refresh_token",
            "refresh_token": kvstore["refresh_token"],
        },
        headers=JSON_HEADERS,
        timeout=REQUEST_TIMEOUT,
    )

    if _is_invalid_refresh_token(response):
        return get_new_access_token(kvstore)

    # Surface any other failure as an HTTP error rather than a KeyError on
    # the missing token fields below.
    response.raise_for_status()
    response_data = json.loads(response.text)

    kvstore["refresh_token"] = response_data["refresh_token"]
    kvstore["access_token"] = response_data["access_token"]
    save_kvstore(kvstore)
    return kvstore


def get_centralauth():
    """Return a usable access token and the API base URL.

    Loads the key/value store, refreshes the stored access token if there is
    one, and otherwise performs a full login.

    Returns:
        A dict with the keys ``access_token`` and ``base_url``.
    """
    kvstore = load_kvstore()
    if "access_token" in kvstore:
        kvstore = refresh_access_token(kvstore)
    else:
        # A token that was just issued does not need an immediate refresh.
        kvstore = get_new_access_token(kvstore)
    return {
        "access_token": kvstore["access_token"],
        "base_url": kvstore["base_url"],
    }


def main():
    """Fetch and print the configuration of one device."""
    centralauth = get_centralauth()
    headers = {
        "Accept": "application/json",
        # Outer double quotes keep this f-string valid before Python 3.12.
        "Authorization": f"Bearer {centralauth['access_token']}",
    }
    url = f"{centralauth['base_url']}/configuration/v1/devices/SG3AKMY253/configuration"
    response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
    print(response.text)


if __name__ == "__main__":
    main()