Azure Queue Storageで非同期タスク実行中に遭遇した「トランザクション境界問題」。Transactional Outbox パターンで解決した実装プロセス

BLOG TITLE
Resonalエンジニアリング部

Resonalエンジニアリング部

2026/01/09

サービス上で、ユーザーが CSV や Excel ファイルをアップロードすると、非同期でパース処理が走る仕組みを作成しています。API サーバーはアップロードを受け付けてタスクを作成し、実際のパース処理は Worker プロセスに任せるような仕組みです。API と Worker の間の通信には Azure Queue Storageを利用しています。

DB の操作だけで完結する処理であれば、トランザクションの整合性は DB が保証してくれます。「ユーザーを作成して、そのユーザーのプロフィールも作成する」といった処理は、両方成功するか両方失敗するかのどちらかがあるべき姿のはずです。

ところが、今回のようにAzure Queue Storageのような 非同期のメッセージングサービスを使うと話が変わってきます。DB とキューは別々の仕組みで動いていて、同じトランザクションに参加できません。「DB への保存」と「キューへの送信」を一緒にコミット/ロールバックする方法がないのです。この 2 つをどう整合させるかという「トランザクション境界問題」にぶつかりました。

今回は、Transactional Outbox パターンでその問題を解決しました。その実装プロセスを紹介していきます。

問題の発見:キューの送信と DB 保存のタイミング

このサービスでは、アップロードされたファイル、CSVやExcelの情報を SourceFile というテーブルで管理しています。ファイル名やストレージ上のパスを保持します。Taskテーブルも用意しており、こちらはは非同期処理のジョブ管理のためのものです。Task には処理の進捗状況(PENDING / RUNNINGなど)を記録します。

処理の流れは以下のようになっています。

1. ユーザーがファイルをアップロード
2. SourceFile レコードを DB に保存(ファイルのメタ情報)
3. Task レコードを DB に保存 (非同期ジョブの管理用)
4. Azure Queue Storageにパース依頼メッセージを送信
5. DBトランザクションをコミット
6. Worker がキューからメッセージを取得してパース実行

一見問題なさそうですが、4番目と5番目の間に落とし穴があります。

キュー送信成功 → DB コミット失敗で不整合が発生する可能性

実際に書いていたコードは以下の通りです。

async def execute(self, ...) -> CreateSourceFileResult:
 # 1. SourceFile を作成して DB に保存
 source_file = SourceFile.create(...)
 self._repository.save(source_file)

 # 2. Task を作成して DB に保存
 task = Task.create(...)
 self._task_repository.save(task)

 # 3. Azure Queue Storageにメッセージを送信
 await self._queue_client.send_message({
   "task_id": str(task.task_id),
   "source_file_id": str(source_file.source_file_id),
 })
 
 # この後で DB commit が実行される
 return CreateSourceFileResult(source_file=source_file, task=task)

キューへのメッセージ送信は成功したけど、その後の DB コミットが失敗するケースを考えてみてください。キューには「task_id: xxx-xxx-xxx を処理してね」というメッセージが残ってしまいます。でも DB には task_id: xxx-xxx-xxx は存在しません。DBではロールバックされる仕組みが整っているからです。

そうした場合、ワーカーがキューからメッセージを取り出して処理しようとしたとき、タスクがDBにレコードがないためエラーになります。

発生確率は当然ながら低いはずです。DB コミットが失敗するなんて滅多におきません。ディスクがいっぱいになったり、ネットワークの瞬断があったり、DB サーバーがクラッシュしたり。運用が長くなれば、起きる可能性があります。

正直、まだMVPのようなレベルなので実装を後回しにしてもよかったのですが、自分の知識として担保しておきたいため、先んじて実装をしておくこととしました。

Transactional Outbox パターンとは

さて、この問題を解決するの一つの手段が Transactional Outbox パターン です。

考え方はシンプルです。キューに直接メッセージを送信するのではなく、「送信すべきメッセージ」を管理するためのテーブルを用意してそちらにメッセージを書き込みます。そして別のプロセスがそのテーブルを監視して、実際のキュー送信を行います。

