How to use argparse() in Google Colab? (TensorFlowOnSpark application)


In a Google Colab notebook, I'm developing a project in which I try to scale up my Keras sequential model to a PySpark environment.

  1. First, I developed and tested a CNN model that classifies real faces and comic faces from a Kaggle dataset (2 folders with 20,000 .jpg files). The zip file can be downloaded with: "!kaggle datasets download -d defileroff/comic-faces-paired-synthetic-v2" (see the download sketch after this list).

  2. Second, I converted the CNN model into a tf.estimator and followed all the steps from the guide (https://github.com/yahoo/TensorFlowOnSpark/wiki/Conversion-Guide) in order to run my estimator in PySpark.
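
For completeness, a minimal sketch of the Colab cells that fetch and unpack the dataset (assuming the kaggle CLI is authenticated via ~/.kaggle/kaggle.json; the zip filename is inferred from the dataset slug):

  # download and unpack the Kaggle dataset inside Colab
  !pip install -q kaggle
  !kaggle datasets download -d defileroff/comic-faces-paired-synthetic-v2
  !unzip -q comic-faces-paired-synthetic-v2.zip -d ./data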

The estimator works correctly until I try to introduce it into a TFParallel.run(**kwargs) call, which requires the arguments to be parsed with argparse() beforehand.

The error I receive is:
"usage: ipykernel_launcher.py [-h] [--cluster_size CLUSTER_SIZE]
[--num_ps NUM_PS] [--tensorboard]
ipykernel_launcher.py: error: unrecognized arguments: -f /root/.local/share/jupyter/runtime/kernel-7aa3f316-ee26-49e8-9d72-7814d9a48255.json
An exception has occurred, use %tb to see the full traceback.
SystemExit: 2"

It looks like there is a problem with the argparse() call.
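
From what I understand, ipykernel launches the notebook process as ipykernel_launcher.py -f /path/to/kernel.json, so that extra -f argument sits in sys.argv and parse_args() rejects it. A minimal sketch of the kind of workaround I'm considering (the parser mirrors the one in my code below):

  import argparse

  parser = argparse.ArgumentParser()
  parser.add_argument("--cluster_size", type=int, default=1)
  parser.add_argument("--num_ps", type=int, default=1)

  # Option 1: ignore arguments the parser doesn't know about (e.g. ipykernel's -f)
  args, unknown = parser.parse_known_args()

  # Option 2: parse an explicit argument list instead of reading sys.argv
  args = parser.parse_args(["--cluster_size", "1", "--num_ps", "1"])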

Very briefly, the code is available here (https://colab.research.google.com/github/cosimo-schiavoni/Massive_Data_Project/blob/main/Downloads_TFOS_ERROR.ipynb) and its structure is:

#Import Libraries
...

#def inference function: the function wraps all the indented code below, including the TFParallel.run() call.

def inference():

if __name__ == '__main__':
  #Start Spark session and context
  spark = SparkSession.builder.appName("Pyspark on Google Colab")
  sc = SparkContext(conf=SparkConf().setAppName("TFOS"))
  conf = SparkConf().setMaster("local[*]").setAppName("Colab")
  executors = sc._conf.get("spark.executor.instances")
  #Define parameters to parse
  num_executors = int(executors) if executors is not None else 1
  num_ps=1

  #Define Parser
  parser = argparse.ArgumentParser()
  parser.add_argument("--cluster_size", help="number of nodes in the cluster (for Spark 
  Standalone)", type=int, default=num_executors)
  parser.add_argument("--num_ps", help="number of parameter servers", type=int, 
  default=num_ps)
  args = parser.parse_args()

  #define the CNN Keras sequential model and compile it.
  cnn = tf.keras.models.Sequential()

  ...

  cnn.compile(optimizer = 'adam', loss = 'binary_crossentropy', metrics = ['accuracy'])

  #Convert the CNN model into a tf.estimator
  from keras.preprocessing.image import ImageDataGenerator
  # create generator object
  datagen = ImageDataGenerator(
      rescale=1./255,
      validation_split=0.2)

  #define training input function
  @tf.function
  def train_input_fn():
    val_it = datagen.flow_from_directory(
      ...)
    return features, labels
    
  #define validation input function
  @tf.function
  def eval_input_fn():
    val_it = datagen.flow_from_directory(
      ...)
    return features, labels

  #define the estimator
  import tempfile
  model_dir = tempfile.mkdtemp()
  keras_estimator = tf.keras.estimator.model_to_estimator(
      keras_model=cnn, model_dir=model_dir)

  #Train and evaluate the estimator
  train_spec = tf.estimator.TrainSpec(input_fn=train_input_fn, max_steps=1000)
  eval_spec = tf.estimator.EvalSpec(input_fn=eval_input_fn)

  tf.estimator.train_and_evaluate(keras_estimator, train_spec, eval_spec)

  #define parallel run of the estimator in the Spark environment
  #TFCluster.run(sc, main_fun, args, args.cluster_size, args.num_ps, TFCluster.InputMode.TENSORFLOW)
  TFParallel.run(sc, inference, args, args.cluster_size, use_barrier=False)

#call the inference function to run the code

inference()

Can anybody help me with this issue?

Moreover, I have doubts about the configuration of the Spark session: is it configured correctly?
Is there a way to know whether I have a cluster or just a single device?
Can I know the number of active workers?
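
For context, these are the kinds of checks I have in mind (a sketch using standard PySpark attributes on the sc created above; the last line goes through a private JVM handle, so I'm not sure it is stable across versions):

  # "local[*]" would mean everything runs on this single machine, not a cluster
  print(sc.master)

  # default number of partitions (the number of local cores in local mode)
  print(sc.defaultParallelism)

  # hosts currently registered as executors (internal API; includes the driver in local mode)
  print(sc._jsc.sc().getExecutorMemoryStatus().size())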

Thank you in advance.
