In a Google Colab notebook, I'm developing a project in which I try to scale up my Keras sequential model in a PySpark environment.
First, I developed and tested a CNN model that classifies real faces and comic faces from a Kaggle dataset (2 folders with 20,000 .jpg files). The zip file can be downloaded with: "!kaggle datasets download -d defileroff/comic-faces-paired-synthetic-v2"
Second, I converted the CNN model to a tf.estimator and followed all the steps from the guide (https://github.com/yahoo/TensorFlowOnSpark/wiki/Conversion-Guide) in order to run my estimator in PySpark.
The estimator works correctly until I try to pass it to a TFParallel.run(**kargs) call, which requires arguments previously parsed with argparse.
The error I receive is:
"usage: ipykernel_launcher.py [-h] [--cluster_size CLUSTER_SIZE]
[--num_ps NUM_PS] [--tensorboard]
ipykernel_launcher.py: error: unrecognized arguments: -f /root/.local/share/jupyter/runtime/kernel-7aa3f316-ee26-49e8-9d72-7814d9a48255.json
An exception has occurred, use %tb to see the full traceback.
SystemExit: 2"
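It looks like there is a problem with the argparse() function: parse_args() rejects the "-f <kernel file>" argument that the notebook kernel is launched with.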
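For reference, here is a minimal sketch of how the parser could be made to tolerate the notebook's extra argument. Called without arguments, `parse_args()` reads `sys.argv[1:]`, which inside a Jupyter/Colab kernel contains `-f <kernel file>`. The sketch uses `parse_known_args()`, which returns unrecognized arguments instead of exiting, and `parse_args(args=[])`, which ignores the command line altogether. The kernel path, the `default=0` for `--num_ps`, and the `store_true` action for `--tensorboard` are assumptions made for illustration; they are not taken from my notebook.

```python
import argparse


def build_parser(default_cluster_size=1):
    """Build a parser with the options shown in the usage message above."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--cluster_size", type=int, default=default_cluster_size,
                        help="number of nodes in the cluster (for Spark Standalone)")
    # Assumption: the default of 0 is illustrative only.
    parser.add_argument("--num_ps", type=int, default=0,
                        help="number of parameter servers")
    # Assumption: treated as a boolean flag, as the usage line suggests.
    parser.add_argument("--tensorboard", action="store_true",
                        help="launch tensorboard process")
    return parser


# Simulates what the kernel passes on the command line (hypothetical path).
notebook_argv = ["-f", "/path/to/kernel.json"]

parser = build_parser()

# Option 1: keep known options, collect everything else in `unknown`.
args, unknown = parser.parse_known_args(notebook_argv)

# Option 2: ignore the command line entirely and use the defaults.
default_args = parser.parse_args(args=[])

print(args.cluster_size, unknown)
```

In a notebook, either `parser.parse_known_args()` (no argument needed, it reads `sys.argv` itself) or `parser.parse_args(args=[])` should avoid the `SystemExit: 2`; I have not verified how the resulting `args` behaves further down in `TFParallel.run`.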
Briefly, the code is available here (https://colab.research.google.com/github/cosimo-schiavoni/Massive_Data_Project/blob/main/Downloads_TFOS_ERROR.ipynb) and its structure is:
#Import Libraries
#def inference function: the function takes all the code indented, including the TFParallel() command.
def inference():
if __name__ == '__main__':
#Start Spark session and context
spark = SparkSession.builder.appName("Pyspark on Google Colab")
sc = SparkContext(conf=SparkConf().setAppName("TFOS"))
conf = SparkConf().setMaster("local[*]").setAppName("Colab")
executors = sc._conf.get("spark.executor.instances")
#Define parameters to parse
num_executors = int(executors) if executors is not None else 1
#Define Parser
parser = argparse.ArgumentParser()
parser.add_argument("--cluster_size", help="number of nodes in the cluster (for Spark Standalone)", type=int, default=num_executors)
parser.add_argument("--num_ps", help="number of parameter servers", type=int,
args = parser.parse_args()
#define the CNN Keras sequential model and compile it.
cnn = tf.keras.models.Sequential()
cnn.compile(optimizer = 'adam', loss = 'binary_crossentropy', metrics = ['accuracy'])
#Convert the CNN model in a tf.estimator.train_and_evaluate
from keras.preprocessing.image import ImageDataGenerator
# create generator object
datagen = ImageDataGenerator(
#define train test input function
def train_input_fn():
val_it = datagen.flow_from_directory(
return features, labels
#define validation test input function
def eval_input_fn():
val_it = datagen.flow_from_directory(
return features, labels
#define the estimator
import tempfile
model_dir = tempfile.mkdtemp()
keras_estimator = tf.keras.estimator.model_to_estimator(
keras_model=cnn, model_dir=model_dir)
#Train and evaluate the estimator
train_spec = tf.estimator.TrainSpec(input_fn=train_input_fn, max_steps=1000)
eval_spec = tf.estimator.EvalSpec(input_fn=eval_input_fn)
tf.estimator.train_and_evaluate(keras_estimator, train_spec, eval_spec)
#define parallel run of estimator in Spark environment
TFParallel.run(sc, inference, args, args.cluster_size, use_barrier=False)
#call inference function and activate the code
Can anybody help me with this issue?
Moreover, I have doubts about the configuration of the Spark session: is it correctly configured?
Is there a way to know whether I have a cluster or just a single device?
Can I know the number of active workers?
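To make these three questions concrete, here is a sketch of what I would expect a consistent setup to look like, under stated assumptions. It builds the `SparkConf` first and passes it to the builder, since `SparkSession.builder.appName(...)` without `.getOrCreate()` only returns a builder and a `SparkConf` created after the context is never applied. The function names `build_session`, `describe_master`, and `executor_count` are hypothetical helpers, not part of PySpark or TensorFlowOnSpark. `executor_count` relies on the private `_jsc` handle, so it is an assumption that may not hold across Spark versions.

```python
def build_session(app_name="Colab", master="local[*]"):
    """Create (or reuse) one SparkSession and return it with its SparkContext."""
    # Imported here so the pure-Python helper below works without Spark installed.
    from pyspark import SparkConf
    from pyspark.sql import SparkSession

    # Build the configuration first, then hand it to the builder.
    conf = SparkConf().setMaster(master).setAppName(app_name)
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    return spark, spark.sparkContext


def describe_master(master_url):
    """Classify a Spark master URL (the value of sc.master)."""
    if master_url.startswith("local"):
        return "single machine (local mode)"
    return "cluster"


def executor_count(sc):
    """Rough executor count, excluding the driver.

    Assumption: uses the private _jsc handle; the JVM-side status map
    contains one entry for the driver, hence the subtraction.
    """
    return sc._jsc.sc().getExecutorMemoryStatus().size() - 1
```

With a running session, `spark, sc = build_session()` followed by `describe_master(sc.master)` and `sc.defaultParallelism` would show the master URL category and the default number of parallel tasks. With `local[*]`, as in my code, I would expect everything to run on the single Colab machine.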
Thank you in advance.