Планирование периодических запросов к нескольким устройствам с использованием общего канала

#algorithm #architecture #embedded #scheduling #channel

#алгоритм #архитектура #встроенный #планирование #канал

Вопрос:

Мне нужно периодически запрашивать данные с настраиваемого количества устройств с настраиваемыми интервалами (для каждого устройства). Все устройства подключены к общей шине данных, поэтому только одно устройство может отправлять данные одновременно.

У устройств очень мало памяти, поэтому каждое устройство может хранить данные только в течение определенного периода времени, прежде чем они будут перезаписаны следующим фрагментом. Это означает, что мне нужно обязательно запрашивать данные с любого данного устройства, пока оно все еще доступно, иначе оно будет потеряно.

Я ищу алгоритм, который, учитывая список устройств и их соответствующие временные свойства, находит приемлемый график для достижения минимальной потери данных.

Я предполагаю, что каждое устройство может быть формально описано с использованием следующих свойств:

data_interval : время, необходимое для того, чтобы следующий фрагмент данных стал доступным

max_request_interval : максимальное время между запросами, которое не приведет к потере данных

processing_time : время, необходимое для отправки запроса и полного получения соответствующего ответа, содержащего запрошенные данные

В принципе, мне нужно убедиться, что данные запрашиваются с каждого устройства, как только его данные готовы и еще не истекли, при этом не забывая о крайних сроках для всех других устройств.

Существует ли какой-то алгоритм для решения подобных проблем? Я очень сомневаюсь, что я первый человек, который когда-либо сталкивался с подобной ситуацией. Поиск существующих решений в Интернете не дал много полезных результатов, главным образом потому, что алгоритмы планирования в основном используются для операционных систем и тому подобного, где запланированные процессы могут быть приостановлены и возобновлены по желанию. Однако я не могу этого сделать в моем случае, поскольку процесс запроса и получения фрагмента данных является атомарным, то есть он может выполняться только полностью или вообще не выполняться.

Комментарии:

1. Что вы пробовали / исследовали до сих пор? Поделитесь своими идеями / выводами / кодом.

2. Я изучил различные алгоритмы планирования, например, сначала самый ранний крайний срок (EDF), циклический перебор, скорость монотонная, сначала наименьшая слабость и т. Д., Но я не нашел их подходящими для рассматриваемой проблемы.

Ответ №1:

Я решил эту проблему, используя монотонное планирование без упреждающего дедлайна.

Вот некоторый код python для всех, кто заинтересован:

 """This module implements non-preemptive deadline monotonic scheduling (NPDMS) to compute a schedule of periodic,
non-preemptable requests to slave devices connected to a shared data bus"""

from math import gcd
from functools import reduce
from typing import List


class Slave:

    def __init__(self, name: str, period: int, processing_time: int, offset=0, deadline=None):
        self.name = name
        self.period = int(period)
        self.processing_time = int(processing_time)
        self.offset = int(offset)
        if self.offset >= self.period:
            raise ValueError("Slave %s: offset must be < period" % name)
        self.deadline = int(deadline) if deadline else self.period
        if self.deadline > self.period:
            raise ValueError("Slave %s: deadline must be <= period" % name)


class Request:

    def __init__(self, slave: Slave, start_time: int):
        self.slave = slave
        self.start_time = start_time
        self.end_time = start_time   slave.processing_time
        self.duration = self.end_time - self.start_time

    def overlaps_with(self, other: 'Request'):
        min_duration = self.duration   other.duration
        start = min(other.start_time, self.start_time)
        end = max(other.end_time, self.end_time)
        effective_duration = end - start
        return effective_duration < min_duration


