Dask, похоже, делится памятью для глобальной переменной, что, как я думал, было невозможно

#python #dask

Вопрос:

Вот несколько воспроизводимых кодов:

  1. Вот Dask с глобальной переменной:
 # method_file.py

import dask
import random

class TestClass():

    def from_dataframe(self, pdf):
        global data
        data = pdf

    @staticmethod
    def work(elem):
        data.iloc[0, 0] = 9999
        return len(data)   elem

    def do(self):
        tasks = [dask.delayed(TestClass.work)(random.randint(1,500)) for x in range(10)]
        re = dask.compute(*tasks)
        return re
 
 # main_file.py

from method_file import TestClass
import numpy as np
import pandas as pd 

if __name__ == '__main__':
    ar = np.arange(500000000).reshape(5000000, 100)
    pdf = pd.DataFrame(ar)

    tc = TestClass()
    tc.from_dataframe(pdf)
    print(tc.do())
    print(pdf.head(3))
 
 python3 main_file.py
 

Это выводит:

 (5000117, 5000054, 5000304, 5000111, 5000010, 5000264, 5000201, 5000346, 5000486, 5000376)
     0    1    2    3    4    5    6   ...   93   94   95   96   97   98   99
0  9999    1    2    3    4    5    6  ...   93   94   95   96   97   98   99
1   100  101  102  103  104  105  106  ...  193  194  195  196  197  198  199
2   200  201  202  203  204  205  206  ...  293  294  295  296  297  298  299

[3 rows x 100 columns]

 

Это означает work , что метод смог прочитать data глобальную переменную. Мало того, что это даже изменило pdf переменную! Я знаю, что многопроцессорная обработка с fork помощью также может считывать данные таким образом, как мы видим ниже.

  1. Вот многопроцессорная обработка с fork методом запуска.
 # method_file2.py

from multiprocessing import Pool
import multiprocessing
import random

class TestClass():

    def from_dataframe(self, pdf):
        global data
        data = pdf

    @staticmethod
    def work(elem):
        data.iloc[0, 0] = 9999
        return len(data)   elem

    def do(self):

        multiprocessing.set_start_method('fork')

        pool = Pool(6)
        procs = [pool.apply_async(TestClass.work, args=(random.randint(1,500), )) for i in range(1, 10)]
        re = [proc.get() for proc in procs]
        return re
 
 # main_file2.py

from method_file2 import TestClass
import numpy as np
import pandas as pd 

if __name__ == '__main__':
    ar = np.arange(500000000).reshape(5000000, 100)
    pdf = pd.DataFrame(ar)

    tc = TestClass()
    tc.from_dataframe(pdf)
    print(tc.do())
    print(pdf.head(3))
 
 python3 main_file2.py
 

Это выводит:

 [5000456, 5000346, 5000122, 5000120, 5000358, 5000067, 5000375, 5000444, 5000288]
    0    1    2    3    4    5    6   ...   93   94   95   96   97   98   99
0    0    1    2    3    4    5    6  ...   93   94   95   96   97   98   99
1  100  101  102  103  104  105  106  ...  193  194  195  196  197  198  199
2  200  201  202  203  204  205  206  ...  293  294  295  296  297  298  299

[3 rows x 100 columns]
 

Как вы можете видеть, он может считывать, так как состояние основного процесса скопировано, но он не может изменять объект ( pdf ).

Я не буду включать здесь больше кодов для краткости, но я также попытался выбрать время для выбора pdf объекта, и я точно знаю, что Dask не выбирает этот фрейм данных. Как возможно, что Dask может таким образом обмениваться памятью между процессами?

Ответ №1:

Я не вижу, чтобы вы настраивали какие-либо процессы. Планировщик Dask по умолчанию использует пул потоков, поэтому все задачи могут видеть одни и те же переменные. Видишь https://docs.dask.org/en/latest/scheduler-overview.html