作業系統概論:行程與執行緒

當你打開電腦,同時瀏覽網頁、聽音樂、寫程式——你以為這些程式是「同時」在運作嗎?

其實,對單核心 CPU 來說,它一次只能做一件事。但它切換得非常快,讓你感覺像是同時進行。這就是 作業系統 (OS) 的核心工作之一:行程管理

🔥 Vibe Prompt

「用 Python 模擬作業系統的行程排程:實作 FIFO、SJF、Round Robin 三種演算法,每個行程有 arrival time 和 burst time,輸出完整執行順序與等待時間統計。」

什麼是行程 (Process)?

行程 就是一個正在執行的程式實例。當你雙擊 Chrome 圖示時,作業系統會:

  1. 將程式的程式碼從硬碟載入到記憶體
  2. 分配一個獨立的記憶體空間
  3. 建立一個「行程控制塊 (PCB)」來記錄它的狀態

每個行程擁有獨立的記憶體空間,彼此不會互相干擾。

import os
import time

print(f"目前行程 ID (PID): {os.getpid()}")
print(f"父行程 ID (PPID): {os.getppid()}")

行程 vs 執行緒 (Thread)

| 特性 | 行程 (Process) | 執行緒 (Thread) | |------|---------------|----------------| | 記憶體空間 | 獨立(隔離) | 共享同一行程的記憶體 | | 建立成本 | 高(需要完整複製空間) | 低(輕量級) | | 通訊方式 | IPC (pipe, socket) | 直接讀寫共享變數 | | 穩定性 | 一個掛了不影響其他 | 一個掛了可能整組崩潰 | | 使用場景 | 隔離性要求高的應用 | 需要高效率並行的任務 |

生活比喻

  • 行程 = 不同的餐廳。每間餐廳有自己的廚房、食材、員工。一家餐廳火災不會影響隔壁。
  • 執行緒 = 同一間餐廳裡的多個廚師。他們共用同一個廚房、食材庫存。協作效率高,但搞砸了會一起遭殃。
import threading
import time

shared_counter = 0
lock = threading.Lock()

def chef_task(chef_id):
    global shared_counter
    for _ in range(5):
        with lock:
            shared_counter += 1
            print(f"👨‍🍳 廚師 {chef_id} 做了第 {shared_counter} 道菜")
        time.sleep(0.1)

# 建立 3 個執行緒(廚師)
threads = []
for i in range(3):
    t = threading.Thread(target=chef_task, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"\n✅ 總共完成 {shared_counter} 道菜")

CPU 排程演算法

當多個行程都在等待執行時,作業系統需要決定誰先誰後。這就是 CPU 排程。

1. FIFO (First In, First Out)

先來的先執行,非常公平,但短任務會被長任務阻塞。

def fifo_scheduling(processes):
    """
    FIFO 排程
    processes: [(name, arrival_time, burst_time), ...]
    """
    processes.sort(key=lambda p: p[1])  # 依照到達時間排序
    time = 0
    schedule = []
    
    for name, arrival, burst in processes:
        if time < arrival:
            time = arrival
        schedule.append((name, time, time + burst))
        time += burst
    
    return schedule

# 測試
processes = [
    ("P1", 0, 5),
    ("P2", 1, 3),
    ("P3", 2, 8),
    ("P4", 3, 2),
]

result = fifo_scheduling(processes)
for name, start, end in result:
    print(f"{name}: {start}~{end} (執行 {end-start}ms)")

2. SJF (Shortest Job First)

最短的任務先執行,能有效減少平均等待時間。

def sjf_scheduling(processes):
    """
    SJF 排程:最短作業優先
    """
    processes = sorted(processes, key=lambda p: p[1])  # 依 arrival time
    n = len(processes)
    completed = [False] * n
    time = 0
    schedule = []
    
    for _ in range(n):
        # 找出已到達且 burst time 最短的行程
        available = [(i, p) for i, p in enumerate(processes) 
                     if not completed[i] and p[1] <= time]
        if not available:
            time += 1
            continue
        
        idx = min(available, key=lambda x: x[0][2])[0]  # 最短 burst
        name, arrival, burst = processes[idx]
        schedule.append((name, time, time + burst))
        time += burst
        completed[idx] = True
    
    return schedule

result = sjf_scheduling(processes)
for name, start, end in result:
    print(f"{name}: {start}~{end} (執行 {end-start}ms)")

3. Round Robin (時間片輪轉)

每個行程輪流執行一個固定的「時間片 (time quantum)」。

