我无法在AWS EMR上运行Dask纱线群

发布于 2025-02-10 07:32:44 字数 8922 浏览 0 评论 0原文

我想使用YarnCluster在EMR上运行Dask。 我已经在下面使用了Bootstrap脚本,但是我已经在SSH控制台中运行了这些说明。

HELP="Usage: bootstrap-dask [OPTIONS]

Example AWS EMR Bootstrap Action to install and configure Dask and Jupyter

By default it does the following things:
- Installs miniconda
- Installs dask, distributed, dask-yarn, pyarrow, and s3fs. This list can be
  extended using the --conda-packages flag below.
- Packages this environment for distribution to the workers.
- Installs and starts a jupyter notebook server running on port 8888. This can
  be disabled with the --no-jupyter flag below.

    --jupyter / --no-jupyter    Whether to also install and start a Jupyter
                                Notebook Server. Default is True.
    --password, -pw             Set the password for the Jupyter Notebook
                                Server. Default is 'dask-user'.
    --conda-packages            Extra packages to install from conda.

# Parse Inputs. This is specific to this script, and can be ignored
# -----------------------------------------------------------------

# -----------------------------------------------------------------------------
# 1. Check if running on the master node. If not, there's nothing do.
# -----------------------------------------------------------------------------
grep -q '"isMaster": true' /mnt/var/lib/info/instance.json \
|| { echo "Not running on master node, nothing to do" && exit 0; }

# -----------------------------------------------------------------------------
# 2. Install Miniconda
# -----------------------------------------------------------------------------
echo "Installing Miniconda"
curl https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -o /tmp/miniconda.sh
bash /tmp/miniconda.sh -b -p $HOME/miniconda
rm /tmp/miniconda.sh
echo -e '\nexport PATH=$HOME/miniconda/bin:$PATH' >> $HOME/.bashrc
source $HOME/.bashrc
conda update conda -y

# configure conda environment
#source ~/miniconda/etc/profile.d/conda.sh
#conda activate base

# -----------------------------------------------------------------------------
# 3. Install packages to use in packaged environment
# We install a few packages by default, and allow users to extend this list
# with a CLI flag:
# - dask-yarn >= 0.7.0, for deploying Dask on YARN.
# - pyarrow for working with hdfs, parquet, ORC, etc...
# - s3fs for access to s3
# - conda-pack for packaging the environment for distribution
# - ensure tornado 5, since tornado 6 doesn't work with jupyter-server-proxy
# -----------------------------------------------------------------------------
echo "Installing base packages"
conda install \
-c conda-forge \
-y \
-q \
dask-yarn \
s3fs \
conda-pack \

pip3 install pyarrow

# -----------------------------------------------------------------------------
# 4. Package the environment to be distributed to worker nodes
# -----------------------------------------------------------------------------
echo "Packaging environment"
conda pack -q -o $HOME/environment.tar.gz

# -----------------------------------------------------------------------------
# 5. List all packages in the worker environment
# -----------------------------------------------------------------------------
echo "Packages installed in the worker environment:"
conda list

# -----------------------------------------------------------------------------
# 6. Configure Dask
# This isn't necessary, but for this particular bootstrap script it will make a
# few things easier:
# - Configure the cluster's dashboard link to show the proxied version through
#   jupyter-server-proxy. This allows access to the dashboard with only an ssh
#   tunnel to the notebook.
# - Specify the pre-packaged python environment, so users don't have to
# - Set the default deploy-mode to local, so the dashboard proxying works
# - Specify the location of the native libhdfs library so pyarrow can find it
#   on the workers and the client (if submitting applications).
# ------------------------------------------------------------------------------
echo "Configuring Dask"
mkdir -p $HOME/.config/dask
cat <<EOT >> $HOME/.config/dask/config.yaml
    link: "/proxy/{port}/status"

  environment: /home/hadoop/environment.tar.gz
  deploy-mode: local

      ARROW_LIBHDFS_DIR: /usr/lib/hadoop/lib/native/

      ARROW_LIBHDFS_DIR: /usr/lib/hadoop/lib/native/
