1 为什么用 cutadapt 框架
处理 FASTQ 文件时,最常见的需求就是对每条 read 做一些变换(截取、修剪、校正等)。数据量动辄几十 GB,单进程太慢,所以需要并行。
自己用 multiprocessing.Pool 实现并行 FASTQ 处理看似简单,但细节很多:
- 块读取与分发
- 进程间通信的序列化开销
- 输出的有序写回
- gzip 压缩的线程安全
cutadapt 已经把这些都实现好了。它的并行框架设计得很干净,核心只需理解三个概念:Modifier、Step 和 Pipeline。
本文解读 cutadapt 的并行架构,并演示如何编写一个 SingleEndModifier 来做 barcode 校正。
完整脚本:fastq_bc.py
2 架构概览
2.1 数据流
cutadapt 的并行处理数据流如下:
ReaderProcess ──pipe──> Worker 1 ──pipe──> ┐
Worker 2 ──pipe──> OrderedChunkWriter ──> 文件
Worker 3 ──pipe──> ┘
- ReaderProcess:独立进程,用
dnaio.read_chunks()按块读取 FASTQ,通过Connectionpipe 发送给空闲 worker - WorkerProcess:接收数据块,调用
Pipeline.process_reads()处理,结果通过 pipe 发回主进程 - OrderedChunkWriter:在主进程中缓冲乱序到达的结果块,按原始顺序写入文件
这不是 multiprocessing.Pool,而是自定义的进程 + 管道模型,避免了 Pool 的序列化开销。
2.2 核心类关系
make_runner(input_paths, cores)
├── cores == 1 → SerialPipelineRunner
└── cores > 1 → ParallelPipelineRunner
├── ReaderProcess
├── WorkerProcess × N
└── OrderedChunkWriter
SingleEndPipeline(modifiers, steps)
├── modifiers: List[SingleEndModifier] # 变换
└── steps: List[SingleEndStep] # 过滤/输出
OutputFiles(proxied=True/False)
├── open_record_writer() → 直写 或 ProxyRecordWriter
└── proxy_files() → 主进程 drain 代理写入器
3 Pipeline 工作机制
SingleEndPipeline.process_reads() 的核心循环非常简洁:
每个 step 接收 read,处理后返回:
SequenceRecord→ 传递给下一个 stepNone→ read 被消费(写入文件或丢弃),不再传递
ModificationInfo 携带 read 的修改信息(adapter 匹配、截取位置等),在 modifier 之间传递。
4 关键 API
4.1 SingleEndModifier
自定义处理逻辑的核心基类:
实现要点:接收一条 read,返回处理后的 read。不能返回 None(消费 read 是 Sink 的职责)。
cutadapt 内置的 modifier 例子:
4.2 SingleEndSink
Pipeline 的最终 step,负责写入输出:
4.3 OutputFiles
管理输出文件的写入。关键在于 proxied 参数:
当 proxied=True(多进程模式),open_record_writer() 返回 ProxyRecordWriter。它内部用 BytesIO 缓冲写入内容,主进程通过 proxy_files().drain() 回收并写入实际文件。这样 worker 进程不需要直接操作文件句柄。
4.4 make_runner
工厂函数,根据核心数选择串行或并行 runner:
5 双端模式
cutadapt 也提供了完整的双端 API,接口和单端几乎一样,只是每个方法多了一个 read2 参数。
5.1 核心类
# cutadapt/modifiers.py
class PairedEndModifier(ABC):
@abstractmethod
def __call__(
self, read1: SequenceRecord, read2: SequenceRecord,
info1: ModificationInfo, info2: ModificationInfo,
) -> Tuple[SequenceRecord, SequenceRecord]
# cutadapt/steps.py
class PairedEndStep(ABC):
@abstractmethod
def __call__(self, read1, read2, info1, info2) -> Optional[RecordPair]
# cutadapt/steps.py
class PairedEndSink(PairedEndStep):
def __call__(self, read1, read2, info1, info2):
self.writer.write(read1, read2)
return None5.2 组装差异
双端模式只需把 SingleEndPipeline 替换为 PairedEndPipeline,输入传两个文件路径:
# 输入:两个文件
input_paths = InputPaths("R1.fastq.gz", "R2.fastq.gz", interleaved=False)
with make_runner(input_paths, cores=4) as runner:
outfiles = OutputFiles(
proxied=cores > 1,
qualities=runner.input_file_format().has_qualities(),
file_opener=file_opener,
interleaved=False,
)
writer = outfiles.open_record_writer("R1_out.fq.gz", "R2_out.fq.gz")
# modifiers 支持两种形式:
# 1. PairedEndModifier 子类(直接操作 read pair)
# 2. 元组 (SingleEndModifier, SingleEndModifier)——分别作用于 R1 和 R2
modifiers = [
(barcode_corrector_r1, barcode_corrector_r2), # 元组形式
PairedEndRenamer("sample"), # PairedEndModifier 子类
]
sink = PairedEndSink(writer)
pipeline = PairedEndPipeline(modifiers=modifiers, steps=[sink])
stats = runner.run(pipeline, DummyProgress(), outfiles)PairedEndPipeline 的 modifiers 列表支持混合使用:元组形式的 (mod1, mod2) 会被自动包装为 PairedEndModifierWrapper,PairedEndModifier 子类直接使用。元组中的元素可以设为 None 表示只处理其中一端。
6 实战:Barcode 校正
以 barcode 校正为例,完整展示如何复用 cutadapt 框架。
6.1 校正算法
采用 PISA 的 k-mer 索引方法:
- 构建索引:对白名单中每条 barcode 提取 k-mer(k=5),建立 k-mer → barcode 集合的反向索引
- 查询校正:从 query barcode 提取 k-mer,通过索引召回候选 barcode,逐一计算 Hamming 距离
- 唯一命中:仅接受唯一命中且距离 ≤ max_dist 的结果。多个候选距离相同则返回 None(避免歧义校正)
白名单: [AGTCGA, AGTTGA, CGTAGC]
构建 k-mer 索引 (k=5):
AGTCG → {AGTCGA}
GTCGA → {AGTCGA}
AGTTG → {AGTTGA}
GTTGA → {AGTTGA}
CGTAG → {CGTAGC}
GTAGC → {CGTAGC}
查询 "AGTCCA" (距离 AGTCGA 为 1):
k-mer "AGTCC" → 无命中
k-mer "GTCCA" → 无命中
→ 候选为空,无法校正
查询 "AGTCGT" (距离 AGTCGA 为 1):
k-mer "AGTCG" → {AGTCGA} ✓ 命中候选
k-mer "GTCGT" → 无命中
→ 候选 {AGTCGA}, Hamming=1, 唯一命中 → 校正为 AGTCGA
6.2 实现 SingleEndModifier
from cutadapt.modifiers import SingleEndModifier
class BarcodeCorrectorModifier(SingleEndModifier):
def __init__(self, corrector, barcode_length):
self.corrector = corrector
self.barcode_length = barcode_length
self.corrected = 0
self.total = 0
def __call__(self, read, info):
self.total += 1
bl = self.barcode_length
barcode = read.sequence[:bl]
corrected = self.corrector.correct(barcode)
if corrected is not None and corrected != barcode:
self.corrected += 1
new_seq = corrected + read.sequence[bl:]
read = SequenceRecord(read.name, new_seq, read.qualities)
corr_tag = corrected if (corrected is not None and corrected != barcode) else ""
read = SequenceRecord(
f"{read.name}|||CR:Z:{barcode}|||CB:Z:{corr_tag}",
read.sequence, read.qualities,
)
return read逻辑很简单:提取前 N 个碱基作为 barcode,调用校正器,成功则替换序列。
6.3 组装 Pipeline
from cutadapt.files import InputPaths, OutputFiles, FileOpener
from cutadapt.runners import make_runner
from cutadapt.pipeline import SingleEndPipeline
from cutadapt.steps import SingleEndSink
from cutadapt.utils import DummyProgress
input_paths = InputPaths("input.fastq.gz")
file_opener = FileOpener(compression_level=1, threads=2)
with make_runner(input_paths, cores=4) as runner:
outfiles = OutputFiles(
proxied=cores > 1,
qualities=runner.input_file_format().has_qualities(),
file_opener=file_opener,
interleaved=False,
)
writer = outfiles.open_record_writer("output.fastq.gz")
bc_modifier = BarcodeCorrectorModifier(corrector, barcode_length)
sink = SingleEndSink(writer)
pipeline = SingleEndPipeline(
modifiers=[bc_modifier],
steps=[sink],
)
stats = runner.run(pipeline, DummyProgress(), outfiles)整个流程就是:构建 modifier → 构建 sink → 组装 pipeline → 运行。cutadapt 负责:
- 多进程的创建和通信
- FASTQ 的按块读取和有序写回
- gzip 的并行压缩
- 进程异常处理
你只需要关注自己的处理逻辑。
6.4 完整脚本
完整脚本封装为 typer CLI 工具:
参数说明:
| 参数 | 说明 |
|---|---|
input_files |
输入 FASTQ 文件 |
-w, --whitelist |
barcode 白名单文件,每行一条 |
-o, --output |
输出文件路径 |
-c, --cores |
并行进程数,默认使用全部 CPU |
-l, --barcode-length |
barcode 长度,默认自动检测 |
-d, --max-dist |
最大 Hamming 距离,默认 1 |
-k, --kmer-size |
k-mer 索引的 k 值,默认 5 |
--compression-level |
gzip 压缩等级 1-9 |
7 扩展思路
理解了这个框架后,可以很方便地扩展其他处理逻辑:
添加质量过滤:实现一个 SingleEndStep,不满足质量阈值时返回 None 丢弃 read:
多个 modifier 和 step 可以自由组合到同一个 pipeline 中,cutadapt 会按顺序执行。
8 参考
- cutadapt 源码
- PISA — k-mer 索引 barcode 校正
- dnaio — FASTQ/FASTA 读写库