From 052e135029826a4caf84393263f13a13cc8cdac8 Mon Sep 17 00:00:00 2001 From: pukkandan Date: Sat, 24 Jul 2021 06:16:46 +0530 Subject: [PATCH] [youtube] Simplify `_get_text` early --- test/parameters.json | 1 + yt_dlp/extractor/youtube.py | 75 ++++++++++++++++++------------------- 2 files changed, 38 insertions(+), 38 deletions(-) diff --git a/test/parameters.json b/test/parameters.json index 9425e85eb..9ca7d2ca9 100644 --- a/test/parameters.json +++ b/test/parameters.json @@ -1,4 +1,5 @@ { + "check_formats": false, "consoletitle": false, "continuedl": true, "forcedescription": false, diff --git a/yt_dlp/extractor/youtube.py b/yt_dlp/extractor/youtube.py index 9eb103520..48fc460ef 100644 --- a/yt_dlp/extractor/youtube.py +++ b/yt_dlp/extractor/youtube.py @@ -691,7 +691,7 @@ def _extract_alerts(cls, data): alert_type = alert.get('type') if not alert_type: continue - message = cls._get_text(alert.get('text')) + message = cls._get_text(alert, 'text') if message: yield alert_type, message @@ -721,23 +721,26 @@ def _extract_badges(self, renderer: dict): return badges @staticmethod - def _get_text(data, getter=None, max_runs=None): - for get in variadic(getter): - d = try_get(data, get) if get is not None else data - text = try_get(d, lambda x: x['simpleText'], compat_str) - if text: - return text - runs = try_get(d, lambda x: x['runs'], list) or [] - if not runs and isinstance(d, list): - runs = d + def _get_text(data, *path_list, max_runs=None): + for path in path_list or [None]: + if path is None: + obj = [data] + else: + obj = traverse_obj(data, path, default=[]) + if not any(key is ... or isinstance(key, (list, tuple)) for key in variadic(path)): + obj = [obj] + for item in obj: + text = try_get(item, lambda x: x['simpleText'], compat_str) + if text: + return text + runs = try_get(item, lambda x: x['runs'], list) or [] + if not runs and isinstance(item, list): + runs = item - def get_runs(runs): - for run in runs[:min(len(runs), max_runs or len(runs))]: - yield try_get(run, lambda x: x['text'], compat_str) or '' - - text = ''.join(get_runs(runs)) - if text: - return text + runs = runs[:min(len(runs), max_runs or len(runs))] + text = ''.join(traverse_obj(runs, (..., 'text'), expected_type=str, default=[])) + if text: + return text def _extract_response(self, item_id, query, note='Downloading API JSON', headers=None, ytcfg=None, check_get_keys=None, ep='browse', fatal=True, api_hostname=None, @@ -804,15 +807,15 @@ def is_music_url(url): def _extract_video(self, renderer): video_id = renderer.get('videoId') - title = self._get_text(renderer.get('title')) - description = self._get_text(renderer.get('descriptionSnippet')) - duration = parse_duration(self._get_text(renderer.get('lengthText'))) - view_count_text = self._get_text(renderer.get('viewCountText')) or '' + title = self._get_text(renderer, 'title') + description = self._get_text(renderer, 'descriptionSnippet') + duration = parse_duration(self._get_text(renderer, 'lengthText')) + view_count_text = self._get_text(renderer, 'viewCountText') or '' view_count = str_to_int(self._search_regex( r'^([\d,]+)', re.sub(r'\s', '', view_count_text), 'view count', default=None)) - uploader = self._get_text(renderer, (lambda x: x['ownerText'], lambda x: x['shortBylineText'])) + uploader = self._get_text(renderer, 'ownerText', 'shortBylineText') return { '_type': 'url', @@ -2028,8 +2031,8 @@ def _extract_chapters_from_engagement_panel(self, data, duration): data, ('engagementPanels', ..., 'engagementPanelSectionListRenderer', 'content', 'macroMarkersListRenderer', 'contents'), expected_type=list, default=[]) - chapter_time = lambda chapter: parse_duration(self._get_text(chapter.get('timeDescription'))) - chapter_title = lambda chapter: self._get_text(chapter.get('title')) + chapter_time = lambda chapter: parse_duration(self._get_text(chapter, 'timeDescription')) + chapter_title = lambda chapter: self._get_text(chapter, 'title') return next(( filter(None, ( @@ -2083,14 +2086,14 @@ def _extract_comment(self, comment_renderer, parent=None): if not comment_id: return - text = self._get_text(comment_renderer.get('contentText')) + text = self._get_text(comment_renderer, 'contentText') # note: timestamp is an estimate calculated from the current time and time_text - time_text = self._get_text(comment_renderer.get('publishedTimeText')) or '' + time_text = self._get_text(comment_renderer, 'publishedTimeText') or '' time_text_dt = self.parse_time_text(time_text) if isinstance(time_text_dt, datetime.datetime): timestamp = calendar.timegm(time_text_dt.timetuple()) - author = self._get_text(comment_renderer.get('authorText')) + author = self._get_text(comment_renderer, 'authorText') author_id = try_get(comment_renderer, lambda x: x['authorEndpoint']['browseEndpoint']['browseId'], compat_str) @@ -2125,7 +2128,7 @@ def extract_header(contents): for content in contents: comments_header_renderer = try_get(content, lambda x: x['commentsHeaderRenderer']) expected_comment_count = parse_count(self._get_text( - comments_header_renderer, (lambda x: x['countText'], lambda x: x['commentsCount']), max_runs=1)) + comments_header_renderer, 'countText', 'commentsCount', max_runs=1)) if expected_comment_count: comment_counts[1] = expected_comment_count @@ -3001,10 +3004,7 @@ def process_language(container, base_url, lang_code, sub_name, query): }) vsir = content.get('videoSecondaryInfoRenderer') if vsir: - info['channel'] = self._get_text(try_get( - vsir, - lambda x: x['owner']['videoOwnerRenderer']['title'], - dict)) + info['channel'] = self._get_text(vsir, ('owner', 'videoOwnerRenderer', 'title')) rows = try_get( vsir, lambda x: x['metadataRowContainer']['metadataRowContainerRenderer']['rows'], @@ -3019,8 +3019,8 @@ def process_language(container, base_url, lang_code, sub_name, query): mrr_title = mrr.get('title') if not mrr_title: continue - mrr_title = self._get_text(mrr['title']) - mrr_contents_text = self._get_text(mrr['contents'][0]) + mrr_title = self._get_text(mrr, 'title') + mrr_contents_text = self._get_text(mrr, ('contents', 0)) if mrr_title == 'License': info['license'] = mrr_contents_text elif not multiple_songs: @@ -3592,7 +3592,7 @@ def _grid_entries(self, grid_renderer): renderer = self._extract_basic_item_renderer(item) if not isinstance(renderer, dict): continue - title = self._get_text(renderer.get('title')) + title = self._get_text(renderer, 'title') # playlist playlist_id = renderer.get('playlistId') @@ -3652,7 +3652,7 @@ def _shelf_entries(self, shelf_renderer, skip_channels=False): # will not work if skip_channels and '/channels?' in shelf_url: return - title = self._get_text(shelf_renderer, lambda x: x['title']) + title = self._get_text(shelf_renderer, 'title') yield self.url_result(shelf_url, video_title=title) # Shelf may not contain shelf URL, fallback to extraction from content for entry in self._shelf_entries_from_content(shelf_renderer): @@ -4026,8 +4026,7 @@ def _extract_availability(self, data): renderer_dict, lambda x: x['privacyDropdownItemRenderer']['isSelected'], bool) or False if not is_selected: continue - label = self._get_text( - try_get(renderer_dict, lambda x: x['privacyDropdownItemRenderer']['label'], dict) or []) + label = self._get_text(renderer_dict, ('privacyDropdownItemRenderer', 'label')) if label: badge_labels.add(label.lower()) break