Как запустить обучение машинному обучению в фоновом режиме?

#python-3.x #machine-learning #subprocess

Вопрос:

У меня есть функция в векторном классификаторе поддержки, которая работает в планировщике на облачной платформе Google. Эта функция извлекает новые данные, добавляет их к исходным данным, обучает модель новым данным и сохраняет их в облачном хранилище Google. Все это занимает 5 минут. Я не хочу ждать окончательного вывода, вместо этого я хочу запустить его в фоновом режиме и завершить процесс без ожидания.

Ниже приведена моя функция с комментариями:

 def train_model():
    users, tasks, tags, task_tags, task_user, boards = connect_postgres()  ##loading the data from a postgres function
    storage_client = storage.Client()
    bucket = storage_client.get_bucket('my-bucket')
    blob = bucket.blob('original_data.pkl')
    pickle_in0 = blob.download_as_string()
    data = pickle.loads(pickle_in0)

    tasks = tasks.rename(columns={'id': 'task_id', 'name': 'task_name'})

    # Joining tasks and task_user_assigns tables
    tasks = tasks[tasks.task_name.isnull() == False]
    task_user = task_user[['id', 'task_id', 'user_id']].rename(columns={'id': 'task_user_id'})
    task_data = tasks.merge(task_user, on='task_id', how='left')

    # Joining users with the task_data
    users = users[['id', 'email']].rename(columns={'id': 'user_id'})
    users_tasks = task_data.merge(users, on='user_id', how='left')
    users_tasks = users_tasks[users_tasks.user_id.isnull() == False].reset_index(drop=True)

    # Joining boards table to user_tasks
    boards = boards[['id', 'name']].rename(columns={'id': 'board_id', 'name': 'board_name'})
    users_board = users_tasks.merge(boards, on='board_id', how='left').reset_index(drop=True)

    # Data Cleaning
    translator = Translator()  # This is to translate if the tasks are not in English
    users_board["task_trans"] = users_board["task_name"].map(lambda x: translator.translate(x, dest="en").text)

    users_board['task_trans'] = users_board['task_trans'].apply(lambda x: remove_emoji(x))  #This calls a function to remove Emoticons from text
    users_board['task_trans'] = users_board['task_trans'].apply(lambda x: remove_punct(x))  #This calls a function to remove punctuations from text

    users_board = users_board[['task_id', 'email', 'board_id', 'user_id', 'task_trans']]

    data1 = pd.concat([data, users_board], axis=0)

    df1 = data1.copy

    X = df1.task_trans  #all the observations
    y = df1.user_id  #all the lables

    print(y.nunique())

    #FROM HERE ON, THE TRAINING SCRIPT BEGINS

    count_vect = CountVectorizer()
    X_train_counts = count_vect.fit_transform(X)
    tf_transformer = TfidfTransformer().fit(X_train_counts)
    X_train_transformed = tf_transformer.transform(X_train_counts)

    print('model 1 done')

    labels = LabelEncoder()
    y_train_labels_fit = labels.fit(y)
    y_train_lables_trf = labels.transform(y)

    linear_svc = LinearSVC()
    clf = linear_svc.fit(X_train_transformed, y_train_lables_trf)

    print('model 2 done')

    calibrated_svc = CalibratedClassifierCV(base_estimator=linear_svc, cv="prefit")
    calibrated_svc.fit(X_train_transformed, y_train_lables_trf)

    print('model 3 done')

    # SAVING THE MODELS ON GOOGLE CLOUD STORAGE

    # storage_client = storage.Client()
    fs = gcsfs.GCSFileSystem(project='my-project')

    filename = '~path/svc.sav'
    pickle.dump(calibrated_svc, fs.open(filename, 'wb'))

    filename = '~path/count_vectorizer.sav'
    pickle.dump(count_vect, fs.open(filename, 'wb'))

    filename = '~path/tfidf_vectorizer.sav'
    pickle.dump(tf_transformer, fs.open(filename, 'wb'))

    blob = bucket.blob('original_data.pkl')
    pickle_out = pickle.dumps(df1)
    blob.upload_from_string(pickle_out)

    return "success"
 

Теперь я попытался сделать следующее:

 p = subprocess.Popen([sys.executable, '-c', train_model()], stdout=subprocess.PIPE, stderr=subprocess.STDOUT); print('finished')
 

Это также заняло столько же времени. Есть ли способ, которым я могу это решить?

Кроме того, если я хочу распечатать журналы python для этого процесса на стороне клиента, возможно ли это?

Комментарии:

1. train_model() вызывает функцию.

2. Да, я с этим не знаком. Как мне это исправить?

3. Правильный вызов подпроцесса будет выглядеть примерно так p = subprocess.Popen([sys.executable, '-c', 'from thisfile import train_model; train_model()']) (возможно, все еще stdin stdout где-то и перенаправлен).