-
Notifications
You must be signed in to change notification settings - Fork 2.8k
/
markdown.py
341 lines (284 loc) · 11.4 KB
/
markdown.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
from collections import namedtuple
from functools import partial, wraps
from markdown_it import MarkdownIt
from mdit_plain.renderer import RendererPlain
from mdit_py_plugins.front_matter import front_matter_plugin
from yaml import safe_load
from nltk.corpus.reader.api import CategorizedCorpusReader
from nltk.corpus.reader.plaintext import PlaintextCorpusReader
from nltk.corpus.reader.util import concat, read_blankline_block
from nltk.tokenize import blankline_tokenize, sent_tokenize, word_tokenize
def comma_separated_string_args(func):
    """
    Decorate ``func`` so that string arguments holding comma-separated
    values are expanded into sets of the individual values.

    Conversion rules applied by the wrapper before calling ``func``:

    * positional ``str``  -> set of the comma-separated, stripped parts
    * positional ``list`` -> ``set`` of its elements
    * keyword ``str``     -> set of the comma-separated, stripped parts
    * anything else (including keyword lists) is passed through untouched
    """

    def _split(text):
        # "a, b ,c" -> {"a", "b", "c"}; surrounding whitespace is dropped.
        return {piece.strip() for piece in text.split(",")}

    def _convert_positional(value):
        if isinstance(value, str):
            return _split(value)
        if isinstance(value, list):
            return set(value)
        return value

    @wraps(func)
    def wrapper(*args, **kwargs):
        positional = [_convert_positional(value) for value in args]
        keyword = {
            name: _split(value) if isinstance(value, str) else value
            for name, value in kwargs.items()
        }
        return func(*positional, **keyword)

    return wrapper
def read_parse_blankline_block(stream, parser):
    """
    Read one block from ``stream`` with ``read_blankline_block`` and
    render its first element through ``parser``.

    :param stream: the stream handed on to ``read_blankline_block``.
    :param parser: an object exposing ``render(text)``.
    :return: a one-element list holding the rendered text, or the
        (empty) result of ``read_blankline_block`` when nothing was read.
    """
    raw_block = read_blankline_block(stream)
    if not raw_block:
        # Nothing was read; hand the empty result straight back.
        return raw_block
    rendered = parser.render(raw_block[0])
    return [rendered]
class MarkdownBlock:
    """
    A piece of text content with tokenized views of it.

    ``str()`` and ``repr()`` show only the first ``truncate_at``
    characters of the content, followed by ``...`` when it is longer.
    """

    def __init__(self, content):
        # Number of leading characters shown by __str__ / __repr__.
        self.truncate_at = 16
        self.content = content

    def __repr__(self):
        return f"{type(self).__name__}(content={str(self)!r})"

    def __str__(self):
        limit = self.truncate_at
        preview = f"{self.content[:limit]}"
        if len(self.content) > limit:
            return f"{preview}..."
        return preview

    @property
    def raw(self):
        """The untouched content."""
        return self.content

    @property
    def words(self):
        """The content as a flat list of word tokens."""
        return word_tokenize(self.content)

    @property
    def sents(self):
        """The content as a list of sentences, each a list of word tokens."""
        return list(map(word_tokenize, sent_tokenize(self.content)))

    @property
    def paras(self):
        """
        The content as a list of blank-line separated paragraphs, each a
        list of sentences, each a list of word tokens.
        """
        paragraphs = []
        for paragraph in blankline_tokenize(self.content):
            paragraphs.append(list(map(word_tokenize, sent_tokenize(paragraph))))
        return paragraphs
class CodeBlock(MarkdownBlock):
    """
    A block of code together with the language it was labelled with.

    Unlike the base class, "sentences" here are the physical lines of
    the code rather than the output of a sentence tokenizer.
    """

    def __init__(self, language, *args):
        self.language = language
        super().__init__(*args)

    @property
    def lines(self):
        """The content split into its individual lines."""
        return self.content.splitlines()

    @property
    def sents(self):
        """One list of word tokens per line of the content."""
        return list(map(word_tokenize, self.content.splitlines()))

    @property
    def paras(self):
        """
        The content split on blank lines; each chunk is a list of lines,
        each line a list of word tokens.
        """
        chunks = []
        for chunk in blankline_tokenize(self.content):
            chunks.append(list(map(word_tokenize, chunk.splitlines())))
        return chunks
