diff mbox series

[bitbake-devel,v6,20/22] hashserv: tests: Allow authentication for external server tests

Message ID 20231103142640.1936827-21-JPEWhacker@gmail.com
State New
Headers show
Series Bitbake Hash Server WebSockets, Alternate Database Backend, and User Management | expand

Commit Message

Joshua Watt Nov. 3, 2023, 2:26 p.m. UTC
If BB_TEST_HASHSERV_USERNAME and BB_TEST_HASHSERV_PASSWORD are provided
for a server admin user, the authentication tests for the external
hashserver will run. In addition, any users that get created will now be
deleted when the test finishes.

Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
---
 lib/hashserv/tests.py | 109 ++++++++++++++++++++++++++++--------------
 1 file changed, 74 insertions(+), 35 deletions(-)
diff mbox series

Patch

diff --git a/lib/hashserv/tests.py b/lib/hashserv/tests.py
index 2d78f9e9..5d209ffb 100644
--- a/lib/hashserv/tests.py
+++ b/lib/hashserv/tests.py
@@ -84,17 +84,13 @@  class HashEquivalenceTestSetup(object):
         return self.server.address
 
     def start_auth_server(self):
-        self.auth_server = self.start_server(self.server.dbpath, anon_perms=[], admin_username="admin", admin_password="password")
-        self.admin_client = self.start_client(self.auth_server.address, username="admin", password="password")
+        auth_server = self.start_server(self.server.dbpath, anon_perms=[], admin_username="admin", admin_password="password")
+        self.auth_server_address = auth_server.address
+        self.admin_client = self.start_client(auth_server.address, username="admin", password="password")
         return self.admin_client
 
     def auth_client(self, user):
-        return self.start_client(self.auth_server.address, user["username"], user["token"])
-
-    def auth_perms(self, *permissions):
-        self.client_index += 1
-        user = self.admin_client.new_user(f"user-{self.client_index}", permissions)
-        return self.auth_client(user)
+        return self.start_client(self.auth_server_address, user["username"], user["token"])
 
     def setUp(self):
         if sys.version_info < (3, 5, 0):
@@ -120,11 +116,11 @@  class HashEquivalenceTestSetup(object):
             })
 
     def assertUserCanAuth(self, user):
-        with self.start_client(self.auth_server.address) as client:
+        with self.start_client(self.auth_server_address) as client:
             client.auth(user["username"], user["token"])
 
     def assertUserCannotAuth(self, user):
-        with self.start_client(self.auth_server.address) as client, self.assertRaises(InvokeError):
+        with self.start_client(self.auth_server_address) as client, self.assertRaises(InvokeError):
             client.auth(user["username"], user["token"])
 
     def create_test_hash(self, client):
@@ -157,6 +153,26 @@  class HashEquivalenceTestSetup(object):
 
 
 class HashEquivalenceCommonTests(object):
+    def auth_perms(self, *permissions):
+        self.client_index += 1
+        user = self.create_user(f"user-{self.client_index}", permissions)
+        return self.auth_client(user)
+
+    def create_user(self, username, permissions, *, client=None):
+        def remove_user(username):
+            try:
+                self.admin_client.delete_user(username)
+            except bb.asyncrpc.InvokeError:
+                pass
+
+        if client is None:
+            client = self.admin_client
+
+        user = client.new_user(username, permissions)
+        self.addCleanup(remove_user, username)
+
+        return user
+
     def test_create_hash(self):
         return self.create_test_hash(self.client)
 
@@ -571,14 +587,14 @@  class HashEquivalenceCommonTests(object):
     def test_auth_no_token_refresh_from_anon_user(self):
         self.start_auth_server()
 
-        with self.start_client(self.auth_server.address) as client, self.assertRaises(InvokeError):
+        with self.start_client(self.auth_server_address) as client, self.assertRaises(InvokeError):
             client.refresh_token()
 
     def test_auth_self_token_refresh(self):
         admin_client = self.start_auth_server()
 
         # Create a new user with no permissions
