Enhancement: apply black to e2elive.py as well #1154

Merged: 1 commit, Jul 29, 2022
misc/e2elive.py: 138 changes (97 additions, 41 deletions)
@@ -22,16 +22,33 @@

logger = logging.getLogger(__name__)

+
def main():
    start = time.time()
    import argparse
+
    ap = argparse.ArgumentParser()
-    ap.add_argument('--keep-temps', default=False, action='store_true')
-    ap.add_argument('--indexer-bin', default=None, help='path to algorand-indexer binary, otherwise search PATH')
-    ap.add_argument('--indexer-port', default=None, type=int, help='port to run indexer on. defaults to random in [4000,30000]')
-    ap.add_argument('--connection-string', help='Use this connection string instead of attempting to manage a local database.')
-    ap.add_argument('--source-net', help='Path to test network directory containing Primary and other nodes. May be a tar file.')
-    ap.add_argument('--verbose', default=False, action='store_true')
+    ap.add_argument("--keep-temps", default=False, action="store_true")
+    ap.add_argument(
+        "--indexer-bin",
+        default=None,
+        help="path to algorand-indexer binary, otherwise search PATH",
+    )
+    ap.add_argument(
+        "--indexer-port",
+        default=None,
+        type=int,
+        help="port to run indexer on. defaults to random in [4000,30000]",
+    )
+    ap.add_argument(
+        "--connection-string",
+        help="Use this connection string instead of attempting to manage a local database.",
+    )
+    ap.add_argument(
+        "--source-net",
+        help="Path to test network directory containing Primary and other nodes. May be a tar file.",
+    )
+    ap.add_argument("--verbose", default=False, action="store_true")
args = ap.parse_args()
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
@@ -41,9 +58,9 @@ def main():
sourcenet = args.source_net
source_is_tar = False
if not sourcenet:
-        e2edata = os.getenv('E2EDATA')
-        sourcenet = e2edata and os.path.join(e2edata, 'net')
-    if sourcenet and hassuffix(sourcenet, '.tar', '.tar.gz', '.tar.bz2', '.tar.xz'):
+        e2edata = os.getenv("E2EDATA")
+        sourcenet = e2edata and os.path.join(e2edata, "net")
+    if sourcenet and hassuffix(sourcenet, ".tar", ".tar.gz", ".tar.bz2", ".tar.xz"):
source_is_tar = True
tempdir = tempfile.mkdtemp()
if not args.keep_temps:
@@ -52,67 +69,99 @@
logger.info("leaving temp dir %r", tempdir)
if not (source_is_tar or (sourcenet and os.path.isdir(sourcenet))):
# fetch test data from S3
-        bucket = 'algorand-testdata'
+        bucket = "algorand-testdata"
import boto3
from botocore.config import Config
from botocore import UNSIGNED
-        s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
-        tarname = 'net_done.tar.bz2'
+
+        s3 = boto3.client("s3", config=Config(signature_version=UNSIGNED))
+        tarname = "net_done.tar.bz2"
tarpath = os.path.join(tempdir, tarname)
-        firstFromS3Prefix(s3, bucket, 'indexer/e2e4', tarname, outpath=tarpath)
+        firstFromS3Prefix(s3, bucket, "indexer/e2e4", tarname, outpath=tarpath)
source_is_tar = True
sourcenet = tarpath
-    tempnet = os.path.join(tempdir, 'net')
+    tempnet = os.path.join(tempdir, "net")
if source_is_tar:
-        xrun(['tar', '-C', tempdir, '-x', '-f', sourcenet])
+        xrun(["tar", "-C", tempdir, "-x", "-f", sourcenet])
else:
-        xrun(['rsync', '-a', sourcenet + '/', tempnet + '/'])
-    blockfiles = glob.glob(os.path.join(tempdir, 'net', 'Primary', '*', '*.block.sqlite'))
+        xrun(["rsync", "-a", sourcenet + "/", tempnet + "/"])
+    blockfiles = glob.glob(
+        os.path.join(tempdir, "net", "Primary", "*", "*.block.sqlite")
+    )
lastblock = countblocks(blockfiles[0])
-    #subprocess.run(['find', tempnet, '-type', 'f'])
+    # subprocess.run(['find', tempnet, '-type', 'f'])
try:
-        xrun(['goal', 'network', 'start', '-r', tempnet])
+        xrun(["goal", "network", "start", "-r", tempnet])
except Exception:
-        logger.error('failed to start private network, looking for node.log')
+        logger.error("failed to start private network, looking for node.log")
for root, dirs, files in os.walk(tempnet):
for f in files:
-                if f == 'node.log':
+                if f == "node.log":
p = os.path.join(root, f)
-                    logger.error('found node.log: {}'.format(p))
+                    logger.error("found node.log: {}".format(p))
with open(p) as nf:
for line in nf:
-                            logger.error(' {}'.format(line))
+                            logger.error(" {}".format(line))
raise

-    atexitrun(['goal', 'network', 'stop', '-r', tempnet])
+    atexitrun(["goal", "network", "stop", "-r", tempnet])

psqlstring = ensure_test_db(args.connection_string, args.keep_temps)
-    algoddir = os.path.join(tempnet, 'Primary')
-    aiport = args.indexer_port or random.randint(4000,30000)
-    cmd = [indexer_bin, 'daemon', '--data-dir', tempdir, '-P', psqlstring, '--dev-mode', '--algod', algoddir, '--server', ':{}'.format(aiport)]
-    logger.debug("%s", ' '.join(map(repr,cmd)))
+    algoddir = os.path.join(tempnet, "Primary")
+    aiport = args.indexer_port or random.randint(4000, 30000)
+    cmd = [
+        indexer_bin,
+        "daemon",
+        "--data-dir",
+        tempdir,
+        "-P",
+        psqlstring,
+        "--dev-mode",
+        "--algod",
+        algoddir,
+        "--server",
+        ":{}".format(aiport),
+    ]
+    logger.debug("%s", " ".join(map(repr, cmd)))
indexerdp = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
indexerout = subslurp(indexerdp.stdout)
indexerout.start()
atexit.register(indexerdp.kill)
time.sleep(0.2)

-    indexerurl = 'http://localhost:{}/'.format(aiport)
-    healthurl = indexerurl + 'health'
+    indexerurl = "http://localhost:{}/".format(aiport)
+    healthurl = indexerurl + "health"
for attempt in range(20):
(ok, json) = tryhealthurl(healthurl, args.verbose, waitforround=lastblock)
if ok:
-            logger.debug('health round={} OK'.format(lastblock))
+            logger.debug("health round={} OK".format(lastblock))
break
time.sleep(0.5)
if not ok:
-        logger.error('could not get indexer health, or did not reach round={}\n{}'.format(lastblock, json))
+        logger.error(
+            "could not get indexer health, or did not reach round={}\n{}".format(
+                lastblock, json
+            )
+        )
sys.stderr.write(indexerout.dump())
return 1
try:
-        logger.info('reached expected round={}'.format(lastblock))
-        xrun(['python3', 'misc/validate_accounting.py', '--verbose', '--algod', algoddir, '--indexer', indexerurl], timeout=20)
-        xrun(['go', 'run', 'cmd/e2equeries/main.go', '-pg', psqlstring, '-q'], timeout=15)
+        logger.info("reached expected round={}".format(lastblock))
+        xrun(
+            [
+                "python3",
+                "misc/validate_accounting.py",
+                "--verbose",
+                "--algod",
+                algoddir,
+                "--indexer",
+                indexerurl,
+            ],
+            timeout=20,
+        )
+        xrun(
+            ["go", "run", "cmd/e2equeries/main.go", "-pg", psqlstring, "-q"], timeout=15
+        )
except Exception:
sys.stderr.write(indexerout.dump())
raise
@@ -121,12 +170,14 @@ def main():

    return 0

+
def hassuffix(x, *suffixes):
for s in suffixes:
if x.endswith(s):
return True
    return False

+
def countblocks(path):
db = sqlite3.connect(path)
cursor = db.cursor()
@@ -136,49 +187,54 @@ def countblocks(path):
db.close()
    return row[0]

+
def tryhealthurl(healthurl, verbose=False, waitforround=100):
try:
response = urllib.request.urlopen(healthurl)
if response.code != 200:
return (False, "")
raw = response.read()
-        logger.debug('health %r', raw)
+        logger.debug("health %r", raw)
ob = json.loads(raw)
-        rt = ob.get('message')
+        rt = ob.get("message")
if not rt:
return (False, raw)
return (int(rt) >= waitforround, raw)
except Exception as e:
if verbose:
-            logging.warning('GET %s %s', healthurl, e)
+            logging.warning("GET %s %s", healthurl, e)
        return (False, "")

+
class subslurp:
# asynchronously accumulate stdout or stderr from a subprocess and hold it for debugging if something goes wrong
def __init__(self, f):
self.f = f
self.buf = io.BytesIO()
-        self.gz = gzip.open(self.buf, 'wb')
+        self.gz = gzip.open(self.buf, "wb")
self.l = threading.Lock()
self.t = None

def run(self):
for line in self.f:
with self.l:
if self.gz is None:
return
self.gz.write(line)

def dump(self):
with self.l:
self.gz.close()
self.gz = None
self.buf.seek(0)
-        r = gzip.open(self.buf, 'rt')
+        r = gzip.open(self.buf, "rt")
return r.read()

def start(self):
self.t = threading.Thread(target=self.run)
self.t.daemon = True
        self.t.start()

+
-if __name__ == '__main__':
+if __name__ == "__main__":
sys.exit(main())
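
For reviewers who want to verify the result locally: the file should now be stable under black, which can be checked with `black --check misc/e2elive.py` or through black's Python API. A minimal sketch of the latter, assuming black's default `Mode()` (the PR does not state that any non-default options were used):

```python
# Minimal sketch: confirm misc/e2elive.py is a fixed point of black.
# Assumes black's default Mode(); the PR does not specify other options.
from pathlib import Path

import black

src = Path("misc/e2elive.py").read_text()
formatted = black.format_str(src, mode=black.Mode())
if formatted == src:
    print("already black-formatted")
else:
    print("black would still reformat this file")
```

`black.format_str` is idempotent, so running it over already-formatted code returns the input unchanged; that is the same property the `--check` flag tests in CI.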