Skip to content

Commit 2754e9a

Browse files
authored
gh-47798: Refactor the POSIX subprocess.Popen._communicate selector loop into helpers (GH-149032)
No public API change. Lift the per-iteration select/read/write loop out of Popen._communicate (POSIX) into a module-level _communicate_io_posix(), with small _flush_stdin / _make_input_view / _translate_newlines helpers alongside it. Popen._communicate calls the helper and persists the returned input offset for resume-after-timeout. Retire the private Popen._remaining_time method in favor of module-level _deadline_remaining; all call sites (POSIX and Windows) have been updated. Defensive behavioural deltas: in the I/O loop, stdin.close() now swallows BrokenPipeError and the stdout/stderr .close() calls now swallow OSError, matching __exit__ and the no-input path; previously these calls were unguarded. Adds test_communicate_timeout_resume_partial_write to cover _input_offset bookkeeping across TimeoutExpired/resume.
1 parent 1e7dfbc commit 2754e9a

2 files changed

Lines changed: 184 additions & 66 deletions

File tree

Lib/subprocess.py

Lines changed: 151 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,82 @@ def __repr__(self):
250250
else:
251251
_PopenSelector = selectors.SelectSelector
252252

253+
def _communicate_io_posix(selector, stdin, input_view, input_offset,
                          output_buffers, endtime, *, close_on_eof=False):
    """Run the POSIX select/read/write loop on behalf of Popen._communicate.

    The loop keeps going until nothing is registered on *selector* any
    more, or until the deadline passes.  It never raises TimeoutExpired;
    a timeout is reported through the return value so the caller can
    decide what to raise.

    Args:
        selector: A _PopenSelector on which the streams have already
            been registered.
        stdin: Writable file object that receives the input, or None.
        input_view: memoryview over the input bytes, or None.
        input_offset: Offset into input_view at which writing starts
            (non-zero when resuming an earlier, timed-out call).
        output_buffers: Dict {file_object: list}; every chunk read from
            a file object is appended to its list in place.
        endtime: Deadline timestamp, or None to wait without a timeout.
        close_on_eof: If True, an output stream is closed as soon as it
            reports EOF (an OSError raised by that close() is ignored).
            If False, the stream is only unregistered and is left open
            for the caller to close.

    Returns:
        A tuple (new_input_offset, completed): new_input_offset is
        input_offset advanced by the number of bytes written, and
        completed is True when the loop drained every registered stream
        and False when the deadline expired first.
    """
    write_fd = None
    if stdin:
        write_fd = stdin.fileno()

    def _retire_stdin(fd):
        # Stop watching stdin and close it; a BrokenPipeError raised by
        # close() itself is ignored.
        selector.unregister(fd)
        try:
            stdin.close()
        except BrokenPipeError:
            pass

    while selector.get_map():
        time_left = _deadline_remaining(endtime)
        if time_left is not None and time_left <= 0:
            return (input_offset, False)  # Timed out

        ready_events = selector.select(time_left)

        # Re-check the deadline: select() may have returned late or
        # woken up spuriously.
        if endtime is not None and _time() > endtime:
            return (input_offset, False)  # Timed out

        for key, _mask in ready_events:
            if key.fd == write_fd:
                # Offer at most _PIPE_BUF bytes per write.
                stop = input_offset + _PIPE_BUF
                pending = input_view[input_offset:stop]
                try:
                    written = os.write(key.fd, pending)
                except BrokenPipeError:
                    # The reader went away; give up on the input.
                    _retire_stdin(key.fd)
                    continue
                input_offset += written
                if input_offset >= len(input_view):
                    # Everything has been written.
                    _retire_stdin(key.fd)
                continue

            if key.fileobj not in output_buffers:
                continue

            data = os.read(key.fd, 32768)
            if data:
                output_buffers[key.fileobj].append(data)
                continue

            # An empty read means EOF on this stream.
            selector.unregister(key.fileobj)
            if close_on_eof:
                try:
                    key.fileobj.close()
                except OSError:
                    pass

    return (input_offset, True)  # Completed
328+
253329

254330
if _mswindows:
255331
# On Windows we just need to close `Popen._handle` when we no longer need
@@ -289,6 +365,45 @@ def _cleanup():
289365
DEVNULL = -3
290366

291367