class Scenario:

    def __init__(self, *slaves: Slave):
        self.slaves = list(slaves)
        self.slaves.sort(key=lambda slave: slave.deadline)
        # LCM of all slave periods
        self.cycle_period = reduce(lambda a, b: a * b // gcd(a, b), [slave.period for slave in slaves])

    def compute_schedule(self, resolution=1) -> 'Schedule':
        request_pool = []
        for t in range(0, self.cycle_period, resolution):
            for slave in self.slaves:
                if (t - slave.offset) % slave.period == 0 and t >= slave.offset:
                    request_pool.append(Request(slave, t))
        request_pool.reverse()

        scheduled_requests = []
        current_request = request_pool.pop()
        t = current_request.start_time
        while t < self.cycle_period:
            ongoing_request = Request(current_request.slave, t)
            while ongoing_request.start_time <= t < ongoing_request.end_time:
                t  = resolution
            scheduled_requests.append(ongoing_request)
            if len(request_pool):
                current_request = request_pool.pop()
                t = max(current_request.start_time, t)
            else:
                current_request = None
                break

        if current_request:
            request_pool.append(current_request)

        return Schedule(self, scheduled_requests, request_pool)


class Schedule:

    def __init__(self, scenario: Scenario, requests: List[Request], unscheduled: List[Request] = None):
        self.scenario = scenario
        self.requests = requests
        self.unscheduled_requests = unscheduled if unscheduled else []

        self._utilization = 0
        for slave in self.scenario.slaves:
            self._utilization  = float(slave.processing_time) / float(slave.period)

        self._missed_deadlines_dict = {}
        for slave in self.scenario.slaves:
            periods = scenario.cycle_period // slave.period
            missed_deadlines = []
            for period in range(periods):
                start = period * slave.period
                end = start   slave.period
                request = self._find_request(slave, start, end)
                if request:
                    if request.start_time < (start   slave.offset) or request.end_time > start   slave.deadline:
                        missed_deadlines.append(request)
            if missed_deadlines:
                self._missed_deadlines_dict[slave] = missed_deadlines

        self._overlapping_requests = []
        for i in range(0, len(requests)):
            if i == 0:
                continue
            previous_request = requests[i - 1]
            current_request = requests[i]
            if current_request.overlaps_with(previous_request):
                self._overlapping_requests.append((current_request, previous_request))

        self._incomplete_requests = []
        for request in self.requests:
            if request.duration < request.slave.processing_time:
                self._incomplete_requests.append(request)

    @property
    def is_feasible(self) -> bool:
        return self.utilization <= 1 
               and not self.has_missed_deadlines 
               and not self.has_overlapping_requests 
               and not self.has_unscheduled_requests 
               and not self.has_incomplete_requests

    @property
    def utilization(self) -> float:
        return self._utilization

    @property
    def has_missed_deadlines(self) -> bool:
        return len(self._missed_deadlines_dict) > 0

    @property
    def has_overlapping_requests(self) -> bool:
        return len(self._overlapping_requests) > 0

    @property
    def has_unscheduled_requests(self) -> bool:
        return len(self.unscheduled_requests) > 0

    @property
    def has_incomplete_requests(self) -> bool:
        return len(self._incomplete_requests) > 0

    def _find_request(self, slave, start, end) -> [Request, None]:
        for r in self.requests:
            if r.slave == slave and r.start_time >= start and r.end_time < end:
                return r
        return None


def read_scenario(file) -> Scenario:
    from csv import DictReader
    return Scenario(*[Slave(**row) for row in DictReader(file)])


def write_schedule(schedule: Schedule, file):
    from csv import DictWriter
    writer = DictWriter(file, fieldnames=["name", "start", "end"])
    writer.writeheader()
    for request in schedule.requests:
        writer.writerow({"name": request.slave.name, "start": request.start_time, "end": request.end_time})


if __name__ == '__main__':
    import argparse
    import sys

    parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter,
                                     description='Use non-preemptive deadline monotonic scheduling (NPDMS) ton'
                                                 'compute a schedule of periodic, non-preemptable requests ton'
                                                 'slave devices connected to a shared data bus.nn'
                                                 'Prints the computed schedule to stdout as CSV. Returns withn'
                                                 'exit code 0 if the schedule is feasible, else 1.')
    parser.add_argument("csv_file", metavar="SCENARIO", type=str,
                        help="A csv file describing the scenario, i.e. a listn"
                             "of slave devices with the following properties:n"
                             "* name:            name/id of the slave devicenn"
                             "* period:          duration of the period of time duringn"
                             "                   which requests must be dispatchednn"
                             "* processing_time: amount of time it takes ton"
                             "                   fully process a request (worst-case)nn"
                             "* offset:          offset for initial phase-shiftingn"
                             "                   (default: 0)nn"
                             "* deadline:        amount of time during which data isn"
                             "                   available after the start of each periodn"
                             "                   (default: <period>)")

    parser.add_argument("-r", "--resolution", type=int, default=1,
                        help="The resolution used to simulate the passage of time (default: 1)")

    args = parser.parse_args()

    with open(args.csv_file, 'r') as f:
        schedule = read_scenario(f).compute_schedule(args.resolution)
        write_schedule(schedule, sys.stdout)
        exit(0 if schedule.is_feasible else 1)