Skip to content

Instantly share code, notes, and snippets.

@mywaiting
Created January 8, 2025 02:29
Show Gist options
  • Save mywaiting/49e4104e2fc94c2a44f5e165d9e0be0f to your computer and use it in GitHub Desktop.
Save mywaiting/49e4104e2fc94c2a44f5e165d9e0be0f to your computer and use it in GitHub Desktop.
python multiprocessing queue 在处理带引用数据时,可能存在无法解引用而导致原始数据受到污染的情况
import copy
import multiprocessing
import pprint
from multiprocessing.queues import Queue
#
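# When a Python multiprocessing queue handles data that contains references,
# pickle may fail to break those references, so the original data can end up
# polluted. This file is only a sample implementation for testing.
#
# For the solution see https://stackoverflow.com/q/28593103
# For the original issue see https://bugs.python.org/issue17025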
#
class Test:
    """Round-trip task dicts through a multiprocessing queue.

    ``task`` is a class-level template shared by every test. Each test works
    on its own deep copy of it: a shallow ``dict(self.task)`` copies only the
    top level, so nested containers such as ``task['fetch']`` would still be
    the very same objects as in the template, and a nested write in one test
    would leak into every later test.

    Instantiating the class creates the queue and runs ``test_a`` followed by
    ``test_b``. With the deep copies in place ``test_b``'s check passes, so
    constructing ``Test()`` no longer raises ``ValueError``.
    """

    # Shared template. Tests must only ever modify copies of it.
    task = {
        'taskid': 'taskid',
        'project': {
            'projectid': 'test_project',
        },
        'url': '',
        'fetch': {
            'method': 'GET',
            'headers': {
                'Cookie': 'a=b',
                'a': 'b'
            },
            'timeout': 60,
            'context': [1, 2, 3]
        }
    }

    def __init__(self):
        """Create a queue bounded to 10 items and run both tests."""
        self.q = Queue(10, ctx=multiprocessing.get_context())
        self.test_a()
        self.test_b()

    def _fresh_task(self):
        """Return a copy of ``task`` that shares no nested objects with it."""
        return copy.deepcopy(self.task)

    def test_a(self):
        """Modify a private copy of the template and round-trip it.

        Prints a marker line followed by the dict read back from the queue.
        """
        task = self._fresh_task()
        task['taskid'] = 'test_a'
        task['url'] = 'test_a_url'
        # Nested write: only safe because ``task`` is a deep copy.
        task['fetch']['timeout'] = 'original_data_has_changed_here'
        self.q.put(task)
        t = self.q.get()
        print('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
        pprint.pp(t)

    def test_b(self):
        """Round-trip an unmodified copy and verify the template is intact.

        Prints a marker line followed by the dict read back from the queue.

        Raises:
            ValueError: if ``fetch.timeout`` of the received dict is not 60,
                i.e. the template was modified by an earlier test.
        """
        task = self._fresh_task()
        self.q.put(task)
        t = self.q.get()
        print('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb')
        pprint.pp(t)
        # task.fetch.timeout should be 60
        if t['fetch']['timeout'] != 60:
            raise ValueError('task.fetch/timeout has been changed')
# Script entry point: constructing Test creates the queue and runs both tests.
if __name__ == "__main__":
    Test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment