用 cutadapt 框架并行处理 FASTQ

复用 cutadapt 实现自定义多进程 FASTQ 处理,以 barcode 校正为例

解读 cutadapt 的并行架构,演示如何通过 SingleEndModifier 和 SingleEndSink 扩展 pipeline,实现自定义的 FASTQ 处理逻辑。
python
bioinformatics
Author

Liripo

Published

June 16, 2026

1 为什么用 cutadapt 框架

处理 FASTQ 文件时,最常见的需求就是对每条 read 做一些变换(截取、修剪、校正等)。数据量动辄几十 GB,单进程太慢,所以需要并行。

自己用 multiprocessing.Pool 实现并行 FASTQ 处理看似简单,但细节很多:

  • 块读取与分发
  • 进程间通信的序列化开销
  • 输出的有序写回
  • gzip 压缩的线程安全

cutadapt 已经把这些都实现好了。它的并行框架设计得很干净,核心只需理解三个概念:ModifierStepPipeline

本文解读 cutadapt 的并行架构,并演示如何编写一个 SingleEndModifier 来做 barcode 校正。

完整脚本:fastq_bc.py

2 架构概览

2.1 数据流

cutadapt 的并行处理数据流如下:

ReaderProcess ──pipe──> Worker 1 ──pipe──> ┐
                      Worker 2 ──pipe──> OrderedChunkWriter ──> 文件
                      Worker 3 ──pipe──> ┘
  • ReaderProcess:独立进程,用 dnaio.read_chunks() 按块读取 FASTQ,通过 Connection pipe 发送给空闲 worker
  • WorkerProcess:接收数据块,调用 Pipeline.process_reads() 处理,结果通过 pipe 发回主进程
  • OrderedChunkWriter:在主进程中缓冲乱序到达的结果块,按原始顺序写入文件

这不是 multiprocessing.Pool,而是自定义的进程 + 管道模型,避免了 Pool 的序列化开销。

2.2 核心类关系

make_runner(input_paths, cores)
    ├── cores == 1  → SerialPipelineRunner
    └── cores >  1  → ParallelPipelineRunner
                         ├── ReaderProcess
                         ├── WorkerProcess × N
                         └── OrderedChunkWriter

SingleEndPipeline(modifiers, steps)
    ├── modifiers: List[SingleEndModifier]  # 变换
    └── steps:     List[SingleEndStep]      # 过滤/输出

OutputFiles(proxied=True/False)
    ├── open_record_writer() → 直写 或 ProxyRecordWriter
    └── proxy_files()        → 主进程 drain 代理写入器

3 Pipeline 工作机制

SingleEndPipeline.process_reads() 的核心循环非常简洁:

# cutadapt/pipeline.py
modifiers_and_steps = self._modifiers + self._steps
for read in reader:
    info = ModificationInfo(read)
    for step in modifiers_and_steps:
        read = step(read, info)
        if read is None:       # read 被消费,跳出循环
            break

每个 step 接收 read,处理后返回:

  • SequenceRecord → 传递给下一个 step
  • None → read 被消费(写入文件或丢弃),不再传递

ModificationInfo 携带 read 的修改信息(adapter 匹配、截取位置等),在 modifier 之间传递。

4 关键 API

4.1 SingleEndModifier

自定义处理逻辑的核心基类:

# cutadapt/modifiers.py
class SingleEndModifier(ABC):
    @abstractmethod
    def __call__(self, read: SequenceRecord, info: ModificationInfo) -> SequenceRecord

实现要点:接收一条 read,返回处理后的 read。不能返回 None(消费 read 是 Sink 的职责)。

cutadapt 内置的 modifier 例子:

# 无条件截取前 N 个碱基
class UnconditionalCutter(SingleEndModifier):
    def __call__(self, read, info):
        return read[self.start:self.stop]

4.2 SingleEndSink

Pipeline 的最终 step,负责写入输出:

# cutadapt/steps.py
class SingleEndSink(SingleEndStep):
    def __call__(self, read, info):
        self.writer.write(read)    # 写入
        return None                 # 消费 read

4.3 OutputFiles

