Entity Store¶
Identity map and caching for Stash entities.
StashEntityStore¶
StashEntityStore(
client: StashClient,
default_ttl: timedelta | int | None = DEFAULT_TTL,
)
In-memory identity map with read-through caching for Stash entities.
Provides caching, selective field loading, and query capabilities for Stash GraphQL entities. All fetched entities are cached, and subsequent requests for the same entity return the cached version (if not expired).
All entities can be treated as "stubs" that may have incomplete data. Use populate() to selectively load additional fields as needed, avoiding expensive queries for data you don't need.
Example
async with StashContext(conn=...) as context:
    client = context.client
    store = context.store  # Use context's singleton store

    # Get by ID (cache miss -> fetch, then cached)
    performer = await store.get(Performer, "123")

    # Selectively load expensive fields only when needed
    # Uses _received_fields to determine what's actually missing
    performer = await store.populate(performer, fields=["scenes", "images"])

    # Search (always queries GraphQL, caches results)
    scenes = await store.find(Scene, title__contains="interview")

    # Populate relationships on search results
    for scene in scenes:
        scene = await store.populate(scene, fields=["performers", "studio", "tags"])

    # Populate nested objects directly (identity map pattern)
    scene.studio = await store.populate(scene.studio, fields=["urls", "details"])

    # Check what's missing before fetching
    missing = store.missing_fields(scene.studio, "urls", "details")
    if missing:
        scene.studio = await store.populate(scene.studio, fields=list(missing))

    # Force refresh from server (invalidates cache first)
    scene = await store.populate(scene, fields=["studio"], force_refetch=True)

    # Large result sets: lazy pagination
    async for scene in store.find_iter(Scene, path__contains="/media/"):
        process(scene)
        if done:
            break  # Won't fetch remaining batches

    # Query cached objects only (no network)
    favorites = store.filter(Performer, lambda p: p.favorite)
Initialize entity store.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `client` | `StashClient` | StashClient instance for GraphQL queries | *required* |
| `default_ttl` | `timedelta \| int \| None` | Default TTL for cached entities. Default is 30 minutes. Can be a timedelta, or an int (interpreted as seconds). Pass None explicitly to disable expiration. | `DEFAULT_TTL` |
Attributes¶
cache_size property¶
Get number of entities in cache (deprecated, use cache_stats) (thread-safe).
Returns:

| Type | Description |
|---|---|
| `int` | Number of cached entities |
Functions¶
get_cached¶
Get entity from cache only. No network call (thread-safe).
Returns the cached entity if present and not expired, None otherwise.
This is the sync counterpart to get(), following the Django dual sync/async pattern.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_type` | `type[T]` | The Stash entity type (e.g., Performer, Scene) | *required* |
| `entity_id` | `str` | Entity ID | *required* |

Returns:

| Type | Description |
|---|---|
| `T \| None` | Cached entity if found and not expired, None otherwise |
get async¶
Get entity by ID. Checks cache first, fetches if missing/expired (thread-safe).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_type` | `type[T]` | The Stash entity type (e.g., Performer, Scene) | *required* |
| `entity_id` | `str` | Entity ID | *required* |
| `fields` | `list[str] \| None` | Optional list of additional fields to fetch beyond the base fragment. If provided, bypasses the cache and fetches directly with the specified fields. | `None` |

Returns:

| Type | Description |
|---|---|
| `T \| None` | Entity if found, None otherwise |
get_many async¶
Batch get entities. Returns cached entries and fetches the missing ones in a single query (thread-safe).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_type` | `type[T]` | The Stash entity type | *required* |
| `ids` | `list[str]` | List of entity IDs | *required* |

Returns:

| Type | Description |
|---|---|
| `list[T]` | List of found entities (order not guaranteed) |
find async¶
Search using Stash filters. Results are cached. Max 1000 results.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_type` | `type[T]` | The Stash entity type | *required* |
| `**filters` | `Any` | Search filters (Django-style kwargs or raw dict) | `{}` |

Returns:

| Type | Description |
|---|---|
| `list[T]` | List of matching entities |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If the result count exceeds FIND_LIMIT. Use find_iter() instead. |