# Also set ARROW_LIBHDFS_DIR in ~/.bashrc so it's set for the local user
echo -e '\nexport ARROW_LIBHDFS_DIR=/usr/lib/hadoop/lib/native' >> $HOME/.bashrc

# -----------------------------------------------------------------------------
# 8. Install jupyter notebook server and dependencies
# We do this after packaging the worker environments to keep the tar.gz as
# small as possible.
# We install the following packages:
# - notebook: the Jupyter Notebook Server
# - ipywidgets: used to provide an interactive UI for the YarnCluster objects
# - jupyter-server-proxy: used to proxy the dask dashboard through the notebook server
# -----------------------------------------------------------------------------
    echo "Installing Jupyter"
    conda install \
    -c conda-forge \
    -y \
    -q \
    notebook \
    ipywidgets \
    jupyter-server-proxy \

# -----------------------------------------------------------------------------
# 9. List all packages in the client environment
# -----------------------------------------------------------------------------
echo "Packages installed in the client environment:"
conda list

# -----------------------------------------------------------------------------
# 10. Configure Jupyter Notebook
# -----------------------------------------------------------------------------
echo "Configuring Jupyter"
mkdir -p $HOME/.jupyter
HASHED_PASSWORD=`python -c "from notebook.auth import passwd; print(passwd('$JUPYTER_PASSWORD'))"`
cat <<EOF >> $HOME/.jupyter/jupyter_notebook_config.py
c.NotebookApp.password = u'$HASHED_PASSWORD'
c.NotebookApp.open_browser = False
c.NotebookApp.ip = ''
c.NotebookApp.port = 8888

# -----------------------------------------------------------------------------
# 11. Define an upstart service for the Jupyter Notebook Server
# This sets the notebook server up to properly run as a background service.
# -----------------------------------------------------------------------------
echo "Configuring Jupyter Notebook Upstart Service"
cat <<EOF > /tmp/jupyter-notebook.service
Description=Jupyter Notebook
ExecStart=$HOME/miniconda/bin/jupyter-notebook --allow-root --config=$HOME/.jupyter/jupyter_notebook_config.py

sudo mv /tmp/jupyter-notebook.service /etc/systemd/system/
sudo systemctl enable jupyter-notebook.service

# -----------------------------------------------------------------------------
# 12. Start the Jupyter Notebook Server
# -----------------------------------------------------------------------------
echo "Starting Jupyter Notebook Server"
sudo systemctl daemon-reload
sudo systemctl restart jupyter-notebook.service

#$HOME/miniconda/bin/jupyter-notebook --allow-root --config=$HOME/.jupyter/jupyter_notebook_config.py

之后,我使用$ home/miniconda/bin/bin/jupyter-notebook启动jupyter笔记本 - allow-root-config = $ home/.jupyter/jupyter/jupyter_notebook_config.py jupyter笔记本成功开始。 当我在笔记本上运行此代码时,

from dask_yarn import YarnCluster
from dask.distributed import Client

# Create a cluster
cluster = YarnCluster()

# Connect to the cluster
client = Client(cluster)


AttributeError                            Traceback (most recent call last)
Input In [3], in <cell line: 1>()
----> 1 client = Client(cluster)

File ~/miniconda/lib/python3.9/site-packages/distributed/client.py:835, in Client.__init__(self, address, loop, timeout, set_as_default, scheduler_file, security, asynchronous, name, heartbeat_interval, serializers, deserializers, extensions, direct_to_workers, connection_limit, **kwargs)
    832 elif isinstance(getattr(address, "scheduler_address", None), str):
    833     # It's a LocalCluster or LocalCluster-compatible object
    834     self.cluster = address
--> 835     status = getattr(self.cluster, "status")
    836     if status and status in [Status.closed, Status.closing]:
    837         raise RuntimeError(
    838             f"Trying to connect to an already closed or closing Cluster {self.cluster}."
    839         )

AttributeError: 'YarnCluster' object has no attribute 'status'