368+
def _deadline_remaining(endtime):
    """Return the time left until *endtime*, or None if there is no deadline.

    The value is ``endtime - _time()``, so it is zero or negative once
    the deadline has been reached.
    """
    return None if endtime is None else endtime - _time()
373+
374+
375+
def _flush_stdin(stdin):
    """Flush *stdin*, tolerating a broken pipe or an already-closed file.

    BrokenPipeError is always ignored.  ValueError is ignored only when
    *stdin* reports itself as closed ("I/O operation on closed file");
    any other ValueError is re-raised.
    """
    try:
        stdin.flush()
    except (BrokenPipeError, ValueError) as exc:
        # communicate() must ignore BrokenPipeError; a ValueError is
        # only acceptable when it comes from flushing a closed file.
        if isinstance(exc, ValueError) and not stdin.closed:
            raise
385+
386+
387+
def _make_input_view(input_data):
    """Return a byte-oriented memoryview of *input_data*, or None if empty.

    The caller slices the view and compares len(view) against an offset
    counted in bytes, so the view must be indexed in bytes.

    Args:
        input_data: A bytes-like object (bytes, bytearray, memoryview,
            array.array, ...) or a falsy value such as None or b"".

    Returns:
        None when input_data is falsy.  Otherwise a memoryview over the
        same buffer: memoryview inputs are cast to format "b"; other
        buffers are wrapped, and additionally cast to "b" when their
        items are wider than one byte or the buffer is
        multi-dimensional.

    Raises:
        TypeError: if input_data does not support the buffer protocol,
            or if a required cast is not possible (memoryview.cast()
            only accepts C-contiguous views).
    """
    if not input_data:
        return None
    if isinstance(input_data, memoryview):
        return input_data.cast("b")  # ensure byte view for correct len()
    view = memoryview(input_data)
    if view.itemsize != 1 or view.ndim != 1:
        # E.g. array.array("i", ...): without the cast, len() and slicing
        # would count elements, not bytes, so the byte offset kept by the
        # caller would run past "the end" before all data was written.
        view = view.cast("b")
    return view
399+
400+
401+
def _translate_newlines(data, encoding, errors):
    r"""Decode *data* to str and normalize its line endings to "\n".

    *encoding* and *errors* are passed straight to ``data.decode()``.
    Both "\r\n" and a lone "\r" become "\n" in the returned string.
    """
    text = data.decode(encoding, errors)
    # Collapse "\r\n" first; otherwise each CRLF pair would end up as two
    # newlines once the lone "\r" characters are replaced.
    without_crlf = text.replace("\r\n", "\n")
    return without_crlf.replace("\r", "\n")
405+
406+
292407
# XXX This function is only used by multiprocessing and the test suite,
293408
# but it's here so that it can be imported when Python is compiled without
294409
# threads.
@@ -1149,8 +1264,8 @@ def universal_newlines(self, universal_newlines):
11491264
self.text_mode = bool(universal_newlines)
11501265

11511266
def _translate_newlines(self, data, encoding, errors):
1152-
data = data.decode(encoding, errors)
1153-
return data.replace("\r\n", "\n").replace("\r", "\n")
1267+
# Subclass-overridable hook; defers to the module-level helper.
1268+
return _translate_newlines(data, encoding, errors)
11541269

11551270
def __enter__(self):
11561271
return self
@@ -1277,7 +1392,7 @@ def communicate(self, input=None, timeout=None):
12771392
# See the detailed comment in .wait().
12781393
if timeout is not None:
12791394
sigint_timeout = min(self._sigint_wait_secs,
1280-
self._remaining_time(endtime))
1395+
_deadline_remaining(endtime))
12811396
else:
12821397
sigint_timeout = self._sigint_wait_secs
12831398
self._sigint_wait_secs = 0 # nothing else should wait.
@@ -1290,7 +1405,7 @@ def communicate(self, input=None, timeout=None):
12901405
finally:
12911406
self._communication_started = True
12921407
try:
1293-
self.wait(timeout=self._remaining_time(endtime))
1408+
self.wait(timeout=_deadline_remaining(endtime))
12941409
except TimeoutExpired as exc:
12951410
exc.timeout = timeout
12961411
raise
@@ -1304,14 +1419,6 @@ def poll(self):
13041419
return self._internal_poll()
13051420

13061421

