from concurrent.futures import ProcessPoolExecutorexecutor = ProcessPoolExecutor(max_workers=4)def detect_image(image: bytes) -> str:# cpu-bound taskreturn 'cat'async def http_handler(request: Request) -> Response:image = await request.read()loop = asyncio.get_running_loop()result = await loop.run_in_executor(executor, detect_image, image)return Response(result)
from concurrent.futures import ProcessPoolExecutorfrom multiprocessing import shared_memoryexecutor = ProcessPoolExecutor(max_workers=4)def detect_image(shm: shared_memory.SharedMemory) -> str:# read image from shared memoryimage = shm.buf# cpu-bound task ...return 'cat'async def http_handler(request: Request) -> Response:loop = asyncio.get_running_loop()image = await request.read()# store image to shared memoryshm = shared_memory.SharedMemory(name='shared-mem-for-image', create=True, size=len(image))shm.buf[:len(image)] = imageresult = await loop.run_in_executor(executor, detect_image, shm)return Response(result)
...def detect_image(shm: shared_memory.SharedMemory) -> str:image = shm.buf# cpu-bound task ...result = 'cat'shm.close()return resultasync def http_handler(request: Request) -> Response:loop = asyncio.get_running_loop()image = await request.read()# store image to shared memoryshm = shared_memory.SharedMemory(name='shared-mem-for-image', create=True, size=len(image))shm.buf[:len(image)] = imageresult = await loop.run_in_executor(executor, detect_image, shm)shm.close()shm.unlink()return Response(result)
Такой подход можно использовать для простых сценариев, но проблема в том, что много всего нужно писать руками. При большой кодовой базе можно легко запутаться, что-то где-то не удалить или, наоборот, удалить лишний раз.
from concurrent.futures import ProcessPoolExecutorfrom multiprocessing.managers import SharedMemoryManagerdef detect_image(shm) -> str:image = shm.buf# cpu-bound task ...return 'cat'async def http_handler(request: Request) -> Response:loop = asyncio.get_running_loop()image = await request.read()# store image to shared memorywith SharedMemoryManager() as manager:shm = manager.SharedMemory(size=len(image))shm.buf[:len(image)] = imageresult = await loop.run_in_executor(executor, detect_image, shm)return Response(result)
from multiprocessing import shared_memoryclass SharedMemoryWrapper:def __init__(self, data: bytes) -> None:self._shm = shared_memory.SharedMemory(name='shared-mem-for-image', create=True, size=len(data))self._owner = Trueself._shm.buf[:len(data)] = data@propertydef buf(self) -> memoryview:return self._shm.bufdef __getstate__(self) -> dict[str, Any]:self._owner = Falsereturn {'name': self._shm.name}def __setstate__(self, state: dict[str, Any]) -> None:self._shm = shared_memory.SharedMemory(name=state['name'], create=False)self._owner = Truedef __del__(self) -> None:self._shm.close()if self._owner:self._shm.unlink()
from concurrent.futures import ProcessPoolExecutordef detect_image(shm: SharedMemoryWrapper) -> str:image = shm.buf# cpu-bound task ...return 'cat'async def http_handler(request: Request) -> Response:loop = asyncio.get_running_loop()image = await request.read()shm = SharedMemoryWrapper(image)result = await loop.run_in_executor(executor, detect_image, shm)return Response(result)
from multiprocessing import shared_memoryclass SharedMemoryWrapper:def __init__(self, data: bytes) -> None:self._shm = shared_memory.SharedMemory(name='shared-mem-for-image', create=True, size=len(data) + 1)self._shm.buf[:len(data)] = dataself._shm.buf[-1] = 1 # reference counter@propertydef buf(self) -> memoryview:return self._shm.bufdef __getstate__(self) -> dict[str, Any]:self._shm.buf[-1] += 1return {'name': self._shm.name}def __setstate__(self, state: dict[str, Any]) -> None:self._shm = shared_memory.SharedMemory(name=state['name'], create=False)def __del__(self) -> None:self._shm.buf[-1] -= 1rc = self._shm.buf[-1]self._shm.close()if rc == 0:self._shm.unlink()
from multiprocessing import Manager, LockCOUNTERS = Manager().dict()class GlobalRC:def __init__(self) -> None:self._counters = COUNTERSself._lock = Lock()def incr(self, shm_name: str) -> None:with self._lock:if shm_name not in self._counters:self._counters[shm_name] = 0self._counters[shm_name] += 1def decr(self, shm_name: str) -> None:with self._lock:self._counters[shm_name] -= 1curr = self._counters[shm_name]if curr == 0:self._counters.pop(shm_name)return curr
atomic = ffi.verify("""uint32_t load_uint32(uint32_t *v) {return __atomic_load_n(v, __ATOMIC_SEQ_CST);};void store_uint32(uint32_t *v, uint32_t n) {uint32_t i = n;__atomic_store(v, &i, __ATOMIC_SEQ_CST);};uint32_t add_and_fetch_uint32(uint32_t *v, uint32_t i) {return __atomic_add_fetch(v, i, __ATOMIC_SEQ_CST);};uint32_t sub_and_fetch_uint32(uint32_t *v, uint32_t i) {return __atomic_sub_fetch(v, i, __ATOMIC_SEQ_CST);};""")
class AtomicCounter:def __init__(self, view: memoryview):self._ptr = ffi.cast('uint32_t*', ffi.from_buffer(view[:self.size()]))def get(self):return atomic.load_uint32(self._ptr)def set(self, n):return atomic.store_uint32(self._ptr, n)def inc(self):return atomic.add_and_fetch_uint32(self._ptr, 1)def dec(self):return atomic.sub_and_fetch_uint32(self._ptr, 1)@staticmethoddef size():return ffi.sizeof('uint32_t')