Merge "Handle NR tests in cellular base test class"
diff --git a/acts/framework/acts/controllers/amarisoft_lib/config_utils.py b/acts/framework/acts/controllers/amarisoft_lib/config_utils.py
index 3dad7a3..bafe144 100644
--- a/acts/framework/acts/controllers/amarisoft_lib/config_utils.py
+++ b/acts/framework/acts/controllers/amarisoft_lib/config_utils.py
@@ -1,4 +1,4 @@
-!/usr/bin/env python3
+#!/usr/bin/env python3
#
# Copyright 2022 - Google
#
diff --git a/acts/framework/acts/controllers/cellular_lib/LteSimulation.py b/acts/framework/acts/controllers/cellular_lib/LteSimulation.py
index 846c9e4..4b5591d 100644
--- a/acts/framework/acts/controllers/cellular_lib/LteSimulation.py
+++ b/acts/framework/acts/controllers/cellular_lib/LteSimulation.py
@@ -211,7 +211,8 @@
41: 39650,
42: 41590,
43: 45590,
- 66: 66436
+ 66: 66436,
+ 67: 67336
}
# Peak throughput lookup tables for each TDD subframe
diff --git a/acts/framework/acts/controllers/fuchsia_lib/utils_lib.py b/acts/framework/acts/controllers/fuchsia_lib/utils_lib.py
index 3fc1813..16c7e8c 100644
--- a/acts/framework/acts/controllers/fuchsia_lib/utils_lib.py
+++ b/acts/framework/acts/controllers/fuchsia_lib/utils_lib.py
@@ -15,14 +15,13 @@
# limitations under the License.
import backoff
-import itertools
import os
import logging
import paramiko
import psutil
-import shutil
import socket
import tarfile
+import tempfile
import time
import usbinfo
@@ -254,26 +253,33 @@
else:
raise ValueError('A suitable build could not be found.')
- tmp_path = '/tmp/%s_%s' % (str(int(
- time.time() * 10000)), fuchsia_device.board_type)
- os.mkdir(tmp_path)
- if file_download_needed:
- job.run('gsutil cp %s %s' % (file_to_download, tmp_path))
- logging.info('Downloading %s to %s' % (file_to_download, tmp_path))
- image_tgz = os.path.basename(file_to_download)
- else:
- job.run('cp %s %s' % (fuchsia_device.specific_image, tmp_path))
- logging.info('Copying %s to %s' % (file_to_download, tmp_path))
- image_tgz = os.path.basename(fuchsia_device.specific_image)
+ tmp_dir = tempfile.TemporaryDirectory(suffix=fuchsia_device.board_type)
+ with tmp_dir as tmp_path:
+ if file_download_needed:
+ logging.info('Downloading %s to %s' % (file_to_download, tmp_path))
+ job.run('gsutil cp %s %s' % (file_to_download, tmp_dir.name))
+ image_tgz = os.path.join(tmp_path,
+ os.path.basename(file_to_download))
+ else:
+ image_tgz = file_to_download
- job.run('tar xfvz %s/%s -C %s' % (tmp_path, image_tgz, tmp_path))
- all_files = []
- for root, _dirs, files in itertools.islice(os.walk(tmp_path), 1, None):
- for filename in files:
- all_files.append(os.path.join(root, filename))
- for filename in all_files:
- shutil.move(filename, tmp_path)
+ tar = tarfile.open(image_tgz, 'r:gz')
+ tar.extractall(tmp_path)
+ tar.close()
+ reboot_to_bootloader(fuchsia_device, use_ssh,
+ fuchsia_reconnect_after_reboot_time)
+
+ logging.info(
+ f'Flashing {fuchsia_device.orig_ip} with {image_tgz} using authorized keys "{fuchsia_device.authorized_file}".'
+ )
+ run_flash_script(fuchsia_device, tmp_path)
+ return True
+
+
+def reboot_to_bootloader(fuchsia_device,
+ use_ssh=False,
+ fuchsia_reconnect_after_reboot_time=5):
if use_ssh:
logging.info('Sending reboot command via SSH to '
'get into bootloader.')
@@ -327,16 +333,17 @@
flash_process_found = True
if not flash_process_found:
break
- logging.info(f'Flashing {fuchsia_device.orig_ip} with {tmp_path}/{image_tgz} using authorized keys "{fuchsia_device.authorized_file}".')
+
+
+def run_flash_script(fuchsia_device, flash_dir):
try:
- flash_output = job.run(f'bash {tmp_path}/flash.sh --ssh-key={fuchsia_device.authorized_file} -s {fuchsia_device.serial_number}', timeout=120)
+ flash_output = job.run(
+ f'bash {flash_dir}/flash.sh --ssh-key={fuchsia_device.authorized_file} -s {fuchsia_device.serial_number}',
+ timeout=120)
logging.debug(flash_output.stderr)
except job.TimeoutError as err:
raise TimeoutError(err)
- try:
- os.rmdir(tmp_path)
- except Exception:
- job.run('rm -fr %s' % tmp_path)
+
logging.info('Waiting %s seconds for device'
' to come back up after flashing.' % AFTER_FLASH_BOOT_TIME)
time.sleep(AFTER_FLASH_BOOT_TIME)
@@ -361,4 +368,3 @@
else:
raise ValueError('Invalid IP: %s after flashing.' %
fuchsia_device.orig_ip)
- return True
diff --git a/acts_tests/tests/google/fuchsia/flash/FlashTest.py b/acts_tests/tests/google/fuchsia/flash/FlashTest.py
index 572a431..03662f2 100644
--- a/acts_tests/tests/google/fuchsia/flash/FlashTest.py
+++ b/acts_tests/tests/google/fuchsia/flash/FlashTest.py
@@ -18,15 +18,13 @@
the Sponge test result properties. Uses the built in flashing tool for
fuchsia_devices.
"""
-import time
-
-import acts.libs.proc.job as job
-
-from acts import signals
+from acts import asserts
from acts.base_test import BaseTestClass
from acts.controllers.fuchsia_lib.base_lib import DeviceOffline
from acts.utils import get_device
+MAX_FLASH_ATTEMPTS = 3
+
class FlashTest(BaseTestClass):
def setup_class(self):
@@ -56,36 +54,35 @@
return super().teardown_class()
def test_flash_devices(self):
- flash_retry_max = 3
- flash_counter = 0
- for fuchsia_device in self.fuchsia_devices:
- while flash_counter < flash_retry_max:
- try:
- fuchsia_device.reboot(reboot_type='flash',
- use_ssh=True,
- unreachable_timeout=120,
- ping_timeout=120)
- flash_counter = flash_retry_max
- except Exception as err:
- if fuchsia_device.device_pdu_config:
- self.log.info(
- 'Flashing failed with error: {}'.format(err))
- self.log.info('Hard rebooting fuchsia_device({}) and '
- 'retrying.'.format(
- fuchsia_device.orig_ip))
- fuchsia_device.reboot(reboot_type='hard',
- testbed_pdus=self.pdu_devices)
- flash_counter = flash_counter + 1
- if flash_counter == flash_retry_max:
- raise err
- time.sleep(1)
- else:
- raise err
- self.log.info("fuchsia_device(%s) has been flashed." %
- fuchsia_device.orig_ip)
+ for device in self.fuchsia_devices:
flash_counter = 0
+ while True:
+ try:
+ device.reboot(reboot_type='flash',
+ use_ssh=True,
+ unreachable_timeout=120,
+ ping_timeout=120)
+ self.log.info(f'{device.orig_ip} has been flashed.')
+ break
+ except Exception as err:
+ self.log.error(
+ f'Failed to flash {device.orig_ip} with error:\n{err}')
- return True
+ if not device.device_pdu_config:
+ asserts.abort_all(
+ f'Failed to flash {device.orig_ip} and no PDU available for hard reboot'
+ )
+
+ flash_counter = flash_counter + 1
+ if flash_counter == MAX_FLASH_ATTEMPTS:
+ asserts.abort_all(
+ f'Failed to flash {device.orig_ip} after {MAX_FLASH_ATTEMPTS} attempts'
+ )
+
+ self.log.info(
+ f'Hard rebooting {device.orig_ip} and retrying flash.')
+ device.reboot(reboot_type='hard',
+ testbed_pdus=self.pdu_devices)
def test_report_dut_version(self):
"""Empty test to ensure the version of the DUT is reported in the Sponge