SQLAlchemy ORM Essentials
SQLAlchemy is one of the most popular and powerful ORM (Object-Relational Mapping) libraries for Python. It lets you work with databases using Python objects instead of writing raw SQL.
What is SQLAlchemy?
SQLAlchemy provides a full suite of enterprise-level persistence patterns, designed for efficient and high-performing database access.
Key Features:
Why Use SQLAlchemy?
Installation
# Basic installation
pip install sqlalchemy
# With async support (quoted so shells such as zsh do not treat the
# square brackets as a glob pattern)
pip install "sqlalchemy[asyncio]"
# With PostgreSQL driver
pip install sqlalchemy psycopg2-binary
# With MySQL driver
pip install sqlalchemy pymysql
# For migrations
pip install alembic
Getting Started
Database Connection
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# The three create_engine() calls below are alternatives: each one rebinds
# `engine`, so keep only the URL for the database you actually use.
# SQLite (file-based)
engine = create_engine('sqlite:///example.db', echo=True)
# PostgreSQL
engine = create_engine('postgresql://user:password@localhost/dbname')
# MySQL
engine = create_engine('mysql+pymysql://user:password@localhost/dbname')
# Create session factory
Session = sessionmaker(bind=engine)
# Calling the factory produces a session bound to `engine`
session = Session()
Your First Model
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, Session
# Create base class
Base = declarative_base()
# Define model
class User(Base):
    """Minimal mapped class stored in the ``users`` table."""
    __tablename__ = 'users'
    # Primary key
    id = Column(Integer, primary_key=True)
    # Both columns are required (nullable=False) and must be unique
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(100), unique=True, nullable=False)

    def __repr__(self):
        """Return a short debug representation showing username and email."""
        return f"<User(username='{self.username}', email='{self.email}')>"
# Create tables
engine = create_engine('sqlite:///example.db')
Base.metadata.create_all(engine)

# Create session; the context manager closes it even when an error is raised
with Session(engine) as session:
    # Create a user
    user = User(username='john_doe', email='john@example.com')
    session.add(user)
    session.commit()
    # Read the id while the session is still open
    print(user.id)  # Auto-generated ID
Defining Models
Column Types
from sqlalchemy import (
Column, Integer, String, Float, Boolean,
Date, DateTime, Time, Text, JSON, LargeBinary
)
from datetime import datetime
class Product(Base):
    """Catalogue item used to demonstrate the common column types."""
    __tablename__ = 'products'
    # Integer types
    id = Column(Integer, primary_key=True)
    quantity = Column(Integer, default=0)
    # String types
    name = Column(String(100), nullable=False)
    sku = Column(String(50), unique=True)
    description = Column(Text)
    # Numeric types
    price = Column(Float)
    # Boolean
    is_active = Column(Boolean, default=True)
    # Date/Time
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, onupdate=datetime.utcnow)
    release_date = Column(Date)
    # JSON (requires PostgreSQL, MySQL 5.7+, or SQLite 3.9+)
    # `metadata` is a reserved attribute name in the Declarative API and
    # raises an error at class-definition time, so the Python attribute gets
    # a trailing underscore while the database column is still "metadata".
    metadata_ = Column('metadata', JSON)
    # Binary data
    image = Column(LargeBinary)
Column Constraints
from sqlalchemy import Column, Integer, String, CheckConstraint
class User(Base):
    """User mapping that demonstrates the common column-level constraints."""
    __tablename__ = 'users'
    # Auto-incrementing primary key (declared exactly once: a second
    # assignment to `id` would silently replace the first column)
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Not null
    username = Column(String(50), nullable=False)
    # Unique constraint
    email = Column(String(100), unique=True)
    # Default value
    status = Column(String(20), default='active')
    # Check constraint
    age = Column(Integer, CheckConstraint('age >= 18'))
    # Index
    phone = Column(String(20), index=True)
