# API Reference

AirModel: async ORM for Pydantic models and PostgreSQL.

AirDB

Manages an asyncpg connection pool for :class:AirModel subclasses.

Example::

db = AirDB()
app = air.Air(lifespan=db.lifespan("postgresql://user:pass@host/dbname"))
Source code in src/airmodel/main.py
class AirDB:
    """Manages an asyncpg connection pool for :class:`AirModel` subclasses.

    Example::

        db = AirDB()
        app = air.Air(lifespan=db.lifespan("postgresql://user:pass@host/dbname"))
    """

    def __init__(self) -> None:
        # Typed as ``Any`` so asyncpg stays an optional, lazily-imported dependency.
        self.pool: Any | None = None  # asyncpg.Pool once connected

    # -- connection lifecycle ------------------------------------------------

    def connect(self, pool: Any) -> None:
        """Set the connection pool and register this as the active database.

        In production, called automatically by :meth:`lifespan`. In tests,
        call directly with a mock pool::

            db = AirDB()
            db.connect(fake_pool)
        """
        self.pool = pool
        _set_current_db(self)

    def disconnect(self) -> None:
        """Unregister this database and clear the pool reference.

        In production, called automatically by :meth:`lifespan`. In tests,
        call in teardown::

            db.disconnect()
        """
        self.pool = None
        _set_current_db(None)

    def lifespan(self, url: str, **pool_kwargs: Any) -> Any:
        """Return an async context manager suitable for ASGI lifespan.

        Args:
            url: PostgreSQL connection string (supports ``postgresql://`` and
                ``postgres://`` schemes, including ``?sslmode=require`` for
                TLS connections like NeonDB).
            **pool_kwargs: Extra keyword arguments forwarded to
                :func:`asyncpg.create_pool`.

        Returns:
            An async context manager that opens the pool on entry and closes
            it on exit.
        """
        db = self  # captured by the closure below

        @asynccontextmanager
        async def _lifespan(app: Any) -> AsyncIterator[None]:
            # Imported here so merely importing this module never requires asyncpg.
            import asyncpg  # noqa: PLC0415

            pool = await asyncpg.create_pool(url, **pool_kwargs)
            db.connect(pool)
            try:
                yield
            finally:
                if db.pool is not None:
                    await db.pool.close()
                db.disconnect()

        return _lifespan

    # -- transactions --------------------------------------------------------

    @asynccontextmanager
    async def transaction(self) -> AsyncIterator[None]:
        """Async context manager that wraps a block in a database transaction.

        Acquires a connection from the pool, starts a transaction on it, and
        routes all CRUD operations inside the block through that connection
        instead of the pool.

        On clean exit the transaction is committed. If an exception propagates
        out of the block, the transaction is rolled back and the exception is
        re-raised.

        Example::

            async with db.transaction():
                await Item.create(name="a")
                await Item.create(name="b")  # atomic with the first
        """
        pool = self.pool
        if pool is None:
            msg = "No database connection. Ensure AirDB.lifespan() is active."
            raise RuntimeError(msg)
        async with pool.acquire() as conn:
            txn = conn.transaction()
            await txn.start()
            # Route CRUD calls made inside the block through this connection.
            token = _current_connection.set(conn)
            try:
                yield
                await txn.commit()
            except BaseException:
                # BaseException (not Exception) so cancellation/KeyboardInterrupt
                # also roll back instead of leaving the transaction open.
                await txn.rollback()
                raise
            finally:
                # Restore the previous contextvar value even if commit/rollback failed.
                _current_connection.reset(token)

    # -- table management ----------------------------------------------------

    async def create_tables(self) -> None:
        """Create or migrate tables for every registered :class:`AirModel`.

        For each model, runs ``CREATE TABLE IF NOT EXISTS`` and then
        ``ALTER TABLE ADD COLUMN`` for any model fields not yet present
        in the database. Non-destructive: never drops columns, never
        changes types. New columns are added without ``NOT NULL`` so
        existing rows aren't broken.
        """
        if self.pool is None:
            msg = "Database pool is not initialized. Did you forget to use db.lifespan()?"
            raise RuntimeError(msg)
        for table_cls in _table_registry:
            sql = table_cls._create_table_sql()
            await self.pool.execute(sql)

            # An empty result means the introspection helper reported no
            # columns; skip the additive migration for that table.
            existing = await _get_existing_columns(self.pool, table_cls._table_name())
            if not existing:
                continue

            pk = table_cls._pk_field()
            for field_name in table_cls.model_fields:
                if field_name == pk:
                    continue
                if field_name not in existing:
                    await self.pool.execute(table_cls._add_column_sql(field_name))

connect(pool)

Set the connection pool and register this as the active database.

In production, called automatically by :meth:lifespan. In tests, call directly with a mock pool::

db = AirDB()
db.connect(fake_pool)
Source code in src/airmodel/main.py
def connect(self, pool: Any) -> None:
    """Attach *pool* and make this AirDB the process-wide active database.

    Production code never calls this directly — :meth:`lifespan` does.
    Tests may pass a mock pool::

        db = AirDB()
        db.connect(fake_pool)
    """
    self.pool = pool
    _set_current_db(self)

create_tables() async

