Skip to content

Commit

Permalink
Backport Yahoo decryption
Browse files Browse the repository at this point in the history
  • Loading branch information
ValueRaider committed Dec 18, 2022
1 parent 3537ec3 commit b47adf0
Showing 1 changed file with 98 additions and 7 deletions.
105 changes: 98 additions & 7 deletions yfinance/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,17 @@
import os as _os
import appdirs as _ad

from base64 import b64decode
import hashlib
usePycryptodome = False # slightly faster
# usePycryptodome = True
if usePycryptodome:
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
else:
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

from threading import Lock
mutex = Lock()

Expand Down Expand Up @@ -109,19 +120,99 @@ def get_html(url, proxy=None, session=None):
return html



def decrypt_cryptojs_stores(data):
    """
    Yahoo has started encrypting data stores, this method decrypts it.

    The password is derived with PBKDF2-HMAC-SHA1 (1 iteration, 32 bytes,
    hex-encoded) from data["_cs"], salted with the bytes rebuilt from the
    "words" list in data["_cr"]. The base64 payload must be in the OpenSSL
    "Salted__" + 8-byte-salt layout; key and IV come from _EVPKDF (MD5,
    1 iteration) and the body is decrypted with AES-CBC and PKCS7-unpadded.

    :param data: Python dict of the json data; must contain "_cs", "_cr" and
        data['context']['dispatcher']['stores']
    :return: The decrypted string data in data['context']['dispatcher']['stores']
    :raises ValueError: if the decoded payload does not start with b"Salted__"
    """

    _cs = data["_cs"]
    # Assumes _cr has format like:
    # '{"words":[-449732894,601032952,157396918,2056341829],"sigBytes":16}'
    _cr = _json.loads(data["_cr"])
    # Each word is a signed 32-bit integer; serialise them big-endian.
    _cr = b"".join(
        int.to_bytes(word, length=4, byteorder="big", signed=True)
        for word in _cr["words"]
    )

    password = hashlib.pbkdf2_hmac("sha1", _cs.encode("utf8"), _cr, 1, dklen=32).hex()

    encrypted_stores = b64decode(data['context']['dispatcher']['stores'])
    # Explicit check instead of `assert`: asserts are stripped under `python -O`,
    # which would let a malformed payload be "decrypted" into garbage.
    if encrypted_stores[:8] != b"Salted__":
        raise ValueError("Encrypted stores do not start with the expected 'Salted__' header")
    salt = encrypted_stores[8:16]
    encrypted_stores = encrypted_stores[16:]

    key, iv = _EVPKDF(password, salt, keySize=32, ivSize=16, iterations=1, hashAlgorithm="md5")

    if usePycryptodome:
        cipher = AES.new(key, AES.MODE_CBC, iv=iv)
        plaintext = cipher.decrypt(encrypted_stores)
        plaintext = unpad(plaintext, 16, style="pkcs7")
    else:
        cipher = Cipher(algorithms.AES(key), modes.CBC(iv))
        decryptor = cipher.decryptor()
        plaintext = decryptor.update(encrypted_stores) + decryptor.finalize()
        unpadder = padding.PKCS7(128).unpadder()
        plaintext = unpadder.update(plaintext) + unpadder.finalize()

    # Decode after either backend so the return type is always str,
    # as promised by the docstring.
    return plaintext.decode("utf-8")

def _EVPKDF(password, salt, keySize=32, ivSize=16, iterations=1, hashAlgorithm="md5") -> tuple:
    """OpenSSL EVP Key Derivation Function

    Repeatedly hashes ``previous_block + password + salt`` (re-hashing each
    block ``iterations - 1`` extra times) and concatenates the digests until
    ``keySize + ivSize`` bytes are available.

    Args:
        password (Union[str, bytes, bytearray]): Password to generate key from.
            A ``str`` is encoded as UTF-8 first.
        salt (Union[bytes, bytearray]): Salt to use.
        keySize (int, optional): Output key length in bytes. Defaults to 32.
        ivSize (int, optional): Output Initialization Vector (IV) length in bytes. Defaults to 16.
        iterations (int, optional): Number of iterations to perform. Defaults to 1.
        hashAlgorithm (str, optional): Hash algorithm to use for the KDF. Defaults to 'md5'.
    Returns:
        key, iv: Derived key and Initialization Vector (IV) bytes.
    Raises:
        ValueError: If ``iterations`` is less than 1.
    Taken from: https://gist.github.com/rafiibrahim8/0cd0f8c46896cafef6486cb1a50a16d3
    OpenSSL original code: https://github.com/openssl/openssl/blob/master/crypto/evp/evp_key.c#L78
    """

    # Raise explicitly rather than `assert`, which is removed under `python -O`.
    if not iterations > 0:
        raise ValueError("Iterations can not be less than 1.")

    if isinstance(password, str):
        password = password.encode("utf-8")

    final_length = keySize + ivSize
    blocks = []
    derived = 0
    # Seeding with b"" makes the first round hash only password + salt,
    # because feeding an empty bytes object to the hasher is a no-op.
    block = b""

    while derived < final_length:
        hasher = hashlib.new(hashAlgorithm)
        hasher.update(block)
        hasher.update(password)
        hasher.update(salt)
        block = hasher.digest()
        for _ in range(1, iterations):
            block = hashlib.new(hashAlgorithm, block).digest()
        blocks.append(block)
        derived += len(block)

    key_iv = b"".join(blocks)
    return key_iv[:keySize], key_iv[keySize:final_length]


def get_json(url, proxy=None, session=None):
session = session or _requests
html = session.get(url=url, proxies=proxy, headers=user_agent_headers).text

if "QuoteSummaryStore" not in html:
html = session.get(url=url, proxies=proxy).text
if "QuoteSummaryStore" not in html:
return {}

json_str = html.split('root.App.main =')[1].split(
'(this)')[0].split(';\n}')[0].strip()
data = _json.loads(json_str)[
'context']['dispatcher']['stores']['QuoteSummaryStore']
data = _json.loads(json_str)

if "_cs" in data and "_cr" in data:
data = _json.loads(decrypt_cryptojs_stores(data))

if "context" in data and "dispatcher" in data["context"]:
# Keep old code, just in case
data = data['context']['dispatcher']['stores']

data = data['QuoteSummaryStore']
# add data about Shares Outstanding for companies' tickers if they are available
try:
data['annualBasicAverageShares'] = _json.loads(
Expand Down

0 comments on commit b47adf0

Please sign in to comment.