Azure Databricks API for creating a job: no job is created after a successful API call

Posted on 2025-01-12 11:09:37

I am using Python 3.6 to make API calls to Azure Databricks to create a job that runs a specific notebook. I have followed the instructions for using the API at this link. The only difference is that I am using Python rather than curl. The code I have written is as follows:

import requests
import os
import json


dbrks_create_job_url =  "https://"+os.environ['DBRKS_INSTANCE']+".azuredatabricks.net//2.0/jobs/create"

DBRKS_REQ_HEADERS = {
    'Authorization': 'Bearer ' + os.environ['DBRKS_BEARER_TOKEN'],
    'X-Databricks-Azure-Workspace-Resource-Id': '/subscriptions/'+ os.environ['DBRKS_SUBSCRIPTION_ID'] +'/resourceGroups/'+ os.environ['DBRKS_RESOURCE_GROUP'] +'/providers/Microsoft.Databricks/workspaces/' + os.environ['DBRKS_WORKSPACE_NAME'],
    'X-Databricks-Azure-SP-Management-Token': os.environ['DBRKS_MANAGEMENT_TOKEN']}


body_json = """
    {
    "name": "A sample job to trigger from DevOps",
    "tasks": [
        {
        "task_key": "ExecuteNotebook",
        "description": "Execute uploaded notebook including tests",
        "depends_on": [],
        "existing_cluster_id": """ + os.environ["DBRKS_CLUSTER_ID"] + """,
        "notebook_task": {
          "notebook_path": "/Users/myuser/sample-notebook",
          "base_parameters": {}
        },
        "timeout_seconds": 300,
        "max_retries": 1,
        "min_retry_interval_millis": 5000,
        "retry_on_timeout": false
      }
],
    "email_notifications": {},
    "name": "my_test_job",
    "max_concurrent_runs": 1}
"""

print("Request body in json format:")
print(body_json)

response = requests.post(dbrks_create_job_url, headers=DBRKS_REQ_HEADERS, data=body_json) 

if response.status_code == 200:
    print("Job created successfully!")
    print(response.status_code)
    print(response.content)
else:
    print("job failed!")
    raise Exception(response.content)

All the OS environment variables are sent from my Azure DevOps pipeline. However, you don't need to execute the script from a pipeline: you can run it from your local machine as long as you have a service principal with access to a Databricks workspace. To run the Python script, replace those environment variables with your own credentials.

Explaining the variables in the script:

  • os.environ['DBRKS_INSTANCE']: Name of the Databricks instance.

  • os.environ['DBRKS_BEARER_TOKEN']: The bearer token. You need it to authenticate your service principal or your user to Databricks. I explain how to get it below.

  • os.environ['DBRKS_MANAGEMENT_TOKEN']: You need this token if the service principal you are using has not been added as a Databricks workspace user or admin. I explain how to get it below.

  • os.environ['DBRKS_SUBSCRIPTION_ID']: The ID of the Azure subscription that contains the Databricks workspace.

  • os.environ['DBRKS_RESOURCE_GROUP']: Name of the Azure resource group of the Databricks workspace.

  • os.environ['DBRKS_WORKSPACE_NAME']: Name of the Azure Databricks workspace.

  • os.environ["DBRKS_CLUSTER_ID"]: The ID of the cluster that will execute the job in Databricks.
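The request headers in the script above can be assembled from these variables with a small helper. This is only a sketch: the function names are hypothetical, and sending the two Azure-specific headers only when DBRKS_MANAGEMENT_TOKEN is set is an assumption based on the description of that variable above. The question's script always sends all three headers.

```python
from typing import Dict, Mapping


def workspace_resource_id(subscription_id: str, resource_group: str, workspace_name: str) -> str:
    """Build the Azure resource ID of a Databricks workspace (same format as in the question)."""
    return (
        "/subscriptions/" + subscription_id
        + "/resourceGroups/" + resource_group
        + "/providers/Microsoft.Databricks/workspaces/" + workspace_name
    )


def databricks_headers(env: Mapping[str, str]) -> Dict[str, str]:
    """Build request headers from the DBRKS_* variables (hypothetical helper)."""
    headers = {"Authorization": "Bearer " + env["DBRKS_BEARER_TOKEN"]}
    # Assumption: the management token and workspace resource ID are only needed
    # when the service principal is not a user/admin of the workspace.
    management_token = env.get("DBRKS_MANAGEMENT_TOKEN")
    if management_token:
        headers["X-Databricks-Azure-SP-Management-Token"] = management_token
        headers["X-Databricks-Azure-Workspace-Resource-Id"] = workspace_resource_id(
            env["DBRKS_SUBSCRIPTION_ID"],
            env["DBRKS_RESOURCE_GROUP"],
            env["DBRKS_WORKSPACE_NAME"],
        )
    return headers
```

For example, databricks_headers(os.environ) produces the same three headers as DBRKS_REQ_HEADERS when all of the variables are set.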

When I run my script, I get status code 200, which means it should have worked, as shown below:

[Screenshot: Python script runs successfully]

However, when I look at the list of jobs, no new job has been created despite the 200 status code! As the screenshot below shows, the job I created is not there.

[Screenshot: Job is not created despite the 200 status code response.]

I also changed the API endpoint from azuredatabricks.net//2.0/jobs/create to azuredatabricks.net//2.1/jobs/create. The script still runs successfully, but no job is created! I can't understand what I am doing wrong. And if I am doing something wrong, why doesn't it raise an exception instead of returning a 200 status code?
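One plausible explanation for the 200 (an assumption, not confirmed in this thread) is that a path without /api/ is not a REST endpoint, so the workspace's web front end may answer with an ordinary HTML page and status 200. A stricter check treats a response as successful only if it is JSON and contains a job_id, which the Jobs API create call returns. The helper names below are hypothetical, and the check assumes the API replies with an application/json Content-Type.

```python
import json
from typing import Optional


def response_problem(status_code: int, content_type: Optional[str], body: str) -> Optional[str]:
    """Describe why a jobs/create response is not a success, or return None if it is."""
    if status_code != 200:
        return "HTTP %d: %s" % (status_code, body[:200])
    if not content_type or "application/json" not in content_type.lower():
        # A non-JSON 200 (for example an HTML page) suggests the URL is not a REST API path.
        return "expected a JSON body, got Content-Type %r" % (content_type,)
    try:
        payload = json.loads(body)
    except ValueError:
        return "body is not valid JSON"
    if not isinstance(payload, dict) or "job_id" not in payload:
        return "no job_id in response: %r" % (payload,)
    return None


def job_id_from_response(status_code: int, content_type: Optional[str], body: str) -> int:
    """Return the new job's ID, raising instead of silently accepting a bare 200."""
    problem = response_problem(status_code, content_type, body)
    if problem is not None:
        raise RuntimeError("Job was not created: " + problem)
    return json.loads(body)["job_id"]
```

With requests, this would be called as job_id_from_response(response.status_code, response.headers.get("Content-Type"), response.text).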

One final point, so that you can reproduce the problem I am facing: to get values for DBRKS_BEARER_TOKEN and DBRKS_MANAGEMENT_TOKEN, you can run the following script and manually replace os.environ['DBRKS_BEARER_TOKEN'] and os.environ['DBRKS_MANAGEMENT_TOKEN'] with the values it prints:

import requests
import os


TOKEN_BASE_URL = 'https://login.microsoftonline.com/' + os.environ['SVCDirectoryID'] + '/oauth2/token'
TOKEN_REQ_HEADERS = {'Content-Type': 'application/x-www-form-urlencoded'}
TOKEN_REQ_BODY = {
    'grant_type': 'client_credentials',
    'client_id': os.environ['SVCApplicationID'],
    'client_secret': os.environ['SVCSecretKey']}


def dbrks_management_token():
    # Token for the Azure management endpoint (sent as X-Databricks-Azure-SP-Management-Token).
    TOKEN_REQ_BODY['resource'] = 'https://management.core.windows.net/'
    # The AAD token endpoint expects a POST with a form-encoded body.
    response = requests.post(TOKEN_BASE_URL, headers=TOKEN_REQ_HEADERS, data=TOKEN_REQ_BODY)
    if response.status_code == 200:
        print(response.status_code)
    else:
        raise Exception(response.text)
    return response.json()['access_token']


def dbrks_bearer_token():
    # 2ff814a6-3304-4ab8-85cb-cd0e6f879c1d is the application ID of the Azure Databricks resource.
    TOKEN_REQ_BODY['resource'] = '2ff814a6-3304-4ab8-85cb-cd0e6f879c1d'
    response = requests.post(TOKEN_BASE_URL, headers=TOKEN_REQ_HEADERS, data=TOKEN_REQ_BODY)
    if response.status_code == 200:
        print(response.status_code)
    else:
        raise Exception(response.text)
    return response.json()['access_token']


DBRKS_BEARER_TOKEN = dbrks_bearer_token()
DBRKS_MANAGEMENT_TOKEN = dbrks_management_token()

os.environ['DBRKS_BEARER_TOKEN'] = DBRKS_BEARER_TOKEN
os.environ['DBRKS_MANAGEMENT_TOKEN'] = DBRKS_MANAGEMENT_TOKEN

print("DBRKS_BEARER_TOKEN", os.environ['DBRKS_BEARER_TOKEN'])
print("DBRKS_MANAGEMENT_TOKEN", os.environ['DBRKS_MANAGEMENT_TOKEN'])

  • SVCDirectoryID is the Azure Active Directory (AAD) tenant ID of the service principal.
  • SVCApplicationID is the AAD service principal's client (application) ID.
  • SVCSecretKey is the AAD service principal's secret key.

Thank you for your valuable input.

Comments (1)

我三岁 2025-01-19 11:09:37

You're mixing up the API versions: the tasks array can only be used with Jobs API 2.1, but you're using Jobs API 2.0. Another error is that you have // between the host name and the path instead of /api/.

Just change dbrks_create_job_url to "https://"+os.environ['DBRKS_INSTANCE']+".azuredatabricks.net/api/2.1/jobs/create"
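A minimal sketch of the corrected request, assuming the same DBRKS_* environment variables and a headers dict built as in the question. Beyond the answer's URL fix, it builds the body as a dictionary and sends it with requests' json= argument, so the cluster ID is serialized as a quoted JSON string. The question's hand-written string inserts the ID unquoted, which is not valid JSON for an ID such as 0123-456789-abcde123 (an illustrative value). The sketch also keeps a single name key. The helper names create_job_url, create_job_body and create_job are hypothetical.

```python
import os
from typing import Any, Dict


def create_job_url(instance: str) -> str:
    # Single slash after the host name, then the /api/2.1 prefix required by Jobs API 2.1.
    return "https://" + instance + ".azuredatabricks.net/api/2.1/jobs/create"


def create_job_body(cluster_id: str, notebook_path: str) -> Dict[str, Any]:
    # A dict lets the JSON encoder quote every string value, including the cluster ID.
    return {
        "name": "my_test_job",
        "tasks": [
            {
                "task_key": "ExecuteNotebook",
                "description": "Execute uploaded notebook including tests",
                "depends_on": [],
                "existing_cluster_id": cluster_id,
                "notebook_task": {"notebook_path": notebook_path, "base_parameters": {}},
                "timeout_seconds": 300,
                "max_retries": 1,
                "min_retry_interval_millis": 5000,
                "retry_on_timeout": False,
            }
        ],
        "email_notifications": {},
        "max_concurrent_runs": 1,
    }


def create_job(headers: Dict[str, str]) -> int:
    # Imported here so the pure helpers above can be used without requests installed.
    import requests

    response = requests.post(
        create_job_url(os.environ["DBRKS_INSTANCE"]),
        headers=headers,
        json=create_job_body(os.environ["DBRKS_CLUSTER_ID"], "/Users/myuser/sample-notebook"),
        timeout=30,
    )
    response.raise_for_status()  # raises on 4xx/5xx instead of printing and continuing
    return response.json()["job_id"]
```

Calling create_job(DBRKS_REQ_HEADERS) should return the new job's numeric ID. If the response is not JSON, response.json() raises, which surfaces a wrong URL instead of hiding it behind a 200.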
