abril/vagrant/test_cluster.py
2022-08-29 19:45:07 -05:00

115 lines
3.7 KiB
Python
Executable File

#!/usr/bin/env python3
# Perform a few tests on a cluster created with this playbook.
# To simplify test execution, the scripts does not depend on any third-party
# packages, only the Python standard library.
import json
import subprocess
import unittest
from pathlib import Path
from time import sleep
from warnings import warn
VAGRANT_DIR = Path(__file__).parent.absolute()
PLAYBOOK_DIR = VAGRANT_DIR.parent.absolute()
class TestK3sCluster(unittest.TestCase):
def _kubectl(self, args: str, json_out: bool = True) -> dict | None:
cmd = "kubectl"
if json_out:
cmd += " -o json"
cmd += f" {args}"
result = subprocess.run(cmd, capture_output=True, shell=True, check=True)
if json_out:
return json.loads(result.stdout)
else:
return None
def _curl(self, url: str) -> str:
options = [
"--silent", # no progress info
"--show-error", # ... but errors should still be shown
"--fail", # set exit code on error
"--location", # follow redirects
]
cmd = f'curl {" ".join(options)} "{url}"'
result = subprocess.run(cmd, capture_output=True, shell=True, check=True)
output = result.stdout.decode("utf-8")
return output
def _apply_manifest(self, manifest_file: Path) -> dict:
apply_result = self._kubectl(
f'apply --filename="{manifest_file}" --cascade="background"'
)
self.addCleanup(
lambda: self._kubectl(
f'delete --filename="{manifest_file}"',
json_out=False,
)
)
return apply_result
@staticmethod
def _retry(function, retries: int = 5, seconds_between_retries=1):
for retry in range(1, retries + 1):
try:
return function()
except Exception as exc:
if retry < retries:
sleep(seconds_between_retries)
continue
else:
raise exc
def _get_load_balancer_ip(
self,
service: str,
namespace: str = "default",
) -> str | None:
svc_description = self._kubectl(
f'get --namespace="{namespace}" service "{service}"'
)
ip = svc_description["status"]["loadBalancer"]["ingress"][0]["ip"]
return ip
def test_nodes_exist(self):
out = self._kubectl("get nodes")
node_names = {item["metadata"]["name"] for item in out["items"]}
self.assertEqual(
node_names,
{"control1", "control2", "control3", "node1", "node2"},
)
def test_ip_address_pool_exists(self):
out = self._kubectl("get --all-namespaces IpAddressPool")
pools = out["items"]
self.assertGreater(len(pools), 0)
def test_nginx_example_page(self):
# Deploy the manifests to the cluster
deployment = self._apply_manifest(PLAYBOOK_DIR / "example" / "deployment.yml")
service = self._apply_manifest(PLAYBOOK_DIR / "example" / "service.yml")
# Assert that the dummy page is available
metallb_ip = self._retry(
lambda: self._get_load_balancer_ip(service["metadata"]["name"])
)
# Now that an IP address was assigned, let's reload the service description:
service = self._kubectl(f'get service "{service["metadata"]["name"]}"')
metallb_port = service["spec"]["ports"][0]["port"]
response_body = self._retry(
lambda: self._curl(f"http://{metallb_ip}:{metallb_port}/")
)
self.assertIn("Welcome to nginx!", response_body)
if __name__ == "__main__":
unittest.main()