Filter syntax
Django-style kwargs¶
find(Scene, title="exact")                # EQUALS
find(Scene, title__contains="partial")    # INCLUDES
find(Scene, title__regex=r"S\d+")         # MATCHES_REGEX
find(Scene, rating100__gte=80)            # GREATER_THAN
find(Scene, rating100__between=(60, 90))  # BETWEEN
find(Scene, studio__null=True)            # IS_NULL
Raw dict for complex cases¶
find(Scene, title={"value": "x", "modifier": "NOT_EQUALS"})
Nested filters¶
find(Scene, performers_filter={"name": {"value": "Jane", "modifier": "EQUALS"}})
find_one async¶
Search returning the first match. Result is cached.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_type` | `type[T]` | The Stash entity type | *required* |
| `**filters` | `Any` | Search filters (same syntax as find()) | `{}` |

Returns:

| Type | Description |
|---|---|
| `T \| None` | First matching entity, or None if no matches |
find_iter async¶
find_iter(
entity_type: type[T],
query_batch: int = DEFAULT_QUERY_BATCH,
**filters: Any,
) -> AsyncIterator[T]
Lazy search yielding individual items. Batches queries internally.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_type` | `type[T]` | Type to search for | *required* |
| `query_batch` | `int` | Records to fetch per GraphQL query (default: 40) | `DEFAULT_QUERY_BATCH` |
| `**filters` | `Any` | Search filters (same syntax as find()) | `{}` |

Yields:

| Type | Description |
|---|---|
| `AsyncIterator[T]` | Individual entities as they are fetched |
Example
async for scene in store.find_iter(Scene, path__contains="/media/"):
    process(scene)
    if done:
        break  # Won't fetch remaining batches
filter¶
Filter cached objects with a Python lambda. No network call (thread-safe).
Uses the type index for O(k) iteration over only the requested type, instead of snapshotting the entire cache.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_type` | `type[T]` | The Stash entity type | *required* |
| `predicate` | `Callable[[T], bool]` | Function that returns True for matching entities | *required* |

Returns:

| Type | Description |
|---|---|
| `list[T]` | List of matching cached entities |
all_cached¶
Get all cached objects of a type.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_type` | `type[T]` | The Stash entity type | *required* |

Returns:

| Type | Description |
|---|---|
| `list[T]` | List of all cached entities of the specified type |
filter_strict¶
filter_strict(
entity_type: type[T],
required_fields: set[str] | list[str],
predicate: Callable[[T], bool],
) -> list[T]
Filter cached objects, raising error if required fields are missing.
This is a fail-fast version of filter() that ensures all cached objects have the required fields populated before applying the predicate. If any cached object is missing required fields, raises ValueError immediately.
Supports nested field specifications using Django-style double-underscore syntax:
- 'files__path': Validates that files relationship exists AND path is populated on each File
- 'studio__parent__name': Validates full nested path is populated
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_type` | `type[T]` | The Stash entity type | *required* |
| `required_fields` | `set[str] \| list[str]` | Fields that must be populated on all cached objects. Supports both regular field names ('rating100') and nested field specs ('files__path', 'studio__parent__name'). | *required* |
| `predicate` | `Callable[[T], bool]` | Function that returns True for matching entities | *required* |

Returns:

| Type | Description |
|---|---|
| `list[T]` | List of matching cached entities (all guaranteed to have required fields) |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If any cached object is missing required fields |
Examples:
# This will raise if any performer has rating100=UNSET
high_rated = store.filter_strict(
    Performer,
    required_fields=["rating100", "favorite"],
    predicate=lambda p: p.rating100 >= 80 and p.favorite,
)

# Validate nested fields are populated; raises ValueError if any Image
# has files=UNSET or any File has path/size=UNSET
large_images = store.filter_strict(
    Image,
    required_fields=["files__path", "files__size"],
    predicate=lambda i: any(
        f.size > 10_000_000 for f in i.files if f.size is not None
    ),
)
filter_and_populate async¶
filter_and_populate(
entity_type: type[T],
required_fields: set[str] | list[str],
predicate: Callable[[T], bool],
batch_size: int = 50,
) -> list[T]
Filter cached objects, auto-populating missing fields as needed.
This is a smart hybrid between find() and filter():
- Gets all cached objects of the type
- Identifies which ones have UNSET values for required_fields
- Fetches only the missing fields for incomplete objects (in batches)
- Applies the predicate to all objects (now with complete data)
Supports nested field specifications using Django-style double-underscore syntax:
- 'files__path': Ensures files relationship is populated, then path on each File
- 'studio__parent__name': Ensures studio, parent, and name are all populated
This is much faster than find() when most data is already cached, since it only fetches the specific missing fields rather than re-fetching entire entities.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_type` | `type[T]` | The Stash entity type | *required* |
| `required_fields` | `set[str] \| list[str]` | Fields needed by the predicate. Supports both regular field names ('rating100') and nested field specs ('files__path', 'studio__parent__name'). | *required* |
| `predicate` | `Callable[[T], bool]` | Function that returns True for matching entities | *required* |
| `batch_size` | `int` | Number of entities to populate concurrently (default: 50) | `50` |

