Azure: Script for granting the LHM Service Principal the required permissions in a target workspace
In this document we outline a Python script which uses the Lakehouse Optimizer API to browse through all the published Azure subscriptions, Databricks workspaces, jobs, clusters and DLT pipelines and grant the LHM SP permissions for each of these assets:
attaching to clusters
viewing jobs
viewing DLT pipelines
The script requires another Service Principal with the admin role in each target workspace, called Admin Service Principal in this document.
To run it you need a python3 environment with the requests module installed. To install the requests module run the following:
pip3 install requests
Script file:
import requests
import os
import logging
import time
import re
# set up logging
log_level = os.getenv("LOG_LEVEL", "WARN")
logging.basicConfig(level=log_level,
format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# variables
# Service Principal that grants
dbxSPInfo = {
'clientId': os.getenv('DBX_CLIENTID'),
'clientSecret': os.getenv('DBX_CLIENT_SECRET'),
'tenantId': os.getenv('DBX_TENANTID')
}
# Service principal that will be granted
lhmSPInfo = {
'clientId': os.getenv('LHM_CLIENTID'),
'clientSecret': os.getenv('LHM_CLIENT_SECRET'),
'tenantId': os.getenv('LHM_TENANTID')
}
userInfo = os.getenv('USER_NAME')
lhmURL = os.getenv('LHM_URL')
subscription_id = os.getenv("SUBSCRIPTION_ID")
workspace_host = os.getenv("WORKSPACE_HOST")
# login azure and obtain token for dbx calls
headers = {
'Content-Type': 'application/x-www-form-urlencoded'
}
azure_databricks_scope = '2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default'
# SP used to grant credentials. It should be admin in dbx
dbx_admin_sp_data = {
'client_id': dbxSPInfo['clientId'],
'grant_type': 'client_credentials',
'scope': azure_databricks_scope,
'client_secret': dbxSPInfo['clientSecret']
}
databricks_api_throttle_in_ms = os.getenv('DBX_API_THROTTLE_TIMEOUT_IN_MS', 50)
timeout = databricks_api_throttle_in_ms / 1000
excluded = os.getenv('EXCLUDE', '').lower().split()
# Extract the Workspace ID from the Databricks Host URL using regular expressions
try:
match = re.search(r"adb-(\d+)\..*", workspace_host)
if match:
workspace_id = match.group(1)
else:
workspace_id = None
except Exception:
workspace_id = None
# Token for ServicePrincipal with admin access ( used in order to grant permissions)
dbx_admin_token_response = requests.post(
f"https://login.microsoftonline.com/{dbxSPInfo.get('tenantId')}/oauth2/v2.0/token",
headers=headers, data=dbx_admin_sp_data)
# This token will be used in "Grant" api requests
dbx_admin_sp_api_token = dbx_admin_token_response.json().get('access_token')
class Subscription:
def __init__(self, subscription_id: str, display_name: str):
self.subscription_id = subscription_id
self.display_name = display_name
def __str__(self):
return f"{self.display_name}"
class DatabricksWorkspace:
def __init__(self, subscription: Subscription, workspace_id: str, display_name: str, workspace_host: str,
is_premium: bool):
self.subscription_id = subscription.subscription_id
self.subscription_name = subscription.display_name
self.workspace_id = workspace_id
self.display_name = display_name
self.workspace_host = workspace_host
self.is_premium = is_premium
def __str__(self):
return f"{self.display_name}"
def process_error_response(api_response):
response_text = api_response.headers.get('x-databricks-reason-phrase', api_response.text)
if response_text is not None and len(api_response.text) > 0:
processed_response = response_text
else:
processed_response = api_response.reason
return f"[{api_response.status_code} - {processed_response}]"
def identity_to_be_granted():
if userInfo is None:
return lhmSPInfo['clientId']
else:
return userInfo
def get_grant_payload(permission: str):
identity = identity_to_be_granted()
identity_key = 'user_name'
if userInfo is None:
'service_principal_name'
return {'access_control_list': [
{
identity_key: identity,
'permission_level': permission
}
]}
def process_grant_response(api_response, workspace_display_name: str, grant_type: str, entity_type: str,
entity_name: str):
if api_response.status_code != 200:
logger.error(
f"[workspace={workspace_display_name}] Could not grant {grant_type} to {identity_to_be_granted()} for {entity_type} {entity_name} [{api_response.status_code}] - {process_error_response(api_response)}")
else:
logger.debug(
f"[workspace={workspace_display_name}] Successfully granted {grant_type} to {identity_to_be_granted()} for {entity_type} {entity_name}")
# utility code
def grant_cluster_permission(host, display_name, cluster):
logger.debug(
f"[workspace={display_name}] Granting permission CAN_ATTACH_TO to {identity_to_be_granted()} for cluster {cluster.get('cluster_name')}")
payload = get_grant_payload('CAN_ATTACH_TO')
api_response = requests.patch(f"https://{host}/api/2.0/permissions/clusters/{cluster.get('cluster_id')}",
headers={'Authorization': f'Bearer {dbx_admin_sp_api_token}'}, json=payload)
process_grant_response(api_response, display_name, "CAN_ATTACH_TO", "cluster", cluster.get('cluster_name'))
def grant_job_permission(host, display_name, job):
job_id = job.get('job_id')
job_name = job.get('settings', {}).get('name', job_id)
logger.debug(
f"[workspace={display_name}] Granting permission CAN_VIEW to {identity_to_be_granted()} for job {job_name}")
payload = get_grant_payload('CAN_VIEW')
api_response = requests.patch(f"https://{host}/api/2.0/permissions/jobs/{job_id}",
headers={'Authorization': f'Bearer {dbx_admin_sp_api_token}'}, json=payload)
process_grant_response(api_response, display_name, "CAN_VIEW", "job", job_name)
def grant_pipeline_permission(host, display_name, pipeline):
logger.debug(
f"[workspace={display_name}] Granting permission CAN_VIEW to {identity_to_be_granted()} for pipeline {pipeline.get('name')}")
payload = get_grant_payload('CAN_VIEW')
api_response = requests.patch(f"https://{host}/api/2.0/permissions/pipelines/{pipeline.get('pipeline_id')}",
headers={'Authorization': f'Bearer {dbx_admin_sp_api_token}'}, json=payload)
process_grant_response(api_response, display_name, "CAN_VIEW", "pipeline", pipeline.get('name'))
def grant_warehouse_permission(host, display_name, warehouse):
logger.debug(
f"[workspace={display_name}] Granting permission CAN_USE to {identity_to_be_granted()} for warehouse {warehouse.get('name')}")
payload = get_grant_payload('CAN_USE')
api_response = requests.patch(f"https://{host}/api/2.0/permissions/sql/warehouses/{warehouse.get('id')}",
headers={'Authorization': f'Bearer {dbx_admin_sp_api_token}'}, json=payload)
process_grant_response(api_response, display_name, "CAN_USE", "warehouse", warehouse.get('name'))
def grant_catalog_permission(host, display_name, catalog):
logger.debug(
f"[workspace={display_name}] Granting permissions [READ_VOLUME, SELECT, USE_CATALOG, USE_SCHEMA] to {identity_to_be_granted()} for catalog {catalog.get('name')}")
payload = {'changes': [{'principal': userInfo, 'add': ['READ_VOLUME', 'SELECT', 'USE_CATALOG', 'USE_SCHEMA']}]}
api_response = requests.patch(f"https://{host}/api/2.1/unity-catalog/permissions/catalog/{catalog.get('name')}",
headers={'Authorization': f'Bearer {dbx_admin_sp_api_token}'}, json=payload)
process_grant_response(api_response, display_name, "READ_VOLUME, SELECT, USE_CATALOG, USE_SCHEMA", "catalog", catalog.get('name'))
def process_clusters_in_workspace(workspace: DatabricksWorkspace):
cluster_api_response = requests.get(f"https://{workspace.workspace_host}/api/2.0/clusters/list",
headers={'Authorization': f'Bearer {dbx_admin_sp_api_token}'})
if cluster_api_response.status_code != 200:
logger.error(
f"[subscription={workspace.subscription_id}][workspace={workspace.display_name}] - Cluster request failure {process_error_response(cluster_api_response)}")
else:
clusters: list = cluster_api_response.json().get('clusters', [])
if len(clusters) == 0:
logger.warning(
f"[subscription={workspace.subscription_id}][workspace={workspace.display_name}] - No clusters in workspace. {cluster_api_response.json()}")
# TODO filter to retrieve only APC
filtered_clusters = list(filter(lambda cl: cl.get('cluster_source') != 'JOB', clusters))
for cluster in filtered_clusters:
time.sleep(timeout)
grant_cluster_permission(workspace.workspace_host, workspace.display_name, cluster)
def process_all_jobs_in_workspace(workspace: DatabricksWorkspace):
jobs = list()
has_more = True
offset = 0
try:
while has_more:
response = process_jobs_in_workspace(workspace, offset)
has_more = response.get('has_more', False)
jobs.extend(response.get('jobs', []))
if has_more:
offset = offset + 25
time.sleep(timeout)
except Exception as e:
logger.error(e)
else:
if len(jobs) == 0:
logger.warning(
f"[subscription={workspace.subscription_id}][workspace={workspace.display_name}] - No jobs in workspace ")
for job in jobs:
time.sleep(timeout)
grant_job_permission(workspace.workspace_host, workspace.display_name, job)
def process_jobs_in_workspace(workspace, offset: int):
jobs_api_response = requests.get(f"https://{workspace.workspace_host}/api/2.1/jobs/list?limit=25&offset={offset}",
headers={'Authorization': f'Bearer {dbx_admin_sp_api_token}'})
if jobs_api_response.status_code != 200:
raise Exception(
f"[subscription={workspace.subscription_id}][workspace={workspace.display_name}] - Jobs request failure {process_error_response(jobs_api_response)}")
else:
jobs_response: list = jobs_api_response.json()
if len(jobs_response) == 0:
logger.warning(
f"[subscription={workspace.subscription_id}][workspace={workspace.display_name}] - List jobs returned an empty list using offset {offset}. {jobs_response}")
return jobs_response
def process_all_pipelines_in_workspace(workspace: DatabricksWorkspace):
pipelines = list()
next_page_token = ""
try:
while next_page_token is not None:
pipelines_api_response = process_pipelines_in_workspace(workspace, next_page_token)
next_page_token = pipelines_api_response.get('next_page_token', None)
new_pipelines = pipelines_api_response.get('statuses', [])
if len(new_pipelines) == 0:
logger.warning(
f"[subscription={workspace.subscription_id}][workspace={workspace.display_name}] - List pipelines returned an empty list. {pipelines_api_response}")
pipelines.extend(new_pipelines)
time.sleep(timeout)
except Exception as e:
logger.error(e)
else:
if len(pipelines) == 0:
logger.warning(
f"[subscription={workspace.subscription_id}][workspace={workspace.display_name}] - No pipelines in workspace")
for pipeline in pipelines:
time.sleep(timeout)
grant_pipeline_permission(workspace.workspace_host, workspace.display_name, pipeline)
def process_pipelines_in_workspace(workspace: DatabricksWorkspace, next_page_token):
if next_page_token is not None and next_page_token != "":
token = f"&page_token={next_page_token}"
else:
token = ""
pipelines_api_response = requests.get(
f"https://{workspace.workspace_host}/api/2.0/pipelines?max_results=100{token}",
headers={'Authorization': f'Bearer {dbx_admin_sp_api_token}'})
if pipelines_api_response.status_code != 200:
raise Exception(
f"[subscription={workspace.subscription_id}][workspace={workspace.display_name}] - Pipelines request failure {process_error_response(pipelines_api_response)}")
else:
pipelines_response: dict = pipelines_api_response.json()
return pipelines_response
def process_warehouses_in_workspace(workspace: DatabricksWorkspace):
warehouses_api_response = requests.get(f"https://{workspace.workspace_host}/api/2.0/sql/warehouses",
headers={'Authorization': f'Bearer {dbx_admin_sp_api_token}'})
if warehouses_api_response.status_code != 200:
logger.error(
f"[subscription={workspace.subscription_id}][workspace={workspace.display_name}] - Warehouses request failure {process_error_response(warehouses_api_response)}")
else:
warehouses: list = warehouses_api_response.json().get('warehouses', [])
if len(warehouses) == 0:
logger.warning(
f"[subscription={workspace.subscription_id}][workspace={workspace.display_name}] - No warehouses in workspace. {warehouses_api_response.json()}")
for warehouse in warehouses:
time.sleep(timeout)
grant_warehouse_permission(workspace.workspace_host, workspace.display_name, warehouse)
def process_unity_catalogs_in_workspace(workspace: DatabricksWorkspace):
catalogs_api_response = requests.get(f"https://{workspace.workspace_host}/api/2.1/unity-catalog/catalogs",
headers={'Authorization': f'Bearer {dbx_admin_sp_api_token}'})
if catalogs_api_response.status_code != 200:
logger.error(
f"[subscription={workspace.subscription_id}][workspace={workspace.display_name}] - Unity Catalog request failure {process_error_response(catalogs_api_response)}")
else:
catalogs: list = catalogs_api_response.json().get('catalogs', [])
if len(catalogs) == 0:
logger.warning(
f"[subscription={workspace.subscription_id}][workspace={workspace.display_name}] - No unity catalogs in workspace. {catalogs_api_response.json()}")
for catalog in catalogs:
time.sleep(timeout)
grant_catalog_permission(workspace.workspace_host, workspace.display_name, catalog)
def process_all_entities_in_workspace(workspace: DatabricksWorkspace):
msg = f"all databricks entities in workspace {workspace}" + (f", excluding: {excluded}" if excluded != [] else "")
logger.info(f"Processing {msg}")
if 'clusters' not in excluded:
process_clusters_in_workspace(workspace)
if 'jobs' not in excluded:
process_all_jobs_in_workspace(workspace)
if 'pipelines' not in excluded:
process_all_pipelines_in_workspace(workspace)
if 'warehouses' not in excluded:
process_warehouses_in_workspace(workspace)
if 'catalogs' not in excluded:
process_unity_catalogs_in_workspace(workspace)
logger.info(f"Finished processing {msg}")
def list_lhm_subscriptions(sub_id: str):
logger.debug("Listing all subscriptions used by LHM app")
subscriptions_response = requests.get(f"{lhmURL}/api/1.0/subscriptions", headers=lhmSPInfo)
if subscriptions_response.status_code != 200:
logger.error(f"Get subscriptions failed [{subscriptions_response.status_code}] - {subscriptions_response.text}")
subscriptions = list(
map(lambda sub: Subscription(sub['subscriptionId'], sub['displayName']), subscriptions_response.json()))
if len(subscriptions) == 0:
logger.warning(f"No subscriptions used by LHM app were found.")
if sub_id is None:
subscriptions_metadata = list(map(lambda sub: str(sub), subscriptions))
logger.debug(f"Found {len(subscriptions)} subscriptions used by LHM app. {str(subscriptions_metadata)}")
return subscriptions
else:
logger.debug(f"Searching {sub_id} in subscriptions used by LHM app")
filtered_subscriptions = list(filter(lambda sub: sub.subscription_id == sub_id, subscriptions))
if len(filtered_subscriptions) == 0:
logger.warning(f"Subscription {sub_id} not found in subscriptions used by LHM app")
else:
logger.info(f"Subscription {sub_id} was successfully found in subscriptions used by LHM app")
return filtered_subscriptions
def list_lhm_workspaces_in_subscription(subscription: Subscription, dbx_id: str):
logger.debug(f"Listing all workspaces of subscription {subscription} used in LHM app")
workspaces_response = requests.get(f'{lhmURL}/api/1.0/subscriptions/{subscription.subscription_id}/workspaces',
headers=lhmSPInfo)
if workspaces_response.status_code != 200:
raise Exception(f"Workspace request failure for subscription {subscription} - {workspaces_response.text}")
workspaces: list = list(
map(lambda workspace: DatabricksWorkspace(subscription, workspace['workspaceId'], workspace['displayName'],
workspace['workspaceHost'],
workspace['isPremium']), workspaces_response.json()))
premium_workspaces = list(filter(lambda workspace: workspace.is_premium, workspaces))
if len(premium_workspaces) == 0:
raise Exception(f"Subscription {subscription.display_name} - No premium workspaces in subscription")
if dbx_id is None:
workspaces_names = list(map(lambda workspace: workspace.display_name, premium_workspaces))
logger.info(
f"Found {len(premium_workspaces)} premium workspaces in subscription {subscription}. {', '.join(workspaces_names)}")
return premium_workspaces
else:
filtered_workspaces = list(filter(lambda workspace: workspace.workspace_id == dbx_id, premium_workspaces))
if len(filtered_workspaces) == 0:
logger.warning(f"Workspace {dbx_id} not found in workspaces used by LHM app in subscription {subscription}")
else:
logger.info(
f"Workspace {dbx_id} was successfully found in workspaces used by LHM app in subscription {subscription}")
return filtered_workspaces
# rest of script
def main():
subscriptions = list()
premium_workspaces = list()
if workspace_id is not None and subscription_id is not None:
logger.info(
f"Application configured to grant permissions only for workspace {workspace_id} of subscription: {subscription_id}")
workspace = DatabricksWorkspace(Subscription(subscription_id, subscription_id), workspace_id, workspace_id,
workspace_host, True)
process_all_entities_in_workspace(workspace)
elif subscription_id is not None:
logger.info(
f"Application configured to grant permissions for all workspaces of subscription: {subscription_id}")
subscriptions.append(Subscription(subscription_id, subscription_id))
elif workspace_id is not None:
logger.info(f"Application configured to grant permissions only for workspace {workspace_id}")
workspace = DatabricksWorkspace(Subscription("unknown", "unknown"), workspace_id, workspace_id, workspace_host,
True)
process_all_entities_in_workspace(workspace)
else:
logger.info(f"Application configure to grant permissions for all subscriptions and workspaces used by LHM app")
subscriptions = list_lhm_subscriptions(subscription_id)
for subscription in subscriptions:
try:
premium_workspaces = list_lhm_workspaces_in_subscription(subscription, workspace_id)
except Exception as e:
logger.error(e)
for workspace in premium_workspaces:
process_all_entities_in_workspace(workspace)
if len(premium_workspaces) > 0:
logger.info(f"Processed all workspaces of subscription {subscription} ")
logger.info("Processed all entities.\nProcess finished.")
if __name__ == '__main__':
main()
To run it you need to set the needed environment variables:
LHM_URL - URL pointing to LHM. Needs to be in the form: https://lhm.example.com
DBX_* - details of the Admin Service Principal which is able to grant permissions
LHM_* - details of the LHM Service Principal which needs to be granted permissions
USER_NAME - set if a user (instead of the LHM SP) needs to be granted permissions (LHM_* values must be provided if USER_NAME is not set)
Optional configuration support:
LOG_LEVEL - Default is WARN
DBX_API_THROTTLE_TIMEOUT_IN_MS - defaults to 50. Variable to set the timeout between dbx requests.
SUBSCRIPTION_ID - If subscriptionId is provided, permissions will be granted for all workspaces in this subscription
WORKSPACE_HOST - If workspaceHost is provided, permissions will be granted only for this workspace.
Example command line where the above script is called grant_sp.py
:
export DBX_CLIENTID=XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXXX
export DBX_CLIENT_SECRET=<Client secret for the admin SP>
export DBX_TENANTID=XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXXX
export LHM_CLIENTID=XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXXX
export LHM_CLIENT_SECRET=<Client secret for the LHM SP>
export LHM_TENANTID=XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXXX
export LHM_URL=https://my-demo-lhm.example.org
# Optional configuration support
# export LOG_LEVEL=INFO # DEBUG, WARN, ERROR
# export SUBSCRIPTION_ID=XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXXX
# export WORKSPACE_HOST=adb-{my-workspace}.azuredatabricks.net
python3 grant_sp.py