blob: 45493960985f5b16b0451d9fa68874a945bf54c0 [file] [log] [blame]
Dan Shi784df0c2014-11-26 10:11:15 -08001# Copyright 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import mox
6import unittest
7
8import common
9
10import django.core.exceptions
11from autotest_lib.client.common_lib import base_utils as utils
12from autotest_lib.frontend import setup_django_environment
13from autotest_lib.frontend.server import models as server_models
14from autotest_lib.site_utils import server_manager
15
16
17class QueriableList(list):
18 """A mock list object supports queries including filter and all.
19 """
20
21 def filter(self, **kwargs):
22 """Mock the filter call in django model.
23 """
24 raise NotImplementedError()
25
26
27 def all(self):
28 """Return all items in the list.
29
30 @return: All items in the list.
31 """
32 return [item for item in self]
33
34
35class ServerManagerUnittests(mox.MoxTestBase):
36 """Unittest for testing server_manager module.
37 """
38
39 def setUp(self):
40 """Initialize the unittest."""
41 super(ServerManagerUnittests, self).setUp()
42
43 # Initialize test objects.
44 self.DRONE_ROLE = mox.MockObject(
45 server_models.ServerRole,
46 attrs={'role': server_models.ServerRole.ROLE.DRONE})
47 self.SCHEDULER_ROLE = mox.MockObject(
48 server_models.ServerRole,
49 attrs={'role': server_models.ServerRole.ROLE.SCHEDULER})
50 self.DRONE_ATTRIBUTE = mox.MockObject(
51 server_models.ServerAttribute,
52 attrs={'attribute': 'max_processes', 'value':1})
53 self.PRIMARY_DRONE = mox.MockObject(
54 server_models.Server,
55 attrs={'hostname': 'primary_drone_hostname',
56 'status': server_models.Server.STATUS.PRIMARY,
57 'roles': QueriableList([self.DRONE_ROLE]),
58 'attributes': QueriableList([self.DRONE_ATTRIBUTE])})
59 self.BACKUP_DRONE = mox.MockObject(
60 server_models.Server,
61 attrs={'hostname': 'backup_drone_hostname',
62 'status': server_models.Server.STATUS.BACKUP,
63 'roles': QueriableList([self.DRONE_ROLE]),
64 'attributes': QueriableList([self.DRONE_ATTRIBUTE])})
65 self.PRIMARY_SCHEDULER = mox.MockObject(
66 server_models.Server,
67 attrs={'hostname': 'primary_scheduler_hostname',
68 'status': server_models.Server.STATUS.PRIMARY,
69 'roles': QueriableList([self.SCHEDULER_ROLE]),
70 'attributes': QueriableList([])})
71 self.BACKUP_SCHEDULER = mox.MockObject(
72 server_models.Server,
73 attrs={'hostname': 'backup_scheduler_hostname',
74 'status': server_models.Server.STATUS.BACKUP,
75 'roles': QueriableList([self.SCHEDULER_ROLE]),
76 'attributes': QueriableList([])})
77
78 self.mox.StubOutWithMock(server_manager, 'use_server_db')
79 self.mox.StubOutWithMock(server_models.Server.objects, 'create')
80 self.mox.StubOutWithMock(server_models.Server.objects, 'filter')
81 self.mox.StubOutWithMock(server_models.Server.objects, 'get')
82 self.mox.StubOutWithMock(server_models.ServerRole.objects, 'create')
83 self.mox.StubOutWithMock(server_models.ServerRole.objects, 'filter')
84 self.mox.StubOutWithMock(server_models.ServerAttribute.objects,
85 'create')
86 self.mox.StubOutWithMock(server_models.ServerAttribute.objects,
87 'filter')
88 self.mox.StubOutWithMock(utils, 'normalize_hostname')
89
90
91 def testCreateServerSuccess(self):
92 """Test create method can create a server successfully.
93 """
94 utils.normalize_hostname(self.BACKUP_DRONE.hostname)
95 server_models.Server.objects.get(
96 hostname=self.BACKUP_DRONE.hostname
97 ).AndRaise(django.core.exceptions.ObjectDoesNotExist)
98 server_models.Server.objects.create(
99 hostname=mox.IgnoreArg(), status=mox.IgnoreArg(),
100 date_created=mox.IgnoreArg(), note=mox.IgnoreArg()
101 ).AndReturn(self.BACKUP_DRONE)
102 server_models.ServerRole.objects.create(
103 server=mox.IgnoreArg(), role=server_models.ServerRole.ROLE.DRONE
104 ).AndReturn(self.DRONE_ROLE)
105 self.mox.ReplayAll()
106 drone = server_manager.create(hostname=self.BACKUP_DRONE.hostname,
107 role=server_models.ServerRole.ROLE.DRONE)
108
109
110 def testAddRoleToBackupSuccess(self):
111 """Test manager can add a role to a backup server successfully.
112
113 Confirm that database call is made, and no action is taken, e.g.,
114 restart scheduler to activate a new devserver.
115 """
116 server_models.validate(role=server_models.ServerRole.ROLE.DEVSERVER)
117 server_models.ServerRole.objects.filter(
118 server=self.BACKUP_DRONE,
119 role=server_models.ServerRole.ROLE.DEVSERVER).AndReturn(None)
120 server_models.ServerRole.objects.create(
121 server=mox.IgnoreArg(),
122 role=server_models.ServerRole.ROLE.DEVSERVER
123 ).AndReturn(self.DRONE_ROLE)
124 self.mox.ReplayAll()
125 server_manager._add_role(server=self.BACKUP_DRONE,
126 role=server_models.ServerRole.ROLE.DEVSERVER)
127
128
129 def testAddRoleToBackupFail_RoleAlreadyExists(self):
130 """Test manager fails to add a role to a backup server if server already
131 has the given role.
132 """
133 server_models.validate(role=server_models.ServerRole.ROLE.DRONE)
134 server_models.ServerRole.objects.filter(
135 server=self.BACKUP_DRONE,
136 role=server_models.ServerRole.ROLE.DRONE
137 ).AndReturn([self.DRONE_ROLE])
138 self.mox.ReplayAll()
139 self.assertRaises(server_manager.ServerActionError,
140 server_manager._add_role,
141 server=self.BACKUP_DRONE,
142 role=server_models.ServerRole.ROLE.DRONE)
143
144
145 def testDeleteRoleFromBackupSuccess(self):
146 """Test manager can delete a role from a backup server successfully.
147
148 Confirm that database call is made, and no action is taken, e.g.,
149 restart scheduler to delete an existing devserver.
150 """
151 server_models.validate(role=server_models.ServerRole.ROLE.DRONE)
152 server_models.ServerRole.objects.filter(
153 server=self.BACKUP_DRONE,
154 role=server_models.ServerRole.ROLE.DRONE
155 ).AndReturn([self.DRONE_ROLE])
156 self.mox.ReplayAll()
157 server_manager._delete_role(server=self.BACKUP_DRONE,
158 role=server_models.ServerRole.ROLE.DRONE)
159
160
161 def testDeleteRoleFromBackupFail_RoleNotExist(self):
162 """Test manager fails to delete a role from a backup server if the
163 server does not have the given role.
164 """
165 server_models.validate(role=server_models.ServerRole.ROLE.DEVSERVER)
166 server_models.ServerRole.objects.filter(
167 server=self.BACKUP_DRONE,
168 role=server_models.ServerRole.ROLE.DEVSERVER
169 ).AndReturn(None)
170 self.mox.ReplayAll()
171 self.assertRaises(server_manager.ServerActionError,
172 server_manager._delete_role, server=self.BACKUP_DRONE,
173 role=server_models.ServerRole.ROLE.DEVSERVER)
174
175
176 def testChangeStatusSuccess_BackupToPrimary(self):
177 """Test manager can change the status of a backup server to primary.
178 """
179 # TODO(dshi): After _apply_action is implemented, this unittest needs
180 # to be updated to verify various actions being taken to put a server
181 # in primary status, e.g., start scheduler for scheduler server.
182 server_models.validate(status=server_models.Server.STATUS.PRIMARY)
183 self.mox.StubOutWithMock(self.BACKUP_DRONE.roles, 'filter')
184 self.BACKUP_DRONE.roles.filter(
185 role__in=server_models.ServerRole.ROLES_REQUIRE_UNIQUE_INSTANCE
186 ).AndReturn(None)
187 self.mox.ReplayAll()
188 server_manager._change_status(
189 server=self.BACKUP_DRONE,
190 status=server_models.Server.STATUS.PRIMARY)
191
192
193 def testChangeStatusSuccess_PrimaryToBackup(self):
194 """Test manager can change the status of a primary server to backup.
195 """
196 server_models.validate(status=server_models.Server.STATUS.BACKUP)
197 self.mox.StubOutWithMock(self.PRIMARY_DRONE.roles, 'filter')
198 self.PRIMARY_DRONE.roles.filter(
199 role__in=server_models.ServerRole.ROLES_REQUIRE_UNIQUE_INSTANCE
200 ).AndReturn(None)
201 self.mox.ReplayAll()
202 server_manager._change_status(
203 server=self.PRIMARY_DRONE,
204 status=server_models.Server.STATUS.BACKUP)
205
206
207 def testChangeStatusFail_StatusNoChange(self):
208 """Test manager cannot change the status of a server with the same
209 status.
210 """
211 server_models.validate(status=server_models.Server.STATUS.BACKUP)
212 self.mox.ReplayAll()
213 self.assertRaises(server_manager.ServerActionError,
214 server_manager._change_status,
215 server=self.BACKUP_DRONE,
216 status=server_models.Server.STATUS.BACKUP)
217
218
219 def testChangeStatusFail_UniqueInstance(self):
220 """Test manager cannot change the status of a server from backup to
221 primary if there is already a primary exists for role doesn't allow
222 multiple instances.
223 """
224 server_models.validate(status=server_models.Server.STATUS.PRIMARY)
225 self.mox.StubOutWithMock(self.BACKUP_SCHEDULER.roles, 'filter')
226 self.BACKUP_SCHEDULER.roles.filter(
227 role__in=server_models.ServerRole.ROLES_REQUIRE_UNIQUE_INSTANCE
228 ).AndReturn(QueriableList([self.SCHEDULER_ROLE]))
229 server_models.Server.objects.filter(
230 roles__role=self.SCHEDULER_ROLE.role,
231 status=server_models.Server.STATUS.PRIMARY
232 ).AndReturn(QueriableList([self.PRIMARY_SCHEDULER]))
233 self.mox.ReplayAll()
234 self.assertRaises(server_manager.ServerActionError,
235 server_manager._change_status,
236 server=self.BACKUP_SCHEDULER,
237 status=server_models.Server.STATUS.PRIMARY)
238
239
240if '__main__':
241 unittest.main()