最後活躍 1 month ago

parser_vcf.py 原始檔案
1import os
2import re
3import gzip
4from typing import Tuple, List
5import pandas as pd
6
7class QC:
8 _DP_RE = re.compile(r"(?:^|;)\s*DP=(\d+(?:\.\d+)?)")
9
10 def __init__(
11 self,
12 input_vcf: str
13 ):
14 self.input_vcf = input_vcf
15
16 def _gt_to_bases(self, gt: str, ref: str, alt: str) -> str | None:
17 if gt is None:
18 return None
19 gt = str(gt)
20 if gt in {".", "./.", ".|."}:
21 return None
22
23 alts = [] if alt in {None, ".", ""} else str(alt).split(",")
24 alleles = [ref] + alts # 0->REF, 1..->ALT
25
26 if "/" in gt:
27 parts = gt.split("/")
28 elif "|" in gt:
29 parts = gt.split("|")
30 else:
31 parts = [gt] # haploid like "1"
32
33 bases = []
34 for p in parts:
35 if p in {".", ""}:
36 bases.append(".")
37 continue
38 try:
39 idx = int(p)
40 except ValueError:
41 return None
42 if 0 <= idx < len(alleles):
43 bases.append(alleles[idx])
44 else:
45 return None
46
47 return "/".join(bases)
48
49 @classmethod
50 def dp_difference(cls, info: str, dp: int) -> float:
51 """
52 INFO列里的DP 与 sample列里的DP 差值(INFO - sample)
53 """
54 if info is None:
55 return float(0 - dp)
56 m = cls._DP_RE.search(str(info))
57 if m:
58 return float(m.group(1)) - dp
59 return float(0 - dp)
60
61 def parse_vcf_row(self, row) -> pd.Series:
62 """
63 row: tuple/list-like, 对应VCF的 0..9 列:
64 0 CHROM,1 POS,2 ID,3 REF,4 ALT,5 QUAL,6 FILTER,7 INFO,8 FORMAT,9 SAMPLE
65 """
66 chr_id = row[0]
67 position = row[1]
68 ref_base = row[3]
69 alt_base = row[4]
70
71 fmt_keys = str(row[8]).split(":")
72 sample_vals = str(row[9]).split(":")
73 fmt = dict(zip(fmt_keys, sample_vals))
74
75 data = {"A": 0, "T": 0, "G": 0, "C": 0}
76
77 gt = fmt.get("GT", None)
78 gt_base = self._gt_to_bases(gt, ref_base, alt_base)
79
80 # AD
81 ad = fmt.get("AD", None)
82 ad_sum = 0
83 if ad not in {None, ".", ""}:
84 try:
85 ad_int = [int(i) for i in str(ad).split(",")]
86 ad_sum = sum(ad_int)
87
88 bases = [ref_base] if alt_base in {None, ".", ""} else [ref_base] + str(alt_base).split(",")
89 for b, n in zip(bases, ad_int):
90 if b in data: # only A/T/G/C
91 data[b] = n
92 except ValueError:
93 ad = None
94 ad_sum = 0
95 else:
96 ad = None
97
98 # DP
99 dp = 0
100 v = fmt.get("DP", None)
101 if v not in {None, ".", ""}:
102 try:
103 dp = int(v)
104 except ValueError:
105 dp = 0
106
107 dp_diff = self.dp_difference(row[7], dp)
108
109 # GQ / RGQ
110 gq = None
111 for k in ("GQ", "RGQ"):
112 v = fmt.get(k, None)
113 if v not in {None, ".", ""}:
114 try:
115 gq = float(v)
116 except ValueError:
117 gq = None
118 break
119
120 # PL
121 pls = fmt.get("PL", None)
122 if pls in {".", ""}:
123 pls = None
124
125 if ad_sum > 0:
126 afA = data["A"] / ad_sum
127 afT = data["T"] / ad_sum
128 afG = data["G"] / ad_sum
129 afC = data["C"] / ad_sum
130 else:
131 afA = afT = afG = afC = 0.0
132
133 return pd.Series(
134 [
135 chr_id, position,
136 data["A"], data["T"], data["G"], data["C"],
137 afA, afT, afG, afC,
138 gt, gt_base,
139 ad, ad_sum,
140 dp, dp_diff,
141 gq, pls
142 ],
143 index=[
144 "chr", "pos",
145 "A", "T", "G", "C",
146 "A_frac", "T_frac", "G_frac", "C_frac",
147 "GT", "GT_base",
148 "AD", "AD_sum",
149 "DP", "DP_diff",
150 "GQ", "PL"
151 ]
152 )
153
154 def read_vcf(self, input_vcf: str) -> Tuple[pd.DataFrame, List[str], List[str]]:
155 """
156 返回: (vcf_df, vcf_header(##), header_line(#CHROM...))
157 """
158 is_gzipped = input_vcf.endswith(".gz")
159 open_func = gzip.open if is_gzipped else open
160 mode = "rt" if is_gzipped else "r"
161
162 vcf_header: List[str] = []
163 header_line: List[str] = []
164
165 # 逐行读header,避免 readlines() 占内存
166 with open_func(input_vcf, mode) as f:
167 for line in f:
168 if line.startswith("##"):
169 vcf_header.append(line.rstrip("\n"))
170 continue
171 if line.startswith("#CHROM"):
172 header_line = line.strip().split()
173 break
174
175 # 主体用 pandas 读(comment='#' 会跳过所有header行)
176 vcf_df = pd.read_csv(
177 input_vcf,
178 comment="#",
179 sep="\t",
180 header=None,
181 compression="infer",
182 dtype={0: str, 1: int, 3: str, 4: str, 7: str, 8: str, 9: str},
183 low_memory=False,
184 )
185 return vcf_df, vcf_header, header_line
186
187 def parse_vcf_df(self, vcf_df: pd.DataFrame) -> pd.DataFrame:
188 """
189 只解析,不过滤。比 iterrows() 更快。
190 """
191 out = []
192 # itertuples 更快;name=None 返回普通tuple
193 for row in vcf_df.itertuples(index=False, name=None):
194 out.append(self.parse_vcf_row(row))
195 return pd.DataFrame(out)
196
197 def main(self) -> pd.DataFrame:
198 # logger.info(f"starting: {self.input_vcf}")
199 vcf_df, vcf_header, header_line = self.read_vcf(self.input_vcf)
200 parsed = self.parse_vcf_df(vcf_df)
201 # logger.info(f"done: {self.input_vcf}")
202 return parsed