1307-
def _remaining_time(self, endtime):
1308-
"""Convenience for _communicate when computing timeouts."""
1309-
if endtime is None:
1310-
return None
1311-
else:
1312-
return endtime - _time()
1313-
1314-
13151422
def _check_timeout(self, endtime, orig_timeout, stdout_seq, stderr_seq,
13161423
skip_check_and_raise=False):
13171424
"""Convenience for checking if a timeout has expired."""
@@ -1337,7 +1444,7 @@ def wait(self, timeout=None):
13371444
# generated SIGINT and will exit rapidly.
13381445
if timeout is not None:
13391446
sigint_timeout = min(self._sigint_wait_secs,
1340-
self._remaining_time(endtime))
1447+
_deadline_remaining(endtime))
13411448
else:
13421449
sigint_timeout = self._sigint_wait_secs
13431450
self._sigint_wait_secs = 0 # nothing else should wait.
@@ -1704,19 +1811,19 @@ def _communicate(self, input, endtime, orig_timeout):
17041811
# thread remains writing and the fd left open in case the user
17051812
# calls communicate again.
17061813
if hasattr(self, "_stdin_thread"):
1707-
self._stdin_thread.join(self._remaining_time(endtime))
1814+
self._stdin_thread.join(_deadline_remaining(endtime))
17081815
if self._stdin_thread.is_alive():
17091816
raise TimeoutExpired(self.args, orig_timeout)
17101817

17111818
# Wait for the reader threads, or time out. If we time out, the
17121819
# threads remain reading and the fds left open in case the user
17131820
# calls communicate again.
17141821
if self.stdout is not None:
1715-
self.stdout_thread.join(self._remaining_time(endtime))
1822+
self.stdout_thread.join(_deadline_remaining(endtime))
17161823
if self.stdout_thread.is_alive():
17171824
raise TimeoutExpired(self.args, orig_timeout)
17181825
if self.stderr is not None:
1719-
self.stderr_thread.join(self._remaining_time(endtime))
1826+
self.stderr_thread.join(_deadline_remaining(endtime))
17201827
if self.stderr_thread.is_alive():
17211828
raise TimeoutExpired(self.args, orig_timeout)
17221829

@@ -2210,7 +2317,7 @@ def _wait(self, timeout):
22102317
break
22112318
finally:
22122319
self._waitpid_lock.release()
2213-
remaining = self._remaining_time(endtime)
2320+
remaining = _deadline_remaining(endtime)
22142321
if remaining <= 0:
22152322
raise TimeoutExpired(self.args, timeout)
22162323
delay = min(delay * 2, remaining, .05)
@@ -2234,14 +2341,7 @@ def _communicate(self, input, endtime, orig_timeout):
22342341
if self.stdin and not self._communication_started:
22352342
# Flush stdio buffer. This might block, if the user has
22362343
# been writing to .stdin in an uncontrolled fashion.
2237-
try:
2238-
self.stdin.flush()
2239-
except BrokenPipeError:
2240-
pass # communicate() must ignore BrokenPipeError.
2241-
except ValueError:
2242-
# ignore ValueError: I/O operation on closed file.
2243-
if not self.stdin.closed:
2244-
raise
2344+
_flush_stdin(self.stdin)
22452345
if not input:
22462346
try:
22472347
self.stdin.close()
@@ -2266,11 +2366,8 @@ def _communicate(self, input, endtime, orig_timeout):
22662366

22672367
self._save_input(input)
22682368

2269-
if self._input:
2270-
if not isinstance(self._input, memoryview):
2271-
input_view = memoryview(self._input)
2272-
else:
2273-
input_view = self._input.cast("b") # byte input required
2369+
input_view = _make_input_view(self._input)
2370+
input_offset = self._input_offset if self._input else 0
22742371

22752372
with _PopenSelector() as selector:
22762373
if self.stdin and not self.stdin.closed and self._input:
@@ -2280,43 +2377,31 @@ def _communicate(self, input, endtime, orig_timeout):
22802377
if self.stderr and not self.stderr.closed:
22812378
selector.register(self.stderr, selectors.EVENT_READ)
22822379

