ジョブスケジューリング
🔥 Vibe プロンプト
「OR-Tools CP-SATで3台の機械で8つのジョブをスケジュール。総完了時間を最小化。ガントチャートを出力。」
ジョブスケジューリングとは?
Job Shop Scheduling (JSS) は、各ジョブが特定の機械で処理され、ジョブ間に依存関係がある場合に、総完了時間(makespan)を最小化する問題です。
| 実世界の応用 | |-------------| | 🏭 製造業: 切削→溶接→組立の各工程をスケジュール | | 🖥️ CI/CDパイプライン: ビルド→テスト→デプロイを異なるランナーに割り当て | | 🚚 物流: トラックのルートと配送順序をスケジュール | | 🏗️ 建設: 基礎工事→フレーム→内装の順序を最適化 |
数学的定式化
目的: C_max(メイクスパン)を最小化
制約:
1. 各ジョブは所定の機械で処理
2. 同じ機械上のジョブは重複不可
3. 先行ジョブ完了後に後続ジョブ開始
変数:
s_j = ジョブjの開始時間
e_j = ジョブjの終了時間
CP-SATによる実装
from ortools.sat.python import cp_model
# データ: (job_id, machine_id, duration)
jobs = [
(0, 0, 3), # Job 0: Machine 0, 3h
(1, 1, 2), # Job 1: Machine 1, 2h
(2, 2, 5), # Job 2: Machine 2, 5h
(3, 0, 4),
(4, 1, 3),
(5, 2, 2),
(6, 0, 2),
(7, 1, 4),
]
# 先行制約: (before, after)
precedences = [(0, 3), (1, 4), (2, 5)]
num_machines = 3
num_jobs = len(jobs)
# 上限 = 全ジョブ合計時間
horizon = sum(d for _, _, d in jobs)
model = cp_model.CpModel()
# 変数
starts = {}
ends = {}
intervals = {}
for jid, mid, dur in jobs:
starts[jid] = model.NewIntVar(0, horizon, f'start_{jid}')
ends[jid] = model.NewIntVar(0, horizon, f'end_{jid}')
intervals[jid] = model.NewIntervalVar(starts[jid], dur, ends[jid], f'int_{jid}')
# 制約1: 同一機械のジョブは重複不可
machine_to_jobs = {m: [] for m in range(num_machines)}
for jid, mid, _ in jobs:
machine_to_jobs[mid].append(jid)
for m, jids in machine_to_jobs.items():
model.AddNoOverlap([intervals[j] for j in jids])
# 制約2: 先行制約
for before, after in precedences:
model.Add(starts[after] >= ends[before])
# 目的: makespan最小化
makespan = model.NewIntVar(0, horizon, 'makespan')
model.AddMaxEquality(makespan, [ends[jid] for jid, _, _ in jobs])
model.Minimize(makespan)
# 求解
solver = cp_model.CpSolver()
solver.parameters.max_time_in_seconds = 10
status = solver.Solve(model)
# 結果出力
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
print(f"\n=== スケジューリング結果 ===")
print(f"最適メイクスパン: {solver.Value(makespan)} 時間")
print(f"状態: {'最適' if status == cp_model.OPTIMAL else '実行可能'}")
print(f"\n{'Job':<8} {'機械':<8} {'時間':<8} {'開始':<8} {'終了':<8}")
print(f"{'-'*40}")
for jid, mid, dur in sorted(jobs):
s = solver.Value(starts[jid])
e = solver.Value(ends[jid])
print(f"J{job:<7} M{mid:<7} {dur:<8h} {s:<8} {e:<8}")
# ガントチャート
print(f"\nガントチャート:")
for m in range(num_machines):
timeline = [' '] * solver.Value(makespan)
for jid, mid, dur in jobs:
if mid == m:
s = solver.Value(starts[jid])
e = solver.Value(ends[jid])
for t in range(s, e):
timeline[t] = chr(65 + jid) # A, B, C...
print(f"M{m}: [{''.join(timeline)}]")
大規模インスタンス用優先ルール
def schedule_by_priority(jobs, precedences, rule="SPT"):
"""優先ルールによるスケジューリング
SPT=最短処理時間, LPT=最長処理時間, EDD=最も早い納期
"""
machine_available = {m: 0 for _, m, _ in jobs}
job_progress = {}
remaining = sorted(jobs, key=lambda j: j[0])
while remaining:
available = []
for jid, mid, dur in remaining:
preds_done = all(p[1] in job_progress for p in precedences if p[0] == jid)
if preds_done:
available.append((jid, mid, dur))
if not available:
break # デッドロック
if rule == "SPT":
available.sort(key=lambda j: j[2])
elif rule == "LPT":
available.sort(key=lambda j: -j[2])
elif rule == "EDD":
available.sort(key=lambda j: j[1])
jid, mid, dur = available[0]
start = machine_available[mid]
end = start + dur
machine_available[mid] = end
job_progress[jid] = (start, end)
remaining = [j for j in remaining if j[0] != jid]
return max(end for _, end in job_progress.values())
for rule in ["SPT", "LPT", "EDD"]:
makespan = schedule_by_priority(jobs, precedences, rule)
print(f"{rule}: makespan = {makespan}")
まとめ
| 項目 | 詳細 | |------|------| | 問題 | ジョブを機械に割り当て、完了時間を最小化 | | 複雑さ | NP困難 | | CP-SAT | 小規模(<10ジョブ)で最適解 | | 優先ルール | SPT(高速), LPT, EDD(納期順守) | | ガントチャート | 各機械のタイムラインを可視化 | | 応用 | 製造、CI/CD、物流、建設