Skip to content

Commit

Permalink
Merge bitcoin#21008: test: fix zmq test flakiness, improve speed
Browse files Browse the repository at this point in the history
ef21fb7 zmq test: speedup test by whitelisting peers (immediate tx relay) (Sebastian Falbesoner)
5c65463 zmq test: fix flakiness by using more robust sync method (Sebastian Falbesoner)
8666033 zmq test: accept arbitrary sequence start number in ZMQSubscriber (Sebastian Falbesoner)
6014d6e zmq test: dedup message reception handling in ZMQSubscriber (Sebastian Falbesoner)

Pull request description:

  Fixes bitcoin#20934 by using the "sync up" method described in bitcoin#20538 (comment).

  After improving robustness with this approach (commits 1-3), it turned out that there were still some fails, but those were unrelated to zmq: Out of 500 runs, 3 times `sync_mempool()` or `sync_blocks()` timed out, which can happen because the trickle relay time has no upper bound -- hence in rare cases, it takes longer than 60s. This is fixed by enabling immediate tx relay on node1 (commit 4), which as a nice side-effect also gives us a rough 2x speedup for the test.

  For further details, also see the explanations in the commit messages.

  There is no guarantee that the test is still not flaky, but it would help if potential reviewers would run the following script locally and report how many runs failed (feel free to do less than 1000 runs, as this takes quite a long if ran with `--valgrind`):
  ```
  #!/bin/sh
  OUTPUT_FILE=./zmq_results
  echo ===== repeated zmq test ===== > $OUTPUT_FILE

  for i in `seq 1000`; do
      echo ------------------------
      echo ----- test run $i -----
      echo ------------------------
      echo --- $i --- >> $OUTPUT_FILE
      ./test/functional/interface_zmq.py --valgrind
      if [ $? -ne 0 ]; then
          echo "FAILED. /o\\" >> $OUTPUT_FILE
      else
          echo "PASSED. \\o/" >> $OUTPUT_FILE
      fi
  done

  echo Failed test runs:
  grep FAILED $OUTPUT_FILE | wc -l
  ```

ACKs for top commit:
  jonatack:
    Light ACK ef21fb7 with the caveat that I was unable to make the test fail with valgrind both here and on master, so I can't vouch that it actually fixes the CI flakiness. The test does run ~2x faster with this.

Tree-SHA512: 7a1e7592fbbd98e69e1e1294486b91253e589c72b3c6bbb7f587028ec07cca59b7d984e4ebf256c4bc3e8a529ec77d31842f3dd874038aea0b684abfea50306a
  • Loading branch information
MarcoFalke committed Feb 16, 2021
2 parents 9bbf08b + ef21fb7 commit 3c9d9d2
Showing 1 changed file with 54 additions and 22 deletions.
76 changes: 54 additions & 22 deletions test/functional/interface_zmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,28 +27,31 @@ def hash256_reversed(byte_str):

class ZMQSubscriber:
def __init__(self, socket, topic):
self.sequence = 0
self.sequence = None # no sequence number received yet
self.socket = socket
self.topic = topic

self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)

def receive(self):
# Receive message from publisher and verify that topic and sequence match
def _receive_from_publisher_and_check(self):
topic, body, seq = self.socket.recv_multipart()
# Topic should match the subscriber topic.
assert_equal(topic, self.topic)
# Sequence should be incremental.
assert_equal(struct.unpack('<I', seq)[-1], self.sequence)
received_seq = struct.unpack('<I', seq)[-1]
if self.sequence is None:
self.sequence = received_seq
else:
assert_equal(received_seq, self.sequence)
self.sequence += 1
return body

def receive(self):
return self._receive_from_publisher_and_check()

def receive_sequence(self):
topic, body, seq = self.socket.recv_multipart()
# Topic should match the subscriber topic.
assert_equal(topic, self.topic)
# Sequence should be incremental.
assert_equal(struct.unpack('<I', seq)[-1], self.sequence)
self.sequence += 1
body = self._receive_from_publisher_and_check()
hash = body[:32].hex()
label = chr(body[32])
mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0]
Expand All @@ -64,6 +67,9 @@ def set_test_params(self):
self.num_nodes = 2
if self.is_wallet_compiled():
self.requires_wallet = True
# This test isn't testing txn relay/timing, so set whitelist on the
# peers for instant txn relay. This speeds up the test run time 2-3x.
self.extra_args = [["[email protected]"]] * self.num_nodes

