class Memo::Service

Overview

Main service class for semantic search operations

Encapsulates configuration and provides a clean API for indexing and search.

Usage

# Initialize service with data directory
memo = Memo::Service.new(
  data_dir: "/var/data/memo",
  provider: "openai",
  api_key: ENV["OPENAI_API_KEY"]
)

# Index documents
memo.index(source_type: "event", source_id: 123, text: "Document text...")

# Search
results = memo.search(query: "search query", limit: 10)

# Clean up
memo.close

Database Files

Memo stores its database files in the specified directory. When text storage is enabled (the default), chunk text is kept in a separate text.db, ATTACHed under the TEXT_SCHEMA alias.

Defined in:

memo/service.cr

Constant Summary

TEXT_SCHEMA = "text_store"

Schema name for ATTACHed text database

Constructors

Instance Method Summary

Constructor Detail

def self.new(data_dir : String, provider : String, api_key : String | Nil = nil, model : String | Nil = nil, dimensions : Int32 | Nil = nil, max_tokens : Int32 | Nil = nil, chunking_max_tokens : Int32 = 2000, store_text : Bool = true, attach : Hash(String, String) | Nil = nil, batch_size : Int32 = 100, max_retries : Int32 = 3) #

Initialize service with data directory

Required:

  • data_dir: Directory path for database files
  • provider: "openai" or "mock"
  • api_key: Provider API key (not needed for mock)

Optional:

  • store_text: Enable text storage in text.db (default true)
  • attach: Hash of alias => path for databases to ATTACH
  • model: Embedding model (default depends on provider)
  • dimensions: Vector dimensions (auto-detected from model)
  • max_tokens: Provider token limit (auto-detected)
  • chunking_max_tokens: Max tokens per chunk (default 2000)

Example with ATTACH for unified queries:

memo = Memo::Service.new(
  data_dir: "/var/data/memo",
  attach: {"main" => "data.db"},
  provider: "openai",
  api_key: key
)
# Now can use sql_where: "c.source_id IN (SELECT id FROM main.artifact ...)"

[View source]
def self.new(db : DB::Database, provider : String, api_key : String | Nil = nil, model : String | Nil = nil, dimensions : Int32 | Nil = nil, max_tokens : Int32 | Nil = nil, chunking_max_tokens : Int32 = 2000, batch_size : Int32 = 100, max_retries : Int32 = 3) #

Initialize service with existing database connection

Use this when the caller manages the connection lifecycle. The caller is responsible for closing the connection.
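
A minimal sketch of the caller-owned connection pattern, assuming the sqlite3 driver and the memo.db filename (the actual filename is not specified here):

require "db"
require "sqlite3"

db = DB.open("sqlite3:///var/data/memo/memo.db")
begin
  memo = Memo::Service.new(db: db, provider: "mock")
  memo.index(source_type: "note", source_id: 1_i64, text: "Hello")
ensure
  db.close # memo.close is a no-op here; the caller owns the connection
end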


[View source]

Instance Method Detail

def batch_size : Int32 #

[View source]
def chunking_config : Config::Chunking #

[View source]
def clear_completed_queue : Int32 #

Clear completed items from the queue

Removes successfully processed items (status = 0). Returns number of items removed.


[View source]
def clear_queue : Int32 #

Clear all items from the queue

Removes all items regardless of status. Returns number of items removed.


[View source]
def close #

Close database connection

Should be called when done with service to free resources. Safe to call multiple times.

Note: If service was initialized with an existing db connection, close is a no-op (caller owns the connection).


[View source]
def data_dir : String | Nil #

[View source]
def db : DB::Database #

[View source]
def delete(source_id : Int64, source_type : String | Nil = nil) : Int32 #

Delete all chunks for a source

Removes all chunks with the given source_id (and optionally source_type). Orphaned embeddings (not referenced by any chunk) are also cleaned up.

Returns number of chunks deleted.

source_type: Optional filter to only delete chunks with matching source_type. If nil, deletes all chunks with the given source_id regardless of type.

TODO Consider adding delete_batch(source_ids : Array(Int64)) if bulk deletion becomes a common use case. Unlike index_batch, there's no API call savings, but it could reduce transaction overhead for large deletions.
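
For example, deleting with and without the type filter:

# Remove only the "event" chunks for source 123
deleted = memo.delete(source_id: 123_i64, source_type: "event")
puts "removed #{deleted} chunks"

# No type filter: remove all remaining chunks for source 123
memo.delete(source_id: 123_i64)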


[View source]
def dimensions : Int32 #

[View source]
def enqueue(source_type : String, source_id : Int64, text : String, pair_id : Int64 | Nil = nil, parent_id : Int64 | Nil = nil) #

Enqueue a document for later embedding

Adds the document to the embed_queue table without embedding it. Use process_queue to embed queued items.

If the source is already in the queue, the text is updated.
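
For example, queueing now and embedding later in one pass:

# Re-enqueueing the same source updates its queued text
memo.enqueue(source_type: "event", source_id: 42_i64, text: "First draft...")
memo.enqueue(source_type: "event", source_id: 42_i64, text: "Final text...")

processed = memo.process_queue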


[View source]
def enqueue(doc : Document) #

Enqueue a document (Document overload)


[View source]
def enqueue_batch(docs : Array(Document)) #

Enqueue multiple documents for later embedding

More efficient than calling enqueue() multiple times.


[View source]
def index(source_type : String, source_id : Int64, text : String, pair_id : Int64 | Nil = nil, parent_id : Int64 | Nil = nil) : Int32 #

Index a document

Enqueues the document and processes it immediately with retry support. Returns number of chunks successfully stored.


[View source]
def index(doc : Document) : Int32 #

Index a document (Document overload)

Convenience method that accepts a Document struct.


[View source]
def index_batch(docs : Array(Document)) : Int32 #

Index multiple documents in a batch

Enqueues all documents and processes them with retry support. More efficient than calling index() multiple times.

Returns total number of documents successfully processed.
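
A sketch of batch indexing. The Document constructor's keyword arguments are an assumption, mirrored from index(source_type:, source_id:, text:, ...):

docs = [
  Memo::Document.new(source_type: "article", source_id: 1_i64, text: "First article..."),
  Memo::Document.new(source_type: "article", source_id: 2_i64, text: "Second article..."),
]
processed = memo.index_batch(docs)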


[View source]
def mark_as_read(chunk_ids : Array(Int64)) #

Mark chunks as read (increment read_count)


[View source]
def process_queue : Int32 #

Process queued items

Embeds pending items from the queue using the service's batch_size. Returns number of items successfully processed.

Failed items have their status set to the error code and can be retried up to max_retries times.


[View source]
def process_queue_async #

Process queued items asynchronously

Spawns a fiber to process the queue and returns immediately. Use queue_stats to check progress.
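
A sketch of polling for completion; the pending field name on QueueStats is an assumption based on "counts of pending and failed items":

memo.process_queue_async # returns immediately; work happens in a fiber

loop do
  stats = memo.queue_stats
  break if stats.pending == 0 # field name assumed
  sleep 1.second
end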


[View source]
def process_queue_item(source_type : String, source_id : Int64) : Int32 #

Process a specific queued item

Used by index() for immediate processing with retry support. Returns number of chunks stored.


[View source]
def projection_vectors : Array(Array(Float64)) #

[View source]
def provider : Providers::Base #

[View source]
def queue_config : Config::Queue #

[View source]
def queue_stats : QueueStats #

Get queue statistics

Returns counts of pending and failed items.


[View source]
def reindex(source_type : String) : Int32 #

Re-index all content of a given source type

Deletes existing embeddings and queues text for re-embedding. Requires text storage to be enabled.

Returns number of items queued for re-indexing.


[View source]
def reindex(source_type : String, &block : Int64 -> String) : Int32 #

Re-index all content of a given source type using a block to fetch text

Use this when text storage is disabled. The block receives each source_id and should return the text to embed.

Returns number of items queued for re-indexing.

Example:

memo.reindex("article") do |source_id|
  app.get_article_text(source_id)
end
memo.process_queue

[View source]
def search(query : String, limit : Int32 = 10, min_score : Float64 = 0.7, source_type : String | Nil = nil, source_id : Int64 | Nil = nil, pair_id : Int64 | Nil = nil, parent_id : Int64 | Nil = nil, like : String | Array(String) | Nil = nil, match : String | Nil = nil, sql_where : String | Nil = nil, include_text : Bool = false) : Array(Search::Result) #

Search for semantically similar chunks

Automatically generates query embedding and searches.

Returns array of search results ranked by similarity.

  • like: LIKE pattern(s) to filter by text content. Pass a single string, or an array of strings for AND filtering. Example: like: "%cats%" or like: ["%cats%", "%dogs%"]. Only works when text storage is enabled.
  • match: FTS5 full-text search query. Supports AND, OR, NOT, prefix*, and "phrases". Example: match: "cats OR dogs", match: "quick brown*". Only works when text storage is enabled.
  • sql_where: Raw SQL fragment for filtering chunks. Used with ATTACH to filter by external database tables. Example: "c.source_id IN (SELECT id FROM main.artifact WHERE kind = 'goal')"
  • include_text: If true, includes text content in search results. Only works when text storage is enabled.
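
A sketch combining several filters; the result field names (score, text) are assumptions for illustration:

results = memo.search(
  query: "payment failures",
  limit: 5,
  min_score: 0.6,
  source_type: "event",
  match: "payment OR billing", # FTS5 query; needs text storage
  include_text: true
)
results.each do |r|
  puts "#{r.score} #{r.text}"
end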


[View source]
def service_id : Int64 #

[View source]
def stats : Stats #

Get statistics about indexed content

Returns counts of embeddings, chunks, and unique sources.


[View source]
def text_storage? : Bool #

Returns whether text storage is enabled


[View source]