A complete build guide for the MTG App: Python + FastAPI + Strawberry + SQLAlchemy on the backend, React + Apollo Client on the frontend, communicating over GraphQL.
Spin up PostgreSQL and Redis with Docker Compose. These are the only infrastructure dependencies.
Postgres and Redis are the only external services. Running them in Docker means zero system-level install dependencies and easy teardown. The Celery worker and FastAPI app run outside Docker in dev for easier hot-reloading.
# docker-compose.yml — Postgres + Redis, the only infrastructure dependencies.
version: "3.9"
services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_DB: mtg
      POSTGRES_USER: mtg
      POSTGRES_PASSWORD: mtg   # dev-only credentials; do not reuse in production
    ports:
      - "5432:5432"
    volumes:
      - pgdata:/var/lib/postgresql/data   # persist DB data across container restarts
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
volumes:
pgdata:

Checklist: mkdir mtg-app && cd mtg-app · create docker-compose.yml at the project root · docker compose up -d · verify with psql postgresql://mtg:mtg@localhost:5432/mtg -c '\l' · git init && git commit -m 'chore: docker compose infrastructure'

Create the Python project structure, virtual environment, and install all backend dependencies.
All Python code lives in backend/. Separate the FastAPI app from Celery worker entrypoints so each can be started independently.
backend/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI app
│ ├── db.py # SQLAlchemy async engine
│ ├── models/
│ │ ├── __init__.py
│ │ └── models.py # SQLAlchemy models
│ ├── graphql/
│ │ ├── __init__.py
│ │ ├── schema.py # Strawberry schema root
│ │ ├── types.py # Strawberry types
│ │ └── resolvers/
│ │ ├── cards.py
│ │ ├── collection.py
│ │ ├── converter.py
│ │ ├── decks.py
│ │ └── prices.py
│ ├── services/
│ │ ├── scryfall_sync.py
│ │ ├── converter.py
│ │ └── price_snapshot.py
│ └── tasks/
│ └── tasks.py # Celery task definitions
├── alembic/
├── alembic.ini
├── celery_worker.py
└── requirements.txt

requirements.txt:

fastapi==0.111.0
uvicorn[standard]==0.29.0
strawberry-graphql[fastapi]==0.227.0
sqlalchemy[asyncio]==2.0.30
asyncpg==0.29.0
alembic==1.13.1
celery==5.4.0
redis==5.0.4
httpx==0.27.0
pydantic==2.7.0
pydantic-settings==2.2.1
python-multipart==0.0.9
pytest==8.2.0
pytest-asyncio==0.23.6

cd backend
python -m venv .venv
source .venv/bin/activate # Windows: .venv\Scripts\activate
pip install -r requirements.txt from fastapi import FastAPI
from strawberry.fastapi import GraphQLRouter
from app.graphql.schema import schema
app = FastAPI(title="MTG App API")
graphql_app = GraphQLRouter(schema)
app.include_router(graphql_app, prefix="/graphql")
@app.get("/health")
def health():
return {"status": "ok"}

Checklist: create the backend/ directory tree above (use mkdir -p) · write requirements.txt and install dependencies · create backend/app/main.py — the schema import will fail until Phase 3, that’s fine · run uvicorn app.main:app --reload and confirm /health returns 200 · git commit -m 'chore: backend scaffold'

Bootstrap the React TypeScript SPA, install Apollo Client, and configure Tailwind + shadcn/ui.
This is a pure client-side SPA — no SSR needed for a personal local tool. Vite gives fast HMR and a simpler mental model. Apollo Client handles all data fetching against the GraphQL endpoint.
npm create vite@latest frontend -- --template react-ts
cd frontend
npm install
# Apollo Client
npm install @apollo/client graphql
# UI
npm install -D tailwindcss postcss autoprefixer
npx tailwindcss init -p
npx shadcn-ui@latest init
# Utilities
npm install @scryfall/api-types papaparse recharts
npm install -D @types/papaparse
# GraphQL codegen
npm install -D @graphql-codegen/cli
@graphql-codegen/client-preset import { ApolloClient, InMemoryCache, HttpLink } from "@apollo/client";
export const client = new ApolloClient({
link: new HttpLink({
uri: import.meta.env.VITE_GRAPHQL_URL ?? "http://localhost:8000/graphql",
}),
cache: new InMemoryCache(),
}); import React from "react";
import ReactDOM from "react-dom/client";
import { ApolloProvider } from "@apollo/client";
import { client } from "./apollo";
import App from "./App";
import "./index.css";
ReactDOM.createRoot(document.getElementById("root")!).render(
<React.StrictMode>
<ApolloProvider client={client}>
<App />
</ApolloProvider>
</React.StrictMode>
); VITE_GRAPHQL_URL=http://localhost:8000/graphql src/apollo.tssrc/main.tsx to wrap with ApolloProvidernpm run dev — confirm it boots at localhost:5173git commit -m 'chore: frontend scaffold'Define all five SQLAlchemy async models. These are the single source of truth for your DB schema.
Use SQLAlchemy 2.0 style (DeclarativeBase). All primary keys are UUIDs generated in Python. Prices stored as integers (cents). The cards table is a read-only Scryfall cache — only the sync job writes to it.
Store all prices as Integer (cents), never Float. Float rounding will corrupt price history over time.
from sqlalchemy.ext.asyncio import (
create_async_engine, async_sessionmaker, AsyncSession
)
from sqlalchemy.orm import DeclarativeBase
import os
DATABASE_URL = os.getenv(
"DATABASE_URL",
"postgresql+asyncpg://mtg:mtg@localhost:5432/mtg"
)
engine = create_async_engine(DATABASE_URL, echo=False)
AsyncSessionLocal = async_sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
class Base(DeclarativeBase):
pass
async def get_db():
async with AsyncSessionLocal() as session:
yield session from sqlalchemy import (
String, Integer, Boolean, Text,
DateTime, ForeignKey, ARRAY, func
)
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.db import Base
import uuid
class Card(Base):
    """Read-only cache of Scryfall card data — only the bulk sync job writes here.

    The primary key is the Scryfall UUID (a string, not a DB-generated id),
    which lets bulk upserts use ON CONFLICT (id).
    """
    __tablename__ = "cards"

    id: Mapped[str] = mapped_column(String, primary_key=True)  # Scryfall UUID
    name: Mapped[str] = mapped_column(String, nullable=False, index=True)
    set_code: Mapped[str] = mapped_column(String, nullable=False)
    set_name: Mapped[str] = mapped_column(String, nullable=False)
    collector_number: Mapped[str] = mapped_column(String, nullable=False)
    mana_cost: Mapped[str | None] = mapped_column(String)
    cmc: Mapped[int] = mapped_column(Integer, default=0)
    type_line: Mapped[str] = mapped_column(String, nullable=False)
    oracle_text: Mapped[str | None] = mapped_column(Text)
    # default=list (a callable) instead of default=[]: a literal [] is a single
    # shared mutable object reused as the default for every row.
    color_identity: Mapped[list] = mapped_column(ARRAY(String), default=list)
    rarity: Mapped[str] = mapped_column(String, nullable=False)
    # All prices are integer cents, never floats (see design note above).
    price_usd: Mapped[int | None] = mapped_column(Integer)  # cents
    price_usd_foil: Mapped[int | None] = mapped_column(Integer)  # cents
    price_eur: Mapped[int | None] = mapped_column(Integer)  # cents
    image_uris: Mapped[dict | None] = mapped_column(JSONB)
    legalities: Mapped[dict | None] = mapped_column(JSONB)
    scryfall_data: Mapped[dict] = mapped_column(JSONB, nullable=False)  # full raw card object
    # NOTE(review): the Mapped[] annotation should be datetime.datetime, not the
    # DateTime column type — confirm and fix alongside the other models.
    updated_at: Mapped[DateTime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )
class CollectionItem(Base):
    """One owned physical printing: quantity, finish, condition, provenance."""
    __tablename__ = "collection"

    id: Mapped[str] = mapped_column(
        String, primary_key=True, default=lambda: str(uuid.uuid4())
    )
    # Points into the read-only Scryfall cache; indexed for per-card lookups.
    scryfall_id: Mapped[str] = mapped_column(
        String, ForeignKey("cards.id"), nullable=False, index=True
    )
    quantity: Mapped[int] = mapped_column(Integer, default=1)
    foil: Mapped[bool] = mapped_column(Boolean, default=False)
    condition: Mapped[str] = mapped_column(String, default="NM")
    language: Mapped[str] = mapped_column(String, default="en")
    purchase_price_cents: Mapped[int | None] = mapped_column(Integer)
    acquired_at: Mapped[DateTime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )
    notes: Mapped[str | None] = mapped_column(Text)
    card: Mapped["Card"] = relationship("Card")
class Deck(Base):
    """A constructed deck; its cards live in deck_cards with cascade delete."""
    __tablename__ = "decks"

    id: Mapped[str] = mapped_column(
        String, primary_key=True, default=lambda: str(uuid.uuid4())
    )
    name: Mapped[str] = mapped_column(String, nullable=False)
    format: Mapped[str] = mapped_column(String, default="commander")
    description: Mapped[str | None] = mapped_column(Text)
    commander_id: Mapped[str | None] = mapped_column(
        String, ForeignKey("cards.id")
    )
    created_at: Mapped[DateTime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )
    # onupdate=func.now() so the timestamp actually refreshes on edits; with
    # server_default alone it would stay frozen at creation time forever.
    updated_at: Mapped[DateTime] = mapped_column(
        DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
    )
    deck_cards: Mapped[list["DeckCard"]] = relationship(
        "DeckCard", back_populates="deck", cascade="all, delete-orphan"
    )
class DeckCard(Base):
    """Membership of one card in one deck (board + user-defined categories)."""
    __tablename__ = "deck_cards"

    id: Mapped[str] = mapped_column(
        String, primary_key=True, default=lambda: str(uuid.uuid4())
    )
    deck_id: Mapped[str] = mapped_column(
        String, ForeignKey("decks.id", ondelete="CASCADE"),
        nullable=False, index=True
    )
    scryfall_id: Mapped[str] = mapped_column(
        String, ForeignKey("cards.id"), nullable=False
    )
    quantity: Mapped[int] = mapped_column(Integer, default=1)
    board: Mapped[str] = mapped_column(String, default="mainboard")
    # default=list, not default=[] — a literal [] is one shared mutable object.
    categories: Mapped[list] = mapped_column(ARRAY(String), default=list)
    foil: Mapped[bool] = mapped_column(Boolean, default=False)
    deck: Mapped["Deck"] = relationship("Deck", back_populates="deck_cards")
    card: Mapped["Card"] = relationship("Card")
class PriceHistory(Base):
    """Daily price snapshot rows — one per collected card per calendar day."""
    __tablename__ = "price_history"

    id: Mapped[str] = mapped_column(
        String, primary_key=True, default=lambda: str(uuid.uuid4())
    )
    scryfall_id: Mapped[str] = mapped_column(
        String, ForeignKey("cards.id"), nullable=False, index=True
    )
    price_usd: Mapped[int | None] = mapped_column(Integer)
    price_usd_foil: Mapped[int | None] = mapped_column(Integer)
    price_eur: Mapped[int | None] = mapped_column(Integer)
    # NOTE(review): the snapshot job relies on ON CONFLICT DO NOTHING to dedupe,
    # but that only works with a UNIQUE constraint on (scryfall_id, snapshot_date).
    # None is declared here — without it, re-running the job inserts duplicates.
    snapshot_date: Mapped[str] = mapped_column(String, nullable=False)  # YYYY-MM-DD
    recorded_at: Mapped[DateTime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )
class EdhrecCache(Base):
__tablename__ = "edhrec_cache"
id: Mapped[str] = mapped_column(
String, primary_key=True, default=lambda: str(uuid.uuid4())
)
slug: Mapped[str] = mapped_column(String, nullable=False, unique=True, index=True)
data: Mapped[dict] = mapped_column(JSONB, nullable=False)
expires_at: Mapped[DateTime] = mapped_column(DateTime(timezone=True), nullable=False)
created_at: Mapped[DateTime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)

Checklist: create backend/app/db.py · create backend/app/models/models.py with all five models above · run python -c 'from app.models.models import Base; print(Base.metadata.tables.keys())' — should list all 5 tables · git commit -m 'feat: SQLAlchemy models'