def round_robin(processes, time_quantum=2):
    """
    Round Robin 排程
    """
    queue = []
    remaining = {p[0]: p[2] for p in processes}  # 剩餘執行時間
    arrival_map = {p[0]: p[1] for p in processes}
    
    time = 0
    schedule = []
    ready_queue = []
    
    # 初始載入已到達行程
    for p in processes:
        if p[1] <= time:
            ready_queue.append(p[0])
    
    while remaining:
        if not ready_queue:
            time += 1
            # 檢查新到達的行程
            for name, arrival, _ in processes:
                if arrival == time and name in remaining:
                    ready_queue.append(name)
            continue
        
        name = ready_queue.pop(0)
        execute = min(time_quantum, remaining[name])
        schedule.append((name, time, time + execute))
        time += execute
        remaining[name] -= execute
        
        if remaining[name] <= 0:
            del remaining[name]
        else:
            ready_queue.append(name)
        
        # 加入新到達的行程
        for p_name, arrival, _ in processes:
            if p_name in remaining and p_name not in ready_queue:
                start_time = schedule[-1][2]
                if start_time < arrival <= time:
                    ready_queue.append(p_name)
    
    return schedule

result = round_robin(processes, time_quantum=2)
for name, start, end in result:
    print(f"{name}: {start}~{end}")

行程狀態轉換

一個行程在生命週期中會經歷以下狀態:

New → Ready → Running → Terminated
           ↕          ↓
        Waiting  ←  I/O 或事件等待
  • New: 剛被建立
  • Ready: 準備好執行,等待 CPU 分配
  • Running: 正在 CPU 上執行
  • Waiting: 等待 I/O 或某個事件
  • Terminated: 執行完畢

上下文切換 (Context Switch)

當 CPU 從一個行程切換到另一個行程時,必須:

  1. 儲存:將目前行程的 CPU 暫存器、程式計數器 (PC)、記憶體映射等狀態儲存到 PCB
  2. 載入:從下一個行程的 PCB 中讀取之前儲存的狀態,恢復到 CPU 暫存器

這個動作稱為 上下文切換

| 切換對象 | 開銷 | 說名 | |----------|------|------| | 同行程的執行緒切換 | 約 1~10 µs | 只需切換暫存器,記憶體映射不變 | | 不同行程的切換 | 約 10~100 µs | 需要切換記憶體映射(TLB 刷新) |

⚠️ 效能警示:如果上下文切換太頻繁,CPU 可能會花 30~50% 的時間在切換而不是真正執行程式。因此現代 OS 會動態調整時間片大小來平衡。

行程間通訊 (IPC)

既然行程彼此隔離,它們要如何交換資料?這就是 行程間通訊 (Inter-Process Communication, IPC) 的工作。

| IPC 方式 | 說明 | 適用場景 | |----------|------|---------| | Pipe (管線) | 單向資料流,一個行程寫入,另一個讀取 | 父子行程間通訊 | | Signal (信號) | 非同步事件通知,如 SIGINT、SIGKILL | 行程控制與中斷 | | Socket (插座) | 雙向通訊,可用於網路或本地 | 跨機器通訊 | | Shared Memory | 共享記憶體區塊,速度最快 | 大量資料交換 | | Message Queue | 訊息佇列,非同步傳送與接收 | 解耦合的通訊模式 |

# Pipe 範例:父子行程通訊
import multiprocessing

def child_process(conn):
    """子行程:接收資料並回應"""
    msg = conn.recv()
    print(f"📨 子行程收到: {msg}")
    conn.send(f"收到 '{msg}',謝謝!")
    conn.close()

# 建立 Pipe
parent_conn, child_conn = multiprocessing.Pipe()

# 建立子行程
child = multiprocessing.Process(target=child_process, args=(child_conn,))
child.start()

# 父行程發送資料
parent_conn.send("Hello from parent!")
response = parent_conn.recv()
print(f"📩 父行程收到回覆: {response}")

child.join()

執行緒同步與競賽條件

當多個執行緒共享同一份資料時,就會產生 競賽條件 (Race Condition)

# ❌ 危險:沒有同步的多執行緒
import threading

counter = 0

def increment_bad():
    global counter
    for _ in range(100000):
        # 這三行不是原子操作!
        temp = counter      # 讀取
        temp = temp + 1     # 計算
        counter = temp      # 寫回

threads = []
for _ in range(2):
    t = threading.Thread(target=increment_bad)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"期望值: 200000")
print(f"實際值: {counter}")
print(f"差異: {200000 - counter}")

解決方案:鎖 (Lock)

# ✅ 正確:使用 Lock 保護共享資源
counter_safe = 0
lock = threading.Lock()

def increment_safe():
    global counter_safe
    for _ in range(100000):
        with lock:  # 確保一次只有一個執行緒能進入
            temp = counter_safe
            temp = temp + 1
            counter_safe = temp

