-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
116 lines (94 loc) · 4.68 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
from typing import Final
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
import os
from dotenv import load_dotenv
import nltk
from nltk.sentiment import SentimentIntensityAnalyzer
from pymongo import MongoClient
from Langchain import *
from func import *
nltk.download('vader_lexicon')
sia = SentimentIntensityAnalyzer()
load_dotenv()
uri = os.getenv("MONGODB")
client = MongoClient(uri)
db = client['ListnerAI_DB']
collection = db['UserData']
print('Starting up bot...')
TOKEN: Final = os.getenv("TELEGRAM_KEY")
BOT_USERNAME: Final = '@ListnerAI_bot'
user_id = ''
data = {'user_id': '', 'Name': '', 'Contact': ''}
ACTIVE = False
# Lets us use the /start command
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
global user_id
user_id = update.message.from_user.id
global ACTIVE
ACTIVE = check_user_exists(user_id)
if ACTIVE:
user = collection.find_one({'user_id': user_id})
data['Name'] = user['Name']
data['Contact'] = user['Contact']
text=(f"hi, my name is {data['Name']}")
response: str = conversation.predict(input=text).strip("\n").strip()
await update.message.reply_text(response)
else:
data['user_id'] = user_id
await update.message.reply_text('''Hello there! Before gtting started What\'s your name? use /name <your name> to tell me your name \n Eg: /name ListenerAI_Bot''')
async def name_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
text: str = update.message.text
name = text.replace('/name', '').strip()
if not is_valid_name(name):
await update.message.reply_text('Please enter a valid name! use /name <your name> to tell me your name \n Eg: /name ListenerAI_Bot')
else:
data['Name'] = name
await update.message.reply_text('Hello '+name+'! use /contact <Country code + Phone number> to tell me your closest person\'s contact information \n /contact +911234567890')
async def contact_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
text: str = update.message.text
contact = text.replace('/contact', '').strip()
if not is_valid_phone_number(contact):
await update.message.reply_text('Please enter a valid phone number! use /contact <Country code + Phone number> to tell me your closest person\'s contact information \n /contact +911234567890')
else:
data['Contact'] = contact
await update.message.reply_text('Thank you for your contact information! check the data using /data')
async def check_data(update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text(f"Name: *{data['Name']}*\nContact: *{data['Contact']}* use /yes to upload the data or use the previous commands to make changes", parse_mode='Markdown')
print(data['Name'], data['Contact'])
async def upload_data(update: Update, context: ContextTypes.DEFAULT_TYPE):
collection.insert_one(data)
global ACTIVE
ACTIVE = True
resp=conversation.predict(input=(f"hi, my name is {data['Name']}"))
await update.message.reply_text('Data uploaded successfully! ' + resp)
# Lets us use the /help command
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text('Try typing anything and I will do my best to respond! or use /start')
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
if ACTIVE:
message_type: str = update.message.chat.type
text: str = update.message.text
print(f'User ({update.message.chat.id}) in {message_type}: "{text}"')
response: str = conversation.predict(input=text).strip("\n").strip()
sentiment = sia.polarity_scores(text)['compound']
print('Bot:', response, sentiment)
await update.message.reply_text(response)
if sentiment < -0.3:
send_SMS(data['Contact'], data['Name'])
else:
await update.message.reply_text('Use /start to start the bot')
async def error(update: Update, context: ContextTypes.DEFAULT_TYPE):
print(f'Update {update} caused error {context.error}')
if __name__ == '__main__':
app = Application.builder().token(TOKEN).build()
app.add_handler(CommandHandler('start', start_command))
app.add_handler(CommandHandler('help', help_command))
app.add_handler(CommandHandler('name', name_command))
app.add_handler(CommandHandler('contact', contact_command))
app.add_handler(CommandHandler('data', check_data))
app.add_handler(CommandHandler('yes', upload_data))
app.add_handler(MessageHandler(filters.TEXT, handle_message))
app.add_error_handler(error)
print('Polling...')
app.run_polling(poll_interval=5)