Source code for dagster._core.execution.context.hook

import warnings
from typing import AbstractSet, Any, Dict, Mapping, Optional, Set, Union

import dagster._check as check
from dagster._annotations import public

from ...definitions.composition import PendingNodeInvocation
from ...definitions.decorators.graph_decorator import graph
from ...definitions.dependency import Node
from ...definitions.hook_definition import HookDefinition
from ...definitions.op_definition import OpDefinition
from ...definitions.resource_definition import IContainsGenerator, Resources
from ...errors import DagsterInvalidPropertyError, DagsterInvariantViolationError
from ...log_manager import DagsterLogManager
from ..plan.step import ExecutionStep
from ..plan.utils import RetryRequestedFromPolicy
from .system import StepExecutionContext


def _property_msg(prop_name: str, method_name: str) -> str:
    """Build the error text for a member that is unavailable on a test hook context.

    Args:
        prop_name: Name of the unavailable member (e.g. ``"step_key"``).
        method_name: Kind of member, inserted verbatim after the name
            (callers in this module pass ``"property"`` or ``"method"``).

    Returns:
        The formatted, user-facing message.
    """
    unavailable_suffix = (
        " is not set when a `HookContext` is constructed from `build_hook_context`."
    )
    return f"The {prop_name} {method_name}" + unavailable_suffix


def _check_property_on_test_context(
    context: "HookContext", attr_str: str, user_facing_name: str, param_on_builder: str
):
    """Return ``getattr(context, attr_str)``, raising if that value is ``None``.

    Args:
        context: The hook context whose attribute is read.
        attr_str: Name of the attribute to read from ``context``.
        user_facing_name: Name of the attribute as shown to the user in the error.
        param_on_builder: Name of the ``build_hook_context`` parameter the user
            should supply to populate the attribute.

    Returns:
        The attribute value when it is not ``None`` (falsy values such as ``0``
        or ``""`` are returned as-is).

    Raises:
        DagsterInvalidPropertyError: If the attribute value is ``None``.
    """
    attr_value = getattr(context, attr_str)
    if attr_value is not None:
        return attr_value

    raise DagsterInvalidPropertyError(
        f"Attribute '{user_facing_name}' was not provided when "
        f"constructing context. Provide a value for the '{param_on_builder}' parameter on "
        "'build_hook_context'. To learn more, check out the testing hooks section of Dagster's "
        "concepts docs: https://docs.dagster.io/concepts/ops-jobs-graphs/op-hooks#testing-hooks"
    )