def skip_test_if_missing_module(self):
self.skip_if_no_py3_zmq()
Expand All @@ -84,23 +90,46 @@ def run_test(self):

# Restart node with the specified zmq notifications enabled, subscribe to
# all of them and return the corresponding ZMQSubscriber objects.
def setup_zmq_test(self, services, recv_timeout=60, connect_nodes=False):
def setup_zmq_test(self, services, *, recv_timeout=60, sync_blocks=True):
subscribers = []
for topic, address in services:
socket = self.ctx.socket(zmq.SUB)
socket.set(zmq.RCVTIMEO, recv_timeout*1000)
subscribers.append(ZMQSubscriber(socket, topic.encode()))

self.restart_node(0, ["-zmqpub%s=%s" % (topic, address) for topic, address in services])

if connect_nodes:
self.connect_nodes(0, 1)
self.restart_node(0, ["-zmqpub%s=%s" % (topic, address) for topic, address in services] +
self.extra_args[0])

for i, sub in enumerate(subscribers):
sub.socket.connect(services[i][1])

# Relax so that the subscribers are ready before publishing zmq messages
sleep(0.2)
# Ensure that all zmq publisher notification interfaces are ready by
# running the following "sync up" procedure:
# 1. Generate a block on the node
# 2. Try to receive a notification on all subscribers
# 3. If all subscribers get a message within the timeout (1 second),
# we are done, otherwise repeat starting from step 1
for sub in subscribers:
sub.socket.set(zmq.RCVTIMEO, 1000)
while True:
self.nodes[0].generate(1)
recv_failed = False
for sub in subscribers:
try:
sub.receive()
except zmq.error.Again:
self.log.debug("Didn't receive sync-up notification, trying again.")
recv_failed = True
if not recv_failed:
self.log.debug("ZMQ sync-up completed, all subscribers are ready.")
break

# set subscriber's desired timeout for the test
for sub in subscribers:
sub.socket.set(zmq.RCVTIMEO, recv_timeout*1000)

self.connect_nodes(0, 1)
if sync_blocks:
self.sync_blocks()

return subscribers

Expand All @@ -110,9 +139,7 @@ def test_basic(self):
self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"])

address = 'tcp://127.0.0.1:28332'
subs = self.setup_zmq_test(
[(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]],
connect_nodes=True)
subs = self.setup_zmq_test([(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]])

hashblock = subs[0]
hashtx = subs[1]
Expand Down Expand Up @@ -189,6 +216,7 @@ def test_reorg(self):
hashblock, hashtx = self.setup_zmq_test(
[(topic, address) for topic in ["hashblock", "hashtx"]],
recv_timeout=2) # 2 second timeout to check end of notifications
self.disconnect_nodes(0, 1)

# Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications
payment_txid = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
Expand Down Expand Up @@ -237,6 +265,7 @@ def test_sequence(self):
"""
self.log.info("Testing 'sequence' publisher")
[seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")])
self.disconnect_nodes(0, 1)

# Mempool sequence number starts at 1
seq_num = 1
Expand Down Expand Up @@ -387,7 +416,7 @@ def test_mempool_sync(self):
return

self.log.info("Testing 'mempool sync' usage of sequence notifier")
[seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")], connect_nodes=True)
[seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")])

# In-memory counter, should always start at 1
next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"]
Expand Down Expand Up @@ -487,10 +516,13 @@ def test_mempool_sync(self):

def test_multiple_interfaces(self):
# Set up two subscribers with different addresses
# (note that after the reorg test, syncing would fail due to different
# chain lengths on node0 and node1; for this test we only need node0, so
# we can disable syncing blocks on the setup)
subscribers = self.setup_zmq_test([
("hashblock", "tcp://127.0.0.1:28334"),
("hashblock", "tcp://127.0.0.1:28335"),
])
], sync_blocks=False)

# Generate 1 block in nodes[0] and receive all notifications
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
Expand Down

0 comments on commit 3c9d9d2

Please sign in to comment.