пользовательская команда django на основе скрипта python

#python #django

#python #django

Вопрос:

Я пытаюсь создать пользовательскую команду для django, с которой можно запускать, python manage.py cert_transparency и я почти на месте, но у меня возникли небольшие проблемы. Цель этой команды — создать команду, работающую 24/7 в фоновом режиме, которую я просто запускаю в контейнере docker.

Я получаю это сообщение об ошибке

 certificate_update: 0cert [00:00, ?cert/s]Traceback (most recent call last):
  File "manage.py", line 15, in <module>
    execute_from_command_line(sys.argv)
  File "/usr/local/lib/python3.7/site-packages/django/core/management/__init__.py", line 401, in execute_from_command_line
    utility.execute()
  File "/usr/local/lib/python3.7/site-packages/django/core/management/__init__.py", line 395, in execute
    self.fetch_command(subcommand).run_from_argv(self.argv)
  File "/usr/local/lib/python3.7/site-packages/django/core/management/base.py", line 330, in run_from_argv
    self.execute(*args, **cmd_options)
  File "/usr/local/lib/python3.7/site-packages/django/core/management/base.py", line 371, in execute
    output = self.handle(*args, **options)
  File "/src/scraper/management/commands/cert_transparency.py", line 184, in handle
    certstream.listen_for_events(callback, url=certstream_url)
NameError: name 'callback' is not defined
  

По сути, я пытаюсь просто импортировать этот скрипт в качестве пользовательской команды управления в django.

Структура каталогов

cert_transparency.py:

 from django.core.management.base import BaseCommand, CommandError
import re
import math

import certstream
import tqdm
import yaml
import time
import os
from Levenshtein import distance
from termcolor import colored, cprint
from tld import get_tld
from .confusables import unconfuse


