podman/test/apiv2/rest_api/test_rest_v2_0_0.py
baude d3e794bda3 add network connect|disconnect compat endpoints
this enables the ability to connect and disconnect a container from a
given network. it is only for the compatibility layer. some code had to
be refactored to avoid circular imports.

additionally, tests are being deferred temporarily due to some
incompatibility/bug in either docker-py or our stack.

Signed-off-by: baude <bbaude@redhat.com>
2020-11-17 14:22:39 -06:00

270 lines
9 KiB
Python

import json
import subprocess
import sys
import time
import unittest
from multiprocessing import Process
import requests
from dateutil.parser import parse
from test.apiv2.rest_api import Podman
PODMAN_URL = "http://localhost:8080"
def _url(path):
return PODMAN_URL + "/v2.0.0/libpod" + path
def ctnr(path):
try:
r = requests.get(_url("/containers/json?all=true"))
ctnrs = json.loads(r.text)
except Exception as e:
msg = f"Bad container response: {e}"
if r is not None:
msg = msg + " " + r.text
sys.stderr.write(msg + "\n")
raise
return path.format(ctnrs[0]["Id"])
def validateObjectFields(buffer):
objs = json.loads(buffer)
if not isinstance(objs, dict):
for o in objs:
_ = o["Id"]
else:
_ = objs["Id"]
return objs
class TestApi(unittest.TestCase):
podman = None # initialized podman configuration for tests
service = None # podman service instance
def setUp(self):
super().setUp()
try:
TestApi.podman.run("run", "alpine", "/bin/ls", check=True)
except subprocess.CalledProcessError as e:
if e.stdout:
sys.stdout.write("\nRun Stdout:\n" + e.stdout.decode("utf-8"))
if e.stderr:
sys.stderr.write("\nRun Stderr:\n" + e.stderr.decode("utf-8"))
raise
@classmethod
def setUpClass(cls):
super().setUpClass()
TestApi.podman = Podman()
TestApi.service = TestApi.podman.open(
"system", "service", "tcp:localhost:8080", "--time=0"
)
# give the service some time to be ready...
time.sleep(2)
returncode = TestApi.service.poll()
if returncode is not None:
raise subprocess.CalledProcessError(returncode, "podman system service")
r = requests.post(_url("/images/pull?reference=docker.io%2Falpine%3Alatest"))
if r.status_code != 200:
raise subprocess.CalledProcessError(
r.status_code, f"podman images pull docker.io/alpine:latest {r.text}"
)
@classmethod
def tearDownClass(cls):
TestApi.service.terminate()
stdout, stderr = TestApi.service.communicate(timeout=0.5)
if stdout:
sys.stdout.write("\nService Stdout:\n" + stdout.decode("utf-8"))
if stderr:
sys.stderr.write("\nService Stderr:\n" + stderr.decode("utf-8"))
return super().tearDownClass()
def test_info(self):
r = requests.get(_url("/info"))
self.assertEqual(r.status_code, 200)
self.assertIsNotNone(r.content)
_ = json.loads(r.text)
def test_events(self):
r = requests.get(_url("/events?stream=false"))
self.assertEqual(r.status_code, 200, r.text)
self.assertIsNotNone(r.content)
for line in r.text.splitlines():
obj = json.loads(line)
# Actor.ID is uppercase for compatibility
_ = obj["Actor"]["ID"]
def test_containers(self):
r = requests.get(_url("/containers/json"), timeout=5)
self.assertEqual(r.status_code, 200, r.text)
obj = json.loads(r.text)
self.assertEqual(len(obj), 0)
def test_containers_all(self):
r = requests.get(_url("/containers/json?all=true"))
self.assertEqual(r.status_code, 200, r.text)
validateObjectFields(r.text)
def test_inspect_container(self):
r = requests.get(_url(ctnr("/containers/{}/json")))
self.assertEqual(r.status_code, 200, r.text)
obj = validateObjectFields(r.content)
_ = parse(obj["Created"])
def test_stats(self):
r = requests.get(_url(ctnr("/containers/{}/stats?stream=false")))
self.assertIn(r.status_code, (200, 409), r.text)
if r.status_code == 200:
validateObjectFields(r.text)
def test_delete_containers(self):
r = requests.delete(_url(ctnr("/containers/{}")))
self.assertEqual(r.status_code, 204, r.text)
def test_stop_containers(self):
r = requests.post(_url(ctnr("/containers/{}/start")))
self.assertIn(r.status_code, (204, 304), r.text)
r = requests.post(_url(ctnr("/containers/{}/stop")))
self.assertIn(r.status_code, (204, 304), r.text)
def test_start_containers(self):
r = requests.post(_url(ctnr("/containers/{}/stop")))
self.assertIn(r.status_code, (204, 304), r.text)
r = requests.post(_url(ctnr("/containers/{}/start")))
self.assertIn(r.status_code, (204, 304), r.text)
def test_restart_containers(self):
r = requests.post(_url(ctnr("/containers/{}/start")))
self.assertIn(r.status_code, (204, 304), r.text)
r = requests.post(_url(ctnr("/containers/{}/restart")), timeout=5)
self.assertEqual(r.status_code, 204, r.text)
def test_resize(self):
r = requests.post(_url(ctnr("/containers/{}/resize?h=43&w=80")))
self.assertIn(r.status_code, (200, 409), r.text)
if r.status_code == 200:
self.assertIsNone(r.text)
def test_attach_containers(self):
self.skipTest("FIXME: Test timeouts")
r = requests.post(_url(ctnr("/containers/{}/attach")), timeout=5)
self.assertIn(r.status_code, (101, 500), r.text)
def test_logs_containers(self):
r = requests.get(_url(ctnr("/containers/{}/logs?stdout=true")))
self.assertEqual(r.status_code, 200, r.text)
def test_post_create_compat(self):
"""Create network and container then connect to network"""
net = requests.post(
PODMAN_URL + "/v1.40/networks/create", json={"Name": "TestNetwork"}
)
self.assertEqual(net.status_code, 201, net.text)
create = requests.post(
PODMAN_URL + "/v1.40/containers/create?name=postCreate",
json={
"Cmd": ["date"],
"Image": "alpine:latest",
"NetworkDisabled": False,
"NetworkConfig": {
"EndpointConfig": {"TestNetwork": {"Aliases": ["test_post_create"]}}
},
},
)
self.assertEqual(create.status_code, 201, create.text)
payload = json.loads(create.text)
self.assertIsNotNone(payload["Id"])
# This cannot be done until full completion of the network connect
# stack and network disconnect stack are complete
# connect = requests.post(
# PODMAN_URL + "/v1.40/networks/TestNetwork/connect",
# json={"Container": payload["Id"]},
# )
# self.assertEqual(connect.status_code, 200, connect.text)
# self.assertEqual(connect.text, "OK\n")
def test_commit(self):
r = requests.post(_url(ctnr("/commit?container={}")))
self.assertEqual(r.status_code, 200, r.text)
validateObjectFields(r.text)
def test_images(self):
r = requests.get(_url("/images/json"))
self.assertEqual(r.status_code, 200, r.text)
validateObjectFields(r.content)
def test_inspect_image(self):
r = requests.get(_url("/images/alpine/json"))
self.assertEqual(r.status_code, 200, r.text)
obj = validateObjectFields(r.content)
_ = parse(obj["Created"])
def test_delete_image(self):
r = requests.delete(_url("/images/alpine?force=true"))
self.assertEqual(r.status_code, 200, r.text)
json.loads(r.text)
def test_pull(self):
r = requests.post(_url("/images/pull?reference=alpine"), timeout=15)
self.assertEqual(r.status_code, 200, r.status_code)
text = r.text
keys = {
"error": False,
"id": False,
"images": False,
"stream": False,
}
# Read and record stanza's from pull
for line in str.splitlines(text):
obj = json.loads(line)
key_list = list(obj.keys())
for k in key_list:
keys[k] = True
self.assertFalse(keys["error"], "Expected no errors")
self.assertTrue(keys["id"], "Expected to find id stanza")
self.assertTrue(keys["images"], "Expected to find images stanza")
self.assertTrue(keys["stream"], "Expected to find stream progress stanza's")
def test_search(self):
# Had issues with this test hanging when repositories not happy
def do_search():
r = requests.get(_url("/images/search?term=alpine"), timeout=5)
self.assertEqual(r.status_code, 200, r.text)
json.loads(r.text)
search = Process(target=do_search)
search.start()
search.join(timeout=10)
self.assertFalse(search.is_alive(), "/images/search took too long")
def test_ping(self):
r = requests.get(PODMAN_URL + "/_ping")
self.assertEqual(r.status_code, 200, r.text)
r = requests.head(PODMAN_URL + "/_ping")
self.assertEqual(r.status_code, 200, r.text)
r = requests.get(_url("/_ping"))
self.assertEqual(r.status_code, 200, r.text)
r = requests.get(_url("/_ping"))
self.assertEqual(r.status_code, 200, r.text)
if __name__ == "__main__":
unittest.main()