こうすることで、業務データ(今回でいうSourceFile, Task)と送信予定メッセージ(Outbox)が同じ DB トランザクション内で保存されます。コミットが成功すれば両方とも保存され、失敗すれば両方ともロールバックされます。データベースの「原子性」という性質を活用しているわけです。

原子性とは「複数の操作がすべて成功するか、すべて失敗するかのどちらかになる」という性質です。「半分だけ成功」という中途半端な状態にはなりません。データベースのトランザクションには ACID 特性というものがあります。その中の「A」が原子性(Atomicity)です。

銀行の振込処理を例にすると分かりやすいです。A さんの口座から 1 万円引いて、B さんの口座に 1 万円足す。この 2 つの操作は必ずセットで成功するか、セットで失敗する必要があります。A さんから引いたけど B さんに足せなかった、という状態になってしまうのが大問題ということです。

Outbox パターンは、「DB 保存」と「メッセージ送信予約」の原子性を担保するための手法です。

Outbox パターンを適用した後の全体のフロー

Outbox パターンを導入した後の処理フローを図にすると、次のようになります。

  1. DB保存(一元管理)
    • ファイル・タスク・送信待ちメッセージを同一トランザクションで保存。
    • ※ ここで「メッセージの送り漏れ」を防ぎます。
  2. キューへ転送(非同期)
    • 別プロセスがDBから「PENDING」を拾い、Azure Queue Storageへ送信。
    • 送信完了後、DBのステータスを更新。
  3. バックグラウンド処理
    • ワーカーがキューからメッセージを受け取り、パースを実行。

ポイントは「UseCase 内ではキューに直接送信しない」ことです。代わりに OutboxMessage というレコードを DB に書き込むだけ。実際の キュー送信は別プロセスの OutboxProcessor が担当します。

具体的なOutboxパターンの実装

それでは実装についてそれぞれコードを書きながら解説していきます。

OutboxMessage エンティティ

まずは Outbox に格納するメッセージを表すエンティティを作ります。

from dataclasses import dataclass
from datetime import UTC, datetime
from enum import Enum
from typing import Any
import uuid


class OutboxMessageStatus(str, Enum):
    """Outbox メッセージのステータス"""
    PENDING = "PENDING"  # 未処理
    PROCESSING = "PROCESSING"  # 処理中
    COMPLETED = "COMPLETED"  # 送信完了
    FAILED = "FAILED"  # 送信失敗


class OutboxEventType(str, Enum):
    """イベントの種類"""
    PARSE_TASK = "PARSE_TASK"


@dataclass
class OutboxMessage:
    """Outbox メッセージエンティティ"""
    message_id: uuid.UUID
    event_type: OutboxEventType
    payload: dict[str, Any]
    status: OutboxMessageStatus
    retry_count: int
    created_at: datetime
    processed_at: datetime | None

    @staticmethod
    def create(
        event_type: OutboxEventType,
        payload: dict[str, Any],
    ) -> "OutboxMessage":
        """新しい Outbox メッセージを作成する"""
        return OutboxMessage(
            message_id=uuid.uuid4(),
            event_type=event_type,
            payload=payload,
            status=OutboxMessageStatus.PENDING,
            retry_count=0,
            created_at=datetime.now(UTC),
            processed_at=None,
        )

    def mark_processing(self) -> None:
        """処理中に変更する"""
        self.status = OutboxMessageStatus.PROCESSING

    def mark_completed(self) -> None:
        """完了に変更する"""
        self.status = OutboxMessageStatus.COMPLETED
        self.processed_at = datetime.now(UTC)

    def mark_failed(self) -> None:
        """失敗に変更し、リトライ回数を増やす"""
        self.status = OutboxMessageStatus.FAILED
        self.retry_count += 1

payload フィールドには JSON 形式で任意のデータを入れられるようにしています。パースタスクの場合は task_idsource_file_id を格納します。

