| """ |
| Test lldb Python event APIs. |
| """ |
| |
| import os, time |
| import re |
| import unittest2 |
| import lldb, lldbutil |
| from lldbtest import * |
| |
| class EventAPITestCase(TestBase): |
| |
| mydir = os.path.join("python_api", "event") |
| |
| @unittest2.skipUnless(sys.platform.startswith("darwin"), "requires Darwin") |
| @python_api_test |
| @dsym_test |
| def test_listen_for_and_print_event_with_dsym(self): |
| """Exercise SBEvent API.""" |
| self.buildDsym() |
| self.do_listen_for_and_print_event() |
| |
| @python_api_test |
| @dwarf_test |
| def test_listen_for_and_print_event_with_dwarf(self): |
| """Exercise SBEvent API.""" |
| self.buildDwarf() |
| self.do_listen_for_and_print_event() |
| |
| @unittest2.skipUnless(sys.platform.startswith("darwin"), "requires Darwin") |
| @python_api_test |
| @dsym_test |
| def test_wait_for_event_with_dsym(self): |
| """Exercise SBListener.WaitForEvent() API.""" |
| self.buildDsym() |
| self.do_wait_for_event() |
| |
| @python_api_test |
| @dwarf_test |
| def test_wait_for_event_with_dwarf(self): |
| """Exercise SBListener.WaitForEvent() API.""" |
| self.buildDwarf() |
| self.do_wait_for_event() |
| |
| @unittest2.skipUnless(sys.platform.startswith("darwin"), "requires Darwin") |
| @python_api_test |
| @dsym_test |
| def test_add_listener_to_broadcaster_with_dsym(self): |
| """Exercise some SBBroadcaster APIs.""" |
| self.buildDsym() |
| self.do_add_listener_to_broadcaster() |
| |
| @python_api_test |
| @dwarf_test |
| def test_add_listener_to_broadcaster_with_dwarf(self): |
| """Exercise some SBBroadcaster APIs.""" |
| self.buildDwarf() |
| self.do_add_listener_to_broadcaster() |
| |
| def setUp(self): |
| # Call super's setUp(). |
| TestBase.setUp(self) |
| # Find the line number to of function 'c'. |
| self.line = line_number('main.c', '// Find the line number of function "c" here.') |
| |
| def do_listen_for_and_print_event(self): |
| """Create a listener and use SBEvent API to print the events received.""" |
| exe = os.path.join(os.getcwd(), "a.out") |
| |
| # Create a target by the debugger. |
| target = self.dbg.CreateTarget(exe) |
| self.assertTrue(target, VALID_TARGET) |
| |
| # Now create a breakpoint on main.c by name 'c'. |
| breakpoint = target.BreakpointCreateByName('c', 'a.out') |
| |
| # Now launch the process, and do not stop at the entry point. |
| process = target.LaunchSimple(None, None, os.getcwd()) |
| self.assertTrue(process.GetState() == lldb.eStateStopped, |
| PROCESS_STOPPED) |
| |
| # Get a handle on the process's broadcaster. |
| broadcaster = process.GetBroadcaster() |
| |
| # Create an empty event object. |
| event = lldb.SBEvent() |
| |
| # Create a listener object and register with the broadcaster. |
| listener = lldb.SBListener("my listener") |
| rc = broadcaster.AddListener(listener, lldb.SBProcess.eBroadcastBitStateChanged) |
| self.assertTrue(rc, "AddListener successfully retruns") |
| |
| traceOn = self.TraceOn() |
| if traceOn: |
| lldbutil.print_stacktraces(process) |
| |
| # Create MyListeningThread class to wait for any kind of event. |
| import threading |
| class MyListeningThread(threading.Thread): |
| def run(self): |
| count = 0 |
| # Let's only try at most 4 times to retrieve any kind of event. |
| # After that, the thread exits. |
| while not count > 3: |
| if traceOn: |
| print "Try wait for event..." |
| if listener.WaitForEventForBroadcasterWithType(5, |
| broadcaster, |
| lldb.SBProcess.eBroadcastBitStateChanged, |
| event): |
| if traceOn: |
| desc = lldbutil.get_description(event) |
| print "Event description:", desc |
| print "Event data flavor:", event.GetDataFlavor() |
| print "Process state:", lldbutil.state_type_to_str(process.GetState()) |
| print |
| else: |
| if traceOn: |
| print "timeout occurred waiting for event..." |
| count = count + 1 |
| return |
| |
| # Let's start the listening thread to retrieve the events. |
| my_thread = MyListeningThread() |
| my_thread.start() |
| |
| # Use Python API to continue the process. The listening thread should be |
| # able to receive the state changed events. |
| process.Continue() |
| |
| # Use Python API to kill the process. The listening thread should be |
| # able to receive the state changed event, too. |
| process.Kill() |
| |
| # Wait until the 'MyListeningThread' terminates. |
| my_thread.join() |
| |
| def do_wait_for_event(self): |
| """Get the listener associated with the debugger and exercise WaitForEvent API.""" |
| exe = os.path.join(os.getcwd(), "a.out") |
| |
| # Create a target by the debugger. |
| target = self.dbg.CreateTarget(exe) |
| self.assertTrue(target, VALID_TARGET) |
| |
| # Now create a breakpoint on main.c by name 'c'. |
| breakpoint = target.BreakpointCreateByName('c', 'a.out') |
| #print "breakpoint:", breakpoint |
| self.assertTrue(breakpoint and |
| breakpoint.GetNumLocations() == 1, |
| VALID_BREAKPOINT) |
| |
| # Get the debugger listener. |
| listener = self.dbg.GetListener() |
| |
| # Now launch the process, and do not stop at entry point. |
| error = lldb.SBError() |
| process = target.Launch (listener, None, None, None, None, None, None, 0, False, error) |
| self.assertTrue(error.Success() and process, PROCESS_IS_VALID) |
| |
| # Get a handle on the process's broadcaster. |
| broadcaster = process.GetBroadcaster() |
| self.assertTrue(broadcaster, "Process with valid broadcaster") |
| |
| # Create an empty event object. |
| event = lldb.SBEvent() |
| self.assertFalse(event, "Event should not be valid initially") |
| |
| # Create MyListeningThread to wait for any kind of event. |
| import threading |
| class MyListeningThread(threading.Thread): |
| def run(self): |
| count = 0 |
| # Let's only try at most 3 times to retrieve any kind of event. |
| while not count > 3: |
| if listener.WaitForEvent(5, event): |
| #print "Got a valid event:", event |
| #print "Event data flavor:", event.GetDataFlavor() |
| #print "Event type:", lldbutil.state_type_to_str(event.GetType()) |
| return |
| count = count + 1 |
| print "Timeout: listener.WaitForEvent" |
| |
| return |
| |
| # Use Python API to kill the process. The listening thread should be |
| # able to receive a state changed event. |
| process.Kill() |
| |
| # Let's start the listening thread to retrieve the event. |
| my_thread = MyListeningThread() |
| my_thread.start() |
| |
| # Wait until the 'MyListeningThread' terminates. |
| my_thread.join() |
| |
| self.assertTrue(event, |
| "My listening thread successfully received an event") |
| |
| def do_add_listener_to_broadcaster(self): |
| """Get the broadcaster associated with the process and wait for broadcaster events.""" |
| exe = os.path.join(os.getcwd(), "a.out") |
| |
| # Create a target by the debugger. |
| target = self.dbg.CreateTarget(exe) |
| self.assertTrue(target, VALID_TARGET) |
| |
| # Now create a breakpoint on main.c by name 'c'. |
| breakpoint = target.BreakpointCreateByName('c', 'a.out') |
| #print "breakpoint:", breakpoint |
| self.assertTrue(breakpoint and |
| breakpoint.GetNumLocations() == 1, |
| VALID_BREAKPOINT) |
| |
| # Now launch the process, and do not stop at the entry point. |
| process = target.LaunchSimple(None, None, os.getcwd()) |
| self.assertTrue(process.GetState() == lldb.eStateStopped, |
| PROCESS_STOPPED) |
| |
| # Get a handle on the process's broadcaster. |
| broadcaster = process.GetBroadcaster() |
| self.assertTrue(broadcaster, "Process with valid broadcaster") |
| |
| # Create an empty event object. |
| event = lldb.SBEvent() |
| self.assertFalse(event, "Event should not be valid initially") |
| |
| # Create a listener object and register with the broadcaster. |
| listener = lldb.SBListener("TestEvents.listener") |
| rc = broadcaster.AddListener(listener, lldb.SBProcess.eBroadcastBitStateChanged) |
| self.assertTrue(rc, "AddListener successfully retruns") |
| |
| # The finite state machine for our custom listening thread, with an |
| # initail state of 0, which means a "running" event is expected. |
| # It changes to 1 after "running" is received. |
| # It cahnges to 2 after "stopped" is received. |
| # 2 will be our final state and the test is complete. |
| self.state = 0 |
| |
| # Create MyListeningThread to wait for state changed events. |
| # By design, a "running" event is expected following by a "stopped" event. |
| import threading |
| class MyListeningThread(threading.Thread): |
| def run(self): |
| #print "Running MyListeningThread:", self |
| |
| # Regular expression pattern for the event description. |
| pattern = re.compile("data = {.*, state = (.*)}$") |
| |
| # Let's only try at most 6 times to retrieve our events. |
| count = 0 |
| while True: |
| if listener.WaitForEventForBroadcasterWithType(5, |
| broadcaster, |
| lldb.SBProcess.eBroadcastBitStateChanged, |
| event): |
| desc = lldbutil.get_description(event) |
| #print "Event description:", desc |
| match = pattern.search(desc) |
| if not match: |
| break; |
| if self.context.state == 0 and match.group(1) == 'running': |
| self.context.state = 1 |
| continue |
| elif self.context.state == 1 and match.group(1) == 'stopped': |
| # Whoopee, both events have been received! |
| self.context.state = 2 |
| break |
| else: |
| break |
| print "Timeout: listener.WaitForEvent" |
| count = count + 1 |
| if count > 6: |
| break |
| |
| return |
| |
| # Use Python API to continue the process. The listening thread should be |
| # able to receive the state changed events. |
| process.Continue() |
| |
| # Start the listening thread to receive the "running" followed by the |
| # "stopped" events. |
| my_thread = MyListeningThread() |
| # Supply the enclosing context so that our listening thread can access |
| # the 'state' variable. |
| my_thread.context = self |
| my_thread.start() |
| |
| # Wait until the 'MyListeningThread' terminates. |
| my_thread.join() |
| |
| # We are no longer interested in receiving state changed events. |
| # Remove our custom listener before the inferior is killed. |
| broadcaster.RemoveListener(listener, lldb.SBProcess.eBroadcastBitStateChanged) |
| |
| # The final judgement. :-) |
| self.assertTrue(self.state == 2, |
| "Both expected state changed events received") |
| |
| |
if __name__ == '__main__':
    import atexit

    def _terminate_debugger():
        # Tear the debugger down when the interpreter exits.
        lldb.SBDebugger.Terminate()

    # Bring the debugger up before handing control to the test runner.
    lldb.SBDebugger.Initialize()
    atexit.register(_terminate_debugger)
    unittest2.main()