#python #django
#python #django
Вопрос:
Я пытаюсь создать пользовательскую команду для django, с которой можно запускать, python manage.py cert_transparency
и я почти на месте, но у меня возникли небольшие проблемы. Цель этой команды — создать команду, работающую 24/7 в фоновом режиме, которую я просто запускаю в контейнере docker.
Я получаю это сообщение об ошибке
certificate_update: 0cert [00:00, ?cert/s]Traceback (most recent call last):
File "manage.py", line 15, in <module>
execute_from_command_line(sys.argv)
File "/usr/local/lib/python3.7/site-packages/django/core/management/__init__.py", line 401, in execute_from_command_line
utility.execute()
File "/usr/local/lib/python3.7/site-packages/django/core/management/__init__.py", line 395, in execute
self.fetch_command(subcommand).run_from_argv(self.argv)
File "/usr/local/lib/python3.7/site-packages/django/core/management/base.py", line 330, in run_from_argv
self.execute(*args, **cmd_options)
File "/usr/local/lib/python3.7/site-packages/django/core/management/base.py", line 371, in execute
output = self.handle(*args, **options)
File "/src/scraper/management/commands/cert_transparency.py", line 184, in handle
certstream.listen_for_events(callback, url=certstream_url)
NameError: name 'callback' is not defined
По сути, я пытаюсь просто импортировать этот скрипт в качестве пользовательской команды управления в django.
cert_transparency.py:
from django.core.management.base import BaseCommand, CommandError
import re
import math
import certstream
import tqdm
import yaml
import time
import os
from Levenshtein import distance
from termcolor import colored, cprint
from tld import get_tld
from .confusables import unconfuse
class Command(BaseCommand):
help = 'Scrapes calidogs websocket for cert renewals and rates them.'
def score_domain(self, domain):
"""Score `domain`.
The highest score, the most probable `domain` is a phishing site.
Args:
domain (str): the domain to check.
Returns:
int: the score of `domain`.
"""
score = 0
for t in suspicious['tlds']:
if domain.endswith(t):
score = 20
# Remove initial '*.' for wildcard certificates bug
if domain.startswith('*.'):
domain = domain[2:]
# Removing TLD to catch inner TLD in subdomain (ie. paypal.com.domain.com)
try:
res = get_tld(domain, as_object=True, fail_silently=True, fix_protocol=True)
domain = '.'.join([res.subdomain, res.domain])
except Exception:
pass
# Higer entropy is kind of suspicious
#score = int(round(entropy(domain)*10))
# Remove lookalike characters using list from http://www.unicode.org/reports/tr39
domain = unconfuse(domain)
words_in_domain = re.split("W ", domain)
# ie. detect fake .com (ie. *.com-account-management.info)
if words_in_domain[0] in ['com', 'net', 'org']:
score = 10
# Testing keywords
for word in suspicious['keywords']:
if word in domain:
score = suspicious['keywords'][word]
# Testing Levenshtein distance for strong keywords (>= 70 points) (ie. paypol)
for key in [k for (k,s) in suspicious['keywords'].items() if s >= 70]:
# Removing too generic keywords (ie. mail.domain.com)
for word in [w for w in words_in_domain if w not in ['email', 'mail', 'cloud']]:
if distance(str(word), str(key)) == 1:
score = 70
# Lots of '-' (ie. www.paypal-datacenter.com-acccount-alert.com)
if 'xn--' not in domain and domain.count('-') >= 4:
score = domain.count('-') * 3
# Deeply nested subdomains (ie. www.paypal.com.security.accountupdate.gq)
if domain.count('.') >= 3:
score = domain.count('.') * 3
return score
def callback(self, message, context):
"""Callback handler for certstream events."""
if message['message_type'] == "heartbeat":
return
if message['message_type'] == "certificate_update":
all_domains = message['data']['leaf_cert']['all_domains']
for domain in all_domains:
pbar.update(1)
score = score_domain(self, domain.lower())
# If issued from a free CA = more suspicious
if "Let's Encrypt" in message['data']['chain'][0]['subject']['aggregated']:
score = 10
if score >= 100:
self.stdout.write(tqdm.tqdm.write(
"[!] Suspicious: "
"{} (score={})".format(colored(domain, 'red', attrs=['underline', 'bold']), score)))
elif score >= 90:
self.stdout.write(tqdm.tqdm.write(
"[!] Suspicious: "
"{} (score={})".format(colored(domain, 'red', attrs=['underline']), score)))
elif score >= 80:
self.stdout.write(tqdm.tqdm.write(
"[!] Likely : "
"{} (score={})".format(colored(domain, 'yellow', attrs=['underline']), score)))
elif score >= 65:
self.stdout.write(tqdm.tqdm.write(
"[ ] Potential : "
"{} (score={})".format(colored(domain, attrs=['underline']), score)))
if score >= 75:
with open(log_suspicious, 'a') as f:
f.write("{}n".format(domain))
def callback(self, message, context):
"""Callback handler for certstream events."""
if message['message_type'] == "heartbeat":
return
if message['message_type'] == "certificate_update":
all_domains = message['data']['leaf_cert']['all_domains']
for domain in all_domains:
pbar.update(1)
score = score_domain(domain.lower())
# If issued from a free CA = more suspicious
if "Let's Encrypt" in message['data']['chain'][0]['subject']['aggregated']:
score = 10
if score >= 100:
tqdm.tqdm.write(
"[!] Suspicious: "
"{} (score={})".format(colored(domain, 'red', attrs=['underline', 'bold']), score))
elif score >= 90:
tqdm.tqdm.write(
"[!] Suspicious: "
"{} (score={})".format(colored(domain, 'red', attrs=['underline']), score))
elif score >= 80:
tqdm.tqdm.write(
"[!] Likely : "
"{} (score={})".format(colored(domain, 'yellow', attrs=['underline']), score))
elif score >= 65:
tqdm.tqdm.write(
"[ ] Potential : "
"{} (score={})".format(colored(domain, attrs=['underline']), score))
if score >= 75:
with open(log_suspicious, 'a') as f:
f.write("{}n".format(domain))
def handle(self, *args, **options):
certstream_url = 'wss://certstream.calidog.io'
log_suspicious = os.path.dirname(os.path.realpath(__file__)) '/suspicious_domains_' time.strftime("%Y-%m-%d") '.log'
suspicious_yaml = os.path.dirname(os.path.realpath(__file__)) '/suspicious.yaml'
external_yaml = os.path.dirname(os.path.realpath(__file__)) '/external.yaml'
pbar = tqdm.tqdm(desc='certificate_update', unit='cert')
with open(suspicious_yaml, 'r') as f:
suspicious = yaml.safe_load(f)
with open(external_yaml, 'r') as f:
external = yaml.safe_load(f)
if external['override_suspicious.yaml'] is True:
suspicious = external
else:
if external['keywords'] is not None:
suspicious['keywords'].update(external['keywords'])
if external['tlds'] is not None:
suspicious['tlds'].update(external['tlds'])
certstream.listen_for_events(callback, url=certstream_url)
Комментарии:
1. Так и должно быть
self.callback
, такcertstream.listen_for_events(callback, url=certstream_url)
. Однако вы определили дваcallback
метода?2. Упс, я скопировал код два раза. Спасибо, что указали на это! Я исправлю это и посмотрю, работает ли ваше решение.
Ответ №1:
Вы .callback(…)
дважды определили метод, поэтому я думаю, вам следует удалить один из двух. Вы можете передать ссылку на .callback(…)
метод с помощью self.callback
:
def handle(self, *args, **options):
# …
certstream.listen_for_events(self.callback, url=certstream_url)
Комментарии:
1. Вчера я забыл ответить. Ваше решение устранило мою проблему. Спасибо за это!