Skip to content

Commit 6104c6e

Browse files
committed
Last update including:
+ refactoring code on SemLock_imp and SemLock__rebuild_impl, + change conditional compilation value PyDEBUG to Py_REF_DEBUG, + add sizeof CounteObject in the header + add 3 unit tests
1 parent ce5dc4c commit 6104c6e

3 files changed

Lines changed: 266 additions & 127 deletions

File tree

Lib/test/_test_multiprocessing.py

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7389,6 +7389,7 @@ def test_preload_main_large_sys_argv(self):
73897389
ACQUIRE, RELEASE = range(2)
73907390
@unittest.skipIf(sys.platform != "darwin", "MacOSX only")
73917391
class _TestMacOSXSemaphore(BaseTestCase):
7392+
73927393
ALLOWED_TYPES = ('processes',)
73937394
@classmethod
73947395
def _run_thread(cls, sem, meth, ntime, delay):
@@ -7438,6 +7439,159 @@ def test_mix_several_acquire_release(self):
74387439
p.join()
74397440
self.assertEqual(sem.get_value(), n_rel)
74407441

7442+
@unittest.skipUnless(hasattr(_multiprocessing, '_MACOSX_SHAREDMEM_NAME'), "C Workaround for `get_value` is necessary")
7443+
def test_sharedmem_and_lock_names(self):
7444+
7445+
self.assertTrue(hasattr(_multiprocessing, '_MACOSX_SHAREDMEM_NAME'))
7446+
self.assertTrue(_multiprocessing._MACOSX_SHAREDMEM_NAME.startswith("/psm-gh125828-"))
7447+
self.assertTrue(_multiprocessing._MACOSX_SHAREDMEM_NAME.endswith(f"-{os.getpid()}"))
7448+
7449+
self.assertTrue(hasattr(_multiprocessing, '_MACOSX_SHMLOCK_NAME'))
7450+
self.assertTrue(_multiprocessing._MACOSX_SHMLOCK_NAME.startswith("/mp-gh125828-"))
7451+
self.assertTrue(_multiprocessing._MACOSX_SHMLOCK_NAME.endswith(f"-{os.getpid()}"))
7452+
7453+
@unittest.skipUnless(hasattr(_multiprocessing, '_MACOSX_SHAREDMEM_NAME'), "C Workaround for `get_value` is necessary")
7454+
def test_exist_sharedmem_and_lock(self):
7455+
import multiprocessing.synchronize as synchronize
7456+
import _multiprocessing
7457+
import _posixshmem
7458+
7459+
# shared memory
7460+
shm_name = _multiprocessing._MACOSX_SHAREDMEM_NAME
7461+
7462+
# shm lock
7463+
lock_name = _multiprocessing._MACOSX_SHMLOCK_NAME
7464+
7465+
# check if shared mem already exists.
7466+
with self.assertRaises(FileNotFoundError):
7467+
_posixshmem.shm_open(shm_name,
7468+
os.O_RDWR,
7469+
0o600)
7470+
7471+
# check if lock does not exists.
7472+
with self.assertRaises(FileNotFoundError):
7473+
_multiprocessing.sem_unlink(lock_name)
7474+
7475+
# create 2 semaphores
7476+
sems = [self.Semaphore(10), self.BoundedSemaphore(3)]
7477+
7478+
# check if shared mem already exists.
7479+
with self.assertRaises(FileExistsError):
7480+
_posixshmem.shm_open(shm_name,
7481+
os.O_CREAT | os.O_EXCL | os.O_RDWR,
7482+
0o600)
7483+
7484+
# check if lock already exists.
7485+
with self.assertRaises(FileExistsError):
7486+
_multiprocessing.SemLock(synchronize.SEMAPHORE,
7487+
1, 1,
7488+
lock_name,
7489+
unlink=0)
7490+
7491+
# remove all semaphores
7492+
while len(sems) and (s := sems.pop()):
7493+
del s
7494+
7495+
# check if shared mem does not exists.
7496+
with self.assertRaises(FileNotFoundError):
7497+
_posixshmem.shm_unlink(shm_name)
7498+
7499+
# check if lock does not exists.
7500+
with self.assertRaises(FileNotFoundError):
7501+
_multiprocessing.sem_unlink(lock_name)
7502+
7503+
def _get_counters_and_test(self, l, sem):
7504+
internal_counter, pending_acquires = l[0], l[1]
7505+
self.assertEqual(max(0, internal_counter - pending_acquires),
7506+
sem.get_value())
7507+
7508+
def _get_name_and_test(self, b, sem):
7509+
name = b.decode("utf-8").rstrip('\x00')
7510+
self.assertEqual(sem._semlock.name, name)
7511+
7512+
def test_sharedmem_details(self):
7513+
import mmap
7514+
import _posixshmem
7515+
7516+
# create semaphores
7517+
sems = [self.Semaphore(10), self.BoundedSemaphore(5), self.Semaphore(0)]
7518+
n = len(sems)
7519+
7520+
# Starts n_acqs threads to acquire sems[-1],
7521+
# which will update the sharedmem content regarding pending acquires.
7522+
n_acqs = 3
7523+
ts = [threading.Thread(target= sems[-1].acquire) for _ in range(n_acqs)]
7524+
for t in ts:
7525+
t.start()
7526+
7527+
# check content of the sharedmem: header, plus n semaphoreq.
7528+
# See file ./Modules/multiprocessing/semaphore_macosx.h
7529+
# for the shared memory layout.
7530+
shm_name = _multiprocessing._MACOSX_SHAREDMEM_NAME
7531+
try:
7532+
_fd = _posixshmem.shm_open(shm_name, os.O_RDWR, 0o600)
7533+
if _fd:
7534+
size = os.fstat(_fd).st_size
7535+
_mmap = mmap.mmap(_fd, size)
7536+
7537+
# header
7538+
start = 0
7539+
_len = 16 # 4 ints
7540+
with memoryview(_mmap[:16]).cast('i') as _buf: # signed int
7541+
header = _buf[:4].tolist()
7542+
self.assertEqual(header[0], n)
7543+
self.assertEqual(header[-2], size)
7544+
sizeof_counter = header[-1]
7545+
7546+
start += _len
7547+
len_sem_name = 16 # 16 chars for the name of the semaphore
7548+
len_internal_counter = 6 # 3 unisgned short
7549+
len_ctimestamp = 8 # # 1 ctime
7550+
struct_format = (
7551+
(len_sem_name, 'B', memoryview.tobytes, self._get_name_and_test), # unsigned char
7552+
(len_internal_counter, 'H', memoryview.tolist, self._get_counters_and_test), # unsigned short
7553+
(len_ctimestamp, 'L', memoryview.tolist, None), # unsigned long
7554+
)
7555+
# all semahores in the counters array
7556+
for s in sems:
7557+
st = start
7558+
for _len, _cast,_convert, _assert in struct_format:
7559+
with memoryview(_mmap[st:st+_len]).cast(_cast) as _buf:
7560+
if _assert is not None:
7561+
_assert(_convert(_buf), s)
7562+
st += _len # after each part of a semaphore data
7563+
start += sizeof_counter # after each semaphore data
7564+
# end of exploration
7565+
7566+
# Remove first created sem from sems array.
7567+
s = sems.pop(0)
7568+
del s
7569+
# Check that its dedicated counter in sharedmem is reset to 0.
7570+
st = 16
7571+
with memoryview(_mmap[st:st+sizeof_counter]).cast('B') as _buf:
7572+
self.assertEqual(_buf.tobytes(), bytes([0]*sizeof_counter))
7573+
7574+
# Check number of semaphores in header is changed.
7575+
with memoryview(_mmap[:16]).cast('i') as _buf: # signed int
7576+
header = _buf[:4].tolist()
7577+
self.assertEqual(header[0], len(sems))
7578+
7579+
_mmap.close()
7580+
os.close(_fd)
7581+
7582+
except Exception as e:
7583+
raise
7584+
7585+
# release sems[-1] regarding each acquire,
7586+
# then finish each thread
7587+
for t in ts:
7588+
sems[-1].release()
7589+
t.join()
7590+
7591+
# remove all semaphores
7592+
while len(sems) and (s := sems.pop()):
7593+
del s
7594+
74417595

74427596
#
74437597
# Mixins

0 commit comments

Comments
 (0)