Initialize Alembic and generate the initial migration from your SQLAlchemy models.
Alembic manages schema migrations. Unlike Drizzle Kit, you own the migration files completely — always review generated migrations before running them. Use async migrations with asyncpg.
Learn these 3 Alembic commands and run them yourself — never ask Claude to manage migrations. It will make mistakes.
cd backend
alembic init alembic # Replace the run_migrations_offline and run_migrations_online
# functions with async versions, and import your models:
from app.db import DATABASE_URL
from app.models.models import Base # import all models
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine
# In env.py, set:
target_metadata = Base.metadata
# Update the DATABASE_URL in alembic.ini OR set it dynamically:
config.set_main_option("sqlalchemy.url", DATABASE_URL) # Generate migration from current models
alembic revision --autogenerate -m "initial schema"
# Review the generated file in alembic/versions/
# Then apply it:
alembic upgrade head alembic upgrade head # apply all pending migrations
alembic downgrade -1 # roll back one migration
alembic current # show current revision
alembic history # show migration history
alembic revision --autogenerate -m "add column" # generate new migration alembic init alembic in the backend/ directoryalembic/env.py to import Base.metadata and set the DB URLalembic revision --autogenerate -m 'initial schema'alembic/versions/ and review it — confirm all 5 tables appearalembic upgrade headpsql postgresql://mtg:mtg@localhost:5432/mtg -c '\dt'git commit -m 'feat: initial alembic migration'Python async service that fetches the Scryfall default_cards bulk file and upserts into the cards table.
Scryfall publishes a default_cards bulk file updated every 12 hours. It’s a JSON array of ~30k card objects (~100MB). Always fetch the metadata endpoint first to get the current download URL — never hardcode it.
Don't hardcode the download URL — it changes on each update. Always fetch https://api.scryfall.com/bulk-data/default-cards first.
import httpx
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.dialects.postgresql import insert as pg_insert
from app.models.models import Card
import asyncio
BULK_META_URL = "https://api.scryfall.com/bulk-data/default-cards"
def _price_cents(val: str | None) -> int | None:
if val is None:
return None
try:
return round(float(val) * 100)
except (ValueError, TypeError):
return None
async def sync_scryfall_bulk(db: AsyncSession) -> int:
    """Download the Scryfall default_cards bulk file and upsert it into cards.

    Always fetches the bulk-data metadata first — the download URL rotates on
    every update. Rows are upserted in batches of 500 with ON CONFLICT (id) so
    repeated runs refresh names and prices in place. Returns the card count.
    """
    async with httpx.AsyncClient(timeout=300) as client:
        meta = (await client.get(BULK_META_URL)).json()
        download_url: str = meta["download_uri"]
        print(f"Downloading bulk data from {download_url}...")

        # ~100MB JSON array of every card object in the default_cards file.
        response = await client.get(download_url)
        card_array: list[dict] = response.json()
        print(f"Upserting {len(card_array)} cards...")

        BATCH = 500
        for i in range(0, len(card_array), BATCH):
            chunk = card_array[i : i + BATCH]
            rows = [
                {
                    "id": c["id"],
                    "name": c["name"],
                    "set_code": c["set"],
                    "set_name": c["set_name"],
                    "collector_number": c["collector_number"],
                    "mana_cost": c.get("mana_cost"),
                    "cmc": round(c.get("cmc") or 0),
                    "type_line": c.get("type_line", ""),
                    "oracle_text": c.get("oracle_text"),
                    "color_identity": c.get("color_identity", []),
                    "rarity": c["rarity"],
                    "price_usd": _price_cents(c.get("prices", {}).get("usd")),
                    "price_usd_foil": _price_cents(c.get("prices", {}).get("usd_foil")),
                    "price_eur": _price_cents(c.get("prices", {}).get("eur")),
                    "image_uris": c.get("image_uris"),
                    "legalities": c.get("legalities"),
                    "scryfall_data": c,  # keep the full raw object for anything not modeled
                }
                for c in chunk
            ]
            upsert = pg_insert(Card).values(rows)
            # On conflict, refresh only the fields that change between syncs.
            upsert = upsert.on_conflict_do_update(
                index_elements=["id"],
                set_={
                    "name": upsert.excluded.name,
                    "price_usd": upsert.excluded.price_usd,
                    "price_usd_foil": upsert.excluded.price_usd_foil,
                    "price_eur": upsert.excluded.price_eur,
                    "scryfall_data": upsert.excluded.scryfall_data,
                },
            )
            await db.execute(upsert)
            await db.commit()  # commit per batch: bounded memory, restartable
            if i % 5000 == 0:
                print(f" {i} / {len(card_array)}")
    print("Bulk sync complete.")
    return len(card_array) # backend/scripts/seed.py
import asyncio
from app.db import AsyncSessionLocal
from app.services.scryfall_sync import sync_scryfall_bulk
async def main():
    """Run one full Scryfall bulk sync against a fresh session."""
    async with AsyncSessionLocal() as db:
        count = await sync_scryfall_bulk(db)
        print(f"Synced {count} cards.")
asyncio.run(main()) cd backend
python scripts/seed.py
# Expect 2–5 minutes on first run backend/app/services/scryfall_sync.pybackend/scripts/seed.pypython scripts/seed.pypsql ... -c "SELECT name, price_usd FROM cards WHERE name = 'Lightning Bolt' LIMIT 1;"git commit -m 'feat: scryfall bulk sync service'Register the sync and price snapshot jobs as Celery beat tasks with Redis as the broker.
Celery is the standard Python async task queue. With Redis as the broker, it handles scheduling, retries, and concurrency cleanly. Celery Beat handles cron-style scheduling — no need for a separate cron daemon.
from celery import Celery
import os
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
celery_app = Celery(
"mtg",
broker=REDIS_URL,
backend=REDIS_URL,
include=["app.tasks.tasks"],
)
celery_app.conf.beat_schedule = {
"scryfall-sync-daily": {
"task": "app.tasks.tasks.scryfall_sync_task",
"schedule": 3600 * 24, # every 24h
},
"price-snapshot-daily": {
"task": "app.tasks.tasks.price_snapshot_task",
"schedule": 3600 * 24,
},
}
celery_app.conf.timezone = "UTC" import asyncio
from celery_worker import celery_app
from app.db import AsyncSessionLocal
from app.services.scryfall_sync import sync_scryfall_bulk
from app.services.price_snapshot import take_price_snapshot
@celery_app.task(name="app.tasks.tasks.scryfall_sync_task")
def scryfall_sync_task():
    """Celery entrypoint: run the async Scryfall bulk sync to completion."""
    async def _run():
        async with AsyncSessionLocal() as db:
            await sync_scryfall_bulk(db)

    # Celery tasks are synchronous — bridge into asyncio per invocation.
    asyncio.run(_run())


@celery_app.task(name="app.tasks.tasks.price_snapshot_task")
def price_snapshot_task():
    """Celery entrypoint: snapshot today's prices for the whole collection."""
    async def _run():
        async with AsyncSessionLocal() as db:
            await take_price_snapshot(db)

    asyncio.run(_run()) # In one terminal: worker
