I was jolted awake by an alert at 2 AM. Our production customer-service bot had suddenly developed the memory of a goldfish—right after a user provided their order number, the bot would ask again, “What’s your order number?” in the very next turn. Users were furious, and orders got stuck at confirmation. I spent the next hour combing through LangChain’s memory module logs until 3 AM, when I finally uncovered the root cause: under specific concurrency scenarios, the memory store key was being silently overwritten, causing context to vanish. The worst part? If only we had an automated test suite for memory integrity, this bug would never have exploded in production.
Breaking Down the Problem
Our application is an order-assistant agent that helps users fill in order details (order number, shipping address, promo code, etc.) over multiple turns. The agent is powered by an LLM, and conversation context is persisted using a ConversationBufferMemory that stores messages in Redis, keyed by session ID. On the surface, everything worked smoothly: after each turn, we appended the Human and AI messages to the memory; on the next request, we loaded them from Redis and stitched them into the prompt.
The problem, however, was precisely in that “append” operation. Our workers run as multiple processes. When requests belonging to the same session got routed to different workers, a classic read-modify-write race condition occurred: Process A reads the current memory list → Process B reads the same list → A appends a message and writes back → B appends a message and writes back, but B’s write overwrites A’s append, causing A’s entire turn to silently disappear. Conventional solutions like distributed locks could solve this, but you have to get lock granularity, timeouts, and retry strategies just right, or you’ll introduce new performance bottlenecks. And the worst part? How do you know that the lock logic you write today won’t time out tomorrow because the LLM takes longer to generate a response? Without a repeatable way to verify, every change feels like walking a tightrope.
Solution Design
I decided to abandon the “fix it later” mindset and instead build a contract test suite for the memory store—automated tests that simulate multi-turn conversations and verify memory integrity and idempotency. These tests don’t measure LLM response quality; they validate the memory module itself: given a sequence of dialogue inputs, after storing and retrieving them, assert that no critical information is lost.
Why not use LangChain’s built-in memory testing utilities? Because LangChain’s memory module only provides basic unit-test examples, and they don’t cover real production issues like concurrent writes, key collisions, or serialization round-trips. What I needed was a test suite that could be dropped directly into CI: based on pytest, using fakeredis to simulate Redis, and leveraging coroutines to mimic concurrency. The cost is nearly zero, yet it catches 90% of memory-loss bugs. Architecturally, the tests are organized into three layers:
- Single-threaded sequential correctness — basic functionality: can we store and retrieve memory correctly?
- Concurrency safety — simulate multiple writers appending simultaneously and assert the final memory is intact.
- Edge cases — key expiration, extremely long messages, special-character serialization, etc.
This way, any change to the memory store (e.g., switching from Redis to Postgres, or changing the serialization format) must pass these three gates first.
Core Implementation
Memory Store Interface (abstracting away storage details)
This snippet defines a storage-agnostic memory interface so tests can be written against the interface rather than being coupled to Redis. I designed two core methods, save_context and load_memory, and also exposed an asynchronous asave_context for concurrency testing.
from abc import ABC, abstractmethod
from typing import Dict, List
class BaseMemoryStore(ABC):
@abstractmethod
def save_context(self, session_id: str, human_msg: str, ai_msg: str) -> None:
"""保存一轮对话上下文"""
...
@abstractmethod
def load_memory(self, session_id: str) -> List[Dict[str, str]]:
"""返回 [{'human': ..., 'ai': ...}, ...]"""
...
# 异步版本供并发测试使用
async def asave_context(self, session_id: str, human_msg: str, ai_msg: str) -> None:
# 默认用同步实现,子类可覆盖
self.save_context(session_id, human_msg, ai_msg)
Redis-based Production Implementation (with atomic append)
This is the actual Redis implementation we use in production. It uses Redis List’s RPUSH for atomic appends, avoiding the read-modify-write race. Just be careful with serialization—you must be able to deserialize back to the original structure without loss. Here I use json.dumps and make sure the key doesn’t expire prematurely.
import json
import redis
from typing import List, Dict
class RedisMemoryStore(BaseMemoryStore):
def __init__(self, redis_client: redis.Redis, ttl: int = 3600):
self.redis = redis_client
self.ttl = ttl
def _key(self, session_id: str) -> str:
return f"mem:{session_id}"
def save_context(self, session_id: str, human_msg: str, ai_msg: str) -> None:
key = self._key(session_id)
item = json.dumps({"human": human_msg, "ai": ai_msg})
# 原子追加,避免 read-modify-write 竞态
self.redis.rpush(key, item)
self.redis.expire(key, self.ttl)
def load_memory(self, session_id: str) -> List[Dict[str, str]]:
key = self._key(session_id)
items = self.redis.lrange(key, 0, -1)
return [json.loads(i) for i in items]
Automated Test Suite (concurrency is the key)
To test concurrency, I simply use asyncio to spawn multiple tasks writing to the same session_id concurrently, then assert that the final stored messages contain exactly the expected set. Each message carries a unique sequence number so it’s easy to spot which one was lost.
im
Top comments (0)