如何使用 web3.py 获取合约地址的待处理交易?

发布于 2025-01-15 05:48:52 字数 1361 浏览 4 评论 0原文

我想获取合约地址的待处理交易,我尝试了很多方法但不起作用

方法1:这似乎擅长对待处理交易进行排序,但我无法从我的地址获取任何交易,我不知道为什么。请帮助我

def main():
    block_filter = web3.eth.filter('pending') 
    log_loop(block_filter, 0)

def log_loop(block_filter, poll_interval):
    while True: 
        for event in block_filter.get_new_entries():
            if web3.eth.getTransaction(event)['from'] == my contract:
                print(event)

方法 2:这可以帮助我从我的地址获取交易,但它获得的所有交易均已确认,而不是待处理

def main():
    block_filter = web3.eth.filter({'fromBlock':'pending','toBlock':'pending', 'address':contract_address}) #this is not working, return nothing

    #block_filter = web3.eth.filter({'fromBlock':0,'toBlock':'pending', 'address':contract_address}) #return confirmed transaction, not pending

    #block_filter = web3.eth.filter({'fromBlock':'pending','toBlock':'latest', 'address':contract_address}) #return confirmed transaction, not pending

    #block_filter = web3.eth.filter({'fromBlock':'latest','toBlock':'pending', 'address':contract_address}) #return error from > to

    #block_filter = web3.eth.filter({'address':contract_address}) #return confirmed transaction, not pending
    log_loop(block_filter, 0)

def log_loop(block_filter, poll_interval):
    while True: 
        for event in block_filter.get_new_entries():
            print(event)

I want to get pending transaction of a contract address, I have tried many ways but didn't work

method 1: this seems to be good at sorting pending transaction but I can't get any transaction from my address, I don't know why. Please help me

def main():
    block_filter = web3.eth.filter('pending') 
    log_loop(block_filter, 0)

def log_loop(block_filter, poll_interval):
    while True: 
        for event in block_filter.get_new_entries():
            if web3.eth.getTransaction(event)['from'] == my contract:
                print(event)

method 2: this help me to get transaction from my address but all transaction it get is comfirmed, not pending

def main():
    block_filter = web3.eth.filter({'fromBlock':'pending','toBlock':'pending', 'address':contract_address}) #this is not working, return nothing

    #block_filter = web3.eth.filter({'fromBlock':0,'toBlock':'pending', 'address':contract_address}) #return confirmed transaction, not pending

    #block_filter = web3.eth.filter({'fromBlock':'pending','toBlock':'latest', 'address':contract_address}) #return confirmed transaction, not pending

    #block_filter = web3.eth.filter({'fromBlock':'latest','toBlock':'pending', 'address':contract_address}) #return error from > to

    #block_filter = web3.eth.filter({'address':contract_address}) #return confirmed transaction, not pending
    log_loop(block_filter, 0)

def log_loop(block_filter, poll_interval):
    while True: 
        for event in block_filter.get_new_entries():
            print(event)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

永不分离 2025-01-22 05:48:52

您可以使用 WebSocket 连接到您的节点来订阅新的待处理交易。有关更多信息,请检查 https:// geth.ethereum.org/docs/rpc/pubsub

这样您将收到所有待处理的交易 txHashes。
之后,您可以过滤它们并仅处理发送到所需令牌地址的令牌。
一种方法是对每个收到的 txHash 使用 web3.eth.get_transaction()。它会返回给您一个包含交易详细信息的字典。使用此字典中的“to”键来过滤是否将待处理的交易发送到所需的合约地址。
以下是建议的(稍作修改的)解决方案:
https: //community.infura.io/t/web3-py-how-to-subscribe-to-pending-ethereum-transactions-in-python/5409

import asyncio
import json
import requests
import websockets
from web3 import Web3


web3 = Web3(Web3.HTTPProvider(node_http_url))
# specific address to watch at 
watched_address = Web3.toChecksumAddress('0x....')

async def get_event():
    async with websockets.connect(node_ws_url) as ws:
        await ws.send('{"jsonrpc": "2.0", "id": 1, "method": "eth_subscribe", "params": ["newPendingTransactions"]}')
        subscription_response = await ws.recv()
        print(subscription_response)

        while True:
            try:
                message = await asyncio.wait_for(ws.recv(), timeout=15)
                response = json.loads(message)
                txHash = response['params']['result']
                print(txHash)

                tx = web3.eth.get_transaction(txHash)
                if tx['to'] == watched_address:
                    print("Pending transaction found with the following details:")
                    print({
                        "hash": txHash,
                        "from": tx["from"],
                        "value": web3.fromWei(tx["value"], 'ether')
                    })
                pass
            except:
                pass

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    while True:
        loop.run_until_complete(get_event())
 

如果您想要解码 tx['input'] 你可以使用 eth-abi
https://github.com/ethereum/eth-abi

或者如果您有合约 abi你可以创建web3合约并调用decode_function_input()

wa_contract = web3.eth.contract(address=watched_address, abi=watched_address_abi)  
func_obj, func_params = wa_contract.decode_function_input(tx['input'])

You can subscribe to new pending transactions, using WebSocket connection to your Node For more info check https://geth.ethereum.org/docs/rpc/pubsub

Than way you will receive all pending transaction txHashes.
After that you can filter them and process only that sent to desired token address.
One way is to use web3.eth.get_transaction() for each received txHash. It will return you a dict with transaction details. Use 'to' key from this dict to filter is pending transaction sent to desired contract address or not.
Bellow is (slightly modified) solution suggested at:
https://community.infura.io/t/web3-py-how-to-subscribe-to-pending-ethereum-transactions-in-python/5409

import asyncio
import json
import requests
import websockets
from web3 import Web3


web3 = Web3(Web3.HTTPProvider(node_http_url))
# specific address to watch at 
watched_address = Web3.toChecksumAddress('0x....')

async def get_event():
    async with websockets.connect(node_ws_url) as ws:
        await ws.send('{"jsonrpc": "2.0", "id": 1, "method": "eth_subscribe", "params": ["newPendingTransactions"]}')
        subscription_response = await ws.recv()
        print(subscription_response)

        while True:
            try:
                message = await asyncio.wait_for(ws.recv(), timeout=15)
                response = json.loads(message)
                txHash = response['params']['result']
                print(txHash)

                tx = web3.eth.get_transaction(txHash)
                if tx['to'] == watched_address:
                    print("Pending transaction found with the following details:")
                    print({
                        "hash": txHash,
                        "from": tx["from"],
                        "value": web3.fromWei(tx["value"], 'ether')
                    })
                pass
            except:
                pass

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    while True:
        loop.run_until_complete(get_event())
 

If you wants to decode tx['input'] you can use eth-abi
https://github.com/ethereum/eth-abi

or if you have the contract abi you can create web3 contract and call decode_function_input()

wa_contract = web3.eth.contract(address=watched_address, abi=watched_address_abi)  
func_obj, func_params = wa_contract.decode_function_input(tx['input'])
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文