This project is a hierarchical long-term memory system built for AI Agents. Its primary functions are:
- Data Ingestion: Receives and processes raw dialogue history.
- Knowledge Construction: Transforms unstructured dialogues into structured Knowledge Graphs (KG) and user personas.
- Unified Retrieval: Provides a unified API to retrieve context information from different memory layers.
The system adopts a layered architecture with clear responsibilities for each layer:
- L0: Event Memory Layer
  - Responsibility: Processes raw dialogue streams into independent, structured Topical Memories. This serves as the raw source of truth for all memories.
  - Core Technologies: Elasticsearch (Storage and Retrieval), Redis (Task Scheduling).
- L1: Knowledge Graph Layer
  - Responsibility: Consumes topical memories produced by L0. It uses LLMs to analyze and extract entities and relationships, constructing a structured Knowledge Graph. This layer is responsible for forming long-term, associative knowledge.
  - Core Technologies: Neo4j (Graph Storage and Querying), Redis (Task Queues).
- L2: Unified Retrieval Layer
  - Responsibility: Acts as the single entry point for all memory queries. It sends requests concurrently to L0 (Keyword/Vector Retrieval) and L1 (Graph Retrieval), then aggregates, filters, and formats the results.
  - Core Technologies: FastAPI, asyncio (Concurrency).
Data Flow Diagram:
// Ingestion Flow
[Raw Dialogue] -> [L0: MemoryServer] -> [ES: Store L0 Memory] -> [Redis: Publish Task] -> [L1: Listener] -> [Neo4j: Build Graph]
// Retrieval Flow
[API Query] -> [L2: Retriever] --Concurrent--> [L0: RagServer(ES)] & [L1: MemoryGraph(Neo4j)] -> [Formatted JSON]
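The retrieval flow's fan-out/fan-in can be sketched with `asyncio`. This is a minimal sketch only: the coroutine names and payloads below are placeholders, not the project's actual functions.

```python
import asyncio

# Hypothetical stand-ins for the L0 (Elasticsearch) and L1 (Neo4j) back ends.
async def query_l0(query: str) -> dict:
    return {"recency": ["latest L0 memory"], "relevance": [f"L0 hit for '{query}'"]}

async def query_l1(query: str) -> dict:
    return {"graph": [f"L1 graph knowledge for '{query}'"]}

async def retrieve(query: str) -> dict:
    # L2 fans out to L0 and L1 concurrently, then aggregates the results.
    l0, l1 = await asyncio.gather(query_l0(query), query_l1(query))
    return {**l0, **l1}

result = asyncio.run(retrieve("music"))
print(sorted(result))  # ['graph', 'recency', 'relevance']
```

Because the two back ends are independent, `asyncio.gather` bounds total latency by the slower of the two calls rather than their sum.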
This is the core pipeline for transforming dialogues into knowledge.
- Trigger: An external service calls the `/L0_memory/trigger_memory_generation` API (`main.py`), passing `user_id`, `role_id`, and the latest message timestamp.
- Task Scheduling (L0):
  - `l0_memory/memory_server.py` receives the request. It first attempts to acquire a Redis distributed lock keyed on `user_id` and `role_id` (`l0_memory/redis_db/state_manager.py`) to ensure dialogues for the same user are processed serially.
  - It checks whether the number of new messages since the last processing run exceeds the batch threshold (`TOPIC_ANALYSIS_BATCH_SIZE`).
- Asynchronous Processing (L0):
  - If the threshold is met, `MemoryServer` fetches the dialogue data for the relevant period from the history database (e.g., MySQL/Supabase) and submits it to a `ThreadPoolExecutor`.
  - A background thread executes the `process_chunk` method in `l0_memory/core/memory_processor.py`.
- Memory Generation (L0):
  - `MemoryProcessor` calls the LLM (via the `llm_interface` factory) with `TOPIC_SPLIT_PROMPT` to segment long dialogues into independent, semantically complete topics.
  - For each topic, it calls the LLM again with `L0_SUMMARY_GENERATION_PROMPT` to generate a structured JSON object containing `core_fact` and `conclusions`.
  - It generates vectors for `core_fact` and `conclusions` using the client created by the `embedding_interface` factory.
- Storage & Notification (L0):
  - The final L0 memory document (including vectors) is written to Elasticsearch (`l0_memory/utils/elasticsearch_client.py`).
  - `l0_memory/utils/redis_publisher.py` pushes the newly generated `memory_id` into a user-specific Redis task queue (a List) and publishes a notification (content: `user_session_key`) to a global Pub/Sub channel (`l1_memory_tasks`). This is the hand-off point from L0 to L1.
- State Update (L0):
  - Upon task completion, the `on_task_completed` callback updates the user's `last_processed_created_at` timestamp in Redis and releases the distributed lock.
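The per-user serialization in the Task Scheduling step can be illustrated with an in-memory stand-in for the Redis lock. Everything below is a sketch: the key format, the `>=` threshold comparison, and the use of `threading.Lock` are placeholders for what `state_manager.py` actually does against Redis (commonly a SET-NX-with-TTL pattern).

```python
import threading

TOPIC_ANALYSIS_BATCH_SIZE = 10  # illustrative value; the real one lives in settings.py

_locks: dict[str, threading.Lock] = {}
_registry_guard = threading.Lock()

def try_acquire(user_id: str, role_id: int) -> bool:
    """In-memory stand-in for the per-user distributed lock."""
    key = f"lock:{user_id}:{role_id}"
    with _registry_guard:
        lock = _locks.setdefault(key, threading.Lock())
    return lock.acquire(blocking=False)

def release(user_id: str, role_id: int) -> None:
    _locks[f"lock:{user_id}:{role_id}"].release()

def should_process(new_message_count: int) -> bool:
    # Batch-threshold check performed before scheduling a generation task.
    return new_message_count >= TOPIC_ANALYSIS_BATCH_SIZE

got_lock = try_acquire("test_user", 10001)   # first caller wins
contended = try_acquire("test_user", 10001)  # a concurrent caller is turned away
release("test_user", 10001)
```

The effect is that dialogue batches for the same `(user_id, role_id)` pair are always processed one at a time, while different users proceed in parallel.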
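The Storage & Notification hand-off reduces to two Redis operations: an LPUSH onto the per-user task queue and a PUBLISH on the shared channel. The sketch below runs against a minimal in-memory stub rather than a live Redis (`FakeRedis` and `notify_l1` are illustrative names), but the key and channel names mirror the ones described above.

```python
from collections import defaultdict

class FakeRedis:
    """Minimal in-memory stand-in for the two Redis operations used in the hand-off."""
    def __init__(self):
        self.lists = defaultdict(list)
        self.published = []

    def lpush(self, key: str, value: str) -> None:
        self.lists[key].insert(0, value)

    def publish(self, channel: str, message: str) -> None:
        self.published.append((channel, message))

def notify_l1(r: FakeRedis, user_session_key: str, memory_id: str) -> None:
    # 1. Enqueue the concrete work item on the user-specific task queue.
    r.lpush(f"l1_task_queue:{user_session_key}", memory_id)
    # 2. Wake up the L1 listener via the global Pub/Sub channel.
    r.publish("l1_memory_tasks", user_session_key)

r = FakeRedis()
notify_l1(r, "u1:10001", "mem-001")
```

Separating the durable queue (List) from the wake-up signal (Pub/Sub) means a missed Pub/Sub message is recoverable: the `memory_id` still sits in the queue until a listener drains it.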
- Trigger: The API receives a retrieval request, handled by `l2_retrieval/retriever.py` (`L2Retriever`).
- Scheduling: `L2Retriever` creates an asynchronous task for L0.
- Adapter: This task calls the L0 retrieval service via `l2_retrieval/l0_adapter.py`.
- Execution: `l0_memory/rag_system/rag_server.py` receives the request and executes two types of retrieval:
  - Recency Retrieval (`get_recency_context`): Queries ES directly for the latest N L0 memories for the user.
  - Relevance Retrieval (`get_relevance_context`): Calls `l0_memory/rag_system/elasticsearch_retriever.py` to perform keyword, vector, or hybrid search to find the `core_fact` and `conclusions` entries most relevant to the query.
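Hybrid search in `elasticsearch_retriever.py` fuses the keyword and vector rankings with RRF. A minimal sketch of Reciprocal Rank Fusion (the constant `k=60` is the conventional default, not necessarily this project's configuration):

```python
def rrf_fuse(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Reciprocal Rank Fusion: score(d) = sum over rankings of 1 / (k + rank)."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)

keyword_hits = ["doc_a", "doc_b", "doc_c"]   # BM25 ranking
vector_hits = ["doc_b", "doc_d", "doc_a"]    # kNN ranking
fused = rrf_fuse([keyword_hits, vector_hits])
print(fused)  # ['doc_b', 'doc_a', 'doc_d', 'doc_c']
```

Documents that place well in both rankings (`doc_b`, `doc_a`) float to the top without needing to calibrate BM25 scores against cosine similarities, which is the main appeal of rank-based fusion.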
- `settings.py`: Global Configuration Center. Defines database connections, LLM/Embedding model configs, API keys, thread pool sizes, RAG parameters, etc.
- `new_prompts.py`: Stores all carefully engineered instruction templates (Prompts) for LLMs.
- `memory_server.py`: Main Service Entry for L0. Listens for API requests, manages distributed locks and thread pools, and schedules memory generation tasks.
- `core/memory_processor.py`: Core L0 Processor. Encapsulates the pipeline from dialogue chunks to structured L0 memories (Topic Splitting -> Summary Generation -> Vectorization -> Storage -> Task Publishing).
- `memory_generation/processor.py`: Encapsulates the logic for calling LLMs to perform topic splitting and summary generation.
- `data_processing/loader.py`: Utility for loading dialogue history and formatting it into LLM-friendly text.
- `history_db/`: History Database Adapters. `base.py` is the abstract base class; `mysql_hdb.py` and `supabase.py` are concrete implementations for fetching raw dialogues.
- `embedding_interface/` & `llm_interface/`: Factory Pattern implementations. `base.py` holds the abstract client base classes; `factory.py` dynamically creates client instances (e.g., volc, openai) based on `settings.py`; `volc_client.py`, etc. are the concrete provider implementations.
- `rag_system/`: L0 Retrieval System. `rag_server.py` is the entry point offering "Recency" and "Relevance" retrieval interfaces; `elasticsearch_retriever.py` contains the core logic for Keyword, Vector, and Hybrid search (including RRF fusion) on Elasticsearch.
- `redis_db/state_manager.py`: Encapsulates Redis operations for state management (session progress) and distributed locks.
- `utils/`: General utilities. `elasticsearch_client.py` handles low-level interactions with Elasticsearch; `redis_publisher.py` publishes to Redis queues/channels (the L0 -> L1 bridge); `logger.py`, `schema.py`, and `supabase_log_handler.py` provide logging, Pydantic models, and structured log reporting.
Key configurations in `settings.py` related to L0:
- Models:
  - `LLM_PROVIDER`, `L0_LLM_CONFIGS`: Control the LLM used for L0 generation and RAG keyword extraction.
  - `EMBEDDING_PROVIDER`, `EMBEDDING_CONFIGS`: Control the Embedding model for L0 vectorization.
- Database & Services:
  - `ES_CONFIG`, `ES_INDEX_...`: Elasticsearch connection and index names.
  - `REDIS_CONFIG`, `REDIS_KEY_TEMPLATES`: Redis connection and key templates, used for state management, distributed locks, and L1 task publishing.
  - `HISTORY_DB_PROVIDER`: Specifies the source database for raw dialogue history.
- Workflow Parameters:
  - `TOPIC_ANALYSIS_BATCH_SIZE`: Threshold for `memory_server.py` to trigger batch processing.
  - `MEMORY_SERVER_MAX_WORKERS`: Thread pool size controlling L0 concurrency.
  - `DEFAULT_PROCESSING_START_TIME`: Default start timestamp for new sessions.
- RAG Parameters:
  - `RAG_RETRIEVAL_MODE`: Mode for `elasticsearch_retriever.py` (`hybrid`, `keyword_only`, `vector_only`).
  - `RAG_..._TOP_K`: Number of facts/conclusions to retrieve.
  - `RAG_..._THRESHOLD`: Score thresholds for filtering.
  - `RAG_RECENCY_CONTEXT_SIZE`: Number of latest L0 memories to return.
This pipeline follows immediately after L0 task publishing.
- Task Listening (L1):
  - `l1_memory/l1_listener.py` is a long-running background service. It subscribes to the `l1_memory_tasks` channel via Redis Pub/Sub.
  - Upon receiving a message (`user_session_key`), `L1TaskListener` submits it to a thread pool to avoid blocking the listener loop.
- Task Processing (L1):
  - The background thread acquires an L1-level Redis distributed lock for the `user_session_key` to ensure serial graph construction for the user.
  - It then loops through the user-specific Redis task queue (`l1_task_queue:{user_session_key}`), popping (LPOP) pending `memory_id`s.
- Data Loading & Graph Construction (L1):
  - For each `memory_id`, `l1_memory/data_loader.py` loads and formats the L0 memory data from Elasticsearch.
  - The `add` method in `l1_memory/graphs/neo4j.py` (`MemoryGraph`) is called. This is the core logic of L1:
    a. Entity Extraction: Calls the LLM via `_retrieve_nodes_from_data` with `EXTRACT_ENTITIES_PROMPT` to extract structured Entity Paths (e.g., `User -> Interest -> Music -> Classical`).
    b. Relation Extraction: Calls the LLM via `_establish_nodes_relations_from_data` with `EXTRACT_RELATIONS_PROMPT` to generate context-rich Relationships.
    c. Conclusion Matching: Calls `l1_memory/graphs/value_conclusions.py` with `CONCLUSION_MATCHING_PROMPT` to attach high-value L0 conclusions to the most relevant graph relationships.
    d. Entity Linking/Merging: Before writing, `MemoryGraph` computes vectors for new entities and performs a vector similarity search in Neo4j (`_search_and_get_node_name`). If similar entities exist, an LLM decision process (`_llm_merge_decision`) determines whether to merge, ensuring graph quality.
    e. Writing to Graph: Constructs a complex Cypher query to write entities, relationships, history, and conclusions atomically into Neo4j.
- Knowledge Evolution (L1):
  - During writing, `MemoryGraph` increments `mentions` counts on relationships and the `community_influence_score` on abstract nodes.
  - Thresholds (`inference_threshold`, `summary_threshold`) trigger asynchronous updates in `l1_memory/graphs/inference.py` (`InferenceProcessor`) and `l1_memory/graphs/summary.py` (`SummaryProcessor`):
    - `InferenceProcessor`: Incrementally generates deeper Inferences for relationships.
    - `SummaryProcessor`: Generates Community Summaries for abstract nodes and triggers Hierarchical Summary updates for parent nodes (via `is_dirty` flags).
- Task Completion (L1):
  - After processing the user's queue, the L1 distributed lock is released. Failed tasks are moved to a Dead Letter Queue.
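The queue-draining loop in the Task Processing step follows a standard pop-until-empty pattern. Sketched here against an in-memory stand-in for the Redis list (`FakeQueue` and `drain` are illustrative names, not the project's):

```python
class FakeQueue:
    """Stand-in for a Redis list; lpop returns None once the queue is empty."""
    def __init__(self, items):
        self.items = list(items)

    def lpop(self):
        return self.items.pop(0) if self.items else None

def drain(queue, handler) -> int:
    """Pop pending memory_ids one by one until the queue is exhausted."""
    processed = 0
    while (memory_id := queue.lpop()) is not None:
        handler(memory_id)  # in L1 this loads the L0 memory and builds the graph
        processed += 1
    return processed

seen: list[str] = []
count = drain(FakeQueue(["m1", "m2", "m3"]), seen.append)
```

Draining until LPOP returns empty (rather than handling exactly one item per notification) makes the consumer robust to bursts: several L0 publishes may arrive while one graph build is still running.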
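The Entity Linking/Merging step (d) is essentially a nearest-neighbor check against existing node vectors, with anything below the threshold becoming a new node. A simplified sketch: the `0.85` default stands in for `node_similar_threshold`, the vectors are toy 2-D examples, and the real pipeline additionally runs the `_llm_merge_decision` confirmation before merging.

```python
import math

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def link_entity(new_vec, existing: dict, threshold: float = 0.85):
    """Return the best-matching existing node name, or None to create a new node."""
    best_name, best_sim = None, 0.0
    for name, vec in existing.items():
        sim = cosine(new_vec, vec)
        if sim > best_sim:
            best_name, best_sim = name, sim
    return best_name if best_sim >= threshold else None

nodes = {"Classical Music": [1.0, 0.0], "Cooking": [0.0, 1.0]}
match = link_entity([0.9, 0.1], nodes)  # very close to "Classical Music"
miss = link_entity([0.6, 0.8], nodes)   # not similar enough to either node
```

Linking before writing is what keeps the graph from accumulating near-duplicate entities ("Classical Music" vs. "classical music") as memories accrue.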
- Concurrent Scheduling (L2):
  - `l2_retrieval/retriever.py` (`L2Retriever`) creates a parallel asynchronous task for L1 alongside the L0 task.
- Adapter & Call (L2 -> L1):
  - The task calls L1 retrieval via `l2_retrieval/l1_adapter.py`.
  - `L1Adapter` calls `Memory.retrieve` in `l1_memory/memory/main.py`, which delegates to `l1_memory/graphs/graph_retrieval.py` (`GraphRetriever`).
- Graph Retrieval (L1):
  - `extract_knowledge` in `GraphRetriever` is the core L1 retrieval logic.
  - It builds a complex Cypher query utilizing Neo4j's Vector Index (`db.index.vector.queryNodes`) to find the most similar nodes for multiple query entities in parallel.
  - The query fetches node properties, 1-hop neighbor relationships, inferences, history, conclusions, and summaries.
  - `_format_results` aggregates and deduplicates the raw Neo4j output into a clean, entity-centric dictionary.
- Integration & Formatting (L2):
  - `L2Retriever` waits for all L0 and L1 tasks to complete.
  - It calls `l2_retrieval/formatter.py` to format the Recency/Relevance data from L0 and the Graph Knowledge from L1 into a unified, structured string for upstream consumption.
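As a rough illustration of the vector-index lookup pattern that `GraphRetriever` relies on: the index name, `RETURN` shape, and parameter defaults below are assumptions, and the real `extract_knowledge` query is considerably richer (inferences, history, conclusions, summaries).

```python
# Minimal illustration of the Neo4j vector-index lookup pattern. The index
# name 'entity_vector_index' and the RETURN clause are assumptions; the
# parameters mirror retrieval_top_k / retrieval_similar_threshold in settings.
CYPHER_TEMPLATE = """
CALL db.index.vector.queryNodes('entity_vector_index', $top_k, $query_vector)
YIELD node, score
WHERE score >= $similar_threshold
MATCH (node)-[r]-(neighbor)
RETURN node.name AS entity, score, type(r) AS rel, neighbor.name AS neighbor
"""

def build_params(query_vector: list[float], top_k: int = 5,
                 similar_threshold: float = 0.75) -> dict:
    # Passing values as query parameters (rather than string formatting)
    # lets Neo4j cache the query plan and avoids injection issues.
    return {"query_vector": query_vector, "top_k": top_k,
            "similar_threshold": similar_threshold}

params = build_params([0.1, 0.2, 0.3])
```

In the real service this template would be executed with the Neo4j driver, e.g. `session.run(CYPHER_TEMPLATE, **params)`, once per query entity.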
- `l1_listener.py`: Main Service Entry for L1. A background consumer that listens for Redis tasks and processes graph construction via a thread pool.
- `data_loader.py`: L0 -> L1 Data Adapter. Loads L0 memories from ES and converts them for L1.
- `graphs/neo4j.py` (`MemoryGraph`): L1 Core Business Logic. Handles graph read/write operations, entity extraction, linking, and database writes.
- `graphs/graph_retrieval.py` (`GraphRetriever`): L1 Core Retrieval Logic. Builds and executes Cypher queries to extract knowledge efficiently.
- `graphs/inference.py` & `summary.py`: Knowledge Evolution. Handle relationship inference and node summary updates.
- `graphs/value_conclusions.py`: Matches and attaches high-value L0 conclusions to L1 graph relationships.
- `graphs/tools.py` & `utils.py`: Schema definitions and Prompt templates for L1 function calling.
- `memory/main.py` (`Memory`): L1 Top-Level Facade. Provides a high-level API for external callers (L2), hiding `MemoryGraph` complexity.
- `dirty_summary.py`: Background scheduler triggering global, periodic Hierarchical Summary updates via `SummaryProcessor`.
- `llms/` & `embeddings/`: Factory implementations for L1's LLM and Embedding clients.
- `retriever.py` (`L2Retriever`): L2 Orchestrator. Receives requests, schedules concurrent retrieval from L0/L1, and aggregates results.
- `l0_adapter.py` & `l1_adapter.py`: Adapters that decouple L2 from direct L0/L1 dependencies.
- `formatter.py`: Presentation Layer. Formats diverse data structures from L0 and L1 into a clean string.
- L1 Models & DB:
  - `L1_LLM_CONFIG`, `L1_EMBEDDER_CONFIG`: Models used for graph construction/retrieval.
  - `L1_GRAPH_STORE_CONFIG`: Neo4j connection details.
- L1 Workflow Parameters:
  - `L1_LISTENER_MAX_WORKERS`: Thread pool size for L1 graph construction.
  - `inference_threshold`, `summary_threshold`: Thresholds triggering asynchronous evolution tasks.
  - `node_similar_threshold`: Similarity threshold for Entity Linking.
  - `retrieval_similar_threshold`, `retrieval_top_k`: Thresholds for graph vector retrieval.
Initialize backend storage before running the service.
Execute the following via Kibana Dev Tools or curl. Note: the index name (e.g., `l0_topical_memories`) must match the `ES_INDEX_...` values in `config/settings.py`.
// 1. L0 Topical Memories Index
// Stores structured L0 memories, supports hybrid retrieval.
PUT /l0_topical_memories
{
"mappings": {
"properties": {
"memory_id": { "type": "keyword" },
"metadata": {
"properties": {
"user_id": { "type": "keyword" },
"role_id": { "type": "keyword" },
"created_at": { "type": "date" }
}
},
"core_fact": { "type": "text" },
"conclusions": { "type": "object", "enabled": false },
"keywords": { "type": "keyword" },
"atomic_units": {
"type": "nested",
"properties": {
"unit_id": { "type": "keyword" },
"unit_type": { "type": "keyword" },
"content": { "type": "text" },
"tag": { "type": "keyword" },
"content_vector": {
"type": "dense_vector",
"dims": 2560 // Note: Match dimensions with Embedding model in settings.py
}
}
}
}
}
}

Run the FastAPI service from the project root:
uvicorn main:app --reload --host 0.0.0.0 --port 8000

This initializes all background listeners/schedulers and serves on http://0.0.0.0:8000.
- Endpoint: `POST /L0_memory/trigger_memory_generation`
- Purpose: Triggers the L0 memory generation pipeline.
- Timing: Called after each dialogue turn.
- Body:

{
  "user_id": "string",
  "role_id": int,
  "created_at": "string (ISO 8601 format, e.g., 2025-05-23T10:00:00.000Z)"
}
- Endpoint: `POST /L0_memory/retrieve_rag_context`
- Purpose: Retrieves context from L0 and L1.
- Timing: Called when the main chat model needs context.
- Body:

{
  "user_id": "string",
  "role_id": int,
  "query": "string (Natural language query)",
  "entity_queries": [ // (Optional) Structured queries, prioritized over keywords
    {
      "name": "string (entity name)",
      "retrieve": ["summary", "inference", "history", "conclusions"] // (Optional)
    }
  ],
  "keywords": "string (space-separated, legacy support)" // (Optional)
}
# Example: Trigger Memory Generation
curl -X POST "http://127.0.0.1:8000/L0_memory/trigger_memory_generation" \
-H "Content-Type: application/json" \
-d '{
"user_id": "test_user_01",
"role_id": 10001,
"created_at": "2025-08-01T12:00:00.000Z"
}'
# Example: Execute Unified Retrieval
curl -X POST "http://127.0.0.1:8000/L0_memory/retrieve_rag_context" \
-H "Content-Type: application/json" \
-d '{
"user_id": "test_user_01",
"role_id": 10001,
"query": "What music has the user been interested in recently?",
"entity_queries": [
{"name": "Music", "retrieve": ["summary", "history"]},
{"name": "User", "retrieve": []}
]
}'