使用以太坊测试器进行测试

以太坊测试器(eth-tester)是一款用于测试基于以太坊的应用程序的工具套件。

本节简要概述了使用 eth-tester 进行测试的方法。如需了解更多信息,您可以查看 GitHub 仓库中的文档,或加入 Gitter 频道。

入门

在测试之前,需要先设置 Vyper 专用的合约转换以及与区块链相关的固定装置(fixture)。这些固定装置将在每个测试文件中使用,因此应该在 conftest.py 中定义。

注意

由于测试是在 pytest 框架中完成的,因此您可以使用 pytest.ini、tox.ini 和 setup.cfg,并且可以使用大多数 IDE 的 pytest 插件。

  1import json
  2
  3import pytest
  4import web3.exceptions
  5from eth_tester import EthereumTester, PyEVMBackend
  6from eth_tester.exceptions import TransactionFailed
  7from eth_utils.toolz import compose
  8from hexbytes import HexBytes
  9from web3 import Web3
 10from web3.contract import Contract
 11from web3.providers.eth_tester import EthereumTesterProvider
 12
 13from vyper import compiler
 14from vyper.ast.grammar import parse_vyper_source
 15from vyper.compiler.settings import Settings
 16
 17
 18class VyperMethod:
 19    ALLOWED_MODIFIERS = {"call", "estimateGas", "transact", "buildTransaction"}
 20
 21    def __init__(self, function, normalizers=None):
 22        self._function = function
 23        self._function._return_data_normalizers = normalizers
 24
 25    def __call__(self, *args, **kwargs):
 26        return self.__prepared_function(*args, **kwargs)
 27
 28    def __prepared_function(self, *args, **kwargs):
 29        if not kwargs:
 30            modifier, modifier_dict = "call", {}
 31            fn_abi = [
 32                x
 33                for x in self._function.contract_abi
 34                if x.get("name") == self._function.function_identifier
 35            ].pop()
 36            # To make tests faster just supply some high gas value.
 37            modifier_dict.update({"gas": fn_abi.get("gas", 0) + 500000})
 38        elif len(kwargs) == 1:
 39            modifier, modifier_dict = kwargs.popitem()
 40            if modifier not in self.ALLOWED_MODIFIERS:
 41                raise TypeError(f"The only allowed keyword arguments are: {self.ALLOWED_MODIFIERS}")
 42        else:
 43            raise TypeError(f"Use up to one keyword argument, one of: {self.ALLOWED_MODIFIERS}")
 44        return getattr(self._function(*args), modifier)(modifier_dict)
 45
 46
 47class VyperContract:
 48    """
 49    An alternative Contract Factory which invokes all methods as `call()`,
 50    unless you add a keyword argument. The keyword argument assigns the prep method.
 51    This call
 52    > contract.withdraw(amount, transact={'from': eth.accounts[1], 'gas': 100000, ...})
 53    is equivalent to this call in the classic contract:
 54    > contract.functions.withdraw(amount).transact({'from': eth.accounts[1], 'gas': 100000, ...})
 55    """
 56
 57    def __init__(self, classic_contract, method_class=VyperMethod):
 58        classic_contract._return_data_normalizers += CONCISE_NORMALIZERS
 59        self._classic_contract = classic_contract
 60        self.address = self._classic_contract.address
 61        protected_fn_names = [fn for fn in dir(self) if not fn.endswith("__")]
 62
 63        try:
 64            fn_names = [fn["name"] for fn in self._classic_contract.functions._functions]
 65        except web3.exceptions.NoABIFunctionsFound:
 66            fn_names = []
 67
 68        for fn_name in fn_names:
 69            # Override namespace collisions
 70            if fn_name in protected_fn_names:
 71                raise AttributeError(f"{fn_name} is protected!")
 72            else:
 73                _classic_method = getattr(self._classic_contract.functions, fn_name)
 74                _concise_method = method_class(
 75                    _classic_method, self._classic_contract._return_data_normalizers
 76                )
 77            setattr(self, fn_name, _concise_method)
 78
 79    @classmethod
 80    def factory(cls, *args, **kwargs):
 81        return compose(cls, Contract.factory(*args, **kwargs))
 82
 83
 84def _none_addr(datatype, data):
 85    if datatype == "address" and int(data, base=16) == 0:
 86        return (datatype, None)
 87    else:
 88        return (datatype, data)
 89
 90
 91CONCISE_NORMALIZERS = (_none_addr,)
 92
 93
 94@pytest.fixture(scope="module")
 95def tester():
 96    # set absurdly high gas limit so that london basefee never adjusts
 97    # (note: 2**63 - 1 is max that evm allows)
 98    custom_genesis = PyEVMBackend._generate_genesis_params(overrides={"gas_limit": 10**10})
 99    custom_genesis["base_fee_per_gas"] = 0
