What is the best way to check if a file exists on Azure Data Lake using Apache Airflow?

Posted 2025-01-26 12:45:10

I have a DAG that shall check whether a file has been uploaded to Azure Data Lake in a specific directory. If so, it allows other DAGs to run.

I thought about using a FileSensor, but I assume the fs_conn_id parameter is not enough to authenticate against a Data Lake.

3 comments

み格子的夏天 2025-02-02 12:45:10

There is no AzureDataLakeSensor in the Azure provider, but you can easily implement one, since the AzureDataLakeHook has a check_for_file function; all that is needed is to wrap this function in a Sensor class implementing the poke() function of BaseSensorOperator. By doing so you can use the Microsoft Azure Data Lake connection directly.

I didn't test it but this should work:

from typing import TYPE_CHECKING, Sequence

from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook
from airflow.sensors.base import BaseSensorOperator

if TYPE_CHECKING:
    from airflow.utils.context import Context

class MyAzureDataLakeSensor(BaseSensorOperator):
    """
    Sense for files in Azure Data Lake

    :param path: The Azure Data Lake path to find the objects. Supports glob
        strings (templated)
    :param azure_data_lake_conn_id: The Azure Data Lake connection id
    """

    template_fields: Sequence[str] = ('path',)
    ui_color = '#901dd2'

    def __init__(
        self, *, path: str, azure_data_lake_conn_id: str = 'azure_data_lake_default', **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.path = path
        self.azure_data_lake_conn_id = azure_data_lake_conn_id

    def poke(self, context: "Context") -> bool:
        hook = AzureDataLakeHook(azure_data_lake_conn_id=self.azure_data_lake_conn_id)
        self.log.info('Poking for file in path: %s', self.path)
        # check_for_file returns True if the file exists, False otherwise
        return hook.check_for_file(file_path=self.path)

Usage example:

MyAzureDataLakeSensor(
    task_id='adls_sense',
    path='folder/file.csv',
    azure_data_lake_conn_id='azure_data_lake_default',
    mode='reschedule'
)
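
To actually let other DAGs run once the file shows up, as the question asks, one option is to chain this sensor with a TriggerDagRunOperator in a small watcher DAG. A minimal sketch, not part of the original answer; the DAG id adls_file_watcher and the downstream DAG id downstream_processing are hypothetical:

from datetime import datetime

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
    dag_id='adls_file_watcher',              # hypothetical watcher DAG id
    start_date=datetime(2025, 1, 1),
    schedule_interval='@hourly',
    catchup=False,
) as dag:
    sense = MyAzureDataLakeSensor(
        task_id='adls_sense',
        path='folder/file.csv',
        azure_data_lake_conn_id='azure_data_lake_default',
        mode='reschedule',
    )

    # Fires only after the sensor has succeeded
    trigger = TriggerDagRunOperator(
        task_id='trigger_downstream',
        trigger_dag_id='downstream_processing',  # hypothetical downstream DAG id
    )

    sense >> trigger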
独孤求败 2025-02-02 12:45:10

First of all, have a look at the official Microsoft operators for Airflow.

We can see that there are dedicated operators for Azure Data Lake Storage; unfortunately, only the ADLSDeleteOperator seems available at the moment.

This ADLSDeleteOperator uses an AzureDataLakeHook, which you should reuse in your own custom operator to check for file presence.

My advice is to create a child class of CheckOperator that uses the ADLS hook to check whether the file provided as input exists, via the hook's check_for_file function.

UPDATE: as pointed out in the comments, CheckOperator is tied to SQL queries and is deprecated. Using your own custom Sensor or custom Operator is the way to go.
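
If you prefer not to write a custom class at all, the same hook can also be reused from a PythonSensor. A minimal sketch, assuming the Microsoft Azure provider is installed and a connection named azure_data_lake_default exists; the path folder/file.csv is just an example:

from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook
from airflow.sensors.python import PythonSensor


def _adls_file_exists() -> bool:
    # Reuse the provider hook; check_for_file returns True if the file exists
    hook = AzureDataLakeHook(azure_data_lake_conn_id='azure_data_lake_default')
    return hook.check_for_file(file_path='folder/file.csv')


wait_for_file = PythonSensor(
    task_id='wait_for_adls_file',
    python_callable=_adls_file_exists,
    mode='reschedule',
)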

无名指的心愿 2025-02-02 12:45:10

I had severe issues using the proposed API, so I used Microsoft's DataLakeServiceClient API directly inside Airflow instead. This worked fine. All you need to do then is to use this sensor and pass account_url and access_token.

from azure.storage.filedatalake import DataLakeServiceClient
from airflow.sensors.base import BaseSensorOperator

class AzureDataLakeSensor(BaseSensorOperator):

    def __init__(self, path, filename, account_url, access_token, **kwargs):
        super().__init__(**kwargs)
        self._client = DataLakeServiceClient(
            account_url=account_url,
            credential=access_token,
        )
        self.path = path
        self.filename = filename

    def poke(self, context):
        # The file system (container) name is hardcoded to "raw" here
        container = self._client.get_file_system_client(file_system="raw")
        dir_client = container.get_directory_client(self.path)
        file = dir_client.get_file_client(self.filename)
        return file.exists()
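
A usage sketch for this sensor; the account URL, token, and paths below are placeholders rather than part of the original answer:

wait_for_file = AzureDataLakeSensor(
    task_id='wait_for_adls_file',
    path='landing/2025-01',                                         # directory inside the "raw" container
    filename='file.csv',
    account_url='https://<storage-account>.dfs.core.windows.net',   # placeholder
    access_token='<sas-token-or-credential>',                       # placeholder
    mode='reschedule',
)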