Table Configuration
from sqlalchemy import Column, Integer, String, Index, UniqueConstraint
class User(Base):
    """User mapping that demonstrates table-level configuration."""
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    first_name = Column(String(50))
    last_name = Column(String(50))
    email = Column(String(100))
    company = Column(String(100))
    # Table-level constraints
    __table_args__ = (
        # Composite unique constraint: the (email, company) pair must be unique
        UniqueConstraint('email', 'company', name='uq_email_company'),
        # Composite index over first_name and last_name
        Index('idx_name', 'first_name', 'last_name'),
        # Additional configuration: MySQL-specific table options
        {'mysql_engine': 'InnoDB', 'mysql_charset': 'utf8mb4'}
    )
CRUD Operations
Create (Insert)
# Single record
user = User(username='alice', email='alice@example.com')
session.add(user)
session.commit()
# Multiple records
users = [
User(username='bob', email='bob@example.com'),
User(username='charlie', email='charlie@example.com'),
User(username='diana', email='diana@example.com')
]
session.add_all(users)
session.commit()
# Bulk insert (more efficient)
session.bulk_insert_mappings(User, [
{'username': 'eve', 'email': 'eve@example.com'},
{'username': 'frank', 'email': 'frank@example.com'}
])
session.commit()
Read (Query)
# Get all records
users = session.query(User).all()
# Get first record
user = session.query(User).first()
# Get by primary key
user = session.get(User, 1)
# Filter
users = session.query(User).filter(User.username == 'alice').all()
user = session.query(User).filter_by(username='alice').first()
# Multiple conditions
users = session.query(User).filter(
User.username == 'alice',
User.email.like('%@example.com')
).all()
# Order by
users = session.query(User).order_by(User.username).all()
users = session.query(User).order_by(User.created_at.desc()).all()
# Limit and offset
users = session.query(User).limit(10).offset(20).all()
# Count
count = session.query(User).count()
# Check existence
exists = session.query(User).filter(User.username == 'alice').first() is not None
# Or more efficiently: let the database evaluate EXISTS. It returns a single
# boolean and, unlike calling .scalar() on the filtered query itself, cannot
# raise MultipleResultsFound when several rows match.
exists = session.query(
    session.query(User.id).filter(User.username == 'alice').exists()
).scalar()
Update
# Update single record
user = session.get(User, 1)
user.email = 'newemail@example.com'
session.commit()
# Update with query
session.query(User).filter(User.username == 'alice').update({
'email': 'alice_new@example.com'
})
session.commit()
# Bulk update
session.query(User).filter(User.status == 'pending').update({
'status': 'active'
})
session.commit()
Delete
# Delete single record
user = session.get(User, 1)
session.delete(user)
session.commit()
# Delete with query
session.query(User).filter(User.username == 'alice').delete()
session.commit()
# Delete all
session.query(User).delete()
session.commit()
Relationships
One-to-Many
# Text is required by Post.content below
from sqlalchemy import Column, Integer, String, Text, ForeignKey
from sqlalchemy.orm import relationship
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50))
# One user has many posts
posts = relationship('Post', back_populates='author')
class Post(Base):
__tablename__ = 'posts'
id = Column(Integer, primary_key=True)
title = Column(String(200))
content = Column(Text)
user_id = Column(Integer, ForeignKey('users.id'))
# Many posts belong to one user
author = relationship('User', back_populates='posts')
# Usage
user = User(username='alice')
post1 = Post(title='First Post', content='Content here')
post2 = Post(title='Second Post', content='More content')
user.posts.append(post1)
user.posts.append(post2)
session.add(user)
session.commit()
# Query
user = session.query(User).filter_by(username='alice').first()
print(user.posts) # List of posts
post = session.query(Post).first()
print(post.author) # User object
One-to-One
# Text is required by Profile.bio below
from sqlalchemy import Column, Integer, String, Text, ForeignKey
from sqlalchemy.orm import relationship
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50))
# One-to-one
profile = relationship('Profile', back_populates='user', uselist=False)
class Profile(Base):
__tablename__ = 'profiles'
id = Column(Integer, primary_key=True)
bio = Column(Text)
user_id = Column(Integer, ForeignKey('users.id'), unique=True)
user = relationship('User', back_populates='profile')
# Usage
user = User(username='alice')
profile = Profile(bio='Software developer')
user.profile = profile
session.add(user)
session.commit()
Many-to-Many
from sqlalchemy import Table, Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship
# Association table
student_course = Table(
    'student_course',
    Base.metadata,
    # The two foreign keys together form the primary key, so the same
    # student/course pair cannot be stored twice.
    Column('student_id', Integer, ForeignKey('students.id'), primary_key=True),
    Column('course_id', Integer, ForeignKey('courses.id'), primary_key=True)
)
class Student(Base):
__tablename__ = 'students'
id = Column(Integer, primary_key=True)
name = Column(String(100))
courses = relationship('Course', secondary=student_course, back_populates='students')
class Course(Base):
__tablename__ = 'courses'
id = Column(Integer, primary_key=True)
title = Column(String(200))
students = relationship('Student', secondary=student_course, back_populates='courses')
# Usage
student1 = Student(name='Alice')
student2 = Student(name='Bob')
course1 = Course(title='Python Programming')
course2 = Course(title='Database Design')
student1.courses.extend([course1, course2])
student2.courses.append(course1)
session.add_all([student1, student2])
session.commit()
# Query
student = session.query(Student).filter_by(name='Alice').first()
print(student.courses) # List of courses
course = session.query(Course).filter_by(title='Python Programming').first()
print(course.students) # List of students
Many-to-Many with Extra Attributes
from sqlalchemy import Column, Integer, String, ForeignKey, DateTime
from sqlalchemy.orm import relationship
from datetime import datetime
class Enrollment(Base):
__tablename__ = 'enrollments'
student_id = Column(Integer, ForeignKey('students.id'), primary_key=True)
course_id = Column(Integer, ForeignKey('courses.id'), primary_key=True)
enrolled_at = Column(DateTime, default=datetime.utcnow)
grade = Column(String(2))
student = relationship('Student', back_populates='enrollments')
course = relationship('Course', back_populates='enrollments')
class Student(Base):
__tablename__ = 'students'
id = Column(Integer, primary_key=True)
name = Column(String(100))
enrollments = relationship('Enrollment', back_populates='student')
class Course(Base):
__tablename__ = 'courses'
id = Column(Integer, primary_key=True)
title = Column(String(200))
enrollments = relationship('Enrollment', back_populates='course')
# Usage
student = Student(name='Alice')
course = Course(title='Python Programming')
enrollment = Enrollment(grade='A')
enrollment.student = student
enrollment.course = course
session.add(enrollment)
session.commit()
# Query
student = session.query(Student).first()
for enrollment in student.enrollments:
print(f"{enrollment.course.title}: {enrollment.grade}")
Advanced Queries
Filtering
from sqlalchemy import and_, or_, not_
# Equality
users = session.query(User).filter(User.username == 'alice').all()
# Comparison
users = session.query(User).filter(User.age > 18).all()
users = session.query(User).filter(User.age >= 18).all()
users = session.query(User).filter(User.age < 65).all()
# IN clause
users = session.query(User).filter(User.username.in_(['alice', 'bob', 'charlie'])).all()
# LIKE
users = session.query(User).filter(User.email.like('%@example.com')).all()
# IS NULL / IS NOT NULL
# .is_(None) / .is_not(None) state the intent explicitly and avoid the
# `== None` comparison that linters flag (E711).
users = session.query(User).filter(User.phone.is_(None)).all()
users = session.query(User).filter(User.phone.is_not(None)).all()
# AND
users = session.query(User).filter(
and_(User.age > 18, User.status == 'active')
).all()
# OR
users = session.query(User).filter(
or_(User.username == 'alice', User.username == 'bob')
).all()
# NOT
users = session.query(User).filter(
not_(User.status == 'banned')
).all()
# BETWEEN
users = session.query(User).filter(User.age.between(18, 65)).all()
Joins
# Inner join
results = session.query(User, Post).join(Post).all()
# Left outer join
results = session.query(User).outerjoin(Post).all()
# Explicit join condition
results = session.query(User).join(Post, User.id == Post.user_id).all()
# Multiple joins
results = session.query(User).join(Post).join(Comment).all()
# Filter after join
results = session.query(User).join(Post).filter(Post.title.like('%Python%')).all()
Aggregations
from sqlalchemy import func
# Count
count = session.query(func.count(User.id)).scalar()
# Group by
results = session.query(
User.status,
func.count(User.id)
).group_by(User.status).all()
# Sum
total = session.query(func.sum(Order.amount)).scalar()
# Average
avg_age = session.query(func.avg(User.age)).scalar()
# Min/Max
min_price = session.query(func.min(Product.price)).scalar()
max_price = session.query(func.max(Product.price)).scalar()
# Having clause
results = session.query(
User.status,
func.count(User.id).label('count')
).group_by(User.status).having(func.count(User.id) > 10).all()
Subqueries
from sqlalchemy import select
# Scalar subquery
subq = session.query(func.avg(User.age)).scalar_subquery()
users = session.query(User).filter(User.age > subq).all()
# Table subquery
subq = session.query(
Post.user_id,
func.count(Post.id).label('post_count')
).group_by(Post.user_id).subquery()
results = session.query(
User.username,
subq.c.post_count
).join(subq, User.id == subq.c.user_id).all()
Eager Loading
from sqlalchemy.orm import joinedload, selectinload, subqueryload
# N+1 problem (bad)
users = session.query(User).all()
for user in users:
print(user.posts) # Separate query for each user
# Joined load (single query with JOIN)
users = session.query(User).options(joinedload(User.posts)).all()
# Select in load (two queries, more efficient for collections)
users = session.query(User).options(selectinload(User.posts)).all()
# Subquery load
users = session.query(User).options(subqueryload(User.posts)).all()
# Multiple relationships
users = session.query(User).options(
selectinload(User.posts),
selectinload(User.comments)
).all()
# Nested relationships
users = session.query(User).options(
selectinload(User.posts).selectinload(Post.comments)
).all()
Sessions and Transactions
Session Lifecycle
from sqlalchemy.orm import Session
# Create session
session = Session(engine)
# Add objects
user = User(username='alice')
session.add(user)
# Commit changes
session.commit()
# Rollback on error
try:
user = User(username='bob')
session.add(user)
# Some operation that might fail
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
Context Manager
# Automatic session cleanup
with Session(engine) as session:
user = User(username='alice')
session.add(user)
session.commit()
# Session is automatically closed
# With transaction
with Session(engine) as session:
with session.begin():
user = User(username='alice')
session.add(user)
# Automatically commits or rolls back
Session Scopes
from sqlalchemy.orm import scoped_session, sessionmaker
# Thread-local session
session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)
# Use throughout your application
def create_user(username):
    """Add a new User with the given username and commit it through the scoped Session."""
    user = User(username=username)
    Session.add(user)
    Session.commit()
