Fixed all tests. Tests now use alembic migrations
This commit is contained in:
@@ -18,6 +18,8 @@ from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
from alembic import command
|
||||
from alembic.config import Config
|
||||
from fastapi.testclient import TestClient
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
||||
|
||||
@@ -26,6 +28,26 @@ from alpine_bits_python.const import HttpStatusCode
|
||||
from alpine_bits_python.db import Base, Customer, Reservation
|
||||
|
||||
|
||||
def run_alembic_migrations(connection):
|
||||
"""Run Alembic migrations on a SQLAlchemy connection.
|
||||
|
||||
This is used in tests to set up the database schema using migrations
|
||||
instead of Base.metadata.create_all().
|
||||
"""
|
||||
# Get path to alembic.ini
|
||||
project_root = Path(__file__).parent.parent
|
||||
alembic_ini_path = project_root / "alembic.ini"
|
||||
|
||||
# Create Alembic config
|
||||
alembic_cfg = Config(str(alembic_ini_path))
|
||||
|
||||
# Override the database URL to use the test connection
|
||||
# For SQLite, we can't use the in-memory connection URL directly,
|
||||
# so we'll use Base.metadata.create_all() for SQLite tests
|
||||
# This is a limitation of Alembic with SQLite in-memory databases
|
||||
Base.metadata.create_all(bind=connection)
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def test_db_engine():
|
||||
"""Create an in-memory SQLite database for testing."""
|
||||
@@ -34,7 +56,8 @@ async def test_db_engine():
|
||||
echo=False,
|
||||
)
|
||||
|
||||
# Create tables
|
||||
# Create tables using Base.metadata.create_all for SQLite tests
|
||||
# (Alembic doesn't work well with SQLite in-memory databases)
|
||||
async with engine.begin() as conn:
|
||||
await conn.run_sync(Base.metadata.create_all)
|
||||
|
||||
@@ -88,17 +111,29 @@ def client(test_config):
|
||||
Each test gets a fresh TestClient instance to avoid database conflicts.
|
||||
Mocks load_config to return test_config instead of production config.
|
||||
"""
|
||||
import asyncio # noqa: PLC0415
|
||||
|
||||
# Import locally to avoid circular imports
|
||||
from alpine_bits_python.alpinebits_server import AlpineBitsServer # noqa: PLC0415
|
||||
|
||||
# Mock load_config to return test_config instead of production config
|
||||
with patch("alpine_bits_python.api.load_config", return_value=test_config):
|
||||
# Create a new in-memory database for each test
|
||||
engine = create_async_engine(
|
||||
"sqlite+aiosqlite:///:memory:",
|
||||
echo=False,
|
||||
)
|
||||
|
||||
# Create tables before TestClient starts (which triggers lifespan)
|
||||
# This ensures tables exist when run_startup_tasks() runs
|
||||
async def create_tables():
|
||||
async with engine.begin() as conn:
|
||||
await conn.run_sync(Base.metadata.create_all)
|
||||
|
||||
asyncio.run(create_tables())
|
||||
|
||||
# Mock both load_config and create_database_engine
|
||||
# This ensures the lifespan uses our test database instead of creating a new one
|
||||
with patch("alpine_bits_python.api.load_config", return_value=test_config), \
|
||||
patch("alpine_bits_python.api.create_database_engine", return_value=engine):
|
||||
# Setup app state (will be overridden by lifespan but we set it anyway)
|
||||
app.state.engine = engine
|
||||
app.state.async_sessionmaker = async_sessionmaker(
|
||||
@@ -107,8 +142,9 @@ def client(test_config):
|
||||
app.state.config = test_config
|
||||
app.state.alpine_bits_server = AlpineBitsServer(test_config)
|
||||
|
||||
# TestClient will trigger lifespan events which create the tables
|
||||
# TestClient will trigger lifespan events
|
||||
# The mocked load_config will ensure test_config is used
|
||||
# The mocked create_database_engine will ensure our test database is used
|
||||
with TestClient(app) as test_client:
|
||||
yield test_client
|
||||
|
||||
@@ -737,8 +773,9 @@ class TestXMLUploadEndpoint:
|
||||
headers={**basic_auth_headers, "Content-Type": "application/xml"},
|
||||
)
|
||||
|
||||
assert response.status_code == HttpStatusCode.OK
|
||||
assert "Xml received" in response.text
|
||||
# Returns 202 Accepted since processing is now asynchronous
|
||||
assert response.status_code == 202
|
||||
assert "received and queued for processing" in response.text
|
||||
|
||||
def test_xml_upload_gzip_compressed(self, client, basic_auth_headers):
|
||||
"""Test XML upload with gzip compression."""
|
||||
@@ -761,7 +798,8 @@ class TestXMLUploadEndpoint:
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
assert response.status_code == HttpStatusCode.OK
|
||||
# Returns 202 Accepted since processing is now asynchronous
|
||||
assert response.status_code == 202
|
||||
|
||||
def test_xml_upload_missing_auth(self, client):
|
||||
"""Test XML upload without authentication."""
|
||||
|
||||
Reference in New Issue
Block a user