テーブル設計と Repository

次に DB テーブルと Repository(永続層) を作ります。このプロジェクトでは SQLModel を使っています。JSONB は PostgreSQL 固有の型なので、 sa_column パラメータを通じて SQLAlchemy のColumn を直接指定しています。

from sqlalchemy import Column
from sqlalchemy.dialects.postgresql import JSONB
from sqlmodel import Field, Session, SQLModel, select


class OutboxMessageTable(SQLModel, table=True):
    """Outbox メッセージテーブル"""
    __tablename__ = "outbox_messages"

    message_id: uuid.UUID = Field(primary_key=True)
    event_type: str = Field(index=True)
    payload: dict = Field(sa_column=Column(JSONB))
    status: str = Field(index=True)
    retry_count: int = Field(default=0)
    created_at: datetime
    processed_at: datetime | None = Field(default=None)


class OutboxRepository:
    """Outbox メッセージの Repository"""

    def __init__(self, session: Session) -> None:
        self._session = session

    def save(self, message: OutboxMessage) -> None:
        """メッセージを保存する"""
        table = OutboxMessageTable(
            message_id=message.message_id,
            event_type=message.event_type.value,
            payload=message.payload,
            status=message.status.value,
            retry_count=message.retry_count,
            created_at=message.created_at,
            processed_at=message.processed_at,
        )
        self._session.merge(table)
        self._session.flush()

    def find_pending_messages(self, limit: int = 100) -> list[OutboxMessage]:
        """PENDING のメッセージを取得する"""
        statement = (
            select(OutboxMessageTable)
            .where(OutboxMessageTable.status == "PENDING")
            .order_by(OutboxMessageTable.created_at)
            .limit(limit)
            .with_for_update(skip_locked=True)
        )
        results = self._session.exec(statement).all()
        return [self._to_entity(r) for r in results]

ちなみにwith_for_update(skip_locked=True) は複数の Worker が同時に動いている場合に重要になります。

FOR UPDATE は「この行をロックして、他のトランザクションが更新できないようにする」という SQL の機能です。Worker A がメッセージを取得したら、Worker B は同じメッセージを取得できなくなります。SKIP LOCKED は「すでにロックされている行はスキップする」という意味です。Worker B は Worker A がロック中の行を待たずに、ロックされていない別の行を取得できます。

これにより、複数Workerを並列で動かしても、同じメッセージを二重に処理してしまうことがなくなります。

OutboxProcessor(ポーリングワーカー)

Outbox テーブルを監視して、実際にキューに送信する処理を書きます。

import asyncio
import logging

logger = logging.getLogger(__name__)


class OutboxProcessor:
    """Outbox テーブルをポーリングして Queue に送信する"""

    def __init__(
        self,
        queue_sender: "QueueSender",
        poll_interval_seconds: float = 5.0,
        batch_size: int = 100,
        max_retries: int = 3,
    ) -> None:
        self._queue_sender = queue_sender
        self._poll_interval = poll_interval_seconds
        self._batch_size = batch_size
        self._max_retries = max_retries
        self._running = False

    async def start(self) -> None:
        """ポーリングを開始する"""
        self._running = True
        logger.info("OutboxProcessor started")

        while self._running:
            try:
                processed_count = await self._process_batch()
                if processed_count > 0:
                    logger.info(f"Processed {processed_count} outbox messages")
            except Exception as e:
                logger.error(f"Error processing outbox: {e}")

            await asyncio.sleep(self._poll_interval)

    def stop(self) -> None:
        """ポーリングを停止する"""
        self._running = False
        logger.info("OutboxProcessor stopping")

    async def _process_batch(self) -> int:
        """バッチ処理を実行する"""
        with get_session_context() as session:
            repo = OutboxRepository(session)
            messages = repo.find_pending_messages(limit=self._batch_size)

            processed = 0
            for message in messages:
                success = await self._process_message(message, repo)
                if success:
                    processed += 1

            return processed

    async def _process_message(
        self,
        message: OutboxMessage,
        repo: OutboxRepository,
    ) -> bool:
        """1件のメッセージを処理する"""
        try:
            message.mark_processing()
            repo.save(message)

            await self._queue_sender.send(
                event_type=message.event_type,
                payload=message.payload,
            )

            message.mark_completed()
            repo.save(message)
            return True

        except Exception as e:
            logger.error(f"Failed to send message {message.message_id}: {e}")
            message.mark_failed()
            repo.save(message)
            return False