celery -A celery_worker worker --loglevel=info
# In another terminal: beat scheduler
celery -A celery_worker beat --loglevel=info backend/celery_worker.pybackend/app/tasks/tasks.py — the price_snapshot import will be stubbed until lesson 14celery -A celery_worker call app.tasks.tasks.scryfall_sync_taskgit commit -m 'feat: celery tasks and scheduler'Define all Strawberry GraphQL types from your SQLAlchemy models. These mirror the DB schema with GraphQL-friendly naming.
Strawberry types are the GraphQL schema contract. They live separately from SQLAlchemy models — a Strawberry type is what the client sees, a model is what the DB stores. Keep them in sync manually; don’t try to derive one from the other automatically.
import strawberry
from typing import Optional
from datetime import datetime
@strawberry.type
class CardType:
id: str
name: str
set_code: str
set_name: str
collector_number: str
mana_cost: Optional[str]
cmc: int
type_line: str
oracle_text: Optional[str]
color_identity: list[str]
rarity: str
price_usd: Optional[int] # cents
price_usd_foil: Optional[int] # cents
price_eur: Optional[int] # cents
@strawberry.type
class CollectionItemType:
id: str
scryfall_id: str
quantity: int
foil: bool
condition: str
language: str
purchase_price_cents: Optional[int]
acquired_at: datetime
card: Optional[CardType] = None
@strawberry.type
class DeckCardType:
id: str
deck_id: str
scryfall_id: str
quantity: int
board: str
categories: list[str]
foil: bool
card: Optional[CardType] = None
@strawberry.type
class DeckType:
id: str
name: str
format: str
description: Optional[str]
commander_id: Optional[str]
created_at: datetime
updated_at: datetime
deck_cards: list[DeckCardType] = strawberry.field(default_factory=list)
@strawberry.type
class PriceHistoryType:
id: str
scryfall_id: str
price_usd: Optional[int]
price_usd_foil: Optional[int]
price_eur: Optional[int]
snapshot_date: str
recorded_at: datetime
@strawberry.type
class ConversionResultType:
csv: str
count: int import strawberry
from strawberry.fastapi import GraphQLRouter
from app.graphql.resolvers.cards import CardsQuery
from app.graphql.resolvers.collection import CollectionQuery, CollectionMutation
from app.graphql.resolvers.converter import ConverterMutation
from app.graphql.resolvers.decks import DecksQuery, DecksMutation
from app.graphql.resolvers.prices import PricesQuery
@strawberry.type
class Query(CardsQuery, CollectionQuery, DecksQuery, PricesQuery):
pass
@strawberry.type
class Mutation(CollectionMutation, ConverterMutation, DecksMutation):
pass
schema = strawberry.Schema(query=Query, mutation=Mutation) backend/app/graphql/types.py with all Strawberry typesbackend/app/graphql/schema.pyuvicorn app.main:app --reload and open http://localhost:8000/graphql in the browsergit commit -m 'feat: strawberry schema skeleton'Implement the card search, card by ID, and collection CRUD resolvers.
Each resolver module defines a mixin class with @strawberry.field methods. These are composed into the root Query/Mutation types in schema.py. Use the FastAPI dependency injection pattern to get the AsyncSession into resolvers via strawberry.Info.
# In backend/app/main.py, update the GraphQLRouter setup:
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import get_db
from typing import AsyncGenerator
async def get_context(db: AsyncSession = Depends(get_db)):
return {"db": db}
graphql_app = GraphQLRouter(schema, context_getter=get_context) import strawberry
from strawberry.types import Info
from sqlalchemy import select, or_, func
from app.models.models import Card
from app.graphql.types import CardType
from typing import Optional
def _to_card_type(c: Card) -> CardType:
    """Map a Card ORM row onto the Strawberry CardType (fields copied 1:1)."""
    return CardType(
        id=c.id,
        name=c.name,
        set_code=c.set_code,
        set_name=c.set_name,
        collector_number=c.collector_number,
        mana_cost=c.mana_cost,
        cmc=c.cmc,
        type_line=c.type_line,
        oracle_text=c.oracle_text,
        color_identity=c.color_identity or [],
        rarity=c.rarity,
        price_usd=c.price_usd,
        price_usd_foil=c.price_usd_foil,
        price_eur=c.price_eur,
    )
