import json
import logging
import platform
import sys
import time
import uuid
from base64 import standard_b64encode as b64
from typing import Any, Dict, Mapping, Optional, Sequence, cast
import requests
import requests.utils
from dagster import (
    Failure,
    Field,
    IntSource,
    RetryRequested,
    StringSource,
    _check as check,
    resource,
)
from dagster._core.definitions.resource_definition import ResourceDefinition
from dagster._core.utils import coerce_valid_log_level
from ..dbt_resource import DbtResource
from .types import DbtRpcOutput
from .utils import is_fatal_code
class DbtRpcResource(DbtResource):
    """A client for a dbt RPC server.

    Every public method builds a JSON-RPC request body, POSTs it to :py:attr:`url` and returns
    the response wrapped in a :class:`DbtRpcOutput`.

    To use this as a dagster resource, we recommend using
    :func:`dbt_rpc_resource <dagster_dbt.dbt_rpc_resource>`.
    """

    def __init__(
        self,
        host: str = "0.0.0.0",
        port: int = 8580,
        jsonrpc_version: str = "2.0",
        logger: Optional[Any] = None,
        **_,
    ):
        """Constructor.

        Any additional keyword arguments are accepted and ignored.

        Args:
            host (str): The IP address of the host of the dbt RPC server. Default is ``"0.0.0.0"``.
            port (int): The port of the dbt RPC server. Default is ``8580``.
            jsonrpc_version (str): The JSON-RPC version to send in RPC requests.
                Default is ``"2.0"``.
            logger (Optional[Any]): A property for injecting a logger dependency.
                Default is ``None``.
        """
        check.str_param(host, "host")
        check.int_param(port, "port")
        check.str_param(jsonrpc_version, "jsonrpc_version")
        self._host = host
        self._port = port
        self._jsonrpc_version = jsonrpc_version
        super().__init__(logger)

    @staticmethod
    def _construct_user_agent() -> str:
        """A helper method to construct a standard User-Agent string to be used in HTTP request
        headers.

        The value has the form ``Python/<x.y.z> dagster/dbt-rpc-client <system>/<release>``.

        Returns:
            str: The constructed User-Agent value.
        """
        client = "dagster/dbt-rpc-client"
        python_version = (
            f"Python/{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
        )
        system_info = f"{platform.system()}/{platform.release()}"
        return " ".join([python_version, client, system_info])

    def _construct_headers(self) -> Dict[str, str]:
        """Constructs a standard set of headers for HTTP requests.

        Starts from the ``requests`` default headers and overrides ``User-Agent``,
        ``Content-Type`` and ``Accept``.

        Returns:
            Dict[str, str]: The HTTP request headers.
        """
        headers = requests.utils.default_headers()
        headers["User-Agent"] = self._construct_user_agent()
        headers["Content-Type"] = "application/json"
        headers["Accept"] = "application/json"
        return cast(Dict[str, str], headers)

    def _post(self, data: Optional[str] = None) -> DbtRpcOutput:
        """Constructs and sends a POST request to the dbt RPC server.

        Args:
            data (Optional[str]): the serialized JSON-RPC request body.

        Returns:
            DbtRpcOutput: the wrapped HTTP response from the dbt RPC server.

        Raises:
            requests.exceptions.HTTPError: if the server answered with an error status that
                ``is_fatal_code`` classifies as fatal.
            RetryRequested: for any other HTTP error status (up to 5 retries, 30 seconds apart).
        """
        headers = self._construct_headers()
        try:
            # NOTE(review): no ``timeout`` is passed, so this call can block indefinitely if the
            # server never responds. Left as-is to avoid changing behavior for long requests.
            response = requests.post(self.url, headers=headers, data=data)
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            if is_fatal_code(e):
                # Bare ``raise`` re-raises the active exception with its traceback untouched.
                raise
            # Chain the HTTP error explicitly so the cause of the retry is visible in tracebacks.
            raise RetryRequested(max_retries=5, seconds_to_wait=30) from e
        return DbtRpcOutput(response)

    def _get_result(self, data: Optional[str] = None) -> DbtRpcOutput:
        """Sends a request to the dbt RPC server and returns its immediate response.

        This is the hook used by the task-style methods (``run``, ``test``, ...); here it simply
        delegates to :py:meth:`_post`.

        Args:
            data (Optional[str]): the serialized JSON-RPC request body.

        Returns:
            DbtRpcOutput: the wrapped HTTP response from the dbt RPC server.
        """
        return self._post(data)

    def _default_request(
        self, method: str, params: Optional[Mapping[str, Any]] = None
    ) -> Dict[str, Any]:
        """Constructs a standard HTTP request body, to be sent to a dbt RPC server.

        Args:
            method (str): a dbt RPC method.
            params (Optional[Mapping[str, Any]]): the parameters of the RPC call. ``None`` (or
                any other falsy value) is sent as an empty dict.

        Returns:
            Dict: the constructed HTTP request body.
        """
        return {
            "jsonrpc": self.jsonrpc_version,
            "method": method,
            # A fresh id per request.
            "id": str(uuid.uuid1()),
            "params": params or {},
        }

    @property
    def host(self) -> str:
        """str: The IP address of the host of the dbt RPC server."""
        return self._host

    @property
    def port(self) -> int:
        """int: The port of the dbt RPC server."""
        return self._port

    @property
    def jsonrpc_version(self) -> str:
        """str: The JSON-RPC version to send in RPC requests."""
        return self._jsonrpc_version

    @property
    def logger(self) -> logging.Logger:
        """logging.Logger: A property for injecting a logger dependency."""
        # ``_logger`` is not assigned in this class; presumably set by ``DbtResource.__init__``.
        return self._logger

    @property
    def url(self) -> str:
        """str: The URL for sending dbt RPC requests."""
        return f"http://{self.host}:{self.port}/jsonrpc"

    def status(self) -> DbtRpcOutput:
        """Sends a request with the method ``status`` to the dbt RPC server, and returns the
        response. For more details, see the dbt docs for the RPC method `status
        <https://docs.getdbt.com/reference/commands/rpc/#status>`_.

        Returns:
            DbtRpcOutput: the wrapped HTTP response from the dbt RPC server.
        """
        data = self._default_request(method="status")
        return self._post(data=json.dumps(data))

    def ls(
        self,
        select: Optional[Sequence[str]] = None,
        models: Optional[Sequence[str]] = None,
        exclude: Optional[Sequence[str]] = None,
        **kwargs,
    ) -> DbtRpcOutput:
        """Sends a request with the method ``list`` to the dbt RPC server, and returns the
        response. For more details, see the dbt docs for `list
        <https://docs.getdbt.com/reference/commands/rpc/#list>`_.

        Args:
            select (List[str], optional): the resources to include in the output.
            models (List[str], optional): the models to include in the output.
            exclude (List[str], optional): the resources to exclude from the output.
            **kwargs: additional parameters to send with the request.

        Returns:
            DbtRpcOutput: the wrapped HTTP response from the dbt RPC server.
        """
        # ``select`` must be forwarded too; it used to be accepted but silently dropped.
        explicit_params = dict(select=select, models=models, exclude=exclude)
        params = self._format_params({**explicit_params, **kwargs})
        data = self._default_request(method="list", params=params)
        return self._get_result(data=json.dumps(data))

    def poll(self, request_token: str, logs: bool = False, logs_start: int = 0) -> DbtRpcOutput:
        """Sends a request with the method ``poll`` to the dbt RPC server, and returns the response.
        For more details, see the dbt docs for the RPC method `poll
        <https://docs.getdbt.com/reference/commands/rpc/#poll>`_.

        Args:
            request_token (str): the token to poll responses for.
            logs (bool): Whether logs should be returned in the response. Defaults to ``False``.
            logs_start (int): The zero-indexed log line to fetch logs from. Defaults to ``0``.

        Returns:
            DbtRpcOutput: the wrapped HTTP response from the dbt RPC server.
        """
        data = self._default_request(
            method="poll",
            params={"request_token": request_token, "logs": logs, "logs_start": logs_start},
        )
        return self._post(data=json.dumps(data))

    def ps(self, completed: bool = False) -> DbtRpcOutput:
        """Sends a request with the method ``ps`` to the dbt RPC server, and returns the response.
        For more details, see the dbt docs for the RPC method `ps
        <https://docs.getdbt.com/reference/commands/rpc/#ps>`_.

        Args:
            completed (bool): If ``True``, then also return completed tasks. Defaults to ``False``.

        Returns:
            DbtRpcOutput: the wrapped HTTP response from the dbt RPC server.
        """
        data = self._default_request(method="ps", params={"completed": completed})
        return self._post(data=json.dumps(data))

    def kill(self, task_id: str) -> DbtRpcOutput:
        """Sends a request with the method ``kill`` to the dbt RPC server, and returns the response.
        For more details, see the dbt docs for the RPC method `kill
        <https://docs.getdbt.com/reference/commands/rpc/#kill>`_.

        Args:
            task_id (str): the ID of the task to terminate.

        Returns:
            DbtRpcOutput: the wrapped HTTP response from the dbt RPC server.
        """
        data = self._default_request(method="kill", params={"task_id": task_id})
        return self._post(data=json.dumps(data))

    def cli(self, command: str, **kwargs) -> DbtRpcOutput:
        """Sends a request with CLI syntax to the dbt RPC server, and returns the response.
        For more details, see the dbt docs for `running CLI commands via RPC
        <https://docs.getdbt.com/reference/commands/rpc/#running-a-task-with-cli-syntax>`_.

        Args:
            command (str): a dbt command in CLI syntax. It is sent as the ``cli`` parameter.
            **kwargs: additional parameters to send with the request.

        Returns:
            DbtRpcOutput: the wrapped HTTP response from the dbt RPC server.
        """
        params = self._format_params({"cli": command, **kwargs})
        data = self._default_request(method="cli_args", params=params)
        return self._get_result(data=json.dumps(data))

    def compile(
        self,
        models: Optional[Sequence[str]] = None,
        exclude: Optional[Sequence[str]] = None,
        **kwargs,
    ) -> DbtRpcOutput:
        """Sends a request with the method ``compile`` to the dbt RPC server, and returns the
        response. For more details, see the dbt docs for `compiling projects via RPC
        <https://docs.getdbt.com/reference/commands/rpc/#compile-a-project>`_.

        Args:
            models (List[str], optional): the models to include in compilation.
            exclude (List[str], optional): the models to exclude from compilation.
            **kwargs: additional parameters to send with the request.

        Returns:
            DbtRpcOutput: the wrapped HTTP response from the dbt RPC server.
        """
        explicit_params = dict(models=models, exclude=exclude)
        params = self._format_params({**explicit_params, **kwargs})
        data = self._default_request(method="compile", params=params)
        return self._get_result(data=json.dumps(data))

    def run(
        self,
        models: Optional[Sequence[str]] = None,
        exclude: Optional[Sequence[str]] = None,
        **kwargs,
    ) -> DbtRpcOutput:
        """Sends a request with the method ``run`` to the dbt RPC server, and returns the response.
        For more details, see the dbt docs for the RPC method `run
        <https://docs.getdbt.com/reference/commands/rpc/#run-models>`_.

        Args:
            models (List[str], optional): the models to include in the run.
            exclude (List[str], optional): the models to exclude from the run.
            **kwargs: additional parameters to send with the request.

        Returns:
            DbtRpcOutput: the wrapped HTTP response from the dbt RPC server.
        """
        explicit_params = dict(models=models, exclude=exclude)
        params = self._format_params({**explicit_params, **kwargs})
        data = self._default_request(method="run", params=params)
        return self._get_result(data=json.dumps(data))

    def snapshot(
        self,
        select: Optional[Sequence[str]] = None,
        exclude: Optional[Sequence[str]] = None,
        **kwargs,
    ) -> DbtRpcOutput:
        """Sends a request with the method ``snapshot`` to the dbt RPC server, and returns the
        response. For more details, see the dbt docs for the command `snapshot
        <https://docs.getdbt.com/reference/commands/snapshot>`_.

        Args:
            select (List[str], optional): the snapshots to include in the run.
            exclude (List[str], optional): the snapshots to exclude from the run.
            **kwargs: additional parameters to send with the request.

        Returns:
            DbtRpcOutput: the wrapped HTTP response from the dbt RPC server.
        """
        explicit_params = dict(select=select, exclude=exclude)
        params = self._format_params({**explicit_params, **kwargs})
        data = self._default_request(method="snapshot", params=params)
        return self._get_result(data=json.dumps(data))

    def test(
        self,
        models: Optional[Sequence[str]] = None,
        exclude: Optional[Sequence[str]] = None,
        data: bool = True,
        schema: bool = True,
        **kwargs,
    ) -> DbtRpcOutput:
        """Sends a request with the method ``test`` to the dbt RPC server, and returns the response.
        For more details, see the dbt docs for the RPC method `test
        <https://docs.getdbt.com/reference/commands/rpc/#run-test>`_.

        Args:
            models (List[str], optional): the models to include in testing.
            exclude (List[str], optional): the models to exclude from testing.
            data (bool, optional): If ``True`` (default), then run data tests.
            schema (bool, optional): If ``True`` (default), then run schema tests.
            **kwargs: additional parameters to send with the request.

        Returns:
            DbtRpcOutput: the wrapped HTTP response from the dbt RPC server.
        """
        explicit_params = dict(models=models, exclude=exclude, data=data, schema=schema)
        params = self._format_params({**explicit_params, **kwargs})
        # ``data`` is taken by the parameter of the same name, hence ``request_data``.
        request_data = self._default_request(method="test", params=params)
        return self._get_result(data=json.dumps(request_data))

    def seed(
        self,
        show: bool = False,
        select: Optional[Sequence[str]] = None,
        exclude: Optional[Sequence[str]] = None,
        **kwargs,
    ) -> DbtRpcOutput:
        """Sends a request with the method ``seed`` to the dbt RPC server, and returns the response.
        For more details, see the dbt docs for the RPC method `seed
        <https://docs.getdbt.com/reference/commands/rpc/#run-seed>`_.

        Args:
            show (bool, optional): If ``True``, then show a sample of the seeded data in the
                response. Defaults to ``False``.
            select (List[str], optional): the seeds to include in the run.
            exclude (List[str], optional): the seeds to exclude from the run.
            **kwargs: sent to the server as the ``task_tags`` parameter.

        Returns:
            DbtRpcOutput: the wrapped HTTP response from the dbt RPC server.
        """
        # ``select``/``exclude`` used to be accepted but never sent. Route them through
        # ``_format_params`` like the sibling methods do (presumably it drops ``None`` values
        # and normalizes the selection lists -- confirm against ``DbtResource``).
        params: Dict[str, Any] = dict(
            self._format_params(dict(show=show, select=select, exclude=exclude))
        )
        # ``kwargs`` is always a dict (never ``None``), so ``task_tags`` is always sent, exactly
        # as before; an empty dict is sent when no extra keyword arguments were given.
        params["task_tags"] = kwargs
        data = self._default_request(method="seed", params=params)
        return self._get_result(data=json.dumps(data))

    def generate_docs(
        self,
        compile_project: bool = False,
        **kwargs,
    ) -> DbtRpcOutput:
        """Sends a request with the method ``docs.generate`` to the dbt RPC server, and returns the
        response. For more details, see the dbt docs for the RPC method `docs.generate
        <https://docs.getdbt.com/reference/commands/rpc/#generate-docs>`_.

        Args:
            compile_project (bool, optional): If true, compile the project before generating a
                catalog. It is sent as the ``compile`` parameter.
            **kwargs: additional parameters to send with the request.

        Returns:
            DbtRpcOutput: the wrapped HTTP response from the dbt RPC server.
        """
        explicit_params = dict(compile=compile_project)
        params = self._format_params({**explicit_params, **kwargs})
        data = self._default_request(method="docs.generate", params=params)
        return self._get_result(data=json.dumps(data))

    def run_operation(
        self, macro: str, args: Optional[Mapping[str, Any]] = None, **kwargs
    ) -> DbtRpcOutput:
        """Sends a request with the method ``run-operation`` to the dbt RPC server, and returns the
        response. For more details, see the dbt docs for the command `run-operation
        <https://docs.getdbt.com/reference/commands/run-operation>`_.

        Args:
            macro (str): the dbt macro to invoke.
            args (Dict[str, Any], optional): the keyword arguments to be supplied to the macro.
            **kwargs: additional parameters to send with the request.

        Returns:
            DbtRpcOutput: the wrapped HTTP response from the dbt RPC server.
        """
        explicit_params = dict(macro=macro, args=args)
        params = self._format_params({**explicit_params, **kwargs})
        data = self._default_request(method="run-operation", params=params)
        return self._get_result(data=json.dumps(data))

    def snapshot_freshness(self, select: Optional[Sequence[str]] = None, **kwargs) -> DbtRpcOutput:
        """Sends a request with the method ``snapshot-freshness`` to the dbt RPC server, and returns
        the response. For more details, see the dbt docs for the command `source snapshot-freshness
        <https://docs.getdbt.com/reference/commands/source#dbt-source-snapshot-freshness>`_.

        Args:
            select (List[str], optional): the models to include in calculating snapshot freshness.
            **kwargs: additional parameters to send with the request.

        Returns:
            DbtRpcOutput: the wrapped HTTP response from the dbt RPC server.
        """
        explicit_params = dict(select=select)
        params = self._format_params({**explicit_params, **kwargs})
        data = self._default_request(method="snapshot-freshness", params=params)
        return self._get_result(data=json.dumps(data))

    def compile_sql(self, sql: str, name: str) -> DbtRpcOutput:
        """Sends a request with the method ``compile_sql`` to the dbt RPC server, and returns the
        response. For more details, see the dbt docs for `compiling SQL via RPC
        <https://docs.getdbt.com/reference/commands/rpc#compiling-a-query>`_.

        Args:
            sql (str): the SQL to compile, as plain text. It is base64-encoded by this method
                before being sent.
            name (str): a name for the compiled SQL.

        Returns:
            DbtRpcOutput: the wrapped HTTP response from the dbt RPC server.
        """
        explicit_params = dict(sql=b64(sql.encode("utf-8")).decode("utf-8"), name=name)
        params = self._format_params(explicit_params)
        data = self._default_request(method="compile_sql", params=params)
        return self._get_result(data=json.dumps(data))

    def run_sql(self, sql: str, name: str) -> DbtRpcOutput:
        """Sends a request with the method ``run_sql`` to the dbt RPC server, and returns the
        response. For more details, see the dbt docs for `running SQL via RPC
        <https://docs.getdbt.com/reference/commands/rpc#executing-a-query>`_.

        Args:
            sql (str): the SQL to run, as plain text. It is base64-encoded by this method
                before being sent.
            name (str): a name for the compiled SQL.

        Returns:
            DbtRpcOutput: the wrapped HTTP response from the dbt RPC server.
        """
        explicit_params = dict(sql=b64(sql.encode("utf-8")).decode("utf-8"), name=name)
        params = self._format_params(explicit_params)
        data = self._default_request(method="run_sql", params=params)
        return self._get_result(data=json.dumps(data))

    def build(self, select: Optional[Sequence[str]] = None, **kwargs) -> DbtRpcOutput:
        """Run the ``build`` command on a dbt project. kwargs are passed in as additional parameters.

        Not implemented by this client.

        Args:
            select (List[str], optional): the models/resources to include in the run.

        Returns:
            DbtOutput: object containing parsed output from dbt

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError()

    def get_run_results_json(self, **kwargs) -> Optional[Mapping[str, Any]]:
        """Get a parsed version of the run_results.json file for the relevant dbt project.

        Not implemented by this client.

        Returns:
            Dict[str, Any]: dictionary containing the parsed contents of the run_results json file
                for this dbt project.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError()

    def get_manifest_json(self, **kwargs) -> Optional[Mapping[str, Any]]:
        """Get a parsed version of the manifest.json file for the relevant dbt project.

        Not implemented by this client.

        Returns:
            Dict[str, Any]: dictionary containing the parsed contents of the manifest json file
                for this dbt project.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError()
class DbtRpcSyncResource(DbtRpcResource):
    """A dbt RPC client whose task-style requests block until the submitted request is no longer
    in the ``running`` state.
    """

    def __init__(
        self,
        host: str = "0.0.0.0",
        port: int = 8580,
        jsonrpc_version: str = "2.0",
        logger: Optional[Any] = None,
        poll_interval: int = 1,
        **_,
    ):
        """Constructor.

        Any additional keyword arguments are accepted and ignored.

        Args:
            host (str): The IP address of the host of the dbt RPC server. Default is ``"0.0.0.0"``.
            port (int): The port of the dbt RPC server. Default is ``8580``.
            jsonrpc_version (str): The JSON-RPC version to send in RPC requests.
                Default is ``"2.0"``.
            logger (Optional[Any]): A property for injecting a logger dependency.
                Default is ``None``.
            poll_interval (int): The polling interval in seconds. Default is ``1``.
        """
        super().__init__(host, port, jsonrpc_version, logger)
        self.poll_interval = poll_interval

    def _get_result(self, data: Optional[str] = None) -> DbtRpcOutput:
        """Sends a request to the dbt RPC server and continuously polls for the status of a request
        until the state is no longer ``running``.

        Log entries returned by each poll are forwarded to :py:attr:`logger`.

        Args:
            data (Optional[str]): the serialized JSON-RPC request body.

        Returns:
            DbtRpcOutput: the output of the final poll, when the request ended in state
                ``success``.

        Raises:
            Failure: if the request finished in any state other than ``success``.
        """
        out = super()._get_result(data)
        request_token: str = check.not_none(out.result.get("request_token"))
        logs_start = 0
        elapsed_time = -1
        current_state = None
        while True:
            out = self.poll(
                request_token=request_token,
                logs=True,
                logs_start=logs_start,
            )
            logs = out.result.get("logs", [])
            for log in logs:
                self.logger.log(
                    msg=log["message"],
                    level=coerce_valid_log_level(log.get("levelname", "INFO")),
                    extra=log.get("extra"),
                )
            # Only ask for log lines we have not seen yet on the next poll.
            logs_start += len(logs)
            # Record the elapsed time on every poll, including the final one, so the failure
            # message below reports the latest value instead of a stale one (or -1). When a
            # response carries no "elapsed" key, the last known value is kept.
            elapsed_time = out.result.get("elapsed", elapsed_time)
            current_state = out.result.get("state")
            # Stop polling if request's state is no longer "running".
            if current_state != "running":
                break
            # Sleep for the configured time interval before polling again.
            time.sleep(self.poll_interval)
        if current_state != "success":
            raise Failure(
                description=(
                    f"Request {request_token} finished with state '{current_state}' in "
                    f"{elapsed_time} seconds"
                ),
            )
        return out
@resource(
    description="A resource representing a dbt RPC client.",
    config_schema={
        "host": Field(StringSource),
        "port": Field(IntSource, is_required=False, default_value=8580),
    },
)
def dbt_rpc_resource(context) -> DbtRpcResource:
    """This resource defines a dbt RPC client.

    The ``host`` config value is required; ``port`` is optional and defaults to ``8580``.

    To configure this resource, we recommend using the `configured
    <https://docs.dagster.io/concepts/configuration/configured>`_ method.

    Examples:
        .. code-block:: python

            from dagster import job
            from dagster_dbt import dbt_rpc_resource

            custom_dbt_rpc_resource = dbt_rpc_resource.configured(
                {"host": "80.80.80.80", "port": 8080}
            )

            @job(resource_defs={"dbt_rpc": custom_dbt_rpc_resource})
            def dbt_rpc_job():
                # Run ops with `required_resource_keys={"dbt_rpc", ...}`.
                ...

    Args:
        context: the resource initialization context; its ``resource_config`` supplies
            ``host`` and ``port``.

    Returns:
        DbtRpcResource: a client for the configured dbt RPC server.
    """
    return DbtRpcResource(
        host=context.resource_config["host"], port=context.resource_config["port"]
    )
@resource(
    description="A resource representing a synchronous dbt RPC client.",
    config_schema={
        "host": Field(StringSource),
        "port": Field(IntSource, is_required=False, default_value=8580),
        "poll_interval": Field(IntSource, is_required=False, default_value=1),
    },
)
def dbt_rpc_sync_resource(
    context,
) -> DbtRpcSyncResource:
    """This resource defines a synchronous dbt RPC client, which sends requests to a dbt RPC server,
    and waits for the request to complete before returning.

    The ``host`` config value is required; ``port`` defaults to ``8580`` and ``poll_interval``
    defaults to ``1``.

    To configure this resource, we recommend using the `configured
    <https://docs.dagster.io/concepts/configuration/configured>`_ method.

    Examples:
        .. code-block:: python

            from dagster import job
            from dagster_dbt import dbt_rpc_sync_resource

            custom_sync_dbt_rpc_resource = dbt_rpc_sync_resource.configured(
                {"host": "80.80.80.80", "port": 8080}
            )

            @job(resource_defs={"dbt_rpc": custom_sync_dbt_rpc_resource})
            def dbt_rpc_sync_job():
                # Run ops with `required_resource_keys={"dbt_rpc", ...}`.
                ...

    Args:
        context: the resource initialization context; its ``resource_config`` supplies
            ``host``, ``port`` and ``poll_interval``.

    Returns:
        DbtRpcSyncResource: a synchronous client for the configured dbt RPC server.
    """
    return DbtRpcSyncResource(
        host=context.resource_config["host"],
        port=context.resource_config["port"],
        poll_interval=context.resource_config["poll_interval"],
    )
# Pre-configured variant of ``dbt_rpc_resource`` pointing at host 0.0.0.0, port 8580.
# ``cast`` only informs the type checker that the configured object is a ResourceDefinition.
local_dbt_rpc_resource = cast(
    ResourceDefinition,
    dbt_rpc_resource.configured(dict(host="0.0.0.0", port=8580)),
)
# Attach the documentation by hand, since the object is created by a call, not a ``def``.
local_dbt_rpc_resource.__doc__ = (
    "This resource defines a dbt RPC client for an RPC server running\n"
    "on 0.0.0.0:8580."
)