Advanced Filtering with Field-Aware Repository

The StashEntityStore provides advanced filtering methods that combine local caching with smart field-level population. These methods enable high-performance queries by minimizing network traffic while ensuring data completeness.

Overview

The advanced filtering system builds on the UNSET Pattern and Identity Map to provide:

  • Field-level granularity: Track which fields are loaded vs unqueried
  • Smart population: Automatically fetch only missing fields
  • Local filtering: Query cached data without network calls
  • Fail-fast validation: Ensure required fields are present before filtering
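To make "loaded vs unqueried" concrete, here is a minimal, self-contained sketch of field-level tracking with a sentinel. It does not use the library: `UNSET_SKETCH`, `PerformerSketch`, and `missing_fields` are hypothetical names invented for illustration, and the real UNSET implementation may differ.

```python
from dataclasses import dataclass
from typing import Any


class _Unset:
    """Sentinel type meaning 'this field was never queried'."""

    def __repr__(self) -> str:
        return "UNSET"

    def __bool__(self) -> bool:
        return False


UNSET_SKETCH = _Unset()  # hypothetical stand-in for the library's UNSET


@dataclass
class PerformerSketch:
    """Illustrative model: every optional field starts out unqueried."""

    id: str
    name: Any = UNSET_SKETCH
    rating100: Any = UNSET_SKETCH


def missing_fields(obj: Any, required: list[str]) -> set[str]:
    """Return the required field names whose value is still the sentinel."""
    return {name for name in required if getattr(obj, name) is UNSET_SKETCH}


# rating100 was queried and is null on the server: loaded, not missing
loaded = PerformerSketch(id="1", name="Alice", rating100=None)
# rating100 was never queried: missing
partial = PerformerSketch(id="2", name="Bob")

print(missing_fields(loaded, ["name", "rating100"]))
print(missing_fields(partial, ["name", "rating100"]))
```

The point of the sentinel is that `None` (queried, empty) and "never queried" stay distinguishable, which is what lets a store fetch only the fields that are actually absent.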

Method Comparison

| Method | Network Calls | Returns | Use Case |
|---|---|---|---|
| filter() | Never | list[T] | Fast local query (existing) |
| filter_strict() | Never | list[T] | Fail-fast if fields missing |
| filter_and_populate() | Only for missing fields | list[T] | Smart hybrid (main workhorse) |
| filter_and_populate_with_stats() | Only for missing fields | (list[T], dict) | Performance debugging |
| populated_filter_iter() | Only for missing fields | AsyncIterator[T] | Large datasets, early exit |
| find() | Always | list[T] | Fresh data from server (existing) |

filter_strict() - Fail-Fast Filtering

Filters cached objects, raising an error if any required fields are missing. Useful when you MUST have complete data.

Signature

def filter_strict(
    self,
    entity_type: type[T],
    required_fields: set[str] | list[str],
    predicate: Callable[[T], bool],
) -> list[T]:

Example

from stash_graphql_client import StashContext, Performer

async with StashContext(conn=conn) as context:
    store = context.store

    # Pre-populate cache with find()
    await store.find(Performer, favorite=True)

    # Strict filtering - raises if any performer missing rating100
    try:
        high_rated = store.filter_strict(
            Performer,
            required_fields=['rating100', 'favorite'],
            # rating100 can be loaded as None; treat that as 0 to avoid a TypeError
            predicate=lambda p: (p.rating100 or 0) >= 80 and p.favorite
        )
        print(f"Found {len(high_rated)} high-rated favorites")
    except ValueError as e:
        # "Performer 123 is missing required fields: {'rating100'}"
        print(f"Cache incomplete: {e}")
        # Fix: warm cache with missing fields
        await store.find(Performer, favorite=True)  # Re-fetch with all fields

When to Use

  • Data validation: Ensure cache is complete before processing
  • Debugging: Identify incomplete cache population
  • Critical operations: Operations that require guaranteed field presence
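The fail-fast behavior can be sketched without the library. The following is a minimal illustration under stated assumptions, not the actual implementation: `filter_strict_sketch` and `UNSET_SKETCH` are hypothetical names, and plain `SimpleNamespace` objects stand in for cached entities. It validates every object before applying the predicate, so the predicate never sees an unqueried field.

```python
from types import SimpleNamespace
from typing import Any, Callable, Iterable

UNSET_SKETCH = object()  # hypothetical stand-in for the library's UNSET sentinel


def filter_strict_sketch(
    cached: Iterable[Any],
    required_fields: Iterable[str],
    predicate: Callable[[Any], bool],
) -> list[Any]:
    """Validate every cached object first, then filter. No network access."""
    required = list(required_fields)
    objects = list(cached)
    for obj in objects:
        missing = {
            name for name in required
            if getattr(obj, name, UNSET_SKETCH) is UNSET_SKETCH
        }
        if missing:
            raise ValueError(
                f"{type(obj).__name__} {obj.id} is missing required fields: {missing}"
            )
    return [obj for obj in objects if predicate(obj)]


complete = [
    SimpleNamespace(id="1", rating100=90, favorite=True),
    SimpleNamespace(id="2", rating100=40, favorite=True),
]
partial = complete + [SimpleNamespace(id="3", rating100=UNSET_SKETCH, favorite=False)]

matches = filter_strict_sketch(
    complete, ["rating100", "favorite"], lambda p: p.rating100 >= 80 and p.favorite
)
print([p.id for p in matches])
```

Validating up front, rather than inside the predicate, means a single incomplete object aborts the whole call instead of silently dropping out of the result.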

filter_and_populate() - Smart Hybrid Filtering

The main workhorse method. Filters cached objects, automatically fetching missing fields as needed. Much faster than find() when most data is cached.

Signature

async def filter_and_populate(
    self,
    entity_type: type[T],
    required_fields: set[str] | list[str],
    predicate: Callable[[T], bool],
    batch_size: int = 50,
) -> list[T]:

Example

from stash_graphql_client import StashContext, Performer

async with StashContext(conn=conn) as context:
    store = context.store

    # Day 1: User browses performers (basic info only)
    async for performer in store.find_iter(Performer, query_batch=100):
        # Cache now has 5000 performers with: id, name, gender
        # But rating100, favorite, scenes are UNSET
        display_in_ui(performer)

    # Day 2: User wants to filter by rating (cache is partial)
    high_rated = await store.filter_and_populate(
        Performer,
        required_fields=['rating100', 'favorite'],
        # rating100 can be loaded as None; treat that as 0 to avoid a TypeError
        predicate=lambda p: (p.rating100 or 0) >= 80 and p.favorite
    )
    # ✓ Only fetches rating100+favorite for the 5000 performers
    # ✓ Much smaller payload than re-fetching full performer data
    # ✓ Results: All matching performers with complete data

Performance Benefits

Scenario: Cache has 1000 performers with basic info, need to filter by rating100

| Approach | Network Payload | Time | Description |
|---|---|---|---|
| find() | ~500KB | ~500ms | Re-fetch all 1000 × full data |
| filter_and_populate() | ~50KB | ~50ms | Fetch 1000 × 1 field only |
| Speedup | 10x smaller | 10x faster | Field-level fetching wins |

Batch Size Parameter

Controls how many entities are populated concurrently:

# Default: 50 concurrent populates
results = await store.filter_and_populate(
    Performer,
    required_fields=['rating100'],
    predicate=lambda p: p.rating100 >= 80
)

# Smaller batches (gentler on server)
results = await store.filter_and_populate(
    Performer,
    required_fields=['rating100'],
    predicate=lambda p: p.rating100 >= 80,
    batch_size=10  # Only 10 concurrent requests
)
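One plausible way to bound concurrency with a batch size is to process entities in chunks and await each chunk with `asyncio.gather`. The sketch below assumes that approach; whether the library uses chunking, a semaphore, or something else is not stated here. `populate_in_batches` and `fake_populate` are hypothetical helpers, and no server is contacted.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, Awaitable, Callable

UNSET_SKETCH = object()  # hypothetical stand-in for the library's UNSET sentinel


async def populate_in_batches(
    entities: list[Any],
    populate_one: Callable[[Any], Awaitable[None]],
    batch_size: int = 50,
) -> int:
    """Populate entities chunk by chunk; at most batch_size run at once.

    Returns the number of chunks processed.
    """
    batches = 0
    for start in range(0, len(entities), batch_size):
        chunk = entities[start:start + batch_size]
        # All coroutines in this chunk run concurrently; the next chunk
        # starts only after every one of them has finished.
        await asyncio.gather(*(populate_one(entity) for entity in chunk))
        batches += 1
    return batches


async def fake_populate(entity: Any) -> None:
    """Hypothetical stand-in for a per-entity field fetch."""
    await asyncio.sleep(0)
    entity.rating100 = 80


performers = [SimpleNamespace(id=str(i), rating100=UNSET_SKETCH) for i in range(25)]
batch_count = asyncio.run(populate_in_batches(performers, fake_populate, batch_size=10))
print(batch_count)
```

With 25 entities and `batch_size=10`, the chunks hold 10, 10, and 5 entities. A smaller batch size trades total latency for a lower peak request rate on the server.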

filter_and_populate_with_stats() - Debug Variant

Same as filter_and_populate() but returns detailed statistics. Useful for performance optimization.

Signature

async def filter_and_populate_with_stats(
    self,
    entity_type: type[T],
    required_fields: set[str] | list[str],
    predicate: Callable[[T], bool],
    batch_size: int = 50,
) -> tuple[list[T], dict[str, Any]]:

Example

from stash_graphql_client import StashContext, Performer

async with StashContext(conn=conn) as context:
    store = context.store

    # Filter with statistics
    results, stats = await store.filter_and_populate_with_stats(
        Performer,
        required_fields=['rating100', 'favorite'],
        predicate=lambda p: p.rating100 >= 80 and p.favorite
    )

    # Analyze cache performance
    print(f"Total cached: {stats['total_cached']}")
    print(f"Needed population: {stats['needed_population']}")
    print(f"Cache hit rate: {stats['cache_hit_rate']:.1%}")
    print(f"Found matches: {stats['matches']}")

    # Example output:
    # Total cached: 1000
    # Needed population: 500
    # Cache hit rate: 50.0%
    # Found matches: 237

Statistics Dictionary

{
    "total_cached": 1000,           # Total objects in cache
    "needed_population": 500,       # How many needed fields fetched
    "populated_fields": ["rating100", "favorite"],  # Which fields
    "matches": 237,                 # How many matched predicate
    "cache_hit_rate": 0.5           # 50% had complete data
}
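The example numbers are consistent with a hit rate defined as the share of cached objects that needed no population: (1000 - 500) / 1000 = 0.5. The sketch below assumes that definition; `build_stats_sketch` is a hypothetical helper, and returning 0.0 for an empty cache is an arbitrary choice made for this illustration.

```python
from typing import Any


def build_stats_sketch(
    total_cached: int,
    needed_population: int,
    populated_fields: list[str],
    matches: int,
) -> dict[str, Any]:
    """Assemble a stats dictionary with the same keys as the one shown above."""
    if total_cached:
        # Assumed definition: fraction of objects that already had complete data
        hit_rate = (total_cached - needed_population) / total_cached
    else:
        hit_rate = 0.0  # arbitrary choice for an empty cache in this sketch
    return {
        "total_cached": total_cached,
        "needed_population": needed_population,
        "populated_fields": populated_fields,
        "matches": matches,
        "cache_hit_rate": hit_rate,
    }


stats = build_stats_sketch(1000, 500, ["rating100", "favorite"], 237)
print(f"Cache hit rate: {stats['cache_hit_rate']:.1%}")
```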

When to Use

  • Performance analysis: Identify cache inefficiencies
  • Optimization: Determine if cache warming is needed
  • Debugging: Understand why queries are slow

populated_filter_iter() - Lazy Async Iterator

Lazy version of filter_and_populate() that yields results incrementally. Great for large datasets where you want to start processing immediately or can short-circuit early.

Signature

async def populated_filter_iter(
    self,
    entity_type: type[T],
    required_fields: set[str] | list[str],
    predicate: Callable[[T], bool],
    populate_batch: int = 50,
    yield_batch: int = 10,
) -> AsyncIterator[T]:

Example: Early Exit

from stash_graphql_client import StashContext, Performer

async with StashContext(conn=conn) as context:
    store = context.store

    # Find first 10 high-rated performers from 10,000 cached
    count = 0
    async for performer in store.populated_filter_iter(
        Performer,
        required_fields=['rating100', 'scenes'],
        predicate=lambda p: p.rating100 >= 90 and len(p.scenes) > 100,
        populate_batch=50,   # Fetch 50 at a time
        yield_batch=10       # Yield after processing each 10
    ):
        # Start processing immediately as matches are found
        await expensive_operation(performer)
        count += 1

        # Early exit - don't process all 10,000
        if count >= 10:
            break  # Only processed ~100-200 performers

Example: Incremental Processing

from stash_graphql_client import StashContext, Scene

async with StashContext(conn=conn) as context:
    store = context.store

    # Process large dataset incrementally
    processed = 0
    async for scene in store.populated_filter_iter(
        Scene,
        required_fields=['file', 'performers'],
        predicate=lambda s: s.file.size > 1_000_000_000,  # > 1GB
        populate_batch=50,
        yield_batch=10
    ):
        # Yields results as they're ready (doesn't wait for all)
        await process_large_scene(scene)
        processed += 1

        # Update progress bar
        if processed % 10 == 0:
            print(f"Processed {processed} large scenes...")

Batch Parameters

async for item in store.populated_filter_iter(
    Performer,
    required_fields=['rating100'],
    predicate=lambda p: p.rating100 >= 80,
    populate_batch=50,  # Concurrent populates per sub-batch
    yield_batch=10      # Process 10 entities before yielding matches
):
    process(item)

  • populate_batch: How many to populate concurrently (default: 50)
  • yield_batch: How many to process before yielding (default: 10)
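To illustrate how the two parameters could interact, here is a self-contained async generator sketch. It is an assumption about the mechanics, not the library's code: it walks the cache in windows of `yield_batch`, populates incomplete objects in each window in groups of at most `populate_batch`, then yields that window's matches. `populated_filter_iter_sketch`, `fake_populate`, and `UNSET_SKETCH` are hypothetical names.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, AsyncIterator, Awaitable, Callable

UNSET_SKETCH = object()  # hypothetical stand-in for the library's UNSET sentinel


async def populated_filter_iter_sketch(
    cached: list[Any],
    required_fields: list[str],
    predicate: Callable[[Any], bool],
    populate_one: Callable[[Any, list[str]], Awaitable[None]],
    populate_batch: int = 50,
    yield_batch: int = 10,
) -> AsyncIterator[Any]:
    """Lazily populate and filter, one window of yield_batch objects at a time."""
    for start in range(0, len(cached), yield_batch):
        window = cached[start:start + yield_batch]
        incomplete = [
            obj for obj in window
            if any(getattr(obj, name) is UNSET_SKETCH for name in required_fields)
        ]
        for i in range(0, len(incomplete), populate_batch):
            group = incomplete[i:i + populate_batch]
            await asyncio.gather(*(populate_one(obj, required_fields) for obj in group))
        for obj in window:
            if predicate(obj):
                yield obj


fetched: list[str] = []  # records which objects the fake fetch touched


async def fake_populate(obj: Any, fields: list[str]) -> None:
    """Hypothetical fetch: sets rating100 from the id and records the call."""
    await asyncio.sleep(0)
    obj.rating100 = int(obj.id)
    fetched.append(obj.id)


async def first_matches(limit: int) -> list[str]:
    performers = [SimpleNamespace(id=str(i), rating100=UNSET_SKETCH) for i in range(100)]
    found: list[str] = []
    async for performer in populated_filter_iter_sketch(
        performers, ["rating100"], lambda p: p.rating100 >= 5, fake_populate
    ):
        found.append(performer.id)
        if len(found) >= limit:
            break  # early exit: later windows are never populated
    return found


found_ids = asyncio.run(first_matches(3))
print(found_ids, len(fetched))
```

In this run only the first window of 10 objects is populated before the loop breaks, which is the saving that early exit is meant to provide.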

When to Use

  • Large datasets: Process 10,000+ entities incrementally
  • Early exit: Stop processing when you find enough matches
  • Memory efficiency: Don't load all results into memory
  • Progress reporting: Update UI as results stream in

Real-World Workflow Example

from stash_graphql_client import StashContext, Performer

async with StashContext(conn=conn) as context:
    store = context.store

    # Step 1: Initial cache population (lightweight)
    print("Loading performers...")
    async for performer in store.find_iter(Performer, query_batch=100):
        # Loads: id, name, gender (minimal fields)
        cache_performer(performer)
    print(f"Cached {len(store.all_cached(Performer))} performers")

    # Step 2: User filters by rating (partial data in cache)
    print("\nFinding high-rated performers...")
    results, stats = await store.filter_and_populate_with_stats(
        Performer,
        required_fields=['rating100', 'favorite'],
        predicate=lambda p: p.rating100 >= 80 and p.favorite
    )

    print(f"Cache hit rate: {stats['cache_hit_rate']:.1%}")
    print(f"Fetched fields for: {stats['needed_population']} performers")
    print(f"Found: {len(results)} matches")

    # Step 3: Verify cache before expensive operation
    try:
        verified = store.filter_strict(
            Performer,
            required_fields=['rating100', 'favorite', 'scenes'],
            predicate=lambda p: p in results
        )
        # All performers now guaranteed to have scenes field
        for performer in verified:
            process_with_scenes(performer)
    except ValueError:
        # Some performers missing 'scenes' field - populate it
        for performer in results:
            if not store.has_fields(performer, 'scenes'):
                await store.populate(performer, fields=['scenes'])

Best Practices

1. Choose the Right Method

# ✓ Fast local query, data already complete
results = store.filter(Performer, lambda p: p.rating100 >= 80)

# ✓ Smart hybrid, auto-populate missing fields
results = await store.filter_and_populate(
    Performer,
    required_fields=['rating100'],
    predicate=lambda p: p.rating100 >= 80
)

# ✓ Fail-fast validation before critical operation
results = store.filter_strict(
    Performer,
    required_fields=['rating100', 'scenes'],
    predicate=lambda p: p.rating100 >= 80
)

# ✓ Large dataset with early exit
async for item in store.populated_filter_iter(
    Performer,
    required_fields=['rating100'],
    predicate=lambda p: p.rating100 >= 95
):
    if found_enough():
        break

2. Cache Warming Strategy

# Strategy 1: Minimal initial load
async for performer in store.find_iter(Performer):
    # Loads minimal fields
    pass

# Later: Populate on-demand
results = await store.filter_and_populate(
    Performer,
    required_fields=['rating100'],
    predicate=lambda p: p.rating100 >= 80
)

# Strategy 2: Pre-load common fields
await store.find(
    Performer,
    # Specify fields in GraphQL query (if supported by client method)
)

3. Performance Monitoring

# Use stats variant to optimize
for _ in range(10):
    results, stats = await store.filter_and_populate_with_stats(
        Performer,
        required_fields=['rating100'],
        predicate=lambda p: p.rating100 >= 80
    )

    if stats['cache_hit_rate'] < 0.5:
        print(f"Warning: Low cache hit rate ({stats['cache_hit_rate']:.1%})")
        print("Consider cache warming or adjusting TTL")

4. Field Dependencies

# If predicate needs multiple fields, list them all
results = await store.filter_and_populate(
    Scene,
    required_fields=['rating100', 'performers', 'tags', 'studio'],
    predicate=lambda s: (
        s.rating100 >= 80 and
        len(s.performers) > 2 and
        any(t.name == "Action" for t in s.tags) and
        s.studio.name == "Acme Studios"
    )
)

Nested Field Filtering

All advanced filter methods support Django-style nested field specifications using double-underscore (__) syntax. This allows you to filter on properties of related objects without manual joins.

Syntax

  • Simple field: 'rating100', 'favorite'
  • Nested field: 'files__path', 'studio__parent__name'
  • Deep nesting: 'studio__parent__parent__name' (arbitrary depth)
  • Mixed: ['rating100', 'files__path', 'studio__name']

Example: Filter Images by File Properties

from stash_graphql_client import StashContext, Image

async with StashContext(conn=conn) as context:
    store = context.store

    # Filter images by nested file properties
    large_images = await store.filter_and_populate(
        Image,
        required_fields=['files__path', 'files__size'],
        predicate=lambda img: any(
            f.size > 10_000_000  # 10MB
            for f in img.files
            if f.size is not None
        )
    )

    print(f"Found {len(large_images)} images with files > 10MB")
    for image in large_images:
        for file in image.files:
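            # size may be None on some files (the predicate above guards for this too)
            if file.size is not None and file.size > 10_000_000: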
                print(f"  {file.path}: {file.size / 1_000_000:.1f}MB")

Example: Filter by Studio Hierarchy

from stash_graphql_client import StashContext, Scene

async with StashContext(conn=conn) as context:
    store = context.store

    # Filter scenes by studio's parent studio
    acme_network_scenes = await store.filter_and_populate(
        Scene,
        required_fields=['studio__parent__name', 'title'],
        predicate=lambda s: (
            s.studio is not None and
            s.studio.parent is not None and
            s.studio.parent.name == "Acme Network"
        )
    )

    print(f"Found {len(acme_network_scenes)} scenes from Acme Network subsidiaries")

How It Works

When you specify a nested field like 'files__path', the filter method:

  1. Parses the specification: 'files__path' → ['files', 'path']
  2. Checks root field: Ensures files relationship is populated
  3. Recursively checks nested fields: Ensures path is populated on each File object
  4. Auto-populates missing data: Fetches only what's needed from the server
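The parse and recursive check in steps 1 to 3 can be sketched as follows. This is an illustration under assumptions, not the library's code: `parse_field_spec`, `is_populated`, and `UNSET_SKETCH` are hypothetical names, and treating a `None` relationship as "nothing further to check" is a choice made for this sketch. Step 4 (fetching) is omitted.

```python
from types import SimpleNamespace
from typing import Any

UNSET_SKETCH = object()  # hypothetical stand-in for the library's UNSET sentinel


def parse_field_spec(spec: str) -> list[str]:
    """Split a Django-style spec: 'files__path' becomes ['files', 'path']."""
    return spec.split("__")


def is_populated(obj: Any, path: list[str]) -> bool:
    """Check that every segment of the path is loaded, recursing into relations."""
    if not path:
        return True
    value = getattr(obj, path[0], UNSET_SKETCH)
    if value is UNSET_SKETCH:
        return False  # this level was never queried
    rest = path[1:]
    if not rest or value is None:
        return True  # leaf reached, or a null relation with nothing below it
    if isinstance(value, (list, tuple)):
        # To-many relation: every related object must have the nested field
        return all(is_populated(item, rest) for item in value)
    return is_populated(value, rest)


image_ok = SimpleNamespace(files=[SimpleNamespace(path="/a.jpg", size=5)])
image_partial = SimpleNamespace(files=[SimpleNamespace(path="/a.jpg", size=UNSET_SKETCH)])
image_unloaded = SimpleNamespace(files=UNSET_SKETCH)

print(parse_field_spec("studio__parent__name"))
print(is_populated(image_partial, parse_field_spec("files__size")))
```

An object that fails this check is what a populating method would then fetch the missing segments for, while a strict method would raise instead.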

Benefits

  • No manual joins: Express complex queries naturally
  • Selective fetching: Only fetch fields actually needed
  • Type-aware: Predicates operate on typed model objects, so static type checkers and IDE autocomplete can help
  • Efficient: Uses identity map to avoid duplicate fetches

Nested Field Examples

# Filter performers by scene count
high_activity_performers = await store.filter_and_populate(
    Performer,
    required_fields=['scenes__title'],  # Ensure scenes relationship loaded
    predicate=lambda p: len(p.scenes) > 100
)

# Filter scenes by performer rating
quality_cast_scenes = await store.filter_and_populate(
    Scene,
    required_fields=['performers__rating100'],
    predicate=lambda s: all(
        p.rating100 >= 80
        for p in s.performers
        if p.rating100 is not None
    )
)

# Filter studios by country (via parent)
us_studios = await store.filter_and_populate(
    Studio,
    required_fields=['parent__country'],
    predicate=lambda s: (
        s.parent is not None and
        s.parent.country == "US"
    )
)

With filter_strict()

from stash_graphql_client import StashContext, Image

async with StashContext(conn=conn) as context:
    store = context.store

    # This raises ValueError if ANY image has incomplete file data
    try:
        large_images = store.filter_strict(
            Image,
            required_fields=['files__path', 'files__size'],
            # Treat a None size as 0 so the comparison cannot raise TypeError
            predicate=lambda i: any((f.size or 0) > 10_000_000 for f in i.files)
        )
    except ValueError as e:
        print(f"Cache incomplete: {e}")
        # Use filter_and_populate instead

Performance Tip

Nested field filtering is most efficient when:

  1. Root relationships are already cached
  2. Only leaf fields need population
  3. Batch sizes are tuned for your dataset

# Good: Most images already have files loaded
large_images = await store.filter_and_populate(
    Image,
    required_fields=['files__size'],  # Only fetch size
    predicate=lambda i: any(f.size > 10_000_000 for f in i.files),
    batch_size=100  # Process 100 images concurrently
)

Integration with UNSET Pattern

The advanced filter methods work seamlessly with the UNSET Pattern:

from stash_graphql_client.types.unset import UNSET

# Manual check for UNSET fields
if performer.rating100 is UNSET:
    # Field not queried - use filter_and_populate
    results = await store.filter_and_populate(
        Performer,
        required_fields=['rating100'],
        predicate=lambda p: p.id == performer.id
    )
else:
    # Field is loaded (None or value) - safe to use
    if performer.rating100 and performer.rating100 >= 80:
        process(performer)

# Or use filter_strict to enforce
try:
    results = store.filter_strict(
        Performer,
        required_fields=['rating100'],
        predicate=lambda p: p.rating100 >= 80
    )
except ValueError:
    # Some performers have rating100=UNSET
    results = await store.filter_and_populate(
        Performer,
        required_fields=['rating100'],
        predicate=lambda p: p.rating100 >= 80
    )

See Also