The ML Arsenal
What works in production on 2012 hardware with 50ms latency budgets and skeptical process engineers. Not what is state-of-the-art in research papers.
Virtual Metrology: Predicting Y Before Y Arrives
Virtual Metrology is the signature ML application in semiconductor manufacturing. You predict metrology results (film thickness, critical dimension, defect count) from process sensor data before the physical measurement completes. Metrology takes 4 to 8 hours per wafer; VM predicts in milliseconds. Ground truth arrives 60 to 90 days later. Wrong predictions accumulate silently.
The four mandatory components
Process Tool (Level 1)
Sensors: RF power, pressure, OES, gas flow
|
| raw 100 Hz trace data
v
Feature Extraction (Level 2)
Domain-driven, physically interpretable
Every feature has a one-sentence physical explanation
|
| feature vector
v
Model Core (XGBoost + SHAP) Reliability Index (Autoencoder)
Predicts: thickness, CD, defect Measures: OOD distance
Interprets: SHAP per prediction Trained on commissioning data only
| |
+---------------+-----------------------+
|
v
Decision Router
Low RI -> accept VM prediction
High RI -> abstain, route to physical metrology
Sensor alarm -> suspend model immediatelyComponent 1: Feature extraction
Every feature must have a one-sentence physical interpretation. If you cannot explain what a feature measures in terms of plasma physics, tool mechanics, or process chemistry, it should not be in the model.
| Feature | Physical Interpretation | Computation |
|---|---|---|
| rf_power_mean | Average energy delivered to plasma, controlling ion bombardment flux and etch rate | mean(RF_Forward_Power) over main etch step |
| chamber_pressure_slope | Rate of pressure change, indicating gas consumption rate or potential leak | np.polyfit(time, pressure, 1)[0] |
| oes_520nm_max_derivative | Maximum rate of change in silicon emission, early endpoint signal | max(np.diff(OES_520nm)) |
| match_cap_position | Tuning capacitor position at step end, leading indicator of wear and arc risk | main_etch["match_capacitor_a"].iloc[-1] |
def extract_vm_features(fdc_trace: pd.DataFrame,
mes_context: dict) -> pd.Series:
"""Extract VM features. Every feature has physical interpretation."""
step = fdc_trace[
(fdc_trace['timestamp'] >= mes_context['step_start']) &
(fdc_trace['timestamp'] <= mes_context['step_end'])
]
t = (step['timestamp'] - step['timestamp'].iloc[0]).dt.total_seconds()
return pd.Series({
'rf_power_mean': step['rf_forward_power'].mean(),
'rf_power_std': step['rf_forward_power'].std(),
'pressure_slope': np.polyfit(t, step['chamber_pressure'], 1)[0],
'oes_520nm_max_deriv': np.diff(step['oes_520nm']).max(),
'match_cap_position': step['match_capacitor_a'].iloc[-1],
'step_duration': t.iloc[-1], # Recipe compliance check
})Component 2: Model core
Gradient-boosted trees with SHAP. Not neural networks (black box, too slow). Not random forest (slower inference, less consistent importance). GBTs are the production standard for tabular sensor data.
import xgboost as xgb
import shap
model = xgb.XGBRegressor(
n_estimators=100,
max_depth=4, # Shallow: faster inference, more interpretable
learning_rate=0.1,
subsample=0.8, # Handles PM-cycle concept drift
colsample_bytree=0.8,
objective='reg:squarederror',
random_state=42
)
model.fit(X_train, y_train)
# SHAP explanation for process engineer (required, not optional)
def explain_prediction(model, X_instance, feature_names):
explainer = shap.TreeExplainer(model)
sv = explainer.shap_values(X_instance)
top3 = np.argsort(np.abs(sv[0]))[-3:][::-1]
parts = [f"{feature_names[i]}={X_instance.iloc[0,i]:.2f} "
f"({'increases' if sv[0][i] > 0 else 'decreases'}) prediction"
for i in top3]
return "Driven by: " + "; ".join(parts)Component 3: Reliability Index (OOD detection)
from tensorflow.keras import layers, Input, Model
def build_ri_autoencoder(input_dim: int, encoding_dim: int = 8):
"""Reliability Index = reconstruction MSE. High RI = OOD = abstain."""
x = inp = Input(shape=(input_dim,))
x = layers.Dense(16, activation='relu')(x)
x = layers.Dense(encoding_dim, activation='relu')(x)
x = layers.Dense(16, activation='relu')(x)
x = layers.Dense(input_dim, activation='linear')(x)
ae = Model(inp, x)
ae.compile(optimizer='adam', loss='mse')
return ae
# Train ONLY on commissioning data (pm_sequence = 0)
ri_model = build_ri_autoencoder(input_dim=X_commissioning.shape[1])
ri_model.fit(X_commissioning, X_commissioning,
epochs=50, batch_size=32, validation_split=0.1, verbose=0)
def compute_ri(autoencoder, X):
return np.mean((X - autoencoder.predict(X, verbose=0))**2, axis=1)
# Threshold: commissioning mean + 3 sigma
ri_train = compute_ri(ri_model, X_commissioning)
ri_threshold = ri_train.mean() + 3 * ri_train.std()Component 4: Cost-calibrated decision router
def calibrate_abstention_threshold(ri_val, y_true, y_pred,
cost_fn=2.5e6, cost_fp=500,
tolerance=2.0):
"""
cost_fn: false negative ($2.5M per ghost excursion)
cost_fp: false positive ($500 per unnecessary physical metrology)
Ratio 5000:1 justifies aggressive abstention.
"""
thresholds = np.linspace(ri_val.min(), ri_val.max(), 100)
costs = []
for thresh in thresholds:
fn = ((ri_val <= thresh) & (np.abs(y_pred - y_true) > tolerance)).sum()
fp = ((ri_val > thresh) & (np.abs(y_pred - y_true) <= tolerance)).sum()
costs.append(fn * cost_fn + fp * cost_fp)
idx = np.argmin(costs)
return thresholds[idx], costs[idx]
optimal_thresh, min_cost = calibrate_abstention_threshold(ri_val, y_true, y_pred)
print(f"Optimal threshold: {optimal_thresh:.3f} Expected annual cost: ${min_cost:,.0f}")Fault Detection and Classification (FDC)
VM predicts quality. FDC detects when something is wrong right now, stopping bad wafers before processing completes. Use the hierarchy: start with univariate SPC, add multivariate only when individual sensors miss the excursion, add model-based only when both miss it.
class UnivariateFDC:
"""Per-sensor SPC. fit() on commissioning, update() per new observation."""
def __init__(self, chart_type='ewma', **params):
self.chart_type = chart_type
self.params = params
def fit(self, data: pd.Series):
if self.chart_type == 'shewhart':
self.center, s = data.mean(), data.std()
self.ucl, self.lcl = self.center + 3*s, self.center - 3*s
elif self.chart_type == 'ewma':
self.lam = self.params.get('lam', 0.2)
self.center = data.mean()
s_ewma = data.std() * (self.lam / (2 - self.lam))**0.5
self.ucl, self.lcl = self.center + 3*s_ewma, self.center - 3*s_ewma
self.ewma_val = self.center
elif self.chart_type == 'cusum':
self.target, self.sigma = data.mean(), data.std()
self.h = self.params.get('h', 4)
self.k = self.params.get('k', 0.5)
self.C_plus = self.C_minus = 0
def update(self, value: float) -> bool:
if self.chart_type == 'shewhart':
return value > self.ucl or value < self.lcl
elif self.chart_type == 'ewma':
self.ewma_val = self.lam * value + (1 - self.lam) * self.ewma_val
return self.ewma_val > self.ucl or self.ewma_val < self.lcl
elif self.chart_type == 'cusum':
z = (value - self.target) / self.sigma
self.C_plus = max(0, self.C_plus + z - self.k)
self.C_minus = max(0, self.C_minus - z - self.k)
return self.C_plus > self.h or self.C_minus > self.hclass ModelBasedFDC:
"""Predict target sensor from correlated sensors. Alarm on large residuals."""
def __init__(self, target_sensor: str, predictor_sensors: list):
import xgboost as xgb
self.target = target_sensor
self.predictors = predictor_sensors
self.model = xgb.XGBRegressor(n_estimators=50, max_depth=3)
def fit(self, data: pd.DataFrame):
self.model.fit(data[self.predictors], data[self.target])
residuals = data[self.target] - self.model.predict(data[self.predictors])
mu, sigma = residuals.mean(), residuals.std()
self.ucl, self.lcl = mu + 3*sigma, mu - 3*sigma
def update(self, obs: pd.Series) -> dict:
y_pred = self.model.predict(obs[self.predictors].values.reshape(1, -1))[0]
residual = obs[self.target] - y_pred
return {'predicted': y_pred, 'actual': obs[self.target],
'residual': residual,
'alarm': residual > self.ucl or residual < self.lcl}The Model Selection Decision Tree
Latency and interpretability constraints determine your algorithm. This is not a preference; it is a derivation. Start with the constraint, end with the algorithm.
What is the latency requirement?
|
+-- < 50ms (real-time control, tool floor)
| |
| +-- Interpretability required -> Linear regression, EWMA controller
| | Examples: R2R control, endpoint detection
| | Why: coefficients map to physical parameters engineers control
| |
| +-- Safety interlock (no ML allowed) -> Hardware PLC logic only
| Why: ML inference can fail; hardwired interlocks cannot
|
+-- < 2 seconds (APC server, between wafers)
| |
| +-- Interpretability required -> Gradient-boosted trees + SHAP
| | Examples: Virtual Metrology, FDC classification
| | Why: SHAP gives per-prediction explanation in physical terms
| |
| +-- Interpretability minimal -> Small quantized CNN (ONNX INT8)
| Examples: CD-SEM image analysis, optical defect inspection
| Why: only use CNNs where images are the input; never for sensors
|
+-- Hours to days (enterprise analytics, offline)
|
+-- Interpretability required -> XGBoost with SHAP, Random Forest
| Examples: fleet-wide yield prediction, root cause attribution
|
+-- Research only -> Deep learning, transformers
Not production: requires GPU, massive data, no interpretability| Latency Budget | Interpretability | Algorithm | When to Use |
|---|---|---|---|
| < 50ms | Required | Linear, EWMA | R2R control, endpoint detection |
| < 50ms | N/A (safety) | PLC hardwired | Safety interlocks only |
| < 2s | Required | GBT + SHAP | Virtual Metrology, FDC |
| < 2s | Minimal | CNN (ONNX INT8) | Image metrology only |
| Hours | Required | XGBoost + SHAP | Fleet analysis, yield prediction |
| Hours | Not required | Deep learning | Research only, not production |
Edge Deployment: From Notebook to Production
The air gap changes everything. Your development environment never touches production. Deployment is physical media transfer, change control approval, and 2 to 6 weeks of waiting. Design for the production stack from day one.
Development (Your Laptop / Server)
Train model (XGBoost, sklearn, PyTorch)
Validate: walk-forward, per-PM, OOD detection
Export to production format:
Trees: XGBoost native binary or JSON
Neural nets: ONNX (torch.onnx.export)
Linear: Coefficients as JSON
Build inference wrapper (pure Python, stdlib only)
Package: model + normalization params + feature extractors
Self-test script: known input -> known output
|
| USB drive or secure courier
v
Air Gap
|
v
Production IT
Antivirus scan
Code review (no network calls, no obfuscation, no pip installs)
Change control ticket submission
Approval: 2 to 6 weeks
|
v
Staging (Mirror Hardware: identical 2012 dual-core PC)
Latency: p99 < 50ms for 10,000 inferences
Memory: < 500MB RSS
Output: bit-exact match to development (within 1e-6)
|
v
Production (Tool Controller)
Shadow mode: log predictions, do not act on them (first 100 wafers)
Validation lot: physical metrology confirms VM accuracy
Active mode: predictions used for routing decisionsimport torch, onnx, onnxruntime as ort, time, numpy as np
# Export to ONNX (after training any PyTorch model)
dummy = torch.randn(1, input_dim)
torch.onnx.export(
model, dummy, "vm_model.onnx",
export_params=True, opset_version=11,
do_constant_folding=True,
input_names=['features'], output_names=['prediction'],
dynamic_axes={'features': {0: 'batch'}, 'prediction': {0: 'batch'}}
)
# Validate the export
onnx.checker.check_model(onnx.load("vm_model.onnx"))
# Validate latency on TARGET hardware (not your laptop)
def validate_latency(model_path, input_shape, n=10_000, target_p99_ms=50):
session = ort.InferenceSession(model_path)
latencies = []
for _ in range(n):
x = np.random.randn(*input_shape).astype(np.float32)
t0 = time.perf_counter()
session.run(None, {'features': x})
latencies.append((time.perf_counter() - t0) * 1000)
p99 = np.percentile(latencies, 99)
if p99 > target_p99_ms:
raise RuntimeError(f"p99={p99:.1f}ms exceeds {target_p99_ms}ms target")
print(f"p50={np.percentile(latencies,50):.1f}ms p99={p99:.1f}ms max={max(latencies):.1f}ms")The nine-point deployment checklist
Validation: Fab-Specific Methods
Standard k-fold cross-validation gives you an optimistic score that will not match production. Three validation strategies replace it.
Walk-Forward Validator (sklearn-compatible)
from sklearn.model_selection import BaseCrossValidator
import numpy as np
class WalkForwardValidator(BaseCrossValidator):
"""
sklearn-compatible walk-forward validator.
Train on [0:t], test on [t:t+test_size]. Never shuffle.
Plug into cross_val_score() as a drop-in replacement for KFold.
"""
def __init__(self, n_splits=5, test_size=None):
self.n_splits = n_splits
self.test_size = test_size
def split(self, X, y=None, groups=None):
n = len(X)
ts = self.test_size or n // (self.n_splits + 1)
idx = np.arange(n)
for i in range(1, self.n_splits + 1):
s = i * ts
yield idx[:s], idx[s:s + ts]
def get_n_splits(self, X=None, y=None, groups=None):
return self.n_splits
# Usage: exact same API as KFold
from sklearn.model_selection import cross_val_score
scores = cross_val_score(model, X, y,
cv=WalkForwardValidator(n_splits=5),
scoring='neg_mean_squared_error')Per-PM cross-validation
def per_pm_cv(df, model, features, target, pm_col='pm_sequence'):
"""
Validate within PM periods AND across PM boundaries.
within_pm score: can the model interpolate within a maintenance window?
across_pm score: does the model generalize across PM resets?
If across_pm << within_pm, the model has learned PM-specific artifacts
that will not transfer to new PM periods in production.
"""
within, across = [], []
periods = sorted(df[pm_col].unique())
for pm in periods:
d = df[df[pm_col] == pm]
if len(d) < 100: continue
mid = len(d) // 2
model.fit(d.iloc[:mid][features], d.iloc[:mid][target])
within.append(model.score(d.iloc[mid:][features], d.iloc[mid:][target]))
for i in range(len(periods) - 1):
train = df[df[pm_col] == periods[i]]
test = df[df[pm_col] == periods[i+1]]
model.fit(train[features], train[target])
across.append(model.score(test[features], test[target]))
print(f"Within-PM: {np.mean(within):.3f} +/- {np.std(within):.3f}")
print(f"Across-PM: {np.mean(across):.3f} +/- {np.std(across):.3f}")
# Gap > 0.1 = model not generalizing across maintenance events
return {'within_pm': within, 'across_pm': across}Cost-calibrated scoring
def cost_calibrated_score(y_true, y_pred, ri_scores,
cost_fn=2.5e6, cost_fp=500,
tolerance=2.0):
"""
Report total business cost, not accuracy.
For VM: false negative cost >> false positive cost (ratio ~5000:1).
Optimizing accuracy alone produces a model that is statistically
impressive but operationally wrong.
"""
thresholds = np.linspace(ri_scores.min(), ri_scores.max(), 100)
best_cost, best_thresh = np.inf, None
for thresh in thresholds:
fn = ((ri_scores <= thresh) & (np.abs(y_pred - y_true) > tolerance)).sum()
fp = ((ri_scores > thresh) & (np.abs(y_pred - y_true) <= tolerance)).sum()
cost = fn * cost_fn + fp * cost_fp
if cost < best_cost:
best_cost, best_thresh = cost, thresh
abstain_rate = (ri_scores > best_thresh).mean()
print(f"Optimal RI threshold: {best_thresh:.3f}")
print(f"Abstention rate: {abstain_rate:.1%}")
print(f"Expected annual cost at this threshold: ${best_cost * 365 / len(y_true):,.0f}")
return best_threshML Arsenal at a Glance
| Chapter | Core System | Key Implementation |
|---|---|---|
| 3.1 VM | Predict metrology before it arrives | Feature extraction with physical interpretation + XGBoost + SHAP + Autoencoder RI + cost-calibrated router |
| 3.2 FDC | Real-time fault detection in layers | Univariate SPC (Shewhart/EWMA/CUSUM) then T-squared then model-based residual monitoring |
| 3.3 Model Selection | Constraints choose the algorithm | Decision tree: latency constraint > interpretability requirement > specific algorithm |
| 3.4 Edge Deployment | Production on constrained hardware | ONNX export, p99 latency on mirror hardware, nine-point checklist, shadow mode first |
| 3.5 Validation | Fab-specific validation methods | WalkForwardValidator (sklearn-compatible), per-PM CV, cost-calibrated threshold; never standard KFold |