class HookContext:
    """The ``context`` object available to a hook function on a DagsterEvent.

    Attributes:
        log (DagsterLogManager): Centralized log dispatch from user code.
        hook_def (HookDefinition): The hook that the context object belongs to.
        op (Op): The op instance associated with the hook.
        step_key (str): The key for the step where this hook is being triggered.
        required_resource_keys (Set[str]): Resources required by this hook.
        resources (Resources): Resources available in the hook context.
        op_config (Any): The parsed config specific to this op.
        job_name (str): The name of the job where this hook is being triggered.
        run_id (str): The id of the run where this hook is being triggered.
        op_exception (Optional[BaseException]): The thrown exception in a failed op.
        op_output_values (Dict): Computed output values in an op.
    """

    def __init__(
        self,
        step_execution_context: StepExecutionContext,
        hook_def: HookDefinition,
    ):
        self._step_execution_context = step_execution_context

        # Validate the hook definition before reading anything off of it.
        checked_hook_def = check.inst_param(hook_def, "hook_def", HookDefinition)
        self._hook_def = checked_hook_def

        # Only the resources the hook declares are built for this context.
        required_keys = hook_def.required_resource_keys
        self._required_resource_keys = required_keys
        self._resources = step_execution_context.scoped_resources_builder.build(required_keys)

    @property
    def pipeline_name(self) -> str:
        """Alias that returns :py:attr:`job_name`."""
        return self.job_name

    @public  # type: ignore
    @property
    def job_name(self) -> str:
        """The job name reported by the underlying step execution context."""
        return self._step_execution_context.job_name

    @public  # type: ignore
    @property
    def run_id(self) -> str:
        """The run id reported by the underlying step execution context."""
        return self._step_execution_context.run_id

    @public  # type: ignore
    @property
    def hook_def(self) -> HookDefinition:
        """The hook definition this context was constructed with."""
        return self._hook_def

    @property
    def op(self) -> Node:
        """The node exposed as ``solid`` on the underlying step execution context."""
        return self._step_execution_context.solid

    @property
    def step(self) -> ExecutionStep:
        """The execution step of the underlying context (deprecated; emits a warning)."""
        warnings.warn(
            "The step property of HookContext has been deprecated, and will be removed "
            "in a future release."
        )
        return self._step_execution_context.step

    @public  # type: ignore
    @property
    def step_key(self) -> str:
        """The key of the step on the underlying step execution context."""
        return self._step_execution_context.step.key

    @public  # type: ignore
    @property
    def required_resource_keys(self) -> AbstractSet[str]:
        """The resource keys required by the hook definition."""
        return self._required_resource_keys

    @public  # type: ignore
    @property
    def resources(self) -> "Resources":
        """The resources built for the hook's required resource keys."""
        return self._resources

    @property
    def solid_config(self) -> Any:
        """The config recorded for this step's node handle in the resolved run config.

        Returns ``None`` when the lookup yields nothing (or a falsy entry).
        """
        step_context = self._step_execution_context
        handle_key = str(step_context.step.solid_handle)
        node_config = step_context.resolved_run_config.solids.get(handle_key)
        if not node_config:
            return None
        return node_config.config

    @public  # type: ignore
    @property
    def op_config(self) -> Any:
        """Alias that returns :py:attr:`solid_config`."""
        return self.solid_config

    # Because of the fact that we directly use the log manager of the step, if a user calls
    # hook_context.log.with_tags, then they will end up mutating the step's logging tags as well.
    # This is not problematic because the hook only runs after the step has been completed.
    @public  # type: ignore
    @property
    def log(self) -> DagsterLogManager:
        """The log manager of the underlying step execution context."""
        return self._step_execution_context.log

    @property
    def solid_exception(self) -> Optional[BaseException]:
        """The thrown exception in a failed solid.

        Returns:
            Optional[BaseException]: the exception object, None if the solid execution succeeds.
        """
        return self.op_exception

    @public  # type: ignore
    @property
    def op_exception(self):
        """The step's exception, unwrapped when it is a ``RetryRequestedFromPolicy``.

        For a ``RetryRequestedFromPolicy`` the value of its ``__cause__`` is returned
        instead of the wrapper itself.
        """
        step_exc = self._step_execution_context.step_exception
        return step_exc.__cause__ if isinstance(step_exc, RetryRequestedFromPolicy) else step_exc

    @property
    def solid_output_values(self) -> Mapping[str, Union[Any, Mapping[str, Any]]]:
        """The computed output values.

        Returns a dictionary where keys are output names and the values are:
            * the output values in the normal case
            * a dictionary from mapping key to corresponding value in the mapped case
        """
        output_values: Dict[str, Union[Any, Dict[str, Any]]] = {}

        captured = self._step_execution_context.step_output_capture
        if captured is None:
            check.failed("Outputs were unexpectedly not captured for hook")

        # make the returned values more user-friendly
        for handle, value in captured.items():
            mapping_key = handle.mapping_key
            output_name = handle.output_name

            if not mapping_key:
                # Plain output: stored directly under its name.
                output_values[output_name] = value
                continue

            # Mapped output: collect values per mapping key under the output name.
            per_key_values = output_values.get(output_name)
            if per_key_values is None:
                output_values[output_name] = {mapping_key: value}
            else:
                per_key_values[mapping_key] = value

        return output_values

    @public  # type: ignore
    @property
    def op_output_values(self):
        """Alias that returns :py:attr:`solid_output_values`."""
        return self.solid_output_values