管理输出文件的写入。关键在于 proxied 参数:

outfiles = OutputFiles(
    proxied=cores > 1,
    qualities=runner.input_file_format().has_qualities(),
    file_opener=file_opener,
    interleaved=False,
)
writer = outfiles.open_record_writer(output_path)

proxied=True(多进程模式),open_record_writer() 返回 ProxyRecordWriter。它内部用 BytesIO 缓冲写入内容,主进程通过 proxy_files().drain() 回收并写入实际文件。这样 worker 进程不需要直接操作文件句柄。

4.4 make_runner

工厂函数,根据核心数选择串行或并行 runner:

# cutadapt/runners.py
def make_runner(inpaths, cores, buffer_size=None):
    if cores > 1:
        return ParallelPipelineRunner(inpaths, n_workers=cores, ...)
    else:
        return SerialPipelineRunner(inpaths.open())

5 双端模式

cutadapt 也提供了完整的双端 API,接口和单端几乎一样,只是每个方法多了一个 read2 参数。

5.1 核心类

# cutadapt/modifiers.py
class PairedEndModifier(ABC):
    @abstractmethod
    def __call__(
        self, read1: SequenceRecord, read2: SequenceRecord,
        info1: ModificationInfo, info2: ModificationInfo,
    ) -> Tuple[SequenceRecord, SequenceRecord]

# cutadapt/steps.py
class PairedEndStep(ABC):
    @abstractmethod
    def __call__(self, read1, read2, info1, info2) -> Optional[RecordPair]

# cutadapt/steps.py
class PairedEndSink(PairedEndStep):
    def __call__(self, read1, read2, info1, info2):
        self.writer.write(read1, read2)
        return None

5.2 组装差异

双端模式只需把 SingleEndPipeline 替换为 PairedEndPipeline,输入传两个文件路径:

# 输入:两个文件
input_paths = InputPaths("R1.fastq.gz", "R2.fastq.gz", interleaved=False)

with make_runner(input_paths, cores=4) as runner:
    outfiles = OutputFiles(
        proxied=cores > 1,
        qualities=runner.input_file_format().has_qualities(),
        file_opener=file_opener,
        interleaved=False,
    )
    writer = outfiles.open_record_writer("R1_out.fq.gz", "R2_out.fq.gz")

    # modifiers 支持两种形式:
    # 1. PairedEndModifier 子类(直接操作 read pair)
    # 2. 元组 (SingleEndModifier, SingleEndModifier)——分别作用于 R1 和 R2
    modifiers = [
        (barcode_corrector_r1, barcode_corrector_r2),  # 元组形式
        PairedEndRenamer("sample"),                     # PairedEndModifier 子类
    ]
    sink = PairedEndSink(writer)

    pipeline = PairedEndPipeline(modifiers=modifiers, steps=[sink])
    stats = runner.run(pipeline, DummyProgress(), outfiles)

PairedEndPipelinemodifiers 列表支持混合使用:元组形式的 (mod1, mod2) 会被自动包装为 PairedEndModifierWrapperPairedEndModifier 子类直接使用。元组中的元素可以设为 None 表示只处理其中一端。

6 实战:Barcode 校正

以 barcode 校正为例,完整展示如何复用 cutadapt 框架。

6.1 校正算法

采用 PISA 的 k-mer 索引方法:

  1. 构建索引:对白名单中每条 barcode 提取 k-mer(k=5),建立 k-mer → barcode 集合的反向索引
  2. 查询校正:从 query barcode 提取 k-mer,通过索引召回候选 barcode,逐一计算 Hamming 距离
  3. 唯一命中:仅接受唯一命中且距离 ≤ max_dist 的结果。多个候选距离相同则返回 None(避免歧义校正)
白名单: [AGTCGA, AGTTGA, CGTAGC]

构建 k-mer 索引 (k=5):
  AGTCG → {AGTCGA}
  GTCGA → {AGTCGA}
  AGTTG → {AGTTGA}
  GTTGA → {AGTTGA}
  CGTAG → {CGTAGC}
  GTAGC → {CGTAGC}