Create or migrate tables for every registered :class:AirModel.

For each model, runs CREATE TABLE IF NOT EXISTS and then ALTER TABLE ADD COLUMN for any model fields not yet present in the database. Non-destructive: never drops columns, never changes types. New columns are added without NOT NULL so existing rows aren't broken.

Source code in src/airmodel/main.py
async def create_tables(self) -> None:
    """Create (or additively migrate) the table of every registered model.

    Runs ``CREATE TABLE IF NOT EXISTS`` for each :class:`AirModel` subclass,
    then issues ``ALTER TABLE ADD COLUMN`` for model fields the live table
    lacks. Never destructive: columns are never dropped or retyped, and new
    columns are added nullable so existing rows remain valid.
    """
    pool = self.pool
    if pool is None:
        msg = "Database pool is not initialized. Did you forget to use db.lifespan()?"
        raise RuntimeError(msg)
    for model in _table_registry:
        await pool.execute(model._create_table_sql())

        # No column info from introspection -> nothing to migrate here.
        existing = await _get_existing_columns(pool, model._table_name())
        if not existing:
            continue

        pk_name = model._pk_field()
        missing = [
            name
            for name in model.model_fields
            if name != pk_name and name not in existing
        ]
        for name in missing:
            await pool.execute(model._add_column_sql(name))

disconnect()

Unregister this database and clear the pool reference.

In production, called automatically by :meth:lifespan. In tests, call in teardown::

db.disconnect()
Source code in src/airmodel/main.py
def disconnect(self) -> None:
    """Clear the pool reference and unregister this AirDB.

    The mirror image of :meth:`connect`; tests call it in teardown::

        db.disconnect()
    """
    self.pool = None
    _set_current_db(None)

lifespan(url, **pool_kwargs)

Return an async context manager suitable for ASGI lifespan.

Parameters:

Name Type Description Default
url str

