Уничтожение дочернего потока, выполняющего многопроцессорную операцию

#python #multithreading #image-processing #multiprocessing

#python #многопоточность #обработка изображений #многопроцессорная обработка

Вопрос:

Я новичок в многопроцессорной обработке. Я разрабатывал приложение, в котором пользователь может выбрать видео, а приложение выполнит распознавание лиц и покажет его. Я использовал concurrent.futures.ProcessPoolExecutor для многопроцессорной обработки кадров для увеличения общего требуемого времени. Моя проблема в том, что когда мне нужно интегрировать многопроцессорную часть распознавания лиц в приложение, мне нужно будет запустить все это в отдельном потоке, чтобы оно оставалось отзывчивым. Обычно, когда выполняется автономная многопроцессорная часть, ее можно остановить, создав исключение. Но при выполнении в отдельном потоке у них есть свой собственный стек. Я видел людей, предлагающих использовать сообщение для проверки, должен ли код продолжать выполняться или нет. Но моя вызывающая функция не выполняется в «цикле while», чтобы проверить это. Все, что я прошу, это, если есть какой-либо способ, которым я могу вызвать исключение, чтобы остановить оба потока или любой другой способ остановить запуск этого дочернего потока из основного потока. Мой код прилагается ниже. Sub() — это вызывающая функция, а process() — это функция, которая используется в качестве цели многопроцессорной обработки. Заранее спасибо.

 import cv2
import face_recognition
import numpy as np
import time
import pickle
import concurrent.futures
import queue
import file_video_stream


def process(sframe, facesencoded, knownfacenames,process_this_frame):
    
    if sframe is  not None :
        if process_this_frame:
            
            small_frame = cv2.resize(sframe, (0, 0), fx=0.5, fy=0.5)
            rgb_small_frame = small_frame[:, :, ::-1] 
            face_locations = face_recognition.face_locations(rgb_small_frame)   
            unknown_face_encodings = face_recognition.face_encodings(rgb_small_frame, face_locations)
            face_names = []
                
            for face_encoding in unknown_face_encodings:
                
                matches = face_recognition.compare_faces(facesencoded, face_encoding)
                name = "Unknown"
                face_distances = face_recognition.face_distance(facesencoded, face_encoding)
                best_match_index = np.argmin(face_distances)
                if matches[best_match_index]:
                    name = knownfacenames[best_match_index]

                face_names.append(name)
                
                for (top, right, bottom, left), name in zip(face_locations, face_names):
                # Scale back up face locations since the frame we detected in was scaled to 1/4 size
                    top *= 2
                    right *= 2
                    bottom *= 2
                    left *= 2

                    # Draw a box around the face
                    cv2.rectangle(sframe, (left-20, top-20), (right 20, bottom 20), (255, 0, 0), 2)

                    # Draw a label with a name below the face
                    cv2.rectangle(sframe, (left-20, bottom -15), (right 20, bottom 20), (255, 0, 0), cv2.FILLED)
                    font = cv2.FONT_HERSHEY_DUPLEX
                    cv2.putText(sframe, name, (left -20, bottom   15), font, 1.0, (255, 255, 255), 2)
            show_frame = cv2.resize(sframe, (1080,720))
            return show_frame
        else :
            show_frame = cv2.resize(sframe, (1080,720))
            return show_frame


def sub():
    
    list_original = []
    frame_number = 0
    alt_frame_number = 0
    proc_frame = 0
    q1= queue.Queue()
    with open("known.pickle", 'rb') as ki:
        faces = pickle.load(ki)

    faces_encoded = list(faces.values())
    known_face_names = list(faces.keys())

    fvs = file_video_stream.FileVideoStream('Attendance.mp4')
    fvs.start()
    time.sleep(1.0)
    process_this_frame = True
    with concurrent.futures.ProcessPoolExecutor() as executor:
        while fvs.more():
        #for _ in range (5):
            frame = fvs.read()
            time.sleep(0.1)
            list_element = executor.submit(process,frame,faces_encoded,known_face_names,process_this_frame)
            time.sleep(0.1)
            
            if list_element is  not None:
                frame_number  =1
                #print (frame_number)                    
                list_original.append(list_element)

            else :
                fvs.stop()
                break
            process_this_frame = not process_this_frame

    print ("Total number of frames read:",frame_number)
    #print ("Total number of frames processed:",alt_frame_number) 

    fvs.stop()    
    for res in list_original:
        q1.put(res.result())
               
    while not q1.empty():
        dump = q1.get()
        cv2.imshow('Video', dump)
        time.sleep(0.01)
        # Hit 'q' on the keyboard to quit!
        if cv2.waitKey(1) amp; 0xFF == ord('q'):
            break

    cv2.destroyAllWindows()
if __name__ == '__main__':
    sub()    
  

Комментарии:

1. Вы пишете код для потока. В чем сложность заставить этот код выполнять только ту работу, которую вы хотите, чтобы он выполнял? Забудьте о том факте, что задействованы потоки — это просто вопрос того, чтобы заставить код не выполнять работу, которую вы не хотите, чтобы он выполнял. Вы пишете код, чтобы заставить поток выполнять работу. Вы знаете, как выглядит логика определения того, какая работа должна быть выполнена. В чем проблема? Забудьте о каком-то особом «потоковом» способе сделать это и просто сделайте это.

2. Вы общаетесь с другим процессом через очередь. Проверьте: docs.python.org/3/library/multiprocessing.html Итак, когда вы хотите остановить другой процесс, скажем, при получении исключения, вы отправляете сообщение (скажем, «выйти») в очередь. Другой процесс должен периодически прослушивать очередь и завершать работу после получения этого сообщения.