100    backend = PyEVMBackend(genesis_parameters=custom_genesis)
101    return EthereumTester(backend=backend)
102
103
def zero_gas_price_strategy(web3, transaction_params=None):
    """Gas price strategy that always returns 0; both arguments are ignored."""
    return 0  # zero gas price makes testing simpler.


@pytest.fixture(scope="module")
def w3(tester):
    """Module-scoped ``Web3`` instance connected to the ``tester`` fixture."""
    w3 = Web3(EthereumTesterProvider(tester))
    # Every transaction is priced at zero (see zero_gas_price_strategy).
    w3.eth.set_gas_price_strategy(zero_gas_price_strategy)
    return w3


def _get_contract(w3, source_code, optimize, *args, override_opt_level=None, **kwargs):
    """Compile ``source_code``, deploy it and return the deployed contract.

    Args:
        w3: Web3 instance used for deployment.
        source_code: Vyper source text.
        optimize: optimisation setting, used unless ``override_opt_level``
            is truthy.
        *args: constructor arguments.
        override_opt_level: optional replacement for ``optimize``.
        **kwargs: ``evm_version``, ``interface_codes`` and ``value_in_eth``
            are consumed here; whatever remains is merged into the
            deployment transaction parameters.

    Returns:
        The object built by ``w3.eth.contract`` with
        ``ContractFactoryClass=VyperContract`` at the deployed address.
    """
    settings = Settings()
    settings.evm_version = kwargs.pop("evm_version", None)
    settings.optimize = override_opt_level or optimize
    out = compiler.compile_code(
        source_code,
        # test that metadata and natspecs get generated
        ["abi", "bytecode", "metadata", "userdoc", "devdoc"],
        settings=settings,
        interface_codes=kwargs.pop("interface_codes", None),
        show_gas_estimates=True,  # Enable gas estimates for testing
    )
    parse_vyper_source(source_code)  # Test grammar.
    json.dumps(out["metadata"])  # test metadata is json serializable
    abi = out["abi"]
    bytecode = out["bytecode"]
    value = kwargs.pop("value_in_eth", 0) * 10**18  # Handle deploying with an eth value.
    c = w3.eth.contract(abi=abi, bytecode=bytecode)
    deploy_transaction = c.constructor(*args)
    tx_info = {"from": w3.eth.accounts[0], "value": value, "gasPrice": 0}
    # Remaining keyword arguments override the default transaction fields.
    tx_info.update(kwargs)
    tx_hash = deploy_transaction.transact(tx_info)
    address = w3.eth.get_transaction_receipt(tx_hash)["contractAddress"]
    return w3.eth.contract(address, abi=abi, bytecode=bytecode, ContractFactoryClass=VyperContract)


def _deploy_blueprint_for(w3, source_code, optimize, initcode_prefix=b"", **kwargs):
    """Deploy the compiled bytecode of ``source_code`` as a blueprint.

    The compiled bytecode (preceded by ``initcode_prefix``) is stored
    verbatim as the runtime code of a new contract, by prepending a short
    deploy preamble that returns it.

    Args:
        w3: Web3 instance used for deployment.
        source_code: Vyper source text.
        optimize: optimisation setting passed to the compiler settings.
        initcode_prefix: bytes placed in front of the compiled bytecode.
        **kwargs: ``evm_version`` and ``interface_codes`` are consumed;
            other keys are not used.

    Returns:
        A pair ``(blueprint_contract, factory)`` where ``factory(address)``
        builds a contract object with the compiled ABI at ``address``.
    """
    settings = Settings()
    settings.evm_version = kwargs.pop("evm_version", None)
    settings.optimize = optimize
    out = compiler.compile_code(
        source_code,
        ["abi", "bytecode"],
        interface_codes=kwargs.pop("interface_codes", None),
        settings=settings,
        show_gas_estimates=True,  # Enable gas estimates for testing
    )
    parse_vyper_source(source_code)  # Test grammar.
    abi = out["abi"]
    bytecode = HexBytes(initcode_prefix) + HexBytes(out["bytecode"])
    bytecode_len = len(bytecode)
    # Length as four hex digits (two bytes), zero-padded on the left.
    bytecode_len_hex = hex(bytecode_len)[2:].rjust(4, "0")
    # prepend a quick deploy preamble
    # (0x61 + 2-byte length + 7 more bytes = 10 bytes, matching the 0x0a
    # offset embedded in the preamble itself)
    deploy_preamble = HexBytes("61" + bytecode_len_hex + "3d81600a3d39f3")
    deploy_bytecode = HexBytes(deploy_preamble) + bytecode

    deployer_abi = []  # just a constructor
    c = w3.eth.contract(abi=deployer_abi, bytecode=deploy_bytecode)
    deploy_transaction = c.constructor()
    tx_info = {"from": w3.eth.accounts[0], "value": 0, "gasPrice": 0}

    tx_hash = deploy_transaction.transact(tx_info)
    address = w3.eth.get_transaction_receipt(tx_hash)["contractAddress"]

    # sanity check
    assert w3.eth.get_code(address) == bytecode, (w3.eth.get_code(address), bytecode)

    def factory(address):
        return w3.eth.contract(
            address, abi=abi, bytecode=bytecode, ContractFactoryClass=VyperContract
        )

    return w3.eth.contract(address, bytecode=deploy_bytecode), factory


