-
Notifications
You must be signed in to change notification settings - Fork 0
/
ccs.py
151 lines (136 loc) · 5.33 KB
/
ccs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#!/usr/bin/env python3
import difflib
import json
import os
import sys
import time

import cv2
import numpy as np
import pafy
import pylast
import pytesseract
import requests
from PIL import Image
def main():
    """Watch the ChilledCow live stream and scrobble each newly shown song.

    Reads the JSON settings file named by the first command-line argument,
    resolves the channel's live stream, downloads the song list document and
    then loops forever: take a snapshot of the stream, OCR the text in the
    top of the frame, match it against the song list and scrobble it to
    Last.fm whenever the match differs from the previously matched song.

    Exits with status 1 when the settings file argument is missing or when
    no live stream is found.
    """
    if len(sys.argv) < 2:
        print("Usage: python ccs.py <data_file_json>", file=sys.stderr)
        # A usage error is a failure; report it through the exit status.
        sys.exit(1)

    data_file = sys.argv[1]
    with open(data_file, "r") as f:
        data = json.load(f)

    url = get_live_video_url(
        data["Youtube_Data_API_Key"], data["chilledcow_youtube_channel_id"]
    )
    if url is None:
        # get_live_video_url returns None when nothing matches; stop here
        # instead of failing later while building the watch URL.
        print("No live stream found for the configured channel.", file=sys.stderr)
        sys.exit(1)
    stream = get_video_url(url)

    prev_song_details = ""
    lastfm_network = pylast.LastFMNetwork(
        api_key=data["LASTFM_API_KEY"],
        api_secret=data["LASTFM_SHARED_SECRET"],
        username=data["username"],
        password_hash=data["password_hash"],
    )
    document = get_doc_file(data["song_list_google_doc_id"])
    entries = get_entries_from_doc(document)

    while True:
        image_file = take_snapshot(stream)
        cropped_image_file = cut_image(image_file)
        processed_image_file = cv2_process(cropped_image_file)
        song_details = tesseract_ocr_read(processed_image_file)
        matched_song_details, confidence = find_closest_match_from_entries(
            entries, song_details
        )
        # Only act when the matched entry changed since the last iteration,
        # so the same song is not scrobbled on every poll.
        if diff_song_details(prev_song_details, matched_song_details):
            prev_song_details = matched_song_details
            artist, song = check_song_details(matched_song_details)
            if artist is not None:
                scrobble_to_lastfm(lastfm_network, artist, song, confidence)
        # Poll interval between snapshots, in seconds.
        time.sleep(30)
def get_live_video_url(youtube_data_api_key, chilledcow_youtube_channel_id):
    """Return the video id of the channel's live "radio ... relax/study" stream.

    Queries the YouTube Data API v3 search endpoint for up to 50 of the
    channel's items ordered by date and returns the ``videoId`` of the first
    item that is currently live and whose title contains both ``"radio"``
    and ``"relax/study"``.

    Args:
        youtube_data_api_key: API key sent as the ``key`` query parameter.
        chilledcow_youtube_channel_id: Channel id sent as ``channelId``.

    Returns:
        The matching video id, or ``None`` when no item matches.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        requests.Timeout: If the API does not answer in time.
    """
    response = requests.get(
        "https://www.googleapis.com/youtube/v3/search",
        # Passing params lets requests URL-encode the values.
        params={
            "order": "date",
            "part": "snippet",
            "channelId": chilledcow_youtube_channel_id,
            "maxResults": 50,
            "key": youtube_data_api_key,
        },
        allow_redirects=True,
        # Without a timeout a stalled connection would block forever.
        timeout=30,
    )
    # Surface API errors (bad key, quota, ...) as an HTTP error instead of
    # a KeyError on the missing "items" field.
    response.raise_for_status()

    for item in response.json().get("items", []):
        snippet = item["snippet"]
        title = snippet["title"]
        if (
            snippet["liveBroadcastContent"] == "live"
            and "radio" in title
            and "relax/study" in title
        ):
            return item["id"]["videoId"]
    return None
def get_doc_file(song_list_google_doc_id):
    """Download a Google Docs document as plain text.

    Args:
        song_list_google_doc_id: Id of the document, inserted into the
            ``docs.google.com`` export URL.

    Returns:
        The body of the ``format=txt`` export as a string.

    Raises:
        requests.HTTPError: If the export request answers with an error
            status, so an error page is never mistaken for the song list.
        requests.Timeout: If the server does not answer in time.
    """
    request_link = "https://docs.google.com/document/d/{}/export?format={}"
    response = requests.get(
        request_link.format(song_list_google_doc_id, "txt"),
        # Without a timeout a stalled connection would block forever.
        timeout=30,
    )
    response.raise_for_status()
    return response.text
def get_entries_from_doc(document):
    """Extract the "artist - song" lines from the song list text.

    Args:
        document: Full text of the song list, lines separated by ``"\\n"``.

    Returns:
        A list with every line that contains ``" - "``, stripped of
        surrounding whitespace, in document order.
    """
    # A text export may start with a byte-order mark; str.strip() does not
    # treat it as whitespace, so it would otherwise stick to the first entry.
    document = document.lstrip("\ufeff")
    return [line.strip() for line in document.split("\n") if " - " in line]
def get_video_url(youtube_link):
    """Resolve a YouTube video id to a direct stream URL.

    Args:
        youtube_link: Video id appended to the ``watch?v=`` URL.

    Returns:
        The ``url`` of the stream that pafy's ``getbest()`` selects.
    """
    watch_url = "https://www.youtube.com/watch?v=" + youtube_link
    return pafy.new(watch_url).getbest().url