class MarkdownSection(MarkdownBlock):
    """A block of content that carries a heading and a heading level."""

    def __init__(self, heading, level, *args):
        # Remaining positional arguments go to MarkdownBlock (the content).
        super().__init__(*args)
        self.level = level
        self.heading = heading
# Lightweight records returned by the image, link and list readers.
Image = namedtuple("Image", ("label", "src", "title"))
Link = namedtuple("Link", ("label", "href", "title"))
List = namedtuple("List", ("is_ordered", "items"))
class MarkdownCorpusReader(PlaintextCorpusReader):
    """
    A plaintext corpus reader whose paragraph blocks are passed through
    a markdown parser before tokenization.
    """

    def __init__(self, *args, parser=None, **kwargs):
        """
        :param parser: the markdown parser to use. When omitted, a
            CommonMark ``MarkdownIt`` parser with the plain-text renderer
            and the front-matter plugin is created.

        All other arguments are forwarded to ``PlaintextCorpusReader``;
        ``para_block_reader`` defaults to ``read_parse_blankline_block``
        bound to the parser.
        """
        if parser is None:
            parser = MarkdownIt("commonmark", renderer_cls=RendererPlain)
            parser.use(front_matter_plugin)
        self.parser = parser

        if "para_block_reader" not in kwargs:
            kwargs["para_block_reader"] = partial(
                read_parse_blankline_block, parser=self.parser
            )
        super().__init__(*args, **kwargs)

    # Words are tokenized from the output of the paragraph block reader,
    # so that the markup has already been removed.
    def _read_word_block(self, stream):
        return [
            word
            for paragraph in self._para_block_reader(stream)
            for word in self._word_tokenizer.tokenize(paragraph)
        ]