class UnboundHookContext(HookContext):
    """Hook context returned by ``build_hook_context`` for invoking a hook directly.

    There is no step execution context behind this object, so step-specific
    properties (``hook_def``, ``step``, ``step_key``, ``required_resource_keys``,
    ``solid_config``, ``solid_output_values``) raise
    ``DagsterInvalidPropertyError``. ``run_id``, ``job_name`` and ``op`` raise the
    same error when the corresponding value was not supplied.

    The instance may be used as a context manager; this is required in order to
    access ``resources`` when the built resources are an ``IContainsGenerator``.
    """

    def __init__(
        self,
        resources: Mapping[str, Any],
        op: Optional[Union[OpDefinition, PendingNodeInvocation]],
        run_id: Optional[str],
        job_name: Optional[str],
        op_exception: Optional[Exception],
    ):  # pylint: disable=super-init-not-called
        # NOTE(review): imported locally, presumably to avoid an import cycle -- confirm.
        from ..build_resources import build_resources, wrap_resources_for_execution
        from ..context_creation_pipeline import initialize_console_manager

        self._op = None
        if op is not None:
            # Invoke the op inside a throwaway graph and keep the resulting node
            # (the graph's first entry in ``solids``) as the value of ``op``.
            @graph(name="hook_context_container")
            def temp_graph():
                op()

            self._op = temp_graph.solids[0]

        # Open resource context manager
        self._resource_defs = wrap_resources_for_execution(resources)
        self._resources_cm = build_resources(self._resource_defs)
        self._resources = self._resources_cm.__enter__()  # pylint: disable=no-member
        self._resources_contain_cm = isinstance(self._resources, IContainsGenerator)

        self._run_id = run_id
        self._job_name = job_name
        self._op_exception = op_exception

        self._log = initialize_console_manager(None)

        # Flipped to True by ``__enter__``; gates access to generator-backed resources.
        self._cm_scope_entered = False

    def __enter__(self):
        self._cm_scope_entered = True
        return self

    def __exit__(self, *exc):
        self._resources_cm.__exit__(*exc)  # pylint: disable=no-member

    def __del__(self):
        # ``__del__`` also runs for instances whose ``__init__`` raised part-way
        # through (e.g. resource initialization failed), in which case these
        # attributes may not exist. Read them defensively so finalization does
        # not raise a secondary AttributeError.
        contains_cm = getattr(self, "_resources_contain_cm", False)
        scope_entered = getattr(self, "_cm_scope_entered", False)
        if contains_cm and not scope_entered:
            self._resources_cm.__exit__(None, None, None)  # pylint: disable=no-member

    @property
    def job_name(self) -> str:
        """The ``job_name`` passed at construction.

        Raises:
            DagsterInvalidPropertyError: If no ``job_name`` was provided.
        """
        # Must read the stored value directly: the inherited ``pipeline_name``
        # delegates to ``job_name``, so delegating back to it would recurse forever.
        return _check_property_on_test_context(
            self, attr_str="_job_name", user_facing_name="job_name", param_on_builder="job_name"
        )

    @property
    def run_id(self) -> str:
        """The ``run_id`` passed at construction.

        Raises:
            DagsterInvalidPropertyError: If no ``run_id`` was provided.
        """
        return _check_property_on_test_context(
            self, attr_str="_run_id", user_facing_name="run_id", param_on_builder="run_id"
        )

    @property
    def hook_def(self) -> HookDefinition:
        """Not available on this context; always raises ``DagsterInvalidPropertyError``."""
        raise DagsterInvalidPropertyError(_property_msg("hook_def", "property"))

    @property
    def op(self) -> Node:
        """The node created from the ``op`` passed at construction.

        Raises:
            DagsterInvalidPropertyError: If no ``op`` was provided.
        """
        return _check_property_on_test_context(
            self, attr_str="_op", user_facing_name="op", param_on_builder="op"
        )

    @property
    def step(self) -> ExecutionStep:
        """Not available on this context; always raises ``DagsterInvalidPropertyError``."""
        raise DagsterInvalidPropertyError(_property_msg("step", "property"))

    @property
    def step_key(self) -> str:
        """Not available on this context; always raises ``DagsterInvalidPropertyError``."""
        raise DagsterInvalidPropertyError(_property_msg("step_key", "property"))

    @property
    def required_resource_keys(self) -> Set[str]:
        """Not available on this context; always raises ``DagsterInvalidPropertyError``."""
        # The message names this property (it previously named ``hook_def``).
        raise DagsterInvalidPropertyError(_property_msg("required_resource_keys", "property"))

    @property
    def resources(self) -> "Resources":
        """The resources built from the ``resources`` passed at construction.

        Raises:
            DagsterInvariantViolationError: If the built resources are an
                ``IContainsGenerator`` and the context has not been entered
                via ``with``.
        """
        if self._resources_contain_cm and not self._cm_scope_entered:
            raise DagsterInvariantViolationError(
                "At least one provided resource is a generator, but attempting to access "
                "resources outside of context manager scope. You can use the following syntax to "
                "open a context manager: `with build_hook_context(...) as context:`"
            )
        return self._resources

    @property
    def solid_config(self) -> Any:
        """Not available on this context; always raises ``DagsterInvalidPropertyError``."""
        raise DagsterInvalidPropertyError(_property_msg("solid_config", "property"))

    @property
    def log(self) -> DagsterLogManager:
        """The console log manager created at construction."""
        return self._log

    @property
    def op_exception(self) -> Optional[BaseException]:
        """The ``op_exception`` passed at construction (may be ``None``)."""
        return self._op_exception

    @property
    def solid_output_values(self) -> Mapping[str, Union[Any, Mapping[str, Any]]]:
        """The computed output values.

        Not available on this context; always raises ``DagsterInvalidPropertyError``.
        """
        raise DagsterInvalidPropertyError(_property_msg("solid_output_values", "method"))