# Query mixin composed into the root Query type in schema.py.
@strawberry.type
class CardsQuery:
# Case-insensitive substring search on card name; result size capped at 50.
@strawberry.field
async def search_cards(
self, info: Info, query: str, limit: int = 20
) -> list[CardType]:
# AsyncSession injected by get_context in main.py.
db = info.context["db"]
stmt = (
select(Card)
.where(func.lower(Card.name).contains(query.lower()))
.limit(min(limit, 50))
)
result = await db.execute(stmt)
return [_to_card_type(c) for c in result.scalars()]
# Point lookup by Scryfall UUID; None when the id is unknown.
@strawberry.field
async def card_by_id(
self, info: Info, id: str
) -> Optional[CardType]:
db = info.context["db"]
result = await db.execute(select(Card).where(Card.id == id))
card = result.scalar_one_or_none()
return _to_card_type(card) if card else None import strawberry
import uuid
from strawberry.types import Info
from sqlalchemy import select
from app.models.models import CollectionItem
from app.graphql.types import CollectionItemType
# NOTE(review): _to_type is used below but never defined in this snippet — add a
# CollectionItem -> CollectionItemType mapper analogous to _to_card_type in cards.py.
@strawberry.type
class CollectionQuery:
# Returns every collection row; no pagination (personal-scale tool).
@strawberry.field
async def collection(self, info: Info) -> list[CollectionItemType]:
db = info.context["db"]
result = await db.execute(select(CollectionItem))
return [_to_type(item) for item in result.scalars()]
@strawberry.type
class CollectionMutation:
# Inserts one collection row; id is generated server-side as a UUID string.
@strawberry.mutation
async def add_to_collection(
self,
info: Info,
scryfall_id: str,
quantity: int = 1,
foil: bool = False,
condition: str = "NM",
language: str = "en",
purchase_price_cents: int | None = None,
) -> CollectionItemType:
db = info.context["db"]
item = CollectionItem(
id=str(uuid.uuid4()),
scryfall_id=scryfall_id,
quantity=quantity,
foil=foil,
condition=condition,
language=language,
purchase_price_cents=purchase_price_cents,
)
db.add(item)
await db.commit()
# refresh to populate server-side defaults (e.g. acquired_at) before returning.
await db.refresh(item)
return _to_type(item) backend/app/main.py to inject db session into GraphQL contextbackend/app/graphql/resolvers/cards.pybackend/app/graphql/resolvers/collection.pyquery { searchCards(query: "Lightning Bolt") { id name priceUsd } }npx tsc --noEmit in frontend — run codegen after next lesson to generate TS typesgit commit -m 'feat: cards and collection resolvers'Generate TypeScript types from the Strawberry schema so the frontend has full type safety.
Codegen introspects the live GraphQL schema and generates TypeScript types for every query and mutation you write. You write .graphql files in the frontend, run codegen, and get fully-typed hooks. This is the key advantage of GraphQL over tRPC for a two-language stack.
import type { CodegenConfig } from "@graphql-codegen/cli";
const config: CodegenConfig = {
schema: "http://localhost:8000/graphql",
documents: ["src/**/*.graphql"],
generates: {
"./src/graphql/": {
preset: "client",
presetConfig: {
gqlTagName: "gql",
},
},
},
};
export default config; {
"scripts": {
"codegen": "graphql-codegen --config codegen.ts",
"codegen:watch": "graphql-codegen --config codegen.ts --watch"
}
} query SearchCards($query: String!, $limit: Int) {
searchCards(query: $query, limit: $limit) {
id
name
setCode
setName
manaCost
cmc
typeLine
colorIdentity
rarity
priceUsd
priceUsdFoil
imageUris
}
}
query CardById($id: String!) {
cardById(id: $id) {
id
name
oracleText
legalities
imageUris
}
} // In a React component:
import { useQuery } from "@apollo/client";
import { SearchCardsDocument } from "@/graphql/graphql";
function CardSearch({ query }: { query: string }) {
const { data, loading } = useQuery(SearchCardsDocument, {
variables: { query, limit: 20 },
skip: query.length < 2,
});
return <div>{data?.searchCards.map(c => c.name).join(", ")}</div>;
} frontend/codegen.tscodegen script to package.jsonsrc/graphql/queries/cards.graphqlnpm run codegen — confirm src/graphql/graphql.ts is generatedgit commit -m 'feat: graphql codegen setup'Build the universal converter in Python: parsers for each format, canonical pivot, and serializers.
Every format conversion goes through a canonical dict as an intermediate. You never convert Manabox to Moxfield directly — only Manabox → canonical → Moxfield. This means N parsers + N serializers, not N² converters.
import csv
import io
from dataclasses import dataclass
from typing import Literal
CardCondition = Literal["NM", "LP", "MP", "HP", "DMG"]
@dataclass
class CanonicalCard:
scryfall_id: str
quantity: int
foil: bool
condition: CardCondition
language: str
purchase_price: float | None = None
# Manabox spells conditions out in full; canonical uses the short codes.
MANABOX_TO_CANONICAL: dict[str, CardCondition] = {
    "Near Mint": "NM",
    "Lightly Played": "LP",
    "Moderately Played": "MP",
    "Heavily Played": "HP",
    "Damaged": "DMG",
}
# Inverse mapping, used when serializing back out to Manabox.
CANONICAL_TO_MANABOX = {v: k for k, v in MANABOX_TO_CANONICAL.items()}
def from_manabox(csv_text: str) -> list[CanonicalCard]:
    """Parse a Manabox CSV export into canonical cards.

    Unknown or missing conditions fall back to "NM"; a blank language
    falls back to "en".
    """
    parsed: list[CanonicalCard] = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        price_raw = row.get("Purchase Price")
        parsed.append(CanonicalCard(
            scryfall_id=row["Scryfall ID"],
            quantity=int(row.get("Quantity", 1) or 1),
            foil=row.get("Foil", "").lower() == "foil",
            condition=MANABOX_TO_CANONICAL.get(row.get("Condition", ""), "NM"),
            language=row.get("Language", "en") or "en",
            purchase_price=float(price_raw) if price_raw else None,
        ))
    return parsed
def to_manabox(cards: list[CanonicalCard]) -> str:
    """Serialize canonical cards to Manabox CSV.

    Conditions expand to Manabox's long names; foil is written as "foil"/""
    to match Manabox's own exports.
    """
    fieldnames = ["Scryfall ID", "Quantity", "Foil", "Condition", "Language", "Purchase Price"]
    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=fieldnames)
    writer.writeheader()
    for c in cards:
        writer.writerow({
            "Scryfall ID": c.scryfall_id,
            "Quantity": c.quantity,
            "Foil": "foil" if c.foil else "",
            "Condition": CANONICAL_TO_MANABOX.get(c.condition, "Near Mint"),
            "Language": c.language,
            # `is not None`, not truthiness: a legitimate 0.0 purchase price
            # must round-trip instead of collapsing to "".
            "Purchase Price": str(c.purchase_price) if c.purchase_price is not None else "",
        })
    return out.getvalue()
# Moxfield already uses the short condition codes; map defensively anyway.
MOXFIELD_CONDITION_MAP: dict[str, CardCondition] = {
    "NM": "NM", "LP": "LP", "MP": "MP", "HP": "HP", "DMG": "DMG",
}


def from_moxfield(csv_text: str) -> list[CanonicalCard]:
    """Parse a Moxfield CSV export ("Count" column; "etched" counts as foil)."""
    parsed: list[CanonicalCard] = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        parsed.append(CanonicalCard(
            scryfall_id=row["Scryfall ID"],
            quantity=int(row.get("Count", 1) or 1),
            foil=row.get("Foil", "").lower() in ("foil", "etched"),
            condition=MOXFIELD_CONDITION_MAP.get(row.get("Condition", "NM"), "NM"),
            language=row.get("Language", "en") or "en",
        ))
    return parsed
def to_moxfield(cards: list[CanonicalCard]) -> str:
    """Serialize canonical cards to Moxfield CSV (short condition codes)."""
    out = io.StringIO()
    writer = csv.DictWriter(
        out, fieldnames=["Count", "Scryfall ID", "Condition", "Foil", "Language"]
    )
    writer.writeheader()
    for c in cards:
        writer.writerow({
            "Count": c.quantity,
            "Scryfall ID": c.scryfall_id,
            "Condition": c.condition,
            "Foil": "foil" if c.foil else "",
            "Language": c.language,
        })
    return out.getvalue()
def from_archidekt(csv_text: str) -> list[CanonicalCard]:
    """Parse an Archidekt CSV export (lowercase headers, boolean "true" foil).

    Archidekt exports carry no language column, so "en" is assumed.
    """
    parsed: list[CanonicalCard] = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        parsed.append(CanonicalCard(
            scryfall_id=row["scryfall_id"],
            quantity=int(row.get("quantity", 1) or 1),
            foil=row.get("foil", "").lower() == "true",
            condition=MOXFIELD_CONDITION_MAP.get(row.get("condition", "NM"), "NM"),
            language="en",
        ))
    return parsed
def to_archidekt(cards: list[CanonicalCard]) -> str:
    """Serialize canonical cards to Archidekt CSV (no language column)."""
    out = io.StringIO()
    writer = csv.DictWriter(
        out, fieldnames=["quantity", "scryfall_id", "condition", "foil"]
    )
    writer.writeheader()
    for c in cards:
        writer.writerow({
            "quantity": c.quantity,
            "scryfall_id": c.scryfall_id,
            "condition": c.condition,
            "foil": str(c.foil).lower(),
        })
    return out.getvalue()
CollectionFormat = Literal["manabox", "moxfield", "archidekt"]

