I can't run a Dask YARN cluster on AWS EMR
I want to run Dask on EMR using a YarnCluster.
I used the bootstrap script below, but I ran its instructions manually in an SSH console rather than as a bootstrap action.
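For context, my understanding is that this script would normally be attached as a bootstrap action when the cluster is created, roughly like the sketch below (the bucket, key pair, and release label are placeholders, not my real values):

aws emr create-cluster \
    --name "Dask-EMR" \
    --release-label emr-6.7.0 \
    --applications Name=Hadoop \
    --instance-type m5.xlarge \
    --instance-count 3 \
    --use-default-roles \
    --ec2-attributes KeyName=my-key-pair \
    --bootstrap-actions Path=s3://my-bucket/bootstrap-dask.sh

The script itself is: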
#!/bin/bash
HELP="Usage: bootstrap-dask [OPTIONS]
Example AWS EMR Bootstrap Action to install and configure Dask and Jupyter
By default it does the following things:
- Installs miniconda
- Installs dask, distributed, dask-yarn, pyarrow, and s3fs. This list can be
extended using the --conda-packages flag below.
- Packages this environment for distribution to the workers.
- Installs and starts a jupyter notebook server running on port 8888. This can
be disabled with the --no-jupyter flag below.
Options:
--jupyter / --no-jupyter Whether to also install and start a Jupyter
Notebook Server. Default is True.
--password, -pw Set the password for the Jupyter Notebook
Server. Default is 'dask-user'.
--conda-packages Extra packages to install from conda.
"
# Parse Inputs. This is specific to this script, and can be ignored
# (the option-parsing code itself is omitted in this copy)
# -----------------------------------------------------------------
# -----------------------------------------------------------------------------
# 1. Check if running on the master node. If not, there's nothing to do.
# -----------------------------------------------------------------------------
grep -q '"isMaster": true' /mnt/var/lib/info/instance.json \
|| { echo "Not running on master node, nothing to do" && exit 0; }
# -----------------------------------------------------------------------------
# 2. Install Miniconda
# -----------------------------------------------------------------------------
echo "Installing Miniconda"
curl https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -o /tmp/miniconda.sh
bash /tmp/miniconda.sh -b -p $HOME/miniconda
rm /tmp/miniconda.sh
echo -e '\nexport PATH=$HOME/miniconda/bin:$PATH' >> $HOME/.bashrc
source $HOME/.bashrc
conda update conda -y
# configure conda environment
#source ~/miniconda/etc/profile.d/conda.sh
#conda activate base
# -----------------------------------------------------------------------------
# 3. Install packages to use in packaged environment
#
# We install a few packages by default, and allow users to extend this list
# with a CLI flag:
#
# - dask-yarn >= 0.7.0, for deploying Dask on YARN.
# - pyarrow for working with hdfs, parquet, ORC, etc...
# - s3fs for access to s3
# - conda-pack for packaging the environment for distribution
# - a compatible tornado version (tornado 6 did not work with early
#   jupyter-server-proxy releases)
# -----------------------------------------------------------------------------
echo "Installing base packages"
conda install \
    -c conda-forge \
    -y \
    -q \
    dask-yarn \
    s3fs \
    conda-pack \
    tornado
pip3 install pyarrow
# -----------------------------------------------------------------------------
# 4. Package the environment to be distributed to worker nodes
# -----------------------------------------------------------------------------
echo "Packaging environment"
conda pack -q -o $HOME/environment.tar.gz
# -----------------------------------------------------------------------------
# 5. List all packages in the worker environment
# -----------------------------------------------------------------------------
echo "Packages installed in the worker environment:"
conda list
# -----------------------------------------------------------------------------
# 6. Configure Dask
#
# This isn't necessary, but for this particular bootstrap script it will make a
# few things easier:
#
# - Configure the cluster's dashboard link to show the proxied version through
# jupyter-server-proxy. This allows access to the dashboard with only an ssh
# tunnel to the notebook.
#
# - Specify the pre-packaged python environment, so users don't have to
#
# - Set the default deploy-mode to local, so the dashboard proxying works
#
# - Specify the location of the native libhdfs library so pyarrow can find it
# on the workers and the client (if submitting applications).
# ------------------------------------------------------------------------------
echo "Configuring Dask"
mkdir -p $HOME/.config/dask
cat <<EOT >> $HOME/.config/dask/config.yaml
distributed:
  dashboard:
    link: "/proxy/{port}/status"

yarn:
  environment: /home/hadoop/environment.tar.gz
  deploy-mode: local

  worker:
    env:
      ARROW_LIBHDFS_DIR: /usr/lib/hadoop/lib/native/

  client:
    env:
      ARROW_LIBHDFS_DIR: /usr/lib/hadoop/lib/native/
EOT
# Also set ARROW_LIBHDFS_DIR in ~/.bashrc so it's set for the local user
echo -e '\nexport ARROW_LIBHDFS_DIR=/usr/lib/hadoop/lib/native' >> $HOME/.bashrc
# -----------------------------------------------------------------------------
# 7. Install jupyter notebook server and dependencies
#
# We do this after packaging the worker environments to keep the tar.gz as
# small as possible.
#
# We install the following packages:
#
# - notebook: the Jupyter Notebook Server
# - ipywidgets: used to provide an interactive UI for the YarnCluster objects
# - jupyter-server-proxy: used to proxy the dask dashboard through the notebook server
# -----------------------------------------------------------------------------
echo "Installing Jupyter"
conda install \
    -c conda-forge \
    -y \
    -q \
    notebook \
    ipywidgets \
    jupyter-server-proxy \
    jupyter
# -----------------------------------------------------------------------------
# 8. List all packages in the client environment
# -----------------------------------------------------------------------------
echo "Packages installed in the client environment:"
conda list
# -----------------------------------------------------------------------------
# 9. Configure Jupyter Notebook
# -----------------------------------------------------------------------------
echo "Configuring Jupyter"
mkdir -p $HOME/.jupyter
JUPYTER_PASSWORD="dask-user"
HASHED_PASSWORD=`python -c "from notebook.auth import passwd; print(passwd('$JUPYTER_PASSWORD'))"`
cat <<EOF >> $HOME/.jupyter/jupyter_notebook_config.py
c.NotebookApp.password = u'$HASHED_PASSWORD'
c.NotebookApp.open_browser = False
c.NotebookApp.ip = '0.0.0.0'
c.NotebookApp.port = 8888
EOF
# -----------------------------------------------------------------------------
# 10. Define a systemd service for the Jupyter Notebook Server
#
# This sets the notebook server up to properly run as a background service.
# -----------------------------------------------------------------------------
echo "Configuring Jupyter Notebook Upstart Service"
cat <<EOF > /tmp/jupyter-notebook.service
[Unit]
Description=Jupyter Notebook
[Service]
ExecStart=$HOME/miniconda/bin/jupyter-notebook --allow-root --config=$HOME/.jupyter/jupyter_notebook_config.py
Type=simple
PIDFile=/run/jupyter.pid
WorkingDirectory=$HOME
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
EOF
sudo mv /tmp/jupyter-notebook.service /etc/systemd/system/
sudo systemctl enable jupyter-notebook.service
# -----------------------------------------------------------------------------
# 11. Start the Jupyter Notebook Server
# -----------------------------------------------------------------------------
echo "Starting Jupyter Notebook Server"
sudo systemctl daemon-reload
sudo systemctl restart jupyter-notebook.service
#$HOME/miniconda/bin/jupyter-notebook --allow-root --config=$HOME/.jupyter/jupyter_notebook_config.py
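Since I ran all of the above manually over SSH rather than as a real bootstrap action, I first do a few sanity checks (just the ones I assume matter here):

# the packaged environment that dask-yarn ships to the workers
ls -lh $HOME/environment.tar.gz

# YARN should report the worker nodes as RUNNING
yarn node -list

# the notebook service installed above should be active
sudo systemctl status jupyter-notebook.service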
After this I start the Jupyter notebook using $HOME/miniconda/bin/jupyter-notebook --allow-root --config=$HOME/.jupyter/jupyter_notebook_config.py, and it starts successfully.
When I run this code in the notebook:
from dask_yarn import YarnCluster
from dask.distributed import Client
# Create a cluster
cluster = YarnCluster()
# Connect to the cluster
client = Client(cluster)
it gives an error like this:
AttributeError                            Traceback (most recent call last)
Input In [3], in <cell line: 1>()
----> 1 client = Client(cluster)

File ~/miniconda/lib/python3.9/site-packages/distributed/client.py:835, in Client.__init__(self, address, loop, timeout, set_as_default, scheduler_file, security, asynchronous, name, heartbeat_interval, serializers, deserializers, extensions, direct_to_workers, connection_limit, **kwargs)
    832 elif isinstance(getattr(address, "scheduler_address", None), str):
    833     # It's a LocalCluster or LocalCluster-compatible object
    834     self.cluster = address
--> 835     status = getattr(self.cluster, "status")
    836     if status and status in [Status.closed, Status.closing]:
    837         raise RuntimeError(
    838             f"Trying to connect to an already closed or closing Cluster {self.cluster}."
    839         )

AttributeError: 'YarnCluster' object has no attribute 'status'
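In case this is a version mismatch between distributed and dask-yarn, this is how I check what got installed (nothing is pinned in my environment, and I'm assuming each package exposes __version__):

import dask, distributed, dask_yarn

# print the versions the notebook kernel actually sees
print(dask.__version__, distributed.__version__, dask_yarn.__version__)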
Also, when I use LocalCluster instead of YarnCluster, it runs perfectly. I have been stuck on this for days, please help. And how do we configure the worker nodes? My guess is in the sketch below.
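This is my current understanding of how worker resources would be configured, based on the YarnCluster parameters documented by dask-yarn; the numbers are made up for illustration, and I haven't been able to verify any of it because of the error above:

from dask_yarn import YarnCluster
from dask.distributed import Client

# Size each YARN worker container and choose how many to start.
# The environment path matches what the bootstrap script packaged.
cluster = YarnCluster(
    environment="/home/hadoop/environment.tar.gz",
    n_workers=4,            # initial number of workers
    worker_vcores=2,        # vcores per worker container
    worker_memory="4GiB",   # memory per worker container
)
client = Client(cluster)

# Workers can also be scaled up or down afterwards:
cluster.scale(8)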