Authentication is a very important part of any REST API. In practice it means some routes are protected by login (authentication) and some are further restricted by role (authorization).
We will use JWTs (JSON Web Tokens) for authentication, via the Flask extension flask-jwt-extended: https://flask-jwt-extended.readthedocs.io/en/latest/
If you are not familiar with JWT, it is worth reading up on it first; there are plenty of resources online.
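As a quick orientation, a JWT is just three base64url-encoded parts (header, payload, signature) joined by dots. The sketch below builds and checks an HS256-signed token using only the standard library. The helper names encode_jwt and decode_jwt and the secret are made up for illustration; in the app itself flask-jwt-extended does this work for us, along with expiry checks that this sketch leaves out.

```python
import base64
import hashlib
import hmac
import json


def _b64url(data: bytes) -> str:
    # JWTs use URL-safe base64 with the trailing "=" padding removed.
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    # Restore the padding that was stripped during encoding.
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str, secret: str) -> bytes:
    return hmac.new(secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256).digest()


def encode_jwt(payload: dict, secret: str) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    return signing_input + "." + _b64url(_sign(signing_input, secret))


def decode_jwt(token: str, secret: str) -> dict:
    header_b64, payload_b64, signature_b64 = token.split(".")
    expected = _sign(header_b64 + "." + payload_b64, secret)
    # Constant-time comparison, so timing does not leak the signature.
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("invalid signature")
    return json.loads(_b64url_decode(payload_b64))


token = encode_jwt({"sub": "user-123"}, "demo-secret")
claims = decode_jwt(token, "demo-secret")
```

The payload is only encoded, not encrypted: anyone holding the token can read it, so the signature is what stops it being tampered with.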
Let’s start by installing the library in our app:
pip install flask-jwt-extended
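Next, follow the basic usage guide at https://flask-jwt-extended.readthedocs.io/en/latest/basic_usage.html and set up the login/ and protected/ routes; it is very straightforward. Note that the code in this post uses the 3.x names of the library (@jwt_required without parentheses, jwt_optional, user_loader_callback_loader); in 4.x these became @jwt_required(), @jwt_required(optional=True) and user_lookup_loader.
Next, test the login route in Postman: POST a username and password to it and you should get back an access token.
Then test the protected route by sending that token in the Authorization header as a Bearer token.
So this basic authentication system works well.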
In a more realistic auth system we would have a registration route, a users table, roles, permissions and a database. Let’s implement these step by step, for now without the database.
User Registration
Let’s create a simple route to register a user. We will hash the password (hashing, unlike encryption, cannot be reversed), generate a unique id for every user, and store the users in a list.
This uses a new package, passlib, which needs to be installed first:
pip install passlib
Here is the code:
import uuid
from passlib.hash import pbkdf2_sha256

users = []

@app.route('/register', methods=['POST'])
def register():
    # A missing body or missing field is a client error, so answer 400.
    if not request.json:
        abort(400)
    username = request.json.get("username", None)
    password = request.json.get("password", None)
    name = request.json.get("name", None)
    if username is None or password is None or name is None:
        abort(400)
    # Search in a local variable; assigning the filtered result back to
    # the global users list would throw away every other user.
    existing = [user for user in users if user["username"] == username]
    if len(existing) > 0:
        return jsonify("username already exists!"), 400
    user_id = uuid.uuid4().hex
    password_hash = pbkdf2_sha256.hash(password)
    users.append({
        "id": user_id,
        "username": username,
        "password": password_hash,
        "name": name
    })
    return jsonify(user_id)
All good, real easy!
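To make the hashing step less of a black box, here is a rough standard-library sketch of the idea behind pbkdf2_sha256: a random salt, many iterations, and a constant-time comparison when verifying. The function names, the "salt$digest" storage format and the iteration count are all assumptions for illustration. passlib uses its own string format, so keep using passlib in the app.

```python
import hashlib
import hmac
import os

ITERATIONS = 100_000  # illustrative value only, not passlib's setting


def hash_password(password: str) -> str:
    # A fresh random salt means two users with the same password
    # still end up with different stored hashes.
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return salt.hex() + "$" + digest.hex()


def verify_password(password: str, stored: str) -> bool:
    salt_hex, digest_hex = stored.split("$")
    # Re-derive the digest with the stored salt and compare.
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), ITERATIONS
    )
    return hmac.compare_digest(digest, bytes.fromhex(digest_hex))


stored = hash_password("s3cret")
```

Note that verification never decodes the stored value back into a password; it hashes the candidate again and compares the results.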
Login
@app.route('/login', methods=['POST'])
def login():
    if not request.is_json:
        return jsonify(msg="Missing JSON in request"), 400
    username = request.json.get('username', None)
    password = request.json.get('password', None)
    if not username:
        return jsonify(msg="Missing username parameter"), 400
    if not password:
        return jsonify(msg="Missing password parameter"), 400
    # Find the registered user with this username, if any.
    matches = [user for user in users if user["username"] == username]
    if len(matches) == 0:
        return jsonify("username failed!"), 401
    if not pbkdf2_sha256.verify(password, matches[0]["password"]):
        return jsonify("password failed!"), 401
    access_token = create_access_token(identity=matches[0]["id"])
    return jsonify(access_token=access_token), 200
In the login code above we pass the user id to create_access_token. Often it is easier to pass the whole user object, which also makes the code more readable. To do that we register a callback that tells the library how to turn that object into the token identity, as described here: https://flask-jwt-extended.readthedocs.io/en/latest/tokens_from_complex_object.html
Let’s see how the code looks:
# Create a function that will be called whenever create_access_token
# is used. It will take whatever object is passed into the
# create_access_token method, and lets us define what the identity
# of the access token should be.
@jwt.user_identity_loader
def user_identity_lookup(user):
    return user["id"]
@app.route('/login', methods=['POST'])
def login():
    if not request.is_json:
        return jsonify(msg="Missing JSON in request"), 400
    username = request.json.get('username', None)
    password = request.json.get('password', None)
    if not username:
        return jsonify(msg="Missing username parameter"), 400
    if not password:
        return jsonify(msg="Missing password parameter"), 400
    # Search in a local variable so the global users list is not replaced.
    matches = [user for user in users if user["username"] == username]
    if len(matches) == 0:
        return jsonify("username failed!"), 401
    user = matches[0]
    if not pbkdf2_sha256.verify(password, user["password"]):
        return jsonify("password failed!"), 401
    # The whole user object goes in; user_identity_lookup extracts the id.
    access_token = create_access_token(identity=user)
    return jsonify(access_token=access_token), 200
Profile
Now let’s look at the profile/ route. This will be a protected route that returns data about the logged-in user.
# Protect a view with jwt_required, which requires a valid access token
# in the request to access.
@app.route('/profile', methods=['GET'])
@jwt_required
def profile():
    # Access the identity of the current user with get_jwt_identity
    current_user = get_jwt_identity()
    return jsonify(logged_in_as=current_user), 200
This looks good, except we need a generic way to load the full user data from the access token. Almost all protected routes need the user data, so this should be one shared function.
Let’s see how to do it:
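@jwt.user_loader_callback_loader
def user_loader_callback(identity):
    # Search in a local variable; reassigning the global users list
    # here would throw away every other registered user.
    matches = [user for user in users if user["id"] == identity]
    if len(matches) > 0:
        # Return a copy without the password hash, so the stored user
        # keeps its hash for later logins.
        return {key: value for key, value in matches[0].items() if key != "password"}
    # Returning None tells the library that no user could be loaded.
    return None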
# Protect a view with jwt_required, which requires a valid access token
# in the request to access.
@app.route('/profile', methods=['GET'])
@jwt_required
def profile():
    # Access the full user object of the current user with get_current_user
    current_user = get_current_user()
    return jsonify(logged_in_as=current_user), 200
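The lookup itself is plain Python and can be tried outside Flask. Here is a minimal sketch, assuming the same list-of-dicts storage as above (find_user_by_id is a hypothetical helper name). It returns a copy without the password hash, or None when the id is unknown, and leaves the stored users untouched.

```python
def find_user_by_id(users, identity):
    for user in users:
        if user["id"] == identity:
            # Build a new dict so the stored record keeps its password hash.
            return {key: value for key, value in user.items() if key != "password"}
    return None


users = [
    {"id": "a1", "username": "manish", "password": "<hash>", "name": "Manish"},
    {"id": "b2", "username": "guest", "password": "<hash>", "name": "Guest"},
]
found = find_user_by_id(users, "b2")
```

Because the function only reads from the list, calling it on every request cannot damage the stored users, which matters once login has to verify passwords again.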
Roles
Now we need to assign roles to users. Assume for now that we have only two roles, ADMIN and NORMAL.
For now we will decide the role from the username instead of storing it in an extra column.
from functools import wraps

def admin_required(fn):
    @wraps(fn)
    def wrapper(*args, **kwargs):
        # Make sure a valid token is present, then check who it belongs to.
        verify_jwt_in_request()
        user = get_current_user()
        if user["username"] == "manish":
            return fn(*args, **kwargs)
        return jsonify(msg='Admins only!'), 403
    return wrapper

@app.route("/admin_only")
@jwt_required
@admin_required
def admin_only():
    return ""
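The decorator pattern behind admin_required works the same way without Flask. The sketch below is framework-free: require_role, PermissionDenied and delete_everything are hypothetical names, and the user is passed in explicitly instead of being loaded from the token.

```python
from functools import wraps


class PermissionDenied(Exception):
    """Raised when the current user lacks the required role."""


def require_role(role):
    def decorator(fn):
        @wraps(fn)  # keeps fn's name and docstring on the wrapper
        def wrapper(current_user, *args, **kwargs):
            if current_user.get("role") != role:
                raise PermissionDenied(role + " only")
            return fn(current_user, *args, **kwargs)
        return wrapper
    return decorator


@require_role("admin")
def delete_everything(current_user):
    return "deleted by " + current_user["username"]


result = delete_everything({"username": "manish", "role": "admin"})
```

Taking the role as a parameter means one decorator can cover any number of roles, rather than writing a new decorator for each.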
At this stage we have most of the things needed to set up authorization for a REST API.
A good assignment would be to integrate the authorization with the todo APIs. The rules would be:
- /todo GET has optional authorization: a logged-in user sees only their own tasks, an admin sees the tasks of all users, and a request that is not logged in at all gets only the latest 5 tasks
- /todo DELETE, PUT and POST require a logged-in user, and an admin can delete anyone’s task
- a task can only be marked done by an admin
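The GET rule in the first bullet can be sketched as a pure function before wiring it into a route. This assumes each task carries a "user" key with the owner's id and each user dict has a "role", and it reads "latest" as "latest due date". visible_tasks is a hypothetical helper, not part of any library.

```python
import datetime


def visible_tasks(tasks, current_user=None, limit=5):
    if current_user is None:
        # Not logged in: only the few tasks with the latest due dates.
        return sorted(tasks, key=lambda task: task["due"], reverse=True)[:limit]
    if current_user.get("role") == "admin":
        # Admins see the tasks of all users.
        return list(tasks)
    # Everyone else sees only their own tasks.
    return [task for task in tasks if task.get("user") == current_user["id"]]


base = datetime.datetime(2024, 1, 1)
tasks = [
    {"id": n, "user": "a1" if n % 2 else "b2", "due": base + datetime.timedelta(days=n)}
    for n in range(1, 8)
]
```

Keeping the rule in a function that returns a new list means the route never has to reassign the global tasks list just to filter it.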
It’s best to do this fully on your own; I will add the full code below for reference.
import datetime
from flask import Flask, jsonify, abort, request
from flask_jwt_extended import (
    JWTManager, jwt_required, create_access_token,
    get_jwt_identity, get_current_user, verify_jwt_in_request,
    jwt_optional
)
from passlib.hash import pbkdf2_sha256
from functools import wraps
import uuid

app = Flask(__name__)
app.config['JWT_SECRET_KEY'] = 'xxxxxxxxxxxxxx'  # Change this!
jwt = JWTManager(app)

tasks = [
    {
        'id': 1,
        'title': 'Buy groceries',
        'description': 'Milk, Cheese, Pizza, Fruit, Tylenol',
        'done': False,
        'due': datetime.datetime.now()
    },
    {
        'id': 2,
        'title': 'Learn Python',
        'description': 'Need to find a good Python tutorial on the web',
        'done': False,
        'due': datetime.datetime.now()
    }
]
users = []

@app.route('/register', methods=['POST'])
def register():
    # A missing body or missing field is a client error, so answer 400.
    if not request.json:
        abort(400)
    username = request.json.get("username", None)
    password = request.json.get("password", None)
    name = request.json.get("name", None)
    if username is None or password is None or name is None:
        abort(400)
    # Search in a local variable so the global users list is not replaced.
    existing = [user for user in users if user["username"] == username]
    if len(existing) > 0:
        return jsonify("username already exists!"), 400
    user_id = uuid.uuid4().hex
    password_hash = pbkdf2_sha256.hash(password)
    users.append({
        "id": user_id,
        "username": username,
        "password": password_hash,
        "name": name
    })
    return jsonify(user_id)

# Create a function that will be called whenever create_access_token
# is used. It will take whatever object is passed into the
# create_access_token method, and lets us define what the identity
# of the access token should be.
@jwt.user_identity_loader
def user_identity_lookup(user):
    return user["id"]

@app.route('/login', methods=['POST'])
def login():
    if not request.is_json:
        return jsonify(msg="Missing JSON in request"), 400
    username = request.json.get('username', None)
    password = request.json.get('password', None)
    if not username:
        return jsonify(msg="Missing username parameter"), 400
    if not password:
        return jsonify(msg="Missing password parameter"), 400
    # Search in a local variable so the global users list is not replaced.
    matches = [user for user in users if user["username"] == username]
    if len(matches) == 0:
        return jsonify("username failed!"), 401
    user = matches[0]
    if not pbkdf2_sha256.verify(password, user["password"]):
        return jsonify("password failed!"), 401
    access_token = create_access_token(identity=user)
    return jsonify(access_token=access_token), 200
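
@jwt.user_loader_callback_loader
def user_loader_callback(identity):
    # Search in a local variable so the global users list stays intact.
    matches = [user for user in users if user["id"] == identity]
    if len(matches) > 0:
        # Return a copy without the password hash, plus a derived role.
        user = {key: value for key, value in matches[0].items() if key != "password"}
        if user["username"] == "manish":
            user["role"] = "admin"
        else:
            user["role"] = "normal"
        return user
    return {}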

# Protect a view with jwt_required, which requires a valid access token
# in the request to access.
@app.route('/profile', methods=['GET'])
@jwt_required
def profile():
    # Access the full user object of the current user with get_current_user
    current_user = get_current_user()
    return jsonify(logged_in_as=current_user), 200

def admin_required(fn):
    @wraps(fn)
    def wrapper(*args, **kwargs):
        verify_jwt_in_request()
        user = get_current_user()
        if user["username"] == "manish":
            return fn(*args, **kwargs)
        return jsonify(msg='Admins only!'), 403
    return wrapper

@app.route("/admin_only")
@jwt_required
@admin_required
def admin_only():
    return ""

def sort_due_date(x):
    return x["due"]

@app.route('/todo', methods=["GET"])
@app.route('/todo/<string:direction>', methods=["GET"])
@jwt_optional
def todo(direction=None):
    # direction is optional: "ASC" sorts by due date ascending,
    # any other value sorts descending.
    current_user = get_current_user()
    # Work on a copy so filtering never overwrites the global tasks list.
    result = list(tasks)
    if direction is not None:
        result.sort(reverse=(direction != "ASC"), key=sort_due_date)
    if current_user is not None and "id" in current_user:
        # Normal users see only their own tasks; admins see everything.
        if current_user["role"] == "normal":
            result = [task for task in result if task.get("user") == current_user["id"]]
    else:
        # Not logged in: return at most 5 tasks.
        result = result[:5]
    return jsonify(result)

@app.route('/todo', methods=["POST"])
@jwt_required
def add_todo():
    if not request.json:
        abort(400)
    title = request.json.get("title", None)
    if title is None:
        abort(400)
    desc = request.json.get("description", "")
    due = request.json.get("due", None)
    if due is not None:
        # Due dates are expected as day-month-year, e.g. "31-12-2024".
        due = datetime.datetime.strptime(due, "%d-%m-%Y")
    else:
        due = datetime.datetime.now()
    current_user = get_current_user()
    tasks.append({
        "id": len(tasks) + 1,
        "title": title,
        "description": desc,
        "done": False,
        "due": due,
        "user": current_user["id"]
    })
    return jsonify(len(tasks))

def update(task, task_id, data):
    # Apply the new title/description only to the matching task.
    if task["id"] == task_id:
        if 'title' in data:
            task["title"] = data['title']
        if 'description' in data:
            task["description"] = data["description"]
    return task

@app.route("/todo/<int:id>", methods=['PUT'])
@jwt_required
def update_todo(id):
    if not request.json:
        abort(400)
    todo_id = request.json.get("id", None)
    title = request.json.get("title", None)
    if todo_id is None or title is None:
        return jsonify(message="Invalid Request"), 400
    global tasks
    tasks = [update(task, id, request.json) for task in tasks]
    return jsonify(tasks)
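
@app.route("/todo/<int:id>", methods=["DELETE"])
@jwt_required
def delete_todo(id):
    global tasks
    user = get_current_user()
    # Admins may delete any task; other users only their own.
    tasks = [task for task in tasks
             if task["id"] != id
             or (user.get("role") != "admin" and task.get("user") != user.get("id"))]
    return jsonify(tasks)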

def mark(task, status, task_id):
    if task_id == task["id"]:
        task["done"] = status
    return task

@app.route("/todo/mark/<int:task_id>/<int:status>", methods=["PUT"])
@jwt_required
@admin_required
def mark_task(task_id, status):
    global tasks
    # status arrives as an integer in the URL: 1 means done.
    status = status == 1
    tasks = [mark(task, status, task_id) for task in tasks]
    return jsonify(tasks)