Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions launch/launch/actions/execute_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def __init__(
'sigterm_timeout', default=5),
sigkill_timeout: SomeSubstitutionsType = LaunchConfiguration(
'sigkill_timeout', default=5),
emulate_tty: bool = False,
emulate_tty: bool = True,
prefix: Optional[SomeSubstitutionsType] = None,
output: Text = 'log',
output_format: Text = '[{this.name}] {line}',
Expand Down Expand Up @@ -175,7 +175,7 @@ def __init__(
as a string or a list of strings and Substitutions to be resolved
at runtime, defaults to the LaunchConfiguration called
'sigkill_timeout'
:param: emulate_tty emulate a tty (terminal), defaults to False, but can
:param: emulate_tty emulate a tty (terminal), defaults to True, but can
be overridden with the LaunchConfiguration called 'emulate_tty',
the value of which is evaluated as true or false according to
:py:func:`evaluate_condition_expression`.
Expand Down Expand Up @@ -345,6 +345,8 @@ def __on_signal_process_event(
raise RuntimeError('Signal event received before execution.')
if self._subprocess_transport is None:
raise RuntimeError('Signal event received before subprocess transport available.')
if self._subprocess_protocol is None:
raise RuntimeError('Signal event received before subprocess protocol available.')
if self._subprocess_protocol.complete.done():
# the process is done or is cleaning up, no need to signal
self.__logger.debug(
Expand Down Expand Up @@ -390,6 +392,8 @@ def __on_process_stdin(
def __on_process_stdout(
self, event: ProcessIO
) -> Optional[SomeActionsType]:
if self.__stdout_buffer.closed:
raise RuntimeError('OnProcessIO (stdout) received after stdout buffer closed.')
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This this check, I get tracebacks like this:

[DEBUG] [launch]: Traceback (most recent call last):
  File "/home/william/ros2_ws/build/launch/launch/launch_service.py", line 350, in run
    self.__loop_from_run_thread.run_until_complete(run_loop_task)
  File "/usr/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
    return future.result()
  File "/home/william/ros2_ws/build/launch/launch/launch_service.py", line 240, in __run_loop
    await process_one_event_task
  File "/usr/lib/python3.6/asyncio/coroutines.py", line 126, in send
    return self.gen.send(value)
  File "/home/william/ros2_ws/build/launch/launch/launch_service.py", line 177, in _process_one_event
    await self.__process_event(next_event)
  File "/usr/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
    return self.gen.send(None)
  File "/home/william/ros2_ws/build/launch/launch/launch_service.py", line 186, in __process_event
    entities = event_handler.handle(event, self.__context)
  File "/home/william/ros2_ws/build/launch/launch/event_handlers/on_process_io.py", line 73, in handle
    return self.__on_stdout(event)
  File "/home/william/ros2_ws/build/launch/launch/actions/execute_process.py", line 396, in __on_process_stdout
    raise RuntimeError('OnProcessIO (stdout) received after stdout buffer closed.')
RuntimeError: OnProcessIO (stdout) received after stdout buffer closed.

[ERROR] [launch]: Caught exception in launch (see debug for traceback): OnProcessIO (stdout) received after stdout buffer closed.

I believe this because this line buffering logic was not designed in a way to allow for ProcessIO events to occur after the ProcessExit event comes. @ivanpauno do you have any ideas about how we could change it to allow for that race condition?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm flushing the buffers after a ProcessExit event.

OnProcessExit(
target_action=self,
on_exit=self.__flush_buffers,
),
.
The method for flushing buffers leaves them in a wrong state for handling another ProcessIO event:
def __flush_buffers(self, event, context):
with self.__stdout_buffer as buf:
line = buf.getvalue()
if line != '':
self.__stdout_logger.info(
self.__output_format.format(line=line, this=self)
)
with self.__stderr_buffer as buf:
line = buf.getvalue()
if line != '':
self.__stderr_logger.info(
self.__output_format.format(line=line, this=self)
)

One easy change, is to restore the buffers to a correct state after handling ProcessExit event.
It will, at least, don't raise an error with a new ProcessIO event.
The only bad thing: I think it's impossible to ensure the buffer will be completely flushed after the process exited.

Another option: Ignore ProcessIO events after flushing the buffers, instead of raising an error.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Raising an error here, as you proposed is also fine. If the daemon node of launch_testing is causing this race condition, I think the best solution is stop using a daemon node.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ignore ProcessIO events after flushing the buffers, instead of raising an error.

Hmm, if we cannot guarantee order, neither of those options will do. I don't know how many or how often we get this sort of out-of-band ProcessIO events, but we could stop buffering after ProcessExit.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, if we cannot guarantee order, neither of those options will do. I don't know how many or how often we get this sort of out-of-band ProcessIO events, but we could stop buffering after ProcessExit.

Sounds reasonable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the daemon node of launch_testing is causing this race condition, I think the best solution is stop using a daemon node.

That's not the cause, the cause is using a pty for stdout and stderr of the subprocess, which means there is different timing (apparently) for when data becomes available on those pipes and the process exiting. Perhaps instead we should not flush until the stdout and stderr are closed (I think you can setup a protocol event for when that happens).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Specifically this one: https://docs.python.org/3/library/asyncio-protocol.html#asyncio.SubprocessProtocol.pipe_connection_lost

I'll need to think about the lifetime of these objects and make sure things stick around long enough.

self.__stdout_buffer.write(event.text.decode(errors='replace'))
self.__stdout_buffer.seek(0)
last_line = None
Expand All @@ -409,6 +413,8 @@ def __on_process_stdout(
def __on_process_stderr(
self, event: ProcessIO
) -> Optional[SomeActionsType]:
if self.__stderr_buffer.closed:
raise RuntimeError('OnProcessIO (stderr) received after stderr buffer closed.')
self.__stderr_buffer.write(event.text.decode(errors='replace'))
self.__stderr_buffer.seek(0)
last_line = None
Expand Down
2 changes: 1 addition & 1 deletion launch/test/launch/actions/test_emulate_tty.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def tty_expected_unless_windows():

@pytest.mark.parametrize('test_input,expected', [
# use the default defined by ExecuteProcess
(None, not tty_expected_unless_windows()),
(None, tty_expected_unless_windows()),
# redundantly override the default via LaunchConfiguration
('true', tty_expected_unless_windows()),
# override the default via LaunchConfiguration
Expand Down
4 changes: 3 additions & 1 deletion launch_testing/launch_testing/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ def __init__(self,
self._test_tr = threading.Thread(
target=self._run_test,
name='test_runner_thread',
daemon=True
)

def run(self):
Expand Down Expand Up @@ -160,6 +159,9 @@ def run(self):

self._results.append(inactive_results)

# Join the test thread before returning to avoid it being daemonized.
self._test_tr.join()

return self._results

def _run_test(self):
Expand Down