threads = []
for _ in range(2):
    t = threading.Thread(target=increment_safe)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"使用 Lock 後的實際值: {counter_safe}")

死結 (Deadlock)

當兩個執行緒各自持有對方需要的資源時,就會發生 死結

執行緒 A 持有 Lock 1,等待 Lock 2
執行緒 B 持有 Lock 2,等待 Lock 1
        → 兩個執行緒永遠等待下去

死結發生的四個必要條件:\n1. 互斥 (Mutual Exclusion):資源一次只能被一個執行緒使用\n2. 持有並等待 (Hold and Wait):持有資源的同時等待其他資源\n3. 不可搶佔 (No Preemption):資源不能被強制取走\n4. 循環等待 (Circular Wait):形成等待循環\n\n預防死結最簡單的方法:固定資源取得順序。\n\npython\n# ✅ 避免死結:所有執行緒以相同順序取得鎖\nlock1 = threading.Lock()\nlock2 = threading.Lock()\n\ndef worker():\n with lock1: # 先拿 lock1\n with lock2: # 再拿 lock2\n print("安全操作共享資源")\n\n\n## 多核心與平行處理\n\n現代 CPU 都有多個核心,這讓行程與執行緒的運作方式有了新的維度:\n\n| 概念 | 單核心 | 多核心 |\n|------|--------|--------|\n| 並發 (Concurrency) | 快速切換,看起來同時 | 可以真正同時執行 |\n| 平行 (Parallelism) | 不可能真正平行 | 多個執行緒在不同核心上同時執行 |\n| 效能提升 | 來自減少閒置 | 來自核心數量的線性增長 |\n\npython\n# 檢查你的 CPU 核心數\nimport os\nprint(f"CPU 核心數: {os.cpu_count()}")\n\n# 如果是 8 核心,理論上可以讓 8 個執行緒「真正同時」執行\n\n\n## 本章總結\n\n| 概念 | 核心重點 |\n|------|---------|\n| 行程 (Process) | 獨立記憶體空間,重量級,適合隔離性高的任務 |\n| 執行緒 (Thread) | 共享記憶體,輕量級,適合並行計算 |\n| PCB | 作業系統管理行程的核心資料結構 |\n| FIFO | 先來先做,簡單但短任務可能被阻塞 |\n| SJF | 最短先做,平均等待最少,但可能造成飢餓 |\n| Round Robin | 輪流執行,互動性最好,時間片大小是關鍵 |\n| IPC | 行程間通訊的五種方式:Pipe、Signal、Socket、Shared Memory、Message Queue |\n| 同步 | Lock、Semaphore 等機制防止競賽條件 |\n| 死結 | 四個必要條件,固定資源順序是最簡單的預防法 |\n\n## 實戰練習\n\n> 💡 Vibe Coding 練習:請 AI 幫你建立一個完整的「行程管理互動式學習工具」:\n> 1. 行程排程模擬器:輸入多個行程,選擇 FIFO / SJF / Round Robin,輸出甘特圖與統計比較\n> 2. IPC 展示工具:可視化展示 Pipe、Shared Memory、Message Queue 的資料流動\n> 3. 競賽條件模擬器:展示有 Lock 和沒有 Lock 的差異,用圖表呈現錯誤率\n> 4. 死結偵測小遊戲:給定資源分配圖,判斷是否會發生死結\n> 5. 多核心效能比較:模擬不同核心數(1/2/4/8)對相同工作量的影響\n> 用 Streamlit 或 HTML + JavaScript 打造漂亮互動介面


為什麼 Process 和 Thread 不同?

| 比較 | Process | Thread | |:----|:-------|:------| | 資源隔離 | 各有獨立的記憶體空間 | 共用同一個 Process 的記憶體 | | 切換成本 | 高(需要切換記憶體映射) | 低(只需切換暫存器) | | 通訊方式 | IPC(pipe、socket、shared memory) | 直接讀寫共用變數 | | 穩定性 | 一個 crash 不影響其他 | 一個 crash 可能帶垮整個 Process | | 建立時間 | 慢 | 快 |

實務應用

  • 多 Process:適合需要隔離的任務(瀏覽器的每個 tab 是一個 process)
  • 多 Thread:適合需要大量 I/O 的任務(Web server 處理請求)
  • Async/Await:Python 和 JS 的協程(Coroutine)是單 thread 但非阻塞

下一章預告:記憶體管理

Process 的記憶體空間是獨立的——這帶來了下一個問題:記憶體管理。

會員專屬免費教學

本章節為註冊會員專屬的免費開放內容!請先登入或註冊會員,即可立即解鎖閱讀。

立即登入 / 註冊