blob: 1ad85f70ca32d6d81d06407eda2d1da747a6549a [file] [log] [blame]
Yusuf Mohsinally05c3c552014-05-07 23:56:42 -07001# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import ast
6import ctypes
7import logging
8import os
9import re
10import subprocess
11import time
12import uuid
13
14from autotest_lib.client.bin import utils
15from autotest_lib.client.common_lib import error
16from autotest_lib.server import autotest
17from autotest_lib.server.cros import vboot_constants as vboot
18from autotest_lib.server.cros.faft.config.config import Config as FAFTConfig
19from autotest_lib.server.cros.faft.utils.faft_checkers import FAFTCheckers
20from autotest_lib.server.cros.faft.rpc_proxy import RPCProxy
21from autotest_lib.server.cros.servo import chrome_ec
22from autotest_lib.server.cros.servo_test import ServoTest
23
24
25class ConnectionError(Exception):
26 """Raised on an error of connecting DUT."""
27 pass
28
29
30class FAFTBase(ServoTest):
31 """The base class of FAFT classes.
32
33 It launches the FAFTClient on DUT, such that the test can access its
34 firmware functions and interfaces. It also provides some methods to
35 handle the reboot mechanism, in order to ensure FAFTClient is still
36 connected after reboot.
37 """
38 def initialize(self, host):
39 """Create a FAFTClient object and install the dependency."""
40 super(FAFTBase, self).initialize(host)
41 self._client = host
42 self._autotest_client = autotest.Autotest(self._client)
43 self._autotest_client.install()
44 self.faft_client = RPCProxy(host)
45
46 def wait_for_client(self, install_deps=False, timeout=100):
47 """Wait for the client to come back online.
48
49 New remote processes will be launched if their used flags are enabled.
50
51 @param install_deps: If True, install Autotest dependency when ready.
52 @param timeout: Time in seconds to wait for the client SSH daemon to
53 come up.
54 @raise ConnectionError: Failed to connect DUT.
55 """
56 if not self._client.wait_up(timeout):
57 raise ConnectionError()
58 if install_deps:
59 self._autotest_client.install()
60 # Check the FAFT client is avaiable.
61 self.faft_client.system.is_available()
62
63 def wait_for_client_offline(self, timeout=60, orig_boot_id=None):
64 """Wait for the client to come offline.
65
66 @param timeout: Time in seconds to wait the client to come offline.
67 @param orig_boot_id: A string containing the original boot id.
68 @raise ConnectionError: Failed to connect DUT.
69 """
70 # When running against panther, we see that sometimes
71 # ping_wait_down() does not work correctly. There needs to
72 # be some investigation to the root cause.
73 # If we sleep for 120s before running get_boot_id(), it
74 # does succeed. But if we change this to ping_wait_down()
75 # there are implications on the wait time when running
76 # commands at the fw screens.
77 if not self._client.ping_wait_down(timeout):
78 if orig_boot_id and self._client.get_boot_id() != orig_boot_id:
79 logging.warn('Reboot done very quickly.')
80 return
81 raise ConnectionError()
82
83
84class FirmwareTest(FAFTBase):
85 """
86 The base class of Fully Automated Firmware Test Sequence.
87
88 Many firmware tests require several reboot cycles and verify the resulted
89 system states. To do that, an Autotest test case should detailly handle
90 every action on each step. It makes the test case hard to read and many
91 duplicated code. The base class FirmwareTest is to solve this problem.
92
93 The actions of one reboot cycle is defined in a dict, namely FAFT_STEP.
94 There are four functions in the FAFT_STEP dict:
95 state_checker: a function to check the current is valid or not,
96 returning True if valid, otherwise, False to break the whole
97 test sequence.
98 userspace_action: a function to describe the action ran in userspace.
99 reboot_action: a function to do reboot, default: sync_and_warm_reboot.
100 firmware_action: a function to describe the action ran after reboot.
101
102 And configurations:
103 install_deps_after_boot: if True, install the Autotest dependency after
104 boot; otherwise, do nothing. It is for the cases of recovery mode
105 test. The test boots a USB/SD image instead of an internal image.
106 The previous installed Autotest dependency on the internal image
107 is lost. So need to install it again.
108
109 The default FAFT_STEP checks nothing in state_checker and does nothing in
110 userspace_action and firmware_action. Its reboot_action is a hardware
111 reboot. You can change the default FAFT_STEP by calling
112 self.register_faft_template(FAFT_STEP).
113
114 A FAFT test case consists of several FAFT_STEP's, namely FAFT_SEQUENCE.
115 FAFT_SEQUENCE is an array of FAFT_STEP's. Any missing fields on FAFT_STEP
116 fall back to default.
117
118 In the run_once(), it should register and run FAFT_SEQUENCE like:
119 def run_once(self):
120 self.register_faft_sequence(FAFT_SEQUENCE)
121 self.run_faft_sequnce()
122
123 Note that in the last step, we only run state_checker. The
124 userspace_action, reboot_action, and firmware_action are not executed.
125
126 Attributes:
127 _faft_template: The default FAFT_STEP of each step. The actions would
128 be over-written if the registered FAFT_SEQUENCE is valid.
129 _faft_sequence: The registered FAFT_SEQUENCE.
130 _install_image_path: The URL or the path on the host to the Chrome OS
131 test image to be installed.
132 _firmware_update: Boolean. True if firmware update needed after
133 installing the image.
134 """
135 version = 1
136
137 # Mapping of partition number of kernel and rootfs.
138 KERNEL_MAP = {'a':'2', 'b':'4', '2':'2', '4':'4', '3':'2', '5':'4'}
139 ROOTFS_MAP = {'a':'3', 'b':'5', '2':'3', '4':'5', '3':'3', '5':'5'}
140 OTHER_KERNEL_MAP = {'a':'4', 'b':'2', '2':'4', '4':'2', '3':'4', '5':'2'}
141 OTHER_ROOTFS_MAP = {'a':'5', 'b':'3', '2':'5', '4':'3', '3':'5', '5':'3'}
142
143 CHROMEOS_MAGIC = "CHROMEOS"
144 CORRUPTED_MAGIC = "CORRUPTD"
145
146 _SERVOD_LOG = '/var/log/servod.log'
147
148 _ROOTFS_PARTITION_NUMBER = 3
149
150 _HTTP_PREFIX = 'http://'
151 _DEVSERVER_PORT = '8090'
152
153 _faft_template = {}
154 _faft_sequence = ()
155
156 _install_image_path = None
157 _firmware_update = False
158
159 _backup_firmware_sha = ()
160 _backup_kernel_sha = dict()
161 _backup_cgpt_attr = dict()
162 _backup_gbb_flags = None
163 _backup_dev_mode = None
164
165 # Class level variable, keep track the states of one time setup.
166 # This variable is preserved across tests which inherit this class.
167 _global_setup_done = {
168 'gbb_flags': False,
169 'reimage': False,
170 'usb_check': False,
171 }
172
173 @classmethod
174 def check_setup_done(cls, label):
175 """Check if the given setup is done.
176
177 @param label: The label of the setup.
178 """
179 return cls._global_setup_done[label]
180
181 @classmethod
182 def mark_setup_done(cls, label):
183 """Mark the given setup done.
184
185 @param label: The label of the setup.
186 """
187 cls._global_setup_done[label] = True
188
189 @classmethod
190 def unmark_setup_done(cls, label):
191 """Mark the given setup not done.
192
193 @param label: The label of the setup.
194 """
195 cls._global_setup_done[label] = False
196
197 def initialize(self, host, cmdline_args, ec_wp=None):
198 super(FirmwareTest, self).initialize(host)
199 self.run_id = str(uuid.uuid4())
200 logging.info('FirmwareTest initialize begin (id=%s)', self.run_id)
201 self.register_faft_template({
202 'state_checker': (None),
203 'userspace_action': (None),
204 'reboot_action': (self.sync_and_warm_reboot),
205 'firmware_action': (None)
206 })
207 # Parse arguments from command line
208 args = {}
209 self.power_control = host.POWER_CONTROL_RPM
210 for arg in cmdline_args:
211 match = re.search("^(\w+)=(.+)", arg)
212 if match:
213 args[match.group(1)] = match.group(2)
214 if 'power_control' in args:
215 self.power_control = args['power_control']
216 if self.power_control not in host.POWER_CONTROL_VALID_ARGS:
217 raise error.TestError('Valid values for --args=power_control '
218 'are %s. But you entered wrong argument '
219 'as "%s".'
220 % (host.POWER_CONTROL_VALID_ARGS,
221 self.power_control))
222 if 'image' in args:
223 self._install_image_path = args['image']
224 logging.info('Install Chrome OS test image path: %s',
225 self._install_image_path)
226 if 'firmware_update' in args and args['firmware_update'].lower() \
227 not in ('0', 'false', 'no'):
228 if self._install_image_path:
229 self._firmware_update = True
230 logging.info('Also update firmware after installing.')
231 else:
232 logging.warning('Firmware update will not not performed '
233 'since no image is specified.')
234
235 self.faft_config = FAFTConfig(
236 self.faft_client.system.get_platform_name())
237 self.checkers = FAFTCheckers(self, self.faft_client)
238
239 if self.faft_config.chrome_ec:
240 self.ec = chrome_ec.ChromeEC(self.servo)
241
242 self.setup_uart_capture()
243 self.setup_servo_log()
244 self.install_test_image(self._install_image_path, self._firmware_update)
245 self.record_system_info()
246 self.setup_gbb_flags()
247 self.stop_service('update-engine')
248 self.setup_ec_write_protect(ec_wp)
249 logging.info('FirmwareTest initialize done (id=%s)', self.run_id)
250
251 def cleanup(self):
252 """Autotest cleanup function."""
253 # Unset state checker in case it's set by subclass
254 self.register_faft_template({'state_checker': None})
255
256 logging.info('FirmwareTest cleaning up (id=%s)', self.run_id)
257 try:
258 self.faft_client.system.is_available()
259 except:
260 # Remote is not responding. Revive DUT so that subsequent tests
261 # don't fail.
262 self._restore_routine_from_timeout()
263 self.restore_dev_mode()
264 self.restore_ec_write_protect()
265 self.restore_gbb_flags()
266 self.start_service('update-engine')
267 self.record_servo_log()
268 self.record_faft_client_log()
269 self.cleanup_uart_capture()
270 self._faft_sequence = ()
271 self._faft_template = {}
272 super(FirmwareTest, self).cleanup()
273 logging.info('FirmwareTest cleanup done (id=%s)', self.run_id)
274
275 def record_system_info(self):
276 """Record some critical system info to the attr keyval.
277
278 This info is used by generate_test_report and local_dash later.
279 """
280 self.write_attr_keyval({
281 'fw_version': self.faft_client.ec.get_version(),
282 'hwid': self.faft_client.system.get_crossystem_value('hwid'),
283 'fwid': self.faft_client.system.get_crossystem_value('fwid'),
284 })
285
286 def invalidate_firmware_setup(self):
287 """Invalidate all firmware related setup state.
288
289 This method is called when the firmware is re-flashed. It resets all
290 firmware related setup states so that the next test setup properly
291 again.
292 """
293 self.unmark_setup_done('gbb_flags')
294
295 def _retrieve_recovery_reason_from_trap(self):
296 """Try to retrieve the recovery reason from a trapped recovery screen.
297
298 @return: The recovery_reason, 0 if any error.
299 """
300 recovery_reason = 0
301 logging.info('Try to retrieve recovery reason...')
302 if self.servo.get_usbkey_direction() == 'dut':
303 self.wait_fw_screen_and_plug_usb()
304 else:
305 self.servo.switch_usbkey('dut')
306
307 try:
308 self.wait_for_client(install_deps=True)
309 lines = self.faft_client.system.run_shell_command_get_output(
310 'crossystem recovery_reason')
311 recovery_reason = int(lines[0])
312 logging.info('Got the recovery reason %d.', recovery_reason)
313 except ConnectionError:
314 logging.error('Failed to get the recovery reason due to connection '
315 'error.')
316 return recovery_reason
317
318 def _reset_client(self):
319 """Reset client to a workable state.
320
321 This method is called when the client is not responsive. It may be
322 caused by the following cases:
323 - halt on a firmware screen without timeout, e.g. REC_INSERT screen;
324 - corrupted firmware;
325 - corrutped OS image.
326 """
327 # DUT may halt on a firmware screen. Try cold reboot.
328 logging.info('Try cold reboot...')
329 self.cold_reboot()
330 self.wait_for_client_offline()
331 self.wait_dev_screen_and_ctrl_d()
332 try:
333 self.wait_for_client()
334 return
335 except ConnectionError:
336 logging.warn('Cold reboot doesn\'t help, still connection error.')
337
338 # DUT may be broken by a corrupted firmware. Restore firmware.
339 # We assume the recovery boot still works fine. Since the recovery
340 # code is in RO region and all FAFT tests don't change the RO region
341 # except GBB.
342 if self.is_firmware_saved():
343 self._ensure_client_in_recovery()
344 logging.info('Try restore the original firmware...')
345 if self.is_firmware_changed():
346 try:
347 self.restore_firmware()
348 return
349 except ConnectionError:
350 logging.warn('Restoring firmware doesn\'t help, still '
351 'connection error.')
352
353 # Perhaps it's kernel that's broken. Let's try restoring it.
354 if self.is_kernel_saved():
355 self._ensure_client_in_recovery()
356 logging.info('Try restore the original kernel...')
357 if self.is_kernel_changed():
358 try:
359 self.restore_kernel()
360 return
361 except ConnectionError:
362 logging.warn('Restoring kernel doesn\'t help, still '
363 'connection error.')
364
365 # DUT may be broken by a corrupted OS image. Restore OS image.
366 self._ensure_client_in_recovery()
367 logging.info('Try restore the OS image...')
368 self.faft_client.system.run_shell_command('chromeos-install --yes')
369 self.sync_and_warm_reboot()
370 self.wait_for_client_offline()
371 self.wait_dev_screen_and_ctrl_d()
372 try:
373 self.wait_for_client(install_deps=True)
374 logging.info('Successfully restore OS image.')
375 return
376 except ConnectionError:
377 logging.warn('Restoring OS image doesn\'t help, still connection '
378 'error.')
379
380 def _ensure_client_in_recovery(self):
381 """Ensure client in recovery boot; reboot into it if necessary.
382
383 @raise TestError: if failed to boot the USB image.
384 """
385 logging.info('Try boot into USB image...')
386 self.servo.switch_usbkey('host')
387 self.enable_rec_mode_and_reboot()
388 self.wait_fw_screen_and_plug_usb()
389 try:
390 self.wait_for_client(install_deps=True)
391 except ConnectionError:
392 raise error.TestError('Failed to boot the USB image.')
393
394 def _restore_routine_from_timeout(self, next_step=None):
395 """A routine to try to restore the system from a timeout error.
396
397 This method is called when FAFT failed to connect DUT after reboot.
398
399 @param next_step: Optional, a FAFT_STEP dict of the next step, which is
400 used for diagnostic.
401 @raise TestFail: This exception is already raised, with a decription
402 why it failed.
403 """
404 # DUT is disconnected. Capture the UART output for debug.
405 self.record_uart_capture()
406
407 next_checker_matched = False
408 if next_step is not None:
409 next_test = {}
410 next_test.update(self._faft_template)
411 next_test.update(next_step)
412
413 # TODO(waihong@chromium.org): Implement replugging the Ethernet to
414 # identify if it is a network flaky.
415
416 recovery_reason = self._retrieve_recovery_reason_from_trap()
417 if next_step is not None and recovery_reason:
418 if self._call_action(next_test['state_checker']):
419 # Repluging the USB can pass the state_checker of the next step,
420 # meaning that the firmware failed to boot into USB directly.
421 next_checker_matched = True
422
423 # Reset client to a workable state.
424 self._reset_client()
425
426 # Raise the proper TestFail exception.
427 if next_checker_matched:
428 raise error.TestFail('Firmware failed to auto-boot USB in the '
429 'recovery boot (reason: %d)' % recovery_reason)
430 elif recovery_reason:
431 raise error.TestFail('Trapped in the recovery screen (reason: %d) '
432 'and timed out' % recovery_reason)
433 else:
434 raise error.TestFail('Timed out waiting for DUT reboot')
435
436 def assert_test_image_in_usb_disk(self, usb_dev=None, install_shim=False):
437 """Assert an USB disk plugged-in on servo and a test image inside.
438
439 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
440 If None, it is detected automatically.
441 @param install_shim: True to verify an install shim instead of a test
442 image.
443 @raise TestError: if USB disk not detected or not a test (install shim)
444 image.
445 """
446 if self.check_setup_done('usb_check'):
447 return
448 if usb_dev:
449 assert self.servo.get_usbkey_direction() == 'host'
450 else:
451 self.servo.switch_usbkey('host')
452 usb_dev = self.servo.probe_host_usb_dev()
453 if not usb_dev:
454 raise error.TestError(
455 'An USB disk should be plugged in the servo board.')
456
457 rootfs = '%s%s' % (usb_dev, self._ROOTFS_PARTITION_NUMBER)
458 logging.info('usb dev is %s', usb_dev)
459 tmpd = self.servo.system_output('mktemp -d -t usbcheck.XXXX')
460 self.servo.system('mount -o ro %s %s' % (rootfs, tmpd))
461
462 if install_shim:
463 dir_list = self.servo.system_output('ls -a %s' %
464 os.path.join(tmpd, 'root'))
465 check_passed = '.factory_installer' in dir_list
466 else:
467 check_passed = self.servo.system_output(
468 'grep -i "CHROMEOS_RELEASE_DESCRIPTION=.*test" %s' %
469 os.path.join(tmpd, 'etc/lsb-release'),
470 ignore_status=True)
471 for cmd in ('umount %s' % rootfs, 'sync', 'rm -rf %s' % tmpd):
472 self.servo.system(cmd)
473
474 if not check_passed:
475 raise error.TestError(
476 'No Chrome OS %s found on the USB flash plugged into servo' %
477 'install shim' if install_shim else 'test')
478
479 self.mark_setup_done('usb_check')
480
481 def setup_usbkey(self, usbkey, host=None, install_shim=False):
482 """Setup the USB disk for the test.
483
484 It checks the setup of USB disk and a valid ChromeOS test image inside.
485 It also muxes the USB disk to either the host or DUT by request.
486
487 @param usbkey: True if the USB disk is required for the test, False if
488 not required.
489 @param host: Optional, True to mux the USB disk to host, False to mux it
490 to DUT, default to do nothing.
491 @param install_shim: True to verify an install shim instead of a test
492 image.
493 """
494 if usbkey:
495 self.assert_test_image_in_usb_disk(install_shim=install_shim)
496 elif host is None:
497 # USB disk is not required for the test. Better to mux it to host.
498 host = True
499
500 if host is True:
501 self.servo.switch_usbkey('host')
502 elif host is False:
503 self.servo.switch_usbkey('dut')
504
505 def get_usbdisk_path_on_dut(self):
506 """Get the path of the USB disk device plugged-in the servo on DUT.
507
508 Returns:
509 A string representing USB disk path, like '/dev/sdb', or None if
510 no USB disk is found.
511 """
512 cmd = 'ls -d /dev/s*[a-z]'
513 original_value = self.servo.get_usbkey_direction()
514
515 # Make the dut unable to see the USB disk.
516 self.servo.switch_usbkey('off')
517 no_usb_set = set(
518 self.faft_client.system.run_shell_command_get_output(cmd))
519
520 # Make the dut able to see the USB disk.
521 self.servo.switch_usbkey('dut')
522 time.sleep(self.faft_config.between_usb_plug)
523 has_usb_set = set(
524 self.faft_client.system.run_shell_command_get_output(cmd))
525
526 # Back to its original value.
527 if original_value != self.servo.get_usbkey_direction():
528 self.servo.switch_usbkey(original_value)
529
530 diff_set = has_usb_set - no_usb_set
531 if len(diff_set) == 1:
532 return diff_set.pop()
533 else:
534 return None
535
536 def get_server_address(self):
537 """Get the server address seen from the client.
538
539 @return: A string of the server address.
540 """
541 r = self.faft_client.system.run_shell_command_get_output(
542 "echo $SSH_CLIENT")
543 return r[0].split()[0]
544
545 def install_test_image(self, image_path=None, firmware_update=False):
546 """Install the test image specied by the path onto the USB and DUT disk.
547
548 The method first copies the image to USB disk and reboots into it via
549 recovery mode. Then runs 'chromeos-install' (and possible
550 chromeos-firmwareupdate') to install it to DUT disk.
551
552 Sample command line:
553
554 run_remote_tests.sh --servo --board=daisy --remote=w.x.y.z \
555 --args="image=/tmp/chromiumos_test_image.bin firmware_update=True" \
556 server/site_tests/firmware_XXXX/control
557
558 This test requires an automated recovery to occur while simulating
559 inserting and removing the usb key from the servo. To allow this the
560 following hardware setup is required:
561 1. servo2 board connected via servoflex.
562 2. USB key inserted in the servo2.
563 3. servo2 connected to the dut via dut_hub_in in the usb 2.0 slot.
564 4. network connected via usb dongle in the dut in usb 3.0 slot.
565
566 @param image_path: An URL or a path on the host to the test image.
567 @param firmware_update: Also update the firmware after installing.
568 @raise TestError: If devserver failed to start.
569 """
570 if not image_path:
571 return
572
573 if self.check_setup_done('reimage'):
574 return
575
576 if image_path.startswith(self._HTTP_PREFIX):
577 # TODO(waihong@chromium.org): Add the check of the URL to ensure
578 # it is a test image.
579 devserver = None
580 image_url = image_path
581 elif self.servo.is_localhost():
582 # If servod is localhost, i.e. both servod and FAFT see the same
583 # file system, do nothing.
584 devserver = None
585 image_url = image_path
586 else:
587 image_dir, image_base = os.path.split(image_path)
588 logging.info('Starting devserver to serve the image...')
589 # The following stdout and stderr arguments should not be None,
590 # even we don't use them. Otherwise, the socket of devserve is
591 # created as fd 1 (as no stdout) but it still thinks stdout is fd
592 # 1 and dump the log to the socket. Wrong HTTP protocol happens.
593 devserver = subprocess.Popen(['/usr/lib/devserver/devserver.py',
594 '--archive_dir=%s' % image_dir,
595 '--port=%s' % self._DEVSERVER_PORT],
596 stdout=subprocess.PIPE,
597 stderr=subprocess.PIPE)
598 image_url = '%s%s:%s/static/%s' % (
599 self._HTTP_PREFIX,
600 self.get_server_address(),
601 self._DEVSERVER_PORT,
602 image_base)
603
604 # Wait devserver startup completely
605 time.sleep(self.faft_config.devserver)
606 # devserver is a service running forever. If it is terminated,
607 # some error does happen.
608 if devserver.poll():
609 raise error.TestError('Starting devserver failed, '
610 'returning %d.' % devserver.returncode)
611
612 logging.info('Ask Servo to install the image from %s', image_url)
613 self.servo.image_to_servo_usb(image_url)
614
615 self.assert_test_image_in_usb_disk()
616
617 if devserver and devserver.poll() is None:
618 logging.info('Shutting down devserver...')
619 devserver.terminate()
620
621 # DUT is powered off while imaging servo USB.
622 # Now turn it on.
623 self.servo.power_short_press()
624 self.wait_for_client()
625 self.servo.switch_usbkey('dut')
626
627 install_cmd = 'chromeos-install --yes'
628 if firmware_update:
629 install_cmd += ' && chromeos-firmwareupdate --mode recovery'
630 self.backup_firmware()
631 self.backup_kernel()
632
633 self.register_faft_sequence((
634 { # Step 1, request recovery boot
635 'state_checker': (self.checkers.crossystem_checker, {
636 'mainfw_type': ('developer', 'normal'),
637 }),
638 'userspace_action': (
639 self.faft_client.system.request_recovery_boot),
640 'firmware_action': self.wait_fw_screen_and_plug_usb,
641 'install_deps_after_boot': True,
642 },
643 { # Step 2, expected recovery boot
644 'state_checker': (self.checkers.crossystem_checker, {
645 'mainfw_type': 'recovery',
646 'recovery_reason' : vboot.RECOVERY_REASON['US_TEST'],
647 }),
648 'userspace_action': (self.faft_client.system.run_shell_command,
649 install_cmd),
650 'reboot_action': self.cold_reboot,
651 'install_deps_after_boot': True,
652 },
653 { # Step 3, expected normal or developer boot (not recovery)
654 'state_checker': (self.checkers.crossystem_checker, {
655 'mainfw_type': ('developer', 'normal')
656 }),
657 },
658 ))
659 self.run_faft_sequence()
660
661 if firmware_update:
662 self.clear_saved_firmware()
663 self.clear_saved_kernel()
664
665 # 'Unplug' any USB keys in the servo from the dut.
666 self.servo.switch_usbkey('host')
667 # Mark usb_check done so it won't check a test image in USB anymore.
668 self.mark_setup_done('usb_check')
669 self.mark_setup_done('reimage')
670
671 def stop_service(self, service):
672 """Stops a upstart service on the client.
673
674 @param service: The name of the upstart service.
675 """
676 logging.info('Stopping %s...', service)
677 command = 'status %s | grep stop || stop %s' % (service, service)
678 self.faft_client.system.run_shell_command(command)
679
680 def start_service(self, service):
681 """Starts a upstart service on the client.
682
683 @param service: The name of the upstart service.
684 """
685 logging.info('Starting %s...', service)
686 command = 'status %s | grep start || start %s' % (service, service)
687 self.faft_client.system.run_shell_command(command)
688
689 def write_gbb_flags(self, new_flags):
690 """Write the GBB flags to the current firmware.
691
692 @param new_flags: The flags to write.
693 """
694 gbb_flags = self.faft_client.bios.get_gbb_flags()
695 if gbb_flags == new_flags:
696 return
697 logging.info('Changing GBB flags from 0x%x to 0x%x.',
698 gbb_flags, new_flags)
699 self.faft_client.system.run_shell_command(
700 '/usr/share/vboot/bin/set_gbb_flags.sh 0x%x' % new_flags)
701 self.faft_client.bios.reload()
702 # If changing FORCE_DEV_SWITCH_ON flag, reboot to get a clear state
703 if ((gbb_flags ^ new_flags) & vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON):
704 self.run_faft_step({
705 'firmware_action': self.wait_dev_screen_and_ctrl_d,
706 })
707
708 def clear_set_gbb_flags(self, clear_mask, set_mask):
709 """Clear and set the GBB flags in the current flashrom.
710
711 @param clear_mask: A mask of flags to be cleared.
712 @param set_mask: A mask of flags to be set.
713 """
714 gbb_flags = self.faft_client.bios.get_gbb_flags()
715 new_flags = gbb_flags & ctypes.c_uint32(~clear_mask).value | set_mask
716 self.write_gbb_flags(new_flags)
717
718 def check_ec_capability(self, required_cap=None, suppress_warning=False):
719 """Check if current platform has required EC capabilities.
720
721 @param required_cap: A list containing required EC capabilities. Pass in
722 None to only check for presence of Chrome EC.
723 @param suppress_warning: True to suppress any warning messages.
724 @return: True if requirements are met. Otherwise, False.
725 """
726 if not self.faft_config.chrome_ec:
727 if not suppress_warning:
728 logging.warn('Requires Chrome EC to run this test.')
729 return False
730
731 if not required_cap:
732 return True
733
734 for cap in required_cap:
735 if cap not in self.faft_config.ec_capability:
736 if not suppress_warning:
737 logging.warn('Requires EC capability "%s" to run this '
738 'test.', cap)
739 return False
740
741 return True
742
743 def check_root_part_on_non_recovery(self, part):
744 """Check the partition number of root device and on normal/dev boot.
745
746 @param part: A string of partition number, e.g.'3'.
747 @return: True if the root device matched and on normal/dev boot;
748 otherwise, False.
749 """
750 return self.checkers.root_part_checker(part) and \
751 self.checkers.crossystem_checker({
752 'mainfw_type': ('normal', 'developer'),
753 })
754
755 def _join_part(self, dev, part):
756 """Return a concatenated string of device and partition number.
757
758 @param dev: A string of device, e.g.'/dev/sda'.
759 @param part: A string of partition number, e.g.'3'.
760 @return: A concatenated string of device and partition number,
761 e.g.'/dev/sda3'.
762
763 >>> seq = FirmwareTest()
764 >>> seq._join_part('/dev/sda', '3')
765 '/dev/sda3'
766 >>> seq._join_part('/dev/mmcblk0', '2')
767 '/dev/mmcblk0p2'
768 """
769 if 'mmcblk' in dev:
770 return dev + 'p' + part
771 else:
772 return dev + part
773
774 def copy_kernel_and_rootfs(self, from_part, to_part):
775 """Copy kernel and rootfs from from_part to to_part.
776
777 @param from_part: A string of partition number to be copied from.
778 @param to_part: A string of partition number to be copied to.
779 """
780 root_dev = self.faft_client.system.get_root_dev()
781 logging.info('Copying kernel from %s to %s. Please wait...',
782 from_part, to_part)
783 self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' %
784 (self._join_part(root_dev, self.KERNEL_MAP[from_part]),
785 self._join_part(root_dev, self.KERNEL_MAP[to_part])))
786 logging.info('Copying rootfs from %s to %s. Please wait...',
787 from_part, to_part)
788 self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' %
789 (self._join_part(root_dev, self.ROOTFS_MAP[from_part]),
790 self._join_part(root_dev, self.ROOTFS_MAP[to_part])))
791
792 def ensure_kernel_boot(self, part):
793 """Ensure the request kernel boot.
794
795 If not, it duplicates the current kernel to the requested kernel
796 and sets the requested higher priority to ensure it boot.
797
798 @param part: A string of kernel partition number or 'a'/'b'.
799 """
800 if not self.checkers.root_part_checker(part):
801 if self.faft_client.kernel.diff_a_b():
802 self.copy_kernel_and_rootfs(
803 from_part=self.OTHER_KERNEL_MAP[part],
804 to_part=part)
805 self.run_faft_step({
806 'userspace_action': (self.reset_and_prioritize_kernel, part),
807 })
808
809 def set_hardware_write_protect(self, enable):
810 """Set hardware write protect pin.
811
812 @param enable: True if asserting write protect pin. Otherwise, False.
813 """
814 self.servo.set('fw_wp_vref', self.faft_config.wp_voltage)
815 self.servo.set('fw_wp_en', 'on')
816 self.servo.set('fw_wp', 'on' if enable else 'off')
817
818 def set_ec_write_protect_and_reboot(self, enable):
819 """Set EC write protect status and reboot to take effect.
820
821 The write protect state is only activated if both hardware write
822 protect pin is asserted and software write protect flag is set.
823 This method asserts/deasserts hardware write protect pin first, and
824 set corresponding EC software write protect flag.
825
826 If the device uses non-Chrome EC, set the software write protect via
827 flashrom.
828
829 If the device uses Chrome EC, a reboot is required for write protect
830 to take effect. Since the software write protect flag cannot be unset
831 if hardware write protect pin is asserted, we need to deasserted the
832 pin first if we are deactivating write protect. Similarly, a reboot
833 is required before we can modify the software flag.
834
835 @param enable: True if activating EC write protect. Otherwise, False.
836 """
837 self.set_hardware_write_protect(enable)
838 if self.faft_config.chrome_ec:
839 self.set_chrome_ec_write_protect_and_reboot(enable)
840 else:
841 self.faft_client.ec.set_write_protect(enable)
842 self.sync_and_warm_reboot()
843
844 def set_chrome_ec_write_protect_and_reboot(self, enable):
845 """Set Chrome EC write protect status and reboot to take effect.
846
847 @param enable: True if activating EC write protect. Otherwise, False.
848 """
849 if enable:
850 # Set write protect flag and reboot to take effect.
851 self.ec.set_flash_write_protect(enable)
852 self.sync_and_ec_reboot()
853 else:
854 # Reboot after deasserting hardware write protect pin to deactivate
855 # write protect. And then remove software write protect flag.
856 self.sync_and_ec_reboot()
857 self.ec.set_flash_write_protect(enable)
858
859 def setup_ec_write_protect(self, ec_wp):
860 """Setup for EC write-protection.
861
862 It makes sure the EC in the requested write-protection state. If not, it
863 flips the state. Flipping the write-protection requires DUT reboot.
864
865 @param ec_wp: True to request EC write-protected; False to request EC
866 not write-protected; None to do nothing.
867 """
868 if ec_wp is None:
869 self._old_ec_wp = None
870 return
871 self._old_ec_wp = self.checkers.crossystem_checker({'wpsw_boot': '1'})
872 if ec_wp != self._old_ec_wp:
873 logging.info('The test required EC is %swrite-protected. Reboot '
874 'and flip the state.', '' if ec_wp else 'not ')
875 self.run_faft_step({
876 'reboot_action': (self.set_ec_write_protect_and_reboot, ec_wp),
877 'firmware_action': self.wait_dev_screen_and_ctrl_d,
878 })
879
880 def restore_ec_write_protect(self):
881 """Restore the original EC write-protection."""
882 if (not hasattr(self, '_old_ec_wp')) or (self._old_ec_wp is None):
883 return
884 if not self.checkers.crossystem_checker(
885 {'wpsw_boot': '1' if self._old_ec_wp else '0'}):
886 logging.info('Restore the original EC write protection and reboot.')
887 self.run_faft_step({
888 'reboot_action': (self.set_ec_write_protect_and_reboot,
889 self._old_ec_wp),
890 'firmware_action': self.wait_dev_screen_and_ctrl_d,
891 })
892
893 def press_ctrl_d(self, press_secs=''):
894 """Send Ctrl-D key to DUT.
895
896 @param press_secs : Str. Time to press key.
897 """
898 self.servo.ctrl_d(press_secs)
899
900 def press_ctrl_u(self):
901 """Send Ctrl-U key to DUT.
902
903 @raise TestError: if a non-Chrome EC device or no Ctrl-U command given
904 on a no-build-in-keyboard device.
905 """
906 if not self.faft_config.has_keyboard:
907 self.servo.ctrl_u()
908 elif self.check_ec_capability(['keyboard'], suppress_warning=True):
909 self.ec.key_down('<ctrl_l>')
910 self.ec.key_down('u')
911 self.ec.key_up('u')
912 self.ec.key_up('<ctrl_l>')
913 elif self.faft_config.has_keyboard:
914 raise error.TestError(
915 "Can't send Ctrl-U to DUT without using Chrome EC.")
916 else:
917 raise error.TestError(
918 "Should specify the ctrl_u_cmd argument.")
919
920 def press_enter(self, press_secs=''):
921 """Send Enter key to DUT.
922
923 @param press_secs: Seconds of holding the key.
924 """
925 self.servo.enter_key(press_secs)
926
927 def wait_dev_screen_and_ctrl_d(self):
928 """Wait for firmware warning screen and press Ctrl-D."""
929 time.sleep(self.faft_config.dev_screen)
930 self.press_ctrl_d()
931
932 def wait_fw_screen_and_ctrl_d(self):
933 """Wait for firmware warning screen and press Ctrl-D."""
934 time.sleep(self.faft_config.firmware_screen)
935 self.press_ctrl_d()
936
937 def wait_fw_screen_and_ctrl_u(self):
938 """Wait for firmware warning screen and press Ctrl-U."""
939 time.sleep(self.faft_config.firmware_screen)
940 self.press_ctrl_u()
941
942 def wait_fw_screen_and_trigger_recovery(self, need_dev_transition=False):
943 """Wait for firmware warning screen and trigger recovery boot.
944
945 @param need_dev_transition: True when needs dev mode transition, only
946 for Alex/ZGB.
947 """
948 time.sleep(self.faft_config.firmware_screen)
949
950 # Pressing Enter for too long triggers a second key press.
951 # Let's press it without delay
952 self.press_enter(press_secs=0)
953
954 # For Alex/ZGB, there is a dev warning screen in text mode.
955 # Skip it by pressing Ctrl-D.
956 if need_dev_transition:
957 time.sleep(self.faft_config.legacy_text_screen)
958 self.press_ctrl_d()
959
960 def wait_fw_screen_and_unplug_usb(self):
961 """Wait for firmware warning screen and then unplug the servo USB."""
962 time.sleep(self.faft_config.load_usb)
963 self.servo.switch_usbkey('host')
964 time.sleep(self.faft_config.between_usb_plug)
965
966 def wait_fw_screen_and_plug_usb(self):
967 """Wait for firmware warning screen and then unplug and plug the USB."""
968 self.wait_fw_screen_and_unplug_usb()
969 self.servo.switch_usbkey('dut')
970
971 def wait_fw_screen_and_press_power(self):
972 """Wait for firmware warning screen and press power button."""
973 time.sleep(self.faft_config.firmware_screen)
974 # While the firmware screen, the power button probing loop sleeps
975 # 0.25 second on every scan. Use the normal delay (1.2 second) for
976 # power press.
977 self.servo.power_normal_press()
978
979 def wait_longer_fw_screen_and_press_power(self):
980 """Wait for firmware screen without timeout and press power button."""
981 time.sleep(self.faft_config.dev_screen_timeout)
982 self.wait_fw_screen_and_press_power()
983
984 def wait_fw_screen_and_close_lid(self):
985 """Wait for firmware warning screen and close lid."""
986 time.sleep(self.faft_config.firmware_screen)
987 self.servo.lid_close()
988
989 def wait_longer_fw_screen_and_close_lid(self):
990 """Wait for firmware screen without timeout and close lid."""
991 time.sleep(self.faft_config.firmware_screen)
992 self.wait_fw_screen_and_close_lid()
993
994 def setup_uart_capture(self):
995 """Setup the CPU/EC UART capture."""
996 self.cpu_uart_file = os.path.join(self.resultsdir, 'cpu_uart.txt')
997 self.servo.set('cpu_uart_capture', 'on')
998 self.ec_uart_file = None
999 if self.faft_config.chrome_ec:
1000 try:
1001 self.servo.set('ec_uart_capture', 'on')
1002 self.ec_uart_file = os.path.join(self.resultsdir, 'ec_uart.txt')
1003 except error.TestFail as e:
1004 if 'No control named' in str(e):
1005 logging.warn('The servod is too old that ec_uart_capture '
1006 'not supported.')
1007 else:
1008 logging.info('Not a Google EC, cannot capture ec console output.')
1009
1010 def record_uart_capture(self):
1011 """Record the CPU/EC UART output stream to files."""
1012 if self.cpu_uart_file:
1013 with open(self.cpu_uart_file, 'a') as f:
1014 f.write(ast.literal_eval(self.servo.get('cpu_uart_stream')))
1015 if self.ec_uart_file and self.faft_config.chrome_ec:
1016 with open(self.ec_uart_file, 'a') as f:
1017 f.write(ast.literal_eval(self.servo.get('ec_uart_stream')))
1018
1019 def cleanup_uart_capture(self):
1020 """Cleanup the CPU/EC UART capture."""
1021 # Flush the remaining UART output.
1022 self.record_uart_capture()
1023 self.servo.set('cpu_uart_capture', 'off')
1024 if self.ec_uart_file and self.faft_config.chrome_ec:
1025 self.servo.set('ec_uart_capture', 'off')
1026
1027 def fetch_servo_log(self):
1028 """Fetch the servo log."""
1029 cmd = '[ -e %s ] && cat %s || echo NOTFOUND' % ((self._SERVOD_LOG,) * 2)
1030 servo_log = self.servo.system_output(cmd)
1031 return None if servo_log == 'NOTFOUND' else servo_log
1032
1033 def setup_servo_log(self):
1034 """Setup the servo log capturing."""
1035 self.servo_log_original_len = -1
1036 if self.servo.is_localhost():
1037 # No servo log recorded when servod runs locally.
1038 return
1039
1040 servo_log = self.fetch_servo_log()
1041 if servo_log:
1042 self.servo_log_original_len = len(servo_log)
1043 else:
1044 logging.warn('Servo log file not found.')
1045
1046 def record_servo_log(self):
1047 """Record the servo log to the results directory."""
1048 if self.servo_log_original_len != -1:
1049 servo_log = self.fetch_servo_log()
1050 servo_log_file = os.path.join(self.resultsdir, 'servod.log')
1051 with open(servo_log_file, 'a') as f:
1052 f.write(servo_log[self.servo_log_original_len:])
1053
1054 def record_faft_client_log(self):
1055 """Record the faft client log to the results directory."""
1056 client_log = self.faft_client.system.dump_log(True)
1057 client_log_file = os.path.join(self.resultsdir, 'faft_client.log')
1058 with open(client_log_file, 'w') as f:
1059 f.write(client_log)
1060
1061 def setup_gbb_flags(self):
1062 """Setup the GBB flags for FAFT test."""
1063 if self.faft_config.gbb_version < 1.1:
1064 logging.info('Skip modifying GBB on versions older than 1.1.')
1065 return
1066
1067 if self.check_setup_done('gbb_flags'):
1068 return
1069
1070 self._backup_gbb_flags = self.faft_client.bios.get_gbb_flags()
1071
1072 logging.info('Set proper GBB flags for test.')
1073 self.clear_set_gbb_flags(vboot.GBB_FLAG_DEV_SCREEN_SHORT_DELAY |
1074 vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON |
1075 vboot.GBB_FLAG_FORCE_DEV_BOOT_USB |
1076 vboot.GBB_FLAG_DISABLE_FW_ROLLBACK_CHECK,
1077 vboot.GBB_FLAG_ENTER_TRIGGERS_TONORM |
1078 vboot.GBB_FLAG_FAFT_KEY_OVERIDE)
1079 self.mark_setup_done('gbb_flags')
1080
1081 def drop_backup_gbb_flags(self):
1082 """Drops the backup GBB flags.
1083
1084 This can be used when a test intends to permanently change GBB flags.
1085 """
1086 self._backup_gbb_flags = None
1087
1088 def restore_gbb_flags(self):
1089 """Restore GBB flags to their original state."""
1090 if not self._backup_gbb_flags:
1091 return
1092 self.write_gbb_flags(self._backup_gbb_flags)
1093 self.unmark_setup_done('gbb_flags')
1094
1095 def setup_tried_fwb(self, tried_fwb):
1096 """Setup for fw B tried state.
1097
1098 It makes sure the system in the requested fw B tried state. If not, it
1099 tries to do so.
1100
1101 @param tried_fwb: True if requested in tried_fwb=1;
1102 False if tried_fwb=0.
1103 """
1104 if tried_fwb:
1105 if not self.checkers.crossystem_checker({'tried_fwb': '1'}):
1106 logging.info(
1107 'Firmware is not booted with tried_fwb. Reboot into it.')
1108 self.run_faft_step({
1109 'userspace_action': self.faft_client.system.set_try_fw_b,
1110 })
1111 else:
1112 if not self.checkers.crossystem_checker({'tried_fwb': '0'}):
1113 logging.info(
1114 'Firmware is booted with tried_fwb. Reboot to clear.')
1115 self.run_faft_step({})
1116
1117 def power_on(self):
1118 """Switch DUT AC power on."""
1119 self._client.power_on(self.power_control)
1120
1121 def power_off(self):
1122 """Switch DUT AC power off."""
1123 self._client.power_off(self.power_control)
1124
1125 def power_cycle(self):
1126 """Power cycle DUT AC power."""
1127 self._client.power_cycle(self.power_control)
1128
1129 def enable_rec_mode_and_reboot(self):
1130 """Switch to rec mode and reboot.
1131
1132 This method emulates the behavior of the old physical recovery switch,
1133 i.e. switch ON + reboot + switch OFF, and the new keyboard controlled
1134 recovery mode, i.e. just press Power + Esc + Refresh.
1135 """
1136 if self.faft_config.chrome_ec:
1137 # Reset twice to emulate a long recovery-key-combo hold.
1138 cold_reset_num = 2 if self.faft_config.long_rec_combo else 1
1139 for i in range(cold_reset_num):
1140 if i:
1141 time.sleep(self.faft_config.ec_boot_to_console)
1142 # Cold reset to clear EC_IN_RW signal
1143 self.servo.set('cold_reset', 'on')
1144 time.sleep(self.faft_config.hold_cold_reset)
1145 self.servo.set('cold_reset', 'off')
1146 time.sleep(self.faft_config.ec_boot_to_console)
1147 self.ec.reboot("ap-off")
1148 time.sleep(self.faft_config.ec_boot_to_console)
1149 self.ec.set_hostevent(chrome_ec.HOSTEVENT_KEYBOARD_RECOVERY)
1150 self.servo.power_short_press()
1151 elif self.faft_config.broken_rec_mode:
1152 self.power_cycle()
1153 logging.info('Booting to recovery mode.')
1154 self.servo.custom_recovery_mode()
1155 else:
1156 self.servo.enable_recovery_mode()
1157 self.cold_reboot()
1158 time.sleep(self.faft_config.ec_boot_to_console)
1159 self.servo.disable_recovery_mode()
1160
1161 def enable_dev_mode_and_reboot(self):
1162 """Switch to developer mode and reboot."""
1163 if self.faft_config.keyboard_dev:
1164 self.enable_keyboard_dev_mode()
1165 else:
1166 self.servo.enable_development_mode()
1167 self.faft_client.system.run_shell_command(
1168 'chromeos-firmwareupdate --mode todev && reboot')
1169
1170 def enable_normal_mode_and_reboot(self):
1171 """Switch to normal mode and reboot."""
1172 if self.faft_config.keyboard_dev:
1173 self.disable_keyboard_dev_mode()
1174 else:
1175 self.servo.disable_development_mode()
1176 self.faft_client.system.run_shell_command(
1177 'chromeos-firmwareupdate --mode tonormal && reboot')
1178
1179 def wait_fw_screen_and_switch_keyboard_dev_mode(self, dev):
1180 """Wait for firmware screen and then switch into or out of dev mode.
1181
1182 @param dev: True if switching into dev mode. Otherwise, False.
1183 """
1184 time.sleep(self.faft_config.firmware_screen)
1185 if dev:
1186 self.press_ctrl_d()
1187 time.sleep(self.faft_config.confirm_screen)
1188 if self.faft_config.rec_button_dev_switch:
1189 logging.info('RECOVERY button pressed to switch to dev mode')
1190 self.servo.set('rec_mode', 'on')
1191 time.sleep(self.faft_config.hold_cold_reset)
1192 self.servo.set('rec_mode', 'off')
1193 else:
1194 logging.info('ENTER pressed to switch to dev mode')
1195 self.press_enter()
1196 else:
1197 self.press_enter()
1198 time.sleep(self.faft_config.confirm_screen)
1199 self.press_enter()
1200
1201 def enable_keyboard_dev_mode(self):
1202 """Enable keyboard controlled developer mode"""
1203 logging.info("Enabling keyboard controlled developer mode")
1204 # Plug out USB disk for preventing recovery boot without warning
1205 self.servo.switch_usbkey('host')
1206 # Rebooting EC with rec mode on. Should power on AP.
1207 self.enable_rec_mode_and_reboot()
1208 self.wait_for_client_offline()
1209 self.wait_fw_screen_and_switch_keyboard_dev_mode(dev=True)
1210
1211 # TODO (crosbug.com/p/16231) remove this conditional completely if/when
1212 # issue is resolved.
1213 if self.faft_config.platform == 'Parrot':
1214 self.wait_for_client_offline()
1215 self.cold_reboot()
1216
1217 def disable_keyboard_dev_mode(self):
1218 """Disable keyboard controlled developer mode"""
1219 logging.info("Disabling keyboard controlled developer mode")
1220 if (not self.faft_config.chrome_ec and
1221 not self.faft_config.broken_rec_mode):
1222 self.servo.disable_recovery_mode()
1223 self.cold_reboot()
1224 self.wait_for_client_offline()
1225 self.wait_fw_screen_and_switch_keyboard_dev_mode(dev=False)
1226
1227 def setup_dev_mode(self, dev_mode):
1228 """Setup for development mode.
1229
1230 It makes sure the system in the requested normal/dev mode. If not, it
1231 tries to do so.
1232
1233 @param dev_mode: True if requested in dev mode; False if normal mode.
1234 """
1235 # Change the default firmware_action for dev mode passing the fw screen.
1236 self.register_faft_template({
1237 'firmware_action': (self.wait_dev_screen_and_ctrl_d if dev_mode
1238 else None),
1239 })
1240 if dev_mode:
1241 if (not self.faft_config.keyboard_dev and
1242 not self.checkers.crossystem_checker({'devsw_cur': '1'})):
1243 logging.info('Dev switch is not on. Now switch it on.')
1244 self.servo.enable_development_mode()
1245 if not self.checkers.crossystem_checker({'devsw_boot': '1',
1246 'mainfw_type': 'developer'}):
1247 logging.info('System is not in dev mode. Reboot into it.')
1248 if self._backup_dev_mode is None:
1249 self._backup_dev_mode = False
1250 self.run_faft_step({
1251 'userspace_action': None if self.faft_config.keyboard_dev
1252 else (self.faft_client.system.run_shell_command,
1253 'chromeos-firmwareupdate --mode todev && reboot'),
1254 'reboot_action': self.enable_keyboard_dev_mode if
1255 self.faft_config.keyboard_dev else None,
1256 })
1257 else:
1258 if (not self.faft_config.keyboard_dev and
1259 not self.checkers.crossystem_checker({'devsw_cur': '0'})):
1260 logging.info('Dev switch is not off. Now switch it off.')
1261 self.servo.disable_development_mode()
1262 if not self.checkers.crossystem_checker({'devsw_boot': '0',
1263 'mainfw_type': 'normal'}):
1264 logging.info('System is not in normal mode. Reboot into it.')
1265 if self._backup_dev_mode is None:
1266 self._backup_dev_mode = True
1267 self.run_faft_step({
1268 'userspace_action': None if self.faft_config.keyboard_dev
1269 else (self.faft_client.system.run_shell_command,
1270 'chromeos-firmwareupdate --mode tonormal && reboot'),
1271 'reboot_action': self.disable_keyboard_dev_mode if
1272 self.faft_config.keyboard_dev else None,
1273 })
1274
1275 def restore_dev_mode(self):
1276 """Restores original dev mode status if it has changed."""
1277 if self._backup_dev_mode is not None:
1278 self.setup_dev_mode(self._backup_dev_mode)
1279 self._backup_dev_mode = None
1280
1281 def setup_rw_boot(self, section='a'):
1282 """Make sure firmware is in RW-boot mode.
1283
1284 If the given firmware section is in RO-boot mode, turn off the RO-boot
1285 flag and reboot DUT into RW-boot mode.
1286
1287 @param section: A firmware section, either 'a' or 'b'.
1288 """
1289 flags = self.faft_client.bios.get_preamble_flags(section)
1290 if flags & vboot.PREAMBLE_USE_RO_NORMAL:
1291 flags = flags ^ vboot.PREAMBLE_USE_RO_NORMAL
1292 self.run_faft_step({
1293 'userspace_action': (self.faft_client.bios.set_preamble_flags,
1294 (section, flags))
1295 })
1296
1297 def setup_kernel(self, part):
1298 """Setup for kernel test.
1299
1300 It makes sure both kernel A and B bootable and the current boot is
1301 the requested kernel part.
1302
1303 @param part: A string of kernel partition number or 'a'/'b'.
1304 """
1305 self.ensure_kernel_boot(part)
1306 logging.info('Checking the integrity of kernel B and rootfs B...')
1307 if (self.faft_client.kernel.diff_a_b() or
1308 not self.faft_client.rootfs.verify_rootfs('B')):
1309 logging.info('Copying kernel and rootfs from A to B...')
1310 self.copy_kernel_and_rootfs(from_part=part,
1311 to_part=self.OTHER_KERNEL_MAP[part])
1312 self.reset_and_prioritize_kernel(part)
1313
1314 def reset_and_prioritize_kernel(self, part):
1315 """Make the requested partition highest priority.
1316
1317 This function also reset kerenl A and B to bootable.
1318
1319 @param part: A string of partition number to be prioritized.
1320 """
1321 root_dev = self.faft_client.system.get_root_dev()
1322 # Reset kernel A and B to bootable.
1323 self.faft_client.system.run_shell_command(
1324 'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['a'], root_dev))
1325 self.faft_client.system.run_shell_command(
1326 'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['b'], root_dev))
1327 # Set kernel part highest priority.
1328 self.faft_client.system.run_shell_command('cgpt prioritize -i%s %s' %
1329 (self.KERNEL_MAP[part], root_dev))
1330
1331 def warm_reboot(self):
1332 """Request a warm reboot.
1333
1334 A wrapper for underlying servo warm reset.
1335 """
1336 # Use cold reset if the warm reset is broken.
1337 if self.faft_config.broken_warm_reset:
1338 logging.info('broken_warm_reset is True. Cold rebooting instead.')
1339 self.cold_reboot()
1340 else:
1341 self.servo.get_power_state_controller().warm_reset()
1342
1343 def cold_reboot(self):
1344 """Request a cold reboot.
1345
1346 A wrapper for underlying servo cold reset.
1347 """
1348 if self.faft_config.broken_warm_reset:
1349 self.servo.set('pwr_button', 'press')
1350 self.servo.set('cold_reset', 'on')
1351 self.servo.set('cold_reset', 'off')
1352 time.sleep(self.faft_config.ec_boot_to_pwr_button)
1353 self.servo.set('pwr_button', 'release')
1354 else:
1355 self.servo.get_power_state_controller().cold_reset()
1356
1357 def sync_and_warm_reboot(self):
1358 """Request the client sync and do a warm reboot.
1359
1360 This is the default reboot action on FAFT.
1361 """
1362 self.faft_client.system.run_shell_command('sync')
1363 time.sleep(self.faft_config.sync)
1364 self.warm_reboot()
1365
1366 def sync_and_cold_reboot(self):
1367 """Request the client sync and do a cold reboot.
1368
1369 This reboot action is used to reset EC for recovery mode.
1370 """
1371 self.faft_client.system.run_shell_command('sync')
1372 time.sleep(self.faft_config.sync)
1373 self.cold_reboot()
1374
1375 def sync_and_ec_reboot(self, flags=''):
1376 """Request the client sync and do a EC triggered reboot.
1377
1378 @param flags: Optional, a space-separated string of flags passed to EC
1379 reboot command, including:
1380 default: EC soft reboot;
1381 'hard': EC cold/hard reboot.
1382 """
1383 self.faft_client.system.run_shell_command('sync')
1384 time.sleep(self.faft_config.sync)
1385 self.ec.reboot(flags)
1386 time.sleep(self.faft_config.ec_boot_to_console)
1387 self.check_lid_and_power_on()
1388
1389 def reboot_with_factory_install_shim(self):
1390 """Request reboot with factory install shim to reset TPM.
1391
1392 Factory install shim requires dev mode enabled. So this method switches
1393 firmware to dev mode first and reboot. The client uses factory install
1394 shim to reset TPM values.
1395 """
1396 # Unplug USB first to avoid the complicated USB autoboot cases.
1397 self.servo.switch_usbkey('host')
1398 is_dev = self.checkers.crossystem_checker({'devsw_boot': '1'})
1399 if not is_dev:
1400 self.enable_dev_mode_and_reboot()
1401 time.sleep(self.faft_config.sync)
1402 self.enable_rec_mode_and_reboot()
1403 self.wait_fw_screen_and_plug_usb()
1404 time.sleep(self.faft_config.install_shim_done)
1405 self.warm_reboot()
1406
1407 def full_power_off_and_on(self):
1408 """Shutdown the device by pressing power button and power on again."""
1409 # Press power button to trigger Chrome OS normal shutdown process.
1410 # We use a customized delay since the normal-press 1.2s is not enough.
1411 self.servo.power_key(self.faft_config.hold_pwr_button)
1412 time.sleep(self.faft_config.shutdown)
1413 # Short press power button to boot DUT again.
1414 self.servo.power_short_press()
1415
1416 def check_lid_and_power_on(self):
1417 """
1418 On devices with EC software sync, system powers on after EC reboots if
1419 lid is open. Otherwise, the EC shuts down CPU after about 3 seconds.
1420 This method checks lid switch state and presses power button if
1421 necessary.
1422 """
1423 if self.servo.get("lid_open") == "no":
1424 time.sleep(self.faft_config.software_sync)
1425 self.servo.power_short_press()
1426
1427 def _modify_usb_kernel(self, usb_dev, from_magic, to_magic):
1428 """Modify the kernel header magic in USB stick.
1429
1430 The kernel header magic is the first 8-byte of kernel partition.
1431 We modify it to make it fail on kernel verification check.
1432
1433 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
1434 @param from_magic: A string of magic which we change it from.
1435 @param to_magic: A string of magic which we change it to.
1436 @raise TestError: if failed to change magic.
1437 """
1438 assert len(from_magic) == 8
1439 assert len(to_magic) == 8
1440 # USB image only contains one kernel.
1441 kernel_part = self._join_part(usb_dev, self.KERNEL_MAP['a'])
1442 read_cmd = "sudo dd if=%s bs=8 count=1 2>/dev/null" % kernel_part
1443 current_magic = self.servo.system_output(read_cmd)
1444 if current_magic == to_magic:
1445 logging.info("The kernel magic is already %s.", current_magic)
1446 return
1447 if current_magic != from_magic:
1448 raise error.TestError("Invalid kernel image on USB: wrong magic.")
1449
1450 logging.info('Modify the kernel magic in USB, from %s to %s.',
1451 from_magic, to_magic)
1452 write_cmd = ("echo -n '%s' | sudo dd of=%s oflag=sync conv=notrunc "
1453 " 2>/dev/null" % (to_magic, kernel_part))
1454 self.servo.system(write_cmd)
1455
1456 if self.servo.system_output(read_cmd) != to_magic:
1457 raise error.TestError("Failed to write new magic.")
1458
1459 def corrupt_usb_kernel(self, usb_dev):
1460 """Corrupt USB kernel by modifying its magic from CHROMEOS to CORRUPTD.
1461
1462 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
1463 """
1464 self._modify_usb_kernel(usb_dev, self.CHROMEOS_MAGIC,
1465 self.CORRUPTED_MAGIC)
1466
1467 def restore_usb_kernel(self, usb_dev):
1468 """Restore USB kernel by modifying its magic from CORRUPTD to CHROMEOS.
1469
1470 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
1471 """
1472 self._modify_usb_kernel(usb_dev, self.CORRUPTED_MAGIC,
1473 self.CHROMEOS_MAGIC)
1474
1475 def _call_action(self, action_tuple, check_status=False):
1476 """Call the action function with/without arguments.
1477
1478 @param action_tuple: A function, or a tuple (function, args, error_msg),
1479 in which, args and error_msg are optional. args is
1480 either a value or a tuple if multiple arguments.
1481 This can also be a list containing multiple
1482 function or tuple. In this case, these actions are
1483 called in sequence.
1484 @param check_status: Check the return value of action function. If not
1485 succeed, raises a TestFail exception.
1486 @return: The result value of the action function.
1487 @raise TestError: An error when the action function is not callable.
1488 @raise TestFail: When check_status=True, action function not succeed.
1489 """
1490 if isinstance(action_tuple, list):
1491 return all([self._call_action(action, check_status=check_status)
1492 for action in action_tuple])
1493
1494 action = action_tuple
1495 args = ()
1496 error_msg = 'Not succeed'
1497 if isinstance(action_tuple, tuple):
1498 action = action_tuple[0]
1499 if len(action_tuple) >= 2:
1500 args = action_tuple[1]
1501 if not isinstance(args, tuple):
1502 args = (args,)
1503 if len(action_tuple) >= 3:
1504 error_msg = action_tuple[2]
1505
1506 if action is None:
1507 return
1508
1509 if not callable(action):
1510 raise error.TestError('action is not callable!')
1511
1512 info_msg = 'calling %s' % str(action)
1513 if args:
1514 info_msg += ' with args %s' % str(args)
1515 logging.info(info_msg)
1516 ret = action(*args)
1517
1518 if check_status and not ret:
1519 raise error.TestFail('%s: %s returning %s' %
1520 (error_msg, info_msg, str(ret)))
1521 return ret
1522
1523 def run_shutdown_process(self, shutdown_action, pre_power_action=None,
1524 post_power_action=None, shutdown_timeout=None):
1525 """Run shutdown_action(), which makes DUT shutdown, and power it on.
1526
1527 @param shutdown_action: function which makes DUT shutdown, like
1528 pressing power key.
1529 @param pre_power_action: function which is called before next power on.
1530 @param post_power_action: function which is called after next power on.
1531 @param shutdown_timeout: a timeout to confirm DUT shutdown.
1532 @raise TestFail: if the shutdown_action() failed to turn DUT off.
1533 """
1534 self._call_action(shutdown_action)
1535 logging.info('Wait to ensure DUT shut down...')
1536 try:
1537 if shutdown_timeout is None:
1538 shutdown_timeout = self.faft_config.shutdown_timeout
1539 self.wait_for_client(timeout=shutdown_timeout)
1540 raise error.TestFail(
1541 'Should shut the device down after calling %s.' %
1542 str(shutdown_action))
1543 except ConnectionError:
1544 logging.info(
1545 'DUT is surely shutdown. We are going to power it on again...')
1546
1547 if pre_power_action:
1548 self._call_action(pre_power_action)
1549 self.servo.power_short_press()
1550 if post_power_action:
1551 self._call_action(post_power_action)
1552
1553 def register_faft_template(self, template):
1554 """Register FAFT template, the default FAFT_STEP of each step.
1555
1556 Any missing field falls back to the original faft_template.
1557
1558 @param template: A FAFT_STEP dict.
1559 """
1560 self._faft_template.update(template)
1561
1562 def register_faft_sequence(self, sequence):
1563 """Register FAFT sequence.
1564
1565 @param sequence: A FAFT_SEQUENCE array which consisted of FAFT_STEP
1566 dicts.
1567 """
1568 self._faft_sequence = sequence
1569
1570 def run_faft_step(self, step, no_reboot=False, next_step=None):
1571 """Run a single FAFT step.
1572
1573 Any missing field falls back to faft_template. An empty step means
1574 running the default faft_template.
1575
1576 @param step: A FAFT_STEP dict of this step to run.
1577 @param next_step: A FAFT_STEP dict of next step.
1578 @param no_reboot: True to prevent running reboot_action and
1579 firmware_action.
1580 @parm next_step: Optional, a FAFT_STEP dict of the next step, which is
1581 used for diagnostic.
1582 @raise TestError: An error when the given step is not valid.
1583 @raise TestFail: Test failed in waiting DUT reboot.
1584 """
1585 FAFT_STEP_KEYS = ('state_checker', 'userspace_action', 'reboot_action',
1586 'firmware_action', 'install_deps_after_boot')
1587
1588 test = {}
1589 test.update(self._faft_template)
1590 test.update(step)
1591
1592 for key in test:
1593 if key not in FAFT_STEP_KEYS:
1594 raise error.TestError('Invalid key in FAFT step: %s', key)
1595
1596 # Record the UART output regularly.
1597 self.record_uart_capture()
1598
1599 if test['state_checker']:
1600 logging.info("-[FAFT]-[ start stepstate_checker ]----------")
1601 self._call_action(test['state_checker'], check_status=True)
1602 logging.info("-[FAFT]-[ end state_checker ]----------------")
1603
1604 boot_id = None
1605 retry = 3
1606 while retry:
1607 try:
1608 boot_id = self._client.get_boot_id()
1609 break
1610 except error.AutoservRunError:
1611 retry -= 1
1612 if retry:
1613 logging.info('Retry to get boot_id...')
1614 else:
1615 logging.warning('Failed to get boot_id.')
1616 logging.info('boot_id: %s', boot_id)
1617
1618 logging.info("-[FAFT]-[ start userspace_action ]----------")
1619 self._call_action(test['userspace_action'])
1620 logging.info("-[FAFT]-[ end userspace_action ]------------")
1621
1622 # Don't run reboot_action and firmware_action if no_reboot is True.
1623 if not no_reboot:
1624 logging.info("-[FAFT]-[ start reboot_action ]----------")
1625 self._call_action(test['reboot_action'])
1626 logging.info("-[FAFT]-[ end reboot_action ]------------")
1627 self.wait_for_client_offline(orig_boot_id=boot_id)
1628 logging.info("-[FAFT]-[ start firmware_action ]----------")
1629 self._call_action(test['firmware_action'])
1630 logging.info("-[FAFT]-[ end firmware_action ]------------")
1631
1632 try:
1633 if 'install_deps_after_boot' in test:
1634 logging.info("-[FAFT]-[ start install_deps_after_boot ]---")
1635 self.wait_for_client(
1636 install_deps=test['install_deps_after_boot'])
1637 logging.info("-[FAFT]-[ end install_deps_after_boot ]-----")
1638 else:
1639 self.wait_for_client()
1640 # Stop update-engine as it may change firmware/kernel.
1641 self.stop_service('update-engine')
1642 except ConnectionError:
1643 logging.error('wait_for_client() timed out.')
1644 self._restore_routine_from_timeout(next_step)
1645
1646 def run_faft_sequence(self):
1647 """Run FAFT sequence which was previously registered."""
1648 sequence = self._faft_sequence
1649 for index, step in enumerate(sequence):
1650 logging.info('======== Running FAFT sequence step %d ========',
1651 index + 1)
1652 # Don't reboot in the last step.
1653 if index == len(sequence) - 1:
1654 self.run_faft_step(step, no_reboot=True)
1655 else:
1656 self.run_faft_step(step, next_step=sequence[index + 1])
1657
1658 def get_current_firmware_sha(self):
1659 """Get current firmware sha of body and vblock.
1660
1661 @return: Current firmware sha follows the order (
1662 vblock_a_sha, body_a_sha, vblock_b_sha, body_b_sha)
1663 """
1664 current_firmware_sha = (self.faft_client.bios.get_sig_sha('a'),
1665 self.faft_client.bios.get_body_sha('a'),
1666 self.faft_client.bios.get_sig_sha('b'),
1667 self.faft_client.bios.get_body_sha('b'))
1668 if not all(current_firmware_sha):
1669 raise error.TestError('Failed to get firmware sha.')
1670 return current_firmware_sha
1671
1672 def is_firmware_changed(self):
1673 """Check if the current firmware changed, by comparing its SHA.
1674
1675 @return: True if it is changed, otherwise Flase.
1676 """
1677 # Device may not be rebooted after test.
1678 self.faft_client.bios.reload()
1679
1680 current_sha = self.get_current_firmware_sha()
1681
1682 if current_sha == self._backup_firmware_sha:
1683 return False
1684 else:
1685 corrupt_VBOOTA = (current_sha[0] != self._backup_firmware_sha[0])
1686 corrupt_FVMAIN = (current_sha[1] != self._backup_firmware_sha[1])
1687 corrupt_VBOOTB = (current_sha[2] != self._backup_firmware_sha[2])
1688 corrupt_FVMAINB = (current_sha[3] != self._backup_firmware_sha[3])
1689 logging.info("Firmware changed:")
1690 logging.info('VBOOTA is changed: %s', corrupt_VBOOTA)
1691 logging.info('VBOOTB is changed: %s', corrupt_VBOOTB)
1692 logging.info('FVMAIN is changed: %s', corrupt_FVMAIN)
1693 logging.info('FVMAINB is changed: %s', corrupt_FVMAINB)
1694 return True
1695
1696 def backup_firmware(self, suffix='.original'):
1697 """Backup firmware to file, and then send it to host.
1698
1699 @param suffix: a string appended to backup file name
1700 """
1701 remote_temp_dir = self.faft_client.system.create_temp_dir()
1702 self.faft_client.bios.dump_whole(os.path.join(remote_temp_dir, 'bios'))
1703 self._client.get_file(os.path.join(remote_temp_dir, 'bios'),
1704 os.path.join(self.resultsdir, 'bios' + suffix))
1705
1706 self._backup_firmware_sha = self.get_current_firmware_sha()
1707 logging.info('Backup firmware stored in %s with suffix %s',
1708 self.resultsdir, suffix)
1709
1710 def is_firmware_saved(self):
1711 """Check if a firmware saved (called backup_firmware before).
1712
1713 @return: True if the firmware is backuped; otherwise False.
1714 """
1715 return self._backup_firmware_sha != ()
1716
1717 def clear_saved_firmware(self):
1718 """Clear the firmware saved by the method backup_firmware."""
1719 self._backup_firmware_sha = ()
1720
1721 def restore_firmware(self, suffix='.original'):
1722 """Restore firmware from host in resultsdir.
1723
1724 @param suffix: a string appended to backup file name
1725 """
1726 if not self.is_firmware_changed():
1727 return
1728
1729 # Backup current corrupted firmware.
1730 self.backup_firmware(suffix='.corrupt')
1731
1732 # Restore firmware.
1733 remote_temp_dir = self.faft_client.system.create_temp_dir()
1734 self._client.send_file(os.path.join(self.resultsdir, 'bios' + suffix),
1735 os.path.join(remote_temp_dir, 'bios'))
1736
1737 self.faft_client.bios.write_whole(
1738 os.path.join(remote_temp_dir, 'bios'))
1739 self.sync_and_warm_reboot()
1740 self.wait_for_client_offline()
1741 self.wait_dev_screen_and_ctrl_d()
1742 self.wait_for_client()
1743
1744 logging.info('Successfully restore firmware.')
1745
1746 def setup_firmwareupdate_shellball(self, shellball=None):
1747 """Deside a shellball to use in firmware update test.
1748
1749 Check if there is a given shellball, and it is a shell script. Then,
1750 send it to the remote host. Otherwise, use
1751 /usr/sbin/chromeos-firmwareupdate.
1752
1753 @param shellball: path of a shellball or default to None.
1754
1755 @return: Path of shellball in remote host. If use default shellball,
1756 reutrn None.
1757 """
1758 updater_path = None
1759 if shellball:
1760 # Determine the firmware file is a shellball or a raw binary.
1761 is_shellball = (utils.system_output("file %s" % shellball).find(
1762 "shell script") != -1)
1763 if is_shellball:
1764 logging.info('Device will update firmware with shellball %s',
1765 shellball)
1766 temp_dir = self.faft_client.system.create_temp_dir(
1767 'shellball_')
1768 temp_shellball = os.path.join(temp_dir, 'updater.sh')
1769 self._client.send_file(shellball, temp_shellball)
1770 updater_path = temp_shellball
1771 else:
1772 raise error.TestFail(
1773 'The given shellball is not a shell script.')
1774 return updater_path
1775
1776 def is_kernel_changed(self):
1777 """Check if the current kernel is changed, by comparing its SHA1 hash.
1778
1779 @return: True if it is changed; otherwise, False.
1780 """
1781 changed = False
1782 for p in ('A', 'B'):
1783 backup_sha = self._backup_kernel_sha.get(p, None)
1784 current_sha = self.faft_client.kernel.get_sha(p)
1785 if backup_sha != current_sha:
1786 changed = True
1787 logging.info('Kernel %s is changed', p)
1788 return changed
1789
1790 def backup_kernel(self, suffix='.original'):
1791 """Backup kernel to files, and the send them to host.
1792
1793 @param suffix: a string appended to backup file name.
1794 """
1795 remote_temp_dir = self.faft_client.system.create_temp_dir()
1796 for p in ('A', 'B'):
1797 remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p)
1798 self.faft_client.kernel.dump(p, remote_path)
1799 self._client.get_file(
1800 remote_path,
1801 os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix)))
1802 self._backup_kernel_sha[p] = self.faft_client.kernel.get_sha(p)
1803 logging.info('Backup kernel stored in %s with suffix %s',
1804 self.resultsdir, suffix)
1805
1806 def is_kernel_saved(self):
1807 """Check if kernel images are saved (backup_kernel called before).
1808
1809 @return: True if the kernel is saved; otherwise, False.
1810 """
1811 return len(self._backup_kernel_sha) != 0
1812
1813 def clear_saved_kernel(self):
1814 """Clear the kernel saved by backup_kernel()."""
1815 self._backup_kernel_sha = dict()
1816
1817 def restore_kernel(self, suffix='.original'):
1818 """Restore kernel from host in resultsdir.
1819
1820 @param suffix: a string appended to backup file name.
1821 """
1822 if not self.is_kernel_changed():
1823 return
1824
1825 # Backup current corrupted kernel.
1826 self.backup_kernel(suffix='.corrupt')
1827
1828 # Restore kernel.
1829 remote_temp_dir = self.faft_client.system.create_temp_dir()
1830 for p in ('A', 'B'):
1831 remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p)
1832 self._client.send_file(
1833 os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix)),
1834 remote_path)
1835 self.faft_client.kernel.write(p, remote_path)
1836
1837 self.sync_and_warm_reboot()
1838 self.wait_for_client_offline()
1839 self.wait_dev_screen_and_ctrl_d()
1840 self.wait_for_client()
1841
1842 logging.info('Successfully restored kernel.')
1843
1844 def backup_cgpt_attributes(self):
1845 """Backup CGPT partition table attributes."""
1846 self._backup_cgpt_attr = self.faft_client.cgpt.get_attributes()
1847
1848 def restore_cgpt_attributes(self):
1849 """Restore CGPT partition table attributes."""
1850 current_table = self.faft_client.cgpt.get_attributes()
1851 if current_table == self._backup_cgpt_attr:
1852 return
1853 logging.info('CGPT table is changed. Original: %r. Current: %r.',
1854 self._backup_cgpt_attr,
1855 current_table)
1856 self.faft_client.cgpt.set_attributes(self._backup_cgpt_attr)
1857
1858 self.sync_and_warm_reboot()
1859 self.wait_for_client_offline()
1860 self.wait_dev_screen_and_ctrl_d()
1861 self.wait_for_client()
1862
1863 logging.info('Successfully restored CGPT table.')