google.api_core.exceptions.notfound Bucket不存在
当我在气流中运行data_ingestion_gcs_gcs_dag dag时。我指定了使用Docker-Compose对Google帐户的访问,这是下面的代码,我仅插入了代码的第一部分:
version: '3'
x-airflow-common:
&airflow-common
# In order to add custom dependencies or upgrade provider packages you can use your extended image.
# Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
# and uncomment the "build" line below, Then run `docker-compose build` to build the images.
build:
context: .
dockerfile: ./Dockerfile
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
GOOGLE_APPLICATION_CREDENTIALS: /.google/credentials/google_credentials.json
AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT: 'google-cloud-platform://?extra__google_cloud_platform__key_path=/.google/credentials/google_credentials.json'
# TODO: Please change GCP_PROJECT_ID & GCP_GCS_BUCKET, as per your config
GCP_PROJECT_ID: 'real-dtc-de'
GCP_GCS_BUCKET: 'dtc_data_lake_real-dtc-de'
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
- ~/.google/credentials/:/.google/credentials:ro
这是来自DAG代码的代码,下面显示:
PROJECT_ID = os.environ.get("GCP_PROJECT_ID")
BUCKET = os.environ.get("GCP_GCS_BUCKET")
这是DAG的日志:
*** Reading local file: /opt/airflow/logs/data_ingestion_gcs_dag/local_to_gcs_task/2022-06-13T02:47:29.654918+00:00/1.log
[2022-06-13, 02:47:36 UTC] {taskinstance.py:1032} INFO - Dependencies all met for <TaskInstance: data_ingestion_gcs_dag.local_to_gcs_task manual__2022-06-13T02:47:29.654918+00:00 [queued]>
[2022-06-13, 02:47:36 UTC] {taskinstance.py:1032} INFO - Dependencies all met for <TaskInstance: data_ingestion_gcs_dag.local_to_gcs_task manual__2022-06-13T02:47:29.654918+00:00 [queued]>
[2022-06-13, 02:47:36 UTC] {taskinstance.py:1238} INFO -
--------------------------------------------------------------------------------
[2022-06-13, 02:47:36 UTC] {taskinstance.py:1239} INFO - Starting attempt 1 of 2
[2022-06-13, 02:47:36 UTC] {taskinstance.py:1240} INFO -
--------------------------------------------------------------------------------
[2022-06-13, 02:47:36 UTC] {taskinstance.py:1259} INFO - Executing <Task(PythonOperator): local_to_gcs_task> on 2022-06-13 02:47:29.654918+00:00
[2022-06-13, 02:47:36 UTC] {standard_task_runner.py:52} INFO - Started process 1042 to run task
[2022-06-13, 02:47:36 UTC] {standard_task_runner.py:76} INFO - Running: ['***', 'tasks', 'run', 'data_ingestion_gcs_dag', 'local_to_gcs_task', 'manual__2022-06-13T02:47:29.654918+00:00', '--job-id', '11', '--raw', '--subdir', 'DAGS_FOLDER/data_ingestion_gcs_dag.py', '--cfg-path', '/tmp/tmp11gg9aoy', '--error-file', '/tmp/tmpjbp6yrks']
[2022-06-13, 02:47:36 UTC] {standard_task_runner.py:77} INFO - Job 11: Subtask local_to_gcs_task
[2022-06-13, 02:47:36 UTC] {logging_mixin.py:109} INFO - Running <TaskInstance: data_ingestion_gcs_dag.local_to_gcs_task manual__2022-06-13T02:47:29.654918+00:00 [running]> on host aea7312db396
[2022-06-13, 02:47:36 UTC] {taskinstance.py:1426} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=data_ingestion_gcs_dag
AIRFLOW_CTX_TASK_ID=local_to_gcs_task
AIRFLOW_CTX_EXECUTION_DATE=2022-06-13T02:47:29.654918+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-06-13T02:47:29.654918+00:00
[2022-06-13, 02:47:36 UTC] {taskinstance.py:1700} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/google/cloud/storage/blob.py", line 2594, in upload_from_file
retry=retry,
File "/home/airflow/.local/lib/python3.7/site-packages/google/cloud/storage/blob.py", line 2396, in _do_upload
retry=retry,
File "/home/airflow/.local/lib/python3.7/site-packages/google/cloud/storage/blob.py", line 1917, in _do_multipart_upload
transport, data, object_metadata, content_type, timeout=timeout
File "/home/airflow/.local/lib/python3.7/site-packages/google/resumable_media/requests/upload.py", line 154, in transmit
retriable_request, self._get_status_code, self._retry_strategy
File "/home/airflow/.local/lib/python3.7/site-packages/google/resumable_media/requests/_request_helpers.py", line 147, in wait_and_retry
response = func()
File "/home/airflow/.local/lib/python3.7/site-packages/google/resumable_media/requests/upload.py", line 149, in retriable_request
self._process_response(result)
File "/home/airflow/.local/lib/python3.7/site-packages/google/resumable_media/_upload.py", line 113, in _process_response
_helpers.require_status_code(response, (http.client.OK,), self._get_status_code)
File "/home/airflow/.local/lib/python3.7/site-packages/google/resumable_media/_helpers.py", line 104, in require_status_code
*status_codes
google.resumable_media.common.InvalidResponse: ('Request failed with status code', 404, 'Expected one of', <HTTPStatus.OK: 200>)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1329, in _run_raw_task
self._execute_task_with_callbacks(context)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1455, in _execute_task_with_callbacks
result = self._execute_task(context, self.task)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1511, in _execute_task
result = execute_callable(context=context)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 174, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 185, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/opt/airflow/dags/data_ingestion_gcs_dag.py", line 51, in upload_to_gcs
blob.upload_from_filename(local_file)
File "/home/airflow/.local/lib/python3.7/site-packages/google/cloud/storage/blob.py", line 2735, in upload_from_filename
retry=retry,
File "/home/airflow/.local/lib/python3.7/site-packages/google/cloud/storage/blob.py", line 2598, in upload_from_file
_raise_from_invalid_response(exc)
File "/home/airflow/.local/lib/python3.7/site-packages/google/cloud/storage/blob.py", line 4466, in _raise_from_invalid_response
raise exceptions.from_http_status(response.status_code, message, response=response)
google.api_core.exceptions.NotFound: 404 POST https://storage.googleapis.com/upload/storage/v1/b/dtc_data_lake_animated-surfer-338618/o?uploadType=multipart: {
"error": {
"code": 404,
"message": "The specified bucket does not exist.",
"errors": [
{
"message": "The specified bucket does not exist.",
"domain": "global",
"reason": "notFound"
}
]
}
}
When I'm running data_ingestion_gcs_dag DAG in Airflow.I get error that it can not find a specified bucket, however, I rechecked it and the bucket name is fine. I have specified access to Google account with docker-compose, here is code down below, i have inserted only first part of code:
version: '3'
x-airflow-common:
&airflow-common
# In order to add custom dependencies or upgrade provider packages you can use your extended image.
# Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
# and uncomment the "build" line below, Then run `docker-compose build` to build the images.
build:
context: .
dockerfile: ./Dockerfile
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
GOOGLE_APPLICATION_CREDENTIALS: /.google/credentials/google_credentials.json
AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT: 'google-cloud-platform://?extra__google_cloud_platform__key_path=/.google/credentials/google_credentials.json'
# TODO: Please change GCP_PROJECT_ID & GCP_GCS_BUCKET, as per your config
GCP_PROJECT_ID: 'real-dtc-de'
GCP_GCS_BUCKET: 'dtc_data_lake_real-dtc-de'
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
- ~/.google/credentials/:/.google/credentials:ro
And here is code from DAG code, presented down below:
PROJECT_ID = os.environ.get("GCP_PROJECT_ID")
BUCKET = os.environ.get("GCP_GCS_BUCKET")
Here is logs from DAG:
*** Reading local file: /opt/airflow/logs/data_ingestion_gcs_dag/local_to_gcs_task/2022-06-13T02:47:29.654918+00:00/1.log
[2022-06-13, 02:47:36 UTC] {taskinstance.py:1032} INFO - Dependencies all met for <TaskInstance: data_ingestion_gcs_dag.local_to_gcs_task manual__2022-06-13T02:47:29.654918+00:00 [queued]>
[2022-06-13, 02:47:36 UTC] {taskinstance.py:1032} INFO - Dependencies all met for <TaskInstance: data_ingestion_gcs_dag.local_to_gcs_task manual__2022-06-13T02:47:29.654918+00:00 [queued]>
[2022-06-13, 02:47:36 UTC] {taskinstance.py:1238} INFO -
--------------------------------------------------------------------------------
[2022-06-13, 02:47:36 UTC] {taskinstance.py:1239} INFO - Starting attempt 1 of 2
[2022-06-13, 02:47:36 UTC] {taskinstance.py:1240} INFO -
--------------------------------------------------------------------------------
[2022-06-13, 02:47:36 UTC] {taskinstance.py:1259} INFO - Executing <Task(PythonOperator): local_to_gcs_task> on 2022-06-13 02:47:29.654918+00:00
[2022-06-13, 02:47:36 UTC] {standard_task_runner.py:52} INFO - Started process 1042 to run task
[2022-06-13, 02:47:36 UTC] {standard_task_runner.py:76} INFO - Running: ['***', 'tasks', 'run', 'data_ingestion_gcs_dag', 'local_to_gcs_task', 'manual__2022-06-13T02:47:29.654918+00:00', '--job-id', '11', '--raw', '--subdir', 'DAGS_FOLDER/data_ingestion_gcs_dag.py', '--cfg-path', '/tmp/tmp11gg9aoy', '--error-file', '/tmp/tmpjbp6yrks']
[2022-06-13, 02:47:36 UTC] {standard_task_runner.py:77} INFO - Job 11: Subtask local_to_gcs_task
[2022-06-13, 02:47:36 UTC] {logging_mixin.py:109} INFO - Running <TaskInstance: data_ingestion_gcs_dag.local_to_gcs_task manual__2022-06-13T02:47:29.654918+00:00 [running]> on host aea7312db396
[2022-06-13, 02:47:36 UTC] {taskinstance.py:1426} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=data_ingestion_gcs_dag
AIRFLOW_CTX_TASK_ID=local_to_gcs_task
AIRFLOW_CTX_EXECUTION_DATE=2022-06-13T02:47:29.654918+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-06-13T02:47:29.654918+00:00
[2022-06-13, 02:47:36 UTC] {taskinstance.py:1700} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/google/cloud/storage/blob.py", line 2594, in upload_from_file
retry=retry,
File "/home/airflow/.local/lib/python3.7/site-packages/google/cloud/storage/blob.py", line 2396, in _do_upload
retry=retry,
File "/home/airflow/.local/lib/python3.7/site-packages/google/cloud/storage/blob.py", line 1917, in _do_multipart_upload
transport, data, object_metadata, content_type, timeout=timeout
File "/home/airflow/.local/lib/python3.7/site-packages/google/resumable_media/requests/upload.py", line 154, in transmit
retriable_request, self._get_status_code, self._retry_strategy
File "/home/airflow/.local/lib/python3.7/site-packages/google/resumable_media/requests/_request_helpers.py", line 147, in wait_and_retry
response = func()
File "/home/airflow/.local/lib/python3.7/site-packages/google/resumable_media/requests/upload.py", line 149, in retriable_request
self._process_response(result)
File "/home/airflow/.local/lib/python3.7/site-packages/google/resumable_media/_upload.py", line 113, in _process_response
_helpers.require_status_code(response, (http.client.OK,), self._get_status_code)
File "/home/airflow/.local/lib/python3.7/site-packages/google/resumable_media/_helpers.py", line 104, in require_status_code
*status_codes
google.resumable_media.common.InvalidResponse: ('Request failed with status code', 404, 'Expected one of', <HTTPStatus.OK: 200>)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1329, in _run_raw_task
self._execute_task_with_callbacks(context)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1455, in _execute_task_with_callbacks
result = self._execute_task(context, self.task)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1511, in _execute_task
result = execute_callable(context=context)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 174, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 185, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/opt/airflow/dags/data_ingestion_gcs_dag.py", line 51, in upload_to_gcs
blob.upload_from_filename(local_file)
File "/home/airflow/.local/lib/python3.7/site-packages/google/cloud/storage/blob.py", line 2735, in upload_from_filename
retry=retry,
File "/home/airflow/.local/lib/python3.7/site-packages/google/cloud/storage/blob.py", line 2598, in upload_from_file
_raise_from_invalid_response(exc)
File "/home/airflow/.local/lib/python3.7/site-packages/google/cloud/storage/blob.py", line 4466, in _raise_from_invalid_response
raise exceptions.from_http_status(response.status_code, message, response=response)
google.api_core.exceptions.NotFound: 404 POST https://storage.googleapis.com/upload/storage/v1/b/dtc_data_lake_animated-surfer-338618/o?uploadType=multipart: {
"error": {
"code": 404,
"message": "The specified bucket does not exist.",
"errors": [
{
"message": "The specified bucket does not exist.",
"domain": "global",
"reason": "notFound"
}
]
}
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论