获取此错误,google.api_core.exceptions.invalidargument:400请求包含一个无效的参数。来自Google BigQuery Storage Write API

发布于 2025-01-30 10:38:07 字数 9672 浏览 1 评论 0原文

我正在尝试获取大量数据(数百万),并且在运行代码时会遇到以下错误。如果我以较小的范围运行相同的代码(确切的范围为2),则可以成功运行。请帮助我知道这是我的问题还是来自API方面的问题,

谢谢

DEBUG:google.api_core.bidi:Started helper thread Thread-ConsumeBidirectionalStream
DEBUG:google.api_core.bidi:Thread-ConsumeBidirectionalStream caught error 400 Request contains an invalid argument. and will exit. Generally this is due to the RPC itself being cancelled and the error will be surfaced to the calling code.
Traceback (most recent call last):
  File "/home/coyugi/teltel_env/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 147, in error_remapped_callable
    return _StreamingResponseIterator(
  File "/home/coyugi/teltel_env/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 73, in __init__
    self._stored_first_result = next(self._wrapped)
  File "/home/coyugi/teltel_env/lib/python3.8/site-packages/grpc/_channel.py", line 426, in __next__
    return self._next()
  File "/home/coyugi/teltel_env/lib/python3.8/site-packages/grpc/_channel.py", line 826, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.INVALID_ARGUMENT
        details = "Request contains an invalid argument."
        debug_error_string = "{"created":"@1652904360.179503883","description":"Error received from peer ipv4:173.194.76.95:443","file":"src/core/lib/surface/call.cc","file_line":952,"grpc_message":"Request contains an invalid argument.","grpc_status":3}"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/coyugi/teltel_env/lib/python3.8/site-packages/google/api_core/bidi.py", line 636, in _thread_main
    self._bidi_rpc.open()
  File "/home/coyugi/teltel_env/lib/python3.8/site-packages/google/api_core/bidi.py", line 279, in open
    call = self._start_rpc(iter(request_generator), metadata=self._rpc_metadata)
  File "/home/coyugi/teltel_env/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/services/big_query_write/client.py", line 678, in append_rows
    response = rpc(
  File "/home/coyugi/teltel_env/lib/python3.8/site-packages/google/api_core/gapic_v1/method.py", line 154, in __call__
    return wrapped_func(*args, **kwargs)
  File "/home/coyugi/teltel_env/lib/python3.8/site-packages/google/api_core/retry.py", line 283, in retry_wrapped_func
    return retry_target(
  File "/home/coyugi/teltel_env/lib/python3.8/site-packages/google/api_core/retry.py", line 190, in retry_target
    return target()
  File "/home/coyugi/teltel_env/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 151, in error_remapped_callable
    raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.InvalidArgument: 400 Request contains an invalid argument.
INFO:google.api_core.bidi:Thread-ConsumeBidirectionalStream exiting
DEBUG:google.cloud.bigquery_storage_v1.writer:Finished stopping manager.
Traceback (most recent call last):
  File "write_data_to_db2.py", line 207, in <module>
    p.append_rows_pending(project_id='dwingestion', dataset_id='ke',
  File "write_data_to_db2.py", line 188, in append_rows_pending
    response_future_1 = append_rows_stream.send(request)
  File "/home/coyugi/teltel_env/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/writer.py", line 234, in send
    return self._open(request)
  File "/home/coyugi/teltel_env/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/writer.py", line 207, in _open
    raise request_exception
google.api_core.exceptions.Unknown: None There was a problem opening the stream. Try turning on DEBUG level logs to see the error.

我的代码的摘要

# PULLING DATA FROM THE API

def whole_teltel_raw_data():
    # Creating a session to introduce network consistency
    session = requests.Session()
    retry = Retry(connect=3, backoff_factor=1.0)
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)

    url = "https://my_api_url"
    the_headers = {"X-API-KEY": 'my key'}
    offset_limit = 1249500
    teltel_data = []

    # Loop through the results and if present extend the teltel_data list

  

#======================================================================================================================
# WRITE THE DATA TO THE DATA WAREHOUSE
# ======================================================================================================================


os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'dwingestion-b033d9535e9d.json'


def create_row_data(tuple_data):
    call_id, starttime, stoptime, direction, type, status, duration_sec, rate, cost, transfer, extra_prefix, audio_url, \
    hangup_element, caller_number, caller_type, caller_cid, caller_dnid, caller_user_id, caller_user_short, \
    callee_number, calle_type, callee, hangup_element_name, hangup_element_element, callee_user_id, callee_user_short, \
    caller = tuple_data

    row = teltel_call_data_pb2.TeltelCall()
    row.call_id = call_id
    row.starttime = starttime
    row.stoptime = stoptime
    row.direction = direction
    row.type = type
    row.status = status
    row.duration_sec = duration_sec
    row.rate = rate
    row.cost = cost
    row.transfer = transfer
    row.extra_prefix = extra_prefix
    row.audio_url = audio_url
    row.hangup_element = hangup_element
    row.caller_number = caller_number
    row.caller_type = caller_type
    row.caller_cid = caller_cid
    row.caller_dnid = caller_dnid
    row.caller_user_id = caller_user_id
    row.caller_user_short = caller_user_short
    row.callee_number = callee_number
    row.calle_type = calle_type
    row.callee = callee
    row.hangup_element_name = hangup_element_name
    row.hangup_element_title = hangup_element_element
    row.callee_user_id = callee_user_id
    row.callee_user_short = callee_user_short
    row.caller = caller
    return row.SerializeToString()


# Creating connection to the data warehouse
def create_bigquery_storage_client(google_credentials):
    return bigquery_storage_v1.client.BigQueryWriteClient(
        credentials=google_credentials
    )


class GcpBigqueryStorageService(object):

    def __init__(self, google_credentials=None, gcp_config=None):
        self.client = create_bigquery_storage_client(google_credentials)
        self.config = gcp_config

    def append_rows_pending(self, project_id: str, dataset_id: str, table_id: str):
        """Create a write stream, write some sample data, and commit the stream."""
        # write_client = self.client
        parent = self.client.table_path(project_id, dataset_id, table_id)
        write_stream = types.WriteStream()

        # When creating the stream, choose the type. Use the PENDING type to wait

        write_stream.type_ = types.WriteStream.Type.PENDING
        write_stream = self.client.create_write_stream(
            parent=parent, write_stream=write_stream
        )
        stream_name = write_stream.name

        # Create a template with fields needed for the first request.
        request_template = types.AppendRowsRequest()

        # The initial request must contain the stream name.
        request_template.write_stream = stream_name

        # So that BigQuery knows how to parse the serialized_rows, generate a
        # protocol buffer representation of your message descriptor.
        proto_schema = types.ProtoSchema()
        proto_descriptor = descriptor_pb2.DescriptorProto()
        teltel_call_data_pb2.TeltelCall.DESCRIPTOR.CopyToProto(proto_descriptor)
        proto_schema.proto_descriptor = proto_descriptor
        proto_data = types.AppendRowsRequest.ProtoData()
        proto_data.writer_schema = proto_schema
        request_template.proto_rows = proto_data

        # Some stream types support an unbounded number of requests. Construct an
        # AppendRowsStream to send an arbitrary number of requests to a stream.
        append_rows_stream = writer.AppendRowsStream(self.client, request_template)

        # Create a batch of row data by appending proto2 serialized bytes to the
        # serialized_rows repeated field.
        proto_rows = types.ProtoRows()
        row_number = 0
        for row in whole_teltel_raw_data():
            proto_rows.serialized_rows.append(create_row_data(row))
            # checking the writing progress
            row_number = row_number + 1
            print("Writing to the database row number", row_number)

        # The first request must always have an offset of 0.
        request = types.AppendRowsRequest()
        proto_data = types.AppendRowsRequest.ProtoData()
        proto_data.rows = proto_rows
        request.proto_rows = proto_data

        append_rows_stream.close()
        # A PENDING type stream must be "finalized" before being committed. No new
        # records can be written to the stream after this method has been called.
        self.client.finalize_write_stream(name=write_stream.name)

        # Commit the stream you created earlier.
        batch_commit_write_streams_request = types.BatchCommitWriteStreamsRequest()
        batch_commit_write_streams_request.parent = parent
        batch_commit_write_streams_request.write_streams = [write_stream.name]
        self.client.batch_commit_write_streams(batch_commit_write_streams_request)
        print(f"Writes to stream: '{write_stream.name}' have been committed.")


p = GcpBigqueryStorageService()

p.append_rows_pending(project_id='my_project', dataset_id='my_id', table_id='teltel_call_2')

I am trying to pull a huge amount of data (in millions) and I am getting the following error when running my code. If I run the same code with a small range (to be exact a range of 2) it runs successfully. Please assist in helping me know if this is my issue or is coming from the API side

Thanks

The Error I am getting

DEBUG:google.api_core.bidi:Started helper thread Thread-ConsumeBidirectionalStream
DEBUG:google.api_core.bidi:Thread-ConsumeBidirectionalStream caught error 400 Request contains an invalid argument. and will exit. Generally this is due to the RPC itself being cancelled and the error will be surfaced to the calling code.
Traceback (most recent call last):
  File "/home/coyugi/teltel_env/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 147, in error_remapped_callable
    return _StreamingResponseIterator(
  File "/home/coyugi/teltel_env/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 73, in __init__
    self._stored_first_result = next(self._wrapped)
  File "/home/coyugi/teltel_env/lib/python3.8/site-packages/grpc/_channel.py", line 426, in __next__
    return self._next()
  File "/home/coyugi/teltel_env/lib/python3.8/site-packages/grpc/_channel.py", line 826, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.INVALID_ARGUMENT
        details = "Request contains an invalid argument."
        debug_error_string = "{"created":"@1652904360.179503883","description":"Error received from peer ipv4:173.194.76.95:443","file":"src/core/lib/surface/call.cc","file_line":952,"grpc_message":"Request contains an invalid argument.","grpc_status":3}"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/coyugi/teltel_env/lib/python3.8/site-packages/google/api_core/bidi.py", line 636, in _thread_main
    self._bidi_rpc.open()
  File "/home/coyugi/teltel_env/lib/python3.8/site-packages/google/api_core/bidi.py", line 279, in open
    call = self._start_rpc(iter(request_generator), metadata=self._rpc_metadata)
  File "/home/coyugi/teltel_env/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/services/big_query_write/client.py", line 678, in append_rows
    response = rpc(
  File "/home/coyugi/teltel_env/lib/python3.8/site-packages/google/api_core/gapic_v1/method.py", line 154, in __call__
    return wrapped_func(*args, **kwargs)
  File "/home/coyugi/teltel_env/lib/python3.8/site-packages/google/api_core/retry.py", line 283, in retry_wrapped_func
    return retry_target(
  File "/home/coyugi/teltel_env/lib/python3.8/site-packages/google/api_core/retry.py", line 190, in retry_target
    return target()
  File "/home/coyugi/teltel_env/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 151, in error_remapped_callable
    raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.InvalidArgument: 400 Request contains an invalid argument.
INFO:google.api_core.bidi:Thread-ConsumeBidirectionalStream exiting
DEBUG:google.cloud.bigquery_storage_v1.writer:Finished stopping manager.
Traceback (most recent call last):
  File "write_data_to_db2.py", line 207, in <module>
    p.append_rows_pending(project_id='dwingestion', dataset_id='ke',
  File "write_data_to_db2.py", line 188, in append_rows_pending
    response_future_1 = append_rows_stream.send(request)
  File "/home/coyugi/teltel_env/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/writer.py", line 234, in send
    return self._open(request)
  File "/home/coyugi/teltel_env/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/writer.py", line 207, in _open
    raise request_exception
google.api_core.exceptions.Unknown: None There was a problem opening the stream. Try turning on DEBUG level logs to see the error.

Summary Of My Code

# PULLING DATA FROM THE API

def whole_teltel_raw_data():
    # Creating a session to introduce network consistency
    session = requests.Session()
    retry = Retry(connect=3, backoff_factor=1.0)
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)

    url = "https://my_api_url"
    the_headers = {"X-API-KEY": 'my key'}
    offset_limit = 1249500
    teltel_data = []

    # Loop through the results and if present extend the teltel_data list

  

#======================================================================================================================
# WRITE THE DATA TO THE DATA WAREHOUSE
# ======================================================================================================================


os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'dwingestion-b033d9535e9d.json'


def create_row_data(tuple_data):
    call_id, starttime, stoptime, direction, type, status, duration_sec, rate, cost, transfer, extra_prefix, audio_url, \
    hangup_element, caller_number, caller_type, caller_cid, caller_dnid, caller_user_id, caller_user_short, \
    callee_number, calle_type, callee, hangup_element_name, hangup_element_element, callee_user_id, callee_user_short, \
    caller = tuple_data

    row = teltel_call_data_pb2.TeltelCall()
    row.call_id = call_id
    row.starttime = starttime
    row.stoptime = stoptime
    row.direction = direction
    row.type = type
    row.status = status
    row.duration_sec = duration_sec
    row.rate = rate
    row.cost = cost
    row.transfer = transfer
    row.extra_prefix = extra_prefix
    row.audio_url = audio_url
    row.hangup_element = hangup_element
    row.caller_number = caller_number
    row.caller_type = caller_type
    row.caller_cid = caller_cid
    row.caller_dnid = caller_dnid
    row.caller_user_id = caller_user_id
    row.caller_user_short = caller_user_short
    row.callee_number = callee_number
    row.calle_type = calle_type
    row.callee = callee
    row.hangup_element_name = hangup_element_name
    row.hangup_element_title = hangup_element_element
    row.callee_user_id = callee_user_id
    row.callee_user_short = callee_user_short
    row.caller = caller
    return row.SerializeToString()


# Creating connection to the data warehouse
def create_bigquery_storage_client(google_credentials):
    return bigquery_storage_v1.client.BigQueryWriteClient(
        credentials=google_credentials
    )


class GcpBigqueryStorageService(object):

    def __init__(self, google_credentials=None, gcp_config=None):
        self.client = create_bigquery_storage_client(google_credentials)
        self.config = gcp_config

    def append_rows_pending(self, project_id: str, dataset_id: str, table_id: str):
        """Create a write stream, write some sample data, and commit the stream."""
        # write_client = self.client
        parent = self.client.table_path(project_id, dataset_id, table_id)
        write_stream = types.WriteStream()

        # When creating the stream, choose the type. Use the PENDING type to wait

        write_stream.type_ = types.WriteStream.Type.PENDING
        write_stream = self.client.create_write_stream(
            parent=parent, write_stream=write_stream
        )
        stream_name = write_stream.name

        # Create a template with fields needed for the first request.
        request_template = types.AppendRowsRequest()

        # The initial request must contain the stream name.
        request_template.write_stream = stream_name

        # So that BigQuery knows how to parse the serialized_rows, generate a
        # protocol buffer representation of your message descriptor.
        proto_schema = types.ProtoSchema()
        proto_descriptor = descriptor_pb2.DescriptorProto()
        teltel_call_data_pb2.TeltelCall.DESCRIPTOR.CopyToProto(proto_descriptor)
        proto_schema.proto_descriptor = proto_descriptor
        proto_data = types.AppendRowsRequest.ProtoData()
        proto_data.writer_schema = proto_schema
        request_template.proto_rows = proto_data

        # Some stream types support an unbounded number of requests. Construct an
        # AppendRowsStream to send an arbitrary number of requests to a stream.
        append_rows_stream = writer.AppendRowsStream(self.client, request_template)

        # Create a batch of row data by appending proto2 serialized bytes to the
        # serialized_rows repeated field.
        proto_rows = types.ProtoRows()
        row_number = 0
        for row in whole_teltel_raw_data():
            proto_rows.serialized_rows.append(create_row_data(row))
            # checking the writing progress
            row_number = row_number + 1
            print("Writing to the database row number", row_number)

        # The first request must always have an offset of 0.
        request = types.AppendRowsRequest()
        proto_data = types.AppendRowsRequest.ProtoData()
        proto_data.rows = proto_rows
        request.proto_rows = proto_data

        append_rows_stream.close()
        # A PENDING type stream must be "finalized" before being committed. No new
        # records can be written to the stream after this method has been called.
        self.client.finalize_write_stream(name=write_stream.name)

        # Commit the stream you created earlier.
        batch_commit_write_streams_request = types.BatchCommitWriteStreamsRequest()
        batch_commit_write_streams_request.parent = parent
        batch_commit_write_streams_request.write_streams = [write_stream.name]
        self.client.batch_commit_write_streams(batch_commit_write_streams_request)
        print(f"Writes to stream: '{write_stream.name}' have been committed.")


p = GcpBigqueryStorageService()

p.append_rows_pending(project_id='my_project', dataset_id='my_id', table_id='teltel_call_2')

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文