How to use OAuth2 password , Bearer with JWT Token in Python FastAPI
- Install passlib pip install “passlib[bcrypt]” or py -m pip install “passlib[bcrypt]”
2. To check passlib install or not use “pip freeze” or “py -m pip freeze”
3. Import Passlib in main.py
from passlib.context import CryptContext
4. Now define passlib hashing algorithm in main.py
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
5 Now inside user registration function use above algorithm
#hash the password - user.password
hashed_password = pwd_context.hash(user.password)
user.password = hashed_password
6. Run API using Postman and check Database
Complete Code
main.py
from typing import Optional
from fastapi import FastAPI, Response, status, HTTPException, Depends
from fastapi.params import Body
from pydantic import BaseModel
from passlib.context import CryptContext
from random import randrange
import psycopg2
from psycopg2.extras import RealDictCursor
import time
from sqlalchemy.orm import Session
from .import models, schemas, utils
from .database import engine, SessionLocal, get_db
from .routers import post, users, auth
# pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
# class Post(BaseModel):
# title:str
# content:str
# published : bool =True
app.include_router(post.router)
app.include_router(users.router)
app.include_router(auth.router)
models.py
# from datetime import datetime
from sqlalchemy import Column, Integer, String, Boolean, TIMESTAMP, text
from sqlalchemy.sql.expression import null
from .database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, nullable=False)
email = Column(String, nullable=False, unique=True)
password = Column(String, nullable=False)
created_at = Column(TIMESTAMP(timezone=True), nullable=False, server_default=text('now()'))
utills.py
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
#create function for hash password
def hashPassword(password: str):
return pwd_context.hash(password)
def verify(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
schemas.py
from pydantic import BaseModel, EmailStr
from datetime import datetime
class UserCreate(BaseModel):
email:EmailStr
password:str
class UserOut(BaseModel):
id: int
email: EmailStr
created_at: datetime
class config:
orm_mode = True
class UserLogin(BaseModel):
email: EmailStr
password: str
oauth.py
from jose import JWTError, jwt
from datetime import datetime, timedelta
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(data: dict):
to_enceode = data.copy()
expire = datetime.now() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_enceode.update({"exp": expire})
encode_jwt = jwt.encode(to_enceode, SECRET_KEY, algorithm=ALGORITHM)
return encode_jwt
database.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# SQLALCHEMY_DATABASE_URL = 'postgresql://<username>:<password>@<ip-address/hostname>/<database_name>'
SQLALCHEMY_DATABASE_URL = 'postgresql://postgres:123456789@localhost/fastapi_ORM_DB'
engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
routers/users.py
from fastapi import FastAPI, Response, status, HTTPException, Depends, APIRouter
from sqlalchemy.orm import Session
from .. import models, schemas, utils
from ..database import engine, SessionLocal, get_db
router = APIRouter()
#Create User pwd_context
@router.post("/users", status_code=status.HTTP_201_CREATED, response_model=schemas.UserOut)
async def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
#hash the password - user.password
# hashed_password = pwd_context.hash(user.password)
hashed_password = utils.hashPassword(user.password)
user.password = hashed_password
new_user = models.User(**user.dict())
db.add(new_user)
db.commit()
db.refresh(new_user)
return new_user
@router.get("/users/{id}", response_model=schemas.UserOut)
async def get_user(id: int, db: Session = Depends(get_db)):
get_user = db.query(models.User).filter(models.User.id == id).first()
if not get_user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"user with {id} does not exist")
return get_user
keep learning 🙂