-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathyoutube_api.py
165 lines (126 loc) · 5.17 KB
/
youtube_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import logs
import feedparser
import requests
import json
import main
import re
import os
import utils
RSS_FEED = "http://www.tested.com/podcast-xml/this-is-only-a-test/"
TESTED_YOUTUBE_ID = "UCiDJtJKMICpb9B1qf7qjEOA"
def get_access_token():
url = "https://www.googleapis.com/oauth2/v4/token"
querystring = {"client_secret": os.environ["TESTED_YOUTUBE_CLIENT_SECRET"],
"grant_type": "refresh_token",
"refresh_token": os.environ["TESTED_YOUTUBE_REFRESH_TOKEN"],
"client_id": os.environ["TESTED_YOUTUBE_CLIENT_ID"]}
response = requests.request("POST", url, params=querystring)
return json.loads(response.text)["access_token"]
def get_newest_podcast_video_id_and_name():
url = "https://www.googleapis.com/youtube/v3/search"
querystring = {
"part": "snippet",
"channelId": TESTED_YOUTUBE_ID,
"maxResults": "25",
"order": "date"
}
headers = {
'Authorization': "Bearer " + get_access_token(),
'Accept': "application/json",
'Cache-Control': "no-cache",
'Host': "www.googleapis.com",
'accept-encoding': "gzip, deflate",
'Connection': "keep-alive"
}
response = requests.request("GET", url, headers=headers, params=querystring)
videos = json.loads(response.text)
print(videos)
print(response.status_code)
for video in videos['items']:
video_id = video['id']['videoId']
video_title = video['snippet']['title']
if "this is only a test" in video_title.lower():
return video_id, video_title
def have_i_already_commented(video_id):
url = "https://www.googleapis.com/youtube/v3/commentThreads"
querystring = {
"part": "snippet,replies",
"videoId": video_id,
"searchTerms": "Timestamp Bot"
}
headers = {
'Authorization': "Bearer " + get_access_token(),
'Accept': "application/json",
'Cache-Control': "no-cache",
'Host': "www.googleapis.com",
'accept-encoding': "gzip, deflate",
'Connection': "keep-alive",
}
response = requests.request("GET", url, headers=headers, params=querystring)
print(response.text)
my_comments = json.loads(response.text)
return len(my_comments['items']) > 0
def comment_on_video(video_id, message):
url = "https://www.googleapis.com/youtube/v3/commentThreads"
querystring = {"part": "snippet"}
payload = {
'snippet': {
'videoId': video_id,
'topLevelComment': {
'snippet': {
'textOriginal': message
}
}
}
}
headers = {
'Authorization': "Bearer " + get_access_token(),
'Accept': "application/json",
'Content-Type': "application/json",
'Cache-Control': "no-cache",
'Host': "www.googleapis.com",
'accept-encoding': "gzip, deflate",
'Connection': "keep-alive",
}
response = requests.request("POST", url, data=json.dumps(payload), headers=headers, params=querystring)
print(response.text)
def get_newest_rss_episode_number():
feed = feedparser.parse(RSS_FEED)
title = feed.entries[0].title
# Find the largest digit in the title, assume that it's the episode number
pattern = r'\d\d\d+'
return sorted(list(map(int, re.findall(pattern, title))), reverse=True)[0]
def youtube_main(force=False):
"""
:param force: If true post on the episode regardless of the episode number matching or if already commented
"""
with utils.get_tracer().span(name='get_newest_podcast_video_id_and_name'):
video_id, video_name = get_newest_podcast_video_id_and_name()
print(video_id, video_name)
with utils.get_tracer().span(name='have_i_already_commented'):
already_commented = have_i_already_commented(video_id)
if (not force) and already_commented:
return "Already commented on video " + video_name
newest_rss = str(get_newest_rss_episode_number())
if (not force) and newest_rss not in video_name:
return "The newest podcast in the RSS feed does not correspond to the youtube channel. " + newest_rss + " " + video_name
with utils.get_tracer().span(name='generate_timestamps'):
segments = main.generate_timestamps(video_id)
to_post = "0:00 Intro\n"
if segments is None:
print("No segments found")
return
for segment in segments:
segment_name = main.FILE_NAMES_TO_NAME[segment[0]]
if segment_name != "Intro":
to_post += main.format_seconds(segment[1]) + " " + segment_name
to_post += "\n"
if "spoiler" in to_post.lower():
to_post = "NEW: Spoiler alerts are now included in the timestamps! Tested can also copy this comment into the YouTube description to embed chapters in the video.\n" + to_post
with utils.get_tracer().span(name='comment_on_video'):
comment_on_video(video_id, to_post)
with utils.get_tracer().span(name='log_to_discord'):
logs.post_to_discord(video_name + " " + "https://www.youtube.com/watch?v=" + video_id, to_post)
return "Posted on " + video_name + "\n\n" + to_post
if __name__ == "__main__":
youtube_main()