
Entity Store

Identity map and caching for Stash entities.

StashEntityStore

StashEntityStore(
    client: StashClient,
    default_ttl: timedelta | int | None = DEFAULT_TTL,
)

In-memory identity map with read-through caching for Stash entities.

Provides caching, selective field loading, and query capabilities for Stash GraphQL entities. All fetched entities are cached, and subsequent requests for the same entity return the cached version (if not expired).

All entities can be treated as "stubs" that may have incomplete data. Use populate() to selectively load additional fields as needed, avoiding expensive queries for data you don't need.

Example
async with StashContext(conn=...) as context:
    client = context.client
    store = context.store  # Use context's singleton store

    # Get by ID (cache miss -> fetch, then cached)
    performer = await store.get(Performer, "123")

    # Selectively load expensive fields only when needed
    # Uses _received_fields to determine what's actually missing
    performer = await store.populate(performer, fields=["scenes", "images"])

    # Search (always queries GraphQL, caches results)
    scenes = await store.find(Scene, title__contains="interview")

    # Populate relationships on search results
    for scene in scenes:
        scene = await store.populate(scene, fields=["performers", "studio", "tags"])

    # Populate nested objects directly (identity map pattern)
    scene.studio = await store.populate(scene.studio, fields=["urls", "details"])

    # Check what's missing before fetching
    missing = store.missing_fields(scene.studio, "urls", "details")
    if missing:
        scene.studio = await store.populate(scene.studio, fields=list(missing))

    # Force refresh from server (invalidates cache first)
    scene = await store.populate(scene, fields=["studio"], force_refetch=True)

    # Large result sets: lazy pagination
    async for scene in store.find_iter(Scene, path__contains="/media/"):
        process(scene)
        if done:
            break  # Won't fetch remaining batches

    # Query cached objects only (no network)
    favorites = store.filter(Performer, lambda p: p.favorite)

Initialize entity store.

Parameters:

Name Type Description Default
client StashClient

StashClient instance for GraphQL queries

required
default_ttl timedelta | int | None

Default TTL for cached entities. Default is 30 minutes. Can be a timedelta, or an int (interpreted as seconds). Pass None explicitly to disable expiration.

DEFAULT_TTL
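As a quick illustration of the TTL coercion described above, a minimal helper (a sketch; `normalize_ttl` is a hypothetical name, not part of the library):

```python
from datetime import timedelta

def normalize_ttl(ttl: "timedelta | int | None") -> "timedelta | None":
    """Coerce a TTL the way the constructor describes: timedeltas pass
    through, ints are interpreted as seconds, None disables expiration."""
    if ttl is None:
        return None
    if isinstance(ttl, timedelta):
        return ttl
    if isinstance(ttl, int):
        return timedelta(seconds=ttl)
    raise TypeError(f"ttl must be timedelta, int, or None, not {type(ttl).__name__}")
```

So `default_ttl=900` and `default_ttl=timedelta(minutes=15)` are equivalent.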

Attributes

DEFAULT_QUERY_BATCH class-attribute instance-attribute

DEFAULT_QUERY_BATCH = 40

DEFAULT_TTL class-attribute instance-attribute

DEFAULT_TTL = timedelta(minutes=30)

FIND_LIMIT class-attribute instance-attribute

FIND_LIMIT = 1000

cache_size property

cache_size: int

Number of entities in the cache (thread-safe). Deprecated: use cache_stats instead.

Returns:

Type Description
int

Number of cached entities

Functions

get_cached

get_cached(
    entity_type: type[T], entity_id: str
) -> T | None

Get entity from cache only. No network call (thread-safe).

Returns the cached entity if present and not expired, None otherwise. This is the sync counterpart to get(), following the Django dual sync/async pattern.

Parameters:

Name Type Description Default
entity_type type[T]

The Stash entity type (e.g., Performer, Scene)

required
entity_id str

Entity ID

required

Returns:

Type Description
T | None

Cached entity if found and not expired, None otherwise
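The "present and not expired" contract can be pictured as follows (a sketch under assumed internals — a dict keyed by `(type, id)` holding `(value, expires_at)` pairs; the real store's layout may differ):

```python
from datetime import datetime, timezone

def lookup(cache: dict, key: tuple, now: "datetime | None" = None):
    """Return the cached value if present and unexpired, else None."""
    now = now or datetime.now(timezone.utc)
    entry = cache.get(key)
    if entry is None:
        return None  # never cached
    value, expires_at = entry
    if expires_at is not None and now >= expires_at:
        return None  # expired entries behave like cache misses
    return value
```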

get async

get(
    entity_type: type[T],
    entity_id: str,
    fields: list[str] | None = None,
) -> T | None

Get entity by ID. Checks cache first, fetches if missing/expired (thread-safe).

Parameters:

Name Type Description Default
entity_type type[T]

The Stash entity type (e.g., Performer, Scene)

required
entity_id str

Entity ID

required
fields list[str] | None

Optional list of additional fields to fetch beyond base fragment. If provided, bypasses cache and fetches directly with specified fields.

None

Returns:

Type Description
T | None

Entity if found, None otherwise

get_many async

get_many(entity_type: type[T], ids: list[str]) -> list[T]

Batch get entities. Returns cached + fetches missing in single query (thread-safe).

Parameters:

Name Type Description Default
entity_type type[T]

The Stash entity type

required
ids list[str]

List of entity IDs

required

Returns:

Type Description
list[T]

List of found entities (order not guaranteed)
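The cached-plus-missing strategy can be sketched like this (illustrative only; `fetch_batch` stands in for the single GraphQL query mentioned above):

```python
def get_many(cache: dict, ids: list, fetch_batch):
    """Return cached entries plus fetched ones. fetch_batch is called at
    most once, with only the IDs that missed the cache."""
    found = {i: cache[i] for i in ids if i in cache}
    missing = [i for i in ids if i not in found]
    if missing:
        for i, obj in fetch_batch(missing).items():
            cache[i] = obj  # newly fetched entities are cached too
            found[i] = obj
    return list(found.values())  # order not guaranteed, matching the docs
```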

find async

find(entity_type: type[T], **filters: Any) -> list[T]

Search using Stash filters. Results cached. Max 1000 results.

Parameters:

Name Type Description Default
entity_type type[T]

The Stash entity type

required
**filters Any

Search filters (Django-style kwargs or raw dict)

{}

Returns:

Type Description
list[T]

List of matching entities

Raises:

Type Description
ValueError

If result count exceeds FIND_LIMIT. Use find_iter() instead.

Filter syntax
Django-style kwargs

find(Scene, title="exact")                # EQUALS
find(Scene, title__contains="partial")    # INCLUDES
find(Scene, title__regex=r"S\d+")         # MATCHES_REGEX
find(Scene, rating100__gte=80)            # GREATER_THAN
find(Scene, rating100__between=(60, 90))  # BETWEEN
find(Scene, studio__null=True)            # IS_NULL

Raw dict for complex cases

find(Scene, title={"value": "x", "modifier": "NOT_EQUALS"})

Nested filters

find(Scene, performers_filter={"name": {"value": "Jane", "modifier": "EQUALS"}})

find_one async

find_one(
    entity_type: type[T], **filters: Any
) -> T | None

Search returning first match. Result cached.

Parameters:

Name Type Description Default
entity_type type[T]

The Stash entity type

required
**filters Any

Search filters (same syntax as find())

{}

Returns:

Type Description
T | None

First matching entity, or None if no matches

find_iter async

find_iter(
    entity_type: type[T],
    query_batch: int = DEFAULT_QUERY_BATCH,
    **filters: Any,
) -> AsyncIterator[T]

Lazy search yielding individual items. Batches queries internally.

Parameters:

Name Type Description Default
entity_type type[T]

Type to search for

required
query_batch int

Records to fetch per GraphQL query (default: 40)

DEFAULT_QUERY_BATCH
**filters Any

Search filters (same syntax as find())

{}

Yields:

Type Description
AsyncIterator[T]

Individual entities as they are fetched

Example

async for scene in store.find_iter(Scene, path__contains="/media/"):
    process(scene)
    if done:
        break  # Won't fetch remaining batches

filter

filter(
    entity_type: type[T], predicate: Callable[[T], bool]
) -> list[T]

Filter cached objects with Python lambda. No network call (thread-safe).

Uses the type index for O(k) iteration over only the requested type, instead of snapshotting the entire cache.

Parameters:

Name Type Description Default
entity_type type[T]

The Stash entity type

required
predicate Callable[[T], bool]

Function that returns True for matching entities

required

Returns:

Type Description
list[T]

List of matching cached entities
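The O(k) type-index claim above can be sketched with a toy identity map (illustrative only; the `Performer` dataclass here is a stand-in, not the real entity model):

```python
from collections import defaultdict
from dataclasses import dataclass

@dataclass
class Performer:  # stand-in entity for illustration
    id: str
    favorite: bool = False

class TypeIndexedCache:
    """Per-type buckets: filter() walks only the requested type's bucket
    instead of scanning the whole cache."""
    def __init__(self):
        self._by_type = defaultdict(dict)  # type -> {id: obj}

    def add(self, obj):
        self._by_type[type(obj)][obj.id] = obj  # identity map: one object per id

    def filter(self, entity_type, predicate):
        return [o for o in self._by_type[entity_type].values() if predicate(o)]
```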

all_cached

all_cached(entity_type: type[T]) -> list[T]

Get all cached objects of a type.

Parameters:

Name Type Description Default
entity_type type[T]

The Stash entity type

required

Returns:

Type Description
list[T]

List of all cached entities of the specified type

filter_strict

filter_strict(
    entity_type: type[T],
    required_fields: set[str] | list[str],
    predicate: Callable[[T], bool],
) -> list[T]

Filter cached objects, raising error if required fields are missing.

This is a fail-fast version of filter() that ensures all cached objects have the required fields populated before applying the predicate. If any cached object is missing required fields, raises ValueError immediately.

Supports nested field specifications using Django-style double-underscore syntax:
- 'files__path': Validates that the files relationship exists AND path is populated on each File
- 'studio__parent__name': Validates the full nested path is populated

Parameters:

Name Type Description Default
entity_type type[T]

The Stash entity type

required
required_fields set[str] | list[str]

Fields that must be populated on all cached objects. Supports both regular field names ('rating100') and nested field specs ('files__path', 'studio__parent__name').

required
predicate Callable[[T], bool]

Function that returns True for matching entities

required

Returns:

Type Description
list[T]

List of matching cached entities (all guaranteed to have required fields)

Raises:

Type Description
ValueError

If any cached object is missing required fields

Examples:

This will raise if any performer has rating100=UNSET

high_rated = store.filter_strict(
    Performer,
    required_fields=['rating100', 'favorite'],
    predicate=lambda p: p.rating100 >= 80 and p.favorite,
)

Validate nested fields are populated

large_images = store.filter_strict(
    Image,
    required_fields=['files__path', 'files__size'],
    predicate=lambda i: any(
        f.size > 10_000_000 for f in i.files if f.size is not None
    ),
)
# Raises ValueError if any Image has files=UNSET
# or any File has path/size=UNSET

filter_and_populate async

filter_and_populate(
    entity_type: type[T],
    required_fields: set[str] | list[str],
    predicate: Callable[[T], bool],
    batch_size: int = 50,
) -> list[T]

Filter cached objects, auto-populating missing fields as needed.

This is a smart hybrid between find() and filter():
- Gets all cached objects of the type
- Identifies which ones have UNSET values for required_fields
- Fetches only the missing fields for incomplete objects (in batches)
- Applies the predicate to all objects (now with complete data)

Supports nested field specifications using Django-style double-underscore syntax:
- 'files__path': Ensures the files relationship is populated, then path on each File
- 'studio__parent__name': Ensures studio, parent, and name are all populated

This is much faster than find() when most data is already cached, since it only fetches the specific missing fields rather than re-fetching entire entities.

Parameters:

Name Type Description Default
entity_type type[T]

The Stash entity type

required
required_fields set[str] | list[str]

Fields needed by the predicate. Supports both regular field names ('rating100') and nested field specs ('files__path', 'studio__parent__name').

required
predicate Callable[[T], bool]

Function that returns True for matching entities

required
batch_size int

Number of entities to populate concurrently (default: 50)

50

Returns:

Type Description
list[T]

List of matching entities (all with required fields populated)

Examples:

Cache has 1000 performers, but only 500 have rating100 loaded

high_rated = await store.filter_and_populate(
    Performer,
    required_fields=['rating100', 'favorite'],
    predicate=lambda p: p.rating100 >= 80 and p.favorite,
)
# Fetches rating100+favorite for the 500 that don't have it,
# then filters all 1000 with complete data.
# Network calls: only for missing data (much faster than find()).

Filter images by nested file properties

large_images = await store.filter_and_populate(
    Image,
    required_fields=['files__path', 'files__size'],
    predicate=lambda i: any(
        f.size > 10_000_000 for f in i.files if f.size is not None
    ),
)
# Fetches the files relationship + path/size fields on each File object.

filter_and_populate_with_stats async

filter_and_populate_with_stats(
    entity_type: type[T],
    required_fields: set[str] | list[str],
    predicate: Callable[[T], bool],
    batch_size: int = 50,
) -> tuple[list[T], dict[str, Any]]

Filter and populate with debug statistics.

Same as filter_and_populate() but returns detailed statistics about what was fetched and filtered. Useful for debugging and optimization.

Supports nested field specifications using Django-style double-underscore syntax.

Parameters:

Name Type Description Default
entity_type type[T]

The Stash entity type

required
required_fields set[str] | list[str]

Fields needed by the predicate. Supports both regular field names ('rating100') and nested field specs ('files__path', 'studio__parent__name').

required
predicate Callable[[T], bool]

Function that returns True for matching entities

required
batch_size int

Number of entities to populate concurrently

50

Returns:

Type Description
tuple[list[T], dict[str, Any]]

Tuple of (matching_entities, stats_dict), where stats_dict contains:
  • total_cached: Total objects in cache
  • needed_population: How many needed fields fetched
  • populated_fields: Which fields were fetched
  • matches: How many matched the predicate
  • cache_hit_rate: Percentage with complete data

Examples:

results, stats = await store.filter_and_populate_with_stats(
    Performer,
    required_fields=['rating100'],
    predicate=lambda p: p.rating100 >= 80,
)
print(f"Populated {stats['needed_population']} of {stats['total_cached']}")
print(f"Cache hit rate: {stats['cache_hit_rate']:.1%}")
print(f"Found {stats['matches']} matches")

With nested fields

results, stats = await store.filter_and_populate_with_stats(
    Image,
    required_fields=['files__path', 'files__size'],
    predicate=lambda i: any(f.size > 10_000_000 for f in i.files),
)

populated_filter_iter async

populated_filter_iter(
    entity_type: type[T],
    required_fields: set[str] | list[str],
    predicate: Callable[[T], bool],
    populate_batch: int = 50,
    yield_batch: int = 10,
) -> AsyncIterator[T]

Lazy filter with auto-population, yielding results incrementally.

Like filter_and_populate() but yields results as they become available instead of waiting for all entities to be processed. Useful for large datasets where you want to start processing matches immediately.

Supports nested field specifications using Django-style double-underscore syntax.

Parameters:

Name Type Description Default
entity_type type[T]

The Stash entity type

required
required_fields set[str] | list[str]

Fields needed by the predicate. Supports both regular field names ('rating100') and nested field specs ('files__path', 'studio__parent__name').

required
predicate Callable[[T], bool]

Function that returns True for matching entities

required
populate_batch int

How many entities to populate concurrently

50
yield_batch int

Process this many entities before yielding matches

10

Yields:

Type Description
AsyncIterator[T]

Individual matching entities (with required fields populated)

Examples:

Process large dataset incrementally

async for performer in store.populated_filter_iter(
    Performer,
    required_fields=['rating100', 'scenes'],
    predicate=lambda p: p.rating100 >= 90 and len(p.scenes) > 100,
):
    # Start processing immediately as matches are found
    await expensive_operation(performer)
    if should_stop:
        break  # Can stop early without processing all

With nested fields

async for image in store.populated_filter_iter(
    Image,
    required_fields=['files__path', 'files__size'],
    predicate=lambda i: any(f.size > 10_000_000 for f in i.files),
):
    await process_large_image(image)

missing_fields_nested

missing_fields_nested(
    obj: StashObject, *field_specs: str
) -> set[str]

Check which nested field specifications are missing.

Supports both simple and nested (Django-style) field specifications:
- Simple: 'rating100', 'favorite'
- Nested: 'files__path', 'studio__parent__name'

Parameters:

Name Type Description Default
obj StashObject

Entity to check

required
*field_specs str

Field specifications to check

()

Returns:

Type Description
set[str]

Set of field specifications that are NOT fully populated

Example
Check if image has files loaded with path field

missing = store.missing_fields_nested(
    image,
    'title',        # Simple field
    'files__path',  # Nested field
)
# Returns {'files__path'} if files.path is not loaded
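One way to picture the nested-spec check is a recursive walk over the `__` path (a sketch; the `UNSET` sentinel and attribute layout are assumptions, not the library's internals):

```python
UNSET = object()  # sentinel for fields never received from the server

def spec_is_populated(obj, spec: str) -> bool:
    """True if every step of an 'a__b__c' path is populated on obj.
    List-valued steps require the remaining path on each element."""
    head, _, rest = spec.partition("__")
    value = getattr(obj, head, UNSET)
    if value is UNSET:
        return False
    if not rest:
        return True
    if isinstance(value, list):
        return all(spec_is_populated(item, rest) for item in value)
    return value is not None and spec_is_populated(value, rest)
```

missing_fields_nested would then be the set of specs for which this returns False.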

populate async

populate(
    obj: T,
    fields: list[str] | set[str] | None = None,
    force_refetch: bool = False,
) -> T

Populate specific fields on an entity using field-aware fetching.

This method uses _received_fields tracking to determine which fields are genuinely missing and need to be fetched. All entities are treated as potentially incomplete.

Supports nested field specifications using Django-style double-underscore syntax:
- 'files__path': Populate the files relationship, then path on each File
- 'studio__parent__name': Populate studio, then parent, then name
- Can be mixed with regular fields: ['rating100', 'files__path']

Parameters:

Name Type Description Default
obj T

Entity to populate. Can be any StashObject, including nested objects like scene.studio or scene.performers[0].

required
fields list[str] | set[str] | None

Fields to populate. Supports both regular field names ('studio') and nested field specifications ('studio__parent__name'). If None and force_refetch=False, uses heuristics to determine if object needs more data.

None
force_refetch bool

If True, invalidates cache and re-fetches the specified fields from the server, regardless of whether they're in _received_fields.

False

Returns:

Type Description
T

The populated entity (may be a different instance if refetched from cache).

Examples:

Populate specific fields on a scene

scene = await store.populate(scene, fields=["studio", "performers"])

Populate nested fields using __ syntax

image = await store.populate(image, fields=["files__path", "files__size"])

Mix regular and nested fields

scene = await store.populate(
    scene, fields=["rating100", "studio__parent__name"]
)

Populate nested object directly (identity map pattern)

scene.studio = await store.populate(scene.studio, fields=["urls", "details"])

Force refresh from server (invalidates cache first)

scene = await store.populate(scene, fields=["studio"], force_refetch=True)

Populate performer from a list

performer = await store.populate(
    scene.performers[0], fields=["scenes", "images"]
)

Check what's missing before populating

missing = store.missing_fields(scene.studio, "urls", "details", "aliases")
if missing:
    scene.studio = await store.populate(scene.studio, fields=list(missing))

has_fields

has_fields(obj: StashObject, *fields: str) -> bool

Check if an object has specific fields populated.

Uses _received_fields tracking when available.

Parameters:

Name Type Description Default
obj StashObject

Entity to check

required
*fields str

Field names to check for

()

Returns:

Type Description
bool

True if ALL specified fields are in _received_fields

missing_fields

missing_fields(obj: StashObject, *fields: str) -> set[str]

Get which of the specified fields are missing from an object.

Parameters:

Name Type Description Default
obj StashObject

Entity to check

required
*fields str

Field names to check

()

Returns:

Type Description
set[str]

Set of field names that are NOT in _received_fields
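With `_received_fields` available as a set, both checks above reduce to set operations (a sketch of the semantics, not the actual implementation):

```python
def has_fields(received: set, *fields: str) -> bool:
    """True only if ALL requested fields were received."""
    return set(fields) <= received

def missing_fields(received: set, *fields: str) -> set:
    """The requested fields that were never received."""
    return set(fields) - received
```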

invalidate

invalidate(
    entity_type_or_obj: type[T], entity_id: str
) -> None
invalidate(entity_type_or_obj: StashObject) -> None
invalidate(
    entity_type_or_obj: type[T] | StashObject,
    entity_id: str | None = None,
) -> None

Remove specific object from cache (thread-safe).

Can be called with either a type + ID pair, or a StashObject instance directly.

Parameters:

Name Type Description Default
entity_type_or_obj type[T] | StashObject

The Stash entity type, or a StashObject instance

required
entity_id str | None

Entity ID to invalidate (required when passing a type)

None

Examples:

>>> store.invalidate(Scene, "123")
>>> store.invalidate(scene)  # extracts type and id from instance

invalidate_type

invalidate_type(entity_type: type[T]) -> None

Remove all objects of a type from cache (thread-safe).

Parameters:

Name Type Description Default
entity_type type[T]

The Stash entity type to clear

required

invalidate_all

invalidate_all() -> None

Clear entire cache (thread-safe).

set_ttl

set_ttl(
    entity_type: type[T], ttl: timedelta | int | None
) -> None

Set TTL for a type. None = use default (or never expire if no default).

Parameters:

Name Type Description Default
entity_type type[T]

The Stash entity type

required
ttl timedelta | int | None

TTL for this type, or None to use default. Can be a timedelta, or an int (interpreted as seconds).

required

Raises:

Type Description
TypeError

If ttl is not a timedelta, int, or None
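The per-type override semantics can be sketched as a lookup that falls back to the default (illustrative names only; string keys stand in for entity types):

```python
from datetime import timedelta

DEFAULT_TTL = timedelta(minutes=30)

def resolve_ttl(per_type: dict, entity_type, default=DEFAULT_TTL):
    """Effective TTL for a type: its override if one was set,
    otherwise the default. A stored None means 'use the default'
    (which itself means 'never expire' when no default exists)."""
    ttl = per_type.get(entity_type)
    return ttl if ttl is not None else default
```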

add

add(obj: StashObject) -> None

Add object to cache (for new objects with temp UUIDs).

This is typically used with objects created via ClassName.new() that have temporary UUID IDs. After calling obj.save() or store.save(), the cache entry will be updated with the real ID from Stash.

Parameters:

Name Type Description Default
obj StashObject

Object to cache (usually created with .new())

required
Example
# Create new tag with temp UUID
tag = Tag.new(name="Action")

# Cache it
store.add(tag)

# Save to Stash (updates ID and cache)
await store.save(tag)
# OR: await tag.save(client) + manual cache update

save async

save(obj: StashObject, _cascade_depth: int = 0) -> None

Save object to Stash and update cache.

Handles both new objects (create) and existing objects (update). For new objects, updates cache key from temp UUID to real Stash ID.

Automatically cascades saves for unsaved related objects (with warning). Preferred pattern: explicitly save related objects before parent.

Parameters:

Name Type Description Default
obj StashObject

Object to save

required
_cascade_depth int

Internal tracking for cascade recursion depth

0

Raises:

Type Description
ValueError

If save fails or object has unsaved UUIDs after cascade

Example
# Create and save new tag
tag = Tag.new(name="Action")
store.add(tag)  # Cache with temp UUID
await store.save(tag)  # Save to Stash, update cache with real ID

# Modify existing tag
tag.description = "Action movies"
await store.save(tag)  # Update in Stash

# With related objects (auto-cascade with warning)
scene.performers.append(new_performer)  # new_performer has UUID
await store.save(scene)  # Warns, cascades save(new_performer), then saves scene

# Preferred pattern: explicit saves
await store.save(new_performer)  # Gets real ID
scene.performers.append(new_performer)  # Has real ID
await store.save(scene)  # No cascade needed

delete async

delete(obj: StashObject, **kwargs: Any) -> bool

Delete object from Stash and remove from cache.

Delegates to obj.delete(client) then invalidates the cache entry.

Parameters:

Name Type Description Default
obj StashObject

Entity to delete

required
**kwargs Any

Additional destroy input fields (e.g., delete_file=True)

{}

Returns:

Type Description
bool

True if successfully deleted

Raises:

Type Description
NotImplementedError

If the entity type doesn't support delete

ValueError

If the entity has no server ID or delete fails

Examples:

>>> await store.delete(scene)
>>> await store.delete(scene, delete_file=True)

get_or_create async

get_or_create(
    entity_type: type[T],
    create_if_missing: bool = True,
    **search_params: Any,
) -> T

Get entity by search criteria, optionally create if not found.

Searches for an entity matching the provided criteria. If found, returns the existing entity (from cache or fetched). If not found and create_if_missing is True, creates a new entity with the search params as initial data.

Note: New entities are created with UUID IDs and are NOT automatically saved. Call store.save() or entity.save() to persist to Stash.

Parameters:

Name Type Description Default
entity_type type[T]

The Stash entity type

required
create_if_missing bool

If True, creates new entity if not found. Default: True.

True
**search_params Any

Search criteria (also used as creation data if not found)

{}

Returns:

Type Description
T

Existing or newly created entity

Raises:

Type Description
ValueError

If not found and create_if_missing=False

Example
# Get existing or create new performer
performer = await store.get_or_create(Performer, name="Alice")
if performer._is_new:
    # New performer - save it
    await store.save(performer)

# Link to scene
scene.performers.append(performer)
await store.save(scene)

# Get existing, error if not found
tag = await store.get_or_create(Tag, create_if_missing=False, name="Action")

is_cached

is_cached(entity_type: type[T], entity_id: str) -> bool

Check if object is in cache and not expired (thread-safe).

Parameters:

Name Type Description Default
entity_type type[T]

The Stash entity type

required
entity_id str

Entity ID

required

Returns:

Type Description
bool

True if cached and not expired

cache_stats

cache_stats() -> CacheStats

Get cache statistics (thread-safe).

Returns:

Type Description
CacheStats

CacheStats with total entries, by-type counts, and expired count
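The statistics can be pictured as a single pass over cache entries (a sketch; the real CacheStats shape and entry layout may differ):

```python
from dataclasses import dataclass
from datetime import datetime, timezone

@dataclass
class CacheStats:  # field names mirror the description above
    total: int
    by_type: dict
    expired: int

def cache_stats(entries, now=None) -> CacheStats:
    """entries: iterable of (type_name, expires_at) pairs (illustrative shape)."""
    now = now or datetime.now(timezone.utc)
    by_type: dict = {}
    expired = 0
    total = 0
    for type_name, expires_at in entries:
        total += 1
        by_type[type_name] = by_type.get(type_name, 0) + 1
        if expires_at is not None and now >= expires_at:
            expired += 1
    return CacheStats(total, by_type, expired)
```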