This repository has been archived by the owner on Jun 30, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 153
/
channelsearch.py
97 lines (83 loc) 路 3.32 KB
/
channelsearch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import copy
from typing import Union
import json
from urllib.parse import urlencode
from youtubesearchpython.core.requests import RequestCore
from youtubesearchpython.handlers.componenthandler import ComponentHandler
from youtubesearchpython.core.constants import *
class ChannelSearchCore(RequestCore, ComponentHandler):
response = None
responseSource = None
resultComponents = []
def __init__(self, query: str, language: str, region: str, searchPreferences: str, browseId: str, timeout: int):
super().__init__()
self.query = query
self.language = language
self.region = region
self.browseId = browseId
self.searchPreferences = searchPreferences
self.continuationKey = None
self.timeout = timeout
def sync_create(self):
self._syncRequest()
self._parseChannelSearchSource()
self.response = self._getChannelSearchComponent(self.response)
async def next(self):
await self._asyncRequest()
self._parseChannelSearchSource()
self.response = self._getChannelSearchComponent(self.response)
return self.response
def _parseChannelSearchSource(self) -> None:
try:
last_tab = self.response["contents"]["twoColumnBrowseResultsRenderer"]["tabs"][-1]
if 'expandableTabRenderer' in last_tab:
self.response = last_tab["expandableTabRenderer"]["content"]["sectionListRenderer"]["contents"]
else:
tab_renderer = last_tab["tabRenderer"]
if 'content' in tab_renderer:
self.response = tab_renderer["content"]["sectionListRenderer"]["contents"]
else:
self.response = []
except:
raise Exception('ERROR: Could not parse YouTube response.')
def _getRequestBody(self):
''' Fixes #47 '''
requestBody = copy.deepcopy(requestPayload)
requestBody['query'] = self.query
requestBody['client'] = {
'hl': self.language,
'gl': self.region,
}
requestBody['params'] = self.searchPreferences
requestBody['browseId'] = self.browseId
self.url = 'https://www.youtube.com/youtubei/v1/browse' + '?' + urlencode({
'key': searchKey,
})
self.data = requestBody
def _syncRequest(self) -> None:
''' Fixes #47 '''
self._getRequestBody()
request = self.syncPostRequest()
try:
self.response = request.json()
except:
raise Exception('ERROR: Could not make request.')
async def _asyncRequest(self) -> None:
''' Fixes #47 '''
self._getRequestBody()
request = await self.asyncPostRequest()
try:
self.response = request.json()
except:
raise Exception('ERROR: Could not make request.')
def result(self, mode: int = ResultMode.dict) -> Union[str, dict]:
'''Returns the search result.
Args:
mode (int, optional): Sets the type of result. Defaults to ResultMode.dict.
Returns:
Union[str, dict]: Returns JSON or dictionary.
'''
if mode == ResultMode.json:
return json.dumps({'result': self.response}, indent=4)
elif mode == ResultMode.dict:
return {'result': self.response}