通过Web Server访问通过Cloud Nat限制IP地址后,无法访问Composer REST API
我正在尝试通过GCF触发作曲家DAG。当Web服务器访问网络访问控制允许所有IP连接时,我能够触发它,但是在限制IP之后,云函数将抛出403-禁止错误。
以下是我遵循的步骤。
- 创建了VPC网络,Cloud Router和Cloud NAT。
- 通过无服务器VPC连接器重新路由云功能的所有出口流量。
- 白名单在作曲家Web服务器访问控制中,Cloud Nat的静态外部IP。
- 更改了稳定API的作曲家CFG值。
我为GCF使用的服务帐户低于权限。
- 作曲家用户
- 编辑器
- IAP扣除的Web应用程序
以下是我正在使用的GCF代码。
from google.auth.transport.requests import Request
from google.oauth2 import id_token
import requests
IAM_SCOPE = 'https://www.googleapis.com/auth/iam'
OAUTH_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token'
# If you are using the stable API, set this value to False
# For more info about Airflow APIs see https://cloud.google.com/composer/docs/access-airflow-api
USE_EXPERIMENTAL_API = False
def trigger_dag(data, context=None):
# Fill in with your Composer info here
# Navigate to your webserver's login page and get this from the URL
# Or use the script found at
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/composer/rest/get_client_id.py
client_id = 'EXAMPLE.apps.googleusercontent.com'
# This should be part of your webserver's URL:
# {tenant-project-id}.appspot.com
webserver_id = 'EXAMPLE-tp'
# The name of the DAG you wish to trigger
dag_name = 'cf_test'
if USE_EXPERIMENTAL_API:
endpoint = f'api/experimental/dags/{dag_name}/dag_runs'
json_data = {'conf': data, 'replace_microseconds': 'false'}
else:
endpoint = f'api/v1/dags/{dag_name}/dagRuns'
json_data = {'conf': data}
webserver_url = (
'https://'
+ webserver_id
+ '.appspot.com/'
+ endpoint
)
# Make a POST request to IAP which then Triggers the DAG
make_iap_request(
webserver_url, client_id, method='POST', json=json_data)
def make_iap_request(url, client_id, method='GET', **kwargs):
# Set the default timeout, if missing
if 'timeout' not in kwargs:
kwargs['timeout'] = 90
# Obtain an OpenID Connect (OIDC) token from metadata server or using service
# account.
google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)
# Fetch the Identity-Aware Proxy-protected URL, including an
# Authorization header containing "Bearer " followed by a
# Google-issued OpenID Connect token for the service account.
resp = requests.request(
method, url,
headers={'Authorization': 'Bearer {}'.format(
google_open_id_connect_token)}, **kwargs)
if resp.status_code == 403:
print(resp.text)
raise Exception('Service account does not have permission to '
'access the IAP-protected application.')
elif resp.status_code != 200:
raise Exception(
'Bad response from application: {!r} / {!r} / {!r}'.format(
resp.status_code, resp.headers, resp.text))
else:
return resp.text
有人可以帮我解决这个问题吗?
I am trying to trigger a composer DAG through GCF. I am able to trigger it while web server access control for network was allowing all IPs to connect but after restricting the IP the Cloud Function is throwing 403- Forbidden Error.
Below are the steps I have followed.
- Created a VPC network, Cloud router and Cloud NAT.
- Rerouted all the egress traffic of cloud function through serverless VPC connector.
- Whitelisted Cloud NAT's static external IP in composer webserver access control.
- Changed composer cfg values for Stable API.
The service account I am using for GCF has below permissions.
- Composer User
- Editor
- IAP-secured Web App User
Below is the GCF code I am using.
from google.auth.transport.requests import Request
from google.oauth2 import id_token
import requests
IAM_SCOPE = 'https://www.googleapis.com/auth/iam'
OAUTH_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token'
# If you are using the stable API, set this value to False
# For more info about Airflow APIs see https://cloud.google.com/composer/docs/access-airflow-api
USE_EXPERIMENTAL_API = False
def trigger_dag(data, context=None):
# Fill in with your Composer info here
# Navigate to your webserver's login page and get this from the URL
# Or use the script found at
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/composer/rest/get_client_id.py
client_id = 'EXAMPLE.apps.googleusercontent.com'
# This should be part of your webserver's URL:
# {tenant-project-id}.appspot.com
webserver_id = 'EXAMPLE-tp'
# The name of the DAG you wish to trigger
dag_name = 'cf_test'
if USE_EXPERIMENTAL_API:
endpoint = f'api/experimental/dags/{dag_name}/dag_runs'
json_data = {'conf': data, 'replace_microseconds': 'false'}
else:
endpoint = f'api/v1/dags/{dag_name}/dagRuns'
json_data = {'conf': data}
webserver_url = (
'https://'
+ webserver_id
+ '.appspot.com/'
+ endpoint
)
# Make a POST request to IAP which then Triggers the DAG
make_iap_request(
webserver_url, client_id, method='POST', json=json_data)
def make_iap_request(url, client_id, method='GET', **kwargs):
# Set the default timeout, if missing
if 'timeout' not in kwargs:
kwargs['timeout'] = 90
# Obtain an OpenID Connect (OIDC) token from metadata server or using service
# account.
google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)
# Fetch the Identity-Aware Proxy-protected URL, including an
# Authorization header containing "Bearer " followed by a
# Google-issued OpenID Connect token for the service account.
resp = requests.request(
method, url,
headers={'Authorization': 'Bearer {}'.format(
google_open_id_connect_token)}, **kwargs)
if resp.status_code == 403:
print(resp.text)
raise Exception('Service account does not have permission to '
'access the IAP-protected application.')
elif resp.status_code != 200:
raise Exception(
'Bad response from application: {!r} / {!r} / {!r}'.format(
resp.status_code, resp.headers, resp.text))
else:
return resp.text
Can Someone please help me to sort this out?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论