@pytest.fixture(scope="module")
def deploy_blueprint_for(w3, optimize):
    """Fixture returning ``_deploy_blueprint_for`` bound to ``w3`` and ``optimize``."""

    def deploy_blueprint_for(source_code, *args, **kwargs):
        return _deploy_blueprint_for(w3, source_code, optimize, *args, **kwargs)

    return deploy_blueprint_for


@pytest.fixture(scope="module")
def get_contract(w3, optimize):
    """Fixture returning ``_get_contract`` bound to ``w3`` and ``optimize``."""

    def get_contract(source_code, *args, **kwargs):
        return _get_contract(w3, source_code, optimize, *args, **kwargs)

    return get_contract


@pytest.fixture
def get_logs(w3):
    """Fixture returning a helper that extracts one event's logs from a transaction."""

    def get_logs(tx_hash, c, event_name):
        # `c` is expected to expose the underlying contract as
        # `_classic_contract` (as VyperContract does).
        tx_receipt = w3.eth.get_transaction_receipt(tx_hash)
        return c._classic_contract.events[event_name]().process_receipt(tx_receipt)

    return get_logs


@pytest.fixture(scope="module")
def assert_tx_failed(tester):
    """Fixture returning a helper that asserts a callable raises.

    The helper snapshots the chain, expects ``function_to_test()`` to raise
    ``exception``, reverts to the snapshot, and optionally checks that
    ``exc_text`` occurs in the exception's string form.
    """

    def assert_tx_failed(function_to_test, exception=TransactionFailed, exc_text=None):
        snapshot_id = tester.take_snapshot()
        with pytest.raises(exception) as excinfo:
            function_to_test()
        tester.revert_to_snapshot(snapshot_id)
        if exc_text:
            # TODO test equality
            assert exc_text in str(excinfo.value), (exc_text, excinfo.value)

    return assert_tx_failed

最后两个固定装置是可选的,将在后面讨论。本章的其余部分假设您已在 conftest.py 文件中设置了此代码。

或者,您也可以将这些固定装置导入到 conftest.py 中,或使用 pytest 插件。

编写基本测试

假设以下简单的合约 storage.vy。它有一个整数变量和一个用于设置该值的函数。

# The single stored integer; tests read it by calling storedData().
storedData: public(int128)

# Constructor: store the initial value.
@external
def __init__(_x: int128):
  self.storedData = _x

# Overwrite the stored value.
@external
def set(_x: int128):
  self.storedData = _x

我们创建一个测试文件 test_storage.py,在其中使用 pytest 样式编写测试。

 1import pytest
 2
 3INITIAL_VALUE = 4
 4
 5
 6@pytest.fixture
 7def storage_contract(w3, get_contract):
 8    with open("examples/storage/storage.vy") as f:
 9        contract_code = f.read()
10        # Pass constructor variables directly to the contract
11        contract = get_contract(contract_code, INITIAL_VALUE)
12    return contract
13
14
def test_initial_state(storage_contract):
    """The constructor argument is readable through ``storedData()``."""
    # Check if the constructor of the contract is set up properly
    assert storage_contract.storedData() == INITIAL_VALUE


def test_set(w3, storage_contract):
    """``set`` stores both positive and negative values."""
    k0 = w3.eth.accounts[0]

    # Let k0 try to set the value to 10
    storage_contract.set(10, transact={"from": k0})
    assert storage_contract.storedData() == 10  # Directly access storedData

    # Let k0 try to set the value to -5
    storage_contract.set(-5, transact={"from": k0})
    assert storage_contract.storedData() == -5

首先,我们为合约创建一个固定装置,它将编译我们的合约并设置 Web3 合约对象。然后,我们将使用此固定装置让我们的测试函数与合约进行交互。

注意

要运行测试,请在项目目录下执行 pytest 或 python -m pytest。

事件和失败的交易

为了测试事件和失败的交易,我们将扩展这个简单的存储合约,为其加入一个事件和两个会导致交易失败的条件,得到 advanced_storage.vy:

 1event DataChange:
 2    setter: indexed(address)
 3    value: int128
 4
 5storedData: public(int128)
 6
 7@external
 8def __init__(_x: int128):
 9  self.storedData = _x
