from abc import ABC, abstractmethod
from functools import update_wrapper
from typing import TYPE_CHECKING, AbstractSet, Callable, Optional
from typing_extensions import TypeAlias
import dagster._check as check
from dagster._core.definitions.config import is_callable_valid_config_arg
from dagster._core.definitions.definition_config_schema import (
CoercableToConfigSchema,
IDefinitionConfigSchema,
convert_user_facing_definition_config_schema,
)
from dagster._core.definitions.resource_definition import (
ResourceDefinition,
ResourceFunction,
is_context_provided,
)
if TYPE_CHECKING:
from dagster._core.execution.context.input import InputContext
InputLoadFn: TypeAlias = Callable[["InputContext", object], object]
class IInputManagerDefinition:
@property
@abstractmethod
def input_config_schema(self) -> IDefinitionConfigSchema:
"""The schema for per-input configuration for inputs that are managed by this
input manager.
"""
class InputManagerDefinition(ResourceDefinition, IInputManagerDefinition):
"""Definition of an input manager resource.
Input managers load op inputs.
An InputManagerDefinition is a :py:class:`ResourceDefinition` whose resource_fn returns an
:py:class:`InputManager`.
The easiest way to create an InputManagerDefinition is with the
:py:func:`@input_manager <input_manager>` decorator.
"""
def __init__(
self,
resource_fn: ResourceFunction,
config_schema: Optional[CoercableToConfigSchema] = None,
description: Optional[str] = None,
input_config_schema: Optional[CoercableToConfigSchema] = None,
required_resource_keys: Optional[AbstractSet[str]] = None,
version: Optional[str] = None,
):
self._input_config_schema = convert_user_facing_definition_config_schema(
input_config_schema
)
super(InputManagerDefinition, self).__init__(
resource_fn=resource_fn,
config_schema=config_schema,
description=description,
required_resource_keys=required_resource_keys,
version=version,
)
@property
def input_config_schema(self) -> IDefinitionConfigSchema:
return self._input_config_schema
def copy_for_configured(
self,
description: Optional[str],
config_schema: CoercableToConfigSchema,
) -> "InputManagerDefinition":
return InputManagerDefinition(
config_schema=config_schema,
description=description or self.description,
resource_fn=self.resource_fn,
required_resource_keys=self.required_resource_keys,
input_config_schema=self.input_config_schema,
)
class InputManagerWrapper(InputManager):
def __init__(self, load_fn):
self._load_fn = load_fn
def load_input(self, context):
# the @input_manager decorated function (self._load_fn) may return a direct value that
# should be used or an instance of an InputManager. So we call self._load_fn and see if the
# result is an InputManager. If so we call it's load_input method
intermediate = (
# type-ignore because function being used as attribute
self._load_fn(context)
if is_context_provided(self._load_fn)
else self._load_fn() # type: ignore
)
if isinstance(intermediate, InputManager):
return intermediate.load_input(context)
return intermediate
class _InputManagerDecoratorCallable:
def __init__(
self,
config_schema: CoercableToConfigSchema = None,
description: Optional[str] = None,
version: Optional[str] = None,
input_config_schema: CoercableToConfigSchema = None,
required_resource_keys: Optional[AbstractSet[str]] = None,
):
self.config_schema = config_schema
self.description = check.opt_str_param(description, "description")
self.version = check.opt_str_param(version, "version")
self.input_config_schema = input_config_schema
self.required_resource_keys = required_resource_keys
def __call__(self, load_fn: InputLoadFn) -> InputManagerDefinition:
check.callable_param(load_fn, "load_fn")
def _resource_fn(_):
return InputManagerWrapper(load_fn)
root_input_manager_def = InputManagerDefinition(
resource_fn=_resource_fn,
config_schema=self.config_schema,
description=self.description,
version=self.version,
input_config_schema=self.input_config_schema,
required_resource_keys=self.required_resource_keys,
)
update_wrapper(root_input_manager_def, wrapped=load_fn)
return root_input_manager_def