class BoundHookContext(HookContext):
    """Hook context backed by explicitly supplied values instead of a step execution context.

    The hook definition, resources, and log manager are returned as given.
    ``job_name``, ``run_id`` and ``op`` raise ``DagsterInvalidPropertyError`` when
    their value is ``None``; step-specific properties always raise it.
    """

    def __init__(
        self,
        hook_def: HookDefinition,
        resources: Resources,
        op: Optional[Node],
        log_manager: DagsterLogManager,
        run_id: Optional[str],
        job_name: Optional[str],
        op_exception: Optional[Exception],
    ):  # pylint: disable=super-init-not-called
        self._hook_def = hook_def
        self._resources = resources
        self._op = op
        self._log_manager = log_manager
        self._run_id = run_id
        self._job_name = job_name
        self._op_exception = op_exception

    @property
    def job_name(self) -> str:
        """The ``job_name`` given at construction.

        Raises:
            DagsterInvalidPropertyError: If it is ``None``.
        """
        return _check_property_on_test_context(
            self, attr_str="_job_name", user_facing_name="job_name", param_on_builder="job_name"
        )

    @property
    def run_id(self) -> str:
        """The ``run_id`` given at construction.

        Raises:
            DagsterInvalidPropertyError: If it is ``None``.
        """
        return _check_property_on_test_context(
            self, attr_str="_run_id", user_facing_name="run_id", param_on_builder="run_id"
        )

    @property
    def hook_def(self) -> HookDefinition:
        """The hook definition given at construction."""
        return self._hook_def

    @property
    def op(self) -> Node:
        """The node given at construction.

        Raises:
            DagsterInvalidPropertyError: If it is ``None``.
        """
        return _check_property_on_test_context(
            self, attr_str="_op", user_facing_name="op", param_on_builder="op"
        )

    @property
    def step(self) -> ExecutionStep:
        """Not available on this context; always raises ``DagsterInvalidPropertyError``."""
        raise DagsterInvalidPropertyError(_property_msg("step", "property"))

    @property
    def step_key(self) -> str:
        """Not available on this context; always raises ``DagsterInvalidPropertyError``."""
        raise DagsterInvalidPropertyError(_property_msg("step_key", "property"))

    @property
    def required_resource_keys(self) -> AbstractSet[str]:
        """The resource keys required by the hook definition."""
        return self._hook_def.required_resource_keys

    @property
    def resources(self) -> "Resources":
        """The resources given at construction."""
        return self._resources

    @property
    def solid_config(self) -> Any:
        """Not available on this context; always raises ``DagsterInvalidPropertyError``."""
        raise DagsterInvalidPropertyError(_property_msg("solid_config", "property"))

    @property
    def log(self) -> DagsterLogManager:
        """The log manager given at construction."""
        return self._log_manager

    @property
    def op_exception(self) -> Optional[BaseException]:
        """The ``op_exception`` given at construction (may be ``None``)."""
        return self._op_exception

    @property
    def solid_output_values(self) -> Mapping[str, Union[Any, Mapping[str, Any]]]:
        """The computed output values.

        Not available on this context; always raises ``DagsterInvalidPropertyError``.
        """
        raise DagsterInvalidPropertyError(_property_msg("solid_output_values", "method"))
def build_hook_context(
    resources: Optional[Mapping[str, Any]] = None,
    op: Optional[Union[OpDefinition, PendingNodeInvocation]] = None,
    run_id: Optional[str] = None,
    job_name: Optional[str] = None,
    op_exception: Optional[Exception] = None,
) -> UnboundHookContext:
    """Builds hook context from provided parameters.

    ``build_hook_context`` can be used as either a function or a context manager. If there is a
    provided resource to ``build_hook_context`` that is a context manager, then it must be used as a
    context manager. This function can be used to provide the context argument to the invocation of
    a hook definition.

    Args:
        resources (Optional[Dict[str, Any]]): The resources to provide to the context. These can
            either be values or resource definitions.
        op (Optional[Union[OpDefinition, PendingNodeInvocation]]): The op definition which the
            hook may be associated with.
        run_id (Optional[str]): The id of the run in which the hook is invoked (provided for mocking purposes).
        job_name (Optional[str]): The name of the job in which the hook is used (provided for mocking purposes).
        op_exception (Optional[Exception]): The exception that caused the hook to be triggered.

    Examples:
        .. code-block:: python

            context = build_hook_context()
            hook_to_invoke(context)

            with build_hook_context(resources={"foo": context_manager_resource}) as context:
                hook_to_invoke(context)
    """
    # Validate each argument up front, in the same order the original evaluated them.
    op = check.opt_inst_param(op, "op", (OpDefinition, PendingNodeInvocation))
    checked_resources = check.opt_mapping_param(resources, "resources", key_type=str)
    checked_run_id = check.opt_str_param(run_id, "run_id")
    checked_job_name = check.opt_str_param(job_name, "job_name")
    checked_exception = check.opt_inst_param(op_exception, "op_exception", Exception)

    return UnboundHookContext(
        resources=checked_resources,
        op=op,  # type: ignore[arg-type]  # (mypy bug)
        run_id=checked_run_id,
        job_name=checked_job_name,
        op_exception=checked_exception,
    )