# Adding a format means one parser + one serializer here — never an N x N converter.
FROM_ADAPTERS = {"manabox": from_manabox, "moxfield": from_moxfield, "archidekt": from_archidekt}
TO_ADAPTERS = {"manabox": to_manabox, "moxfield": to_moxfield, "archidekt": to_archidekt}
# Pivot conversion: source -> canonical list -> target. Returns (csv_text, card_count).
def convert(csv_text: str, from_fmt: CollectionFormat, to_fmt: CollectionFormat) -> tuple[str, int]:
canonical = FROM_ADAPTERS[from_fmt](csv_text)
output = TO_ADAPTERS[to_fmt](canonical)
return output, len(canonical) backend/app/services/converter.pyconvert(csv, 'manabox', 'moxfield'), check that scryfallIds are preservedgit commit -m 'feat: python CSV converter service'Expose the converter as a GraphQL mutation and build the upload → convert → download UI in React.
import strawberry
from strawberry.types import Info
from app.services.converter import convert, CollectionFormat
from app.graphql.types import ConversionResultType
@strawberry.type
class ConverterMutation:
@strawberry.mutation
async def convert_csv(
self,
info: Info,
csv: str,
from_format: str,
to_format: str,
) -> ConversionResultType:
output, count = convert(csv, from_format, to_format) # type: ignore
return ConversionResultType(csv=output, count=count) mutation ConvertCsv($csv: String!, $fromFormat: String!, $toFormat: String!) {
convertCsv(csv: $csv, fromFormat: $fromFormat, toFormat: $toFormat) {
csv
count
}
} import { useMutation } from "@apollo/client";
import { ConvertCsvDocument } from "@/graphql/graphql";
export function ConverterPage() {
const [convert, { data, loading }] = useMutation(ConvertCsvDocument);
const handleFile = async (e: React.ChangeEvent<HTMLInputElement>) => {
const file = e.target.files?.[0];
if (!file) return;
const csv = await file.text();
await convert({ variables: { csv, fromFormat, toFormat } });
};
const handleDownload = () => {
if (!data?.convertCsv.csv) return;
const blob = new Blob([data.convertCsv.csv], { type: "text/csv" });
const url = URL.createObjectURL(blob);
const a = document.createElement("a");
a.href = url;
a.download = "converted.csv";
a.click();
};
return (/* JSX with file input, format selects, download button */);
} backend/app/graphql/resolvers/converter.pyConverterMutation to schema.pynpm run codegen to generate the TypeScript mutation hookgit commit -m 'feat: converter resolver and UI'GraphQL CRUD for decks and deck_cards, including color identity legality checking.
Color identity legality is a pure function — no DB query needed. A card is legal in a commander deck if its color identity is a subset of the commander’s color identity. Scryfall stores these as string arrays: ["W", "U"].
def is_legal_in_commander(
    commander_identity: list[str],
    card_identity: list[str]
) -> bool:
    """True iff the card's color identity is a subset of the commander's.

    Colorless cards (empty identity) are legal in every deck.
    """
    return set(card_identity) <= set(commander_identity)


import strawberry
import uuid
from strawberry.types import Info
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from app.models.models import Deck, DeckCard
from app.graphql.types import DeckType, DeckCardType
# NOTE(review): _to_deck_type is used below but never defined in this snippet —
# add a Deck -> DeckType mapper analogous to _to_card_type in cards.py.
@strawberry.type
class DecksQuery:
# List view: eager-load deck_cards to avoid lazy-load on a closed session.
@strawberry.field
async def decks(self, info: Info) -> list[DeckType]:
db = info.context["db"]
result = await db.execute(
select(Deck).options(
selectinload(Deck.deck_cards)
)
)
return [_to_deck_type(d) for d in result.scalars()]
# Detail view: also eager-load each DeckCard's Card row in one extra query.
@strawberry.field
async def deck(self, info: Info, id: str) -> DeckType | None:
db = info.context["db"]
result = await db.execute(
select(Deck)
.where(Deck.id == id)
.options(selectinload(Deck.deck_cards).selectinload(DeckCard.card))
)
deck = result.scalar_one_or_none()
return _to_deck_type(deck) if deck else None
# NOTE(review): _to_deck_type / _to_deck_card_type are used below but never
# defined in this snippet — define them alongside the query helpers.
@strawberry.type
class DecksMutation:
@strawberry.mutation
async def create_deck(
self, info: Info, name: str,
format: str = "commander",
commander_id: str | None = None,
) -> DeckType:
db = info.context["db"]
deck = Deck(id=str(uuid.uuid4()), name=name,
format=format, commander_id=commander_id)
db.add(deck)
await db.commit()
await db.refresh(deck)
return _to_deck_type(deck)
@strawberry.mutation
async def add_card_to_deck(
self, info: Info, deck_id: str, scryfall_id: str,
quantity: int = 1,
board: str = "mainboard",
# NOTE(review): mutable default [] is a Python anti-pattern (one shared list
# across calls) — prefer a None default normalized to [] inside the body.
categories: list[str] = [],
) -> DeckCardType:
db = info.context["db"]
dc = DeckCard(
id=str(uuid.uuid4()), deck_id=deck_id,
scryfall_id=scryfall_id, quantity=quantity,
board=board, categories=categories,
)
db.add(dc)
await db.commit()
await db.refresh(dc)
return _to_deck_card_type(dc) backend/app/graphql/resolvers/decks.pyDecksQuery and DecksMutation to schema.pynpm run codegengit commit -m 'feat: deck resolvers'Daily price snapshot Celery task and GraphQL query for price history.
The snapshot job reads all unique scryfall_id values from the collection table, gets each card’s current price from cards, and inserts a row into price_history. Use INSERT ... ON CONFLICT DO NOTHING to avoid duplicate snapshots on the same day.
Price trends only become useful after ~2 weeks of daily snapshots. Add this job now so data accumulates while you build the UI.
import uuid
from datetime import date
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from app.models.models import CollectionItem, Card, PriceHistory
async def take_price_snapshot(db: AsyncSession) -> int:
    """Record today's prices for every distinct card in the collection.

    Reads each card's current prices from `cards` and inserts one row per
    card into `price_history`.  INSERT ... ON CONFLICT DO NOTHING makes the
    job idempotent: re-running it on the same day creates no duplicates.

    Returns the number of rows actually inserted (conflict-skipped rows
    are not counted).
    """
    today = date.today().isoformat()
    result = await db.execute(
        select(CollectionItem.scryfall_id).distinct()
    )
    ids = [row[0] for row in result]
    count = 0
    for scryfall_id in ids:
        # NOTE(review): one SELECT per card is an N+1 pattern; fine for a
        # personal collection, but a single JOINed query would scale better.
        card_result = await db.execute(
            select(Card.price_usd, Card.price_usd_foil, Card.price_eur)
            .where(Card.id == scryfall_id)
        )
        card = card_result.first()
        if not card:
            # The collection references a card that has not been synced yet.
            continue
        stmt = pg_insert(PriceHistory).values(
            id=str(uuid.uuid4()),
            scryfall_id=scryfall_id,
            price_usd=card.price_usd,
            price_usd_foil=card.price_usd_foil,
            price_eur=card.price_eur,
            snapshot_date=today,
        ).on_conflict_do_nothing()
        insert_result = await db.execute(stmt)
        # rowcount is 0 when ON CONFLICT skipped the insert, so `count`
        # reflects rows actually written rather than attempts.
        count += insert_result.rowcount
    await db.commit()
    return count