它会像使用LocalCluster而不是YarnCluster时一样 。我被困在这里几天,请帮忙。还如何配置工作者节​​点。

I want run dask on EMR using YarnCluster.
I have used below bootstrap script but I have run these instructions in SSH console.

HELP="Usage: bootstrap-dask [OPTIONS]

Example AWS EMR Bootstrap Action to install and configure Dask and Jupyter

By default it does the following things:
- Installs miniconda
- Installs dask, distributed, dask-yarn, pyarrow, and s3fs. This list can be
  extended using the --conda-packages flag below.
- Packages this environment for distribution to the workers.
- Installs and starts a jupyter notebook server running on port 8888. This can
  be disabled with the --no-jupyter flag below.

    --jupyter / --no-jupyter    Whether to also install and start a Jupyter
                                Notebook Server. Default is True.
    --password, -pw             Set the password for the Jupyter Notebook
                                Server. Default is 'dask-user'.
    --conda-packages            Extra packages to install from conda.

# Parse Inputs. This is specific to this script, and can be ignored
# -----------------------------------------------------------------

# -----------------------------------------------------------------------------
# 1. Check if running on the master node. If not, there's nothing do.
# -----------------------------------------------------------------------------
grep -q '"isMaster": true' /mnt/var/lib/info/instance.json \
|| { echo "Not running on master node, nothing to do" && exit 0; }

# -----------------------------------------------------------------------------
# 2. Install Miniconda
# -----------------------------------------------------------------------------
echo "Installing Miniconda"
curl https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -o /tmp/miniconda.sh
bash /tmp/miniconda.sh -b -p $HOME/miniconda
rm /tmp/miniconda.sh
echo -e '\nexport PATH=$HOME/miniconda/bin:$PATH' >> $HOME/.bashrc
source $HOME/.bashrc
conda update conda -y

# configure conda environment
#source ~/miniconda/etc/profile.d/conda.sh
#conda activate base

# -----------------------------------------------------------------------------
# 3. Install packages to use in packaged environment
# We install a few packages by default, and allow users to extend this list
# with a CLI flag:
# - dask-yarn >= 0.7.0, for deploying Dask on YARN.
# - pyarrow for working with hdfs, parquet, ORC, etc...
# - s3fs for access to s3
# - conda-pack for packaging the environment for distribution
# - ensure tornado 5, since tornado 6 doesn't work with jupyter-server-proxy
# -----------------------------------------------------------------------------
echo "Installing base packages"
conda install \
-c conda-forge \
-y \
-q \
dask-yarn \
s3fs \
conda-pack \

pip3 install pyarrow

# -----------------------------------------------------------------------------
# 4. Package the environment to be distributed to worker nodes
# -----------------------------------------------------------------------------
echo "Packaging environment"
conda pack -q -o $HOME/environment.tar.gz

# -----------------------------------------------------------------------------
# 5. List all packages in the worker environment
# -----------------------------------------------------------------------------
echo "Packages installed in the worker environment:"
conda list

# -----------------------------------------------------------------------------
# 6. Configure Dask
# This isn't necessary, but for this particular bootstrap script it will make a
# few things easier:
# - Configure the cluster's dashboard link to show the proxied version through
#   jupyter-server-proxy. This allows access to the dashboard with only an ssh
#   tunnel to the notebook.
# - Specify the pre-packaged python environment, so users don't have to
# - Set the default deploy-mode to local, so the dashboard proxying works
# - Specify the location of the native libhdfs library so pyarrow can find it
#   on the workers and the client (if submitting applications).
# ------------------------------------------------------------------------------
echo "Configuring Dask"
mkdir -p $HOME/.config/dask
cat <<EOT >> $HOME/.config/dask/config.yaml
    link: "/proxy/{port}/status"

  environment: /home/hadoop/environment.tar.gz
  deploy-mode: local

      ARROW_LIBHDFS_DIR: /usr/lib/hadoop/lib/native/

      ARROW_LIBHDFS_DIR: /usr/lib/hadoop/lib/native/
