How to use OAuth2 password flow and Bearer with JWT tokens in Python FastAPI

1. Install passlib: pip install "passlib[bcrypt]" or py -m pip install "passlib[bcrypt]"

2. To check whether passlib is installed, run "pip freeze" or "py -m pip freeze"

3. Import Passlib in main.py

from passlib.context import CryptContext

4. Now define the passlib hashing context (using the bcrypt algorithm) in main.py

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

5. Now, inside the user registration function, hash the password with the context defined above

  #hash  the password - user.password
    hashed_password = pwd_context.hash(user.password)
    user.password = hashed_password

6. Run the API using Postman and check the database

Complete Code

main.py

from typing import Optional
from fastapi import FastAPI, Response, status, HTTPException, Depends
from fastapi.params import Body
from pydantic import BaseModel
from passlib.context import CryptContext
from random import randrange
import psycopg2
from psycopg2.extras import RealDictCursor
import time
from sqlalchemy.orm import Session
from .import models, schemas, utils
from .database import engine, SessionLocal, get_db
from .routers import post, users, auth

# Create the tables for the declared ORM models on the configured engine.
# (Password hashing is not set up here; it lives in the utils module.)
models.Base.metadata.create_all(bind=engine)

app = FastAPI()

# Register the routers in a fixed order: posts, users, then auth.
for _router_module in (post, users, auth):
    app.include_router(_router_module.router)

 

models.py

# from datetime import datetime
from sqlalchemy import Column, Integer, String, Boolean, TIMESTAMP, text
from sqlalchemy.sql.expression import null
from .database import Base


class User(Base):
    """ORM model for a row of the ``users`` table."""
    __tablename__ = "users"
    # Integer primary key.
    id = Column(Integer, primary_key=True, nullable=False)
    # Required, and unique across all rows.
    email = Column(String, nullable=False, unique=True)
    # Required; the create_user route stores the hashed password here, not the plain text.
    password = Column(String, nullable=False)
    # Timezone-aware timestamp; filled in by the database through the now() server default.
    created_at = Column(TIMESTAMP(timezone=True), nullable=False, server_default=text('now()'))

   

utils.py

from passlib.context import CryptContext
# Shared passlib context configured with the bcrypt scheme; used below to hash and verify passwords.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Hash a plain-text password with the shared context.
def hashPassword(password: str):
    """Return the hash that ``pwd_context`` produces for ``password``."""
    hashed = pwd_context.hash(password)
    return hashed

def verify(plain_password, hashed_password):
    """Return the result of checking ``plain_password`` against ``hashed_password``."""
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match

schemas.py

from pydantic import BaseModel, EmailStr
from datetime import datetime

class UserCreate(BaseModel):
    # Request body for registering a user: a validated e-mail address
    # and the plain-text password supplied by the client.
    email: EmailStr
    password: str

class UserOut(BaseModel):
    # Response model for a user. The password is not a field here, so it
    # is left out of every response that uses this model.
    id: int
    email: EmailStr
    created_at: datetime

    # Pydantic only reads model configuration from an inner class named
    # ``Config`` (capital C); a lowercase ``config`` is silently ignored.
    # orm_mode lets the model be populated from ORM objects (attribute
    # access) instead of requiring a dict.
    class Config:
        orm_mode = True

class UserLogin(BaseModel):
    # Login payload: a validated e-mail address and the plain-text password.
    email: EmailStr
    password: str

oauth.py

from jose import JWTError, jwt
from datetime import datetime, timedelta

# Key passed to jwt.encode to sign the tokens created below.
# NOTE(review): a signing key committed to source is effectively public;
# load it from the environment or a secrets store in real deployments.
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
# Signing algorithm passed to jwt.encode.
ALGORITHM = "HS256"
# Lifetime of an access token, in minutes (added to the current time for the "exp" claim).
ACCESS_TOKEN_EXPIRE_MINUTES = 30

def create_access_token(data: dict):
    """Create a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The dict is copied first, so
            the caller's object is not modified.

    Returns:
        The encoded token produced by ``jwt.encode``.
    """
    # Imported locally so this function does not rely on a change to the
    # module-level import line.
    from datetime import timezone

    to_encode = data.copy()

    # Use a timezone-aware UTC time. A naive local datetime is treated as
    # if it were already UTC when the "exp" claim is converted to a
    # timestamp, which shifts the expiry by the machine's UTC offset.
    expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire})

    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

database.py

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# SQLALCHEMY_DATABASE_URL = 'postgresql://<username>:<password>@<ip-address/hostname>/<database_name>'
# NOTE(review): the URL below embeds the database password in source;
# prefer reading the credentials from the environment.
SQLALCHEMY_DATABASE_URL = 'postgresql://postgres:123456789@localhost/fastapi_ORM_DB'

# Engine created from the URL above.
engine = create_engine(SQLALCHEMY_DATABASE_URL)
# Session factory bound to the engine, with autocommit and autoflush turned off.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Declarative base class that the ORM models inherit from.
Base = declarative_base()

# Dependency
def get_db():
    """Yield a new database session and close it when the generator finishes."""
    session = SessionLocal()
    try:
        yield session
    finally:
        # Runs whether the consumer finished normally or raised.
        session.close()

routers/users.py

from fastapi import FastAPI, Response, status, HTTPException, Depends, APIRouter
from sqlalchemy.orm import Session
from .. import models, schemas, utils
from ..database import engine, SessionLocal, get_db

# Router that collects the user endpoints defined in this module.
router = APIRouter()

# Create a user, storing only the hashed password.

@router.post("/users", status_code=status.HTTP_201_CREATED, response_model=schemas.UserOut)
async def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    """Register a new user and return the stored record.

    Args:
        user: Validated registration payload (e-mail and plain-text password).
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The newly created ``models.User`` row, refreshed from the database.

    Raises:
        HTTPException: 409 when the insert violates a database constraint.
            The e-mail column is the table's only unique constraint, so
            this is reported as an already-registered address.
    """
    # Imported locally so this block does not rely on a change to the
    # module-level import lines.
    from sqlalchemy.exc import IntegrityError

    # Build the row from a copy of the payload so the request model is
    # left untouched; only the hash is persisted, never the plain text.
    user_data = user.dict()
    user_data["password"] = utils.hashPassword(user.password)

    new_user = models.User(**user_data)
    db.add(new_user)
    try:
        db.commit()
    except IntegrityError as exc:
        # Roll back so the session is usable again after the failed commit.
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=f"user with email {user.email} already exists",
        ) from exc
    # Reload the row to pick up the database-generated id and created_at.
    db.refresh(new_user)

    return new_user


@router.get("/users/{id}", response_model=schemas.UserOut)
async def get_user(id: int, db: Session = Depends(get_db)):
    """Return the user whose primary key equals ``id``; raise a 404 if there is none."""
    user = db.query(models.User).filter(models.User.id == id).first()
    if user:
        return user
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"user with {id} does not exist",
    )

keep learning 🙂

Leave a Reply

Your email address will not be published. Required fields are marked *