def take_snapshot(stream):
    """Grab one frame from the video stream and save it as ``img.jpg``.

    Args:
        stream: Stream URL (or any source) accepted by ``cv2.VideoCapture``.

    Returns:
        The file name the frame was written to (``"img.jpg"``).

    Exits the program with status 1 when no frame can be read.
    """
    capture = cv2.VideoCapture(stream)
    try:
        success, image = capture.read()
        if not success:
            print("OpenCV can't read video stream.", file=sys.stderr)
            # Failing to read the stream is an error, so exit non-zero.
            sys.exit(1)
        image_file = "img.jpg"
        cv2.imwrite(image_file, image)
    finally:
        # Release the capture on the failure path as well.
        capture.release()
    return image_file
def cut_image(image):
    """Crop the top tenth of an image and save it next to the original.

    Args:
        image: Path of the image file to crop.

    Returns:
        Path of the cropped copy: the input path without its extension,
        followed by ``"_cropped.jpg"``.
    """
    # splitext only removes the final extension, so dots elsewhere in the
    # path (e.g. "./img.jpg") no longer truncate the output name.
    root, _ = os.path.splitext(image)
    new_image = root + "_cropped.jpg"
    # The context manager closes the source file once the crop is saved.
    with Image.open(image) as img:
        width, height = img.size
        # Full width, from the top edge down to one tenth of the height.
        cropped_img = img.crop((0, 0, width, height / 10))
        cropped_img.save(new_image)
    return new_image
def cv2_process(image):
    """Black out everything except near-white pixels and save the result.

    Pixels whose three channel values all lie in 200..255 are kept; every
    other pixel becomes black.

    Args:
        image: Path of the image file to process.

    Returns:
        Path of the processed copy: the input path without its extension,
        followed by ``"_processed.jpg"``.

    Raises:
        ValueError: If OpenCV cannot read the image file.
    """
    img = cv2.imread(image)
    if img is None:
        # imread signals failure by returning None rather than raising.
        raise ValueError("OpenCV can't read image file: {}".format(image))

    low = np.array([200, 200, 200])
    up = np.array([255, 255, 255])
    mask = cv2.inRange(img, low, up)
    res = cv2.bitwise_and(img, img, mask=mask)

    # splitext only removes the final extension, so dots elsewhere in the
    # path no longer truncate the output name.
    root, _ = os.path.splitext(image)
    new_image = root + "_processed.jpg"
    cv2.imwrite(new_image, res)
    return new_image
def tesseract_ocr_read(image):
    """Run Tesseract OCR on an image file and return the recognised text.

    Args:
        image: Path of the image file to read.

    Returns:
        The string produced by ``pytesseract.image_to_string``.
    """
    # The context manager closes the image file as soon as OCR is done;
    # this function is called on every poll of the stream.
    with Image.open(image) as img:
        return pytesseract.image_to_string(img)
def find_closest_match_from_entries(entries, song_details):
    """Find the song list entry most similar to the OCR text.

    Similarity is the case-insensitive ``difflib.SequenceMatcher`` ratio.
    On ties the earliest entry wins.

    Args:
        entries: Iterable of candidate "artist - song" strings.
        song_details: Text to match the candidates against.

    Returns:
        A ``(matched_entry, similarity)`` tuple. When no entry scores above
        zero (including an empty ``entries``), this is ``("", 0)``.
    """
    max_similarity_score = 0
    matched_entry = ""

    # SequenceMatcher caches its analysis of the second sequence, so set the
    # constant OCR text once and only swap the first sequence per entry.
    matcher = difflib.SequenceMatcher()
    matcher.set_seq2(song_details.lower())

    for entry in entries:
        matcher.set_seq1(entry.lower())
        similarity_score = matcher.ratio()
        if similarity_score > max_similarity_score:
            max_similarity_score = similarity_score
            matched_entry = entry
    return matched_entry, max_similarity_score
def diff_song_details(previous_song_details, song_details):
    """Tell whether two song strings differ enough to count as a new song.

    Args:
        previous_song_details: Song string from the previous check.
        song_details: Song string from the current check.

    Returns:
        ``True`` when the case-insensitive ``difflib.SequenceMatcher`` ratio
        of the two strings is below 0.8, ``False`` otherwise.
    """
    threshold = 0.8
    matcher = difflib.SequenceMatcher(
        a=previous_song_details.lower(),
        b=song_details.lower(),
    )
    return matcher.ratio() < threshold
def check_song_details(song_details):
    """Split an "artist - song" string into its two parts.

    Args:
        song_details: String expected to contain exactly one ``" - "``.

    Returns:
        ``(artist, song)`` when splitting on ``" - "`` yields exactly two
        parts, otherwise ``(None, None)``.
    """
    parts = song_details.split(" - ")
    if len(parts) != 2:
        return None, None
    return parts[0], parts[1]
def scrobble_to_lastfm(lastfm_network, artist, song, confidence):
    """Scrobble a track to Last.fm with the current time and report it.

    Args:
        lastfm_network: Object whose ``scrobble(artist, title, timestamp)``
            method submits the play (created in ``main`` as a
            ``pylast.LastFMNetwork``).
        artist: Artist name.
        song: Track title.
        confidence: Match similarity in the range 0..1; printed as a whole
            percentage (truncated, not rounded).
    """
    timestamp = int(time.time())
    lastfm_network.scrobble(artist, song, timestamp)
    print(
        "Scrobbled: {} - {} ({}% confidence)".format(
            artist, song, int(confidence * 100)
        )
    )
# Run the scrobbler only when executed as a script, not when imported.
if __name__ == "__main__":
    main()