-        user = admin_client.new_user("test-user", [])
+        user = self.create_user("test-user", [])
 
         with self.auth_client(user) as client:
             new_user = client.refresh_token()
@@ -601,7 +617,7 @@  class HashEquivalenceCommonTests(object):
     def test_auth_token_refresh(self):
         admin_client = self.start_auth_server()
 
-        user = admin_client.new_user("test-user", [])
+        user = self.create_user("test-user", [])
 
         with self.auth_perms() as client, self.assertRaises(InvokeError):
             client.refresh_token(user["username"])
@@ -617,7 +633,7 @@  class HashEquivalenceCommonTests(object):
     def test_auth_self_get_user(self):
         admin_client = self.start_auth_server()
 
-        user = admin_client.new_user("test-user", [])
+        user = self.create_user("test-user", [])
         user_info = user.copy()
         del user_info["token"]
 
@@ -632,7 +648,7 @@  class HashEquivalenceCommonTests(object):
     def test_auth_get_user(self):
         admin_client = self.start_auth_server()
 
-        user = admin_client.new_user("test-user", [])
+        user = self.create_user("test-user", [])
         user_info = user.copy()
         del user_info["token"]
 
@@ -649,7 +665,7 @@  class HashEquivalenceCommonTests(object):
     def test_auth_reconnect(self):
         admin_client = self.start_auth_server()
 
-        user = admin_client.new_user("test-user", [])
+        user = self.create_user("test-user", [])
         user_info = user.copy()
         del user_info["token"]
 
@@ -665,7 +681,7 @@  class HashEquivalenceCommonTests(object):
     def test_auth_delete_user(self):
         admin_client = self.start_auth_server()
 
-        user = admin_client.new_user("test-user", [])
+        user = self.create_user("test-user", [])
 
         # No self service
         with self.auth_client(user) as client, self.assertRaises(InvokeError):
@@ -685,7 +701,7 @@  class HashEquivalenceCommonTests(object):
     def test_auth_set_user_perms(self):
         admin_client = self.start_auth_server()
 
-        user = admin_client.new_user("test-user", [])
+        user = self.create_user("test-user", [])
 
         self.assertUserPerms(user, [])
 
@@ -710,7 +726,7 @@  class HashEquivalenceCommonTests(object):
     def test_auth_get_all_users(self):
         admin_client = self.start_auth_server()
 
-        user = admin_client.new_user("test-user", [])
+        user = self.create_user("test-user", [])
 
         with self.auth_client(user) as client, self.assertRaises(InvokeError):
             client.get_all_users()
@@ -744,10 +760,10 @@  class HashEquivalenceCommonTests(object):
         permissions.sort()
 
         with self.auth_perms() as client, self.assertRaises(InvokeError):
-            client.new_user("test-user", permissions)
+            self.create_user("test-user", permissions, client=client)
 
         with self.auth_perms("@user-admin") as client:
-            user = client.new_user("test-user", permissions)
+            user = self.create_user("test-user", permissions, client=client)
             self.assertIn("token", user)
             self.assertEqual(user["username"], "test-user")
             self.assertEqual(user["permissions"], permissions)
@@ -755,7 +771,7 @@  class HashEquivalenceCommonTests(object):
     def test_auth_become_user(self):
         admin_client = self.start_auth_server()
 
-        user = admin_client.new_user("test-user", ["@read", "@report"])
+        user = self.create_user("test-user", ["@read", "@report"])
         user_info = user.copy()
         del user_info["token"]
 