2283-
while selector.get_map():
2284-
timeout = self._remaining_time(endtime)
2285-
if timeout is not None and timeout <= 0:
2286-
self._check_timeout(endtime, orig_timeout,
2287-
stdout, stderr,
2288-
skip_check_and_raise=True)
2289-
raise RuntimeError( # Impossible :)
2290-
'_check_timeout(..., skip_check_and_raise=True) '
2291-
'failed to raise TimeoutExpired.')
2292-
2293-
ready = selector.select(timeout)
2294-
self._check_timeout(endtime, orig_timeout, stdout, stderr)
2295-
2296-
# XXX Rewrite these to use non-blocking I/O on the file
2297-
# objects; they are no longer using C stdio!
2298-
2299-
for key, events in ready:
2300-
if key.fileobj is self.stdin:
2301-
chunk = input_view[self._input_offset :
2302-
self._input_offset + _PIPE_BUF]
2303-
try:
2304-
self._input_offset += os.write(key.fd, chunk)
2305-
except BrokenPipeError:
2306-
selector.unregister(key.fileobj)
2307-
key.fileobj.close()
2308-
else:
2309-
if self._input_offset >= len(input_view):
2310-
selector.unregister(key.fileobj)
2311-
key.fileobj.close()
2312-
elif key.fileobj in (self.stdout, self.stderr):
2313-
data = os.read(key.fd, 32768)
2314-
if not data:
2315-
selector.unregister(key.fileobj)
2316-
key.fileobj.close()
2317-
self._fileobj2output[key.fileobj].append(data)
2380+
stdin_to_write = (self.stdin if self.stdin and self._input
2381+
and not self.stdin.closed else None)
2382+
# Persist the returned offset on self so a subsequent
2383+
# communicate() after a TimeoutExpired resumes mid-input
2384+
# rather than re-sending bytes the child already consumed.
2385+
new_offset, completed = _communicate_io_posix(
2386+
selector,
2387+
stdin_to_write,
2388+
input_view,
2389+
input_offset,
2390+
self._fileobj2output,
2391+
endtime,
2392+
close_on_eof=True)
2393+
if self._input:
2394+
self._input_offset = new_offset
2395+
2396+
if not completed:
2397+
self._check_timeout(endtime, orig_timeout, stdout, stderr,
2398+
skip_check_and_raise=True)
2399+
raise RuntimeError( # Impossible :)
2400+
'_check_timeout(..., skip_check_and_raise=True) '
2401+
'failed to raise TimeoutExpired.')
2402+
23182403
try:
2319-
self.wait(timeout=self._remaining_time(endtime))
2404+
self.wait(timeout=_deadline_remaining(endtime))
23202405
except TimeoutExpired as exc:
23212406
exc.timeout = orig_timeout
23222407
raise

Lib/test/test_subprocess.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1130,6 +1130,39 @@ def test_communicate_timeout_large_input(self):
11301130
p.kill()
11311131
p.wait()
11321132

1133+
def test_communicate_timeout_resume_partial_write(self):
    """A communicate() that timed out mid-write can be resumed.

    The first communicate() call is given a short timeout and is
    expected to raise TimeoutExpired while input is still being
    written.  The follow-up communicate() call passes no input and must
    deliver the rest, so the child echoes the complete, unmodified
    payload.  This covers the _input_offset bookkeeping that
    Popen._communicate keeps across the two calls.
    """
    # 1 MiB easily exceeds typical pipe buffers (~64 KiB), so the write
    # blocks once the buffer fills, before the child starts reading.
    # The repeating 0..255 pattern makes misplaced bytes detectable.
    payload = bytes(range(256)) * 4096
    self.assertEqual(len(payload), 1024 * 1024)

    # The child sleeps first, then echoes all of stdin to stdout.
    child_code = ("import sys, time; "
                  "time.sleep(0.5); "
                  "sys.stdout.buffer.write(sys.stdin.buffer.read())")
    proc = subprocess.Popen([sys.executable, "-c", child_code],
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    try:
        self.assertRaises(subprocess.TimeoutExpired,
                          proc.communicate, payload, timeout=0.05)

        # Resume: no new input, generous timeout to avoid CI flakes.
        out, _err = proc.communicate(timeout=support.LONG_TIMEOUT)
        self.assertEqual(len(out), len(payload))
        self.assertEqual(out, payload)
    finally:
        proc.kill()
        proc.wait()
1165+
11331166
# Test for the fd leak reported in http://bugs.python.org/issue2791.
11341167
def test_communicate_pipe_fd_leak(self):
11351168
for stdin_pipe in (False, True):

0 commit comments

Comments
 (0)