blob: 4b6bbac051e0522159b555e92cf10147100d1e16 [file] [log] [blame]
/*
* Copyright (C) 2020 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.android.networkstack.tethering
import android.app.Notification
import android.app.NotificationManager
import android.content.Context
import android.content.pm.ActivityInfo
import android.content.pm.ApplicationInfo
import android.content.pm.PackageManager
import android.content.pm.ResolveInfo
import android.content.res.Resources
import android.net.ConnectivityManager.TETHERING_WIFI
import android.os.Handler
import android.os.HandlerThread
import android.os.Looper
import android.net.NetworkCapabilities
import android.net.NetworkCapabilities.NET_CAPABILITY_NOT_ROAMING
import android.os.UserHandle
import android.telephony.TelephonyManager
import androidx.test.filters.SmallTest
import androidx.test.platform.app.InstrumentationRegistry
import androidx.test.runner.AndroidJUnit4
import com.android.internal.util.test.BroadcastInterceptingContext
import com.android.networkstack.tethering.TetheringNotificationUpdater.DOWNSTREAM_NONE
import com.android.networkstack.tethering.TetheringNotificationUpdater.EVENT_SHOW_NO_UPSTREAM
import com.android.networkstack.tethering.TetheringNotificationUpdater.NO_UPSTREAM_NOTIFICATION_ID
import com.android.networkstack.tethering.TetheringNotificationUpdater.RESTRICTED_NOTIFICATION_ID
import com.android.networkstack.tethering.TetheringNotificationUpdater.ROAMING_NOTIFICATION_ID
import com.android.networkstack.tethering.TetheringNotificationUpdater.VERIZON_CARRIER_ID
import com.android.testutils.waitForIdle
import org.junit.After
import org.junit.Assert.assertEquals
import org.junit.Assert.fail
import org.junit.Before
import org.junit.Test
import org.junit.runner.RunWith
import org.mockito.ArgumentCaptor
import org.mockito.ArgumentMatchers.any
import org.mockito.ArgumentMatchers.anyInt
import org.mockito.ArgumentMatchers.eq
import org.mockito.Mock
import org.mockito.Mockito.doReturn
import org.mockito.Mockito.mock
import org.mockito.Mockito.never
import org.mockito.Mockito.reset
import org.mockito.Mockito.times
import org.mockito.Mockito.verify
import org.mockito.Mockito.verifyZeroInteractions
import org.mockito.MockitoAnnotations
const val TEST_SUBID = 1
const val WIFI_MASK = 1 shl TETHERING_WIFI
const val TEST_DISALLOW_TITLE = "Tether function is disallowed"
const val TEST_DISALLOW_MESSAGE = "Please contact your admin"
const val TEST_NO_UPSTREAM_TITLE = "Hotspot has no internet access"
const val TEST_NO_UPSTREAM_MESSAGE = "Device cannot connect to internet."
const val TEST_NO_UPSTREAM_BUTTON = "Turn off hotspot"
const val TEST_ROAMING_TITLE = "Hotspot is on"
const val TEST_ROAMING_MESSAGE = "Additional charges may apply while roaming."
@RunWith(AndroidJUnit4::class)
@SmallTest
class TetheringNotificationUpdaterTest {
// lateinit used here for mocks as they need to be reinitialized between each test and the test
// should crash if they are used before being initialized.
@Mock private lateinit var mockContext: Context
@Mock private lateinit var notificationManager: NotificationManager
@Mock private lateinit var telephonyManager: TelephonyManager
@Mock private lateinit var testResources: Resources
// lateinit for these classes under test, as they should be reset to a different instance for
// every test but should always be initialized before use (or the test should crash).
private lateinit var context: TestContext
private lateinit var notificationUpdater: TetheringNotificationUpdater
private lateinit var fakeTetheringThread: HandlerThread
private val ROAMING_CAPABILITIES = NetworkCapabilities()
private val HOME_CAPABILITIES = NetworkCapabilities().addCapability(NET_CAPABILITY_NOT_ROAMING)
private val NOTIFICATION_ICON_ID = R.drawable.stat_sys_tether_general
private val TIMEOUT_MS = 500L
private inner class TestContext(c: Context) : BroadcastInterceptingContext(c) {
override fun createContextAsUser(user: UserHandle, flags: Int) =
if (user == UserHandle.ALL) mockContext else this
override fun getSystemService(name: String) =
if (name == Context.TELEPHONY_SERVICE) telephonyManager
else super.getSystemService(name)
}
private inner class WrappedNotificationUpdater(c: Context, looper: Looper)
: TetheringNotificationUpdater(c, looper) {
override fun getResourcesForSubId(c: Context, subId: Int) =
if (subId == TEST_SUBID) testResources else super.getResourcesForSubId(c, subId)
}
private fun setupResources() {
doReturn(5).`when`(testResources)
.getInteger(R.integer.delay_to_show_no_upstream_after_no_backhaul)
doReturn(true).`when`(testResources)
.getBoolean(R.bool.config_upstream_roaming_notification)
doReturn(TEST_DISALLOW_TITLE).`when`(testResources)
.getString(R.string.disable_tether_notification_title)
doReturn(TEST_DISALLOW_MESSAGE).`when`(testResources)
.getString(R.string.disable_tether_notification_message)
doReturn(TEST_NO_UPSTREAM_TITLE).`when`(testResources)
.getString(R.string.no_upstream_notification_title)
doReturn(TEST_NO_UPSTREAM_MESSAGE).`when`(testResources)
.getString(R.string.no_upstream_notification_message)
doReturn(TEST_NO_UPSTREAM_BUTTON).`when`(testResources)
.getString(R.string.no_upstream_notification_disable_button)
doReturn(TEST_ROAMING_TITLE).`when`(testResources)
.getString(R.string.upstream_roaming_notification_title)
doReturn(TEST_ROAMING_MESSAGE).`when`(testResources)
.getString(R.string.upstream_roaming_notification_message)
}
@Before
fun setUp() {
MockitoAnnotations.initMocks(this)
context = TestContext(InstrumentationRegistry.getInstrumentation().context)
doReturn(notificationManager).`when`(mockContext)
.getSystemService(Context.NOTIFICATION_SERVICE)
fakeTetheringThread = HandlerThread(this::class.java.simpleName)
fakeTetheringThread.start()
notificationUpdater = WrappedNotificationUpdater(context, fakeTetheringThread.looper)
setupResources()
}
@After
fun tearDown() {
fakeTetheringThread.quitSafely()
}
private fun Notification.title() = this.extras.getString(Notification.EXTRA_TITLE)
private fun Notification.text() = this.extras.getString(Notification.EXTRA_TEXT)
private fun verifyNotification(iconId: Int, title: String, text: String, id: Int) {
verify(notificationManager, never()).cancel(any(), eq(id))
val notificationCaptor = ArgumentCaptor.forClass(Notification::class.java)
verify(notificationManager, times(1))
.notify(any(), eq(id), notificationCaptor.capture())
val notification = notificationCaptor.getValue()
assertEquals(iconId, notification.smallIcon.resId)
assertEquals(title, notification.title())
assertEquals(text, notification.text())
reset(notificationManager)
}
private fun verifyNotificationCancelled(
notificationIds: List<Int>,
resetAfterVerified: Boolean = true
) {
notificationIds.forEach {
verify(notificationManager, times(1)).cancel(any(), eq(it))
}
if (resetAfterVerified) reset(notificationManager)
}
@Test
fun testRestrictedNotification() {
// Set test sub id.
notificationUpdater.onActiveDataSubscriptionIdChanged(TEST_SUBID)
verifyNotificationCancelled(listOf(NO_UPSTREAM_NOTIFICATION_ID, ROAMING_NOTIFICATION_ID))
// User restrictions on. Show restricted notification.
notificationUpdater.notifyTetheringDisabledByRestriction()
verifyNotification(NOTIFICATION_ICON_ID, TEST_DISALLOW_TITLE, TEST_DISALLOW_MESSAGE,
RESTRICTED_NOTIFICATION_ID)
// User restrictions off. Clear notification.
notificationUpdater.tetheringRestrictionLifted()
verifyNotificationCancelled(listOf(RESTRICTED_NOTIFICATION_ID))
// No downstream.
notificationUpdater.onDownstreamChanged(DOWNSTREAM_NONE)
verifyZeroInteractions(notificationManager)
// User restrictions on again. Show restricted notification.
notificationUpdater.notifyTetheringDisabledByRestriction()
verifyNotification(NOTIFICATION_ICON_ID, TEST_DISALLOW_TITLE, TEST_DISALLOW_MESSAGE,
RESTRICTED_NOTIFICATION_ID)
}
val MAX_BACKOFF_MS = 200L
/**
* Waits for all messages, including delayed ones, to be processed.
*
* This will wait until the handler has no more messages to be processed including
* delayed ones, or the timeout has expired. It uses an exponential backoff strategy
* to wait longer and longer to consume less CPU, with the max granularity being
* MAX_BACKOFF_MS.
*
* @return true if all messages have been processed including delayed ones, false if timeout
*
* TODO: Move this method to com.android.testutils.HandlerUtils.kt.
*/
private fun Handler.waitForDelayedMessage(what: Int?, timeoutMs: Long) {
fun hasMatchingMessages() =
if (what == null) hasMessagesOrCallbacks() else hasMessages(what)
val expiry = System.currentTimeMillis() + timeoutMs
var delay = 5L
while (System.currentTimeMillis() < expiry && hasMatchingMessages()) {
// None of Handler, Looper, Message and MessageQueue expose any way to retrieve
// the time when the next (let alone the last) message will be processed, so
// short of examining the internals with reflection sleep() is the only solution.
Thread.sleep(delay)
delay = (delay * 2)
.coerceAtMost(expiry - System.currentTimeMillis())
.coerceAtMost(MAX_BACKOFF_MS)
}
val timeout = expiry - System.currentTimeMillis()
if (timeout <= 0) fail("Delayed message did not process yet after ${timeoutMs}ms")
waitForIdle(timeout)
}
@Test
fun testNoUpstreamNotification() {
// Set test sub id.
notificationUpdater.onActiveDataSubscriptionIdChanged(TEST_SUBID)
verifyNotificationCancelled(listOf(NO_UPSTREAM_NOTIFICATION_ID, ROAMING_NOTIFICATION_ID))
// Wifi downstream.
notificationUpdater.onDownstreamChanged(WIFI_MASK)
verifyNotificationCancelled(listOf(NO_UPSTREAM_NOTIFICATION_ID, ROAMING_NOTIFICATION_ID))
// There is no upstream. Show no upstream notification.
notificationUpdater.onUpstreamCapabilitiesChanged(null)
notificationUpdater.handler.waitForDelayedMessage(EVENT_SHOW_NO_UPSTREAM, TIMEOUT_MS)
verifyNotification(NOTIFICATION_ICON_ID, TEST_NO_UPSTREAM_TITLE, TEST_NO_UPSTREAM_MESSAGE,
NO_UPSTREAM_NOTIFICATION_ID)
// Same capabilities changed. Nothing happened.
notificationUpdater.onUpstreamCapabilitiesChanged(null)
verifyZeroInteractions(notificationManager)
// Upstream come back. Clear no upstream notification.
notificationUpdater.onUpstreamCapabilitiesChanged(HOME_CAPABILITIES)
verifyNotificationCancelled(listOf(NO_UPSTREAM_NOTIFICATION_ID))
// No upstream again. Show no upstream notification.
notificationUpdater.onUpstreamCapabilitiesChanged(null)
notificationUpdater.handler.waitForDelayedMessage(EVENT_SHOW_NO_UPSTREAM, TIMEOUT_MS)
verifyNotification(NOTIFICATION_ICON_ID, TEST_NO_UPSTREAM_TITLE, TEST_NO_UPSTREAM_MESSAGE,
NO_UPSTREAM_NOTIFICATION_ID)
// No downstream.
notificationUpdater.onDownstreamChanged(DOWNSTREAM_NONE)
verifyNotificationCancelled(listOf(NO_UPSTREAM_NOTIFICATION_ID, ROAMING_NOTIFICATION_ID))
// Wifi downstream and home capabilities.
notificationUpdater.onDownstreamChanged(WIFI_MASK)
notificationUpdater.onUpstreamCapabilitiesChanged(HOME_CAPABILITIES)
verifyNotificationCancelled(listOf(NO_UPSTREAM_NOTIFICATION_ID, ROAMING_NOTIFICATION_ID))
// Set R.integer.delay_to_show_no_upstream_after_no_backhaul to -1 and change to no upstream
// again. Don't put up no upstream notification.
doReturn(-1).`when`(testResources)
.getInteger(R.integer.delay_to_show_no_upstream_after_no_backhaul)
notificationUpdater.onUpstreamCapabilitiesChanged(null)
notificationUpdater.handler.waitForDelayedMessage(EVENT_SHOW_NO_UPSTREAM, TIMEOUT_MS)
verifyNotificationCancelled(listOf(NO_UPSTREAM_NOTIFICATION_ID))
}
@Test
fun testGetResourcesForSubId() {
doReturn(telephonyManager).`when`(telephonyManager).createForSubscriptionId(anyInt())
doReturn(1234).`when`(telephonyManager).getSimCarrierId()
doReturn("000000").`when`(telephonyManager).getSimOperator()
val subId = -2 // Use invalid subId to avoid getting resource from cache or real subId.
val config = context.resources.configuration
var res = notificationUpdater.getResourcesForSubId(context, subId)
assertEquals(config.mcc, res.configuration.mcc)
assertEquals(config.mnc, res.configuration.mnc)
doReturn(VERIZON_CARRIER_ID).`when`(telephonyManager).getSimCarrierId()
res = notificationUpdater.getResourcesForSubId(context, subId)
assertEquals(config.mcc, res.configuration.mcc)
assertEquals(config.mnc, res.configuration.mnc)
doReturn("20404").`when`(telephonyManager).getSimOperator()
res = notificationUpdater.getResourcesForSubId(context, subId)
assertEquals(311, res.configuration.mcc)
assertEquals(480, res.configuration.mnc)
}
@Test
fun testRoamingNotification() {
// Set test sub id.
notificationUpdater.onActiveDataSubscriptionIdChanged(TEST_SUBID)
verifyNotificationCancelled(listOf(NO_UPSTREAM_NOTIFICATION_ID, ROAMING_NOTIFICATION_ID))
// Wifi downstream.
notificationUpdater.onDownstreamChanged(WIFI_MASK)
verifyNotificationCancelled(listOf(NO_UPSTREAM_NOTIFICATION_ID, ROAMING_NOTIFICATION_ID))
// Upstream capabilities changed to roaming state. Show roaming notification.
notificationUpdater.onUpstreamCapabilitiesChanged(ROAMING_CAPABILITIES)
verifyNotification(NOTIFICATION_ICON_ID, TEST_ROAMING_TITLE, TEST_ROAMING_MESSAGE,
ROAMING_NOTIFICATION_ID)
// Same capabilities change. Nothing happened.
notificationUpdater.onUpstreamCapabilitiesChanged(ROAMING_CAPABILITIES)
verifyZeroInteractions(notificationManager)
// Upstream capabilities changed to home state. Clear roaming notification.
notificationUpdater.onUpstreamCapabilitiesChanged(HOME_CAPABILITIES)
verifyNotificationCancelled(listOf(ROAMING_NOTIFICATION_ID))
// Upstream capabilities changed to roaming state again. Show roaming notification.
notificationUpdater.onUpstreamCapabilitiesChanged(ROAMING_CAPABILITIES)
verifyNotification(NOTIFICATION_ICON_ID, TEST_ROAMING_TITLE, TEST_ROAMING_MESSAGE,
ROAMING_NOTIFICATION_ID)
// No upstream. Clear roaming notification and show no upstream notification.
notificationUpdater.onUpstreamCapabilitiesChanged(null)
notificationUpdater.handler.waitForDelayedMessage(EVENT_SHOW_NO_UPSTREAM, TIMEOUT_MS)
verifyNotificationCancelled(listOf(ROAMING_NOTIFICATION_ID), false)
verifyNotification(NOTIFICATION_ICON_ID, TEST_NO_UPSTREAM_TITLE, TEST_NO_UPSTREAM_MESSAGE,
NO_UPSTREAM_NOTIFICATION_ID)
// No downstream.
notificationUpdater.onDownstreamChanged(DOWNSTREAM_NONE)
verifyNotificationCancelled(listOf(NO_UPSTREAM_NOTIFICATION_ID, ROAMING_NOTIFICATION_ID))
// Wifi downstream again.
notificationUpdater.onDownstreamChanged(WIFI_MASK)
notificationUpdater.handler.waitForDelayedMessage(EVENT_SHOW_NO_UPSTREAM, TIMEOUT_MS)
verifyNotificationCancelled(listOf(ROAMING_NOTIFICATION_ID), false)
verifyNotification(NOTIFICATION_ICON_ID, TEST_NO_UPSTREAM_TITLE, TEST_NO_UPSTREAM_MESSAGE,
NO_UPSTREAM_NOTIFICATION_ID)
// Set R.bool.config_upstream_roaming_notification to false and change upstream
// network to roaming state again. No roaming notification.
doReturn(false).`when`(testResources)
.getBoolean(R.bool.config_upstream_roaming_notification)
notificationUpdater.onUpstreamCapabilitiesChanged(ROAMING_CAPABILITIES)
verifyNotificationCancelled(listOf(NO_UPSTREAM_NOTIFICATION_ID, ROAMING_NOTIFICATION_ID))
}
@Test
fun testGetSettingsPackageName() {
val defaultSettingsPackageName = "com.android.settings"
val testSettingsPackageName = "com.android.test.settings"
val pm = mock(PackageManager::class.java)
doReturn(null).`when`(pm).resolveActivity(any(), anyInt())
assertEquals(defaultSettingsPackageName,
TetheringNotificationUpdater.getSettingsPackageName(pm))
val resolveInfo = ResolveInfo().apply {
activityInfo = ActivityInfo().apply {
name = "test"
applicationInfo = ApplicationInfo().apply {
packageName = testSettingsPackageName
}
}
}
doReturn(resolveInfo).`when`(pm).resolveActivity(any(), anyInt())
assertEquals(testSettingsPackageName,
TetheringNotificationUpdater.getSettingsPackageName(pm))
}
}