PostgreSQL connection string (supports postgresql:// and postgres:// schemes, including ?sslmode=require for TLS connections like NeonDB).

required
**pool_kwargs Any

Extra keyword arguments forwarded to :func:asyncpg.create_pool.

{}

Returns:

Type Description
Any

An async context manager that opens the pool on entry and closes

Any

it on exit.

Source code in src/airmodel/main.py
def lifespan(self, url: str, **pool_kwargs: Any) -> Any:
    """Build an async context manager usable as an ASGI lifespan handler.

    Args:
        url: PostgreSQL DSN (``postgresql://`` or ``postgres://``; query
            options such as ``?sslmode=require`` for TLS providers like
            NeonDB are passed through).
        **pool_kwargs: Forwarded verbatim to :func:`asyncpg.create_pool`.

    Returns:
        An async context manager: entering it creates the pool and registers
        this database; exiting closes the pool and unregisters it.
    """
    airdb = self

    @asynccontextmanager
    async def _lifespan(app: Any) -> AsyncIterator[None]:
        # Deferred import: asyncpg is only needed once a pool is opened.
        import asyncpg  # noqa: PLC0415

        airdb.connect(await asyncpg.create_pool(url, **pool_kwargs))
        try:
            yield
        finally:
            if airdb.pool is not None:
                await airdb.pool.close()
            airdb.disconnect()

    return _lifespan

transaction() async

Async context manager that wraps a block in a database transaction.

Acquires a connection from the pool, starts a transaction on it, and routes all CRUD operations inside the block through that connection instead of the pool.

On clean exit the transaction is committed. If an exception propagates out of the block, the transaction is rolled back and the exception is re-raised.

Example::

async with db.transaction():
    await Item.create(name="a")
    await Item.create(name="b")  # atomic with the first
Source code in src/airmodel/main.py
@asynccontextmanager
async def transaction(self) -> AsyncIterator[None]:
    """Async context manager that wraps a block in a database transaction.

    Acquires a connection from the pool, starts a transaction on it, and
    routes all CRUD operations inside the block through that connection
    instead of the pool.

    On clean exit the transaction is committed. If an exception propagates
    out of the block, the transaction is rolled back and the exception is
    re-raised.

    Example::

        async with db.transaction():
            await Item.create(name="a")
            await Item.create(name="b")  # atomic with the first
    """
    pool = self.pool
    if pool is None:
        msg = "No database connection. Ensure AirDB.lifespan() is active."
        raise RuntimeError(msg)
    async with pool.acquire() as conn:
        txn = conn.transaction()
        await txn.start()
        # Route CRUD calls made inside the block through this connection.
        token = _current_connection.set(conn)
        try:
            yield
            await txn.commit()
        except BaseException:
            # BaseException (not Exception) so cancellation/KeyboardInterrupt
            # also roll back instead of leaving the transaction open.
            await txn.rollback()
            raise
        finally:
            # Restore the previous contextvar value even if commit/rollback failed.
            _current_connection.reset(token)

AirModel

Bases: BaseModel

Base class for database-backed Pydantic models.

Subclass this and declare fields using standard Pydantic annotations. Use :func:AirField with primary_key=True for auto-incrementing primary keys.

The table name is derived from the class name (converted to snake_case). All query methods are async class methods.

Example::

class User(AirModel):
    id: int | None = AirField(default=None, primary_key=True)
    name: str
    email: str
Source code in src/airmodel/main.py
class AirModel(BaseModel):
    """Base class for database-backed Pydantic models.

    Subclass this and declare fields using standard Pydantic annotations.
    Use :func:`AirField` with ``primary_key=True`` for auto-incrementing
    primary keys.

    The table name is derived from the class name (converted to snake_case). All query
    methods are async class methods.

    Example::

        class User(AirModel):
            id: int | None = AirField(default=None, primary_key=True)
            name: str
            email: str
    """

    model_config = ConfigDict(from_attributes=True)

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        # Record every subclass so AirDB.create_tables() can discover it.
        _table_registry.append(cls)

    # -- SQL generation helpers ----------------------------------------------

    @classmethod
    def _table_name(cls) -> str:
        prefix = _table_prefix(cls.__module__)
        # Split CamelCase (including trailing acronyms, e.g. "HTTPServer")
        # with "_" before lowercasing.
        snake = re.sub(r"(?<=[a-z0-9])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])", "_", cls.__name__).lower()
        return f"{prefix}_{snake}"

    @classmethod
    def _pk_field(cls) -> str | None:
        """Return the name of the primary-key field, or None."""
        for name, info in cls.model_fields.items():
            if _is_primary_key(info):
                return name
        return None

    @classmethod
    def _column_defs(cls) -> list[str]:
        """Return a list of ``"column_name TYPE [constraints]"`` strings."""
        cols: list[str] = []
        for field_name, field_info in cls.model_fields.items():
            annotation = field_info.annotation
            is_pk = _is_primary_key(field_info)

            if is_pk:
                cols.append(f'"{field_name}" BIGSERIAL PRIMARY KEY')
                continue

            # Determine the base Python type (unwrap Optional if needed)
            if _is_optional(annotation):
                base_type = _unwrap_optional(annotation)
            else:
                base_type = annotation

            pg_type = _pg_type(base_type)

            # NOT NULL when the field is required and not Optional
            # (``annotation`` may be falsy; such fields are treated as non-nullable).
            nullable = _is_optional(annotation) if annotation else False
            is_required = field_info.is_required()

            constraint = ""
            if is_required and not nullable:
                constraint = " NOT NULL"

            cols.append(f'"{field_name}" {pg_type}{constraint}')

        return cols

    @classmethod
    def _create_table_sql(cls) -> str:
        """Generate a ``CREATE TABLE IF NOT EXISTS`` statement."""
        cols = cls._column_defs()
        cols_sql = ", ".join(cols)
        return f'CREATE TABLE IF NOT EXISTS "{cls._table_name()}" ({cols_sql})'

    @classmethod
    def _add_column_sql(cls, field_name: str) -> str:
        """Generate an ``ALTER TABLE ADD COLUMN`` statement for a single field.

        Never includes ``NOT NULL``, even for required fields, because
        existing rows have no value for the new column.
        """
        field_info = cls.model_fields[field_name]
        annotation = field_info.annotation
        if _is_optional(annotation):
            base_type = _unwrap_optional(annotation)
        else:
            base_type = annotation
        pg_type = _pg_type(base_type)
        return f'ALTER TABLE "{cls._table_name()}" ADD COLUMN "{field_name}" {pg_type}'

    @classmethod
    def _non_pk_fields(cls) -> list[str]:
        """Return field names excluding the primary key."""
        pk = cls._pk_field()
        return [n for n in cls.model_fields if n != pk]

    # -- CRUD class methods --------------------------------------------------

    @classmethod
    async def create(cls, **kwargs: Any) -> Self:
        """Insert a row and return the populated model instance.

        Keyword arguments correspond to column values. The primary-key field
        (if any) is excluded from the INSERT and populated from the
        ``RETURNING`` clause.

        Returns:
            A new instance of this Table subclass with all fields set.
        """
        pool = _get_pool()
        fields = cls._non_pk_fields()
        # Only kwargs that match declared non-PK fields are inserted; extra
        # keys are silently ignored.
        insert_fields = [f for f in fields if f in kwargs]
        columns = ", ".join(f'"{f}"' for f in insert_fields)
        placeholders = ", ".join(f"${i + 1}" for i in range(len(insert_fields)))
        values = [kwargs[f] for f in insert_fields]

        sql = f'INSERT INTO "{cls._table_name()}" ({columns}) VALUES ({placeholders}) RETURNING *'  # noqa: S608
        row = await pool.fetchrow(sql, *values)
        return cls.model_validate(dict(row))

    @classmethod
    async def get(cls, **kwargs: Any) -> Self | None:
        """Fetch exactly one row matching the given keyword filters.

        Supports Django-style ``__`` lookups (gt, gte, lt, lte, contains,
        icontains, in, isnull) in addition to plain equality.

        Returns:
            An instance of this Model subclass, or ``None`` if no row matches.

        Raises:
            MultipleObjectsReturned: If more than one row matches the filters.
        """
        pool = _get_pool()
        conditions, values = _parse_kwargs(kwargs)
        where = " AND ".join(conditions)
        # LIMIT 2 is enough to detect ambiguity without scanning every match.
        sql = f'SELECT * FROM "{cls._table_name()}" WHERE {where} LIMIT 2'  # noqa: S608
        rows = await pool.fetch(sql, *values)
        if not rows:
            return None
        if len(rows) > 1:
            msg = f"{cls.__name__}.get() matched more than one row. Use filter() to retrieve multiple results."
            raise MultipleObjectsReturned(msg)
        return cls.model_validate(dict(rows[0]))

    @classmethod
    async def filter(
        cls,
        *,
        order_by: str | None = None,
        limit: int | None = None,
        offset: int | None = None,
        **kwargs: Any,
    ) -> list[Self]:
        """Fetch all rows matching the given keyword filters.

        Supports Django-style ``__`` lookups (gt, gte, lt, lte, contains,
        icontains, in, isnull) in addition to plain equality.

        Args:
            order_by: Optional field name to sort by. Prefix with ``-`` for
                descending order (e.g. ``"-name"``).
            limit: Maximum number of rows to return.
            offset: Number of rows to skip before returning results.
            **kwargs: Column name/value pairs to filter by, with optional
                ``__lookup`` suffixes.

        Returns:
            A list of model instances (possibly empty).
        """
        pool = _get_pool()
        if not kwargs:
            return await cls.all(order_by=order_by, limit=limit, offset=offset)
        conditions, values = _parse_kwargs(kwargs)
        where = " AND ".join(conditions)
        sql = f'SELECT * FROM "{cls._table_name()}" WHERE {where}'  # noqa: S608
        # NOTE(review): order_by/limit/offset are interpolated into the SQL
        # text, not parameterized — pass only trusted, model-derived values.
        if order_by is not None:
            if order_by.startswith("-"):
                sql += f' ORDER BY "{order_by[1:]}" DESC'
            else:
                sql += f' ORDER BY "{order_by}" ASC'
        if limit is not None:
            sql += f" LIMIT {limit}"
        if offset is not None:
            sql += f" OFFSET {offset}"
        rows = await pool.fetch(sql, *values)
        return [cls.model_validate(dict(r)) for r in rows]

    @classmethod
    async def all(
        cls,
        *,
        order_by: str | None = None,
        limit: int | None = None,
        offset: int | None = None,
    ) -> list[Self]:
        """Fetch every row from the table.

        Args:
            order_by: Optional field name to sort by. Prefix with ``-`` for
                descending order (e.g. ``"-name"``).
            limit: Maximum number of rows to return.
            offset: Number of rows to skip before returning results.

        Returns:
            A list of all model instances.
        """
        pool = _get_pool()
        sql = f'SELECT * FROM "{cls._table_name()}"'  # noqa: S608
        # NOTE(review): order_by/limit/offset are interpolated into the SQL
        # text, not parameterized — pass only trusted, model-derived values.
        if order_by is not None:
            if order_by.startswith("-"):
                sql += f' ORDER BY "{order_by[1:]}" DESC'
            else:
                sql += f' ORDER BY "{order_by}" ASC'
        if limit is not None:
            sql += f" LIMIT {limit}"
        if offset is not None:
            sql += f" OFFSET {offset}"
        rows = await pool.fetch(sql)
        return [cls.model_validate(dict(r)) for r in rows]

    @classmethod
    async def count(cls, **kwargs: Any) -> int:
        """Return the number of rows, optionally filtered by keyword arguments.

        Supports Django-style ``__`` lookups (gt, gte, lt, lte, contains,
        icontains, in, isnull) in addition to plain equality.

        Returns:
            Integer row count.
        """
        pool = _get_pool()
        if kwargs:
            conditions, values = _parse_kwargs(kwargs)
            where = " AND ".join(conditions)
            sql = f'SELECT COUNT(*) FROM "{cls._table_name()}" WHERE {where}'  # noqa: S608
            return await pool.fetchval(sql, *values)
        sql = f'SELECT COUNT(*) FROM "{cls._table_name()}"'  # noqa: S608
        return await pool.fetchval(sql)

    # -- Bulk class methods ---------------------------------------------------

    @classmethod
    async def bulk_create(cls, items: list[dict[str, Any]]) -> list[Self]:
        """Insert multiple rows in a single query and return the new instances.

        Builds a multi-row INSERT with ``RETURNING *`` so only one round-trip
        is needed regardless of list length.

        Args:
            items: A list of dicts, each mapping column names to values.

        Returns:
            A list of model instances, one per inserted row.
        """
        if not items:
            return []

        pool = _get_pool()
        fields = cls._non_pk_fields()
        # Use the columns present in the first item (all items must have the same keys)
        insert_fields = [f for f in fields if f in items[0]]
        columns = ", ".join(f'"{f}"' for f in insert_fields)

        # Build ($1, $2), ($3, $4), ... with flattened parameter list
        value_groups: list[str] = []
        all_values: list[Any] = []
        for i, item in enumerate(items):
            offset = i * len(insert_fields)
            placeholders = ", ".join(f"${offset + j + 1}" for j in range(len(insert_fields)))
            value_groups.append(f"({placeholders})")
            all_values.extend(item[f] for f in insert_fields)

        values_sql = ", ".join(value_groups)
        sql = f'INSERT INTO "{cls._table_name()}" ({columns}) VALUES {values_sql} RETURNING *'
        rows = await pool.fetch(sql, *all_values)
        return [cls.model_validate(dict(r)) for r in rows]

    @classmethod
    async def bulk_update(cls, set_values: dict[str, Any], **filter_kwargs: Any) -> int:
        """Update multiple rows matching the filter and return the count affected.

        Args:
            set_values: A dict of column names to new values for the SET clause.
            **filter_kwargs: Column name/value pairs (with optional ``__lookup``
                suffixes) for the WHERE clause.

        Returns:
            The number of rows updated.
        """
        pool = _get_pool()
        set_clauses = [f'"{col}" = ${i + 1}' for i, col in enumerate(set_values)]
        set_sql = ", ".join(set_clauses)
        set_params = list(set_values.values())

        # WHERE placeholders continue numbering after the SET placeholders.
        conditions, where_params = _parse_kwargs(filter_kwargs, start_idx=len(set_values) + 1)
        where_sql = " AND ".join(conditions)

        sql = f'UPDATE "{cls._table_name()}" SET {set_sql} WHERE {where_sql}'
        status = await pool.execute(sql, *set_params, *where_params)
        # asyncpg returns e.g. "UPDATE 3"
        return int(status.split()[-1])

    @classmethod
    async def bulk_delete(cls, **filter_kwargs: Any) -> int:
        """Delete all rows matching the filter and return the count deleted.

        Args:
            **filter_kwargs: Column name/value pairs (with optional ``__lookup``
                suffixes) for the WHERE clause.

        Returns:
            The number of rows deleted.
        """
        pool = _get_pool()
        conditions, values = _parse_kwargs(filter_kwargs)
        where_sql = " AND ".join(conditions)

        sql = f'DELETE FROM "{cls._table_name()}" WHERE {where_sql}'
        status = await pool.execute(sql, *values)
        # asyncpg returns e.g. "DELETE 5"
        return int(status.split()[-1])

    # -- Instance methods ----------------------------------------------------

    async def save(self, *, update_fields: list[str] | None = None) -> None:
        """Update the row identified by this instance's primary key.

        All non-PK fields are written. Raises :class:`ValueError` if the
        model has no primary-key field or if the PK value is ``None``.
        """
        pool = _get_pool()
        pk = self._pk_field()
        if pk is None:
            msg = f"{type(self).__name__} has no primary_key field"
            raise ValueError(msg)
        pk_value = getattr(self, pk)
        if pk_value is None:
            msg = "Cannot save a row without a primary key value. Use create() for new rows."
            raise ValueError(msg)

        if update_fields is not None and len(update_fields) == 0:
            msg = "update_fields cannot be empty. Omit the argument to update all fields."
            raise ValueError(msg)

        fields = update_fields if update_fields is not None else self._non_pk_fields()
        set_clauses = [f'"{f}" = ${i + 1}' for i, f in enumerate(fields)]
        values = [getattr(self, f) for f in fields]
        pk_placeholder = f"${len(fields) + 1}"

        sql = f'UPDATE "{self._table_name()}" SET {", ".join(set_clauses)} WHERE "{pk}" = {pk_placeholder} RETURNING *'
        # NOTE(review): fetchrow() returns None when no row has this PK (e.g.
        # deleted concurrently); the loop below would then raise TypeError.
        # Consider raising an explicit error for the missing-row case.
        row = await pool.fetchrow(sql, *values, pk_value)
        for field_name in type(self).model_fields:
            if field_name in row:
                object.__setattr__(self, field_name, row[field_name])

    async def delete(self) -> None:
        """Delete the row identified by this instance's primary key.

        Raises :class:`ValueError` if the model has no primary-key field or
        if the PK value is ``None``.
        """
        pool = _get_pool()
        pk = self._pk_field()
        if pk is None:
            msg = f"{type(self).__name__} has no primary_key field"
            raise ValueError(msg)
        pk_value = getattr(self, pk)
        if pk_value is None:
            msg = "Cannot delete a row without a primary key value."
            raise ValueError(msg)

        sql = f'DELETE FROM "{self._table_name()}" WHERE "{pk}" = $1'
        await pool.execute(sql, pk_value)
        # Clearing the PK prevents a later save()/delete() on this instance
        # (both raise on a None PK value).
        object.__setattr__(self, pk, None)

all(*, order_by=None, limit=None, offset=None) async classmethod

Fetch every row from the table.

Parameters:

Name Type Description Default
order_by str | None

Optional field name to sort by. Prefix with - for descending order (e.g. "-name").

None
limit int | None

Maximum number of rows to return.

None
offset int | None

Number of rows to skip before returning results.

None

Returns:

Type Description
list[Self]

A list of all model instances.

Source code in src/airmodel/main.py
@classmethod
async def all(
    cls,
    *,
    order_by: str | None = None,
    limit: int | None = None,
    offset: int | None = None,
) -> list[Self]:
    """Return every row of the table as model instances.

    Args:
        order_by: Field to sort on; a leading ``-`` (e.g. ``"-name"``)
            selects descending order.
        limit: Cap on the number of rows returned.
        offset: Rows to skip before the first returned row.

    Returns:
        One instance per row (empty list if the table is empty).
    """
    pool = _get_pool()
    sql = f'SELECT * FROM "{cls._table_name()}"'  # noqa: S608
    if order_by is not None:
        descending = order_by.startswith("-")
        column = order_by[1:] if descending else order_by
        sql += f' ORDER BY "{column}" {"DESC" if descending else "ASC"}'
    if limit is not None:
        sql += f" LIMIT {limit}"
    if offset is not None:
        sql += f" OFFSET {offset}"
    records = await pool.fetch(sql)
    return [cls.model_validate(dict(record)) for record in records]

bulk_create(items) async classmethod

Insert multiple rows in a single query and return the new instances.

Builds a multi-row INSERT with RETURNING * so only one round-trip is needed regardless of list length.

Parameters:

Name Type Description Default
items list[dict[str, Any]]

A list of dicts, each mapping column names to values.

required

Returns:

Type Description
list[Self]

A list of model instances, one per inserted row.

Source code in src/airmodel/main.py
@classmethod
async def bulk_create(cls, items: list[dict[str, Any]]) -> list[Self]:
    """Insert several rows with one round-trip and return the new instances.

    A single multi-row ``INSERT ... RETURNING *`` statement is issued, so the
    cost is one query regardless of ``len(items)``.

    Args:
        items: Dicts mapping column names to values. Every dict must contain
            the keys of the first one, which determines the column list.

    Returns:
        One model instance per inserted row, in insertion order.
    """
    if not items:
        return []

    pool = _get_pool()
    # Column order comes from the model, filtered to the first item's keys.
    insert_fields = [f for f in cls._non_pk_fields() if f in items[0]]
    columns = ", ".join(f'"{f}"' for f in insert_fields)
    width = len(insert_fields)

    # One "($1, $2)"-style group per item, numbering parameters consecutively.
    value_groups: list[str] = []
    params: list[Any] = []
    for row_idx, item in enumerate(items):
        base = row_idx * width
        group = ", ".join(f"${base + col + 1}" for col in range(width))
        value_groups.append(f"({group})")
        params.extend(item[f] for f in insert_fields)

    values_sql = ", ".join(value_groups)
    sql = f'INSERT INTO "{cls._table_name()}" ({columns}) VALUES {values_sql} RETURNING *'
    records = await pool.fetch(sql, *params)
    return [cls.model_validate(dict(r)) for r in records]

bulk_delete(**filter_kwargs) async classmethod

Delete all rows matching the filter and return the count deleted.

Parameters:

Name Type Description Default
**filter_kwargs Any

Column name/value pairs (with optional __lookup suffixes) for the WHERE clause.

{}

Returns:

Type Description
int

The number of rows deleted.

Source code in src/airmodel/main.py
@classmethod
async def bulk_delete(cls, **filter_kwargs: Any) -> int:
    """Delete every row matching the filter; return how many were removed.

    Args:
        **filter_kwargs: Column/value pairs for the WHERE clause; keys may
            carry ``__lookup`` suffixes (gt, gte, lt, lte, contains, ...).

    Returns:
        Number of rows deleted.
    """
    pool = _get_pool()
    conditions, params = _parse_kwargs(filter_kwargs)
    where_sql = " AND ".join(conditions)

    sql = f'DELETE FROM "{cls._table_name()}" WHERE {where_sql}'
    # asyncpg reports a command tag such as "DELETE 5"; the count is last.
    status = await pool.execute(sql, *params)
    return int(status.split()[-1])

bulk_update(set_values, **filter_kwargs) async classmethod

Update multiple rows matching the filter and return the count affected.

Parameters:

Name Type Description Default
set_values dict[str, Any]

A dict of column names to new values for the SET clause.

required
**filter_kwargs Any

Column name/value pairs (with optional __lookup suffixes) for the WHERE clause.

{}

Returns:

Type Description
int

The number of rows updated.

Source code in src/airmodel/main.py
@classmethod
async def bulk_update(cls, set_values: dict[str, Any], **filter_kwargs: Any) -> int:
    """Update multiple rows matching the filter and return the count affected.

    Args:
        set_values: A dict of column names to new values for the SET clause.
        **filter_kwargs: Column name/value pairs (with optional ``__lookup``
            suffixes) for the WHERE clause.

    Returns:
        The number of rows updated.

    Raises:
        ValueError: If ``set_values`` is empty or no filters are given —
            either case used to produce syntactically invalid SQL
            (an empty SET list or an empty WHERE clause).
    """
    if not set_values:
        msg = "bulk_update() requires at least one column in set_values."
        raise ValueError(msg)
    if not filter_kwargs:
        msg = "bulk_update() requires at least one filter keyword argument."
        raise ValueError(msg)
    pool = _get_pool()
    set_clauses = [f'"{col}" = ${i + 1}' for i, col in enumerate(set_values)]
    set_sql = ", ".join(set_clauses)
    set_params = list(set_values.values())

    # WHERE placeholders continue numbering after the SET placeholders.
    conditions, where_params = _parse_kwargs(filter_kwargs, start_idx=len(set_values) + 1)
    where_sql = " AND ".join(conditions)

    sql = f'UPDATE "{cls._table_name()}" SET {set_sql} WHERE {where_sql}'  # noqa: S608
    status = await pool.execute(sql, *set_params, *where_params)
    # asyncpg returns a command tag like "UPDATE 3"; the last token is the count.
    return int(status.split()[-1])

count(**kwargs) async classmethod

Return the number of rows, optionally filtered by keyword arguments.

Supports Django-style __ lookups (gt, gte, lt, lte, contains, icontains, in, isnull) in addition to plain equality.

Returns:

Type Description
int

Integer row count.

Source code in src/airmodel/main.py
@classmethod
async def count(cls, **kwargs: Any) -> int:
    """Count rows in this table, optionally restricted by keyword filters.

    Plain equality filters and Django-style ``__`` lookups (gt, gte, lt,
    lte, contains, icontains, in, isnull) are both accepted.

    Returns:
        Integer row count.
    """
    pool = _get_pool()
    base = f'SELECT COUNT(*) FROM "{cls._table_name()}"'  # noqa: S608
    if not kwargs:
        return await pool.fetchval(base)
    conditions, values = _parse_kwargs(kwargs)
    filtered = f"{base} WHERE {' AND '.join(conditions)}"  # noqa: S608
    return await pool.fetchval(filtered, *values)

create(**kwargs) async classmethod

Insert a row and return the populated model instance.

Keyword arguments correspond to column values. The primary-key field (if any) is excluded from the INSERT and populated from the RETURNING clause.

Returns:

Type Description
Self

A new instance of this Table subclass with all fields set.

Source code in src/airmodel/main.py
@classmethod
async def create(cls, **kwargs: Any) -> Self:
    """Insert a row and return the populated model instance.

    Keyword arguments correspond to column values; keys that are not
    model fields are silently ignored. The primary-key field (if any) is
    excluded from the INSERT and populated from the ``RETURNING`` clause.

    Returns:
        A new instance of this Table subclass with all fields set.
    """
    pool = _get_pool()
    fields = cls._non_pk_fields()
    insert_fields = [f for f in fields if f in kwargs]

    if insert_fields:
        columns = ", ".join(f'"{f}"' for f in insert_fields)
        placeholders = ", ".join(f"${i + 1}" for i in range(len(insert_fields)))
        values = [kwargs[f] for f in insert_fields]
        sql = f'INSERT INTO "{cls._table_name()}" ({columns}) VALUES ({placeholders}) RETURNING *'  # noqa: S608
    else:
        # With no explicit columns the old SQL was "() VALUES ()", which is
        # invalid PostgreSQL. DEFAULT VALUES inserts a row of column defaults.
        values = []
        sql = f'INSERT INTO "{cls._table_name()}" DEFAULT VALUES RETURNING *'  # noqa: S608
    row = await pool.fetchrow(sql, *values)
    return cls.model_validate(dict(row))

delete() async

Delete the row identified by this instance's primary key.

Raises :class:ValueError if the model has no primary-key field or if the PK value is None.

Source code in src/airmodel/main.py
async def delete(self) -> None:
    """Remove this instance's row from the database.

    The row is located by its primary-key value; afterwards the PK
    attribute on this instance is reset to ``None``.

    Raises :class:`ValueError` if the model has no primary-key field or
    if the PK value is ``None``.
    """
    pool = _get_pool()
    pk_name = self._pk_field()
    if pk_name is None:
        msg = f"{type(self).__name__} has no primary_key field"
        raise ValueError(msg)
    pk_value = getattr(self, pk_name)
    if pk_value is None:
        msg = "Cannot delete a row without a primary key value."
        raise ValueError(msg)

    await pool.execute(
        f'DELETE FROM "{self._table_name()}" WHERE "{pk_name}" = $1',
        pk_value,
    )
    # Bypass any frozen-model/validation machinery when clearing the PK.
    object.__setattr__(self, pk_name, None)

filter(*, order_by=None, limit=None, offset=None, **kwargs) async classmethod

Fetch all rows matching the given keyword filters.

Supports Django-style __ lookups (gt, gte, lt, lte, contains, icontains, in, isnull) in addition to plain equality.

Parameters:

Name Type Description Default
order_by str | None

Optional field name to sort by. Prefix with - for descending order (e.g. "-name").

None
limit int | None

Maximum number of rows to return.

None
offset int | None

Number of rows to skip before returning results.

None
**kwargs Any

Column name/value pairs to filter by, with optional __lookup suffixes.

{}

Returns:

Type Description
list[Self]

A list of model instances (possibly empty).

Source code in src/airmodel/main.py
@classmethod
async def filter(
    cls,
    *,
    order_by: str | None = None,
    limit: int | None = None,
    offset: int | None = None,
    **kwargs: Any,
) -> list[Self]:
    """Fetch all rows matching the given keyword filters.

    Supports Django-style ``__`` lookups (gt, gte, lt, lte, contains,
    icontains, in, isnull) in addition to plain equality.

    Args:
        order_by: Optional field name to sort by. Prefix with ``-`` for
            descending order (e.g. ``"-name"``).
        limit: Maximum number of rows to return.
        offset: Number of rows to skip before returning results.
        **kwargs: Column name/value pairs to filter by, with optional
            ``__lookup`` suffixes.

    Returns:
        A list of model instances (possibly empty).
    """
    pool = _get_pool()
    if not kwargs:
        return await cls.all(order_by=order_by, limit=limit, offset=offset)
    conditions, values = _parse_kwargs(kwargs)
    where = " AND ".join(conditions)
    sql = f'SELECT * FROM "{cls._table_name()}" WHERE {where}'  # noqa: S608
    if order_by is not None:
        if order_by.startswith("-"):
            sql += f' ORDER BY "{order_by[1:]}" DESC'
        else:
            sql += f' ORDER BY "{order_by}" ASC'
    # Bind LIMIT/OFFSET as query parameters rather than interpolating the
    # values into the SQL text, so a non-int value cannot corrupt the query.
    params = list(values)
    if limit is not None:
        params.append(limit)
        sql += f" LIMIT ${len(params)}"
    if offset is not None:
        params.append(offset)
        sql += f" OFFSET ${len(params)}"
    rows = await pool.fetch(sql, *params)
    return [cls.model_validate(dict(r)) for r in rows]

get(**kwargs) async classmethod

Fetch exactly one row matching the given keyword filters.

Supports Django-style __ lookups (gt, gte, lt, lte, contains, icontains, in, isnull) in addition to plain equality.

Returns:

Type Description
Self | None

An instance of this Model subclass, or None if no row matches.

Raises:

Type Description
MultipleObjectsReturned

If more than one row matches the filters.

Source code in src/airmodel/main.py
@classmethod
async def get(cls, **kwargs: Any) -> Self | None:
    """Fetch exactly one row matching the given keyword filters.

    Supports Django-style ``__`` lookups (gt, gte, lt, lte, contains,
    icontains, in, isnull) in addition to plain equality. Called with no
    filters at all, it considers every row in the table.

    Returns:
        An instance of this Model subclass, or ``None`` if no row matches.

    Raises:
        MultipleObjectsReturned: If more than one row matches the filters.
    """
    pool = _get_pool()
    if kwargs:
        conditions, values = _parse_kwargs(kwargs)
        where = " AND ".join(conditions)
        sql = f'SELECT * FROM "{cls._table_name()}" WHERE {where} LIMIT 2'  # noqa: S608
    else:
        # No filters previously produced "WHERE  LIMIT 2" (invalid SQL);
        # treat the unfiltered call as matching every row instead.
        values = []
        sql = f'SELECT * FROM "{cls._table_name()}" LIMIT 2'  # noqa: S608
    rows = await pool.fetch(sql, *values)
    if not rows:
        return None
    if len(rows) > 1:
        msg = f"{cls.__name__}.get() matched more than one row. Use filter() to retrieve multiple results."
        raise MultipleObjectsReturned(msg)
    return cls.model_validate(dict(rows[0]))

save(*, update_fields=None) async

Update the row identified by this instance's primary key.

All non-PK fields are written unless update_fields restricts the set. Raises :class:ValueError if the model has no primary-key field, if the PK value is None, or if update_fields is an empty list.

Source code in src/airmodel/main.py
async def save(self, *, update_fields: list[str] | None = None) -> None:
    """Update the row identified by this instance's primary key.

    Args:
        update_fields: Optional subset of field names to write; when
            ``None``, every non-PK field is written.

    Raises:
        ValueError: If the model has no primary-key field, the PK value
            is ``None``, ``update_fields`` is an empty list, or the row
            no longer exists in the database.
    """
    pool = _get_pool()
    pk = self._pk_field()
    if pk is None:
        msg = f"{type(self).__name__} has no primary_key field"
        raise ValueError(msg)
    pk_value = getattr(self, pk)
    if pk_value is None:
        msg = "Cannot save a row without a primary key value. Use create() for new rows."
        raise ValueError(msg)

    if update_fields is not None and len(update_fields) == 0:
        msg = "update_fields cannot be empty. Omit the argument to update all fields."
        raise ValueError(msg)

    fields = update_fields if update_fields is not None else self._non_pk_fields()
    set_clauses = [f'"{f}" = ${i + 1}' for i, f in enumerate(fields)]
    values = [getattr(self, f) for f in fields]
    pk_placeholder = f"${len(fields) + 1}"

    sql = f'UPDATE "{self._table_name()}" SET {", ".join(set_clauses)} WHERE "{pk}" = {pk_placeholder} RETURNING *'  # noqa: S608
    row = await pool.fetchrow(sql, *values, pk_value)
    if row is None:
        # The UPDATE matched nothing: the row was deleted since this
        # instance was loaded. Fail loudly instead of crashing below
        # with a TypeError on membership-testing None.
        msg = f"{type(self).__name__} row with {pk}={pk_value!r} no longer exists."
        raise ValueError(msg)
    # Sync this instance with what the database actually stored
    # (RETURNING * reflects defaults/trigger-modified columns).
    for field_name in type(self).model_fields:
        if field_name in row:
            object.__setattr__(self, field_name, row[field_name])

MultipleObjectsReturned

Bases: Exception

Raised by :meth:AirModel.get when the query matches more than one row.

Source code in src/airmodel/main.py
class MultipleObjectsReturned(Exception):
    """Signals that :meth:`AirModel.get` found several matching rows where exactly one was expected."""