@@ -898,7 +914,7 @@  class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
         user = admin_client.new_user("test-user", ["@read", "@report"])
 
         p = self.run_hashclient([
-            "--address", self.auth_server.address,
+            "--address", self.auth_server_address,
             "--login", user["username"],
             "--password", user["token"],
             "refresh-token"
@@ -916,7 +932,7 @@  class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
         print("New token is %r" % new_token)
 
         self.run_hashclient([
-            "--address", self.auth_server.address,
+            "--address", self.auth_server_address,
             "--login", user["username"],
             "--password", new_token,
             "get-user"
@@ -928,7 +944,7 @@  class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
         user = admin_client.new_user("test-user", ["@read"])
 
         self.run_hashclient([
-            "--address", self.auth_server.address,
+            "--address", self.auth_server_address,
             "--login", admin_client.username,
             "--password", admin_client.password,
             "set-user-perms",
@@ -946,7 +962,7 @@  class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
         user = admin_client.new_user("test-user", ["@read"])
 
         p = self.run_hashclient([
-            "--address", self.auth_server.address,
+            "--address", self.auth_server_address,
             "--login", admin_client.username,
             "--password", admin_client.password,
             "get-user",
@@ -957,7 +973,7 @@  class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
         self.assertIn("Permissions:", p.stdout)
 
         p = self.run_hashclient([
-            "--address", self.auth_server.address,
+            "--address", self.auth_server_address,
             "--login", user["username"],
             "--password", user["token"],
             "get-user",
@@ -973,7 +989,7 @@  class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
         admin_client.new_user("test-user2", ["@read"])
 
         p = self.run_hashclient([
-            "--address", self.auth_server.address,
+            "--address", self.auth_server_address,
             "--login", admin_client.username,
             "--password", admin_client.password,
             "get-all-users",
@@ -987,7 +1003,7 @@  class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
         admin_client = self.start_auth_server()
 
         p = self.run_hashclient([
-            "--address", self.auth_server.address,
+            "--address", self.auth_server_address,
             "--login", admin_client.username,
             "--password", admin_client.password,
             "new-user",
@@ -1017,14 +1033,13 @@  class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
         user = admin_client.new_user("test-user", ["@read"])
 
         p = self.run_hashclient([
-            "--address", self.auth_server.address,
+            "--address", self.auth_server_address,
             "--login", admin_client.username,
             "--password", admin_client.password,
             "delete-user",
             "-u", user["username"],
         ], check=True)
 
-
         self.assertIsNone(admin_client.get_user(user["username"]))
 
     def test_get_db_usage(self):
@@ -1104,19 +1119,43 @@  class TestHashEquivalenceWebsocketsSQLAlchemyServer(TestHashEquivalenceWebsocket
 
 
 class TestHashEquivalenceExternalServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase):
-    def start_test_server(self):
-        if 'BB_TEST_HASHSERV' not in os.environ:
-            self.skipTest('BB_TEST_HASHSERV not defined to test an external server')
+    def get_env(self, name):
+        v = os.environ.get(name)
+        if not v:
+            self.skipTest(f'{name} not defined to test an external server')
+        return v
 
-        return os.environ['BB_TEST_HASHSERV']
+    def start_test_server(self):
+        return self.get_env('BB_TEST_HASHSERV')
 
     def start_server(self, *args, **kwargs):
         self.skipTest('Cannot start local server when testing external servers')
 
+    def start_auth_server(self):
+
+        self.auth_server_address = self.server_address
+        self.admin_client = self.start_client(
+            self.server_address,
+            username=self.get_env('BB_TEST_HASHSERV_USERNAME'),
+            password=self.get_env('BB_TEST_HASHSERV_PASSWORD'),
+        )
+        return self.admin_client
+
     def setUp(self):
         super().setUp()
+        if "BB_TEST_HASHSERV_USERNAME" in os.environ:
+            self.client = self.start_client(
+                self.server_address,
+                username=os.environ["BB_TEST_HASHSERV_USERNAME"],
+                password=os.environ["BB_TEST_HASHSERV_PASSWORD"],
+            )
         self.client.remove({"method": self.METHOD})
 
     def tearDown(self):
         self.client.remove({"method": self.METHOD})
         super().tearDown()
+
+
+    def test_auth_get_all_users(self):
+        self.skipTest("Cannot test all users with external server")
+