json.decoder.JSONDecodeError when migrating to Kubeflow Pipelines v2

Published 2025-01-24 18:09:28


Copied from here: https://github.com/kubeflow/pipelines/issues/7608

I have a generated code file that runs against Kubeflow. It ran fine on Kubeflow v1, and now I'm moving it to Kubeflow v2. When I do this, I get the following error:
json.decoder.JSONDecodeError: Expecting property name enclosed in double quotes: line 1 column 2 (char 1)

I honestly don't even know where to go next. It feels like something is fundamentally broken for something to fail in the first character, but I can't see it (it's inside the kubeflow execution).

Thanks!


Environment

  • How did you deploy Kubeflow Pipelines (KFP)?
    Standard deployment to AWS

  • KFP version:
    1.8.1

  • KFP SDK version:
    1.8.12

Here are the logs:

time="2022-04-26T17:38:09.547Z" level=info msg="capturing logs" argo=true
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: 
https://pip.pypa.io/warnings/venv
[KFP Executor 2022-04-26 17:38:24,691 INFO]: Looking for component `run_info_fn` in --component_module_path `/tmp/tmp.NJW6PWXpIt/ephemeral_component.py`
[KFP Executor 2022-04-26 17:38:24,691 INFO]: Loading KFP component "run_info_fn" from /tmp/tmp.NJW6PWXpIt/ephemeral_component.py (directory "/tmp/tmp.NJW6PWXpIt" and module name "ephemeral_component")
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/usr/local/lib/python3.7/site-packages/kfp/v2/components/executor_main.py", line 104, in <module>
    executor_main()
  File "/usr/local/lib/python3.7/site-packages/kfp/v2/components/executor_main.py", line 94, in executor_main
    executor_input = json.loads(args.executor_input)
  File "/usr/local/lib/python3.7/json/__init__.py", line 348, in loads
    return _default_decoder.decode(s)
  File "/usr/local/lib/python3.7/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/local/lib/python3.7/json/decoder.py", line 353, in raw_decode
    obj, end = self.scan_once(s, idx)
json.decoder.JSONDecodeError: Expecting property name enclosed in double quotes: line 1 column 2 (char 1)
time="2022-04-26T17:38:24.803Z" level=error msg="cannot save artifact /tmp/outputs/run_info/data" argo=true error="stat /tmp/outputs/run_info/data: no such file or directory"
Error: exit status 1
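
For context on where this fails: the KFP v2 executor entrypoint (kfp/v2/components/executor_main.py) parses the --executor_input command-line argument as JSON. If the pipeline was compiled in v1 (legacy) mode, that argument can end up holding an unsubstituted v2 placeholder instead of real JSON, which is consistent with json.loads failing on the very first characters. A minimal sketch of that failure, assuming a placeholder value such as "{{$}}" (that exact value is an assumption, not something visible in the logs):

import json

# Hypothetical executor input: a v2 placeholder left unsubstituted because
# the pipeline was compiled in v1 (legacy) mode.
executor_input = "{{$}}"

try:
    json.loads(executor_input)
except json.JSONDecodeError as err:
    # Prints: Expecting property name enclosed in double quotes: line 1 column 2 (char 1)
    print(err)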

Here are the files to repro:
root_pipeline_04d99580c84b47c28405a2c8bcae8703.py

import kfp.v2.components
from kfp.v2.dsl import InputPath
from kubernetes.client.models import V1EnvVar
from kubernetes import client, config
from typing import NamedTuple
from base64 import b64encode
import kfp.v2.dsl as dsl
import kubernetes
import json
import kfp

from run_info import run_info_fn
from same_step_000_ce6494722c474dd3b8bef482bb976557 import same_step_000_ce6494722c474dd3b8bef482bb976557_fn


run_info_comp = kfp.v2.dsl.component(
    func=run_info_fn,
    packages_to_install=[
        "kfp",
        "dill",
    ],
)

same_step_000_ce6494722c474dd3b8bef482bb976557_comp = kfp.v2.dsl.component(
    func=same_step_000_ce6494722c474dd3b8bef482bb976557_fn,
    base_image="public.ecr.aws/j1r0q0g6/notebooks/notebook-servers/codeserver-python:v1.5.0",
    packages_to_install=[
        "dill",
        "requests",
         # TODO: make this a loop
    ],
)

@kfp.dsl.pipeline(name="root_pipeline_compilation",)
def root(
    context: str='', metadata_url: str='',
):
    # Generate secrets (if not already created)
    secrets_by_env = {}

    env_vars = {
    }

    run_info = run_info_comp(run_id=kfp.dsl.RUN_ID_PLACEHOLDER)


    same_step_000_ce6494722c474dd3b8bef482bb976557 = same_step_000_ce6494722c474dd3b8bef482bb976557_comp(
        input_context_path="",
        run_info=run_info.outputs["run_info"],
        metadata_url=metadata_url
    )

    same_step_000_ce6494722c474dd3b8bef482bb976557.execution_options.caching_strategy.max_cache_staleness = "P0D"
    for k in env_vars:
        same_step_000_ce6494722c474dd3b8bef482bb976557.add_env_variable(V1EnvVar(name=k, value=env_vars[k]))

run_info.py

"""
The run_info component fetches metadata about the current pipeline execution
from kubeflow and passes it on to the user code step components.
"""
from typing import NamedTuple


def run_info_fn(
    run_id: str,
) -> NamedTuple("RunInfoOutput", [("run_info", str),]):
    from base64 import urlsafe_b64encode
    from collections import namedtuple
    import datetime
    import base64
    import dill
    import kfp

    client = kfp.Client(host="http://ml-pipeline:8888")
    run_info = client.get_run(run_id=run_id)

    run_info_dict = {
        "run_id": run_info.run.id,
        "name": run_info.run.name,
        "created_at": run_info.run.created_at.isoformat(),
        "pipeline_id": run_info.run.pipeline_spec.pipeline_id,
    }

    # Track kubernetes resources associated with the run.
    for r in run_info.run.resource_references:
        run_info_dict[f"{r.key.type.lower()}_id"] = r.key.id

    # Base64-encoded as value is visible in kubeflow ui.
    output = urlsafe_b64encode(dill.dumps(run_info_dict))

    return namedtuple("RunInfoOutput", ["run_info"])(
        str(output, encoding="ascii")
    )

same_step_000_ce6494722c474dd3b8bef482bb976557.py

import kfp
from kfp.v2.dsl import component, Artifact, Input, InputPath, Output, OutputPath, Dataset, Model
from typing import NamedTuple


def same_step_000_ce6494722c474dd3b8bef482bb976557_fn(
    input_context_path: InputPath(str),
    output_context_path: OutputPath(str),
    run_info: str = "gAR9lC4=",
    metadata_url: str = "",
):
    from base64 import urlsafe_b64encode, urlsafe_b64decode
    from pathlib import Path
    import datetime
    import requests
    import tempfile
    import dill
    import os

    input_context = None
    with Path(input_context_path).open("rb") as reader:
        input_context = reader.read()

    # Helper function for posting metadata to mlflow.
    def post_metadata(json):
        if metadata_url == "":
            return

        try:
            req = requests.post(metadata_url, json=json)
            req.raise_for_status()
        except requests.exceptions.HTTPError as err:
            print(f"Error posting metadata: {err}")

    # Move to writable directory as user might want to do file IO.
    # TODO: won't persist across steps, might need support in SDK?
    os.chdir(tempfile.mkdtemp())

    # Load information about the current experiment run:
    run_info = dill.loads(urlsafe_b64decode(run_info))

    # Post session context to mlflow.
    if len(input_context) > 0:
        input_context_str = urlsafe_b64encode(input_context)
        post_metadata(
            {
                "experiment_id": run_info["experiment_id"],
                "run_id": run_info["run_id"],
                "step_id": "same_step_000",
                "metadata_type": "input",
                "metadata_value": input_context_str,
                "metadata_time": datetime.datetime.now().isoformat(),
            }
        )

    # User code for step, which we run in its own execution frame.
    user_code = f"""
import dill

# Load session context into global namespace:
if { len(input_context) } > 0:
    dill.load_session("{ input_context_path }")

{dill.loads(urlsafe_b64decode("gASVGAAAAAAAAACMFHByaW50KCJIZWxsbyB3b3JsZCIplC4="))}

# Remove anything from the global namespace that cannot be serialised.
# TODO: this will include things like pandas dataframes, needs sdk support?
_bad_keys = []
_all_keys = list(globals().keys())
for k in _all_keys:
    try:
        dill.dumps(globals()[k])
    except TypeError:
        _bad_keys.append(k)

for k in _bad_keys:
    del globals()[k]

# Save new session context to disk for the next component:
dill.dump_session("{output_context_path}")
"""

    # Runs the user code in a new execution frame. Context from the previous
    # component in the run is loaded into the session dynamically, and we run
    # with a single globals() namespace to simulate top-level execution.
    exec(user_code, globals(), globals())

    # Post new session context to mlflow:
    with Path(output_context_path).open("rb") as reader:
        context = urlsafe_b64encode(reader.read())
        post_metadata(
            {
                "experiment_id": run_info["experiment_id"],
                "run_id": run_info["run_id"],
                "step_id": "same_step_000",
                "metadata_type": "output",
                "metadata_value": context,
                "metadata_time": datetime.datetime.now().isoformat(),
            }
        )

Python file executed to deploy the run:

from sameproject.ops import helpers
from pathlib import Path
import importlib
import kfp


def deploy(compiled_path: Path, root_module_name: str):
    with helpers.add_path(str(compiled_path)):
        kfp_client = kfp.Client()  # only supporting 'kubeflow' namespace
        root_module = importlib.import_module(root_module_name)

        return kfp_client.create_run_from_pipeline_func(
            root_module.root,
            arguments={},
        )


Comments (1)

暮光沉寂 2025-01-31 18:09:28

It turns out this is caused by not compiling the pipeline with the right execution mode.

If you're hitting this error, your compile call should look like this:

Compiler(mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE).compile(pipeline_func=root_module.root, package_path=str(package_yaml_path))
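
Applied to the deploy() helper from the repro above, a sketch of the same fix: in KFP SDK 1.8, create_run_from_pipeline_func also accepts a mode argument, so the run can be submitted in v2-compatible mode without compiling to a package file by hand.

from sameproject.ops import helpers
from pathlib import Path
import importlib
import kfp


def deploy(compiled_path: Path, root_module_name: str):
    with helpers.add_path(str(compiled_path)):
        kfp_client = kfp.Client()  # only supporting 'kubeflow' namespace
        root_module = importlib.import_module(root_module_name)

        # Submit in v2-compatible mode so the v2 executor receives real JSON
        # in --executor_input instead of an unsubstituted placeholder.
        return kfp_client.create_run_from_pipeline_func(
            root_module.root,
            arguments={},
            mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE,
        )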