# backend/app/graphql/resolvers/prices.py
import strawberry
from strawberry.types import Info
from sqlalchemy import select
from app.models.models import PriceHistory
from app.graphql.types import PriceHistoryType
@strawberry.type
class PricesQuery:
    """Read-only queries over the daily price_history snapshots."""

    @strawberry.field
    async def price_history(
        self, info: Info, scryfall_id: str
    ) -> list[PriceHistoryType]:
        # Ordered oldest-first so the frontend can feed rows straight
        # into a time-series chart.
        db = info.context["db"]
        result = await db.execute(
            select(PriceHistory)
            .where(PriceHistory.scryfall_id == scryfall_id)
            .order_by(PriceHistory.snapshot_date)
        )
        # _to_type: row -> PriceHistoryType mapper, presumably defined
        # alongside in this module (not shown here).
        return [_to_type(r) for r in result.scalars()]
backend/app/services/price_snapshot.pybackend/app/tasks/tasks.py to uncomment the price snapshot task importbackend/app/graphql/resolvers/prices.pyPricesQuery to schema.pycelery -A celery_worker call app.tasks.tasks.price_snapshot_taskgit commit -m 'feat: price snapshot service and resolver'Define CanonicalCard and DeckCard in TypeScript. Paste these into every frontend session.
GraphQL Codegen generates types for your queries/mutations, but your app-level types (canonical card, converter state) aren’t part of the schema. Keep these in src/lib/types.ts and paste them into every new session.
Save this file and paste it at the top of every new frontend AI session alongside the relevant .graphql query files.
import type { Card } from "@scryfall/api-types";

// Re-export for convenience so callers need not import @scryfall/api-types
// directly.
export type ScryfallCard = Card;

// Format-agnostic internal representation of one owned printing,
// independent of which importer (ManaBox/Moxfield/Archidekt) produced it.
export interface CanonicalCard {
  scryfallId: string;
  quantity: number;
  foil: boolean;
  condition: CardCondition;
  language: string;
  // NOTE(review): units unclear — the GraphQL schema exposes
  // purchasePriceCents; confirm whether this is dollars or cents.
  purchasePrice?: number;
}

// Condition grades — presumably the standard TCG scale
// (Near Mint / Lightly / Moderately / Heavily Played / Damaged).
export type CardCondition =
  | "NM" | "LP" | "MP" | "HP" | "DMG";

// A card as it lives inside a deck (from GraphQL)
export interface DeckCardView {
  id: string;
  deckId: string;
  scryfallId: string;
  quantity: number;
  board: "mainboard" | "sideboard" | "maybeboard" | "commander";
  categories: string[];
  foil: boolean;
  // Denormalized card data joined in by the resolver; optional because a
  // query may omit the nested card selection.
  card?: {
    id: string;
    name: string;
    manaCost?: string;
    cmc: number;
    typeLine: string;
    colorIdentity: string[];
    priceUsd?: number;
    imageUris?: Record<string, string>;
  };
}

// Source formats the collection importer understands.
export type CollectionFormat = "manabox" | "moxfield" | "archidekt";
frontend/src/lib/types.tsnpx tsc --noEmit — should compile with zero errorsgit commit -m 'feat: core frontend type definitions'Table view of owned cards with add/remove actions, total value display, and import from CSV.
# src/graphql/queries/collection.graphql

# Full collection listing for the table view; nests the card's current
# prices so total value can be computed client-side.
query GetCollection {
  collection {
    id
    scryfallId
    quantity
    foil
    condition
    language
    purchasePriceCents
    card {
      id
      name
      setName
      priceUsd
      priceUsdFoil
      imageUris
    }
  }
}

# Add a printing to the collection; only scryfallId is required, the
# remaining arguments may be omitted.
mutation AddToCollection(
  $scryfallId: String!
  $quantity: Int
  $foil: Boolean
  $condition: String
) {
  addToCollection(
    scryfallId: $scryfallId
    quantity: $quantity
    foil: $foil
    condition: $condition
  ) {
    id
    scryfallId
    quantity
  }
}
// Compute total collection value in cents
// Sum the market value of every collection item.  Foil copies use the
// foil price when available, falling back to the non-foil price, then to
// zero for unpriced cards.
function collectionValue(items: CollectionItemType[]): number {
  let total = 0;
  for (const item of items) {
    const foilPrice = item.card?.priceUsdFoil ?? item.card?.priceUsd ?? 0;
    const normalPrice = item.card?.priceUsd ?? 0;
    const unitPrice = item.foil ? foilPrice : normalPrice;
    total += unitPrice * item.quantity;
  }
  return total;
}
// Format an integer cent amount as a dollar string, e.g. 1234 -> "$12.34".
const displayValue = (cents: number) =>
  "$" + (cents / 100).toFixed(2);
src/graphql/queries/collection.graphql and run codegenTable with columns: card name, set, qty, condition, foil, pricegit commit -m 'feat: collection UI'Card search, board management, CMC curve chart, and commander legality highlighting.
The deck builder is the most token-intensive feature. Plan for multiple sessions. Build in this order: (1) card search panel, (2) board tabs, (3) card grid/list toggle, (4) CMC curve chart, (5) color identity legality highlighting.
Take back the wheel here. Deck builder interaction design benefits from human judgment about layout and drag-drop patterns. Start a session, get scaffolding, then refine by hand.
// Bucket mainboard cards by converted mana cost for the Recharts bar
// chart; cards with no joined card data count as CMC 0.
function buildCmcCurve(cards: DeckCardView[]): CmcBucket[] {
  const countsByCmc = new Map<number, number>();
  for (const deckCard of cards) {
    // Only the mainboard contributes to the curve.
    if (deckCard.board !== "mainboard") continue;
    const cost = deckCard.card?.cmc ?? 0;
    const previous = countsByCmc.get(cost) ?? 0;
    countsByCmc.set(cost, previous + deckCard.quantity);
  }
  const sortedEntries = [...countsByCmc.entries()].sort((a, b) => a[0] - b[0]);
  return sortedEntries.map(([cmc, count]) => ({ cmc, count }));
}
// Recharts usage:
// <BarChart data={buildCmcCurve(deckCards)}>
// <Bar dataKey="count" />
// <XAxis dataKey="cmc" />
// </BarChart>