OutboxProcessor は 5 秒ごとに DB をチェックして、「PENDING のメッセージはあるかな?」と確認します。あればキューに送信し、なければ何もせずまた 5 秒待ちます。

Azure Queue Storage は pull 型なので、メッセージを取りに行く側がポーリングする必要があります。このプロジェクトではファイルを Azure Blob Storage に保存しているため、同じAzure Storage アカウント内で使える Queue Storage を採用しました。ファイルパース処理では 5 秒程度の遅延は許容範囲ですし、Service Busのような高度な機能(トピック、サブスクリプション、Dead Letter Queue など)は現時点では不要だったためです。

UseCase での統合

これまでのコードを UseCase に組み込みます。Outbox 経由に変更します。

class CreateSourceFileUseCase:
    """ファイルアップロードの UseCase"""

    def __init__(
        self,
        repository: SourceFileRepository,
        task_repository: TaskRepository,
        outbox_repository: OutboxRepository,  # 追加
    ) -> None:
        self._repository = repository
        self._task_repository = task_repository
        self._outbox_repository = outbox_repository

    def execute(self, ...) -> CreateSourceFileResult:
        # 1. SourceFile を作成
        source_file = SourceFile.create(...)
        self._repository.save(source_file)

        # 2. Task を作成
        task = Task.create(...)
        self._task_repository.save(task)

        # 3. Outbox メッセージを作成(Queue 直接送信の代わり)
        outbox_message = OutboxMessage.create(
            event_type=OutboxEventType.PARSE_TASK,
            payload={
                "task_id": str(task.task_id),
                "source_file_id": str(source_file.source_file_id),
            },
        )
        self._outbox_repository.save(outbox_message)

        # リクエスト終了時に get_session() が全て commit
        return CreateSourceFileResult(source_file=source_file, task=task)

変更点は、 queue_client.send_message() を呼んでいた部分が outbox_repository.save() に変わっただけです。UseCaseのコード自体はほとんど変わっていません。

トランザクション境界の管理

トランザクション管理の仕組みを確認します。FastAPI の Dependency Injection を使ってセッション管理をしています。

from contextlib import contextmanager
from typing import Generator

from sqlmodel import Session, create_engine

engine = create_engine(DATABASE_URL)

def get_session() -> Generator[Session, None, None]:
    """FastAPI Dependency: リクエスト単位でトランザクション管理"""
    with Session(engine) as session:
        try:
            yield session
            session.commit()  # 全て成功したらコミット
        except Exception:
            session.rollback()  # 失敗したらロールバック
            raise


@contextmanager
def get_session_context() -> Generator[Session, None, None]:
    """Worker 用: with 文で使えるコンテキストマネージャー"""
    with Session(engine) as session:
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise

なお、Repository の save() メソッドでは flush() を呼んでいて、 commit() は呼んでいません。この違いは重要です。

flush() は「今までの変更を DB に送信するが、確定はしない」という操作です。DB にデータが送られるので、自動採番された ID を取得できます。でもトランザクションはまだ開いたままです。commit() は「トランザクションを確定する」という操作です。これを呼んで初めて、変更が永続化されます。

UseCase 内では flush() だけを使い、 commit()get_session() に任せています。こうすることで、UseCase の処理がすべて完了してから commit が実行されます。途中で例外が発生したら get_session() が自動的に rollback してくれます。

この仕組みのおかげで、SourceFile, Task, OutboxMessage の 3 つが原子的に保存される形になっています。

Workerの起動

