Source code for dagster._core.storage.input_manager

from abc import ABC, abstractmethod
from functools import update_wrapper
from typing import TYPE_CHECKING, AbstractSet, Callable, Optional, Union

from typing_extensions import TypeAlias

import dagster._check as check
from dagster._core.definitions.config import is_callable_valid_config_arg
from dagster._core.definitions.definition_config_schema import (
    CoercableToConfigSchema,
    IDefinitionConfigSchema,
    convert_user_facing_definition_config_schema,
)
from dagster._core.definitions.resource_definition import (
    ResourceDefinition,
    ResourceFunction,
    is_context_provided,
)

if TYPE_CHECKING:
    from dagster._core.execution.context.input import InputContext

InputLoadFn: TypeAlias = Callable[["InputContext", object], object]


class InputManager(ABC):
    """Base interface for classes that are responsible for loading op inputs."""

    @abstractmethod
    def load_input(self, context: "InputContext") -> object:
        """The user-defined read method that loads an input to an op.

        Args:
            context (InputContext): The input context.

        Returns:
            Any: The data object.
        """
class IInputManagerDefinition:
    """Interface for definitions that expose a per-input config schema."""

    @property
    @abstractmethod
    def input_config_schema(self) -> IDefinitionConfigSchema:
        """The schema for per-input configuration for inputs that are managed by this
        input manager.
        """


class InputManagerDefinition(ResourceDefinition, IInputManagerDefinition):
    """Definition of an input manager resource.

    Input managers load op inputs.

    An InputManagerDefinition is a :py:class:`ResourceDefinition` whose resource_fn returns an
    :py:class:`InputManager`.

    The easiest way to create an InputManagerDefinition is with the
    :py:func:`@input_manager <input_manager>` decorator.
    """

    def __init__(
        self,
        resource_fn: ResourceFunction,
        config_schema: Optional[CoercableToConfigSchema] = None,
        description: Optional[str] = None,
        input_config_schema: Optional[CoercableToConfigSchema] = None,
        required_resource_keys: Optional[AbstractSet[str]] = None,
        version: Optional[str] = None,
    ):
        """Build the definition.

        Args:
            resource_fn (ResourceFunction): Function forwarded to ``ResourceDefinition``.
            config_schema (Optional[ConfigSchema]): Resource-level config schema, forwarded
                to ``ResourceDefinition``.
            description (Optional[str]): Human-readable description, forwarded to
                ``ResourceDefinition``.
            input_config_schema (Optional[ConfigSchema]): Per-input config schema; it is
                normalized with ``convert_user_facing_definition_config_schema`` and kept
                on this definition.
            required_resource_keys (Optional[AbstractSet[str]]): Forwarded to
                ``ResourceDefinition``.
            version (Optional[str]): Forwarded to ``ResourceDefinition``.
        """
        # The per-input schema is the only state this subclass owns; everything else
        # is handled by ResourceDefinition.
        self._input_config_schema = convert_user_facing_definition_config_schema(
            input_config_schema
        )
        super(InputManagerDefinition, self).__init__(
            resource_fn=resource_fn,
            config_schema=config_schema,
            description=description,
            required_resource_keys=required_resource_keys,
            version=version,
        )

    @property
    def input_config_schema(self) -> IDefinitionConfigSchema:
        """The normalized per-input config schema supplied at construction time."""
        return self._input_config_schema

    def copy_for_configured(
        self,
        description: Optional[str],
        config_schema: CoercableToConfigSchema,
    ) -> "InputManagerDefinition":
        """Return a copy of this definition that uses a different resource config schema.

        Args:
            description (Optional[str]): Description for the copy; falls back to this
                definition's description when falsy.
            config_schema (CoercableToConfigSchema): Resource-level config schema for
                the copy.

        Returns:
            InputManagerDefinition: A new definition sharing this one's resource_fn,
            required resource keys, input config schema and version.
        """
        return InputManagerDefinition(
            config_schema=config_schema,
            description=description or self.description,
            resource_fn=self.resource_fn,
            required_resource_keys=self.required_resource_keys,
            input_config_schema=self.input_config_schema,
            # Carry the version over so the copy does not silently lose it.
            # NOTE(review): relies on ResourceDefinition exposing a `version`
            # property — confirm against the base class.
            version=self.version,
        )