查询 "AGTCCA" (距离 AGTCGA 为 1):
  k-mer "AGTCC" → 无命中
  k-mer "GTCCA" → 无命中
  → 候选为空,无法校正

查询 "AGTCGT" (距离 AGTCGA 为 1):
  k-mer "AGTCG" → {AGTCGA}  ✓ 命中候选
  k-mer "GTCGT" → 无命中
  → 候选 {AGTCGA}, Hamming=1, 唯一命中 → 校正为 AGTCGA

6.2 实现 SingleEndModifier

from cutadapt.modifiers import SingleEndModifier

class BarcodeCorrectorModifier(SingleEndModifier):
    def __init__(self, corrector, barcode_length):
        self.corrector = corrector
        self.barcode_length = barcode_length
        self.corrected = 0
        self.total = 0

    def __call__(self, read, info):
        self.total += 1
        bl = self.barcode_length
        barcode = read.sequence[:bl]

        corrected = self.corrector.correct(barcode)
        if corrected is not None and corrected != barcode:
            self.corrected += 1
            new_seq = corrected + read.sequence[bl:]
            read = SequenceRecord(read.name, new_seq, read.qualities)

        corr_tag = corrected if (corrected is not None and corrected != barcode) else ""
        read = SequenceRecord(
            f"{read.name}|||CR:Z:{barcode}|||CB:Z:{corr_tag}",
            read.sequence, read.qualities,
        )
        return read

逻辑很简单:提取前 N 个碱基作为 barcode,调用校正器,成功则替换序列。

6.3 组装 Pipeline

from cutadapt.files import InputPaths, OutputFiles, FileOpener
from cutadapt.runners import make_runner
from cutadapt.pipeline import SingleEndPipeline
from cutadapt.steps import SingleEndSink
from cutadapt.utils import DummyProgress

input_paths = InputPaths("input.fastq.gz")
file_opener = FileOpener(compression_level=1, threads=2)

with make_runner(input_paths, cores=4) as runner:
    outfiles = OutputFiles(
        proxied=cores > 1,
        qualities=runner.input_file_format().has_qualities(),
        file_opener=file_opener,
        interleaved=False,
    )
    writer = outfiles.open_record_writer("output.fastq.gz")

    bc_modifier = BarcodeCorrectorModifier(corrector, barcode_length)
    sink = SingleEndSink(writer)

    pipeline = SingleEndPipeline(
        modifiers=[bc_modifier],
        steps=[sink],
    )
    stats = runner.run(pipeline, DummyProgress(), outfiles)

整个流程就是:构建 modifier → 构建 sink → 组装 pipeline → 运行。cutadapt 负责:

  • 多进程的创建和通信
  • FASTQ 的按块读取和有序写回
  • gzip 的并行压缩
  • 进程异常处理

你只需要关注自己的处理逻辑。

6.4 完整脚本

完整脚本封装为 typer CLI 工具:

# 安装依赖
pip install cutadapt dnaio typer

# 运行
python fastq_bc.py input.fastq.gz -w whitelist.txt -o output.fastq.gz -c 4

参数说明:

参数 说明
input_files 输入 FASTQ 文件
-w, --whitelist barcode 白名单文件,每行一条
-o, --output 输出文件路径
-c, --cores 并行进程数,默认使用全部 CPU
-l, --barcode-length barcode 长度,默认自动检测
-d, --max-dist 最大 Hamming 距离,默认 1
-k, --kmer-size k-mer 索引的 k 值,默认 5
--compression-level gzip 压缩等级 1-9

7 扩展思路

理解了这个框架后,可以很方便地扩展其他处理逻辑:

添加质量过滤:实现一个 SingleEndStep,不满足质量阈值时返回 None 丢弃 read:

class QualityFilter(SingleEndStep):
    def __call__(self, read, info):
        avg_quality = sum(ord(c) - 33 for c in read.qualities) / len(read.qualities)
        if avg_quality < self.min_quality:
            return None  # 丢弃
        return read

多个 modifier 和 step 可以自由组合到同一个 pipeline 中,cutadapt 会按顺序执行。

8 参考