#algorithm #architecture #embedded #scheduling #channel
#алгоритм #архитектура #встроенный #планирование #канал
Вопрос:
Мне нужно периодически запрашивать данные с настраиваемого количества устройств с настраиваемыми интервалами (для каждого устройства). Все устройства подключены к общей шине данных, поэтому только одно устройство может отправлять данные одновременно.
У устройств очень мало памяти, поэтому каждое устройство может хранить данные только в течение определенного периода времени, прежде чем они будут перезаписаны следующим фрагментом. Это означает, что мне нужно обязательно запрашивать данные с любого данного устройства, пока оно все еще доступно, иначе оно будет потеряно.
Я ищу алгоритм, который, учитывая список устройств и их соответствующие временные свойства, находит приемлемый график для достижения минимальной потери данных.
Я предполагаю, что каждое устройство может быть формально описано с использованием следующих свойств:
data_interval
: время, необходимое для того, чтобы следующий фрагмент данных стал доступным
max_request_interval
: максимальное время между запросами, которое не приведет к потере данных
processing_time
: время, необходимое для отправки запроса и полного получения соответствующего ответа, содержащего запрошенные данные
В принципе, мне нужно убедиться, что данные запрашиваются с каждого устройства, как только его данные готовы и еще не истекли, при этом не забывая о крайних сроках для всех других устройств.
Существует ли какой-то алгоритм для решения подобных проблем? Я очень сомневаюсь, что я первый человек, который когда-либо сталкивался с подобной ситуацией. Поиск существующих решений в Интернете не дал много полезных результатов, главным образом потому, что алгоритмы планирования в основном используются для операционных систем и тому подобного, где запланированные процессы могут быть приостановлены и возобновлены по желанию. Однако я не могу этого сделать в моем случае, поскольку процесс запроса и получения фрагмента данных является атомарным, то есть он может выполняться только полностью или вообще не выполняться.
Комментарии:
1. Что вы пробовали / исследовали до сих пор? Поделитесь своими идеями / выводами / кодом.
2. Я изучил различные алгоритмы планирования, например, сначала самый ранний крайний срок (EDF), циклический перебор, скорость монотонная, сначала наименьшая слабость и т. Д., Но я не нашел их подходящими для рассматриваемой проблемы.
Ответ №1:
Я решил эту проблему, используя монотонное планирование без упреждающего дедлайна.
Вот некоторый код python для всех, кто заинтересован:
"""This module implements non-preemptive deadline monotonic scheduling (NPDMS) to compute a schedule of periodic,
non-preemptable requests to slave devices connected to a shared data bus"""
from math import gcd
from functools import reduce
from typing import List
class Slave:
def __init__(self, name: str, period: int, processing_time: int, offset=0, deadline=None):
self.name = name
self.period = int(period)
self.processing_time = int(processing_time)
self.offset = int(offset)
if self.offset >= self.period:
raise ValueError("Slave %s: offset must be < period" % name)
self.deadline = int(deadline) if deadline else self.period
if self.deadline > self.period:
raise ValueError("Slave %s: deadline must be <= period" % name)
class Request:
def __init__(self, slave: Slave, start_time: int):
self.slave = slave
self.start_time = start_time
self.end_time = start_time slave.processing_time
self.duration = self.end_time - self.start_time
def overlaps_with(self, other: 'Request'):
min_duration = self.duration other.duration
start = min(other.start_time, self.start_time)
end = max(other.end_time, self.end_time)
effective_duration = end - start
return effective_duration < min_duration
class Scenario:
def __init__(self, *slaves: Slave):
self.slaves = list(slaves)
self.slaves.sort(key=lambda slave: slave.deadline)
# LCM of all slave periods
self.cycle_period = reduce(lambda a, b: a * b // gcd(a, b), [slave.period for slave in slaves])
def compute_schedule(self, resolution=1) -> 'Schedule':
request_pool = []
for t in range(0, self.cycle_period, resolution):
for slave in self.slaves:
if (t - slave.offset) % slave.period == 0 and t >= slave.offset:
request_pool.append(Request(slave, t))
request_pool.reverse()
scheduled_requests = []
current_request = request_pool.pop()
t = current_request.start_time
while t < self.cycle_period:
ongoing_request = Request(current_request.slave, t)
while ongoing_request.start_time <= t < ongoing_request.end_time:
t = resolution
scheduled_requests.append(ongoing_request)
if len(request_pool):
current_request = request_pool.pop()
t = max(current_request.start_time, t)
else:
current_request = None
break
if current_request:
request_pool.append(current_request)
return Schedule(self, scheduled_requests, request_pool)
class Schedule:
def __init__(self, scenario: Scenario, requests: List[Request], unscheduled: List[Request] = None):
self.scenario = scenario
self.requests = requests
self.unscheduled_requests = unscheduled if unscheduled else []
self._utilization = 0
for slave in self.scenario.slaves:
self._utilization = float(slave.processing_time) / float(slave.period)
self._missed_deadlines_dict = {}
for slave in self.scenario.slaves:
periods = scenario.cycle_period // slave.period
missed_deadlines = []
for period in range(periods):
start = period * slave.period
end = start slave.period
request = self._find_request(slave, start, end)
if request:
if request.start_time < (start slave.offset) or request.end_time > start slave.deadline:
missed_deadlines.append(request)
if missed_deadlines:
self._missed_deadlines_dict[slave] = missed_deadlines
self._overlapping_requests = []
for i in range(0, len(requests)):
if i == 0:
continue
previous_request = requests[i - 1]
current_request = requests[i]
if current_request.overlaps_with(previous_request):
self._overlapping_requests.append((current_request, previous_request))
self._incomplete_requests = []
for request in self.requests:
if request.duration < request.slave.processing_time:
self._incomplete_requests.append(request)
@property
def is_feasible(self) -> bool:
return self.utilization <= 1
and not self.has_missed_deadlines
and not self.has_overlapping_requests
and not self.has_unscheduled_requests
and not self.has_incomplete_requests
@property
def utilization(self) -> float:
return self._utilization
@property
def has_missed_deadlines(self) -> bool:
return len(self._missed_deadlines_dict) > 0
@property
def has_overlapping_requests(self) -> bool:
return len(self._overlapping_requests) > 0
@property
def has_unscheduled_requests(self) -> bool:
return len(self.unscheduled_requests) > 0
@property
def has_incomplete_requests(self) -> bool:
return len(self._incomplete_requests) > 0
def _find_request(self, slave, start, end) -> [Request, None]:
for r in self.requests:
if r.slave == slave and r.start_time >= start and r.end_time < end:
return r
return None
def read_scenario(file) -> Scenario:
from csv import DictReader
return Scenario(*[Slave(**row) for row in DictReader(file)])
def write_schedule(schedule: Schedule, file):
from csv import DictWriter
writer = DictWriter(file, fieldnames=["name", "start", "end"])
writer.writeheader()
for request in schedule.requests:
writer.writerow({"name": request.slave.name, "start": request.start_time, "end": request.end_time})
if __name__ == '__main__':
import argparse
import sys
parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter,
description='Use non-preemptive deadline monotonic scheduling (NPDMS) ton'
'compute a schedule of periodic, non-preemptable requests ton'
'slave devices connected to a shared data bus.nn'
'Prints the computed schedule to stdout as CSV. Returns withn'
'exit code 0 if the schedule is feasible, else 1.')
parser.add_argument("csv_file", metavar="SCENARIO", type=str,
help="A csv file describing the scenario, i.e. a listn"
"of slave devices with the following properties:n"
"* name: name/id of the slave devicenn"
"* period: duration of the period of time duringn"
" which requests must be dispatchednn"
"* processing_time: amount of time it takes ton"
" fully process a request (worst-case)nn"
"* offset: offset for initial phase-shiftingn"
" (default: 0)nn"
"* deadline: amount of time during which data isn"
" available after the start of each periodn"
" (default: <period>)")
parser.add_argument("-r", "--resolution", type=int, default=1,
help="The resolution used to simulate the passage of time (default: 1)")
args = parser.parse_args()
with open(args.csv_file, 'r') as f:
schedule = read_scenario(f).compute_schedule(args.resolution)
write_schedule(schedule, sys.stdout)
exit(0 if schedule.is_feasible else 1)