最後にこの仕組みを動かすWorkerについて解説します。ここでは 2 つのポーリングループが動いています。

  1. OutboxProcessor: DB の outbox_messages テーブルをポーリング(5秒間隔)→ Azure Queue Storage に送信
  2. QueueConsumer: Azure Queue Storage をポーリング(1秒間隔)→ パース処理を実行

この 2 つを 1 つの Worker プロセス内で並列に動かすために、WorkerManager を作成しています。

class WorkerManager:
    """OutboxProcessor と QueueConsumer のライフサイクル管理"""

    def __init__(self) -> None:
        self._outbox_processor: OutboxProcessor | None = None
        self._queue_consumer: QueueConsumer | None = None

    async def start(self) -> None:
        """両方を並列で起動する"""
        # Outbox → Queue
        queue_sender = AzureQueueSender()
        self._outbox_processor = OutboxProcessor(
            queue_sender=queue_sender,
            poll_interval_seconds=5.0,
            batch_size=100,
        )

        # Queue → Parse Handler
        self._queue_consumer = QueueConsumer(
            handler=parse_file_handler,
            poll_interval=1.0,
        )

        logger.info("Starting OutboxProcessor and QueueConsumer...")

        await asyncio.gather(
            self._outbox_processor.start(),
            self._queue_consumer.run(),
        )

    def shutdown(self) -> None:
        """グレースフルシャットダウン"""
        logger.info("Initiating graceful shutdown...")
        if self._outbox_processor:
            self._outbox_processor.stop()
        if self._queue_consumer:
            self._queue_consumer.stop()

asyncio.gather() で 2 つのポーリングループを並列実行しています。OutboxProcessor は 5 秒ごとに DB をチェックして新しいメッセージを キュー に送信し、QueueConsumer は 1 秒ごとに キュー をチェックしてパース処理を実行します。

ポーリング間隔が異なるのは役割の違いによります。OutboxProcessor は「メッセージを キュー に送る」だけなので多少遅延があっても問題ありません。一方 QueueConsumer は「実際の処理を実行する」ので、Queue にメッセージが入ったらなるべく早く取得したいため間隔を短くしています。

なお、OutboxProcessor と QueueConsumer を別プロセスに分けることも可能です。今回は 1 つの Worker プロセスにまとめましたが、スケールが必要になれば分離して別々にすることも可能です。

まとめ

Outbox パターンを導入したことで、「キュー 送信成功 → DB コミット失敗」による不整合の問題を解決できました。

Outbox パターンではDB と キューの整合性が保証されます。業務データと送信予定メッセージが同じトランザクションで保存されるため、どちらかだけが欠ける状態にはなりません。既存のトランザクション管理の仕組みにそのまま乗っかれるので、導入コストも低めです。また、送信に失敗したメッセージは DB に残っているので、リトライ処理も簡単に実装できます。

今回の実装ではポーリング方式となっているため、最大 5 秒程度の遅延が発生します。リアルタイム性が必要な場合は、CDC(Change Data Capture)や PostgreSQL の NOTIFY 機能を使えばほぼリアルタイムで検知できます。これはポーリング方式を選んだことによる制約であり、Outbox パターン自体の制約ではありません。

一方、Outbox テーブルにレコードが溜まっていく点は Outbox パターン自体の特性です。DB にメッセージを書き込む以上、COMPLETED のレコードは増え続けるため、定期的なクリーンアップ処理やパーティショニングが必要になります。

また、「キューの送信に失敗しても大きな問題にならない」ケースでは、直接送信でも構いません。たとえば通知メールの送信など、多少欠けても致命的ではない場合などです。

一方、整合性が崩れたときの影響を減らしたい場合は Outbox パターンが有効です。今回のファイルパース処理でも、不整合が起きれば Worker が「Task not found」で失敗するだけでシステムが壊れるわけではありません。ただ、ユーザーに再アップロードを強いることになるので、できれば避けたいという判断で導入しました。

Shareこのエントリーをはてなブックマークに追加