# Also set ARROW_LIBHDFS_DIR in ~/.bashrc so it's set for the local user
echo -e '\nexport ARROW_LIBHDFS_DIR=/usr/lib/hadoop/lib/native' >> $HOME/.bashrc

# -----------------------------------------------------------------------------
# 8. Install jupyter notebook server and dependencies
# We do this after packaging the worker environments to keep the tar.gz as
# small as possible.
# We install the following packages:
# - notebook: the Jupyter Notebook Server
# - ipywidgets: used to provide an interactive UI for the YarnCluster objects
# - jupyter-server-proxy: used to proxy the dask dashboard through the notebook server
# -----------------------------------------------------------------------------
    echo "Installing Jupyter"
    conda install \
    -c conda-forge \
    -y \
    -q \
    notebook \
    ipywidgets \
    jupyter-server-proxy \

# -----------------------------------------------------------------------------
# 9. List all packages in the client environment
# -----------------------------------------------------------------------------
echo "Packages installed in the client environment:"
conda list

# -----------------------------------------------------------------------------
# 10. Configure Jupyter Notebook
# -----------------------------------------------------------------------------
echo "Configuring Jupyter"
mkdir -p $HOME/.jupyter
HASHED_PASSWORD=`python -c "from notebook.auth import passwd; print(passwd('$JUPYTER_PASSWORD'))"`
cat <<EOF >> $HOME/.jupyter/jupyter_notebook_config.py
c.NotebookApp.password = u'$HASHED_PASSWORD'
c.NotebookApp.open_browser = False
c.NotebookApp.ip = ''
c.NotebookApp.port = 8888

# -----------------------------------------------------------------------------
# 11. Define an upstart service for the Jupyter Notebook Server
# This sets the notebook server up to properly run as a background service.
# -----------------------------------------------------------------------------
echo "Configuring Jupyter Notebook Upstart Service"
cat <<EOF > /tmp/jupyter-notebook.service
Description=Jupyter Notebook
ExecStart=$HOME/miniconda/bin/jupyter-notebook --allow-root --config=$HOME/.jupyter/jupyter_notebook_config.py

sudo mv /tmp/jupyter-notebook.service /etc/systemd/system/
sudo systemctl enable jupyter-notebook.service

# -----------------------------------------------------------------------------
# 12. Start the Jupyter Notebook Server
# -----------------------------------------------------------------------------
echo "Starting Jupyter Notebook Server"
sudo systemctl daemon-reload
sudo systemctl restart jupyter-notebook.service

#$HOME/miniconda/bin/jupyter-notebook --allow-root --config=$HOME/.jupyter/jupyter_notebook_config.py

after this i start jupyter notebook using $HOME/miniconda/bin/jupyter-notebook --allow-root --config=$HOME/.jupyter/jupyter_notebook_config.py
jupyter notebook start successfully.
When i run this code on notebook

from dask_yarn import YarnCluster
from dask.distributed import Client

# Create a cluster
cluster = YarnCluster()

# Connect to the cluster
client = Client(cluster)

it gives error like

AttributeError                            Traceback (most recent call last)
Input In [3], in <cell line: 1>()
----> 1 client = Client(cluster)

File ~/miniconda/lib/python3.9/site-packages/distributed/client.py:835, in Client.__init__(self, address, loop, timeout, set_as_default, scheduler_file, security, asynchronous, name, heartbeat_interval, serializers, deserializers, extensions, direct_to_workers, connection_limit, **kwargs)
    832 elif isinstance(getattr(address, "scheduler_address", None), str):
    833     # It's a LocalCluster or LocalCluster-compatible object
    834     self.cluster = address
--> 835     status = getattr(self.cluster, "status")
    836     if status and status in [Status.closed, Status.closing]:
    837         raise RuntimeError(
    838             f"Trying to connect to an already closed or closing Cluster {self.cluster}."
    839         )

AttributeError: 'YarnCluster' object has no attribute 'status'

Also when I use LocalCluster instead of YarnCluster it run perfectly. I am stuck here for days please help. Also how we configure worker nodes.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。



需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。