Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix and improve timezone cache concurrency #1105

Merged
merged 5 commits into from Oct 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions yfinance/base.py
Expand Up @@ -379,7 +379,7 @@ def _get_ticker_tz(self):
if not self._tz is None:
return self._tz

tz = utils.cache_lookup_tkr_tz(self.ticker)
tz = utils.tz_cache.lookup(self.ticker)

if tz is not None:
invalid_value = not isinstance(tz, str)
Expand All @@ -391,7 +391,7 @@ def _get_ticker_tz(self):

if invalid_value:
# Clear from cache and force re-fetch
utils.cache_store_tkr_tz(self.ticker, None)
utils.tz_cache.store(self.ticker, None)
tz = None

if tz is None:
Expand All @@ -407,7 +407,7 @@ def _get_ticker_tz(self):
tz = None
if tz is not None:
# info fetch is relatively slow so cache timezone
utils.cache_store_tkr_tz(self.ticker, tz)
utils.tz_cache.store(self.ticker, tz)

self._tz = tz
return tz
Expand Down
119 changes: 84 additions & 35 deletions yfinance/utils.py
Expand Up @@ -22,6 +22,8 @@
from __future__ import print_function

import datetime as _datetime
from typing import Dict, Union

import pytz as _tz
import requests as _requests
import re as _re
Expand All @@ -30,9 +32,10 @@
import sys as _sys
import os as _os
import appdirs as _ad
import sqlite3 as _sqlite3
import atexit as _atexit

from threading import Lock
cache_mutex = Lock()

try:
import ujson as _json
Expand Down Expand Up @@ -88,7 +91,9 @@ def get_news_by_isin(isin, proxy=None, session=None):
return data.get('news', {})


def empty_df(index=[]):
def empty_df(index=None):
if index is None:
index = []
empty = _pd.DataFrame(index=index, data={
'Open': _np.nan, 'High': _np.nan, 'Low': _np.nan,
'Close': _np.nan, 'Adj Close': _np.nan, 'Volume': _np.nan})
Expand Down Expand Up @@ -488,44 +493,88 @@ def __str__(self):
return str(self.prog_bar)


# Simple file cache of ticker->timezone:
def get_cache_dirpath():
return _os.path.join(_ad.user_cache_dir(), "py-yfinance")
class _KVStore:
"""Simpel Sqlite backed key/value store, key and value are strings. Should be thread safe."""

def cache_lookup_tkr_tz(tkr):
fp = _os.path.join(get_cache_dirpath(), "tkr-tz.csv")
if not _os.path.isfile(fp):
return None
df = _pd.read_csv(fp, index_col="Ticker")
if not tkr in df.index:
return None
return df.loc[tkr,"Tz"]
def __init__(self, filename):
    """Open (or create) the SQLite database at `filename` and ensure the "kv" table exists."""
    # Single mutex serialising every use of the shared connection; needed
    # because check_same_thread=False lets multiple threads share it.
    self._cache_mutex = Lock()
    with self._cache_mutex:
        self.conn = _sqlite3.connect(filename, timeout=10, check_same_thread=False)
        # WAL journal mode allows concurrent readers alongside a writer.
        self.conn.execute('pragma journal_mode=wal')
        self.conn.execute('create table if not exists "kv" (key TEXT primary key, value TEXT) without rowid')
        self.conn.commit()
    # Release the connection when the interpreter exits.
    _atexit.register(self.close)

def cache_store_tkr_tz(tkr, tz):
dp = get_cache_dirpath()
fp = _os.path.join(dp, "tkr-tz.csv")
def close(self):
if self.conn is not None:
with self._cache_mutex:
self.conn.close()
self.conn = None

cache_mutex.acquire()
def get(self, key: str) -> Union[str, None]:
"""Get value for key if it exists else returns None"""
item = self.conn.execute('select value from "kv" where key=?', (key,))
if item:
return next(item, (None,))[0]

if not _os.path.isdir(dp):
_os.makedirs(dp)
if (not _os.path.isfile(fp)) and (tz is not None):
# Initialise CSV file with first entry
df = _pd.DataFrame({"Tz":[tz]}, index=[tkr])
df.index.name = "Ticker"
df.to_csv(fp)
def set(self, key: str, value: str) -> str:
with self._cache_mutex:
self.conn.execute('replace into "kv" (key, value) values (?,?)', (key, value))
self.conn.commit()

else:
df = _pd.read_csv(fp, index_col="Ticker")
def bulk_set(self, kvdata: Dict[str, str]):
records = tuple(i for i in kvdata.items())
with self._cache_mutex:
self.conn.executemany('replace into "kv" (key, value) values (?,?)', records)
self.conn.commit()

def delete(self, key: str):
with self._cache_mutex:
self.conn.execute('delete from "kv" where key=?', (key,))
self.conn.commit()


class _TzCache:
"""Simple sqllite file cache of ticker->timezone"""

def __init__(self):
    # Backing _KVStore; created lazily on first access via the `tz_db` property
    # so that importing the module never touches the filesystem.
    self._tz_db = None

def lookup(self, tkr):
    """Return the cached timezone string for ticker `tkr`, or None if not cached."""
    return self.tz_db.get(tkr)

def store(self, tkr, tz):
if tz is None:
# Delete if in cache:
if tkr in df.index:
df = df.drop(tkr)
df.to_csv(fp)
self.tz_db.delete(tkr)
elif self.tz_db.get(tkr) is not None:
raise Exception("Tkr {} tz already in cache".format(tkr))
else:
if tkr in df.index:
raise Exception("Tkr {} tz already in cache".format(tkr))
df.loc[tkr,"Tz"] = tz
df.to_csv(fp)
self.tz_db.set(tkr, tz)

@property
def cache_dirpath(self):
    """Directory holding the on-disk cache (per-user OS cache dir via appdirs)."""
    return _os.path.join(_ad.user_cache_dir(), "py-yfinance")

@property
def tz_db(self):
    """Lazily-created _KVStore backing this cache.

    First access creates the cache directory if needed, opens the SQLite
    store, and migrates any legacy CSV cache into it.
    """
    # NOTE(review): this check-then-create is not guarded by a lock, so two
    # threads racing on first access could each build a _KVStore on the same
    # file - confirm whether first access is always single-threaded.
    if self._tz_db is None:
        if not _os.path.isdir(self.cache_dirpath):
            _os.makedirs(self.cache_dirpath)
        self._tz_db = _KVStore(_os.path.join(self.cache_dirpath, "tkr-tz.db"))
        self._migrate_cache_tkr_tz()

    return self._tz_db

def _migrate_cache_tkr_tz(self):
    """Migrate contents from the old ticker CSV-cache into the SQLite db.

    Removes the leftover debug print of the CSV contents, and treats an
    unreadable legacy file as disposable instead of letting the parse
    error break every cache access.
    """
    fp = _os.path.join(self.cache_dirpath, "tkr-tz.csv")
    if not _os.path.isfile(fp):
        return
    try:
        df = _pd.read_csv(fp, index_col="Ticker")
    except Exception:
        # Corrupt/unreadable legacy cache - discard it rather than crash.
        _os.remove(fp)
        return
    # CSV columns were Ticker (index) and Tz; bulk-load them as key->value.
    self.tz_db.bulk_set(df.to_dict()['Tz'])
    _os.remove(fp)


cache_mutex.release()
# Module-level singleton: shared ticker->timezone cache used by Ticker objects.
tz_cache = _TzCache()