#python #api #flask #http-token-authentication #flask-httpauth
#python #API #flask #http-token-аутентификация #flask-httpauth
Вопрос:
Я создаю API на Flask и настроил вход с помощью HTTPBasicAuth и HTTPTokenAuth из flask_httpauth. То есть сначала я отправляю имя пользователя и пароль, после чего API возвращает мне токен, и я отправляю этот токен в последующих вызовах API для аутентификации.
Допустим, у меня есть две таблицы: users (пользователи) и rectangles (прямоугольники). Таблица rectangles содержит rectangle_id и user_id (а также другие атрибуты, связанные с прямоугольником). Как мне настроить CRUD-операции для таблицы rectangles таким образом, чтобы обрабатывались только те прямоугольники, чей user_id совпадает с идентификатором пользователя, которому принадлежит токен?
Для справки, код ниже (или на https://github.com/SamsonChoo/PythonCRUDApi ):
Модель пользователя
class User(UserMixin, db.Model):
    """
    User account stored in the ``users`` table.

    Keeps the hashed password, basic profile fields and the API token
    (together with its expiration time) used for token authentication.
    """

    # Ensures table will be named in plural and not in singular
    # as is the name of the model
    __tablename__ = 'users'

    user_id = db.Column(db.Integer, primary_key=True)
    user_name = db.Column(db.String(60), index=True,
                          unique=True, nullable=False)
    email = db.Column(db.String(60), index=True, unique=True)
    first_name = db.Column(db.String(60), index=True)
    last_name = db.Column(db.String(60), index=True)
    password_hash = db.Column(db.String(128), nullable=False)
    # base64 of 24 random bytes is exactly 32 characters (see get_token)
    token = db.Column(db.String(32), index=True, unique=True)
    token_expiration = db.Column(db.DateTime)
    # relationship() resolves its target by mapped class name, so this has
    # to be 'Rectangle' (the model class), not the plural table name.
    rectangles = db.relationship(
        'Rectangle', cascade='all,delete', backref='users')

    def __repr__(self):
        # The column is called user_name; there is no `username` attribute.
        return '<User: {}>'.format(self.user_name)

    def set_password(self, password):
        """
        Set password to a hashed password
        """
        self.password_hash = generate_password_hash(password)

    def verify_password(self, password):
        """
        Check if hashed password matches actual password
        """
        return check_password_hash(self.password_hash, password)

    def get_id(self):
        """
        Return the primary key of this user.
        """
        return self.user_id

    def to_dict(self):
        """
        Serialize the public fields of the user into a plain dict.

        The password hash and the token are intentionally not included.
        ``_links`` holds the URLs of both "get user" endpoints.
        """
        data = {
            'user_id': self.user_id,
            'user_name': self.user_name,
            'email': self.email,
            'first_name': self.first_name,
            'last_name': self.last_name,
            '_links': {
                'self_by_user_name': url_for(
                    'api.get_user_by_user_name', user_name=self.user_name),
                'self_by_id': url_for(
                    'api.get_user_by_id', user_id=self.user_id),
            },
        }
        return data

    def from_dict(self, data, new_user=False):
        """
        Copy profile fields from ``data`` onto this user.

        Only keys that are present with a truthy value are applied. The
        password is set only when ``new_user`` is true and ``data`` has a
        'password' key.
        """
        for field in ['user_name', 'email', 'first_name', 'last_name']:
            if field in data and data[field]:
                setattr(self, field, data[field])
        if new_user and 'password' in data:
            self.set_password(data['password'])

    def get_token(self, expires_in=3600):
        """
        Return an API token for this user, creating a new one if needed.

        The existing token is reused while it still has more than 60
        seconds left. Otherwise a new random token is generated and set to
        expire ``expires_in`` seconds from now. The user is added to the
        session; committing is left to the caller.
        """
        now = datetime.utcnow()
        if self.token and self.token_expiration > now + timedelta(seconds=60):
            return self.token
        self.token = base64.b64encode(os.urandom(24)).decode('utf-8')
        self.token_expiration = now + timedelta(seconds=expires_in)
        db.session.add(self)
        return self.token

    def revoke_token(self):
        """
        Invalidate the current token by moving its expiration into the past.
        """
        self.token_expiration = datetime.utcnow() - timedelta(seconds=1)

    @staticmethod
    def check_token(token):
        """
        Return the user owning ``token``, or None if the token is unknown
        or already expired.
        """
        user = User.query.filter_by(token=token).first()
        if user is None or user.token_expiration < datetime.utcnow():
            return None
        return user
Модель прямоугольника
class Rectangle(db.Model):
    """
    Rectangle stored in the ``rectangles`` table.

    Every rectangle belongs to exactly one user through ``user_id``.
    """

    # Ensures table will be named in plural and not in singular
    # as is the name of the model
    __tablename__ = 'rectangles'

    rectangle_id = db.Column(db.Integer, primary_key=True)
    length = db.Column(db.Integer)
    width = db.Column(db.Integer)
    user_id = db.Column(db.Integer, db.ForeignKey(
        'users.user_id'), nullable=False)

    def __repr__(self):
        return '<Rectangle: {} x {}>'.format(self.length, self.width)

    def get_area(self):
        """
        Return the area: length multiplied by width.
        """
        return self.length * self.width

    def get_perimeter(self):
        """
        Return the perimeter: twice the sum of length and width.
        """
        return (self.length + self.width) * 2
API аутентификации
from flask_httpauth import HTTPBasicAuth, HTTPTokenAuth
from ..models.user import User
from .errors import error_response
basic_auth = HTTPBasicAuth()
token_auth = HTTPTokenAuth()
@basic_auth.verify_password
def verify_password(user_name, password):
    """
    Basic-auth callback: look the user up by name and check the password.

    Returns the matching user when the credentials are valid, otherwise
    None.
    """
    candidate = User.query.filter_by(user_name=user_name).first()
    if not candidate:
        return None
    if not candidate.verify_password(password):
        return None
    return candidate
@basic_auth.error_handler
def basic_auth_error(status):
    """
    Build the error response returned when basic authentication fails.
    """
    failure = error_response(status)
    return failure
@token_auth.verify_token
def verify_token(token):
    """
    Token-auth callback: resolve a token to its user.

    A falsy token is rejected without a lookup; otherwise the result of
    User.check_token is returned.
    """
    if not token:
        return None
    return User.check_token(token)
@token_auth.error_handler
def token_auth_error(status):
    """
    Build the error response returned when token authentication fails.
    """
    failure = error_response(status)
    return failure
token api
from flask import jsonify
from .. import db
from . import api
from .auth import basic_auth, token_auth
@api.route('/login', methods=['POST'])
@basic_auth.login_required
def get_token():
    """
    Log in with basic auth and return an API token as JSON.

    The token is obtained from the authenticated user and the session is
    committed before the response is built.
    """
    authenticated = basic_auth.current_user()
    issued = authenticated.get_token()
    db.session.commit()
    payload = {'token': issued}
    return jsonify(payload)
@api.route('/logout', methods=['DELETE'])
@token_auth.login_required
def revoke_token():
    """
    Log out: revoke the token of the authenticated user.

    Responds with an empty body and status 204.
    """
    authenticated = token_auth.current_user()
    authenticated.revoke_token()
    db.session.commit()
    return '', 204
API пользователей
from . import api
from flask import jsonify, request, url_for, abort
from validate_email import validate_email
import safe
from ..models.user import User
from .errors import bad_request
from .. import db
from .auth import token_auth
@api.route('/users/<string:user_name>', methods=['GET'])
@token_auth.login_required
def get_user_by_user_name(user_name):
    """
    Return the user with the given user name as JSON.

    Aborts with 403 unless the authenticated user has that user name.
    """
    requester = token_auth.current_user()
    if requester.user_name != user_name:
        abort(403)
    found = User.query.filter_by(user_name=user_name).one()
    return jsonify(found.to_dict())
@api.route('/users/<int:user_id>', methods=['GET'])
@token_auth.login_required
def get_user_by_id(user_id):
    """
    Return the user with the given id as JSON.

    Aborts with 403 unless the authenticated user has that id.
    """
    requester = token_auth.current_user()
    if requester.user_id != user_id:
        abort(403)
    found = User.query.get_or_404(user_id)
    return jsonify(found.to_dict())
@api.route('/users/register', methods=['POST'])
def create_user():
    """
    Register a new user from the JSON request body.

    Requires non-empty 'user_name' and 'password'; 'email' is optional but
    is validated and must be unique when given. Returns a bad_request
    response for any validation failure. On success responds with 201,
    the serialized user, and a Location header pointing at the new user.
    """
    data = request.get_json() or {}
    if 'user_name' not in data or 'password' not in data:
        return bad_request('must include user_name and password fields')
    # from_dict() skips falsy values, so an empty user_name would reach the
    # database as NULL and fail at commit; reject it up front instead.
    if not data['user_name'] or not data['password']:
        return bad_request('must include user_name and password fields')
    if User.query.filter_by(user_name=data['user_name']).first():
        return bad_request('please use a different username')
    if 'email' in data:
        email_valid = validate_email(
            email_address=data['email'], check_regex=True, check_mx=False)
        if not email_valid:
            return bad_request('please enter a valid email address')
        if User.query.filter_by(email=data['email']).first():
            return bad_request('please use a different email address')
    password_strength = safe.check(data['password'])
    if not password_strength.valid:
        return bad_request('please enter a stronger password')

    user = User()
    user.from_dict(data, new_user=True)
    db.session.add(user)
    db.session.commit()

    response = jsonify(user.to_dict())
    response.status_code = 201
    # Location must be a single URI, not a list. The by-id URL is used as
    # the canonical one; both URLs remain available in the body's '_links'.
    response.headers['Location'] = url_for(
        'api.get_user_by_id', user_id=user.user_id)
    return response
@api.route('/users/<int:user_id>', methods=['PUT'])
@token_auth.login_required
def update_user_by_id(user_id):
    """
    Update the user with the given id from the JSON request body.

    Aborts with 403 unless the authenticated user has that id; the actual
    update is delegated to update_user_helper.
    """
    requester = token_auth.current_user()
    if requester.user_id != user_id:
        abort(403)
    target = User.query.get_or_404(user_id)
    payload = request.get_json() or {}
    return update_user_helper(target, payload)
@api.route('/users/<string:user_name>', methods=['PUT'])
@token_auth.login_required
def update_user_by_user_name(user_name):
    """
    Update the user with the given user name from the JSON request body.

    Aborts with 403 unless the authenticated user has that user name; the
    actual update is delegated to update_user_helper.
    """
    requester = token_auth.current_user()
    if requester.user_name != user_name:
        abort(403)
    target = User.query.filter_by(user_name=user_name).one()
    payload = request.get_json() or {}
    return update_user_helper(target, payload)
def update_user_helper(user, data):
    """
    Validate ``data`` and apply it to ``user``.

    Rejects a new user name or email that is already taken by some user,
    and a password that does not pass the strength check. On success the
    fields are copied with ``user.from_dict(data, new_user=False)``, the
    session is committed and the serialized user is returned as JSON.
    """
    new_name = data.get('user_name')
    if ('user_name' in data and new_name != user.user_name
            and User.query.filter_by(user_name=new_name).first()):
        return bad_request('please use a different username')

    new_email = data.get('email')
    if ('email' in data and new_email != user.email
            and User.query.filter_by(email=new_email).first()):
        return bad_request('please use a different email address')

    # `.valid` belongs to the result of safe.check(), not to the password
    # string itself.
    if 'password' in data and not safe.check(data['password']).valid:
        return bad_request('please enter a stronger password')

    # NOTE(review): with new_user=False, from_dict() does not set the
    # password, so a validated 'password' is not applied here -- confirm
    # whether password changes are meant to be supported by this endpoint.
    user.from_dict(data, new_user=False)
    db.session.commit()
    return jsonify(user.to_dict())
@api.route('/users/<string:user_name>', methods=['DELETE'])
@token_auth.login_required
def del_user_by_user_name(user_name):
    """
    Delete the user with the given user name.

    Aborts with 403 unless the authenticated user has that user name.
    Responds with an empty body and status 204.
    """
    requester = token_auth.current_user()
    if requester.user_name != user_name:
        abort(403)
    doomed = User.query.filter_by(user_name=user_name).one()
    db.session.delete(doomed)
    db.session.commit()
    return '', 204
@api.route('/users/<int:user_id>', methods=['DELETE'])
@token_auth.login_required
def del_user_by_user_id(user_id):
    """
    Delete the user with the given id.

    Aborts with 403 unless the authenticated user has that id.
    Responds with an empty body and status 204.
    """
    requester = token_auth.current_user()
    if requester.user_id != user_id:
        abort(403)
    doomed = User.query.get_or_404(user_id)
    db.session.delete(doomed)
    db.session.commit()
    return '', 204