/
sync.py
100 lines (83 loc) · 3.4 KB
/
sync.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
"""Does the synchronization. Called by "manage-plnt.py sync"."""
from datetime import datetime
from html import escape
import feedparser
from .database import Blog
from .database import Entry
from .database import session
from .utils import nl2p
from .utils import strip_tags
# MIME types for which a feed field's value is treated as HTML markup by
# sync(); values of any other type are HTML-escaped before being stored.
HTML_MIMETYPES = {"text/html", "application/xhtml+xml"}
def _entry_title(feed_entry):
    """Return the title to store for *feed_entry*, or a falsy value if none.

    When the feed supplies ``title_detail``, a title whose type is one of
    ``HTML_MIMETYPES`` has its tags stripped and any other title is
    HTML-escaped. Without ``title_detail`` the plain ``title`` value
    (possibly ``None``) is returned unchanged.
    """
    if "title_detail" not in feed_entry:
        return feed_entry.get("title")
    detail = feed_entry.title_detail
    title = detail.get("value") or ""
    if detail.get("type") in HTML_MIMETYPES:
        return strip_tags(title)
    return escape(title)


def _entry_text(feed_entry):
    """Return the body text to store for *feed_entry*, or ``None`` if empty.

    The first ``content`` element is preferred; ``summary_detail`` is the
    fallback. Values whose type is in ``HTML_MIMETYPES`` are used verbatim
    (XHTML is handled like HTML for the moment); anything else is passed
    through ``nl2p`` and HTML-escaped.
    """
    # Fall back to the summary when "content" is missing *or* an empty list;
    # indexing [0] on an empty list would raise IndexError and abort the
    # whole synchronization run.
    contents = feed_entry.get("content")
    detail = contents[0] if contents else feed_entry.get("summary_detail")
    if not detail:
        return None

    value = detail.get("value") or ""
    if detail.get("type") not in HTML_MIMETYPES:
        # NOTE(review): escape() is applied to nl2p()'s output, so any markup
        # nl2p adds is escaped as well -- confirm against nl2p whether the
        # intended order is nl2p(escape(value)).
        value = escape(nl2p(value))

    # Whitespace-only text counts as "no text".
    return value if value.strip() else None


def _entry_dates(feed_entry):
    """Return ``(pub_date, updated)`` as datetimes, or ``None`` if undated.

    Feeds differ in which date fields they provide, so the publication date
    is taken from the first available of ``published_parsed``,
    ``created_parsed`` and ``date_parsed``. A missing update date defaults
    to the publication date and vice versa.
    """
    published = (
        feed_entry.get("published_parsed")
        or feed_entry.get("created_parsed")
        or feed_entry.get("date_parsed")
    )
    updated = feed_entry.get("updated_parsed") or published
    published = published or updated
    if not published:
        return None
    # The parsed values are time tuples; the first six fields are
    # year, month, day, hour, minute, second.
    return datetime(*published[:6]), datetime(*updated[:6])


def sync():
    """Synchronize the stored entries with the feeds of all blogs.

    Every feed entry that has a guid, a title, a non-empty text and a date
    is stored as a new ``Entry`` or, if an entry with the same guid already
    exists and the feed reports a newer update date, written over it.
    Entries that are already up to date are not touched. All changes are
    committed together after every blog has been processed.
    """
    for blog in Blog.query.all():
        # Parse the feed. feedparser.parse will never raise an exception,
        # but the bozo bit might be set; it is not checked here.
        feed = feedparser.parse(blog.feed_url)

        for feed_entry in feed.entries:
            # The guid is the id if specified, otherwise the link. Without
            # either there is nothing to identify the entry by, so skip it.
            guid = feed_entry.get("id") or feed_entry.get("link")
            if not guid:
                continue

            # Skip entries without a title, without text or without a date.
            title = _entry_title(feed_entry)
            if not title:
                continue
            text = _entry_text(feed_entry)
            if not text:
                continue
            dates = _entry_dates(feed_entry)
            if dates is None:
                continue
            pub_date, updated = dates

            # Only hit the database once the entry is known to be usable,
            # then check whether the stored copy is already current.
            old_entry = Entry.query.filter_by(guid=guid).first()
            if old_entry and updated <= old_entry.last_update:
                continue

            # Create a new entry object based on the data collected or
            # update the old one.
            db_entry = old_entry or Entry()
            db_entry.blog = blog
            db_entry.guid = guid
            db_entry.title = title
            # If the link is missing we use the blog link.
            db_entry.url = feed_entry.get("link") or blog.blog_url
            db_entry.text = text
            db_entry.pub_date = pub_date
            db_entry.last_update = updated
            session.add(db_entry)

    session.commit()