Returns:

| Type | Description |
|---|---|
| `list[T]` | List of matching entities (all with required fields populated) |
Examples:
# Cache has 1000 performers, but only 500 have rating100 loaded.
# Fetches rating100+favorite for the 500 that don't have it, then
# filters all 1000 with complete data. Network calls happen only for
# missing data (much faster than find()).
high_rated = await store.filter_and_populate(
    Performer,
    required_fields=["rating100", "favorite"],
    predicate=lambda p: p.rating100 >= 80 and p.favorite,
)

# Filter images by nested file properties. Fetches the files
# relationship plus path/size fields on each File object.
large_images = await store.filter_and_populate(
    Image,
    required_fields=["files__path", "files__size"],
    predicate=lambda i: any(
        f.size > 10_000_000 for f in i.files if f.size is not None
    ),
)
filter_and_populate_with_stats async¶
filter_and_populate_with_stats(
entity_type: type[T],
required_fields: set[str] | list[str],
predicate: Callable[[T], bool],
batch_size: int = 50,
) -> tuple[list[T], dict[str, Any]]
Filter and populate with debug statistics.
Same as filter_and_populate() but returns detailed statistics about what was fetched and filtered. Useful for debugging and optimization.
Supports nested field specifications using Django-style double-underscore syntax.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_type` | `type[T]` | The Stash entity type | *required* |
| `required_fields` | `set[str] \| list[str]` | Fields needed by the predicate. Supports both regular field names ('rating100') and nested field specs ('files__path', 'studio__parent__name'). | *required* |
| `predicate` | `Callable[[T], bool]` | Function that returns True for matching entities | *required* |
| `batch_size` | `int` | Number of entities to populate concurrently | `50` |

Returns:

| Type | Description |
|---|---|
| `tuple[list[T], dict[str, Any]]` | Tuple of (matching_entities, stats_dict), where stats includes keys such as 'total_cached', 'needed_population', 'cache_hit_rate', and 'matches' |
Examples:
results, stats = await store.filter_and_populate_with_stats(
    Performer,
    required_fields=["rating100"],
    predicate=lambda p: p.rating100 >= 80,
)
print(f"Populated {stats['needed_population']} of {stats['total_cached']}")
print(f"Cache hit rate: {stats['cache_hit_rate']:.1%}")
print(f"Found {stats['matches']} matches")

# With nested fields
results, stats = await store.filter_and_populate_with_stats(
    Image,
    required_fields=["files__path", "files__size"],
    predicate=lambda i: any(f.size > 10_000_000 for f in i.files),
)
populated_filter_iter async¶
populated_filter_iter(
entity_type: type[T],
required_fields: set[str] | list[str],
predicate: Callable[[T], bool],
populate_batch: int = 50,
yield_batch: int = 10,
) -> AsyncIterator[T]
Lazy filter with auto-population, yielding results incrementally.
Like filter_and_populate() but yields results as they become available instead of waiting for all entities to be processed. Useful for large datasets where you want to start processing matches immediately.
Supports nested field specifications using Django-style double-underscore syntax.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_type` | `type[T]` | The Stash entity type | *required* |
| `required_fields` | `set[str] \| list[str]` | Fields needed by the predicate. Supports both regular field names ('rating100') and nested field specs ('files__path', 'studio__parent__name'). | *required* |
| `predicate` | `Callable[[T], bool]` | Function that returns True for matching entities | *required* |
| `populate_batch` | `int` | How many entities to populate concurrently | `50` |
| `yield_batch` | `int` | Process this many entities before yielding matches | `10` |