class Command(BaseCommand):
    help = 'Scrapes calidogs websocket for cert renewals and rates them.'
    
    def score_domain(self, domain):
        """Score `domain`.

    The highest score, the most probable `domain` is a phishing site.

    Args:
        domain (str): the domain to check.

    Returns:
        int: the score of `domain`.
        """
        score = 0
        for t in suspicious['tlds']:
            if domain.endswith(t):
                score  = 20

    # Remove initial '*.' for wildcard certificates bug
        if domain.startswith('*.'):
            domain = domain[2:]

    # Removing TLD to catch inner TLD in subdomain (ie. paypal.com.domain.com)
        try:
            res = get_tld(domain, as_object=True, fail_silently=True, fix_protocol=True)
            domain = '.'.join([res.subdomain, res.domain])
        except Exception:
            pass

    # Higer entropy is kind of suspicious
    #score  = int(round(entropy(domain)*10))

    # Remove lookalike characters using list from http://www.unicode.org/reports/tr39
        domain = unconfuse(domain)

        words_in_domain = re.split("W ", domain)

    # ie. detect fake .com (ie. *.com-account-management.info)
        if words_in_domain[0] in ['com', 'net', 'org']:
            score  = 10

    # Testing keywords
        for word in suspicious['keywords']:
            if word in domain:
                score  = suspicious['keywords'][word]

    # Testing Levenshtein distance for strong keywords (>= 70 points) (ie. paypol)
        for key in [k for (k,s) in suspicious['keywords'].items() if s >= 70]:
        # Removing too generic keywords (ie. mail.domain.com)
            for word in [w for w in words_in_domain if w not in ['email', 'mail', 'cloud']]:
                if distance(str(word), str(key)) == 1:
                    score  = 70

    # Lots of '-' (ie. www.paypal-datacenter.com-acccount-alert.com)
        if 'xn--' not in domain and domain.count('-') >= 4:
            score  = domain.count('-') * 3

    # Deeply nested subdomains (ie. www.paypal.com.security.accountupdate.gq)
        if domain.count('.') >= 3:
            score  = domain.count('.') * 3

        return score


    def callback(self, message, context):
        """Callback handler for certstream events."""
        if message['message_type'] == "heartbeat":
            return

        if message['message_type'] == "certificate_update":
            all_domains = message['data']['leaf_cert']['all_domains']

            for domain in all_domains:
                pbar.update(1)
                score = score_domain(self, domain.lower())

            # If issued from a free CA = more suspicious
                if "Let's Encrypt" in message['data']['chain'][0]['subject']['aggregated']:
                    score  = 10

                if score >= 100:
                    self.stdout.write(tqdm.tqdm.write(
                    "[!] Suspicious: "
                    "{} (score={})".format(colored(domain, 'red', attrs=['underline', 'bold']), score)))
                elif score >= 90:
                    self.stdout.write(tqdm.tqdm.write(
                    "[!] Suspicious: "
                    "{} (score={})".format(colored(domain, 'red', attrs=['underline']), score)))
                elif score >= 80:
                    self.stdout.write(tqdm.tqdm.write(
                    "[!] Likely    : "
                    "{} (score={})".format(colored(domain, 'yellow', attrs=['underline']), score)))
                elif score >= 65:
                    self.stdout.write(tqdm.tqdm.write(
                    "[ ] Potential : "
                    "{} (score={})".format(colored(domain, attrs=['underline']), score)))

                if score >= 75:
                    with open(log_suspicious, 'a') as f:
                        f.write("{}n".format(domain))

    def callback(self, message, context):
        """Callback handler for certstream events."""
        if message['message_type'] == "heartbeat":
            return

        if message['message_type'] == "certificate_update":
            all_domains = message['data']['leaf_cert']['all_domains']

            for domain in all_domains:
                pbar.update(1)
                score = score_domain(domain.lower())

            # If issued from a free CA = more suspicious
                if "Let's Encrypt" in message['data']['chain'][0]['subject']['aggregated']:
                    score  = 10

                if score >= 100:
                    tqdm.tqdm.write(
                    "[!] Suspicious: "
                    "{} (score={})".format(colored(domain, 'red', attrs=['underline', 'bold']), score))
                elif score >= 90:
                    tqdm.tqdm.write(
                    "[!] Suspicious: "
                    "{} (score={})".format(colored(domain, 'red', attrs=['underline']), score))
                elif score >= 80:
                    tqdm.tqdm.write(
                    "[!] Likely    : "
                    "{} (score={})".format(colored(domain, 'yellow', attrs=['underline']), score))
                elif score >= 65:
                    tqdm.tqdm.write(
                    "[ ] Potential : "
                    "{} (score={})".format(colored(domain, attrs=['underline']), score))

                if score >= 75:
                    with open(log_suspicious, 'a') as f:
                        f.write("{}n".format(domain))

                        
    def handle(self, *args, **options):
        certstream_url = 'wss://certstream.calidog.io'

        log_suspicious = os.path.dirname(os.path.realpath(__file__)) '/suspicious_domains_' time.strftime("%Y-%m-%d") '.log'

        suspicious_yaml = os.path.dirname(os.path.realpath(__file__)) '/suspicious.yaml'

        external_yaml = os.path.dirname(os.path.realpath(__file__)) '/external.yaml'

        pbar = tqdm.tqdm(desc='certificate_update', unit='cert')



        with open(suspicious_yaml, 'r') as f:
            suspicious = yaml.safe_load(f)

        with open(external_yaml, 'r') as f:
            external = yaml.safe_load(f)

        if external['override_suspicious.yaml'] is True:
            suspicious = external
        else:
            if external['keywords'] is not None:
                suspicious['keywords'].update(external['keywords'])

            if external['tlds'] is not None:
                suspicious['tlds'].update(external['tlds'])

        certstream.listen_for_events(callback, url=certstream_url)
  

Комментарии:

1. Так и должно быть self.callback , так certstream.listen_for_events(callback, url=certstream_url) . Однако вы определили два callback метода?

2. Упс, я скопировал код два раза. Спасибо, что указали на это! Я исправлю это и посмотрю, работает ли ваше решение.

Ответ №1:

Вы .callback(…) дважды определили метод, поэтому я думаю, вам следует удалить один из двух. Вы можете передать ссылку на .callback(…) метод с помощью self.callback :

 def handle(self, *args, **options):
    # …
    certstream.listen_for_events(self.callback, url=certstream_url)  

Комментарии:

1. Вчера я забыл ответить. Ваше решение устранило мою проблему. Спасибо за это!