张量流TFX管道中的图像处理
我正在尝试使用MNIST数据集启动并运行一个TensorFlow TFX管道。
# Imports
import pandas as pd
import numpy as np
from keras.datasets import mnist
import tensorflow as tf
from tfx import v1 as tfx
import os
from tfx.components import ImportExampleGen
from platform import python_version
python_version() #'3.8.8'
# Load the data - 60,000 training examples and 10,000 testing examples
(train_x, train_y), (test_x, test_y) = mnist.load_data()
设置管道路径
_pipeline_root = './pipeline'
_data_root = './data'
if not os.path.isdir(_pipeline_root) and not os.path.isdir(_data_root):
!mkdir {_pipeline_root}
!mkdir {_data_root}
将数据写入TF.RECORD格式,并保存在eval和trains dirs中。请注意,MNIST数据以Numpy Array 28x28开头,并转换为bytestring,以使其能够作为TF.Record的一部分进行编码。
def _bytes_feature(value):
"""Returns a bytes_list from a string / byte."""
if isinstance(value, type(tf.constant(0))): # if value ist tensor
value = value.numpy() # get value of tensor
return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
def _int64_feature(value):
"""Returns an int64_list from a bool / enum / int / uint."""
return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
def serialize_array(array):
array = tf.io.serialize_tensor(array)
return array
def image_label_to_tf_train(image, label):
image_shape = np.shape(image)
#define the dictionary -- the structure -- of our single example
data = {
'height': _int64_feature(image_shape[0]),
'width': _int64_feature(image_shape[1]),
'raw_image' : _bytes_feature(serialize_array(image)),
'label' : _int64_feature(label)
}
#create an Example, wrapping the single features
return tf.train.Example(features=tf.train.Features(feature=data))
def write_images_to_tfr_short(images, labels, filename:str="images", folder = ""):
if not os.path.isdir(folder):
!mkdir {folder}
filename= folder + "/" + filename+".tfrecords"
writer = tf.io.TFRecordWriter(filename) #create a writer that'll store our data to disk
count = 0
for index in range(len(images)):
#get the data we want to write
current_image = images[index]
current_label = labels[index]
out = image_label_to_tf_train(image=current_image, label=current_label)
writer.write(out.SerializeToString())
count += 1
writer.close()
print(f"Wrote {count} elements to TFRecord")
return count
下一个阶段是调用使用PreProcessing_FN的转换组件。此功能应处理所有数据,因此例如将图像阵列除以255是标准特征过程。但是图像仍然是一个差异,我一生都无法弄清楚如何将其变回阵列。以下是我尝试的。
def preprocessing_fn(inputs):
"""tf.transform's callback function for preprocessing inputs.
Args:
inputs: map from feature keys to raw not-yet-transformed features.
Returns:
Map from string feature key to transformed feature operations.
"""
# Initialize outputs dictionary
outputs = {}
raw_image_dataset = inputs[_IMAGE_KEY]
img = tf.io.decode_raw(raw_image_dataset, tf.int64)
outputs[_IMAGE_KEY] = img
outputs[_LABEL_KEY] = tf.cast(inputs[_LABEL_KEY], tf.int64)
return outputs
我会收到以下错误:
WARNING:root:This output type hint will be ignored and not used for type-checking purposes. Typically, output type hints for a PTransform are single (or nested) types wrapped by a PCollection, PDone, or None. Got: Tuple[Dict[str, Union[NoneType, _Dataset]], Union[Dict[str, Dict[str, PCollection]], NoneType], int] instead.
WARNING:root:This output type hint will be ignored and not used for type-checking purposes. Typically, output type hints for a PTransform are single (or nested) types wrapped by a PCollection, PDone, or None. Got: Tuple[Dict[str, Union[NoneType, _Dataset]], Union[Dict[str, Dict[str, PCollection]], NoneType], int] instead.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.8 interpreter.
INFO:tensorflow:Assets written to: ./pipeline/Transform/transform_graph/225/.temp_path/tftransform_tmp/26150ae80de847fab932efeb0f0c610f/assets
INFO:tensorflow:Assets written to: ./pipeline/Transform/transform_graph/225/.temp_path/tftransform_tmp/26150ae80de847fab932efeb0f0c610f/assets
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()
/opt/conda/lib/python3.8/site-packages/apache_beam/transforms/core.py in <lambda>(x, *args, **kwargs)
1636 if fn_takes_side_inputs(fn):
-> 1637 wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
1638 else:
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/beam/impl.py in _create_v2_saved_model(tensor_replacement_map, base_temp_dir, preprocessing_fn, input_signature, baseline_analyzers_fingerprint, output_keys_to_name_map)
662 saved_model_dir = beam_common.get_unique_temp_path(base_temp_dir)
--> 663 impl_helper.trace_and_write_v2_saved_model(saved_model_dir, preprocessing_fn,
664 input_signature, base_temp_dir,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/impl_helper.py in trace_and_write_v2_saved_model(saved_model_dir, preprocessing_fn, input_signature, base_temp_dir, baseline_analyzers_fingerprint, tensor_replacement_map, output_keys_to_name_map)
893 analyzer_nodes.TENSOR_REPLACEMENTS):
--> 894 metadata = _trace_and_get_metadata(concrete_transform_fn, structured_inputs,
895 preprocessing_fn, base_temp_dir,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/impl_helper.py in _trace_and_get_metadata(concrete_transform_fn, structured_inputs, preprocessing_fn, base_temp_dir, tensor_replacement_map)
805 return dataset_metadata.DatasetMetadata(
--> 806 schema=schema_inference.infer_feature_schema_v2(
807 concrete_transform_fn.structured_outputs,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in infer_feature_schema_v2(features, concrete_metadata_fn, evaluate_schema_overrides)
255 metadata)
--> 256 return _infer_feature_schema_common(
257 features,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in _infer_feature_schema_common(features, tensor_ranges, feature_annotations, global_annotations, is_evaluation_complete)
300 min=min_value, max=max_value, is_categorical=True)
--> 301 feature_spec = _feature_spec_from_batched_tensors(features,
302 is_evaluation_complete)
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in _feature_spec_from_batched_tensors(tensors, is_evaluation_complete)
128 dim is None for dim in shape.as_list()[1:]):
--> 129 raise ValueError(
130 'Feature {} ({}) had invalid shape {} for FixedLenFeature: apart '
ValueError: Feature raw_image (Tensor("Identity_1:0", shape=(None, 1, None), dtype=int64)) had invalid shape (None, 1, None) for FixedLenFeature: apart from the batch dimension, all dimensions must have known size
During handling of the above exception, another exception occurred:
ValueError Traceback (most recent call last)
<ipython-input-37-7beafa4fe436> in <module>
3 schema=schema_gen.outputs['schema'],
4 module_file=os.path.abspath(_mnist_transform_module))
----> 5 context.run(transform, enable_cache=False)
/opt/conda/lib/python3.8/site-packages/tfx/orchestration/experimental/interactive/interactive_context.py in run_if_ipython(*args, **kwargs)
61 # __IPYTHON__ variable is set by IPython, see
62 # https://ipython.org/ipython-doc/rel-0.10.2/html/interactive/reference.html#embedding-ipython.
---> 63 return fn(*args, **kwargs)
64 else:
65 absl.logging.warning(
/opt/conda/lib/python3.8/site-packages/tfx/orchestration/experimental/interactive/interactive_context.py in run(self, component, enable_cache, beam_pipeline_args)
181 telemetry_utils.LABEL_TFX_RUNNER: runner_label,
182 }):
--> 183 execution_id = launcher.launch().execution_id
184
185 return execution_result.ExecutionResult(
/opt/conda/lib/python3.8/site-packages/tfx/orchestration/launcher/base_component_launcher.py in launch(self)
198 # be immutable in this context.
199 # output_dict can still be changed, specifically properties.
--> 200 self._run_executor(execution_decision.execution_id,
201 copy.deepcopy(execution_decision.input_dict),
202 execution_decision.output_dict,
/opt/conda/lib/python3.8/site-packages/tfx/orchestration/launcher/in_process_component_launcher.py in _run_executor(self, execution_id, input_dict, output_dict, exec_properties)
71 # be immutable in this context.
72 # output_dict can still be changed, specifically properties.
---> 73 executor.Do(
74 copy.deepcopy(input_dict), output_dict, copy.deepcopy(exec_properties))
/opt/conda/lib/python3.8/site-packages/tfx/components/transform/executor.py in Do(self, input_dict, output_dict, exec_properties)
581 # remove the `_pip_dependencies` attribute.
582 with udf_utils.TempPipInstallContext(self._pip_dependencies):
--> 583 TransformProcessor().Transform(label_inputs, label_outputs, status_file)
584 logging.debug('Cleaning up temp path %s on executor success', temp_path)
585 io_utils.delete_dir(temp_path)
/opt/conda/lib/python3.8/site-packages/tfx/components/transform/executor.py in Transform(***failed resolving arguments***)
1114 materialization_format = (
1115 transform_paths_file_formats[-1] if materialize_output_paths else None)
-> 1116 self._RunBeamImpl(analyze_data_list, transform_data_list, preprocessing_fn,
1117 stats_options_updater_fn, force_tf_compat_v1,
1118 input_dataset_metadata, transform_output_path,
/opt/conda/lib/python3.8/site-packages/tfx/components/transform/executor.py in _RunBeamImpl(self, analyze_data_list, transform_data_list, preprocessing_fn, stats_options_updater_fn, force_tf_compat_v1, input_dataset_metadata, transform_output_path, raw_examples_data_format, temp_path, input_cache_dir, output_cache_dir, disable_statistics, per_set_stats_output_paths, materialization_format, analyze_paths_count, stats_output_paths, make_beam_pipeline_fn)
1496 for dataset in transform_data_list:
1497 infix = 'TransformIndex{}'.format(dataset.index)
-> 1498 (dataset.transformed
1499 | 'EncodeAndSerialize[{}]'.format(infix) >> beam.ParDo(
1500 self._RecordBatchToExamplesFn(transformed_schema_proto))
/opt/conda/lib/python3.8/site-packages/apache_beam/pipeline.py in __exit__(self, exc_type, exc_val, exc_tb)
594 try:
595 if not exc_type:
--> 596 self.result = self.run()
597 self.result.wait_until_finish()
598 finally:
/opt/conda/lib/python3.8/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
571 finally:
572 shutil.rmtree(tmpdir)
--> 573 return self.runner.run_pipeline(self, self._options)
574 finally:
575 if not is_in_ipython():
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py in run_pipeline(self, pipeline, options)
129 runner = BundleBasedDirectRunner()
130
--> 131 return runner.run_pipeline(pipeline, options)
132
133
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_pipeline(self, pipeline, options)
197 options.view_as(pipeline_options.ProfilingOptions))
198
--> 199 self._latest_run_result = self.run_via_runner_api(
200 pipeline.to_runner_api(default_environment=self._default_environment))
201 return self._latest_run_result
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_via_runner_api(self, pipeline_proto)
208 # TODO(pabloem, BEAM-7514): Create a watermark manager (that has access to
209 # the teststream (if any), and all the stages).
--> 210 return self.run_stages(stage_context, stages)
211
212 @contextlib.contextmanager
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_stages(self, stage_context, stages)
393 )
394
--> 395 stage_results = self._run_stage(
396 runner_execution_context, bundle_context_manager)
397
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_stage(self, runner_execution_context, bundle_context_manager)
658 while True:
659 last_result, deferred_inputs, fired_timers, watermark_updates = (
--> 660 self._run_bundle(
661 runner_execution_context,
662 bundle_context_manager,
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_bundle(self, runner_execution_context, bundle_context_manager, data_input, data_output, input_timers, expected_timer_output, bundle_manager)
781 expected_timer_output)
782
--> 783 result, splits = bundle_manager.process_bundle(
784 data_input, data_output, input_timers, expected_timer_output)
785 # Now we collect all the deferred inputs remaining from bundle execution.
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in process_bundle(self, inputs, expected_outputs, fired_timers, expected_output_timers, dry_run)
1092 process_bundle_descriptor.id,
1093 cache_tokens=[next(self._cache_token_generator)]))
-> 1094 result_future = self._worker_handler.control_conn.push(process_bundle_req)
1095
1096 split_results = [] # type: List[beam_fn_api_pb2.ProcessBundleSplitResponse]
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py in push(self, request)
376 self._uid_counter += 1
377 request.instruction_id = 'control_%s' % self._uid_counter
--> 378 response = self.worker.do_instruction(request)
379 return ControlFuture(request.instruction_id, response)
380
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py in do_instruction(self, request)
578 if request_type:
579 # E.g. if register is set, this will call self.register(request.register))
--> 580 return getattr(self, request_type)(
581 getattr(request, request_type), request.instruction_id)
582 else:
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py in process_bundle(self, request, instruction_id)
616 with self.maybe_profile(instruction_id):
617 delayed_applications, requests_finalization = (
--> 618 bundle_processor.process_bundle(instruction_id))
619 monitoring_infos = bundle_processor.monitoring_infos()
620 monitoring_infos.extend(self.state_cache_metrics_fn())
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py in process_bundle(self, instruction_id)
993 element.timer_family_id, timer_data)
994 elif isinstance(element, beam_fn_api_pb2.Elements.Data):
--> 995 input_op_by_transform_id[element.transform_id].process_encoded(
996 element.data)
997
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py in process_encoded(self, encoded_windowed_values)
219 decoded_value = self.windowed_coder_impl.decode_from_stream(
220 input_stream, True)
--> 221 self.output(decoded_value)
222
223 def monitoring_infos(self, transform_id, tag_to_pcollection_id):
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.Operation.output()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.Operation.output()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()
/opt/conda/lib/python3.8/site-packages/apache_beam/transforms/core.py in <lambda>(x, *args, **kwargs)
1635 from apache_beam.transforms.util import fn_takes_side_inputs
1636 if fn_takes_side_inputs(fn):
-> 1637 wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
1638 else:
1639 wrapper = lambda x: [fn(x)]
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/beam/impl.py in _create_v2_saved_model(tensor_replacement_map, base_temp_dir, preprocessing_fn, input_signature, baseline_analyzers_fingerprint, output_keys_to_name_map)
661 """
662 saved_model_dir = beam_common.get_unique_temp_path(base_temp_dir)
--> 663 impl_helper.trace_and_write_v2_saved_model(saved_model_dir, preprocessing_fn,
664 input_signature, base_temp_dir,
665 baseline_analyzers_fingerprint,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/impl_helper.py in trace_and_write_v2_saved_model(saved_model_dir, preprocessing_fn, input_signature, base_temp_dir, baseline_analyzers_fingerprint, tensor_replacement_map, output_keys_to_name_map)
892 if not concrete_transform_fn.graph.get_collection(
893 analyzer_nodes.TENSOR_REPLACEMENTS):
--> 894 metadata = _trace_and_get_metadata(concrete_transform_fn, structured_inputs,
895 preprocessing_fn, base_temp_dir,
896 tensor_replacement_map)
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/impl_helper.py in _trace_and_get_metadata(concrete_transform_fn, structured_inputs, preprocessing_fn, base_temp_dir, tensor_replacement_map)
804 evaluate_schema_overrides=True)
805 return dataset_metadata.DatasetMetadata(
--> 806 schema=schema_inference.infer_feature_schema_v2(
807 concrete_transform_fn.structured_outputs,
808 concrete_metadata_fn,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in infer_feature_schema_v2(features, concrete_metadata_fn, evaluate_schema_overrides)
254 tensor_annotations, global_annotations = _get_schema_annotations_v2(
255 metadata)
--> 256 return _infer_feature_schema_common(
257 features,
258 tensor_ranges,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in _infer_feature_schema_common(features, tensor_ranges, feature_annotations, global_annotations, is_evaluation_complete)
299 domains[name] = schema_pb2.IntDomain(
300 min=min_value, max=max_value, is_categorical=True)
--> 301 feature_spec = _feature_spec_from_batched_tensors(features,
302 is_evaluation_complete)
303
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in _feature_spec_from_batched_tensors(tensors, is_evaluation_complete)
127 if is_evaluation_complete and any(
128 dim is None for dim in shape.as_list()[1:]):
--> 129 raise ValueError(
130 'Feature {} ({}) had invalid shape {} for FixedLenFeature: apart '
131 'from the batch dimension, all dimensions must have known size'
ValueError: Feature raw_image (Tensor("Identity_1:0", shape=(None, 1, None), dtype=int64)) had invalid shape (None, 1, None) for FixedLenFeature: apart from the batch dimension, all dimensions must have known size [while running 'Analyze/CreateSavedModel[tf_v2_only]/CreateSavedModel']
我知道标签功能可以正常工作,因为我可以调用以下代码并获得打印件。...
transform = tfx.components.Transform(
examples=example_gen.outputs['examples'],
schema=schema_gen.outputs['schema'],
module_file=os.path.abspath(_mnist_transform_module))
context.run(transform, enable_cache=False)
# Get the URI of the output artifact representing the transformed examples
train_uri = os.path.join(transform.outputs['transformed_examples'].get()[0].uri, 'Split-train')
# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(train_uri, name)
for name in os.listdir(train_uri)]
# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
# Decode the first record and print output
for tfrecord in dataset.take(1):
serialized_example = tfrecord.numpy()
example = tf.train.Example()
example.ParseFromString(serialized_example)
print(example)
如果删除行:
img = tf.io.decode_raw(raw_image_dataset, tf.int64)
outputs[_IMAGE_KEY] = img
我被打印出来
features {
feature {
key: "label"
value {
int64_list {
value: 5
}
}
}
}
了工作,但我真的不知道如何改变图像字节。问题的一部分是我不完全确定格式是什么,因为它只是一个非常不透明的张量。看来我有效地在一系列数据列上操作标签操作,但再次无法弄清楚正确的操作或语法
I am trying to get a Tensorflow TFX pipeline up and running using the MNIST dataset.
# Imports
import pandas as pd
import numpy as np
from keras.datasets import mnist
import tensorflow as tf
from tfx import v1 as tfx
import os
from tfx.components import ImportExampleGen
from platform import python_version
python_version() #'3.8.8'
# Load the data - 60,000 training examples and 10,000 testing examples
(train_x, train_y), (test_x, test_y) = mnist.load_data()
Setup pipeline paths
_pipeline_root = './pipeline'
_data_root = './data'
if not os.path.isdir(_pipeline_root) and not os.path.isdir(_data_root):
!mkdir {_pipeline_root}
!mkdir {_data_root}
Write the data to TF.record format and save in eval and train dirs. NOTE that the MNIST data starts as a numpy array 28x28 and is converted to a bytestring to enable it to be encoded as part of the Tf.record.
def _bytes_feature(value):
"""Returns a bytes_list from a string / byte."""
if isinstance(value, type(tf.constant(0))): # if value ist tensor
value = value.numpy() # get value of tensor
return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
def _int64_feature(value):
"""Returns an int64_list from a bool / enum / int / uint."""
return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
def serialize_array(array):
array = tf.io.serialize_tensor(array)
return array
def image_label_to_tf_train(image, label):
image_shape = np.shape(image)
#define the dictionary -- the structure -- of our single example
data = {
'height': _int64_feature(image_shape[0]),
'width': _int64_feature(image_shape[1]),
'raw_image' : _bytes_feature(serialize_array(image)),
'label' : _int64_feature(label)
}
#create an Example, wrapping the single features
return tf.train.Example(features=tf.train.Features(feature=data))
def write_images_to_tfr_short(images, labels, filename:str="images", folder = ""):
if not os.path.isdir(folder):
!mkdir {folder}
filename= folder + "/" + filename+".tfrecords"
writer = tf.io.TFRecordWriter(filename) #create a writer that'll store our data to disk
count = 0
for index in range(len(images)):
#get the data we want to write
current_image = images[index]
current_label = labels[index]
out = image_label_to_tf_train(image=current_image, label=current_label)
writer.write(out.SerializeToString())
count += 1
writer.close()
print(f"Wrote {count} elements to TFRecord")
return count
The next stage is to call the transform component which uses the preprocessing_fn. This function should process all the data so for example divide the image array by 255 is a standard feature process. But the image is still as a bytestring and I can't for the life of me figure out how to turn it back into an array. The below is what I have tried.
def preprocessing_fn(inputs):
"""tf.transform's callback function for preprocessing inputs.
Args:
inputs: map from feature keys to raw not-yet-transformed features.
Returns:
Map from string feature key to transformed feature operations.
"""
# Initialize outputs dictionary
outputs = {}
raw_image_dataset = inputs[_IMAGE_KEY]
img = tf.io.decode_raw(raw_image_dataset, tf.int64)
outputs[_IMAGE_KEY] = img
outputs[_LABEL_KEY] = tf.cast(inputs[_LABEL_KEY], tf.int64)
return outputs
I get the following error:
WARNING:root:This output type hint will be ignored and not used for type-checking purposes. Typically, output type hints for a PTransform are single (or nested) types wrapped by a PCollection, PDone, or None. Got: Tuple[Dict[str, Union[NoneType, _Dataset]], Union[Dict[str, Dict[str, PCollection]], NoneType], int] instead.
WARNING:root:This output type hint will be ignored and not used for type-checking purposes. Typically, output type hints for a PTransform are single (or nested) types wrapped by a PCollection, PDone, or None. Got: Tuple[Dict[str, Union[NoneType, _Dataset]], Union[Dict[str, Dict[str, PCollection]], NoneType], int] instead.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.8 interpreter.
INFO:tensorflow:Assets written to: ./pipeline/Transform/transform_graph/225/.temp_path/tftransform_tmp/26150ae80de847fab932efeb0f0c610f/assets
INFO:tensorflow:Assets written to: ./pipeline/Transform/transform_graph/225/.temp_path/tftransform_tmp/26150ae80de847fab932efeb0f0c610f/assets
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()
/opt/conda/lib/python3.8/site-packages/apache_beam/transforms/core.py in <lambda>(x, *args, **kwargs)
1636 if fn_takes_side_inputs(fn):
-> 1637 wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
1638 else:
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/beam/impl.py in _create_v2_saved_model(tensor_replacement_map, base_temp_dir, preprocessing_fn, input_signature, baseline_analyzers_fingerprint, output_keys_to_name_map)
662 saved_model_dir = beam_common.get_unique_temp_path(base_temp_dir)
--> 663 impl_helper.trace_and_write_v2_saved_model(saved_model_dir, preprocessing_fn,
664 input_signature, base_temp_dir,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/impl_helper.py in trace_and_write_v2_saved_model(saved_model_dir, preprocessing_fn, input_signature, base_temp_dir, baseline_analyzers_fingerprint, tensor_replacement_map, output_keys_to_name_map)
893 analyzer_nodes.TENSOR_REPLACEMENTS):
--> 894 metadata = _trace_and_get_metadata(concrete_transform_fn, structured_inputs,
895 preprocessing_fn, base_temp_dir,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/impl_helper.py in _trace_and_get_metadata(concrete_transform_fn, structured_inputs, preprocessing_fn, base_temp_dir, tensor_replacement_map)
805 return dataset_metadata.DatasetMetadata(
--> 806 schema=schema_inference.infer_feature_schema_v2(
807 concrete_transform_fn.structured_outputs,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in infer_feature_schema_v2(features, concrete_metadata_fn, evaluate_schema_overrides)
255 metadata)
--> 256 return _infer_feature_schema_common(
257 features,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in _infer_feature_schema_common(features, tensor_ranges, feature_annotations, global_annotations, is_evaluation_complete)
300 min=min_value, max=max_value, is_categorical=True)
--> 301 feature_spec = _feature_spec_from_batched_tensors(features,
302 is_evaluation_complete)
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in _feature_spec_from_batched_tensors(tensors, is_evaluation_complete)
128 dim is None for dim in shape.as_list()[1:]):
--> 129 raise ValueError(
130 'Feature {} ({}) had invalid shape {} for FixedLenFeature: apart '
ValueError: Feature raw_image (Tensor("Identity_1:0", shape=(None, 1, None), dtype=int64)) had invalid shape (None, 1, None) for FixedLenFeature: apart from the batch dimension, all dimensions must have known size
During handling of the above exception, another exception occurred:
ValueError Traceback (most recent call last)
<ipython-input-37-7beafa4fe436> in <module>
3 schema=schema_gen.outputs['schema'],
4 module_file=os.path.abspath(_mnist_transform_module))
----> 5 context.run(transform, enable_cache=False)
/opt/conda/lib/python3.8/site-packages/tfx/orchestration/experimental/interactive/interactive_context.py in run_if_ipython(*args, **kwargs)
61 # __IPYTHON__ variable is set by IPython, see
62 # https://ipython.org/ipython-doc/rel-0.10.2/html/interactive/reference.html#embedding-ipython.
---> 63 return fn(*args, **kwargs)
64 else:
65 absl.logging.warning(
/opt/conda/lib/python3.8/site-packages/tfx/orchestration/experimental/interactive/interactive_context.py in run(self, component, enable_cache, beam_pipeline_args)
181 telemetry_utils.LABEL_TFX_RUNNER: runner_label,
182 }):
--> 183 execution_id = launcher.launch().execution_id
184
185 return execution_result.ExecutionResult(
/opt/conda/lib/python3.8/site-packages/tfx/orchestration/launcher/base_component_launcher.py in launch(self)
198 # be immutable in this context.
199 # output_dict can still be changed, specifically properties.
--> 200 self._run_executor(execution_decision.execution_id,
201 copy.deepcopy(execution_decision.input_dict),
202 execution_decision.output_dict,
/opt/conda/lib/python3.8/site-packages/tfx/orchestration/launcher/in_process_component_launcher.py in _run_executor(self, execution_id, input_dict, output_dict, exec_properties)
71 # be immutable in this context.
72 # output_dict can still be changed, specifically properties.
---> 73 executor.Do(
74 copy.deepcopy(input_dict), output_dict, copy.deepcopy(exec_properties))
/opt/conda/lib/python3.8/site-packages/tfx/components/transform/executor.py in Do(self, input_dict, output_dict, exec_properties)
581 # remove the `_pip_dependencies` attribute.
582 with udf_utils.TempPipInstallContext(self._pip_dependencies):
--> 583 TransformProcessor().Transform(label_inputs, label_outputs, status_file)
584 logging.debug('Cleaning up temp path %s on executor success', temp_path)
585 io_utils.delete_dir(temp_path)
/opt/conda/lib/python3.8/site-packages/tfx/components/transform/executor.py in Transform(***failed resolving arguments***)
1114 materialization_format = (
1115 transform_paths_file_formats[-1] if materialize_output_paths else None)
-> 1116 self._RunBeamImpl(analyze_data_list, transform_data_list, preprocessing_fn,
1117 stats_options_updater_fn, force_tf_compat_v1,
1118 input_dataset_metadata, transform_output_path,
/opt/conda/lib/python3.8/site-packages/tfx/components/transform/executor.py in _RunBeamImpl(self, analyze_data_list, transform_data_list, preprocessing_fn, stats_options_updater_fn, force_tf_compat_v1, input_dataset_metadata, transform_output_path, raw_examples_data_format, temp_path, input_cache_dir, output_cache_dir, disable_statistics, per_set_stats_output_paths, materialization_format, analyze_paths_count, stats_output_paths, make_beam_pipeline_fn)
1496 for dataset in transform_data_list:
1497 infix = 'TransformIndex{}'.format(dataset.index)
-> 1498 (dataset.transformed
1499 | 'EncodeAndSerialize[{}]'.format(infix) >> beam.ParDo(
1500 self._RecordBatchToExamplesFn(transformed_schema_proto))
/opt/conda/lib/python3.8/site-packages/apache_beam/pipeline.py in __exit__(self, exc_type, exc_val, exc_tb)
594 try:
595 if not exc_type:
--> 596 self.result = self.run()
597 self.result.wait_until_finish()
598 finally:
/opt/conda/lib/python3.8/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
571 finally:
572 shutil.rmtree(tmpdir)
--> 573 return self.runner.run_pipeline(self, self._options)
574 finally:
575 if not is_in_ipython():
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py in run_pipeline(self, pipeline, options)
129 runner = BundleBasedDirectRunner()
130
--> 131 return runner.run_pipeline(pipeline, options)
132
133
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_pipeline(self, pipeline, options)
197 options.view_as(pipeline_options.ProfilingOptions))
198
--> 199 self._latest_run_result = self.run_via_runner_api(
200 pipeline.to_runner_api(default_environment=self._default_environment))
201 return self._latest_run_result
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_via_runner_api(self, pipeline_proto)
208 # TODO(pabloem, BEAM-7514): Create a watermark manager (that has access to
209 # the teststream (if any), and all the stages).
--> 210 return self.run_stages(stage_context, stages)
211
212 @contextlib.contextmanager
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_stages(self, stage_context, stages)
393 )
394
--> 395 stage_results = self._run_stage(
396 runner_execution_context, bundle_context_manager)
397
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_stage(self, runner_execution_context, bundle_context_manager)
658 while True:
659 last_result, deferred_inputs, fired_timers, watermark_updates = (
--> 660 self._run_bundle(
661 runner_execution_context,
662 bundle_context_manager,
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_bundle(self, runner_execution_context, bundle_context_manager, data_input, data_output, input_timers, expected_timer_output, bundle_manager)
781 expected_timer_output)
782
--> 783 result, splits = bundle_manager.process_bundle(
784 data_input, data_output, input_timers, expected_timer_output)
785 # Now we collect all the deferred inputs remaining from bundle execution.
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in process_bundle(self, inputs, expected_outputs, fired_timers, expected_output_timers, dry_run)
1092 process_bundle_descriptor.id,
1093 cache_tokens=[next(self._cache_token_generator)]))
-> 1094 result_future = self._worker_handler.control_conn.push(process_bundle_req)
1095
1096 split_results = [] # type: List[beam_fn_api_pb2.ProcessBundleSplitResponse]
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py in push(self, request)
376 self._uid_counter += 1
377 request.instruction_id = 'control_%s' % self._uid_counter
--> 378 response = self.worker.do_instruction(request)
379 return ControlFuture(request.instruction_id, response)
380
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py in do_instruction(self, request)
578 if request_type:
579 # E.g. if register is set, this will call self.register(request.register))
--> 580 return getattr(self, request_type)(
581 getattr(request, request_type), request.instruction_id)
582 else:
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py in process_bundle(self, request, instruction_id)
616 with self.maybe_profile(instruction_id):
617 delayed_applications, requests_finalization = (
--> 618 bundle_processor.process_bundle(instruction_id))
619 monitoring_infos = bundle_processor.monitoring_infos()
620 monitoring_infos.extend(self.state_cache_metrics_fn())
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py in process_bundle(self, instruction_id)
993 element.timer_family_id, timer_data)
994 elif isinstance(element, beam_fn_api_pb2.Elements.Data):
--> 995 input_op_by_transform_id[element.transform_id].process_encoded(
996 element.data)
997
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py in process_encoded(self, encoded_windowed_values)
219 decoded_value = self.windowed_coder_impl.decode_from_stream(
220 input_stream, True)
--> 221 self.output(decoded_value)
222
223 def monitoring_infos(self, transform_id, tag_to_pcollection_id):
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.Operation.output()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.Operation.output()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()
/opt/conda/lib/python3.8/site-packages/apache_beam/transforms/core.py in <lambda>(x, *args, **kwargs)
1635 from apache_beam.transforms.util import fn_takes_side_inputs
1636 if fn_takes_side_inputs(fn):
-> 1637 wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
1638 else:
1639 wrapper = lambda x: [fn(x)]
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/beam/impl.py in _create_v2_saved_model(tensor_replacement_map, base_temp_dir, preprocessing_fn, input_signature, baseline_analyzers_fingerprint, output_keys_to_name_map)
661 """
662 saved_model_dir = beam_common.get_unique_temp_path(base_temp_dir)
--> 663 impl_helper.trace_and_write_v2_saved_model(saved_model_dir, preprocessing_fn,
664 input_signature, base_temp_dir,
665 baseline_analyzers_fingerprint,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/impl_helper.py in trace_and_write_v2_saved_model(saved_model_dir, preprocessing_fn, input_signature, base_temp_dir, baseline_analyzers_fingerprint, tensor_replacement_map, output_keys_to_name_map)
892 if not concrete_transform_fn.graph.get_collection(
893 analyzer_nodes.TENSOR_REPLACEMENTS):
--> 894 metadata = _trace_and_get_metadata(concrete_transform_fn, structured_inputs,
895 preprocessing_fn, base_temp_dir,
896 tensor_replacement_map)
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/impl_helper.py in _trace_and_get_metadata(concrete_transform_fn, structured_inputs, preprocessing_fn, base_temp_dir, tensor_replacement_map)
804 evaluate_schema_overrides=True)
805 return dataset_metadata.DatasetMetadata(
--> 806 schema=schema_inference.infer_feature_schema_v2(
807 concrete_transform_fn.structured_outputs,
808 concrete_metadata_fn,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in infer_feature_schema_v2(features, concrete_metadata_fn, evaluate_schema_overrides)
254 tensor_annotations, global_annotations = _get_schema_annotations_v2(
255 metadata)
--> 256 return _infer_feature_schema_common(
257 features,
258 tensor_ranges,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in _infer_feature_schema_common(features, tensor_ranges, feature_annotations, global_annotations, is_evaluation_complete)
299 domains[name] = schema_pb2.IntDomain(
300 min=min_value, max=max_value, is_categorical=True)
--> 301 feature_spec = _feature_spec_from_batched_tensors(features,
302 is_evaluation_complete)
303
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in _feature_spec_from_batched_tensors(tensors, is_evaluation_complete)
127 if is_evaluation_complete and any(
128 dim is None for dim in shape.as_list()[1:]):
--> 129 raise ValueError(
130 'Feature {} ({}) had invalid shape {} for FixedLenFeature: apart '
131 'from the batch dimension, all dimensions must have known size'
ValueError: Feature raw_image (Tensor("Identity_1:0", shape=(None, 1, None), dtype=int64)) had invalid shape (None, 1, None) for FixedLenFeature: apart from the batch dimension, all dimensions must have known size [while running 'Analyze/CreateSavedModel[tf_v2_only]/CreateSavedModel']
I know the label feature is working as I can call the below code and get a print as so....
transform = tfx.components.Transform(
examples=example_gen.outputs['examples'],
schema=schema_gen.outputs['schema'],
module_file=os.path.abspath(_mnist_transform_module))
context.run(transform, enable_cache=False)
# Get the URI of the output artifact representing the transformed examples
train_uri = os.path.join(transform.outputs['transformed_examples'].get()[0].uri, 'Split-train')
# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(train_uri, name)
for name in os.listdir(train_uri)]
# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
# Decode the first record and print output
for tfrecord in dataset.take(1):
serialized_example = tfrecord.numpy()
example = tf.train.Example()
example.ParseFromString(serialized_example)
print(example)
IF I remove the lines:
img = tf.io.decode_raw(raw_image_dataset, tf.int64)
outputs[_IMAGE_KEY] = img
I get printed
features {
feature {
key: "label"
value {
int64_list {
value: 5
}
}
}
}
This shows what I am doing to the label feature is working but I really can't figure how to transform the image bytes. Part of the issue is I'm not completely sure what the format is as it's just a tensor which is pretty opaque. It seems given the label operation I'm operating on a column of data effectively but again, can't figure the correct operation or syntax
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
对于任何未来的观众来说,这都可以
For any future viewers this works
因此,我认为我使用
有关数据作为批次进行绘制并使用结果张量“ x [0]”的正确组件进行绘制的问题解决了问题,我仍然不确定100%确定为什么要这样做是这种情况,但似乎正在运行。
现在我正在与TFX斗争,因为它不允许我输出与所发生的内容不同的功能...
So I think I solved this using
Theres something about the data going in as a batch so needing to map it and also using the right component of the resulting tensor "x[0]", I'm still not 100% sure on why this is the case but it seems to run.
Now I'm struggling with TFX as it won't let me output features that are different to the what went in...
我将使用Keras子分类API解决此问题,并取消张紧器,解码它们并将它们堆叠在一起。我必须在工作中做这样的事情,所以相信这种方法会起作用。
I would solve this using Keras Subclassing API and unstacking the tensors, decoding them, and stacking them back together. I had to do something like this at work, so am confident the approach will work.