Yields:

| Type | Description |
|---|---|
| `AsyncIterator[T]` | Individual matching entities (with required fields populated) |
Examples:
# Process a large dataset incrementally
async for performer in store.populated_filter_iter(
    Performer,
    required_fields=["rating100", "scenes"],
    predicate=lambda p: p.rating100 >= 90 and len(p.scenes) > 100,
):
    # Start processing immediately as matches are found
    await expensive_operation(performer)
    if should_stop:
        break  # Can stop early without processing all

# With nested fields
async for image in store.populated_filter_iter(
    Image,
    required_fields=["files__path", "files__size"],
    predicate=lambda i: any(f.size > 10_000_000 for f in i.files),
):
    await process_large_image(image)
missing_fields_nested¶
missing_fields_nested(
obj: StashObject, *field_specs: str
) -> set[str]
Check which nested field specifications are missing.
Supports both simple and nested (Django-style) field specifications:
- Simple: 'rating100', 'favorite'
- Nested: 'files__path', 'studio__parent__name'
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `obj` | `StashObject` | Entity to check | *required* |
| `*field_specs` | `str` | Field specifications to check | `()` |

Returns:

| Type | Description |
|---|---|
| `set[str]` | Set of field specifications that are NOT fully populated |
populate async¶
Populate specific fields on an entity using field-aware fetching.
This method uses _received_fields tracking to determine which fields are
genuinely missing and need to be fetched. All entities are treated as potentially
incomplete.
Supports nested field specifications using Django-style double-underscore syntax:
- 'files__path': Populate files relationship, then path on each File
- 'studio__parent__name': Populate studio, then parent, then name
- Can be mixed with regular fields: ['rating100', 'files__path']
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `obj` | `T` | Entity to populate. Can be any StashObject, including nested objects like scene.studio or scene.performers[0]. | *required* |
| `fields` | `list[str] \| set[str] \| None` | Fields to populate. Supports both regular field names ('studio') and nested field specifications ('studio__parent__name'). If None and force_refetch=False, uses heuristics to determine if the object needs more data. | `None` |
| `force_refetch` | `bool` | If True, invalidates the cache and re-fetches the specified fields from the server, regardless of whether they're in _received_fields. | `False` |

Returns:

| Type | Description |
|---|---|
| `T` | The populated entity (may be a different instance if refetched from cache). |
Examples:
# Populate specific fields on a scene
scene = await store.populate(scene, fields=["studio", "performers"])

# Populate nested fields using __ syntax
image = await store.populate(image, fields=["files__path", "files__size"])

# Mix regular and nested fields
scene = await store.populate(scene, fields=["rating100", "studio__parent__name"])

# Populate nested object directly (identity map pattern)
scene.studio = await store.populate(scene.studio, fields=["urls", "details"])

# Force refresh from server (invalidates cache first)
scene = await store.populate(scene, fields=["studio"], force_refetch=True)

# Populate performer from a list
performer = await store.populate(scene.performers[0], fields=["scenes", "images"])

# Check what's missing before populating
missing = store.missing_fields(scene.studio, "urls", "details", "aliases")
if missing:
    scene.studio = await store.populate(scene.studio, fields=list(missing))
has_fields¶
has_fields(obj: StashObject, *fields: str) -> bool
Check if an object has specific fields populated.
Uses _received_fields tracking when available.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `obj` | `StashObject` | Entity to check | *required* |
| `*fields` | `str` | Field names to check for | `()` |

Returns:

| Type | Description |
|---|---|
| `bool` | True if ALL specified fields are in _received_fields |
missing_fields¶
missing_fields(obj: StashObject, *fields: str) -> set[str]
Get which of the specified fields are missing from an object.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `obj` | `StashObject` | Entity to check | *required* |
| `*fields` | `str` | Field names to check | `()` |

Returns:

| Type | Description |
|---|---|
| `set[str]` | Set of field names that are NOT in _received_fields |
invalidate¶
invalidate(entity_type_or_obj: StashObject) -> None
invalidate(
entity_type_or_obj: type[T] | StashObject,
entity_id: str | None = None,
) -> None
Remove specific object from cache (thread-safe).
Can be called with either a type + ID pair, or a StashObject instance directly.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_type_or_obj` | `type[T] \| StashObject` | The Stash entity type, or a StashObject instance | *required* |
| `entity_id` | `str \| None` | Entity ID to invalidate (required when passing a type) | `None` |
invalidate_type¶
Remove all objects of a type from cache (thread-safe).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_type` | `type[T]` | The Stash entity type to clear | *required* |
set_ttl¶
Set TTL for a type. None = use default (or never expire if no default).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_type` | `type[T]` | The Stash entity type | *required* |
| `ttl` | `timedelta \| int \| None` | TTL for this type, or None to use the default. Can be a timedelta, or an int (interpreted as seconds). | *required* |

Raises:

| Type | Description |
|---|---|
| `TypeError` | If ttl is not a timedelta, int, or None |
add¶
add(obj: StashObject) -> None
Add object to cache (for new objects with temp UUIDs).
This is typically used with objects created via ClassName.new() that have temporary UUID IDs. After calling obj.save() or store.save(), the cache entry will be updated with the real ID from Stash.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `obj` | `StashObject` | Object to cache (usually created with .new()) | *required* |
save async¶
save(obj: StashObject, _cascade_depth: int = 0) -> None
Save object to Stash and update cache.
Handles both new objects (create) and existing objects (update). For new objects, updates cache key from temp UUID to real Stash ID.
Automatically cascades saves for unsaved related objects (with warning). Preferred pattern: explicitly save related objects before parent.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `obj` | `StashObject` | Object to save | *required* |
| `_cascade_depth` | `int` | Internal tracking for cascade recursion depth | `0` |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If the save fails or the object has unsaved UUIDs after cascade |
Example
# Create and save new tag
tag = Tag.new(name="Action")
store.add(tag) # Cache with temp UUID
await store.save(tag) # Save to Stash, update cache with real ID
# Modify existing tag
tag.description = "Action movies"
await store.save(tag) # Update in Stash
# With related objects (auto-cascade with warning)
scene.performers.append(new_performer) # new_performer has UUID
await store.save(scene) # Warns, cascades save(new_performer), then saves scene
# Preferred pattern: explicit saves
await store.save(new_performer) # Gets real ID
scene.performers.append(new_performer) # Has real ID
await store.save(scene) # No cascade needed
delete async¶
delete(obj: StashObject, **kwargs: Any) -> bool
Delete object from Stash and remove from cache.
Delegates to obj.delete(client) then invalidates the cache entry.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `obj` | `StashObject` | Entity to delete | *required* |
| `**kwargs` | `Any` | Additional destroy input fields (e.g., delete_file=True) | `{}` |

Returns:

| Type | Description |
|---|---|
| `bool` | True if successfully deleted |

Raises:

| Type | Description |
|---|---|
| `NotImplementedError` | If the entity type doesn't support delete |
| `ValueError` | If the entity has no server ID or the delete fails |
get_or_create async¶
Get entity by search criteria, optionally create if not found.
Searches for an entity matching the provided criteria. If found, returns the existing entity (from cache or fetched). If not found and create_if_missing is True, creates a new entity with the search params as initial data.
Note: New entities are created with UUID IDs and are NOT automatically saved. Call store.save() or entity.save() to persist to Stash.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_type` | `type[T]` | The Stash entity type | *required* |
| `create_if_missing` | `bool` | If True, creates a new entity if not found. Default: True. | `True` |
| `**search_params` | `Any` | Search criteria (also used as creation data if not found) | `{}` |

Returns:

| Type | Description |
|---|---|
| `T` | Existing or newly created entity |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If not found and create_if_missing=False |
Example
# Get existing or create new performer
performer = await store.get_or_create(Performer, name="Alice")
if performer._is_new:
    # New performer - save it
    await store.save(performer)
# Link to scene
scene.performers.append(performer)
await store.save(scene)
# Get existing, error if not found
tag = await store.get_or_create(Tag, create_if_missing=False, name="Action")
is_cached¶
Check if object is in cache and not expired (thread-safe).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_type` | `type[T]` | The Stash entity type | *required* |
| `entity_id` | `str` | Entity ID | *required* |

Returns:

| Type | Description |
|---|---|
| `bool` | True if cached and not expired |
cache_stats¶
Get cache statistics (thread-safe).
Returns:

| Type | Description |
|---|---|
| `CacheStats` | CacheStats with total entries, by-type counts, and expired count |