def get_user(user_id):
    """Return the result of looking up User by primary key ``user_id`` on the scoped Session."""
    return Session.get(User, user_id)
# Remove session when done
Session.remove()
Practical Examples
Example 1: Blog Application
# Boolean is required by Post.published below
from sqlalchemy import Column, Integer, String, Text, Boolean, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from datetime import datetime
class User(Base):
    """Blog account that owns posts and comments."""
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    # username and email are required and unique
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    password_hash = Column(String(255), nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    # Both collections are configured with cascade='all, delete-orphan'
    posts = relationship('Post', back_populates='author', cascade='all, delete-orphan')
    comments = relationship('Comment', back_populates='author', cascade='all, delete-orphan')
class Post(Base):
    """Blog post written by one User and holding a collection of comments."""
    __tablename__ = 'posts'
    id = Column(Integer, primary_key=True)
    title = Column(String(200), nullable=False)
    # slug must be unique across all posts
    slug = Column(String(200), unique=True, nullable=False)
    content = Column(Text, nullable=False)
    # Posts default to unpublished
    published = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    # updated_at has an onupdate callable but no default
    updated_at = Column(DateTime, onupdate=datetime.utcnow)
    author_id = Column(Integer, ForeignKey('users.id'), nullable=False)
    author = relationship('User', back_populates='posts')
    comments = relationship('Comment', back_populates='post', cascade='all, delete-orphan')
class Comment(Base):
    """Comment linking one author (User) to one Post."""
    __tablename__ = 'comments'
    id = Column(Integer, primary_key=True)
    content = Column(Text, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    # Both foreign keys are required
    author_id = Column(Integer, ForeignKey('users.id'), nullable=False)
    post_id = Column(Integer, ForeignKey('posts.id'), nullable=False)
    author = relationship('User', back_populates='comments')
    post = relationship('Post', back_populates='comments')
# Usage
def _slugify(title):
    """Turn a title into a URL-friendly slug: lowercase words joined by '-'."""
    import re  # local import keeps this example self-contained

    # Collapse every run of punctuation, whitespace or underscores into a
    # single hyphen, then trim hyphens left at either end.
    return re.sub(r'[\W_]+', '-', title.lower()).strip('-')


def create_blog_post(session, username, title, content):
    """Create and commit a post written by the user named ``username``.

    Returns the new Post.

    Raises ValueError if no such user exists or if the title contains no
    letter or digit to build a slug from. If the commit fails (for example
    because the slug is already taken) the session is rolled back and the
    original error is re-raised.
    """
    user = session.query(User).filter_by(username=username).first()
    if not user:
        raise ValueError("User not found")
    slug = _slugify(title)
    if not slug:
        raise ValueError("Title must contain at least one letter or digit")
    post = Post(title=title, slug=slug, content=content, author=user)
    session.add(post)
    try:
        session.commit()
    except Exception:
        # Leave the session usable for the caller after a failed commit
        session.rollback()
        raise
    return post
def add_comment(session, post_id, username, content):
    """Attach a new comment by ``username`` to the post with id ``post_id``.

    Returns the committed Comment. Raises ValueError when either the post
    or the user cannot be found.
    """
    target_post = session.get(Post, post_id)
    commenter = session.query(User).filter_by(username=username).first()
    # Both lookups must have succeeded before a comment can be built
    if not (target_post and commenter):
        raise ValueError("Post or user not found")
    new_comment = Comment(content=content, author=commenter, post=target_post)
    session.add(new_comment)
    session.commit()
    return new_comment
def get_published_posts(session, limit=10):
    """Return at most ``limit`` published posts, newest first."""
    newest_first = Post.created_at.desc()
    published = session.query(Post).filter_by(published=True)
    return published.order_by(newest_first).limit(limit).all()
Example 2: E-commerce System
# Text is required by Product.description; joinedload and selectinload are
# required by get_order_details()
from sqlalchemy import Column, Integer, String, Text, Float, DateTime, ForeignKey, Enum
from sqlalchemy.orm import relationship, joinedload, selectinload
from datetime import datetime
import enum
class OrderStatus(enum.Enum):
    """States an order can be in; each member's value is its lowercase name."""
    PENDING = 'pending'
    PAID = 'paid'
    SHIPPED = 'shipped'
    DELIVERED = 'delivered'
    CANCELLED = 'cancelled'
class Product(Base):
    """Sellable item with a price and a stock counter."""
    __tablename__ = 'products'
    id = Column(Integer, primary_key=True)
    name = Column(String(200), nullable=False)
    description = Column(Text)
    # NOTE(review): Float is inexact for currency amounts; consider an exact
    # decimal column type if prices must add up to the cent.
    price = Column(Float, nullable=False)
    # Units available; create_order() checks and decrements this value
    stock = Column(Integer, default=0)
    created_at = Column(DateTime, default=datetime.utcnow)
    order_items = relationship('OrderItem', back_populates='product')
class Customer(Base):
    """Buyer who can place any number of orders."""
    __tablename__ = 'customers'
    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    # email is required and unique; phone is optional
    email = Column(String(100), unique=True, nullable=False)
    phone = Column(String(20))
    orders = relationship('Order', back_populates='customer')
class Order(Base):
    """Order header belonging to one customer and holding its line items."""
    __tablename__ = 'orders'
    id = Column(Integer, primary_key=True)
    # order_number must be unique across all orders
    order_number = Column(String(50), unique=True, nullable=False)
    # New orders default to OrderStatus.PENDING
    status = Column(Enum(OrderStatus), default=OrderStatus.PENDING)
    total_amount = Column(Float, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    customer_id = Column(Integer, ForeignKey('customers.id'), nullable=False)
    customer = relationship('Customer', back_populates='orders')
    # The items collection is configured with cascade='all, delete-orphan'
    items = relationship('OrderItem', back_populates='order', cascade='all, delete-orphan')
class OrderItem(Base):
    """One line of an order: a product, a quantity and the unit price stored with it."""
    __tablename__ = 'order_items'
    id = Column(Integer, primary_key=True)
    quantity = Column(Integer, nullable=False)
    # create_order() fills this from product.price when the line is created
    unit_price = Column(Float, nullable=False)
    order_id = Column(Integer, ForeignKey('orders.id'), nullable=False)
    product_id = Column(Integer, ForeignKey('products.id'), nullable=False)
    order = relationship('Order', back_populates='items')
    product = relationship('Product', back_populates='order_items')
# Business logic
def create_order(session, customer_id, items):
    """
    Create and commit an order for a customer, reserving stock for each line.

    items: [(product_id, quantity), ...]

    Returns the committed Order.

    Raises ValueError if the customer or a product does not exist, if a
    quantity is not positive, or if a product has insufficient stock. On any
    failure the session is rolled back, so stock already decremented for
    earlier lines is discarded instead of lingering in the session.
    """
    import uuid  # local import: only needed for the order-number suffix

    customer = session.get(Customer, customer_id)
    if not customer:
        raise ValueError("Customer not found")
    # Generate order number. A second-resolution timestamp alone collides when
    # two orders are placed within the same second (order_number is unique),
    # so a short random suffix is appended.
    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    order_number = f"ORD-{timestamp}-{uuid.uuid4().hex[:8].upper()}"
    try:
        order = Order(
            order_number=order_number,
            customer=customer,
            total_amount=0
        )
        total = 0
        for product_id, quantity in items:
            # A zero or negative quantity would increase stock and lower the total
            if quantity <= 0:
                raise ValueError(
                    f"Quantity for product {product_id} must be positive"
                )
            product = session.get(Product, product_id)
            if not product:
                raise ValueError(f"Product {product_id} not found")
            if product.stock < quantity:
                raise ValueError(f"Insufficient stock for {product.name}")
            item = OrderItem(
                product=product,
                quantity=quantity,
                unit_price=product.price
            )
            order.items.append(item)
            # Update stock
            product.stock -= quantity
            total += product.price * quantity
        order.total_amount = total
        session.add(order)
        session.commit()
    except Exception:
        # Undo partial stock changes before handing the error to the caller
        session.rollback()
        raise
    return order
def get_customer_orders(session, customer_id):
    """Return every order placed by ``customer_id``, most recent first."""
    orders_for_customer = session.query(Order).filter_by(customer_id=customer_id)
    return orders_for_customer.order_by(Order.created_at.desc()).all()
def get_order_details(session, order_id):
    """Return the first order matching ``order_id`` with customer, items and products eagerly loaded."""
    eager_options = (
        joinedload(Order.customer),
        selectinload(Order.items).joinedload(OrderItem.product),
    )
    return (
        session.query(Order)
        .options(*eager_options)
        .filter_by(id=order_id)
        .first()
    )
Example 3: Repository Pattern
from typing import List, Optional, Generic, TypeVar
from sqlalchemy.orm import Session
T = TypeVar('T')


class BaseRepository(Generic[T]):
    """Generic CRUD helper that wraps a session and one mapped model class."""

    def __init__(self, session: "Session", model: type):
        self.session = session
        self.model = model

    def _commit(self) -> None:
        """Commit the session; on failure roll back and re-raise the error."""
        try:
            self.session.commit()
        except Exception:
            self.session.rollback()
            raise

    def create(self, **kwargs) -> T:
        """Build a model instance from ``kwargs``, persist it and return it."""
        instance = self.model(**kwargs)
        self.session.add(instance)
        self._commit()
        return instance

    def get(self, id: int) -> Optional[T]:
        """Return the instance with primary key ``id``, or None."""
        return self.session.get(self.model, id)

    def get_all(self, limit: int = 100, offset: int = 0) -> List[T]:
        """Return up to ``limit`` instances, skipping the first ``offset``."""
        return self.session.query(self.model).limit(limit).offset(offset).all()

    def update(self, id: int, **kwargs) -> Optional[T]:
        """Set the given attributes on the instance with primary key ``id``.

        Returns the updated instance, or None when no such row exists.
        Raises AttributeError when a keyword does not name an attribute of
        the model, so a misspelled field is reported instead of being set as
        a stray Python attribute that is never persisted.
        """
        instance = self.get(id)
        if instance is None:
            return None
        unknown = [key for key in kwargs if not hasattr(self.model, key)]
        if unknown:
            raise AttributeError(
                f"{self.model.__name__} has no attribute(s): {', '.join(unknown)}"
            )
        for key, value in kwargs.items():
            setattr(instance, key, value)
        self._commit()
        return instance

    def delete(self, id: int) -> bool:
        """Delete the instance with primary key ``id``; return whether it existed."""
        instance = self.get(id)
        if instance is None:
            return False
        self.session.delete(instance)
        self._commit()
        return True
class UserRepository(BaseRepository[User]):
    """Repository adding User-specific lookups to the generic CRUD methods."""

    def __init__(self, session: Session):
        super().__init__(session, User)

    def get_by_username(self, username: str) -> Optional[User]:
        """Return the first user with this username, or None."""
        return self.session.query(User).filter_by(username=username).first()

    def get_by_email(self, email: str) -> Optional[User]:
        """Return the first user with this email address, or None."""
        return self.session.query(User).filter_by(email=email).first()

    def get_active_users(self) -> List[User]:
        """Return all users whose status is 'active'."""
        # The User queries in this guide filter on the string `status` column
        # (e.g. User.status == 'active'); no User model defines `is_active`.
        return self.session.query(User).filter_by(status='active').all()
# Usage
with Session(engine) as session:
user_repo = UserRepository(session)
# Create
user = user_repo.create(username='alice', email='alice@example.com')
# Read
user = user_repo.get(1)
user = user_repo.get_by_username('alice')
users = user_repo.get_active_users()
# Update
user_repo.update(1, email='newemail@example.com')
# Delete
user_repo.delete(1)
Migrations with Alembic
Setup Alembic
# Initialize Alembic
alembic init alembic
# Edit alembic.ini - set database URL
# sqlalchemy.url = postgresql://user:pass@localhost/dbname
Configure Alembic
alembic/env.py:
from alembic import context
from sqlalchemy import engine_from_config, pool
from myapp.models import Base # Import your Base
# Set target metadata
target_metadata = Base.metadata
def run_migrations_online():
    """Run migrations against a live database connection.

    Builds an engine from the ``sqlalchemy.``-prefixed options of the Alembic
    config, connects, and runs the migrations inside a transaction.
    """
    # The Alembic Config object gives access to the values in alembic.ini
    config = context.config
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix='sqlalchemy.',
        poolclass=pool.NullPool,
    )
    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata
        )
        with context.begin_transaction():
            context.run_migrations()
Create and Run Migrations
# Create a migration
alembic revision --autogenerate -m "create users table"
# Review the generated migration file
# alembic/versions/xxxx_create_users_table.py
# Apply migrations
alembic upgrade head
# Rollback
alembic downgrade -1
# Show current version
alembic current
# Show migration history
alembic history
Manual Migration
"""create users table

Revision ID: xxxx
Revises:
"""
from alembic import op
import sqlalchemy as sa

# Revision identifiers, used by Alembic to order migrations. Replace the
# placeholder with the real revision id. down_revision = None marks the first
# migration in the chain; otherwise set it to the parent revision id.
revision = 'xxxx'
down_revision = None


def upgrade():
    """Create the users table with unique username and email columns."""
    op.create_table(
        'users',
        sa.Column('id', sa.Integer(), primary_key=True),
        sa.Column('username', sa.String(50), nullable=False),
        sa.Column('email', sa.String(100), nullable=False),
        sa.UniqueConstraint('username'),
        sa.UniqueConstraint('email')
    )


def downgrade():
    """Drop the users table, reverting upgrade()."""
    op.drop_table('users')
Best Practices
1. Use Context Managers
# Good
with Session(engine) as session:
user = User(username='alice')
session.add(user)
session.commit()
# Bad
session = Session(engine)
user = User(username='alice')
session.add(user)
session.commit()
# Forgot to close!
2. Handle Transactions Properly
# Good
with Session(engine) as session:
try:
user = User(username='alice')
session.add(user)
session.commit()
except Exception as e:
session.rollback()
raise
# Better - automatic rollback
with Session(engine) as session:
with session.begin():
user = User(username='alice')
session.add(user)
3. Avoid N+1 Queries
# Bad - N+1 problem
users = session.query(User).all()
for user in users:
print(user.posts) # Separate query for each user!
# Good - eager loading
from sqlalchemy.orm import selectinload
users = session.query(User).options(selectinload(User.posts)).all()
for user in users:
print(user.posts) # Already loaded
4. Use Indexes
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
email = Column(String(100), unique=True, index=True) # Indexed
username = Column(String(50), index=True) # Indexed
created_at = Column(DateTime, index=True) # For sorting
5. Use Enums for Fixed Values
import enum
from sqlalchemy import Enum
class UserStatus(enum.Enum):
ACTIVE = 'active'
INACTIVE = 'inactive'
BANNED = 'banned'
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
status = Column(Enum(UserStatus), default=UserStatus.ACTIVE)
6. Validation in Models
from sqlalchemy.orm import validates
class User(Base):
    """User mapping with attribute-level validation of email and age."""
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    email = Column(String(100))
    age = Column(Integer)

    @validates('email')
    def validate_email(self, key, email):
        """Reject values without an '@'; None is allowed (column is nullable)."""
        # Without the None check, `'@' not in None` raises TypeError
        if email is not None and '@' not in email:
            raise ValueError("Invalid email address")
        return email

    @validates('age')
    def validate_age(self, key, age):
        """Reject ages outside 0..150; None is allowed (column is nullable)."""
        if age is not None and not 0 <= age <= 150:
            raise ValueError("Invalid age")
        return age
Testing
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
@pytest.fixture
def engine():
    """Yield an in-memory SQLite engine with all tables created."""
    # Use in-memory SQLite for tests
    engine = create_engine('sqlite:///:memory:')
    Base.metadata.create_all(engine)
    yield engine
    # Teardown: release the engine's pooled connections after the test
    engine.dispose()
@pytest.fixture
def session(engine):
    """Yield a Session bound to a connection whose outer transaction is rolled back afterwards."""
    connection = engine.connect()
    # Begin a transaction on the connection before the Session is created
    transaction = connection.begin()
    session = Session(bind=connection)
    yield session
    # Teardown: close the session, roll back the outer transaction, then
    # release the connection
    session.close()
    transaction.rollback()
    connection.close()
def test_create_user(session):
    """A committed user receives a primary key and keeps its username."""
    new_user = User(username='testuser', email='test@example.com')
    session.add(new_user)
    session.commit()
    assert new_user.id is not None
    assert new_user.username == 'testuser'
def test_get_user(session):
    """A committed user can be read back by username."""
    session.add(User(username='testuser', email='test@example.com'))
    session.commit()
    lookup = session.query(User).filter_by(username='testuser')
    found = lookup.first()
    assert found is not None
    assert found.email == 'test@example.com'
Key Takeaways
Additional Resources
Official Documentation:
Tutorials:
Tools:
Next Steps
SQLAlchemy is powerful and flexible. Start with basic models and queries, then gradually explore advanced features as your needs grow!