246 lines
8.5 KiB
Python
246 lines
8.5 KiB
Python
import sys
|
|
import multiprocessing as mp
|
|
import queue
|
|
import threading
|
|
import time
|
|
import logging
|
|
|
|
# Configure logging. Adjust level and formatting as needed.
|
|
# logging.basicConfig(level=logging.DEBUG,
|
|
# format='[%(asctime)s] %(levelname)s:%(name)s: %(message)s')
|
|
logger = logging.getLogger(__name__)
|
|
|
|
try:
|
|
# Only set the start method if it hasn't been set already.
|
|
if sys.platform.startswith('linux') or sys.platform == 'darwin': # For Linux or macOS
|
|
mp.set_start_method("spawn")
|
|
elif mp.get_start_method(allow_none=True) is None:
|
|
mp.set_start_method("spawn")
|
|
except RuntimeError as e:
|
|
logger.debug("Start method has already been set. Details: %s", e)
|
|
|
|
|
|
class ParentPipe:
|
|
"""
|
|
A thread-safe wrapper around the 'parent end' of a multiprocessing pipe.
|
|
All actual pipe operations happen in a dedicated worker thread, so it's safe
|
|
for multiple threads to call send(), recv(), or poll() on the same ParentPipe
|
|
without interfering.
|
|
"""
|
|
def __init__(self, parent_synthesize_pipe):
|
|
self.name = "ParentPipe"
|
|
self._pipe = parent_synthesize_pipe # The raw pipe.
|
|
self._closed = False # A flag to mark if close() has been called.
|
|
|
|
# The request queue for sending operations to the worker.
|
|
self._request_queue = queue.Queue()
|
|
|
|
# This event signals the worker thread to stop.
|
|
self._stop_event = threading.Event()
|
|
|
|
# Worker thread that executes actual .send(), .recv(), .poll() calls.
|
|
self._worker_thread = threading.Thread(
|
|
target=self._pipe_worker,
|
|
name=f"{self.name}_Worker",
|
|
daemon=True
|
|
)
|
|
self._worker_thread.start()
|
|
|
|
def _pipe_worker(self):
|
|
while not self._stop_event.is_set():
|
|
try:
|
|
request = self._request_queue.get(timeout=0.1)
|
|
except queue.Empty:
|
|
continue
|
|
|
|
if request["type"] == "CLOSE":
|
|
# Exit worker loop on CLOSE request.
|
|
break
|
|
|
|
try:
|
|
if request["type"] == "SEND":
|
|
data = request["data"]
|
|
logger.debug("[%s] Worker: sending => %s", self.name, data)
|
|
self._pipe.send(data)
|
|
request["result_queue"].put(None)
|
|
|
|
elif request["type"] == "RECV":
|
|
logger.debug("[%s] Worker: receiving...", self.name)
|
|
data = self._pipe.recv()
|
|
request["result_queue"].put(data)
|
|
|
|
elif request["type"] == "POLL":
|
|
timeout = request.get("timeout", 0.0)
|
|
logger.debug("[%s] Worker: poll() with timeout: %s", self.name, timeout)
|
|
result = self._pipe.poll(timeout)
|
|
request["result_queue"].put(result)
|
|
|
|
except (EOFError, BrokenPipeError, OSError) as e:
|
|
# When the other end has closed or an error occurs,
|
|
# log and notify the waiting thread.
|
|
logger.debug("[%s] Worker: pipe closed or error occurred (%s). Shutting down.", self.name, e)
|
|
request["result_queue"].put(None)
|
|
break
|
|
|
|
except Exception as e:
|
|
logger.exception("[%s] Worker: unexpected error.", self.name)
|
|
request["result_queue"].put(e)
|
|
break
|
|
|
|
logger.debug("[%s] Worker: stopping.", self.name)
|
|
try:
|
|
self._pipe.close()
|
|
except Exception as e:
|
|
logger.debug("[%s] Worker: error during pipe close: %s", self.name, e)
|
|
|
|
def send(self, data):
|
|
"""
|
|
Synchronously asks the worker thread to perform .send().
|
|
"""
|
|
if self._closed:
|
|
logger.debug("[%s] send() called but pipe is already closed", self.name)
|
|
return
|
|
logger.debug("[%s] send() requested with: %s", self.name, data)
|
|
result_queue = queue.Queue()
|
|
request = {
|
|
"type": "SEND",
|
|
"data": data,
|
|
"result_queue": result_queue
|
|
}
|
|
self._request_queue.put(request)
|
|
result_queue.get() # Wait until sending completes.
|
|
logger.debug("[%s] send() completed", self.name)
|
|
|
|
def recv(self):
|
|
"""
|
|
Synchronously asks the worker to perform .recv() and returns the data.
|
|
"""
|
|
if self._closed:
|
|
logger.debug("[%s] recv() called but pipe is already closed", self.name)
|
|
return None
|
|
logger.debug("[%s] recv() requested", self.name)
|
|
result_queue = queue.Queue()
|
|
request = {
|
|
"type": "RECV",
|
|
"result_queue": result_queue
|
|
}
|
|
self._request_queue.put(request)
|
|
data = result_queue.get()
|
|
|
|
# Log a preview for huge byte blobs.
|
|
if isinstance(data, tuple) and len(data) == 2 and isinstance(data[1], bytes):
|
|
data_preview = (data[0], f"<{len(data[1])} bytes>")
|
|
else:
|
|
data_preview = data
|
|
logger.debug("[%s] recv() returning => %s", self.name, data_preview)
|
|
return data
|
|
|
|
def poll(self, timeout=0.0):
|
|
"""
|
|
Synchronously checks whether data is available.
|
|
Returns True if data is ready, or False otherwise.
|
|
"""
|
|
if self._closed:
|
|
return False
|
|
logger.debug("[%s] poll() requested with timeout: %s", self.name, timeout)
|
|
result_queue = queue.Queue()
|
|
request = {
|
|
"type": "POLL",
|
|
"timeout": timeout,
|
|
"result_queue": result_queue
|
|
}
|
|
self._request_queue.put(request)
|
|
try:
|
|
# Use a slightly longer timeout to give the worker a chance.
|
|
result = result_queue.get(timeout=timeout + 0.1)
|
|
except queue.Empty:
|
|
result = False
|
|
logger.debug("[%s] poll() returning => %s", self.name, result)
|
|
return result
|
|
|
|
def close(self):
|
|
"""
|
|
Closes the pipe and stops the worker thread. The _closed flag makes
|
|
sure no further operations are attempted.
|
|
"""
|
|
if self._closed:
|
|
return
|
|
logger.debug("[%s] close() called", self.name)
|
|
self._closed = True
|
|
stop_request = {"type": "CLOSE", "result_queue": queue.Queue()}
|
|
self._request_queue.put(stop_request)
|
|
self._stop_event.set()
|
|
self._worker_thread.join()
|
|
logger.debug("[%s] closed", self.name)
|
|
|
|
|
|
def SafePipe(debug=False):
|
|
"""
|
|
Returns a pair: (thread-safe parent pipe, raw child pipe).
|
|
"""
|
|
parent_synthesize_pipe, child_synthesize_pipe = mp.Pipe()
|
|
parent_pipe = ParentPipe(parent_synthesize_pipe)
|
|
return parent_pipe, child_synthesize_pipe
|
|
|
|
|
|
def child_process_code(child_end):
|
|
"""
|
|
Example child process code that receives messages, logs them,
|
|
sends acknowledgements, and then closes.
|
|
"""
|
|
for i in range(3):
|
|
msg = child_end.recv()
|
|
logger.debug("[Child] got: %s", msg)
|
|
child_end.send(f"ACK: {msg}")
|
|
child_end.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parent_pipe, child_pipe = SafePipe()
|
|
|
|
# Create child process with the child_process_code function.
|
|
p = mp.Process(target=child_process_code, args=(child_pipe,))
|
|
p.start()
|
|
|
|
# Event to signal sender threads to stop if needed.
|
|
stop_polling_event = threading.Event()
|
|
|
|
def sender_thread(n):
|
|
try:
|
|
parent_pipe.send(f"hello_from_thread_{n}")
|
|
except Exception as e:
|
|
logger.debug("[sender_thread_%s] send exception: %s", n, e)
|
|
return
|
|
|
|
# Use a poll loop with error handling.
|
|
for _ in range(10):
|
|
try:
|
|
if parent_pipe.poll(0.1):
|
|
reply = parent_pipe.recv()
|
|
logger.debug("[sender_thread_%s] got: %s", n, reply)
|
|
break
|
|
else:
|
|
logger.debug("[sender_thread_%s] no data yet...", n)
|
|
except (OSError, EOFError, BrokenPipeError) as e:
|
|
logger.debug("[sender_thread_%s] poll/recv exception: %s. Exiting thread.", n, e)
|
|
break
|
|
|
|
# Allow exit if a shutdown is signaled.
|
|
if stop_polling_event.is_set():
|
|
logger.debug("[sender_thread_%s] stop event set. Exiting thread.", n)
|
|
break
|
|
|
|
threads = []
|
|
for i in range(3):
|
|
t = threading.Thread(target=sender_thread, args=(i,))
|
|
t.start()
|
|
threads.append(t)
|
|
|
|
for t in threads:
|
|
t.join()
|
|
|
|
# Signal shutdown to any polling threads, then close the pipe.
|
|
stop_polling_event.set()
|
|
parent_pipe.close()
|
|
p.join()
|