forked from SergeyPirogov/webdriver_manager
/
driver_cache.py
139 lines (115 loc) · 4.6 KB
/
driver_cache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import datetime
import json
import os
from webdriver_manager.core.config import wdm_local, get_xdist_worker_id
from webdriver_manager.core.constants import (
DEFAULT_PROJECT_ROOT_CACHE_PATH,
DEFAULT_USER_HOME_CACHE_PATH, ROOT_FOLDER_NAME,
)
from webdriver_manager.core.logger import log
from webdriver_manager.core.utils import get_date_diff, File, save_file
class DriverCache(object):
def __init__(self, root_dir=None, valid_range=1):
self._root_dir = DEFAULT_USER_HOME_CACHE_PATH
is_wdm_local = wdm_local()
xdist_worker_id = get_xdist_worker_id()
if xdist_worker_id:
print('', flush=True)
log(f"xdist worker is: {xdist_worker_id}")
self._root_dir = os.path.join(self._root_dir, xdist_worker_id)
if root_dir is not None:
self._root_dir = os.path.join(root_dir, ROOT_FOLDER_NAME, xdist_worker_id)
if is_wdm_local:
self._root_dir = os.path.join(DEFAULT_PROJECT_ROOT_CACHE_PATH, xdist_worker_id)
self._drivers_root = "drivers"
self._drivers_json_path = os.path.join(self._root_dir, "drivers.json")
self._date_format = "%d/%m/%Y"
self._drivers_directory = os.path.join(self._root_dir, self._drivers_root)
self.valid_range = valid_range
def save_file_to_cache(self, driver, file: File):
driver_name = driver.get_name()
os_type = driver.get_os_type()
driver_version = driver.get_version()
browser_version = driver.browser_version
path = os.path.join(
self._drivers_directory, driver_name, os_type, driver_version
)
archive = save_file(file, path)
files = archive.unpack(path)
binary = self.__get_binary(files, driver_name)
binary_path = os.path.join(path, binary)
self.__save_metadata(
browser_version, driver_name, os_type, driver_version, binary_path
)
log(f"Driver has been saved in cache [{path}]")
return binary_path
def __get_binary(self, files, driver_name):
if len(files) == 1:
return files[0]
for f in files:
if driver_name in f:
return f
raise Exception(f"Can't find binary for {driver_name} among {files}")
def __save_metadata(
self,
browser_version,
driver_name,
os_type,
driver_version,
binary_path,
date=None,
):
if date is None:
date = datetime.date.today()
metadata = self.get_metadata()
key = f"{os_type}_{driver_name}_{driver_version}_for_{browser_version}"
data = {
key: {
"timestamp": date.strftime(self._date_format),
"binary_path": binary_path,
}
}
metadata.update(data)
with open(self._drivers_json_path, "w+") as outfile:
json.dump(metadata, outfile, indent=4)
def find_driver(self, driver):
"""Find driver by '{os_type}_{driver_name}_{driver_version}_{browser_version}'."""
os_type = driver.get_os_type()
driver_name = driver.get_name()
driver_version = driver.get_version()
browser_version = driver.browser_version
metadata = self.get_metadata()
key = f"{os_type}_{driver_name}_{driver_version}_for_{browser_version}"
if key not in metadata:
log(
f"There is no [{os_type}] {driver_name} for browser {browser_version} in cache"
)
return None
path = os.path.join(
self._drivers_directory, driver_name, os_type, driver_version
)
driver_binary_name = (
"msedgedriver" if driver_name == "edgedriver" else driver_name
)
driver_binary_name = (
f"{driver_binary_name}.exe" if "win" in os_type else driver_binary_name
)
binary_path = os.path.join(path, driver_binary_name)
if not os.path.exists(binary_path):
return None
driver_info = metadata[key]
if not self.__is_valid(driver_info):
return None
path = driver_info["binary_path"]
log(f"Driver [{path}] found in cache")
return path
def __is_valid(self, driver_info):
dates_diff = get_date_diff(
driver_info["timestamp"], datetime.date.today(), self._date_format
)
return dates_diff < self.valid_range
def get_metadata(self):
if os.path.exists(self._drivers_json_path):
with open(self._drivers_json_path, "r") as outfile:
return json.load(outfile)
return {}