作業系統概論:行程與執行緒
當你打開電腦,同時瀏覽網頁、聽音樂、寫程式——你以為這些程式是「同時」在運作嗎?
其實,對單核心 CPU 來說,它一次只能做一件事。但它切換得非常快,讓你感覺像是同時進行。這就是 作業系統 (OS) 的核心工作之一:行程管理。
🔥 Vibe Prompt
「用 Python 模擬作業系統的行程排程:實作 FIFO、SJF、Round Robin 三種演算法,每個行程有 arrival time 和 burst time,輸出完整執行順序與等待時間統計。」
什麼是行程 (Process)?
行程 就是一個正在執行的程式實例。當你雙擊 Chrome 圖示時,作業系統會:
- 將程式的程式碼從硬碟載入到記憶體
- 分配一個獨立的記憶體空間
- 建立一個「行程控制塊 (PCB)」來記錄它的狀態
每個行程擁有獨立的記憶體空間,彼此不會互相干擾。
import os
import time
print(f"目前行程 ID (PID): {os.getpid()}")
print(f"父行程 ID (PPID): {os.getppid()}")
行程 vs 執行緒 (Thread)
| 特性 | 行程 (Process) | 執行緒 (Thread) | |------|---------------|----------------| | 記憶體空間 | 獨立(隔離) | 共享同一行程的記憶體 | | 建立成本 | 高(需要完整複製空間) | 低(輕量級) | | 通訊方式 | IPC (pipe, socket) | 直接讀寫共享變數 | | 穩定性 | 一個掛了不影響其他 | 一個掛了可能整組崩潰 | | 使用場景 | 隔離性要求高的應用 | 需要高效率並行的任務 |
生活比喻
- 行程 = 不同的餐廳。每間餐廳有自己的廚房、食材、員工。一家餐廳火災不會影響隔壁。
- 執行緒 = 同一間餐廳裡的多個廚師。他們共用同一個廚房、食材庫存。協作效率高,但搞砸了會一起遭殃。
import threading
import time
shared_counter = 0
lock = threading.Lock()
def chef_task(chef_id):
global shared_counter
for _ in range(5):
with lock:
shared_counter += 1
print(f"👨🍳 廚師 {chef_id} 做了第 {shared_counter} 道菜")
time.sleep(0.1)
# 建立 3 個執行緒(廚師)
threads = []
for i in range(3):
t = threading.Thread(target=chef_task, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"\n✅ 總共完成 {shared_counter} 道菜")
CPU 排程演算法
當多個行程都在等待執行時,作業系統需要決定誰先誰後。這就是 CPU 排程。
1. FIFO (First In, First Out)
先來的先執行,非常公平,但短任務會被長任務阻塞。
def fifo_scheduling(processes):
"""
FIFO 排程
processes: [(name, arrival_time, burst_time), ...]
"""
processes.sort(key=lambda p: p[1]) # 依照到達時間排序
time = 0
schedule = []
for name, arrival, burst in processes:
if time < arrival:
time = arrival
schedule.append((name, time, time + burst))
time += burst
return schedule
# 測試
processes = [
("P1", 0, 5),
("P2", 1, 3),
("P3", 2, 8),
("P4", 3, 2),
]
result = fifo_scheduling(processes)
for name, start, end in result:
print(f"{name}: {start}~{end} (執行 {end-start}ms)")
2. SJF (Shortest Job First)
最短的任務先執行,能有效減少平均等待時間。
def sjf_scheduling(processes):
"""
SJF 排程:最短作業優先
"""
processes = sorted(processes, key=lambda p: p[1]) # 依 arrival time
n = len(processes)
completed = [False] * n
time = 0
schedule = []
for _ in range(n):
# 找出已到達且 burst time 最短的行程
available = [(i, p) for i, p in enumerate(processes)
if not completed[i] and p[1] <= time]
if not available:
time += 1
continue
idx = min(available, key=lambda x: x[0][2])[0] # 最短 burst
name, arrival, burst = processes[idx]
schedule.append((name, time, time + burst))
time += burst
completed[idx] = True
return schedule
result = sjf_scheduling(processes)
for name, start, end in result:
print(f"{name}: {start}~{end} (執行 {end-start}ms)")
3. Round Robin (時間片輪轉)
每個行程輪流執行一個固定的「時間片 (time quantum)」。
def round_robin(processes, time_quantum=2):
"""
Round Robin 排程
"""
queue = []
remaining = {p[0]: p[2] for p in processes} # 剩餘執行時間
arrival_map = {p[0]: p[1] for p in processes}
time = 0
schedule = []
ready_queue = []
# 初始載入已到達行程
for p in processes:
if p[1] <= time:
ready_queue.append(p[0])
while remaining:
if not ready_queue:
time += 1
# 檢查新到達的行程
for name, arrival, _ in processes:
if arrival == time and name in remaining:
ready_queue.append(name)
continue
name = ready_queue.pop(0)
execute = min(time_quantum, remaining[name])
schedule.append((name, time, time + execute))
time += execute
remaining[name] -= execute
if remaining[name] <= 0:
del remaining[name]
else:
ready_queue.append(name)
# 加入新到達的行程
for p_name, arrival, _ in processes:
if p_name in remaining and p_name not in ready_queue:
start_time = schedule[-1][2]
if start_time < arrival <= time:
ready_queue.append(p_name)
return schedule
result = round_robin(processes, time_quantum=2)
for name, start, end in result:
print(f"{name}: {start}~{end}")
行程狀態轉換
一個行程在生命週期中會經歷以下狀態:
New → Ready → Running → Terminated
↕ ↓
Waiting ← I/O 或事件等待
- New: 剛被建立
- Ready: 準備好執行,等待 CPU 分配
- Running: 正在 CPU 上執行
- Waiting: 等待 I/O 或某個事件
- Terminated: 執行完畢
上下文切換 (Context Switch)
當 CPU 從一個行程切換到另一個行程時,必須:
- 儲存:將目前行程的 CPU 暫存器、程式計數器 (PC)、記憶體映射等狀態儲存到 PCB
- 載入:從下一個行程的 PCB 中讀取之前儲存的狀態,恢復到 CPU 暫存器
這個動作稱為 上下文切換。
| 切換對象 | 開銷 | 說名 | |----------|------|------| | 同行程的執行緒切換 | 約 1~10 µs | 只需切換暫存器,記憶體映射不變 | | 不同行程的切換 | 約 10~100 µs | 需要切換記憶體映射(TLB 刷新) |
⚠️ 效能警示:如果上下文切換太頻繁,CPU 可能會花 30~50% 的時間在切換而不是真正執行程式。因此現代 OS 會動態調整時間片大小來平衡。
行程間通訊 (IPC)
既然行程彼此隔離,它們要如何交換資料?這就是 行程間通訊 (Inter-Process Communication, IPC) 的工作。
| IPC 方式 | 說明 | 適用場景 | |----------|------|---------| | Pipe (管線) | 單向資料流,一個行程寫入,另一個讀取 | 父子行程間通訊 | | Signal (信號) | 非同步事件通知,如 SIGINT、SIGKILL | 行程控制與中斷 | | Socket (插座) | 雙向通訊,可用於網路或本地 | 跨機器通訊 | | Shared Memory | 共享記憶體區塊,速度最快 | 大量資料交換 | | Message Queue | 訊息佇列,非同步傳送與接收 | 解耦合的通訊模式 |
# Pipe 範例:父子行程通訊
import multiprocessing
def child_process(conn):
"""子行程:接收資料並回應"""
msg = conn.recv()
print(f"📨 子行程收到: {msg}")
conn.send(f"收到 '{msg}',謝謝!")
conn.close()
# 建立 Pipe
parent_conn, child_conn = multiprocessing.Pipe()
# 建立子行程
child = multiprocessing.Process(target=child_process, args=(child_conn,))
child.start()
# 父行程發送資料
parent_conn.send("Hello from parent!")
response = parent_conn.recv()
print(f"📩 父行程收到回覆: {response}")
child.join()
執行緒同步與競賽條件
當多個執行緒共享同一份資料時,就會產生 競賽條件 (Race Condition)。
# ❌ 危險:沒有同步的多執行緒
import threading
counter = 0
def increment_bad():
global counter
for _ in range(100000):
# 這三行不是原子操作!
temp = counter # 讀取
temp = temp + 1 # 計算
counter = temp # 寫回
threads = []
for _ in range(2):
t = threading.Thread(target=increment_bad)
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"期望值: 200000")
print(f"實際值: {counter}")
print(f"差異: {200000 - counter}")
解決方案:鎖 (Lock)
# ✅ 正確:使用 Lock 保護共享資源
counter_safe = 0
lock = threading.Lock()
def increment_safe():
global counter_safe
for _ in range(100000):
with lock: # 確保一次只有一個執行緒能進入
temp = counter_safe
temp = temp + 1
counter_safe = temp
threads = []
for _ in range(2):
t = threading.Thread(target=increment_safe)
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"使用 Lock 後的實際值: {counter_safe}")
死結 (Deadlock)
當兩個執行緒各自持有對方需要的資源時,就會發生 死結。
執行緒 A 持有 Lock 1,等待 Lock 2
執行緒 B 持有 Lock 2,等待 Lock 1
→ 兩個執行緒永遠等待下去
死結發生的四個必要條件:\n1. 互斥 (Mutual Exclusion):資源一次只能被一個執行緒使用\n2. 持有並等待 (Hold and Wait):持有資源的同時等待其他資源\n3. 不可搶佔 (No Preemption):資源不能被強制取走\n4. 循環等待 (Circular Wait):形成等待循環\n\n預防死結最簡單的方法:固定資源取得順序。\n\npython\n# ✅ 避免死結:所有執行緒以相同順序取得鎖\nlock1 = threading.Lock()\nlock2 = threading.Lock()\n\ndef worker():\n with lock1: # 先拿 lock1\n with lock2: # 再拿 lock2\n print("安全操作共享資源")\n\n\n## 多核心與平行處理\n\n現代 CPU 都有多個核心,這讓行程與執行緒的運作方式有了新的維度:\n\n| 概念 | 單核心 | 多核心 |\n|------|--------|--------|\n| 並發 (Concurrency) | 快速切換,看起來同時 | 可以真正同時執行 |\n| 平行 (Parallelism) | 不可能真正平行 | 多個執行緒在不同核心上同時執行 |\n| 效能提升 | 來自減少閒置 | 來自核心數量的線性增長 |\n\npython\n# 檢查你的 CPU 核心數\nimport os\nprint(f"CPU 核心數: {os.cpu_count()}")\n\n# 如果是 8 核心,理論上可以讓 8 個執行緒「真正同時」執行\n\n\n## 本章總結\n\n| 概念 | 核心重點 |\n|------|---------|\n| 行程 (Process) | 獨立記憶體空間,重量級,適合隔離性高的任務 |\n| 執行緒 (Thread) | 共享記憶體,輕量級,適合並行計算 |\n| PCB | 作業系統管理行程的核心資料結構 |\n| FIFO | 先來先做,簡單但短任務可能被阻塞 |\n| SJF | 最短先做,平均等待最少,但可能造成飢餓 |\n| Round Robin | 輪流執行,互動性最好,時間片大小是關鍵 |\n| IPC | 行程間通訊的五種方式:Pipe、Signal、Socket、Shared Memory、Message Queue |\n| 同步 | Lock、Semaphore 等機制防止競賽條件 |\n| 死結 | 四個必要條件,固定資源順序是最簡單的預防法 |\n\n## 實戰練習\n\n> 💡 Vibe Coding 練習:請 AI 幫你建立一個完整的「行程管理互動式學習工具」:\n> 1. 行程排程模擬器:輸入多個行程,選擇 FIFO / SJF / Round Robin,輸出甘特圖與統計比較\n> 2. IPC 展示工具:可視化展示 Pipe、Shared Memory、Message Queue 的資料流動\n> 3. 競賽條件模擬器:展示有 Lock 和沒有 Lock 的差異,用圖表呈現錯誤率\n> 4. 死結偵測小遊戲:給定資源分配圖,判斷是否會發生死結\n> 5. 多核心效能比較:模擬不同核心數(1/2/4/8)對相同工作量的影響\n> 用 Streamlit 或 HTML + JavaScript 打造漂亮互動介面