class CategorizedMarkdownCorpusReader(CategorizedCorpusReader, MarkdownCorpusReader):
    """
    A reader for markdown corpora whose documents are divided into
    categories based on their file identifiers.

    Based on nltk.corpus.reader.plaintext.CategorizedPlaintextCorpusReader:
    https://www.nltk.org/_modules/nltk/corpus/reader/api.html#CategorizedCorpusReader

    Besides the usual ``raw``/``words``/``sents``/``paras`` views, the
    reader exposes structural views of each document: ``metadata``
    (front matter), ``blockquotes``, ``code_blocks``, ``images``,
    ``links``, ``lists`` and ``sections``. Every public accessor accepts
    its ``fileids``/``categories`` either as a collection or as a single
    comma-separated string.
    """

    def __init__(self, *args, cat_field="tags", **kwargs):
        """
        Initialize the corpus reader. Categorization arguments
        (``cat_pattern``, ``cat_map``, and ``cat_file``) are passed to
        the ``CategorizedCorpusReader`` constructor. The remaining arguments
        are passed to the ``MarkdownCorpusReader`` constructor.

        :param cat_field: name of the front-matter field whose value is
            used as a document's categories when none of the
            categorization arguments is given.
        """
        cat_args = ["cat_pattern", "cat_map", "cat_file"]
        if not any(arg in kwargs for arg in cat_args):
            # Initialize with a blank map now,
            # and try to build categories from document metadata later.
            kwargs["cat_map"] = dict()
        CategorizedCorpusReader.__init__(self, kwargs)
        MarkdownCorpusReader.__init__(self, *args, **kwargs)

        # Map file IDs to categories if self._map exists but is still empty:
        if self._map is not None and not self._map:
            for file_id in self._fileids:
                # Wrap the ID in a list: a bare string would be split on
                # commas by the decorator on ``metadata``.
                metadata = self.metadata([file_id])
                if metadata:
                    self._map[file_id] = self._categories_from(
                        metadata[0], cat_field
                    )

    @staticmethod
    def _categories_from(front_matter, cat_field):
        """Return the list of categories stored under ``cat_field``."""
        # Empty front matter loads as None and arbitrary YAML may load as
        # a scalar or list; only a mapping can carry the category field.
        if not isinstance(front_matter, dict):
            return []
        categories = front_matter.get(cat_field)
        if categories is None:
            return []
        if isinstance(categories, str):
            # A scalar such as ``tags: news`` is one category, not a
            # sequence of single-character categories.
            return [categories]
        return categories

    ### Begin CategorizedCorpusReader Overrides
    @comma_separated_string_args
    def categories(self, fileids=None):
        """Return the categories of ``fileids`` via the parent implementation."""
        return super().categories(fileids)

    @comma_separated_string_args
    def fileids(self, categories=None):
        """
        Return every file ID of the corpus when ``categories`` is None,
        otherwise the parent implementation's result for ``categories``.
        """
        if categories is None:
            return self._fileids
        return super().fileids(categories)

    ### End CategorizedCorpusReader Overrides

    ### Begin MarkdownCorpusReader Overrides
    @comma_separated_string_args
    def raw(self, fileids=None, categories=None):
        """Return the parent ``raw`` view of the resolved documents."""
        return super().raw(self._resolve(fileids, categories))

    @comma_separated_string_args
    def words(self, fileids=None, categories=None):
        """Return the parent ``words`` view of the resolved documents."""
        return super().words(self._resolve(fileids, categories))

    @comma_separated_string_args
    def sents(self, fileids=None, categories=None):
        """Return the parent ``sents`` view of the resolved documents."""
        return super().sents(self._resolve(fileids, categories))

    @comma_separated_string_args
    def paras(self, fileids=None, categories=None):
        """Return the parent ``paras`` view of the resolved documents."""
        return super().paras(self._resolve(fileids, categories))

    ### End MarkdownCorpusReader Overrides

    def concatenated_view(self, reader, fileids, categories):
        """
        Build one corpus view per resolved document using ``reader`` as
        the block reader, and concatenate the views.

        :param reader: a callable taking a stream and returning a list.
        """
        return concat(
            [
                self.CorpusView(path, reader, encoding=enc)
                for (path, enc) in self.abspaths(
                    self._resolve(fileids, categories), include_encoding=True
                )
            ]
        )

    @staticmethod
    def _top_level_blocks(tokens, opening_types, closing_types):
        """Slice ``tokens`` into one run per top-level open/close pair."""
        blocks = []
        start = None
        for index, token in enumerate(tokens):
            if token.level != 0:
                continue
            if token.type in opening_types:
                start = index
            elif token.type in closing_types and start is not None:
                blocks.append(tokens[start : index + 1])
                start = None
        return blocks

    @staticmethod
    def _heading_level(heading_open):
        """Return the numeric level of a ``heading_open`` token."""
        # The tag ("h1".."h6") is used rather than counting "#" in the
        # markup, because setext headings have "=" or "-" as markup.
        tag = heading_open.tag
        if len(tag) == 2 and tag[0] == "h" and tag[1].isdigit():
            return int(tag[1])
        return heading_open.markup.count("#")

    def metadata_reader(self, stream):
        """Return the YAML-loaded content of every front-matter token."""
        return [
            safe_load(t.content)
            for t in self.parser.parse(stream.read())
            if t.type == "front_matter"
        ]

    @comma_separated_string_args
    def metadata(self, fileids=None, categories=None):
        """Return the front matter of the resolved documents."""
        return self.concatenated_view(self.metadata_reader, fileids, categories)

    def blockquote_reader(self, stream):
        """Return one rendered ``MarkdownBlock`` per top-level blockquote."""
        tokens = self.parser.parse(stream.read())
        blockquotes = self._top_level_blocks(
            tokens, ("blockquote_open",), ("blockquote_close",)
        )
        return [
            MarkdownBlock(
                self.parser.renderer.render(block, self.parser.options, env=None)
            )
            for block in blockquotes
        ]

    @comma_separated_string_args
    def blockquotes(self, fileids=None, categories=None):
        """Return the top-level blockquotes of the resolved documents."""
        return self.concatenated_view(self.blockquote_reader, fileids, categories)

    def code_block_reader(self, stream):
        """
        Return one ``CodeBlock`` per top-level fenced or indented code
        block, built from the token's ``info`` and ``content``.
        """
        return [
            CodeBlock(
                t.info,
                t.content,
            )
            for t in self.parser.parse(stream.read())
            if t.level == 0 and t.type in ("fence", "code_block")
        ]

    @comma_separated_string_args
    def code_blocks(self, fileids=None, categories=None):
        """Return the top-level code blocks of the resolved documents."""
        return self.concatenated_view(self.code_block_reader, fileids, categories)

    def image_reader(self, stream):
        """Return one ``Image(label, src, title)`` per image token."""
        return [
            Image(
                child_token.content,
                child_token.attrGet("src"),
                child_token.attrGet("title"),
            )
            for inline_token in filter(
                lambda t: t.type == "inline", self.parser.parse(stream.read())
            )
            for child_token in inline_token.children
            if child_token.type == "image"
        ]

    @comma_separated_string_args
    def images(self, fileids=None, categories=None):
        """Return the images of the resolved documents."""
        return self.concatenated_view(self.image_reader, fileids, categories)

    def link_reader(self, stream):
        """
        Return one ``Link(label, href, title)`` per link.

        The label is the concatenated content of every token between the
        link's opening and closing tokens, so that labels containing
        inline formatting are captured in full.
        """
        links = []
        for inline_token in self.parser.parse(stream.read()):
            if inline_token.type != "inline":
                continue
            children = inline_token.children or []
            for i, child_token in enumerate(children):
                if child_token.type != "link_open":
                    continue
                label_parts = []
                for following in children[i + 1 :]:
                    if following.type == "link_close":
                        break
                    if following.type in ("softbreak", "hardbreak"):
                        # Line breaks carry no content; keep words apart.
                        label_parts.append(" ")
                    else:
                        label_parts.append(following.content)
                links.append(
                    Link(
                        "".join(label_parts),
                        child_token.attrGet("href"),
                        child_token.attrGet("title"),
                    )
                )
        return links

    @comma_separated_string_args
    def links(self, fileids=None, categories=None):
        """Return the links of the resolved documents."""
        return self.concatenated_view(self.link_reader, fileids, categories)

    def list_reader(self, stream):
        """
        Return one ``List(is_ordered, items)`` per top-level list, where
        ``items`` holds the non-empty content of the tokens in the list.
        """
        tokens = self.parser.parse(stream.read())
        list_blocks = self._top_level_blocks(
            tokens,
            ("bullet_list_open", "ordered_list_open"),
            ("bullet_list_close", "ordered_list_close"),
        )
        return [
            List(
                block[0].type == "ordered_list_open",
                [t.content for t in block if t.content],
            )
            for block in list_blocks
        ]

    @comma_separated_string_args
    def lists(self, fileids=None, categories=None):
        """Return the top-level lists of the resolved documents."""
        return self.concatenated_view(self.list_reader, fileids, categories)

    def section_reader(self, stream):
        """
        Return one ``MarkdownSection`` per top-level heading.

        A section runs from its heading up to the next top-level heading
        or the end of the document. Tokens before the first heading do
        not belong to any section.
        """
        section_blocks, block = list(), list()
        for t in self.parser.parse(stream.read()):
            if t.level == 0 and t.type == "heading_open":
                if block:
                    section_blocks.append(block)
                block = [t]
            elif block:
                block.append(t)
        if block:
            # The final section has no following heading to flush it.
            section_blocks.append(block)
        return [
            MarkdownSection(
                # block[0] is the heading_open token, block[1] its text.
                block[1].content,
                self._heading_level(block[0]),
                self.parser.renderer.render(block, self.parser.options, env=None),
            )
            for block in section_blocks
        ]

    @comma_separated_string_args
    def sections(self, fileids=None, categories=None):
        """Return the sections of the resolved documents."""
        return self.concatenated_view(self.section_reader, fileids, categories)