// True when the card's color identity is a subset of the commander's —
// the Commander color-identity legality rule.
function isLegalInCommander(
  commanderIdentity: string[],
  cardIdentity: string[]
): boolean {
  // every() on an empty identity (colorless card) is true, as intended.
  return cardIdentity.every((c) => commanderIdentity.includes(c));
}
// In search results, highlight illegal cards:
// NOTE(review): `commander` and `card` are free variables from the
// surrounding component; the `commander &&` guard keeps isIllegal falsy
// until a commander has been chosen.
const isIllegal = commander &&
  !isLegalInCommander(commander.colorIdentity, card.colorIdentity);
src/graphql/queries/decks.graphql with deck CRUD queries/mutations and run codegensearchCards query with debouncegit commit -m 'feat: deck builder UI'Recharts area chart showing collection value and individual card price trends over time.
# src/graphql/queries/prices.graphql

# Daily snapshots for one card; the resolver returns them ordered
# oldest-first, ready for charting.
query PriceHistory($scryfallId: String!) {
  priceHistory(scryfallId: $scryfallId) {
    snapshotDate
    priceUsd
    priceUsdFoil
  }
}
The total collection value over time requires a server-side aggregation query. Add a collectionValueHistory resolver that groups price_history by snapshot_date, joins to collection on scryfall_id, and sums price_usd * quantity. Return an array of { date, totalCents }.
import { AreaChart, Area, XAxis, YAxis, Tooltip, ResponsiveContainer } from "recharts";

// Area chart of one card's price history.  `data` rows come straight
// from the PriceHistory GraphQL query; priceUsd is divided by 100 here,
// i.e. the stored value is treated as cents — consistent with
// displayValue in the collection UI.
function PriceChart({ data }: { data: { snapshotDate: string; priceUsd?: number }[] }) {
  const chartData = data.map(d => ({
    date: d.snapshotDate,
    price: (d.priceUsd ?? 0) / 100, // display in dollars
  }));
  return (
    <ResponsiveContainer width="100%" height={200}>
      <AreaChart data={chartData}>
        <XAxis dataKey="date" tick={{ fontSize: 11 }} />
        <YAxis tickFormatter={(v) => "$" + v} />
        <Tooltip formatter={(v: number) => "$" + v.toFixed(2)} />
        <Area
          type="monotone" dataKey="price"
          stroke="#1D9E75" fill="#E1F5EE"
        />
      </AreaChart>
    </ResponsiveContainer>
  );
}
collectionValueHistory resolver on the Python side (group-by query)git commit -m 'feat: price history charts'Python cache layer for EDHRec commander recommendations, displayed in the deck builder.
EDHRec has no official API. The pattern: check edhrec_cache first (TTL 7 days). On a miss, fetch the unofficial JSON endpoint, store it, and return. Isolate JSON parsing behind a single adapter function so a schema change is a one-place fix.
EDHRec's JSON structure has changed in the past. Always wrap parsing in a try/except and return an empty list on parse failure rather than crashing.
import httpx
import uuid
from datetime import datetime, timedelta
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.models.models import EdhrecCache
def _commander_slug(name: str) -> str:
"""Convert 'Atraxa, Praetors Voice' → 'atraxa-praetors-voice'"""
return name.lower().replace(",", "").replace("'", "").replace(" ", "-")
def _parse_recommendations(data: dict) -> list[dict]:
"""Adapter layer — isolate parsing here so one place to fix."""
try:
card_views = data.get("container", {}).get("json_dict", {}).get("cardviews", [])
return [
{
"scryfall_id": cv.get("sanitized", ""),
"name": cv.get("name", ""),
"synergy_score": cv.get("synergy", 0),
"inclusion_rate": cv.get("inclusion", 0),
}
for cv in card_views[:40]
]
except Exception:
return []
async def get_edhrec_recs(db: AsyncSession, commander_name: str) -> list[dict]:
    """Return EDHRec recommendations for a commander, cached for 7 days.

    Checks the edhrec_cache table first; on a miss (or an expired entry)
    fetches the unofficial JSON endpoint, stores the raw payload, and
    returns the parsed recommendations.  Network/HTTP failures fall back
    to the stale cached payload when one exists, otherwise an empty list,
    so a flaky EDHRec never 500s the resolver.
    """
    slug = _commander_slug(commander_name)
    result = await db.execute(
        select(EdhrecCache).where(EdhrecCache.slug == slug)
    )
    cached = result.scalar_one_or_none()
    if cached and cached.expires_at > datetime.utcnow():
        return _parse_recommendations(cached.data)
    url = f"https://json.edhrec.com/pages/commanders/{slug}.json"
    try:
        async with httpx.AsyncClient(timeout=30) as client:
            response = await client.get(url)
            response.raise_for_status()
            fresh_data = response.json()
    except httpx.HTTPError:
        # EDHRec unreachable or returned an error status: serve stale
        # data if we have any rather than surfacing the failure.
        return _parse_recommendations(cached.data) if cached else []
    expires = datetime.utcnow() + timedelta(days=7)
    if cached:
        # Refresh the expired row in place.
        cached.data = fresh_data
        cached.expires_at = expires
    else:
        db.add(EdhrecCache(
            id=str(uuid.uuid4()),
            slug=slug,
            data=fresh_data,
            expires_at=expires,
        ))
    await db.commit()
    return _parse_recommendations(fresh_data)
# Add to backend/app/graphql/schema.py or a new resolvers/edhrec.py
@strawberry.type
class EdhrecRecommendation:
    """One EDHRec card recommendation as exposed over GraphQL."""

    name: str
    # EDHRec "synergy" value — presumably how specific the card is to this
    # commander; confirm semantics against EDHRec.
    synergy_score: float
    # EDHRec "inclusion" value — presumably how often the card appears in
    # decks for this commander; confirm semantics against EDHRec.
    inclusion_rate: float
    owned: bool = False # set by joining against collection
@strawberry.type
class EdhrecQuery:
@strawberry.field
async def edhrec_recommendations(
self, info: Info, commander_name: str
) -> list[EdhrecRecommendation]:
db = info.context["db"]
recs = await get_edhrec_recs(db, commander_name)
# TODO: join against collection to set owned=True
return [EdhrecRecommendation(**r, owned=False) for r in recs] backend/app/services/edhrec.pyEdhrecRecommendation Strawberry type and EdhrecQuery resolverschema.py and run codegengit commit -m 'feat: edhrec integration'