Naposledy aktivní 1 month ago

Revize 1ff3ae76fb2d1e202f7011adbdd8f7fb7e5500bf

parser_vcf.py Raw
1import os
2import re
3import gzip
4from typing import Tuple, List
5import pandas as pd
6
7class QC:
8 _DP_RE = re.compile(r"(?:^|;)\s*DP=(\d+(?:\.\d+)?)")
9
10 def __init__(
11 self,
12 uid: str,
13 library_num: int,
14 output_qc: str,
15 input_vcf: str,
16 basic_reads: str,
17 bamstats: str,
18 depth: int
19 ):
20 self.uid = uid
21 self.library_num = library_num
22 self.output_qc = output_qc
23 self.input_vcf = input_vcf
24 self.basic_reads = basic_reads
25 self.bamstats = bamstats
26 self.depth = depth
27
28 def _gt_to_bases(self, gt: str, ref: str, alt: str) -> str | None:
29 if gt is None:
30 return None
31 gt = str(gt)
32 if gt in {".", "./.", ".|."}:
33 return None
34
35 alts = [] if alt in {None, ".", ""} else str(alt).split(",")
36 alleles = [ref] + alts # 0->REF, 1..->ALT
37
38 if "/" in gt:
39 parts = gt.split("/")
40 elif "|" in gt:
41 parts = gt.split("|")
42 else:
43 parts = [gt] # haploid like "1"
44
45 bases = []
46 for p in parts:
47 if p in {".", ""}:
48 bases.append(".")
49 continue
50 try:
51 idx = int(p)
52 except ValueError:
53 return None
54 if 0 <= idx < len(alleles):
55 bases.append(alleles[idx])
56 else:
57 return None
58
59 return "/".join(bases)
60
61 @classmethod
62 def dp_difference(cls, info: str, dp: int) -> float:
63 """
64 INFO列里的DP 与 sample列里的DP 差值(INFO - sample)
65 """
66 if info is None:
67 return float(0 - dp)
68 m = cls._DP_RE.search(str(info))
69 if m:
70 return float(m.group(1)) - dp
71 return float(0 - dp)
72
73 def parse_vcf_row(self, row) -> pd.Series:
74 """
75 row: tuple/list-like, 对应VCF的 0..9 列:
76 0 CHROM,1 POS,2 ID,3 REF,4 ALT,5 QUAL,6 FILTER,7 INFO,8 FORMAT,9 SAMPLE
77 """
78 chr_id = row[0]
79 position = row[1]
80 ref_base = row[3]
81 alt_base = row[4]
82
83 fmt_keys = str(row[8]).split(":")
84 sample_vals = str(row[9]).split(":")
85 fmt = dict(zip(fmt_keys, sample_vals))
86
87 data = {"A": 0, "T": 0, "G": 0, "C": 0}
88
89 gt = fmt.get("GT", None)
90 gt_base = self._gt_to_bases(gt, ref_base, alt_base)
91
92 # AD
93 ad = fmt.get("AD", None)
94 ad_sum = 0
95 if ad not in {None, ".", ""}:
96 try:
97 ad_int = [int(i) for i in str(ad).split(",")]
98 ad_sum = sum(ad_int)
99
100 bases = [ref_base] if alt_base in {None, ".", ""} else [ref_base] + str(alt_base).split(",")
101 for b, n in zip(bases, ad_int):
102 if b in data: # only A/T/G/C
103 data[b] = n
104 except ValueError:
105 ad = None
106 ad_sum = 0
107 else:
108 ad = None
109
110 # DP
111 dp = 0
112 v = fmt.get("DP", None)
113 if v not in {None, ".", ""}:
114 try:
115 dp = int(v)
116 except ValueError:
117 dp = 0
118
119 dp_diff = self.dp_difference(row[7], dp)
120
121 # GQ / RGQ
122 gq = None
123 for k in ("GQ", "RGQ"):
124 v = fmt.get(k, None)
125 if v not in {None, ".", ""}:
126 try:
127 gq = float(v)
128 except ValueError:
129 gq = None
130 break
131
132 # PL
133 pls = fmt.get("PL", None)
134 if pls in {".", ""}:
135 pls = None
136
137 if ad_sum > 0:
138 afA = data["A"] / ad_sum
139 afT = data["T"] / ad_sum
140 afG = data["G"] / ad_sum
141 afC = data["C"] / ad_sum
142 else:
143 afA = afT = afG = afC = 0.0
144
145 return pd.Series(
146 [
147 chr_id, position,
148 data["A"], data["T"], data["G"], data["C"],
149 afA, afT, afG, afC,
150 gt, gt_base,
151 ad, ad_sum,
152 dp, dp_diff,
153 gq, pls
154 ],
155 index=[
156 "chr", "pos",
157 "A", "T", "G", "C",
158 "A_frac", "T_frac", "G_frac", "C_frac",
159 "GT", "GT_base",
160 "AD", "AD_sum",
161 "DP", "DP_diff",
162 "GQ", "PL"
163 ]
164 )
165
166 def read_vcf(self, input_vcf: str) -> Tuple[pd.DataFrame, List[str], List[str]]:
167 """
168 返回: (vcf_df, vcf_header(##), header_line(#CHROM...))
169 """
170 is_gzipped = input_vcf.endswith(".gz")
171 open_func = gzip.open if is_gzipped else open
172 mode = "rt" if is_gzipped else "r"
173
174 vcf_header: List[str] = []
175 header_line: List[str] = []
176
177 # 逐行读header,避免 readlines() 占内存
178 with open_func(input_vcf, mode) as f:
179 for line in f:
180 if line.startswith("##"):
181 vcf_header.append(line.rstrip("\n"))
182 continue
183 if line.startswith("#CHROM"):
184 header_line = line.strip().split()
185 break
186
187 # 主体用 pandas 读(comment='#' 会跳过所有header行)
188 vcf_df = pd.read_csv(
189 input_vcf,
190 comment="#",
191 sep="\t",
192 header=None,
193 compression="infer",
194 dtype={0: str, 1: int, 3: str, 4: str, 7: str, 8: str, 9: str},
195 low_memory=False,
196 )
197 return vcf_df, vcf_header, header_line
198
199 def parse_vcf_df(self, vcf_df: pd.DataFrame) -> pd.DataFrame:
200 """
201 只解析,不过滤。比 iterrows() 更快。
202 """
203 out = []
204 # itertuples 更快;name=None 返回普通tuple
205 for row in vcf_df.itertuples(index=False, name=None):
206 out.append(self.parse_vcf_row(row))
207 return pd.DataFrame(out)
208
209 def main(self) -> pd.DataFrame:
210 # logger.info(f"starting: {self.input_vcf}")
211 vcf_df, vcf_header, header_line = self.read_vcf(self.input_vcf)
212 parsed = self.parse_vcf_df(vcf_df)
213 # logger.info(f"done: {self.input_vcf}")
214 return parsed