Created
January 8, 2025 02:29
-
-
Save mywaiting/49e4104e2fc94c2a44f5e165d9e0be0f to your computer and use it in GitHub Desktop.
python multiprocessing queue 在处理带引用数据时,可能存在无法解引用而导致原始数据受到污染的情况
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import copy
import multiprocessing
import pprint
from multiprocessing.queues import Queue
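#
# When a Python multiprocessing queue handles data that contains nested
# references, pickle may fail to dereference them, and the original data can
# end up polluted.
# This is only an example implementation used for testing.
#
# For a workaround, see https://stackoverflow.com/q/28593103
# For the original issue, see https://bugs.python.org/issue17025
#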
class Test:
    """Send task dicts through a multiprocessing queue and verify the template.

    Constructing an instance creates a bounded queue and immediately runs
    ``test_a`` followed by ``test_b``.

    Each test works on a deep copy of the class-level ``task`` template. A
    shallow ``dict(self.task)`` copy would share the nested ``fetch`` dict with
    the template, so the nested write in ``test_a`` would leak into the
    template and make ``test_b`` fail regardless of the queue.
    """

    # Shared template; tests must never mutate it directly.
    task = {
        'taskid': 'taskid',
        'project': {
            'projectid': 'test_project',
        },
        'url': '',
        'fetch': {
            'method': 'GET',
            'headers': {
                'Cookie': 'a=b',
                'a': 'b'
            },
            'timeout': 60,
            'context': [1, 2, 3]
        }
    }

    def __init__(self):
        """Create the queue (max size 10) and run both tests in order."""
        self.q = Queue(10, ctx=multiprocessing.get_context())
        self.test_a()
        self.test_b()

    def test_a(self):
        """Queue a modified copy of the template, read it back and print it."""
        # Deep copy so the nested 'fetch' dict is private to this test.
        task = copy.deepcopy(self.task)
        task['taskid'] = 'test_a'
        task['url'] = 'test_a_url'
        task['fetch']['timeout'] = 'original_data_has_changed_here'
        self.q.put(task)
        t = self.q.get()
        print('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
        pprint.pp(t)

    def test_b(self):
        """Queue an unmodified copy of the template and check its timeout.

        Raises:
            ValueError: if the round-tripped ``fetch.timeout`` is not 60,
                i.e. the template was changed by an earlier test.
        """
        task = copy.deepcopy(self.task)
        self.q.put(task)
        t = self.q.get()
        print('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb')
        pprint.pp(t)
        # task.fetch.timeout should be 60
        if t['fetch']['timeout'] != 60:
            raise ValueError('task.fetch/timeout has been changed')
if __name__ == "__main__":
    # Instantiating Test runs test_a and test_b from its constructor.
    Test()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment