await update.message.reply_text('Hi! I\'m your NSFW Image Detector Bot. Send me an image to check if it contains NSFW content.')
+async def analyze(file_name):
+ img = Image.open(file_name)
+ classifier = pipeline("image-classification", model="Falconsai/nsfw_image_detection")
+ result = classifier(img)
+ return result[0]['label']
+
+
async def analyze_image(update: Update, context) -> None:
file_id = update.message.photo[-1].file_id
file_info = await context.bot.get_file(file_id)
await update.message.reply_text('Failed to download the photo.')
try:
- img = Image.open(file_name)
- classifier = pipeline("image-classification", model="Falconsai/nsfw_image_detection")
- results = classifier(img)
+ result = await analyze(file_name)
- if results[0]['label'] == 'nsfw':
+ if result == 'nsfw':
await update.message.reply_text(f"Warning: This image contains NSFW content.")
await context.bot.delete_message(chat_id=update.message.chat_id,message_id=update.message.message_id)
ffmpeg.input(file_name).filter('fps', fps='1').output(f'{file_name}-%d.jpg', start_number=0).overwrite_output().run(quiet=True)
for i in [i for i in os.listdir('.') if (file_name in i) and ('.jpg' in i)]:
- img = Image.open(i)
- classifier = pipeline("image-classification", model="Falconsai/nsfw_image_detection")
- results = classifier(img)
+ result = await analyze(i)
- if results[0]['label'] == 'nsfw':
+ if result == 'nsfw':
await update.message.reply_text(f"Warning: This video contains NSFW content.")
await context.bot.delete_message(chat_id=update.message.chat_id,message_id=update.message.message_id)
break
os.remove(i)
+async def analyze_gif(update: Update, context) -> None:
+ file_id = update.message.animation.file_id
+ file_info = await context.bot.get_file(file_id)
+
+ if file_info:
+ # Get the URL of the file
+ file_path = file_info.file_path
+ response = requests.get(file_path)
+
+ if response.status_code == 200:
+ # Save the video to disk
+ file_name = f"{file_id}.mp4"
+ with open(file_name, 'wb') as f:
+ f.write(response.content)
+ else:
+ await update.message.reply_text('Failed to download the animation.')
+ else:
+ await update.message.reply_text('Failed to download the animation.')
+
+ try:
+ ffmpeg.input(file_name).filter('fps', fps='1').output(f'{file_name}-%d.jpg', start_number=0).overwrite_output().run(quiet=True)
+
+ for i in [i for i in os.listdir('.') if (file_name in i) and ('.jpg' in i)]:
+ result = await analyze(i)
+
+ if result == 'nsfw':
+ await update.message.reply_text(f"Warning: This animation contains NSFW content.")
+ await context.bot.delete_message(chat_id=update.message.chat_id,message_id=update.message.message_id)
+ break
+
+ except Exception as e:
+ await update.message.reply_text(f'Some kind of mistake')
+ print(f"An error occurred: {e}")
+
+ finally:
+ for i in [i for i in os.listdir('.') if file_name in i]:
+ os.remove(i)
+
+
+async def analyze_video_note(update: Update, context) -> None:
+ file_id = update.message.video_note.file_id
+ file_info = await context.bot.get_file(file_id)
+
+ if file_info:
+ # Get the URL of the file
+ file_path = file_info.file_path
+ response = requests.get(file_path)
+
+ if response.status_code == 200:
+ # Save the video to disk
+ file_name = f"{file_id}.mp4"
+ with open(file_name, 'wb') as f:
+ f.write(response.content)
+ else:
+ await update.message.reply_text('Failed to download the circle.')
+ else:
+ await update.message.reply_text('Failed to download the circle.')
+
+ try:
+ ffmpeg.input(file_name).filter('fps', fps='1').output(f'{file_name}-%d.jpg', start_number=0).overwrite_output().run(quiet=True)
+
+ for i in [i for i in os.listdir('.') if (file_name in i) and ('.jpg' in i)]:
+ result = await analyze(i)
+
+ if result == 'nsfw':
+ await update.message.reply_text(f"Warning: This circle contains NSFW content.")
+ await context.bot.delete_message(chat_id=update.message.chat_id,message_id=update.message.message_id)
+ break
+
+ except Exception as e:
+ await update.message.reply_text(f'Some kind of mistake')
+ print(f"An error occurred: {e}")
+
+ finally:
+ for i in [i for i in os.listdir('.') if file_name in i]:
+ os.remove(i)
+
async def main() -> None:
application = ApplicationBuilder().token(TOKEN).build()
application.add_handler(CommandHandler('start', start))
application.add_handler(MessageHandler(filters.PHOTO, analyze_image))
application.add_handler(MessageHandler(filters.VIDEO, analyze_video))
+ application.add_handler(MessageHandler(filters.ANIMATION, analyze_gif))
+ application.add_handler(MessageHandler(filters.VIDEO_NOTE, analyze_video_note))
await application.run_polling()