Real-Time Programming: When Delay Is Not an Option
Real-Time Programming: When Milliseconds Are a Matter of Life or Death
In most ordinary software, if a response is delayed by half a second, nobody notices. But in industrial control, half a second could mean a robot arm overshooting its position by ten centimeters, or pressure in a pipe reaching a dangerous level. This is the world of Real-Time Programming -- where timing correctness is just as important as result correctness.
A real-time system does not necessarily mean a fast system -- it means a system with predictable response time. A motor that always responds within 10 milliseconds is better than one that responds in 1 millisecond sometimes and 100 milliseconds other times.
Hard vs. Soft Real-Time
Hard Real-Time
Missing a deadline = catastrophe. There is no room for delay, ever.
Examples:
- Anti-lock Braking System (ABS): must respond within 5ms, always
- Fuel injection timing in an engine: precise timing every cycle
- Safety system on industrial presses: immediate stop when a hand is detected
Soft Real-Time
Delay is annoying but not catastrophic -- the system continues to work at reduced performance.
Examples:
- Factory data display screen: a one-second delay is acceptable
- Data logging system: losing a single data point is not critical
- SCADA user interface: temporary slowness is tolerable
Firm Real-Time
A late result is useless but does not cause a catastrophe:
- Video frame processing in an industrial vision system: late frames are discarded
| Type | Missed Deadline | Industrial Example |
|---|---|---|
| Hard | Catastrophe / danger | Brakes, safety systems |
| Firm | Result discarded | Computer vision |
| Soft | Reduced performance | Monitoring screens |
Real-Time Operating System (RTOS)
A general-purpose OS (Windows, Linux) does not guarantee specific response times -- it might decide to run a virus scan update during a critical control operation. This is why we use an RTOS (Real-Time Operating System).
FreeRTOS: The Most Common in Industrial Applications
FreeRTOS is an open-source operating system that runs on microcontrollers (such as ESP32, STM32, Arduino). Here is what the code looks like:
#include "FreeRTOS.h"
#include "task.h"
// Pressure sensor task -- high priority
void vPressureTask(void *pvParameters) {
TickType_t xLastWakeTime = xTaskGetTickCount();
const TickType_t xPeriod = pdMS_TO_TICKS(10); // Every 10ms
for (;;) {
float pressure = read_pressure_sensor();
if (pressure > MAX_SAFE_PRESSURE) {
activate_emergency_relief_valve();
}
// Precise wait until next cycle
vTaskDelayUntil(&xLastWakeTime, xPeriod);
}
}
// Display task -- low priority
void vDisplayTask(void *pvParameters) {
for (;;) {
update_lcd_display();
vTaskDelay(pdMS_TO_TICKS(500)); // Every 500ms
}
}
int main(void) {
// Create tasks with priorities
xTaskCreate(vPressureTask, "Pressure", 256, NULL,
3, // High priority
NULL);
xTaskCreate(vDisplayTask, "Display", 512, NULL,
1, // Low priority
NULL);
// Start the scheduler
vTaskStartScheduler();
// We never reach here
for (;;);
}
The key point: the pressure task (priority 3) always preempts the display task (priority 1) when its time comes.
Priority Scheduling
Preemptive Scheduling
The scheduler pulls the processor from a lower-priority task and gives it to a higher-priority task immediately:
Time ──────────────────────────────────>
Safety task (priority 5): ██ ██
Control task (priority 3): ███ ████ ███
Display task (priority 1): ██ ███
^ ^
preempt resume
Priority Inversion
A famous problem that occurred on the Mars Pathfinder spacecraft in 1997:
The problem:
1. Low-priority task holds a shared resource (lock)
2. High-priority task needs the same resource -- waits
3. Medium-priority task preempts the low-priority task
4. Result: high-priority task waits for medium-priority task -- inversion!
The solution: Priority Inheritance -- the low-priority task temporarily inherits the priority of the waiting high-priority task:
// FreeRTOS supports Mutex with priority inheritance
SemaphoreHandle_t xMutex;
void setup() {
// Mutex with automatic priority inheritance
xMutex = xSemaphoreCreateMutex();
}
void vHighPriorityTask(void *pv) {
for (;;) {
// Request lock -- if held, holder's priority is boosted
xSemaphoreTake(xMutex, portMAX_DELAY);
access_shared_resource();
xSemaphoreGive(xMutex);
vTaskDelay(pdMS_TO_TICKS(10));
}
}
Task Synchronization
When multiple tasks work together, they must be coordinated to prevent conflicts.
Semaphores
SemaphoreHandle_t xDataReady;
void vSensorTask(void *pv) {
for (;;) {
read_all_sensors();
// Tell the control task that data is ready
xSemaphoreGive(xDataReady);
vTaskDelay(pdMS_TO_TICKS(10));
}
}
void vControlTask(void *pv) {
for (;;) {
// Wait until data is ready
if (xSemaphoreTake(xDataReady, pdMS_TO_TICKS(50))) {
compute_control_output();
} else {
// Timeout -- sensors did not respond!
handle_sensor_timeout();
}
}
}
Message Queues
QueueHandle_t xCommandQueue;
typedef struct {
uint8_t motor_id;
uint16_t target_rpm;
uint8_t direction;
} MotorCommand_t;
void vSupervisorTask(void *pv) {
MotorCommand_t cmd;
cmd.motor_id = 1;
cmd.target_rpm = 1200;
cmd.direction = FORWARD;
// Send command to motor control task
xQueueSend(xCommandQueue, &cmd, pdMS_TO_TICKS(100));
}
void vMotorControlTask(void *pv) {
MotorCommand_t received_cmd;
for (;;) {
if (xQueueReceive(xCommandQueue, &received_cmd, portMAX_DELAY)) {
set_motor_speed(received_cmd.motor_id,
received_cmd.target_rpm,
received_cmd.direction);
}
}
}
Determinism: The Number One Enemy Is Unpredictability
Determinism means the same inputs produce the same outputs in the same amount of time, always. To achieve it:
What to Avoid
// Bad: dynamic memory allocation -- time is unpredictable!
void bad_realtime_function() {
int *data = malloc(1000 * sizeof(int)); // May take 1us or 1ms
// ...
free(data);
}
// Good: static pre-allocation
static int data_buffer[1000]; // Reserved at program startup
void good_realtime_function() {
// Use the pre-allocated buffer
process_data(data_buffer, 1000);
}
Rules for Deterministic Code
| Do | Do Not |
|---|---|
| Static memory allocation at startup | malloc/free during runtime |
| Loops with known iteration count | while loops with unbounded conditions |
| Pre-computed lookup tables | Complex math at runtime (sin, sqrt) |
| Fixed-size circular buffers | Dynamic linked lists |
| Locks with maximum timeout | Infinite wait without timeout |
Latency Requirements for Control Systems
Every industrial system has different timing requirements:
| Application | Required Response Time | Type |
|---|---|---|
| Position control loop (CNC) | 0.1 - 1 ms | Hard |
| Motor speed control | 1 - 10 ms | Hard |
| Temperature sensor reading | 10 - 100 ms | Soft |
| Chemical process control | 100 ms - 1 s | Soft |
| Fire alarm system | < 50 ms | Hard |
| HMI screen update | 100 - 500 ms | Soft |
| Data transfer to SCADA | 1 - 5 s | Soft |
Calculating Worst-Case Execution Time (WCET)
// Every execution path must be shorter than the deadline
void control_cycle() {
// Reading: ~50us
float temp = read_temperature();
float pressure = read_pressure();
// Computation: ~100us
float output = pid_compute(temp, pressure);
// Writing: ~30us
write_analog_output(output);
// Worst-case total: ~180us
// Deadline: 1000us (1ms)
// Margin: 820us -- safe
}
Practical Example: Filling Line Control System
#include "FreeRTOS.h"
#include "task.h"
#include "queue.h"
#include "semphr.h"
#define FILL_CYCLE_MS 10
#define SAFETY_CYCLE_MS 5
#define DISPLAY_CYCLE_MS 200
SemaphoreHandle_t xSafetyOk;
QueueHandle_t xFillLevel;
// Safety task -- highest priority
void vSafetyTask(void *pv) {
TickType_t xLast = xTaskGetTickCount();
for (;;) {
bool door_closed = read_safety_door();
bool e_stop_ok = !read_emergency_stop();
if (door_closed && e_stop_ok) {
xSemaphoreGive(xSafetyOk);
} else {
// Stop everything immediately
disable_all_outputs();
}
vTaskDelayUntil(&xLast, pdMS_TO_TICKS(SAFETY_CYCLE_MS));
}
}
// Filling task -- medium priority
void vFillingTask(void *pv) {
TickType_t xLast = xTaskGetTickCount();
for (;;) {
if (xSemaphoreTake(xSafetyOk, 0) == pdTRUE) {
float level = read_level_sensor();
float valve_cmd = pid_fill_control(level, TARGET_LEVEL);
set_fill_valve(valve_cmd);
xQueueOverwrite(xFillLevel, &level);
}
vTaskDelayUntil(&xLast, pdMS_TO_TICKS(FILL_CYCLE_MS));
}
}
// Display task -- low priority
void vDisplayTask(void *pv) {
float level;
for (;;) {
if (xQueuePeek(xFillLevel, &level, pdMS_TO_TICKS(1000))) {
update_hmi_display(level);
}
vTaskDelay(pdMS_TO_TICKS(DISPLAY_CYCLE_MS));
}
}
Practical Tips for Real-Time Programmers
- Always measure -- do not assume code is fast enough; measure actual execution time
- Design for the worst case -- not the average, but the worst possible scenario
- Test under load -- the system may work fine when idle and fail under stress
- Use a watchdog timer -- if a task hangs, the system resets itself automatically
- Separate critical from non-critical tasks -- never mix safety code with UI code
Summary
Real-time programming is the backbone of every industrial control system:
- Hard vs. soft: define your timing requirements precisely
- RTOS: guarantees predictable responses through priority scheduling
- Task synchronization: semaphores and queues coordinate work between tasks
- Determinism: avoid anything that makes execution time unpredictable
- Latency requirements: every application has different needs -- know them and respect them
When you write code that controls a real machine, remember: correctness alone is not enough -- you must be correct at the right time.