-
Notifications
You must be signed in to change notification settings - Fork 0
/
bamtail.py
219 lines (175 loc) · 7.2 KB
/
bamtail.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import argparse
import gzip
import io
import os
import re
import struct
import sys
"""Perform a ``tail``-like operation on BAM files.
This tool is used to return the position of an alignment near the end of BAM
file(s). This is useful for monitoring progress during generation of large BAM
files where the resulting file is sorted by position.
The BAM specification is referenced heavily throughout. It can be found at:
https://samtools.github.io/hts-specs/SAMv1.pdf
"""
__version__ = '1.0'
def parse_arguments():
"""Parse the command line arguments.
Returns:
Parsed arguments.
"""
parser = argparse.ArgumentParser(description='tail for BAMs')
parser.add_argument(
'filenames',
help='BAMs on which to perform the tail operation',
nargs='*',
metavar='FILE'
)
parser.add_argument(
'--version',
'-v',
help='print the version',
action='store_true'
)
return parser.parse_args()
def decompress_block(byte_stream):
"""Interpret a byte string as a BGZF block and return its decompressed
contents.
BGZF format is gunzip compatible, so the gzip module can perform the
decompression.
Args:
byte_stream (bytes): Complete BGZF block to be decompressed.
Returns:
Bytes representing the decompressed block.
"""
byte_stream = io.BytesIO(byte_stream)
return gzip.GzipFile(fileobj=byte_stream).read()
def get_final_bam_block(filename, size, tailsize=124000):
"""Return the decompressed data from the final BGZF block in the BAM file.
Data is read from the end of the file, and start positions of BGZF blocks
are identified by a sequence of 8-bit integers as outlined in the BAM spec.
The final complete BGZF block is identified as the data between the
penultimate and final occurrences of this sequence. In the case that there
are no incomplete blocks at the end of the file (such as when the file is
complete), the data returned is from the penultimate rather than final
BGZF block.
Args:
filename: Path of BAM.
size (int): Total filesize of BAM.
tailsize (int, optional): Number of bytes to look at from the end of
the BAM file, defaulting to 124000. This should be large enough
to read an entire unbroken BGZF block in addition to any incomplete
block at the end of the file.
Returns:
Bytes representing the ungzipped final complete block of data in the
BAM file.
Raises:
RuntimeError: Fewer than two BGZF block beginnings were discovered,
indicating that not enough data was read.
"""
with open(filename, 'rb') as bamfile:
bamfile.seek(size-tailsize)
tail = bamfile.read(tailsize)
search_bytes = struct.pack('<4B', 31, 139, 8, 4) # from BAM spec
block_starts = [m.start() for m in re.finditer(search_bytes, tail)]
if len(block_starts) < 2:
raise RuntimeError('not enough data')
start, end = block_starts[-2:]
return decompress_block(tail[start:end])
def final_alignment_position(bam_data):
"""Find the reference index and position of the final alignment in the BGZF
block.
Assume that alignment blocks do not span BGZF blocks (this is a strong
assumption (!), but it seems to be true for every BAM file I've
encountered). Iterate through all the alignment blocks until all data is
read and return the values for the final alignment.
Args:
bam_data (bytes): Decompressed contents of a BGZF block.
Returns:
A tuple of ints, where the first int is the index of the reference
sequence (from the header) or -1 for an unmapped alignment, and the
second int is the position.
"""
while bam_data:
block_size, ref_id, pos = struct.unpack_from('<3i', bam_data)
bam_data = bam_data[block_size+4:] # block_size excludes itself
return ref_id, pos
def get_bam_ref_sequences(filename, headsize=64000):
"""Read the list of reference sequence names from the beginning of the BAM
file.
The first BGZF block in the BAM file is read and decompressed, and all the
reference sequence names are read. Also ensures that the file is actually
a BAM file by looking for the BAM magic string outlined in the spec. In
cases with very many reference sequences such that they do not all fit
within one BGZF block, this will fail! Consult the BAM spec for more detail
regarding the format.
Args:
filename: Path of BAM to get the reference sequences from.
headsize (int, optional): number of bytes to read so that the complete
first BGZF block is read. Defaults to 64000.
Returns:
List of reference sequence names.
Raises:
RuntimeError: The file appears to not be a BAM file.
"""
with open(filename, 'rb') as bamfile:
head = bamfile.read(headsize)
# read a bunch of fields, where the final field is the size of the block
# minus 1, then decompress the block
fields = struct.unpack_from('<4BI2BH2B2H', head)
block_size = fields[-1] + 1
bam_data = decompress_block(head[:block_size])
offset = 0 # keep track how much data we've read
magic, l_text = struct.unpack_from('<4si', bam_data)
if magic != b'BAM\x01': # magic string
raise RuntimeError('{} appears to not be a BAM file'.format(filename))
offset += 8 + l_text
n_ref, = struct.unpack_from('<i', bam_data, offset)
offset += 4
ref_sequences = list()
for _ in range(n_ref):
l_name, = struct.unpack_from('<i', bam_data, offset)
offset += 4
name, = struct.unpack_from('<{:d}s'.format(l_name), bam_data, offset)
ref_sequences.append(name[:-1]) # sequence names are NUL terminated
offset += l_name + 4 # skipping l_ref
return ref_sequences
def process_file(filename):
"""Find the chromosome and position of the final alignment in the final
BGZF block discovered in the BAM file.
Args:
filename: Path of BAM on which to perform tail.
Returns:
String of the position of the final alignment of the BAM, e.g.
'chr1:100', or 'unmapped' if the last alignment is not mapped to a
reference sequence.
"""
ref_sequences = get_bam_ref_sequences(filename)
size = os.path.getsize(filename)
bam_data = get_final_bam_block(filename, size)
chr, pos = final_alignment_position(bam_data)
if chr == -1:
return 'unmapped'
else:
chr = ref_sequences[chr].decode('utf-8')
pos += 1 # BAM positions are 0-indexed
return '{}:{}'.format(chr, pos)
def main():
"""Perform the tail operation on all given BAMs. If more than one BAM is
given, prefix the output with the filename.
Args:
filenames: A sequence of paths to BAMs on which to perform tail.
"""
args = parse_arguments()
if args.version:
print('bamtail version {}'.format(__version__))
if not args.filenames:
sys.exit(0)
for filename in args.filenames:
if len(args.filenames) > 1:
print('{}: '.format(filename), end='')
print(process_file(filename))
if __name__ == '__main__':
main()