def input_manager(
    config_schema: Optional[CoercableToConfigSchema] = None,
    description: Optional[str] = None,
    input_config_schema: Optional[CoercableToConfigSchema] = None,
    required_resource_keys: Optional[AbstractSet[str]] = None,
    version: Optional[str] = None,
) -> Callable[[InputLoadFn], InputManagerDefinition]:
    """Define an input manager.

    Input managers load op inputs, either from upstream outputs or by providing default values.

    The decorated function may accept an :py:class:`InputContext` (or take no arguments). It
    should return either the loaded object that will be passed into one of the inputs of an op,
    or an :py:class:`InputManager` whose ``load_input`` method produces that object.

    The decorator produces an :py:class:`InputManagerDefinition`.

    Args:
        config_schema (Optional[ConfigSchema]): The schema for the resource-level config. If not
            set, Dagster will accept any config provided.
        description (Optional[str]): A human-readable description of the resource.
        input_config_schema (Optional[ConfigSchema]): A schema for the input-level config. Each
            input that uses this input manager can be configured separately using this config.
            If not set, Dagster will accept any config provided.
        required_resource_keys (Optional[Set[str]]): Keys for the resources required by the input
            manager.
        version (Optional[str]): (Experimental) the version of the input manager definition.

    Returns:
        When called with arguments, a decorator that turns a load function into an
        :py:class:`InputManagerDefinition`. When applied directly to a function (``@input_manager``
        without parentheses), the :py:class:`InputManagerDefinition` itself.

    **Examples:**

    .. code-block:: python

        from dagster import input_manager, op, job, In

        @input_manager
        def csv_loader(_):
            return read_csv("some/path")

        @op(ins={"input1": In(input_manager_key="csv_loader_key")})
        def my_op(_, input1):
            do_stuff(input1)

        @job(resource_defs={"csv_loader_key": csv_loader})
        def my_job():
            my_op()

        @input_manager(config_schema={"base_dir": str})
        def csv_loader(context):
            return read_csv(context.resource_config["base_dir"] + "/some/path")

        @input_manager(input_config_schema={"path": str})
        def csv_loader(context):
            return read_csv(context.config["path"])
    """
    # Bare usage (`@input_manager` with no parentheses): the decorated function
    # arrives in the `config_schema` position, so wrap it immediately with defaults.
    if callable(config_schema) and not is_callable_valid_config_arg(config_schema):
        return _InputManagerDecoratorCallable()(config_schema)

    def _wrap(load_fn: InputLoadFn) -> InputManagerDefinition:
        return _InputManagerDecoratorCallable(
            config_schema=config_schema,
            description=description,
            version=version,
            input_config_schema=input_config_schema,
            required_resource_keys=required_resource_keys,
        )(load_fn)

    return _wrap
class InputManagerWrapper(InputManager):
    """Adapts a plain load function to the :py:class:`InputManager` interface."""

    def __init__(self, load_fn):
        self._load_fn = load_fn

    def load_input(self, context):
        """Invoke the wrapped load function and return the loaded value.

        The function decorated with @input_manager may hand back the value itself, or an
        InputManager instance. In the latter case the call is forwarded to its load_input
        method.
        """
        # Only pass the context along when the wrapped function declares a parameter for it.
        if is_context_provided(self._load_fn):
            loaded = self._load_fn(context)
        else:
            # type-ignore because function being used as attribute
            loaded = self._load_fn()  # type: ignore

        if not isinstance(loaded, InputManager):
            return loaded
        return loaded.load_input(context)


class _InputManagerDecoratorCallable:
    """Holds the decorator arguments and converts a load function into a definition."""

    def __init__(
        self,
        config_schema: CoercableToConfigSchema = None,
        description: Optional[str] = None,
        version: Optional[str] = None,
        input_config_schema: CoercableToConfigSchema = None,
        required_resource_keys: Optional[AbstractSet[str]] = None,
    ):
        # Validate the string options first (description, then version).
        self.description = check.opt_str_param(description, "description")
        self.version = check.opt_str_param(version, "version")
        # The remaining options are stored untouched and interpreted later by
        # InputManagerDefinition.
        self.config_schema = config_schema
        self.input_config_schema = input_config_schema
        self.required_resource_keys = required_resource_keys

    def __call__(self, load_fn: InputLoadFn) -> InputManagerDefinition:
        """Wrap ``load_fn`` in an InputManagerDefinition carrying the stored options."""
        check.callable_param(load_fn, "load_fn")

        def _resource_fn(_):
            return InputManagerWrapper(load_fn)

        definition = InputManagerDefinition(
            resource_fn=_resource_fn,
            config_schema=self.config_schema,
            description=self.description,
            version=self.version,
            input_config_schema=self.input_config_schema,
            required_resource_keys=self.required_resource_keys,
        )

        # Copy the load function's metadata onto the definition.
        update_wrapper(definition, wrapped=load_fn)
        return definition