import os import re import gzip from typing import Tuple, List import pandas as pd class QC: _DP_RE = re.compile(r"(?:^|;)\s*DP=(\d+(?:\.\d+)?)") def __init__( self, input_vcf: str ): self.input_vcf = input_vcf def _gt_to_bases(self, gt: str, ref: str, alt: str) -> str | None: if gt is None: return None gt = str(gt) if gt in {".", "./.", ".|."}: return None alts = [] if alt in {None, ".", ""} else str(alt).split(",") alleles = [ref] + alts # 0->REF, 1..->ALT if "/" in gt: parts = gt.split("/") elif "|" in gt: parts = gt.split("|") else: parts = [gt] # haploid like "1" bases = [] for p in parts: if p in {".", ""}: bases.append(".") continue try: idx = int(p) except ValueError: return None if 0 <= idx < len(alleles): bases.append(alleles[idx]) else: return None return "/".join(bases) @classmethod def dp_difference(cls, info: str, dp: int) -> float: """ INFO列里的DP 与 sample列里的DP 差值(INFO - sample) """ if info is None: return float(0 - dp) m = cls._DP_RE.search(str(info)) if m: return float(m.group(1)) - dp return float(0 - dp) def parse_vcf_row(self, row) -> pd.Series: """ row: tuple/list-like, 对应VCF的 0..9 列: 0 CHROM,1 POS,2 ID,3 REF,4 ALT,5 QUAL,6 FILTER,7 INFO,8 FORMAT,9 SAMPLE """ chr_id = row[0] position = row[1] ref_base = row[3] alt_base = row[4] fmt_keys = str(row[8]).split(":") sample_vals = str(row[9]).split(":") fmt = dict(zip(fmt_keys, sample_vals)) data = {"A": 0, "T": 0, "G": 0, "C": 0} gt = fmt.get("GT", None) gt_base = self._gt_to_bases(gt, ref_base, alt_base) # AD ad = fmt.get("AD", None) ad_sum = 0 if ad not in {None, ".", ""}: try: ad_int = [int(i) for i in str(ad).split(",")] ad_sum = sum(ad_int) bases = [ref_base] if alt_base in {None, ".", ""} else [ref_base] + str(alt_base).split(",") for b, n in zip(bases, ad_int): if b in data: # only A/T/G/C data[b] = n except ValueError: ad = None ad_sum = 0 else: ad = None # DP dp = 0 v = fmt.get("DP", None) if v not in {None, ".", ""}: try: dp = int(v) except ValueError: dp = 0 dp_diff = self.dp_difference(row[7], dp) # GQ / RGQ gq = None for k in ("GQ", "RGQ"): v = fmt.get(k, None) if v not in {None, ".", ""}: try: gq = float(v) except ValueError: gq = None break # PL pls = fmt.get("PL", None) if pls in {".", ""}: pls = None if ad_sum > 0: afA = data["A"] / ad_sum afT = data["T"] / ad_sum afG = data["G"] / ad_sum afC = data["C"] / ad_sum else: afA = afT = afG = afC = 0.0 return pd.Series( [ chr_id, position, data["A"], data["T"], data["G"], data["C"], afA, afT, afG, afC, gt, gt_base, ad, ad_sum, dp, dp_diff, gq, pls ], index=[ "chr", "pos", "A", "T", "G", "C", "A_frac", "T_frac", "G_frac", "C_frac", "GT", "GT_base", "AD", "AD_sum", "DP", "DP_diff", "GQ", "PL" ] ) def read_vcf(self, input_vcf: str) -> Tuple[pd.DataFrame, List[str], List[str]]: """ 返回: (vcf_df, vcf_header(##), header_line(#CHROM...)) """ is_gzipped = input_vcf.endswith(".gz") open_func = gzip.open if is_gzipped else open mode = "rt" if is_gzipped else "r" vcf_header: List[str] = [] header_line: List[str] = [] # 逐行读header,避免 readlines() 占内存 with open_func(input_vcf, mode) as f: for line in f: if line.startswith("##"): vcf_header.append(line.rstrip("\n")) continue if line.startswith("#CHROM"): header_line = line.strip().split() break # 主体用 pandas 读(comment='#' 会跳过所有header行) vcf_df = pd.read_csv( input_vcf, comment="#", sep="\t", header=None, compression="infer", dtype={0: str, 1: int, 3: str, 4: str, 7: str, 8: str, 9: str}, low_memory=False, ) return vcf_df, vcf_header, header_line def parse_vcf_df(self, vcf_df: pd.DataFrame) -> pd.DataFrame: """ 只解析,不过滤。比 iterrows() 更快。 """ out = [] # itertuples 更快;name=None 返回普通tuple for row in vcf_df.itertuples(index=False, name=None): out.append(self.parse_vcf_row(row)) return pd.DataFrame(out) def main(self) -> pd.DataFrame: # logger.info(f"starting: {self.input_vcf}") vcf_df, vcf_header, header_line = self.read_vcf(self.input_vcf) parsed = self.parse_vcf_df(vcf_df) # logger.info(f"done: {self.input_vcf}") return parsed