10
11@external
12def set(_x: int128):
13  assert _x >= 0, "No negative values"
14  assert self.storedData < 100, "Storage is locked when 100 or more is stored"
15  self.storedData = _x
16  log DataChange(msg.sender, _x)
17
18@external
19def reset():
20  self.storedData = 0

接下来,我们将看看两个固定装置,它们将允许我们读取事件日志并检查失败的交易。

@pytest.fixture(scope="module")
def assert_tx_failed(tester):
    """Provide a helper that asserts a callable fails with a given exception."""

    def assert_tx_failed(function_to_test, exception=TransactionFailed, exc_text=None):
        # Remember the chain state so the failed attempt can be undone.
        checkpoint = tester.take_snapshot()
        with pytest.raises(exception) as raised:
            function_to_test()
        tester.revert_to_snapshot(checkpoint)

        if not exc_text:
            return
        # TODO test equality
        assert exc_text in str(raised.value), (exc_text, raised.value)

    return assert_tx_failed

断言失败的交易的固定装置默认情况下会检查 TransactionFailed 异常,但也可以用于检查其他异常,如下所示。还要注意,链将恢复到失败交易之前的状态。

@pytest.fixture
def get_logs(w3):
    """Provide a helper returning the logs of one event type for a transaction."""

    def get_logs(tx_hash, c, event_name):
        receipt = w3.eth.get_transaction_receipt(tx_hash)
        # Look the event up on the wrapped (classic) contract object.
        event = c._classic_contract.events[event_name]()
        return event.process_receipt(receipt)

    return get_logs

此固定装置将返回一个包含特定事件和交易的所有日志的元组。元组的长度等于记录的事件(指定类型)的数量,应该首先进行检查。

最后,我们创建一个新文件 test_advanced_storage.py,在其中使用新的固定装置来测试失败的交易和事件。

 1import pytest
 2from web3.exceptions import ValidationError
 3
 4INITIAL_VALUE = 4
 5
 6
 7@pytest.fixture
 8def adv_storage_contract(w3, get_contract):
 9    with open("examples/storage/advanced_storage.vy") as f:
10        contract_code = f.read()
11        # Pass constructor variables directly to the contract
12        contract = get_contract(contract_code, INITIAL_VALUE)
13    return contract
14
15
def test_initial_state(adv_storage_contract):
    """The constructor argument is readable through ``storedData()``."""
    # Check if the constructor of the contract is set up properly
    assert adv_storage_contract.storedData() == INITIAL_VALUE


def test_failed_transactions(w3, adv_storage_contract, assert_tx_failed):
    """Exercise both failure conditions of ``set`` plus argument validation errors."""
    k1 = w3.eth.accounts[1]

    # Try to set the storage to a negative amount
    assert_tx_failed(lambda: adv_storage_contract.set(-10, transact={"from": k1}))

    # Lock the contract by storing more than 100. Then try to change the value
    adv_storage_contract.set(150, transact={"from": k1})
    assert_tx_failed(lambda: adv_storage_contract.set(10, transact={"from": k1}))

    # Reset the contract and try to change the value
    adv_storage_contract.reset(transact={"from": k1})
    adv_storage_contract.set(10, transact={"from": k1})
    assert adv_storage_contract.storedData() == 10

    # Assert a different exception (ValidationError for non matching argument type)
    assert_tx_failed(
        lambda: adv_storage_contract.set("foo", transact={"from": k1}), ValidationError
    )

    # Assert a different exception that contains specific text
    assert_tx_failed(
        lambda: adv_storage_contract.set(1, 2, transact={"from": k1}),
        ValidationError,
        "invocation failed due to improper number of arguments",
    )


def test_events(w3, adv_storage_contract, get_logs):
    """``set`` emits one ``DataChange`` log per call; ``reset`` emits none."""
    k1, k2 = w3.eth.accounts[:2]

    tx1 = adv_storage_contract.set(10, transact={"from": k1})
    tx2 = adv_storage_contract.set(20, transact={"from": k2})
    tx3 = adv_storage_contract.reset(transact={"from": k1})

    # Save DataChange logs from all three transactions
    logs1 = get_logs(tx1, adv_storage_contract, "DataChange")
    logs2 = get_logs(tx2, adv_storage_contract, "DataChange")
    logs3 = get_logs(tx3, adv_storage_contract, "DataChange")

    # Check log contents
    assert len(logs1) == 1
    assert logs1[0].args.value == 10

    assert len(logs2) == 1
    assert logs2[0].args.setter == k2

    assert not logs3  # tx3 does not generate a log