parser_vcf.py
· 5.8 KiB · Python
原始文件
import os
import re
import gzip
from typing import Tuple, List
import pandas as pd
class QC:
_DP_RE = re.compile(r"(?:^|;)\s*DP=(\d+(?:\.\d+)?)")
def __init__(
self,
input_vcf: str
):
self.input_vcf = input_vcf
def _gt_to_bases(self, gt: str, ref: str, alt: str) -> str | None:
if gt is None:
return None
gt = str(gt)
if gt in {".", "./.", ".|."}:
return None
alts = [] if alt in {None, ".", ""} else str(alt).split(",")
alleles = [ref] + alts # 0->REF, 1..->ALT
if "/" in gt:
parts = gt.split("/")
elif "|" in gt:
parts = gt.split("|")
else:
parts = [gt] # haploid like "1"
bases = []
for p in parts:
if p in {".", ""}:
bases.append(".")
continue
try:
idx = int(p)
except ValueError:
return None
if 0 <= idx < len(alleles):
bases.append(alleles[idx])
else:
return None
return "/".join(bases)
@classmethod
def dp_difference(cls, info: str, dp: int) -> float:
"""
INFO列里的DP 与 sample列里的DP 差值(INFO - sample)
"""
if info is None:
return float(0 - dp)
m = cls._DP_RE.search(str(info))
if m:
return float(m.group(1)) - dp
return float(0 - dp)
def parse_vcf_row(self, row) -> pd.Series:
"""
row: tuple/list-like, 对应VCF的 0..9 列:
0 CHROM,1 POS,2 ID,3 REF,4 ALT,5 QUAL,6 FILTER,7 INFO,8 FORMAT,9 SAMPLE
"""
chr_id = row[0]
position = row[1]
ref_base = row[3]
alt_base = row[4]
fmt_keys = str(row[8]).split(":")
sample_vals = str(row[9]).split(":")
fmt = dict(zip(fmt_keys, sample_vals))
data = {"A": 0, "T": 0, "G": 0, "C": 0}
gt = fmt.get("GT", None)
gt_base = self._gt_to_bases(gt, ref_base, alt_base)
# AD
ad = fmt.get("AD", None)
ad_sum = 0
if ad not in {None, ".", ""}:
try:
ad_int = [int(i) for i in str(ad).split(",")]
ad_sum = sum(ad_int)
bases = [ref_base] if alt_base in {None, ".", ""} else [ref_base] + str(alt_base).split(",")
for b, n in zip(bases, ad_int):
if b in data: # only A/T/G/C
data[b] = n
except ValueError:
ad = None
ad_sum = 0
else:
ad = None
# DP
dp = 0
v = fmt.get("DP", None)
if v not in {None, ".", ""}:
try:
dp = int(v)
except ValueError:
dp = 0
dp_diff = self.dp_difference(row[7], dp)
# GQ / RGQ
gq = None
for k in ("GQ", "RGQ"):
v = fmt.get(k, None)
if v not in {None, ".", ""}:
try:
gq = float(v)
except ValueError:
gq = None
break
# PL
pls = fmt.get("PL", None)
if pls in {".", ""}:
pls = None
if ad_sum > 0:
afA = data["A"] / ad_sum
afT = data["T"] / ad_sum
afG = data["G"] / ad_sum
afC = data["C"] / ad_sum
else:
afA = afT = afG = afC = 0.0
return pd.Series(
[
chr_id, position,
data["A"], data["T"], data["G"], data["C"],
afA, afT, afG, afC,
gt, gt_base,
ad, ad_sum,
dp, dp_diff,
gq, pls
],
index=[
"chr", "pos",
"A", "T", "G", "C",
"A_frac", "T_frac", "G_frac", "C_frac",
"GT", "GT_base",
"AD", "AD_sum",
"DP", "DP_diff",
"GQ", "PL"
]
)
def read_vcf(self, input_vcf: str) -> Tuple[pd.DataFrame, List[str], List[str]]:
"""
返回: (vcf_df, vcf_header(##), header_line(#CHROM...))
"""
is_gzipped = input_vcf.endswith(".gz")
open_func = gzip.open if is_gzipped else open
mode = "rt" if is_gzipped else "r"
vcf_header: List[str] = []
header_line: List[str] = []
# 逐行读header,避免 readlines() 占内存
with open_func(input_vcf, mode) as f:
for line in f:
if line.startswith("##"):
vcf_header.append(line.rstrip("\n"))
continue
if line.startswith("#CHROM"):
header_line = line.strip().split()
break
# 主体用 pandas 读(comment='#' 会跳过所有header行)
vcf_df = pd.read_csv(
input_vcf,
comment="#",
sep="\t",
header=None,
compression="infer",
dtype={0: str, 1: int, 3: str, 4: str, 7: str, 8: str, 9: str},
low_memory=False,
)
return vcf_df, vcf_header, header_line
def parse_vcf_df(self, vcf_df: pd.DataFrame) -> pd.DataFrame:
"""
只解析,不过滤。比 iterrows() 更快。
"""
out = []
# itertuples 更快;name=None 返回普通tuple
for row in vcf_df.itertuples(index=False, name=None):
out.append(self.parse_vcf_row(row))
return pd.DataFrame(out)
def main(self) -> pd.DataFrame:
# logger.info(f"starting: {self.input_vcf}")
vcf_df, vcf_header, header_line = self.read_vcf(self.input_vcf)
parsed = self.parse_vcf_df(vcf_df)
# logger.info(f"done: {self.input_vcf}")
return parsed
| 1 | import os |
| 2 | import re |
| 3 | import gzip |
| 4 | from typing import Tuple, List |
| 5 | import pandas as pd |
| 6 | |
| 7 | class QC: |
| 8 | _DP_RE = re.compile(r"(?:^|;)\s*DP=(\d+(?:\.\d+)?)") |
| 9 | |
| 10 | def __init__( |
| 11 | self, |
| 12 | input_vcf: str |
| 13 | ): |
| 14 | self.input_vcf = input_vcf |
| 15 | |
| 16 | def _gt_to_bases(self, gt: str, ref: str, alt: str) -> str | None: |
| 17 | if gt is None: |
| 18 | return None |
| 19 | gt = str(gt) |
| 20 | if gt in {".", "./.", ".|."}: |
| 21 | return None |
| 22 | |
| 23 | alts = [] if alt in {None, ".", ""} else str(alt).split(",") |
| 24 | alleles = [ref] + alts # 0->REF, 1..->ALT |
| 25 | |
| 26 | if "/" in gt: |
| 27 | parts = gt.split("/") |
| 28 | elif "|" in gt: |
| 29 | parts = gt.split("|") |
| 30 | else: |
| 31 | parts = [gt] # haploid like "1" |
| 32 | |
| 33 | bases = [] |
| 34 | for p in parts: |
| 35 | if p in {".", ""}: |
| 36 | bases.append(".") |
| 37 | continue |
| 38 | try: |
| 39 | idx = int(p) |
| 40 | except ValueError: |
| 41 | return None |
| 42 | if 0 <= idx < len(alleles): |
| 43 | bases.append(alleles[idx]) |
| 44 | else: |
| 45 | return None |
| 46 | |
| 47 | return "/".join(bases) |
| 48 | |
| 49 | @classmethod |
| 50 | def dp_difference(cls, info: str, dp: int) -> float: |
| 51 | """ |
| 52 | INFO列里的DP 与 sample列里的DP 差值(INFO - sample) |
| 53 | """ |
| 54 | if info is None: |
| 55 | return float(0 - dp) |
| 56 | m = cls._DP_RE.search(str(info)) |
| 57 | if m: |
| 58 | return float(m.group(1)) - dp |
| 59 | return float(0 - dp) |
| 60 | |
| 61 | def parse_vcf_row(self, row) -> pd.Series: |
| 62 | """ |
| 63 | row: tuple/list-like, 对应VCF的 0..9 列: |
| 64 | 0 CHROM,1 POS,2 ID,3 REF,4 ALT,5 QUAL,6 FILTER,7 INFO,8 FORMAT,9 SAMPLE |
| 65 | """ |
| 66 | chr_id = row[0] |
| 67 | position = row[1] |
| 68 | ref_base = row[3] |
| 69 | alt_base = row[4] |
| 70 | |
| 71 | fmt_keys = str(row[8]).split(":") |
| 72 | sample_vals = str(row[9]).split(":") |
| 73 | fmt = dict(zip(fmt_keys, sample_vals)) |
| 74 | |
| 75 | data = {"A": 0, "T": 0, "G": 0, "C": 0} |
| 76 | |
| 77 | gt = fmt.get("GT", None) |
| 78 | gt_base = self._gt_to_bases(gt, ref_base, alt_base) |
| 79 | |
| 80 | # AD |
| 81 | ad = fmt.get("AD", None) |
| 82 | ad_sum = 0 |
| 83 | if ad not in {None, ".", ""}: |
| 84 | try: |
| 85 | ad_int = [int(i) for i in str(ad).split(",")] |
| 86 | ad_sum = sum(ad_int) |
| 87 | |
| 88 | bases = [ref_base] if alt_base in {None, ".", ""} else [ref_base] + str(alt_base).split(",") |
| 89 | for b, n in zip(bases, ad_int): |
| 90 | if b in data: # only A/T/G/C |
| 91 | data[b] = n |
| 92 | except ValueError: |
| 93 | ad = None |
| 94 | ad_sum = 0 |
| 95 | else: |
| 96 | ad = None |
| 97 | |
| 98 | # DP |
| 99 | dp = 0 |
| 100 | v = fmt.get("DP", None) |
| 101 | if v not in {None, ".", ""}: |
| 102 | try: |
| 103 | dp = int(v) |
| 104 | except ValueError: |
| 105 | dp = 0 |
| 106 | |
| 107 | dp_diff = self.dp_difference(row[7], dp) |
| 108 | |
| 109 | # GQ / RGQ |
| 110 | gq = None |
| 111 | for k in ("GQ", "RGQ"): |
| 112 | v = fmt.get(k, None) |
| 113 | if v not in {None, ".", ""}: |
| 114 | try: |
| 115 | gq = float(v) |
| 116 | except ValueError: |
| 117 | gq = None |
| 118 | break |
| 119 | |
| 120 | # PL |
| 121 | pls = fmt.get("PL", None) |
| 122 | if pls in {".", ""}: |
| 123 | pls = None |
| 124 | |
| 125 | if ad_sum > 0: |
| 126 | afA = data["A"] / ad_sum |
| 127 | afT = data["T"] / ad_sum |
| 128 | afG = data["G"] / ad_sum |
| 129 | afC = data["C"] / ad_sum |
| 130 | else: |
| 131 | afA = afT = afG = afC = 0.0 |
| 132 | |
| 133 | return pd.Series( |
| 134 | [ |
| 135 | chr_id, position, |
| 136 | data["A"], data["T"], data["G"], data["C"], |
| 137 | afA, afT, afG, afC, |
| 138 | gt, gt_base, |
| 139 | ad, ad_sum, |
| 140 | dp, dp_diff, |
| 141 | gq, pls |
| 142 | ], |
| 143 | index=[ |
| 144 | "chr", "pos", |
| 145 | "A", "T", "G", "C", |
| 146 | "A_frac", "T_frac", "G_frac", "C_frac", |
| 147 | "GT", "GT_base", |
| 148 | "AD", "AD_sum", |
| 149 | "DP", "DP_diff", |
| 150 | "GQ", "PL" |
| 151 | ] |
| 152 | ) |
| 153 | |
| 154 | def read_vcf(self, input_vcf: str) -> Tuple[pd.DataFrame, List[str], List[str]]: |
| 155 | """ |
| 156 | 返回: (vcf_df, vcf_header(##), header_line(#CHROM...)) |
| 157 | """ |
| 158 | is_gzipped = input_vcf.endswith(".gz") |
| 159 | open_func = gzip.open if is_gzipped else open |
| 160 | mode = "rt" if is_gzipped else "r" |
| 161 | |
| 162 | vcf_header: List[str] = [] |
| 163 | header_line: List[str] = [] |
| 164 | |
| 165 | # 逐行读header,避免 readlines() 占内存 |
| 166 | with open_func(input_vcf, mode) as f: |
| 167 | for line in f: |
| 168 | if line.startswith("##"): |
| 169 | vcf_header.append(line.rstrip("\n")) |
| 170 | continue |
| 171 | if line.startswith("#CHROM"): |
| 172 | header_line = line.strip().split() |
| 173 | break |
| 174 | |
| 175 | # 主体用 pandas 读(comment='#' 会跳过所有header行) |
| 176 | vcf_df = pd.read_csv( |
| 177 | input_vcf, |
| 178 | comment="#", |
| 179 | sep="\t", |
| 180 | header=None, |
| 181 | compression="infer", |
| 182 | dtype={0: str, 1: int, 3: str, 4: str, 7: str, 8: str, 9: str}, |
| 183 | low_memory=False, |
| 184 | ) |
| 185 | return vcf_df, vcf_header, header_line |
| 186 | |
| 187 | def parse_vcf_df(self, vcf_df: pd.DataFrame) -> pd.DataFrame: |
| 188 | """ |
| 189 | 只解析,不过滤。比 iterrows() 更快。 |
| 190 | """ |
| 191 | out = [] |
| 192 | # itertuples 更快;name=None 返回普通tuple |
| 193 | for row in vcf_df.itertuples(index=False, name=None): |
| 194 | out.append(self.parse_vcf_row(row)) |
| 195 | return pd.DataFrame(out) |
| 196 | |
| 197 | def main(self) -> pd.DataFrame: |
| 198 | # logger.info(f"starting: {self.input_vcf}") |
| 199 | vcf_df, vcf_header, header_line = self.read_vcf(self.input_vcf) |
| 200 | parsed = self.parse_vcf_df(vcf_df) |
| 201 | # logger.info(f"done: {self.input_vcf}") |
| 202 | return parsed |