Аутентификация токена HTTP Flask как извлекать объекты, принадлежащие подписанному пользователю

#python #api #flask #http-token-authentication #flask-httpauth

#python #API #flask #http-token-аутентификация #flask-httpauth

Вопрос:

Я создаю API Flask, и я настроил логин с помощью HTTPBasicAuth и HTTPTokenAuth flask_httpauth. То есть сначала я отправляю имя пользователя и пароль, после чего API возвращает мне токен, и я отправляю этот токен в последующих вызовах API для аутентификации.

Допустим, у меня есть 2 таблицы, пользователи и прямоугольники. Таблица Rectangles имеет rectangle_id и user_id (и другие атрибуты, связанные с прямоугольником). Как мне настроить функциональность CRUD для таблицы Rectangles таким образом, чтобы только прямоугольники с соответствующим идентификатором пользователя из токена были обработаны в CRUD?

Для справки, код ниже (или на https://github.com/SamsonChoo/PythonCRUDApi ):

Модель пользователя

 class User(UserMixin, db.Model):
"""
Create a User table
"""

# Ensures table will be named in plural and not in singular
# as is the name of the model
__tablename__ = 'users'

user_id = db.Column(db.Integer, primary_key=True)
user_name = db.Column(db.String(60), index=True,
                      unique=True, nullable=False)
email = db.Column(db.String(60), index=True, unique=True)
first_name = db.Column(db.String(60), index=True)
last_name = db.Column(db.String(60), index=True)
password_hash = db.Column(db.String(128), nullable=False)
token = db.Column(db.String(32), index=True, unique=True)
token_expiration = db.Column(db.DateTime)
rectangles = db.relationship(
    'Rectangles', cascade='all,delete', backref='users')

def __repr__(self):
    return '<User: {}>'.format(self.username)

def set_password(self, password):
    """
    Set password to a hashed password
    """
    self.password_hash = generate_password_hash(password)

def verify_password(self, password):
    """
    Check if hashed password matches actual password
    """
    return check_password_hash(self.password_hash, password)

def get_id(self):
    return (self.user_id)

def to_dict(self):
    data = {
        'user_id': self.user_id,
        'user_name': self.user_name,
        'email': self.email,
        'first_name': self.first_name,
        'last_name': self.last_name,
        '_links': {
            'self_by_user_name': url_for('api.get_user_by_user_name', user_name=self.user_name),
            'self_by_id': url_for('api.get_user_by_id', user_id=self.user_id)
        }
    }
    return data

def from_dict(self, data, new_user=False):
    for field in ['user_name', 'email', 'first_name', 'last_name']:
        if field in data and data[field]:
            setattr(self, field, data[field])
    if new_user and 'password' in data:
        self.set_password(data['password'])

def get_token(self, expires_in=3600):
    now = datetime.utcnow()
    if self.token and self.token_expiration > now   timedelta(seconds=60):
        return self.token
    self.token = base64.b64encode(os.urandom(24)).decode('utf-8')
    self.token_expiration = now   timedelta(seconds=expires_in)
    db.session.add(self)
    return self.token

def revoke_token(self):
    self.token_expiration = datetime.utcnow() - timedelta(seconds=1)

@staticmethod
def check_token(token):
    user = User.query.filter_by(token=token).first()
    if user is None or user.token_expiration < datetime.utcnow():
        return None
    return user
 

Прямоугольная модель

 class Rectangle(db.Model):
"""
Create a Rectangle table
"""

# Ensures table will be named in plural and not in singular
# as is the name of the model
__tablename__ = 'rectangles'

rectangle_id = db.Column(db.Integer, primary_key=True)
length = db.Column(db.Integer)
width = db.Column(db.Integer)
user_id = db.Column(db.Integer, db.ForeignKey(
    'users.user_id'), nullable=False)

def __repr__(self):
    return '<Rectangle: {} x {}>'.format(self.length, self.width)

def get_area(self):
    return self.length * self.width

def get_perimeter(self):
    return (self.length   self.width) * 2
 

api аутентификации

 from flask_httpauth import HTTPBasicAuth, HTTPTokenAuth
from ..models.user import User
from .errors import error_response

basic_auth = HTTPBasicAuth()
token_auth = HTTPTokenAuth()


@basic_auth.verify_password
def verify_password(user_name, password):
    user = User.query.filter_by(user_name=user_name).first()
    if user and user.verify_password(password):
        return user


@basic_auth.error_handler
def basic_auth_error(status):
    return error_response(status)


@token_auth.verify_token
def verify_token(token):
    return User.check_token(token) if token else None


@token_auth.error_handler
def token_auth_error(status):
    return error_response(status)
 

token api

 from flask import jsonify
from .. import db
from . import api
from .auth import basic_auth, token_auth


@api.route('/login', methods=['POST'])
@basic_auth.login_required
def get_token():
    token = basic_auth.current_user().get_token()
    db.session.commit()
    return jsonify({'token': token})


@api.route('/logout', methods=['DELETE'])
@token_auth.login_required
def revoke_token():
    token_auth.current_user().revoke_token()
    db.session.commit()
    return '', 204
 

пользовательский api

 from . import api
from flask import jsonify, request, url_for, abort
from validate_email import validate_email
import safe
from ..models.user import User
from .errors import bad_request
from .. import db
from .auth import token_auth


@api.route('/users/<string:user_name>', methods=['GET'])
@token_auth.login_required
def get_user_by_user_name(user_name):
    if token_auth.current_user().user_name != user_name:
        abort(403)
    return jsonify(User.query.filter_by(user_name=user_name).one().to_dict())


@api.route('/users/<int:user_id>', methods=['GET'])
@token_auth.login_required
def get_user_by_id(user_id):
    if token_auth.current_user().user_id != user_id:
        abort(403)
    return jsonify(User.query.get_or_404(user_id).to_dict())


@api.route('/users/register', methods=['POST'])
def create_user():
    data = request.get_json() or {}
    if 'user_name' not in data or 'password' not in data:
        return bad_request('must include user_name and password fields')
    if User.query.filter_by(user_name=data['user_name']).first():
        return bad_request('please use a different username')
    if 'email' in data:
        email_valid = validate_email(
            email_address=data['email'], check_regex=True, check_mx=False)
        if not email_valid:
            return bad_request('please enter a valid email address')
        if User.query.filter_by(email=data['email']).first():
            return bad_request('please use a different email address')
    password_strength = safe.check(data['password'])
    if not password_strength.valid:
        return bad_request('please enter a stronger password')
    user = User()
    user.from_dict(data, new_user=True)
    db.session.add(user)
    db.session.commit()
    response = jsonify(user.to_dict())
    response.status_code = 201
    response.headers['Location'] = [url_for(
        'api.get_user_by_user_name', user_name=user.user_name), url_for(
        'api.get_user_by_id', user_id=user.user_id)]
    return response


@api.route('/users/<int:user_id>', methods=['PUT'])
@token_auth.login_required
def update_user_by_id(user_id):
    if token_auth.current_user().user_id != user_id:
        abort(403)
    user = User.query.get_or_404(user_id)
    data = request.get_json() or {}
    return update_user_helper(user, data)


@api.route('/users/<string:user_name>', methods=['PUT'])
@token_auth.login_required
def update_user_by_user_name(user_name):
    if token_auth.current_user().user_name != user_name:
        abort(403)
    user = User.query.filter_by(user_name=user_name).one()
    data = request.get_json() or {}
    return update_user_helper(user, data)


def update_user_helper(user, data):
    if 'user_name' in data and data['user_name'] != user.user_name and User.query.filter_by(user_name=data['user_name']).first():
        return bad_request('please use a different username')
    if 'email' in data and data['email'] != user.email and User.query.filter_by(email=data['email']).first():
        return bad_request('please use a different email address')
    if 'password' in data and not safe.check(data['password'].valid):
        return bad_request('please enter a stronger password')
    user.from_dict(data, new_user=False)
    db.session.commit()
    return jsonify(user.to_dict())


@api.route('/users/<string:user_name>', methods=['DELETE'])
@token_auth.login_required
def del_user_by_user_name(user_name):
    if token_auth.current_user().user_name != user_name:
        abort(403)
    db.session.delete(User.query.filter_by(user_name=user_name).one())
    db.session.commit()
    return '', 204


@api.route('/users/<int:user_id>', methods=['DELETE'])
@token_auth.login_required
def del_user_by_user_id(user_id):
    if token_auth.current_user().user_id != user_id:
        abort(403)
    db.session.delete(User.query.get_or_404(user_id))
    db.session.commit()
    return '', 204