
cloudgenix module

Python3 SDK for the CloudGenix AppFabric

Version: v6.3.2b1

Author: CloudGenix

Copyright: (c) 2017-2023 CloudGenix, Inc

License: MIT



Intended to be a small, lightweight SDK wrapper around the CloudGenix API for easy use. Initial version requires knowledge of JSON/Dict objects for POST/PUT/PATCH operations.


Code Example

Super-simplified example code (rewrite of in ~4 lines of code):

# Import the CloudGenix SDK API constructor and JSON response pretty printer
from cloudgenix import API, jd

# Instantiate the CloudGenix API constructor
sdk = API()

# Call CloudGenix API login using the Interactive helpers (Handle SAML2.0 login and MSP functions too!).

# Print a dump of the list of sites for your selected account



For more info

Python3 SDK for the CloudGenix AppFabric

**Version:** v6.3.2b1

**Author:** CloudGenix

**Copyright:** (c) 2017-2023 CloudGenix, Inc

**License:** MIT

**Location:** <>

#### Synopsis
Intended to be a small, lightweight SDK wrapper around the CloudGenix API for easy use.
Initial version requires knowledge of JSON/Dict objects for POST/PUT/PATCH operations.

#### Requirements
* Active CloudGenix Account
* Python >= 2.7 or >=3.7
* Python modules:
    * Requests - <>
    * Websockets (if Python > 3.6) >= 8.1- <>
    * urllib3 >= 2.0.0 - <>
#### Code Example
Super-simplified example code (rewrite of in ~4 lines of code):

    # Import the CloudGenix SDK API constructor and JSON response pretty printer
    from cloudgenix import API, jd

    # Instantiate the CloudGenix API constructor
    sdk = API()

    # Call CloudGenix API login using the Interactive helpers (Handle SAML2.0 login and MSP functions too!).

    # Print a dump of the list of sites for your selected account

#### License

#### For more info
 * Get help and additional CloudGenix Documentation at <>
 * View the autogenerated documentation in the `docs/` directory, or at <>.
 * View in-python help using `help()` functions. (example: `help(sdk.get.login)`)

from __future__ import unicode_literals
import logging
import json
import re
import sys

import ssl
import requests
import requests.adapters
import urllib3
from http import cookiejar as cookielib

from requests.adapters import HTTPAdapter
import websockets
from .ws_api import WebSockets

from .get_api import Get
from .post_api import Post
from .patch_api import Patch
from .put_api import Put
from .delete_api import Delete
from .interactive import Interactive

# CA Certificate bundle
from .ca_bundle import CG_CA_BUNDLE as _CG_CA_BUNDLE

# python 2 and 3 handling
if sys.version_info >= (3, 7,):
    # Python 3.7 or higher
    text_type = str
    binary_type = bytes
elif sys.version_info >= (3, ):
    raise Exception("ERROR: Python 3.0-3.6 support is deprecated. Please migrate to a 3.7+ Python interpreter.")
    # Python 2.x, is deprecated.
    raise Exception("ERROR: Python 2.x support is deprecated. Please migrate to a 3.7+ Python interpreter.")

Explicit CA bundle for CA Pinning - Root Certificates for the CloudGenix Controller API Endpoint.

Loaded from `cloudgenix.ca_bundle.CG_CA_BUNDLE`

Default set of SSL/TLS ciphers for communication.

Default is meant to be widely compatible with dangerous ciphers removed.

Default SSL/TLS Options

Default TLS v1.2/v1.3 or higher.

__author__ = "CloudGenix Developer Support <>"
__email__ = ""
__copyright__ = "Copyright (c) 2017-2023 CloudGenix, Inc"
__license__ = """
    MIT License
    Copyright (c) 2017-2023 CloudGenix, Inc
    Permission is hereby granted, free of charge, to any person obtaining a copy
    of this software and associated documentation files (the "Software"), to deal
    in the Software without restriction, including without limitation the rights
    to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    copies of the Software, and to permit persons to whom the Software is
    furnished to do so, subject to the following conditions:
    The above copyright notice and this permission notice shall be included in all
    copies or substantial portions of the Software.

# Set logging to function name
api_logger = logging.getLogger(__name__)
"""logging.getlogger object to enable debug printing via `cloudgenix.API.set_debug`"""

ws_logger = logging.getLogger('websockets')
"""websocket logger is handled slightly differently, so we will have a seperate handle."""

# Version of SDK
version = "6.3.2b1"
"""SDK Version string"""
__version__ = version

# PyPI URL for checking for updates.
update_info_url = ""
"""URL for checking for updates."""

# regex
SDK_BUILD_REGEX = re.compile(
    r'^'                        # start of string
    r'(?P<major>[0-9]+)'        # major number
    r'\.'                       # literal . character
    r'(?P<minor>[0-9]+)'        # minor number
    r'\.'                       # literal . character
    r'(?P<patch>[0-9]+)'        # patch number
    r'b'                        # literal 'b' character
    r'(?P<build>[0-9]+)'        # build number
    r'$'                        # end of string
"""REGEX for parsing SDK builds"""

def jd(api_response):
    JD (JSON Dump) function. Meant for quick pretty-printing of a CloudGenix Response body.

    Example: `jd(sdk.get.sites())`


      - **api_response:** A CloudGenix-attribute extended `requests.Response` object

    **Returns:** No Return, directly prints all output.

def jdout(api_response):
    JD Output function. Does quick pretty printing of a CloudGenix Response body. This function returns a string
    instead of directly printing content.


      - **api_response:** A CloudGenix-attribute extended `requests.Response` object

    **Returns:** Pretty-formatted text of the Response body
        # attempt to output the cgx_content. should always be a Dict if it exists.
        output = json.dumps(api_response.cgx_content, indent=4)
    except (TypeError, ValueError, AttributeError):
        # cgx_content did not exist, or was not JSON serializable. Try pretty output the base obj.
            output = json.dumps(api_response, indent=4)
        except (TypeError, ValueError, AttributeError):
            # Same issue, just raw output the passed data. Let any exceptions happen here.
            output = api_response
    return output

def jd_detailed(api_response, sensitive=False):
    JD (JSON Dump) Detailed function. Meant for quick DETAILED pretty-printing of CloudGenix Request and Response
    objects for troubleshooting.

    Example: `jd_detailed(cgx_sess.get.sites())`


      - **api_response:** A CloudGenix-attribute extended `requests.Response` object
      - **sensitive:** Boolean, if True will print sensitive content (specifically, authentication cookies/headers).

    **Returns:** No Return, directly prints all output.
    print(jdout_detailed(api_response, sensitive=sensitive))

def jdout_detailed(api_response, sensitive=False):
    JD Output Detailed function. Meant for quick DETAILED pretty-printing of CloudGenix Request and Response
    objects for troubleshooting. This function returns a string instead of directly printing content.


      - **api_response:** A CloudGenix-attribute extended `requests.Response` object
      - **sensitive:** Boolean, if True will print sensitive content (specifically, authentication cookies/headers).

    **Returns:** Pretty-formatted text of the Request, Request Headers, Request body, Response, Response Headers,
    and Response Body.
        # try to be super verbose.
        output = "REQUEST: {0} {1}\n".format(api_response.request.method, api_response.request.path_url)
        output += "REQUEST HEADERS:\n"
        for key, value in api_response.request.headers.items():
            # look for sensitive values
            if key.lower() in ['cookie'] and not sensitive:
                # we need to do some work to watch for the AUTH_TOKEN cookie. Split on cookie separator
                cookie_list = value.split('; ')
                muted_cookie_list = []
                for cookie in cookie_list:
                    # check if cookie starts with a permutation of AUTH_TOKEN/whitespace.
                    if cookie.lower().strip().startswith('auth_token='):
                        # first 11 chars of cookie with whitespace removed + mute string.
                        newcookie = cookie.strip()[:11] + "\"<SENSITIVE - NOT SHOWN BY DEFAULT>\""
                # got list of cookies, muted as needed. recombine.
                muted_value = "; ".join(muted_cookie_list)
                output += "\t{0}: {1}\n".format(key, muted_value)
            elif key.lower() in ['x-auth-token'] and not sensitive:
                output += "\t{0}: {1}\n".format(key, "<SENSITIVE - NOT SHOWN BY DEFAULT>")
                output += "\t{0}: {1}\n".format(key, value)
        # if body not present, output blank.
        if not api_response.request.body:
            output += "REQUEST BODY:\n{0}\n\n".format({})
                # Attempt to load JSON from string to make it look beter.
                output += "REQUEST BODY:\n{0}\n\n".format(json.dumps(json.loads(api_response.request.body), indent=4))
            except (TypeError, ValueError, AttributeError):
                # if pretty call above didn't work, just toss it to jdout to best effort it.
                output += "REQUEST BODY:\n{0}\n\n".format(jdout(api_response.request.body))
        output += "RESPONSE: {0} {1}\n".format(api_response.status_code, api_response.reason)
        output += "RESPONSE HEADERS:\n"
        for key, value in api_response.headers.items():
            output += "\t{0}: {1}\n".format(key, value)
            # look for CGX content first.
            output += "RESPONSE DATA:\n{0}".format(json.dumps(api_response.cgx_content, indent=4))
        except (TypeError, ValueError, AttributeError):
            # look for standard response data.
            output += "RESPONSE DATA:\n{0}".format(json.dumps(json.loads(api_response.content), indent=4))
    except (TypeError, ValueError, AttributeError, UnicodeDecodeError):
        # cgx_content did not exist, or was not JSON serializable. Try pretty output the base obj.
            output = json.dumps(api_response, indent=4)
        except (TypeError, ValueError, AttributeError):
            # Same issue, just raw output the passed data. Let any exceptions happen here.
            output = api_response
    return output

class CloudGenixAPIError(Exception):
    Custom exception for errors when not exiting.

class TlsHttpAdapter(HTTPAdapter):
    A patched Requests HTTP Transport adapter that allows specification of a specific SSL Context.

    This allows for fine-grained SSL Cipher, Options and CA specification.
    def __init__(self, ssl_context=None, **kwargs):
        if ssl_context is None:
            Create a default SSL context for requests, if one was not specified. This likely won't be usable by the SDK,
             but is a desirable alternative to nothing.
            self.ssl_context = ssl.create_default_context(cadata=BYTE_CA_BUNDLE.decode('ascii'))
            self.ssl_context.options |= DEFAULT_SSL_OPTIONS
            self.ssl_context = ssl_context
        super(TlsHttpAdapter, self).__init__(**kwargs)

    def init_poolmanager(self, *args, **kwargs):
        kwargs['ssl_context'] = self.ssl_context
        return super(TlsHttpAdapter, self).init_poolmanager(*args, **kwargs)

    def proxy_manager_for(self, *args, **kwargs):
        kwargs['ssl_context'] = self.ssl_context
        return super(TlsHttpAdapter, self).proxy_manager_for(*args, **kwargs)

class API(object):
    Class for interacting with the CloudGenix API.

    Subclass objects are linked to various operations.

     - get: links to `cloudgenix.get_api.Get` for API Get Operations
     - post: links to `cloudgenix.post_api.Post` for API Post Operations
     - put: links to `cloudgenix.put_api.Put` for API Put Operations
     - patch: links to `cloudgenix.patch_api.Patch` for API Patch Operations
     - delete: links to `cloudgenix.delete_api.Delete` for API Delete Operations
    # Global structure, previously sdk_vars
    # Authentication is now stored as cookies, as part of the requests.Session() object.
    # Authentication can also be stored as an 'X-Auth-Token:' header, but cookies take precedence.
    controller = ''
    """Current active controller URL"""

    controller_orig = None
    """Original Controller URL as entered - before Region re-parse"""

    controller_region = None
    """Controller Region, if present."""

    ignore_region = False
    """Ignore regions returned by controller, and use explicit controller only."""

    _debuglevel = 0
    """debug level - set via set_debug()"""

    tenant_id = None
    """Numeric ID of tenant (account) - should be set after initial login from `cloudgenix.get_api.Get.profile` data"""

    tenant_name = None
    """Name of tenant (account), should be set after initial login from `cloudgenix.get_api.Get.profile` data"""

    token_session = None
    """Is this login from a static AUTH_TOKEN (True), or a standard login (False)"""

    is_esp = None
    """Is the current tenant an ESP/MSP?"""

    client_id = None
    """If ESP/MSP, is client currently logged in"""

    address_string = None
    """String representing address, optional - should be pulled from get.profile() data."""

    email = None
    """Email (username) for session"""

    operator_id = None
    """Operator ID of current session."""

    _user_id = None
    """Deprecated (replaced by `operator_id`.) Left for backwards compatibility."""

    _password = None
    """user password - only used when argv passed, and cleared quickly"""

    roles = None
    """Roles list"""

    verify = True
    """Verify SSL certificate."""

    version = None
    """Version string for use once Constructor created."""

    ca_verify_filename = None
    """DEPRECATED: Filename to use for CA verification. Moved to modern python `ssl`, updated to only passing
    filename in event of local file."""

    _ca_verify_file_handle = None
    """File handle for CA verification"""

    _ca_ssl_context = None
    """`ssl` library context for controller connections"""

    _rest_connection_adapter = None
    """`requests.adapters.HTTPAdapter` object for use by requests to request content."""

    rest_call_retry = False
    """DEPRECATED: Please use `cloudgenix.API.modify_rest_retry`."""

    rest_call_max_retry = 30
    """DEPRECATED: Please use `cloudgenix.API.modify_rest_retry`."""

    rest_call_sleep = 10
    """DEPRECATED: Please use `cloudgenix.API.modify_rest_retry`."""

    _rest_call_retry_object = None
    """`urllib3.util.retry.Retry` object for use by SDK to do retries"""

    rest_call_timeout = 240
    """Maximum time to wait for any data from REST server."""

    cache = {}
    """API response cache (Future)"""

    _parent_namespace = None
    """holder for namespace for wrapper classes."""

    _session = None
    """holder for requests.Session() object"""

    _websocket_headers = None
    """holder for WebSocket Headers"""

    update_check = True
    """Notify users of available update to SDK"""

    update_info_url = None
    """Update Info URL for use once Constructor Created."""

    def __init__(self, controller=controller, ssl_verify=verify, update_check=True):
        Create the API constructor object

          - **controller:** Initial Controller URL String
          - **ssl_verify:** Should SSL be verified for this system. Can be `file` or BOOL. See `cloudgenix.API.ssl_verify` for more details.
          - **update_check:** Bool to Enable/Disable SDK update check and new release notifications.
        # set version and update url from outer scope.
        self.version = version
        """Version string for use once Constructor created."""

        self.update_info_url = update_info_url
        """Update Info URL for use once Constructor Created."""

        # try:
        if controller and isinstance(controller, (binary_type, text_type)):
            self.controller = controller.lower()
            self.controller_orig = controller.lower()

        # Create Requests Session.
        self._session = requests.Session()

        # set ssl context
        if isinstance(ssl_verify, (binary_type, text_type, bool)):
            self.ssl_verify(ssl_verify, update_adapter=False)

        # Set default REST retry parameters

        # update the HTTP Adapter for requests.

        # handle update check
        if isinstance(update_check, bool):
            self.update_check = update_check

        if update_check:

        # Identify SDK in the User-Agent.
        user_agent = self._session.headers.get('User-Agent')
        if user_agent:
            user_agent += ' (CGX SDK v{0})'.format(self.version)
            user_agent = 'python-requests/UNKNOWN (CGX SDK v{0})'.format(self.version)

        # Update Headers
            'Accept': 'application/json',
            'User-Agent': text_type(user_agent)

        # except Exception as e:
        #     raise ValueError("Unable to create Requests session object: {0}.".format(e))
        api_logger.debug("DEBUG: URL: %s, SSL Verify: %s, Session: %s",

        # Update Headers for WebSocket requests
        websocketlib_name = websockets.__name__
        websocketlib_version = websockets.version.version
        if not websocketlib_name:
            websocketlib_name = 'websockets'
        if not websocketlib_version:
            websocketlib_version = 'UNKNOWN'
        ws_user_agent = 'python-{0}/{1} (CGX SDK v{2})'.format(websocketlib_name,
        self._websocket_headers = {
            'Accept': 'application/json',
            'User-Agent': text_type(ws_user_agent)

        # Bind API method classes to this object
        subclasses = self._subclass_container()
        self.get = subclasses["get"]()
        """API object link to `cloudgenix.get_api.Get`""" = subclasses["post"]()
        """API object link to `cloudgenix.post_api.Post`"""

        self.put = subclasses["put"]()
        """API object link to `cloudgenix.put_api.Put`"""

        self.patch = subclasses["patch"]()
        """API object link to `cloudgenix.patch_api.Patch`"""

        self.delete = subclasses["delete"]()
        """API object link to `cloudgenix.delete_api.Delete`"""

        self.interactive = subclasses["interactive"]()
        """API object link to `cloudgenix.interactive.Interactive`""" = subclasses["ws"]()
        """API object link to ``"""


    def notify_for_new_version(self):
        Check for a new version of the SDK on API constructor instantiation. If new version found, print
        Notification to STDERR.

        On failure of this check, fail silently.

        **Returns:** No item returned, directly prints notification to `sys.stderr`.

        # broad exception clause, if this fails for any reason just return.
            recommend_update = False
            update_check_resp = requests.get(self.update_info_url, timeout=3)
            web_version = update_check_resp.json()["info"]["version"]
            api_logger.debug("RETRIEVED_VERSION: %s", web_version)

            available_version =
            current_version =

            available_major = available_version.get('major')
            available_minor = available_version.get('minor')
            available_patch = available_version.get('patch')
            available_build = available_version.get('build')
            current_major = current_version.get('major')
            current_minor = current_version.get('minor')
            current_patch = current_version.get('patch')
            current_build = current_version.get('build')

            api_logger.debug("AVAILABLE_VERSION: %s", available_version)
            api_logger.debug("CURRENT_VERSION: %s", current_version)

            # check for major/minor version differences, do not alert for build differences.
            if available_major > current_major:
                recommend_update = True
            elif available_major >= current_major and available_minor > current_minor:
                recommend_update = True
            elif available_major >= current_major and available_minor >= current_minor and \
                    available_patch > current_patch:
                recommend_update = True

            api_logger.debug("NEED_UPDATE: %s", recommend_update)

            # notify.
            if recommend_update:
                sys.stderr.write("WARNING: CloudGenix Python SDK upgrade available. SDKs are often deprecated 6 "
                                 "months after release of a new version.\n"
                                 "\tLatest Version: {0}\n"
                                 "\tCurrent Version: {1}\n"
                                 "\tFor more info, see ''. Additionally, this "
                                 "message can be suppressed by instantiating the API with API(update_check=False).\n\n"
                                 "".format(web_version, self.version))


        except Exception:
            # just return and continue.

    def ssl_verify(self, ssl_verify, ciphers=DEFAULT_SSL_CIPHERS, options=DEFAULT_SSL_OPTIONS,
                   tls_min=ssl.TLSVersion.TLSv1_2, tls_max=ssl.TLSVersion.MAXIMUM_SUPPORTED, update_adapter=True):
        Modify ssl verification settings


          - **ssl_verify:**
             - **True:** Verify using builtin BYTE_CA_BUNDLE.
             - **False:** No SSL Verification.
             - ***Str:** Full path to a x509 PEM CA File or bundle.
          - **ciphers:** List of specific ciphers to allow. Default list is DEFAULT_SSL_CIPHERS if not specified.
          - **options:** List of SSL options. Default options are contained in DEFAULT_SSL_OPTIONS if not specified.
          - **tls_min:** Lowest acceptable TLS version. Default is `ssl.TLSVersion.TLSv1_2`
          - **tls_max:** Maximum acceptable TLS version. Default is `ssl.TLSVersion.MAXIMUM_SUPPORTED`
          - **update_adapter:** bool, call update adapter function after running to auto update adapter.

        **Returns:** No return, mutates the SSL Context / session in place directly.
        self.verify = ssl_verify
        # if verify true/false, set ca_verify_file appropriately
        if isinstance(self.verify, bool):
            if self.verify:  # True
                # load default CA list and set default SSL ciphers.
                self._ca_ssl_context = ssl.create_default_context(cadata=BYTE_CA_BUNDLE.decode('ascii'))
                self._session.verify = True

            else:  # False
                # disable warnings for SSL certs.
                self._ca_ssl_context = ssl.SSLContext()
                self._ca_ssl_context.check_hostname = False
                self._ca_ssl_context.verify_mode = ssl.CERT_NONE
                self._session.verify = False

            # Not True/False, assume path to file/dir for Requests
            # set filename/filepath for context
            self._ca_ssl_context = ssl.create_default_context(cafile=self.verify, capath=self.verify)
            self._ca_ssl_context.check_hostname = True
            self._session.verify = True

        # set the CA independent values
        self._ca_ssl_context.minimum_version = tls_min
        self._ca_ssl_context.maximum_version = tls_max
        self._ca_ssl_context.options |= options
        # update the session adapter.
        if update_adapter:

    def modify_rest_retry(self, total=8, connect=None, read=None, redirect=None, status=None, other=0,
                          allowed_methods=urllib3.util.retry.Retry.DEFAULT_ALLOWED_METHODS, status_forcelist=None,
                          backoff_factor=0.705883, raise_on_redirect=True, raise_on_status=True,
                          respect_retry_after_header=True, update_adapter=True):
        Modify retry parameters for the SDKs rest call object.

        Parameters are directly from and passed directly to `urllib3.util.retry.Retry`, and get applied directly to
        the underlying `requests.Session` object.

        Default retry with total=8 and backoff_factor=0.705883:

         - Try 1, 0 delay (0 total seconds)
         - Try 2, 0 delay (0 total seconds)
         - Try 3, 0.705883 delay (0.705883 total seconds)
         - Try 4, 1.411766 delay (2.117649 total seconds)
         - Try 5, 2.823532 delay (4.941181 total seconds)
         - Try 6, 5.647064 delay (10.588245 total seconds)
         - Try 7, 11.294128 delay (21.882373 total seconds)
         - Try 8, 22.588256 delay (44.470629 total seconds)
         - Try 9, 45.176512 delay (89.647141 total seconds)
         - Try 10, 90.353024 delay (180.000165 total seconds)


          - **total:** int, Total number of retries to allow. Takes precedence over other counts.
          - **connect:** int, How many connection-related errors to retry on.
          - **read:** int, How many times to retry on read errors.
          - **redirect:** int, How many redirects to perform. loops.
          - **status:** int, How many times to retry on bad status codes.
          - **other:** int, How many times to retry on other errors. Set to 0 by default for things like SSL errors.
          - **method_whitelist:** iterable, Set of uppercased HTTP method verbs that we should retry on.
          - **status_forcelist:** iterable, A set of integer HTTP status codes that we should force a retry on.
          - **backoff_factor:** float, A backoff factor to apply between attempts after the second try.
          - **raise_on_redirect:** bool, True = raise a MaxRetryError, False = return latest 3xx response.
          - **raise_on_status:** bool, Similar logic to ``raise_on_redirect`` but for status responses.
          - **respect_retry_after_header:** bool, Whether to respect Retry-After header on status codes.
          - **adapter_url:** string, URL match for these retry values (default `https://`)
          - **update_adapter:** bool, call update adapter function after running to auto update adapter.

        **Returns:** No return, mutates the retry / session directly
        # Cloudgenix responses with 502/504 are usually recoverable. Use them if no list specified.
        if status_forcelist is None:
            status_forcelist = (413, 429, 502, 503, 504)

        retry = urllib3.util.retry.Retry(total=total,

        # set updated retry object.
        self._rest_call_retry_object = retry
        # call update to mount/remount session adapter
        if update_adapter:

    def update_session_adapter(self, adapter=None, retry=None, ssl_context=None, adapter_url="https://"):
        Mount/remount the HTTPS session adapter after changes that affect it.


          - **adapter:** `requests.adapters.HTTPAdapter`, or will use/create default adapter.
          - **retry:** `urllib3.util.retry.Retry` object, or will use default retry object.
          - **ssl_context:** `ssl.SSLContext` object, or will use default SSL Context.
          - **adapter_url:** URL to use for matching requests. Defaults to 'https://'

        **Returns:** No return, mutates the session adapter directly

        # use default or specified session objects.
        use_retry = retry or self._rest_call_retry_object
        use_ssl_context = ssl_context or self._ca_ssl_context

        # if object adapter has not been built, create a new one.
        if adapter is None:
            # does an effective retry object exist?
            if use_retry is None:
                self._rest_connection_adapter = TlsHttpAdapter(ssl_context=use_ssl_context)
                self._rest_connection_adapter = TlsHttpAdapter(ssl_context=use_ssl_context,
            # if adapter is specified, nothing else should be specified.
            if retry is not None or ssl_context is not None:
                self.throw_warning("Error: HTTPAdapter was specified with retry and ssl_context. These will be ignored."
                                   " retry and ssl_contexts need to be set in the adapter itself.")

        # Pick the specified or default adapter.
        use_adapter = adapter or self._rest_connection_adapter

        # mount the adapter.
        self._session.mount(adapter_url, use_adapter)


    def view_rest_retry(self, url=None):
        View current rest retry settings in the `requests.Session()` object


          - **url:** URL to use to determine retry methods for. Defaults to 'https://'

        **Returns:** Dict, Key header, value is header value.
        if url is None:
            url = "https://"
        return vars(self._session.get_adapter(url).max_retries)

    def expose_session(self):
        Call to expose the Requests Session object

        **Returns:** `requests.Session` object
        return self._session

    def add_headers(self, headers):
        Permanently add/overwrite headers to session.


          - **headers:** dict with header/value

        **Returns:** Mutates `requests.Session()` object, no return.

    def remove_header(self, header):
        Permanently remove a single header from session


          - **header:** str of single header to remove

        **Returns:** Mutates `requests.Session()` object, no return.
        # check for header first. Return silently if it does not exist.
        if self._session.headers.get(header) is not None:
            del self._session.headers[header]

    def view_headers(self):
        View current headers in the `requests.Session()` object

        **Returns:** Dict, Key header, value is header value.
        return dict(self._session.headers)

    def websocket_add_headers(self, headers):
        Permanently add/overwrite headers to the `API()` WebSocket object


          - **headers:** dict with header/value

        **Returns:** Mutates `API()` object, no return.


    def websocket_remove_header(self, header):
        Permanently remove a single header from the `API()` WebSocket object


          - **header:** str of single header to remove

        **Returns:** Mutates `API()` object, no return.

        # check for header first. Return silently if it does not exist.
        if self._websocket_headers.get(header) is not None:
            del self._websocket_headers[header]

    def websocket_view_headers(self):
        View current headers in the `API()` WebSocket object

        **Returns:** Dict, Key header, value is header value.
        return dict(self._websocket_headers)

    def view_cookies(self):
        View current cookies in the `requests.Session()` object

        **Returns:** List of Dicts, one cookie per Dict.
        return_list = []
        for cookie in self._session.cookies:

        return return_list

    def set_debug(self, debuglevel, set_format=None, set_handler=None):
        Change the debug level of the API


          - **set_format:** Optional. If set and text_type, use input for formatter. Otherwise, default formatter.
          - **set_format:** Optional. If set and `logging.Handler` type, use input for handler. Otherwise, default

        **Returns:** No item returned.
        # set the logging formatter and stream handle
        if set_format is None:
            # default formatter
            api_formatter = logging.Formatter("%(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s")
        elif not isinstance(set_format, text_type):
            # not a valid format string. Set to default.
            api_formatter = logging.Formatter("%(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s")
            # valid logging string.
            api_formatter = logging.Formatter(set_format)

        # set the logging handler if supported handler is not passed.
        if set_handler is None:
            # Default handler
            api_handler = logging.StreamHandler()
        elif not isinstance(set_handler, (logging.FileHandler, logging.Handler, logging.NullHandler,
            # not a valid handler. Set to default handler.
            api_handler = logging.StreamHandler()
            # passed valid handler
            api_handler = set_handler

        # set handler to use format.

        # Get the loggers from other modules in prep for setting new handlers.
        urllib3_logger = logging.getLogger("requests.packages.urllib3")
        urllib3_retry_logger = logging.getLogger("urllib3.util.retry")
        cookie_logger = logging.getLogger("http.cookiejar")

        # remove existing handlers
        api_logger.handlers = []
        urllib3_logger.handlers = []
        urllib3_retry_logger.handlers = []
        cookie_logger.handlers = []
        ws_logger.handlers = []

        # ok, lets set the new handlers.
        if isinstance(debuglevel, int):
            self._debuglevel = debuglevel

        if self._debuglevel == 1:

        elif self._debuglevel == 2:
            cookielib.debug = True

        elif self._debuglevel >= 3:
            cookielib.debug = True

            # set to warning
            cookielib.debug = False


    def _subclass_container(self):
        Call subclasses via function to allow passing parent namespace to subclasses.

        **Returns:** dict with subclass references.
        _parent_class = self

        return_object = {}

        class GetWrapper(Get):

            def __init__(self):
                self._parent_class = _parent_class
        return_object['get'] = GetWrapper

        class PostWrapper(Post):

            def __init__(self):
                self._parent_class = _parent_class
        return_object['post'] = PostWrapper

        class PutWrapper(Put):

            def __init__(self):
                self._parent_class = _parent_class
        return_object['put'] = PutWrapper

        class PatchWrapper(Patch):

            def __init__(self):
                self._parent_class = _parent_class
        return_object['patch'] = PatchWrapper

        class DeleteWrapper(Delete):

            def __init__(self):
                self._parent_class = _parent_class
        return_object['delete'] = DeleteWrapper

        class InteractiveWrapper(Interactive):

            def __init__(self):
                self._parent_class = _parent_class
        return_object['interactive'] = InteractiveWrapper

        class WebSocketsWrapper(WebSockets):

            def __init__(self):
                self._parent_class = _parent_class

        return_object['ws'] = WebSocketsWrapper

        return return_object

    def rest_call(self, url, method, data=None, sensitive=False, timeout=None, content_json=True, raw_msgs=False,
                  retry=None, max_retry=None, retry_sleep=None):
        Generic REST call worker function


          - **url:** URL for the REST call
          - **method:** METHOD for the REST call
          - **data:** Optional DATA for the call (for POST/PUT/etc.)
          - **sensitive:** Flag if content request/response should be hidden from logging functions
          - **timeout:** Requests Timeout
          - **content_json:** Bool on whether the Content-Type header should be set to application/json
          - **raw_msgs:** True/False, if True, do not convert API sideband messages (warnings, errors) to text.
          - **retry:** DEPRECATED - please use `cloudgenix.API.modify_rest_retry` instead.
          - **max_retry:** DEPRECATED - please use `cloudgenix.API.modify_rest_retry` instead.
          - **retry_sleep:** DEPRECATED - please use `cloudgenix.API.modify_rest_retry` instead.

        **Returns:** Requests.Response object, extended with:

          - **cgx_status**: Bool, True if a successful CloudGenix response, False if error.
          - **cgx_content**: Content of the response, guaranteed to be in Dict format. Empty/invalid responses
          will be converted to a Dict response.
          - **cgx_errors**: Text error messages if any are present. None if none. List if raw_msgs is True.
          - **cgx_warnings**: Text warning messages if any are present. None if none. List if raw_msgs is True.

        # pull retry related items from Constructor if not specified.
        if timeout is None:
            timeout = self.rest_call_timeout
        if retry is not None:
            # Someone using deprecated retry code. Notify.
            sys.stderr.write("WARNING: 'retry' option of rest_call() has been deprecated. "
                             "Please use 'API.modify_rest_retry()' instead.")
        if max_retry is not None:
            # Someone using deprecated retry code. Notify.
            sys.stderr.write("WARNING: 'max_retry' option of rest_call() has been deprecated. "
                             "Please use 'API.modify_rest_retry()' instead.")
        if retry_sleep is not None:
            # Someone using deprecated retry code. Notify.
            sys.stderr.write("WARNING: 'max_retry' option of rest_call() has been deprecated. "
                             "Please use 'API.modify_rest_retry()' instead.")

        # Get logging level, use this to bypass logging functions with possible large content if not set.
        logger_level = api_logger.getEffectiveLevel()

        # populate headers and cookies from session.
        if content_json and method.lower() not in ['get', 'delete']:
            headers = {
                'Content-Type': 'application/json'
            headers = {}

        # add session headers
        cookie = self._session.cookies.get_dict()

        # make sure data is populated if present.
        if isinstance(data, (list, dict)):
            data = json.dumps(data)

        api_logger.debug('REST_CALL URL = %s', url)

        # make request
            if not sensitive:
                api_logger.debug('\n\tREQUEST: %s %s\n\tHEADERS: %s\n\tCOOKIES: %s\n\tDATA: %s\n',
                                 method.upper(), url, headers, cookie, data)

            # Actual request
            response = self._session.request(method, url, data=data, stream=True, timeout=timeout,
                                             headers=headers, allow_redirects=False)

            # Request complete - lets parse.
            # if it's a non-CGX-good response, return with cgx_status = False
            if response.status_code not in [,

                # Simple JSON debug
                if not sensitive:
                        api_logger.debug('RESPONSE HEADERS: %s\n', json.dumps(
                            json.loads(text_type(response.headers)), indent=4))
                    except ValueError:
                        api_logger.debug('RESPONSE HEADERS: %s\n', text_type(response.headers))
                        api_logger.debug('RESPONSE: %s\n', json.dumps(response.json(), indent=4))
                    except ValueError:
                        api_logger.debug('RESPONSE: %s\n', text_type(response.text))
                    api_logger.debug('RESPONSE NOT LOGGED (sensitive content)')

                api_logger.debug("Error, non-200 response received: %s", response.status_code)

                # CGX extend requests.Response for return
                response.cgx_status = False
                response.cgx_content = self._catch_nonjson_streamresponse(response.text)

                # CGX extend requests.Response for any errors/warnings.
                response.cgx_warnings = self.pull_content_warning(response, raw=raw_msgs)
                response.cgx_errors = self.pull_content_error(response, raw=raw_msgs)

                # We are in a failed request. If no error text in response, give the response code and detail.
                if response.cgx_errors is None:
                    response.cgx_errors = text_type("{0} ({1})".format(response.reason, response.status_code))

                return response


                # Simple JSON debug
                if not sensitive and (logger_level <= logging.DEBUG and logger_level != logging.NOTSET):
                        api_logger.debug('RESPONSE HEADERS: %s\n', json.dumps(
                            json.loads(text_type(response.headers)), indent=4))
                        api_logger.debug('RESPONSE: %s\n', json.dumps(response.json(), indent=4))
                    except ValueError:
                        api_logger.debug('RESPONSE HEADERS: %s\n', text_type(response.headers))
                        api_logger.debug('RESPONSE: %s\n', text_type(response.text))
                elif sensitive:
                    api_logger.debug('RESPONSE NOT LOGGED (sensitive content)')

                # CGX extend requests.Response for return
                response.cgx_status = True
                response.cgx_content = self._catch_nonjson_streamresponse(response.text)

                # CGX extend requests.Response for any errors/warnings.
                response.cgx_warnings = self.pull_content_warning(response, raw=raw_msgs)
                response.cgx_errors = self.pull_content_error(response, raw=raw_msgs)
                return response
        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, urllib3.exceptions.MaxRetryError)\
                as e:

  "Error, %s.", text_type(e))

            # make a requests.Response object for return since we didn't get one.
            response = requests.Response

            # CGX extend requests.Response for return
            response.cgx_status = False
            response.cgx_content = {
                '_error': [
                        'message': 'REST Request Exception: {}'.format(e),
                        'data': {},

            # CGX extend requests.Response for any errors/warnings.
            response.cgx_warnings = self.pull_content_warning(response, raw=raw_msgs)
            response.cgx_errors = self.pull_content_error(response, raw=raw_msgs)
            return response

    def websocket_call(self, url, *args, **kwargs):
        Generic WebSocket worker function, automatically uses authentication from `cloudgenix.API()` session.


          - **url:** URL for the REST call
          - Any other `websocket.client.Connect` argument or keyword argument (see NOTE: below)

        **Returns:** `websocket.client.Connect` object.

        **NOTE:** Any `websocket.client.Connect` supported argument or keyword argument will be accepted, and will
        be passed to the underlying Connect() request. **`ssl` and `extra_header` keyword arguments will override the SDK
        auto-generated cookies/headers and SSL contexts used for authentication.** For more info on available options, see

        headers = {}
        # add session headers
        # Get cookies from requests.
        cookies = self._session.cookies.get_dict()

        # create cookie header from the cookies in Requests
        headers["Cookie"] = "; ".join(["{0}={1}".format(key, value) for key, value in cookies.items()])

        # check for host header
        host_header = headers.get("Host")
        force_host_arg = None
        if host_header is not None:

            # Ok, we got a host header. Unlike Requests, websockets wants the "Host Header" in the URI. You pass
            # a separate IP to connect to as 'host' argument to `websockets.client.Connect`.
            # So, to keep our "requests"-like behavior, we need to replace the value in the URI with the host
            # header, and remove the host header - then send the original value to `Connect` as host kwarg.

            # split controller at "//" ( to get host.
            controller_host = self.controller.split("//")[1]

            force_host_arg = controller_host
            # we don't support user/password in URI, so replacing first instance 'should' be OK (famous last words)
            new_url = url.replace(force_host_arg, host_header, 1)
            api_logger.debug("Host replacement. Original URL: {0}, New URL: {1}".format(url, new_url))
            url = new_url
            # delete the host header
            del headers["Host"]

        # convert the headers dictionary to a list of tuples.
        header_tuple_list = [(key, value) for key, value in headers.items()]

        # Create argument tuple
        ws_args = (url,) + args

        # Create keyword args
        ws_kwargs = {
            "ssl": self._ca_ssl_context,
            "extra_headers": header_tuple_list
        # check for force host arg
        if force_host_arg is not None:
            api_logger.debug("Forcing connection to host {0}".format(force_host_arg))
            ws_kwargs["host"] = force_host_arg

        # Override automatic with any manually passed kwargs

        return websockets.connect(*ws_args, **ws_kwargs)

    def parse_auth_token(self, auth_token):
        Break auth_token up into its constituent values.


          - **auth_token:** Auth_token string

        **Returns:** dict with Auth Token constituents
        # remove the random security key value from the front of the auth_token
        auth_token_cleaned = auth_token.split('-', 1)[1]
        # URL Decode the Auth Token
        auth_token_decoded = self.url_decode(auth_token_cleaned)
        # Create a new dict to hold the response.
        auth_dict = {}

        # Parse the token
        for key_value in auth_token_decoded.split("&"):
            key_value_list = key_value.split("=")
            # check for valid token parts
            if len(key_value_list) == 2 and type(key_value_list[0]) in [text_type, binary_type]:
                auth_dict[key_value_list[0]] = key_value_list[1]

        # Return the dict of key/values in the token.
        return auth_dict

    def update_region_to_controller(self, region):
        Update the controller string with dynamic region info.
        Controller string should end up as `<name[-env]>.<region>`


          - **region:** region string.

        **Returns:** No return value, mutates the controller in the class namespace
        # default region position in a list
        region_position = 1

        # Check for a global "ignore region" flag
        if self.ignore_region:
            # bypass
            api_logger.debug("IGNORE_REGION set, not updating controller region.")

        api_logger.debug("Updating Controller Region")
        api_logger.debug("CONTROLLER = %s", self.controller)
        api_logger.debug("CONTROLLER_ORIG = %s", self.controller_orig)
        api_logger.debug("CONTROLLER_REGION = %s", self.controller_region)

        # Check if this is an initial region use or an update region use
        if self.controller_orig:
            controller_base = self.controller_orig
            controller_base = self.controller
            self.controller_orig = self.controller

        # splice controller string
        controller_full_part_list = controller_base.split('.')

        for idx, part in enumerate(controller_full_part_list):
            # is the region already in the controller string?
            if region == part:
                # yes, controller already has appropriate region
                api_logger.debug("REGION %s ALREADY IN BASE CONTROLLER AT INDEX = %s", region, idx)
                # update region if it is not already set.
                if self.controller_region != region:
                    self.controller_region = region
                    api_logger.debug("UPDATED_CONTROLLER_REGION = %s", self.controller_region)
                # Update controller if not already matching
                if self.controller != controller_base:
                    self.controller = controller_base
                    api_logger.debug("UPDATED_CONTROLLER = %s", self.controller)

        controller_part_count = len(controller_full_part_list)

        # handle short domain case
        if controller_part_count > 1:
            # insert region
            controller_full_part_list[region_position] = region
            self.controller = ".".join(controller_full_part_list)
            # short domain, just add region
            self.controller = ".".join(controller_full_part_list) + '.' + region

        # update SDK vars with region info
        self.controller_orig = controller_base
        self.controller_region = region

        api_logger.debug("UPDATED_CONTROLLER = %s", self.controller)
        api_logger.debug("UPDATED_CONTROLLER_ORIG = %s", self.controller_orig)
        api_logger.debug("UPDATED_CONTROLLER_REGION = %s", self.controller_region)

    def parse_region(self, login_response):
        Return region from a successful login response.


          - **login_response:** requests.Response from a successful login.

        **Returns:** region name.
        auth_token = login_response.cgx_content['x_auth_token']
        auth_token_dict = self.parse_auth_token(auth_token)
        auth_region = auth_token_dict.get('region')
        return auth_region

    def reparse_login_cookie_after_region_update(self, login_response):
        Sometimes, login cookie gets sent with region info instead of This function
        re-parses the original login request and applies cookies to the session if they now match the new region.


          - **login_response:** requests.Response from a non-region login.

        **Returns:** updates API() object directly, no return.

        login_url = login_response.request.url
        api_logger.debug("ORIGINAL REQUEST URL = %s", login_url)
        # replace old controller with new controller.
        login_url_new = login_url.replace(self.controller_orig, self.controller)
        api_logger.debug("UPDATED REQUEST URL = %s", login_url_new)
        # reset login url with new region
        login_response.request.url = login_url_new
        # prep cookie jar parsing
        req = requests.cookies.MockRequest(login_response.request)
        res = requests.cookies.MockResponse(login_response.raw._original_response.msg)
        # extract cookies to session cookie jar.
        self._session.cookies.extract_cookies(res, req)

    def _catch_nonjson_streamresponse(rawresponse):
        Validate a streamed response is JSON. Return a Python dictionary either way.


          - **rawresponse:** Streamed Response from Requests.

        **Returns:** Dictionary
        # attempt to load response for return.
            response = json.loads(rawresponse)
        except (ValueError, TypeError):
            if rawresponse:
                response = {
                    '_error': [
                            'message': 'Response not in JSON format.',
                            'data': rawresponse,
                # in case of null response, return empty dict.
                response = {}

        return response

    def url_decode(url):
        URL Decode function using REGEX


          - **url:** URLENCODED text string

        **Returns:** Non URLENCODED string
        return re.compile('%([0-9a-fA-F]{2})', re.M).sub(lambda m: chr(int(, 16)), url)

    def throw_error(message, resp=None, cr=True, exception=CloudGenixAPIError):
        Non-recoverable error, write message to STDERR and raise exception


          - **message:** Message text
          - **resp:** Optional - CloudGenix SDK Response object
          - **cr:** Optional - Use (or not) Carriage Returns.
          - **exception:** Optional - Custom Exception to throw, otherwise uses `CloudGenixAPIError`

        **Returns:** No Return, throws exception.
        output = "ERROR: " + str(message)
        if cr:
            output += "\n"
        if resp is not None:
            output2 = str(jdout_detailed(resp))
            if cr:
                output2 += "\n"
        raise exception(message)

    def throw_warning(message, resp=None, cr=True):
        Recoverable Warning.


          - **message:** Message text
          - **resp:** Optional - CloudGenix SDK Response object
          - **cr:** Optional - Use (or not) Carriage Returns.

        **Returns:** No Return.
        output = "WARNING: " + str(message)
        if cr:
            output += "\n"
        if resp is not None:
            output2 = str(jdout_detailed(resp))
            if cr:
                output2 += "\n"

    def extract_items(self, resp_object, error_label=None, pass_code_list=None, items_key='items'):
        Extract list of items from a CloudGenix API Response object.


          - **resp_object:** CloudGenix Extended `requests.Response` object.
          - **error_label:** Optional - text to describe operation on error.
          - **pass_code_list:** Optional - list of HTTP response codes to silently pass with empty list response.
          - **items_key:** Optional - Text for items key to extract (default 'items')

        **Returns:** list of 'items' objects.

        if pass_code_list is None:
            pass_code_list = [404, 400]

        items = resp_object.cgx_content.get(items_key)

        if resp_object.cgx_status and items is not None:
            return items

        # handle 404 and other error codes for certain APIs where objects may not exist
        elif resp_object.status_code in pass_code_list:
            return [{}]

            if error_label is not None:
                self.throw_error("Unable to extract '{0}' from {1}.".format(items_key, error_label), resp_object)
                return [{}]
                self.throw_error("Unable to extract '{0}' from response.".format(items_key), resp_object)
                return [{}]

    def build_lookup_dict(self, list_content, key_val='name', value_val='id', force_nag=False, nag_cache=None):
        Build key/value lookup dictionary from a list of dictionaries with specified key/value entries.


          - **list_content:** List of dicts to derive lookup structs from
          - **key_val:** Optional - Value to extract from entry to be key
          - **value_val:** Optional - Value to extract from entry to be value
          - **force_nag:** Optional - Bool, if True will nag even if key in `nag_cache`
          - **nag_cache:** Optional - List of keys that already exist in a lookup dict that should be duplicate checked.

        **Returns:** Lookup Dictionary
        if nag_cache and isinstance(nag_cache, list):
            already_nagged_dup_keys = nag_cache
            already_nagged_dup_keys = []

        lookup_dict = {}
        blacklist_duplicate_keys = []
        blacklist_duplicate_entries = []

        for item in list_content:
            item_key = item.get(key_val)
            item_value = item.get(value_val)
            # print(item_key, item_value)
            if item_key and item_value is not None:
                # check if it's a duplicate key.
                if str(item_key) in lookup_dict:
                    # First duplicate we've seen - save for warning.
                    duplicate_value = lookup_dict.get(item_key)
                    blacklist_duplicate_entries.append({item_key: duplicate_value})
                    blacklist_duplicate_entries.append({item_key: item_value})
                    # remove from lookup dict to prevent accidental overlap usage
                    del lookup_dict[str(item_key)]

                # check if it was a third+ duplicate key for a previous key
                elif item_key in blacklist_duplicate_keys:
                    # save for warning.
                    blacklist_duplicate_entries.append({item_key: item_value})

                    # no duplicates, append
                    lookup_dict[str(item_key)] = item_value

        for duplicate_key in blacklist_duplicate_keys:
            matching_entries = [entry for entry in blacklist_duplicate_entries if duplicate_key in entry]
            # check if force_nag set and if not, has key already been notified to the end user.
            if force_nag or duplicate_key not in already_nagged_dup_keys:
                    "Lookup value '{0}' was seen two or more times. To use, please remove duplicates in the controller,"
                    " or reference it explicitly by the actual value: ".format(duplicate_key), matching_entries)
                # we've now notified, add to notified list.
        return lookup_dict

    def pull_content_error(resp_object, raw=False):
        Parse API response object, return error detail text for printing on error in response content.


          - **resp_object:** CloudGenix Extended `requests.Response` object.
          - **raw:** Optional. If True, return list of dicts (raw error messages.) Default False.

        **Returns:** text_type error message, or list of dicts (if raw=True). None if no errors.
        api_logger.debug('pull_content_error function:')

            # attempt to grab the cgx_content. should always be a Dict if it exists.
            data = resp_object.cgx_content
        except (TypeError, ValueError, AttributeError):
            # cgx_content did not exist. check root object for dict as end-user may pass the content and not the
            # extended `requests.Response` object.
            data = resp_object

        if not isinstance(data, dict):
            # fast fail if data isn't correct format.
            api_logger.debug('PULL_ERROR: not able to find a valid dict object in resp_object: {0}'.format(resp_object))
            return None

        parsed_messages = []
        errors = []

        if isinstance(data, dict):
            # got a parsed response.
            errors = data.get('_error')
            if isinstance(errors, list):
                # Some errors in the response
                if raw is True:
                    # just return raw error list
                    return errors
                    for error in errors:
                        code = error.get('code')
                        message = error.get('message')
                        if code and message:
                            parsed_messages.append("{0} ({1})".format(message, code))
            elif errors is None:
                # no errors, ensure list and empty.
                errors = []
                # not a list.. put whatever it is into a list.
                errors = [errors]

        api_logger.debug("ERRORS: %s", errors)
        api_logger.debug("PARSED_ERRORS: %s", parsed_messages)

        # is parsed_messages empty and errors exist? dump errors as txt
        if not parsed_messages and len(errors) > 1:
            return text_type(errors)
        elif len(parsed_messages) == 1:
            return text_type(parsed_messages[0])
        elif len(parsed_messages) > 1:
            # return comma separated string of errors
            return text_type("{0}, and {1}".format(", ".join(parsed_messages[:-1]),  parsed_messages[-1]))
            # no errors
            return None

    def pull_content_warning(resp_object, raw=False):
        Parse API response object, return text for printing on warning in response.


          - **resp_object:** CloudGenix Extended `requests.Response` object.
          - **raw:** Optional. If True, return list of dicts (raw warning messages.) Default False.

        **Returns:** text_type warning message, or list of dicts (if raw=True). None if no warnings.
        api_logger.debug('pull_content_warning function:')

            # attempt to grab the cgx_content. should always be a Dict if it exists.
            data = resp_object.cgx_content
        except (TypeError, ValueError, AttributeError):
            # cgx_content did not exist. check root object for dict as end-user may pass the content and not the
            # extended `requests.Response` object.
            data = resp_object

        if not isinstance(data, dict):
            # fast fail if data isn't correct format.
            api_logger.debug('PULL_WARNING: not able to find a valid dict object in resp_object: {0}'
            return None

        parsed_messages = []
        warnings = []

        if isinstance(data, dict):
            # got a parsed response.
            warnings = data.get('_warning')
            if isinstance(warnings, list):
                # Some warnings in the response
                if raw is True:
                    # just return raw warning list
                    return warnings
                    for warning in warnings:
                        code = warning.get('code')
                        message = warning.get('message')
                        if code and message:
                            parsed_messages.append("{0} ({1})".format(message, code))
            elif warnings is None:
                # no warnings, ensure list and empty.
                warnings = []
                # not a list.. put whatever it is into a list.
                warnings = [warnings]

        api_logger.debug("WARNINGS: %s", warnings)
        api_logger.debug("PARSED_WARNINGS: %s", parsed_messages)

        # is parsed_messages empty and warnings exist? dump warnings as txt
        if not parsed_messages and len(warnings) > 1:
            return text_type(warnings)
        elif len(parsed_messages) == 1:
            return text_type(parsed_messages[0])
        elif len(parsed_messages) > 1:
            # return comma separated string of warnings
            return text_type("{0}, and {1}".format(", ".join(parsed_messages[:-1]),  parsed_messages[-1]))
            # no warnings
            return None

Module variables


Explicit CA bundle for CA Pinning - Root Certificates for the CloudGenix Controller API Endpoint.

Loaded from cloudgenix.ca_bundle.CG_CA_BUNDLE


Default set of SSL/TLS ciphers for communication.

Default is meant to be widely compatible with dangerous ciphers removed.


Default SSL/TLS Options

Default TLS v1.2/v1.3 or higher.


REGEX for parsing SDK builds

var api_logger

logging.getlogger object to enable debug printing via set_debug

var update_info_url

URL for checking for updates.

var version

SDK Version string

var ws_logger

websocket logger is handled slightly differently, so we will have a seperate handle.


def jd(


JD (JSON Dump) function. Meant for quick pretty-printing of a CloudGenix Response body.

Example: jd(sdk.get.sites())


  • api_response: A CloudGenix-attribute extended requests.Response object

Returns: No Return, directly prints all output.

def jd(api_response):
    JD (JSON Dump) function. Meant for quick pretty-printing of a CloudGenix Response body.

    Example: `jd(sdk.get.sites())`


      - **api_response:** A CloudGenix-attribute extended `requests.Response` object

    **Returns:** No Return, directly prints all output.

def jd_detailed(

api_response, sensitive=False)

JD (JSON Dump) Detailed function. Meant for quick DETAILED pretty-printing of CloudGenix Request and Response objects for troubleshooting.

Example: jd_detailed(cgx_sess.get.sites())


  • api_response: A CloudGenix-attribute extended requests.Response object
  • sensitive: Boolean, if True will print sensitive content (specifically, authentication cookies/headers).

Returns: No Return, directly prints all output.

def jd_detailed(api_response, sensitive=False):
    JD (JSON Dump) Detailed function. Meant for quick DETAILED pretty-printing of CloudGenix Request and Response
    objects for troubleshooting.

    Example: `jd_detailed(cgx_sess.get.sites())`


      - **api_response:** A CloudGenix-attribute extended `requests.Response` object
      - **sensitive:** Boolean, if True will print sensitive content (specifically, authentication cookies/headers).

    **Returns:** No Return, directly prints all output.
    print(jdout_detailed(api_response, sensitive=sensitive))

def jdout(


JD Output function. Does quick pretty printing of a CloudGenix Response body. This function returns a string instead of directly printing content.


  • api_response: A CloudGenix-attribute extended requests.Response object

Returns: Pretty-formatted text of the Response body

def jdout(api_response):
    JD Output function. Does quick pretty printing of a CloudGenix Response body. This function returns a string
    instead of directly printing content.


      - **api_response:** A CloudGenix-attribute extended `requests.Response` object

    **Returns:** Pretty-formatted text of the Response body
        # attempt to output the cgx_content. should always be a Dict if it exists.
        output = json.dumps(api_response.cgx_content, indent=4)
    except (TypeError, ValueError, AttributeError):
        # cgx_content did not exist, or was not JSON serializable. Try pretty output the base obj.
            output = json.dumps(api_response, indent=4)
        except (TypeError, ValueError, AttributeError):
            # Same issue, just raw output the passed data. Let any exceptions happen here.
            output = api_response
    return output

def jdout_detailed(

api_response, sensitive=False)

JD Output Detailed function. Meant for quick DETAILED pretty-printing of CloudGenix Request and Response objects for troubleshooting. This function returns a string instead of directly printing content.


  • api_response: A CloudGenix-attribute extended requests.Response object
  • sensitive: Boolean, if True will print sensitive content (specifically, authentication cookies/headers).

Returns: Pretty-formatted text of the Request, Request Headers, Request body, Response, Response Headers, and Response Body.

def jdout_detailed(api_response, sensitive=False):
    JD Output Detailed function. Meant for quick DETAILED pretty-printing of CloudGenix Request and Response
    objects for troubleshooting. This function returns a string instead of directly printing content.


      - **api_response:** A CloudGenix-attribute extended `requests.Response` object
      - **sensitive:** Boolean, if True will print sensitive content (specifically, authentication cookies/headers).

    **Returns:** Pretty-formatted text of the Request, Request Headers, Request body, Response, Response Headers,
    and Response Body.
        # try to be super verbose.
        output = "REQUEST: {0} {1}\n".format(api_response.request.method, api_response.request.path_url)
        output += "REQUEST HEADERS:\n"
        for key, value in api_response.request.headers.items():
            # look for sensitive values
            if key.lower() in ['cookie'] and not sensitive:
                # we need to do some work to watch for the AUTH_TOKEN cookie. Split on cookie separator
                cookie_list = value.split('; ')
                muted_cookie_list = []
                for cookie in cookie_list:
                    # check if cookie starts with a permutation of AUTH_TOKEN/whitespace.
                    if cookie.lower().strip().startswith('auth_token='):
                        # first 11 chars of cookie with whitespace removed + mute string.
                        newcookie = cookie.strip()[:11] + "\"<SENSITIVE - NOT SHOWN BY DEFAULT>\""
                # got list of cookies, muted as needed. recombine.
                muted_value = "; ".join(muted_cookie_list)
                output += "\t{0}: {1}\n".format(key, muted_value)
            elif key.lower() in ['x-auth-token'] and not sensitive:
                output += "\t{0}: {1}\n".format(key, "<SENSITIVE - NOT SHOWN BY DEFAULT>")
                output += "\t{0}: {1}\n".format(key, value)
        # if body not present, output blank.
        if not api_response.request.body:
            output += "REQUEST BODY:\n{0}\n\n".format({})
                # Attempt to load JSON from string to make it look beter.
                output += "REQUEST BODY:\n{0}\n\n".format(json.dumps(json.loads(api_response.request.body), indent=4))
            except (TypeError, ValueError, AttributeError):
                # if pretty call above didn't work, just toss it to jdout to best effort it.
                output += "REQUEST BODY:\n{0}\n\n".format(jdout(api_response.request.body))
        output += "RESPONSE: {0} {1}\n".format(api_response.status_code, api_response.reason)
        output += "RESPONSE HEADERS:\n"
        for key, value in api_response.headers.items():
            output += "\t{0}: {1}\n".format(key, value)
            # look for CGX content first.
            output += "RESPONSE DATA:\n{0}".format(json.dumps(api_response.cgx_content, indent=4))
        except (TypeError, ValueError, AttributeError):
            # look for standard response data.
            output += "RESPONSE DATA:\n{0}".format(json.dumps(json.loads(api_response.content), indent=4))
    except (TypeError, ValueError, AttributeError, UnicodeDecodeError):
        # cgx_content did not exist, or was not JSON serializable. Try pretty output the base obj.
            output = json.dumps(api_response, indent=4)
        except (TypeError, ValueError, AttributeError):
            # Same issue, just raw output the passed data. Let any exceptions happen here.
            output = api_response
    return output


class API

Class for interacting with the CloudGenix API.

Subclass objects are linked to various operations.

class API(object):
    Class for interacting with the CloudGenix API.

    Subclass objects are linked to various operations.

     - get: links to `cloudgenix.get_api.Get` for API Get Operations
     - post: links to `cloudgenix.post_api.Post` for API Post Operations
     - put: links to `cloudgenix.put_api.Put` for API Put Operations
     - patch: links to `cloudgenix.patch_api.Patch` for API Patch Operations
     - delete: links to `cloudgenix.delete_api.Delete` for API Delete Operations
    # Global structure, previously sdk_vars
    # Authentication is now stored as cookies, as part of the requests.Session() object.
    # Authentication can also be stored as an 'X-Auth-Token:' header, but cookies take precedence.
    controller = ''
    """Current active controller URL"""

    controller_orig = None
    """Original Controller URL as entered - before Region re-parse"""

    controller_region = None
    """Controller Region, if present."""

    ignore_region = False
    """Ignore regions returned by controller, and use explicit controller only."""

    _debuglevel = 0
    """debug level - set via set_debug()"""

    tenant_id = None
    """Numeric ID of tenant (account) - should be set after initial login from `cloudgenix.get_api.Get.profile` data"""

    tenant_name = None
    """Name of tenant (account), should be set after initial login from `cloudgenix.get_api.Get.profile` data"""

    token_session = None
    """Is this login from a static AUTH_TOKEN (True), or a standard login (False)"""

    is_esp = None
    """Is the current tenant an ESP/MSP?"""

    client_id = None
    """If ESP/MSP, is client currently logged in"""

    address_string = None
    """String representing address, optional - should be pulled from get.profile() data."""

    email = None
    """Email (username) for session"""

    operator_id = None
    """Operator ID of current session."""

    _user_id = None
    """Deprecated (replaced by `operator_id`.) Left for backwards compatibility."""

    _password = None
    """user password - only used when argv passed, and cleared quickly"""

    roles = None
    """Roles list"""

    verify = True
    """Verify SSL certificate."""

    version = None
    """Version string for use once Constructor created."""

    ca_verify_filename = None
    """DEPRECATED: Filename to use for CA verification. Moved to modern python `ssl`, updated to only passing
    filename in event of local file."""

    _ca_verify_file_handle = None
    """File handle for CA verification"""

    _ca_ssl_context = None
    """`ssl` library context for controller connections"""

    _rest_connection_adapter = None
    """`requests.adapters.HTTPAdapter` object for use by requests to request content."""

    rest_call_retry = False
    """DEPRECATED: Please use `cloudgenix.API.modify_rest_retry`."""

    rest_call_max_retry = 30
    """DEPRECATED: Please use `cloudgenix.API.modify_rest_retry`."""

    rest_call_sleep = 10
    """DEPRECATED: Please use `cloudgenix.API.modify_rest_retry`."""

    _rest_call_retry_object = None
    """`urllib3.util.retry.Retry` object for use by SDK to do retries"""

    rest_call_timeout = 240
    """Maximum time to wait for any data from REST server."""

    cache = {}
    """API response cache (Future)"""

    _parent_namespace = None
    """holder for namespace for wrapper classes."""

    _session = None
    """holder for requests.Session() object"""

    _websocket_headers = None
    """holder for WebSocket Headers"""

    update_check = True
    """Notify users of available update to SDK"""

    update_info_url = None
    """Update Info URL for use once Constructor Created."""

    def __init__(self, controller=controller, ssl_verify=verify, update_check=True):
        Create the API constructor object

          - **controller:** Initial Controller URL String
          - **ssl_verify:** Should SSL be verified for this system. Can be `file` or BOOL. See `cloudgenix.API.ssl_verify` for more details.
          - **update_check:** Bool to Enable/Disable SDK update check and new release notifications.
        # set version and update url from outer scope.
        self.version = version
        """Version string for use once Constructor created."""

        self.update_info_url = update_info_url
        """Update Info URL for use once Constructor Created."""

        # try:
        if controller and isinstance(controller, (binary_type, text_type)):
            self.controller = controller.lower()
            self.controller_orig = controller.lower()

        # Create Requests Session.
        self._session = requests.Session()

        # set ssl context
        if isinstance(ssl_verify, (binary_type, text_type, bool)):
            self.ssl_verify(ssl_verify, update_adapter=False)

        # Set default REST retry parameters

        # update the HTTP Adapter for requests.

        # handle update check
        if isinstance(update_check, bool):
            self.update_check = update_check

        if update_check:

        # Identify SDK in the User-Agent.
        user_agent = self._session.headers.get('User-Agent')
        if user_agent:
            user_agent += ' (CGX SDK v{0})'.format(self.version)
            user_agent = 'python-requests/UNKNOWN (CGX SDK v{0})'.format(self.version)

        # Update Headers
            'Accept': 'application/json',
            'User-Agent': text_type(user_agent)

        # except Exception as e:
        #     raise ValueError("Unable to create Requests session object: {0}.".format(e))
        api_logger.debug("DEBUG: URL: %s, SSL Verify: %s, Session: %s",

        # Update Headers for WebSocket requests
        websocketlib_name = websockets.__name__
        websocketlib_version = websockets.version.version
        if not websocketlib_name:
            websocketlib_name = 'websockets'
        if not websocketlib_version:
            websocketlib_version = 'UNKNOWN'
        ws_user_agent = 'python-{0}/{1} (CGX SDK v{2})'.format(websocketlib_name,
        self._websocket_headers = {
            'Accept': 'application/json',
            'User-Agent': text_type(ws_user_agent)

        # Bind API method classes to this object
        subclasses = self._subclass_container()
        self.get = subclasses["get"]()
        """API object link to `cloudgenix.get_api.Get`""" = subclasses["post"]()
        """API object link to `cloudgenix.post_api.Post`"""

        self.put = subclasses["put"]()
        """API object link to `cloudgenix.put_api.Put`"""

        self.patch = subclasses["patch"]()
        """API object link to `cloudgenix.patch_api.Patch`"""

        self.delete = subclasses["delete"]()
        """API object link to `cloudgenix.delete_api.Delete`"""

        self.interactive = subclasses["interactive"]()
        """API object link to `cloudgenix.interactive.Interactive`""" = subclasses["ws"]()
        """API object link to ``"""


    def notify_for_new_version(self):
        Check for a new version of the SDK on API constructor instantiation. If new version found, print
        Notification to STDERR.

        On failure of this check, fail silently.

        **Returns:** No item returned, directly prints notification to `sys.stderr`.

        # broad exception clause, if this fails for any reason just return.
            recommend_update = False
            update_check_resp = requests.get(self.update_info_url, timeout=3)
            web_version = update_check_resp.json()["info"]["version"]
            api_logger.debug("RETRIEVED_VERSION: %s", web_version)

            available_version =
            current_version =

            available_major = available_version.get('major')
            available_minor = available_version.get('minor')
            available_patch = available_version.get('patch')
            available_build = available_version.get('build')
            current_major = current_version.get('major')
            current_minor = current_version.get('minor')
            current_patch = current_version.get('patch')
            current_build = current_version.get('build')

            api_logger.debug("AVAILABLE_VERSION: %s", available_version)
            api_logger.debug("CURRENT_VERSION: %s", current_version)

            # check for major/minor version differences, do not alert for build differences.
            if available_major > current_major:
                recommend_update = True
            elif available_major >= current_major and available_minor > current_minor:
                recommend_update = True
            elif available_major >= current_major and available_minor >= current_minor and \
                    available_patch > current_patch:
                recommend_update = True

            api_logger.debug("NEED_UPDATE: %s", recommend_update)

            # notify.
            if recommend_update:
                sys.stderr.write("WARNING: CloudGenix Python SDK upgrade available. SDKs are often deprecated 6 "
                                 "months after release of a new version.\n"
                                 "\tLatest Version: {0}\n"
                                 "\tCurrent Version: {1}\n"
                                 "\tFor more info, see ''. Additionally, this "
                                 "message can be suppressed by instantiating the API with API(update_check=False).\n\n"
                                 "".format(web_version, self.version))


        except Exception:
            # just return and continue.

    def ssl_verify(self, ssl_verify, ciphers=DEFAULT_SSL_CIPHERS, options=DEFAULT_SSL_OPTIONS,
                   tls_min=ssl.TLSVersion.TLSv1_2, tls_max=ssl.TLSVersion.MAXIMUM_SUPPORTED, update_adapter=True):
        Modify ssl verification settings


          - **ssl_verify:**
             - **True:** Verify using builtin BYTE_CA_BUNDLE.
             - **False:** No SSL Verification.
             - ***Str:** Full path to a x509 PEM CA File or bundle.
          - **ciphers:** List of specific ciphers to allow. Default list is DEFAULT_SSL_CIPHERS if not specified.
          - **options:** List of SSL options. Default options are contained in DEFAULT_SSL_OPTIONS if not specified.
          - **tls_min:** Lowest acceptable TLS version. Default is `ssl.TLSVersion.TLSv1_2`
          - **tls_max:** Maximum acceptable TLS version. Default is `ssl.TLSVersion.MAXIMUM_SUPPORTED`
          - **update_adapter:** bool, call update adapter function after running to auto update adapter.

        **Returns:** No return, mutates the SSL Context / session in place directly.
        self.verify = ssl_verify
        # if verify true/false, set ca_verify_file appropriately
        if isinstance(self.verify, bool):
            if self.verify:  # True
                # load default CA list and set default SSL ciphers.
                self._ca_ssl_context = ssl.create_default_context(cadata=BYTE_CA_BUNDLE.decode('ascii'))
                self._session.verify = True

            else:  # False
                # disable warnings for SSL certs.
                self._ca_ssl_context = ssl.SSLContext()
                self._ca_ssl_context.check_hostname = False
                self._ca_ssl_context.verify_mode = ssl.CERT_NONE
                self._session.verify = False

            # Not True/False, assume path to file/dir for Requests
            # set filename/filepath for context
            self._ca_ssl_context = ssl.create_default_context(cafile=self.verify, capath=self.verify)
            self._ca_ssl_context.check_hostname = True
            self._session.verify = True

        # set the CA independent values
        self._ca_ssl_context.minimum_version = tls_min
        self._ca_ssl_context.maximum_version = tls_max
        self._ca_ssl_context.options |= options
        # update the session adapter.
        if update_adapter:

    def modify_rest_retry(self, total=8, connect=None, read=None, redirect=None, status=None, other=0,
                          allowed_methods=urllib3.util.retry.Retry.DEFAULT_ALLOWED_METHODS, status_forcelist=None,
                          backoff_factor=0.705883, raise_on_redirect=True, raise_on_status=True,
                          respect_retry_after_header=True, update_adapter=True):
        Modify retry parameters for the SDKs rest call object.

        Parameters are directly from and passed directly to `urllib3.util.retry.Retry`, and get applied directly to
        the underlying `requests.Session` object.

        Default retry with total=8 and backoff_factor=0.705883:

         - Try 1, 0 delay (0 total seconds)
         - Try 2, 0 delay (0 total seconds)
         - Try 3, 0.705883 delay (0.705883 total seconds)
         - Try 4, 1.411766 delay (2.117649 total seconds)
         - Try 5, 2.823532 delay (4.941181 total seconds)
         - Try 6, 5.647064 delay (10.588245 total seconds)
         - Try 7, 11.294128 delay (21.882373 total seconds)
         - Try 8, 22.588256 delay (44.470629 total seconds)
         - Try 9, 45.176512 delay (89.647141 total seconds)
         - Try 10, 90.353024 delay (180.000165 total seconds)


          - **total:** int, Total number of retries to allow. Takes precedence over other counts.
          - **connect:** int, How many connection-related errors to retry on.
          - **read:** int, How many times to retry on read errors.
          - **redirect:** int, How many redirects to perform. loops.
          - **status:** int, How many times to retry on bad status codes.
          - **other:** int, How many times to retry on other errors. Set to 0 by default for things like SSL errors.
          - **method_whitelist:** iterable, Set of uppercased HTTP method verbs that we should retry on.
          - **status_forcelist:** iterable, A set of integer HTTP status codes that we should force a retry on.
          - **backoff_factor:** float, A backoff factor to apply between attempts after the second try.
          - **raise_on_redirect:** bool, True = raise a MaxRetryError, False = return latest 3xx response.
          - **raise_on_status:** bool, Similar logic to ``raise_on_redirect`` but for status responses.
          - **respect_retry_after_header:** bool, Whether to respect Retry-After header on status codes.
          - **adapter_url:** string, URL match for these retry values (default `https://`)
          - **update_adapter:** bool, call update adapter function after running to auto update adapter.

        **Returns:** No return, mutates the retry / session directly
        # Cloudgenix responses with 502/504 are usually recoverable. Use them if no list specified.
        if status_forcelist is None:
            status_forcelist = (413, 429, 502, 503, 504)

        retry = urllib3.util.retry.Retry(total=total,

        # set updated retry object.
        self._rest_call_retry_object = retry
        # call update to mount/remount session adapter
        if update_adapter:

    def update_session_adapter(self, adapter=None, retry=None, ssl_context=None, adapter_url="https://"):
        Mount/remount the HTTPS session adapter after changes that affect it.


          - **adapter:** `requests.adapters.HTTPAdapter`, or will use/create default adapter.
          - **retry:** `urllib3.util.retry.Retry` object, or will use default retry object.
          - **ssl_context:** `ssl.SSLContext` object, or will use default SSL Context.
          - **adapter_url:** URL to use for matching requests. Defaults to 'https://'

        **Returns:** No return, mutates the session adapter directly

        # use default or specified session objects.
        use_retry = retry or self._rest_call_retry_object
        use_ssl_context = ssl_context or self._ca_ssl_context

        # if object adapter has not been built, create a new one.
        if adapter is None:
            # does an effective retry object exist?
            if use_retry is None:
                self._rest_connection_adapter = TlsHttpAdapter(ssl_context=use_ssl_context)
                self._rest_connection_adapter = TlsHttpAdapter(ssl_context=use_ssl_context,
            # if adapter is specified, nothing else should be specified.
            if retry is not None or ssl_context is not None:
                self.throw_warning("Error: HTTPAdapter was specified with retry and ssl_context. These will be ignored."
                                   " retry and ssl_contexts need to be set in the adapter itself.")

        # Pick the specified or default adapter.
        use_adapter = adapter or self._rest_connection_adapter

        # mount the adapter.
        self._session.mount(adapter_url, use_adapter)


    def view_rest_retry(self, url=None):
        View current rest retry settings in the `requests.Session()` object


          - **url:** URL to use to determine retry methods for. Defaults to 'https://'

        **Returns:** Dict, Key header, value is header value.
        if url is None:
            url = "https://"
        return vars(self._session.get_adapter(url).max_retries)

    def expose_session(self):
        Call to expose the Requests Session object

        **Returns:** `requests.Session` object
        return self._session

    def add_headers(self, headers):
        Permanently add/overwrite headers to session.


          - **headers:** dict with header/value

        **Returns:** Mutates `requests.Session()` object, no return.

    def remove_header(self, header):
        Permanently remove a single header from session


          - **header:** str of single header to remove

        **Returns:** Mutates `requests.Session()` object, no return.
        # check for header first. Return silently if it does not exist.
        if self._session.headers.get(header) is not None:
            del self._session.headers[header]

    def view_headers(self):
        View current headers in the `requests.Session()` object

        **Returns:** Dict, Key header, value is header value.
        return dict(self._session.headers)

    def websocket_add_headers(self, headers):
        Permanently add/overwrite headers to the `API()` WebSocket object


          - **headers:** dict with header/value

        **Returns:** Mutates `API()` object, no return.


    def websocket_remove_header(self, header):
        Permanently remove a single header from the `API()` WebSocket object


          - **header:** str of single header to remove

        **Returns:** Mutates `API()` object, no return.

        # check for header first. Return silently if it does not exist.
        if self._websocket_headers.get(header) is not None:
            del self._websocket_headers[header]

    def websocket_view_headers(self):
        View current headers in the `API()` WebSocket object

        **Returns:** Dict, Key header, value is header value.
        return dict(self._websocket_headers)

    def view_cookies(self):
        View current cookies in the `requests.Session()` object

        **Returns:** List of Dicts, one cookie per Dict.
        return_list = []
        for cookie in self._session.cookies:

        return return_list

    def set_debug(self, debuglevel, set_format=None, set_handler=None):
        Change the debug level of the API


          - **set_format:** Optional. If set and text_type, use input for formatter. Otherwise, default formatter.
          - **set_format:** Optional. If set and `logging.Handler` type, use input for handler. Otherwise, default

        **Returns:** No item returned.
        # set the logging formatter and stream handle
        if set_format is None:
            # default formatter
            api_formatter = logging.Formatter("%(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s")
        elif not isinstance(set_format, text_type):
            # not a valid format string. Set to default.
            api_formatter = logging.Formatter("%(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s")
            # valid logging string.
            api_formatter = logging.Formatter(set_format)

        # set the logging handler if supported handler is not passed.
        if set_handler is None:
            # Default handler
            api_handler = logging.StreamHandler()
        elif not isinstance(set_handler, (logging.FileHandler, logging.Handler, logging.NullHandler,
            # not a valid handler. Set to default handler.
            api_handler = logging.StreamHandler()
            # passed valid handler
            api_handler = set_handler

        # set handler to use format.

        # Get the loggers from other modules in prep for setting new handlers.
        urllib3_logger = logging.getLogger("requests.packages.urllib3")
        urllib3_retry_logger = logging.getLogger("urllib3.util.retry")
        cookie_logger = logging.getLogger("http.cookiejar")

        # remove existing handlers
        api_logger.handlers = []
        urllib3_logger.handlers = []
        urllib3_retry_logger.handlers = []
        cookie_logger.handlers = []
        ws_logger.handlers = []

        # ok, lets set the new handlers.
        if isinstance(debuglevel, int):
            self._debuglevel = debuglevel

        if self._debuglevel == 1:

        elif self._debuglevel == 2:
            cookielib.debug = True

        elif self._debuglevel >= 3:
            cookielib.debug = True

            # set to warning
            cookielib.debug = False


    def _subclass_container(self):
        Call subclasses via function to allow passing parent namespace to subclasses.

        **Returns:** dict with subclass references.
        _parent_class = self

        return_object = {}

        class GetWrapper(Get):

            def __init__(self):
                self._parent_class = _parent_class
        return_object['get'] = GetWrapper

        class PostWrapper(Post):

            def __init__(self):
                self._parent_class = _parent_class
        return_object['post'] = PostWrapper

        class PutWrapper(Put):

            def __init__(self):
                self._parent_class = _parent_class
        return_object['put'] = PutWrapper

        class PatchWrapper(Patch):

            def __init__(self):
                self._parent_class = _parent_class
        return_object['patch'] = PatchWrapper

        class DeleteWrapper(Delete):

            def __init__(self):
                self._parent_class = _parent_class
        return_object['delete'] = DeleteWrapper

        class InteractiveWrapper(Interactive):

            def __init__(self):
                self._parent_class = _parent_class
        return_object['interactive'] = InteractiveWrapper

        class WebSocketsWrapper(WebSockets):

            def __init__(self):
                self._parent_class = _parent_class

        return_object['ws'] = WebSocketsWrapper

        return return_object

    def rest_call(self, url, method, data=None, sensitive=False, timeout=None, content_json=True, raw_msgs=False,
                  retry=None, max_retry=None, retry_sleep=None):
        Generic REST call worker function


          - **url:** URL for the REST call
          - **method:** METHOD for the REST call
          - **data:** Optional DATA for the call (for POST/PUT/etc.)
          - **sensitive:** Flag if content request/response should be hidden from logging functions
          - **timeout:** Requests Timeout
          - **content_json:** Bool on whether the Content-Type header should be set to application/json
          - **raw_msgs:** True/False, if True, do not convert API sideband messages (warnings, errors) to text.
          - **retry:** DEPRECATED - please use `cloudgenix.API.modify_rest_retry` instead.
          - **max_retry:** DEPRECATED - please use `cloudgenix.API.modify_rest_retry` instead.
          - **retry_sleep:** DEPRECATED - please use `cloudgenix.API.modify_rest_retry` instead.

        **Returns:** Requests.Response object, extended with:

          - **cgx_status**: Bool, True if a successful CloudGenix response, False if error.
          - **cgx_content**: Content of the response, guaranteed to be in Dict format. Empty/invalid responses
          will be converted to a Dict response.
          - **cgx_errors**: Text error messages if any are present. None if none. List if raw_msgs is True.
          - **cgx_warnings**: Text warning messages if any are present. None if none. List if raw_msgs is True.

        # pull retry related items from Constructor if not specified.
        if timeout is None:
            timeout = self.rest_call_timeout
        if retry is not None:
            # Someone using deprecated retry code. Notify.
            sys.stderr.write("WARNING: 'retry' option of rest_call() has been deprecated. "
                             "Please use 'API.modify_rest_retry()' instead.")
        if max_retry is not None:
            # Someone using deprecated retry code. Notify.
            sys.stderr.write("WARNING: 'max_retry' option of rest_call() has been deprecated. "
                             "Please use 'API.modify_rest_retry()' instead.")
        if retry_sleep is not None:
            # Someone using deprecated retry code. Notify.
            sys.stderr.write("WARNING: 'max_retry' option of rest_call() has been deprecated. "
                             "Please use 'API.modify_rest_retry()' instead.")

        # Get logging level, use this to bypass logging functions with possible large content if not set.
        logger_level = api_logger.getEffectiveLevel()

        # populate headers and cookies from session.
        if content_json and method.lower() not in ['get', 'delete']:
            headers = {
                'Content-Type': 'application/json'
            headers = {}

        # add session headers
        cookie = self._session.cookies.get_dict()

        # make sure data is populated if present.
        if isinstance(data, (list, dict)):
            data = json.dumps(data)

        api_logger.debug('REST_CALL URL = %s', url)

        # make request
            if not sensitive:
                api_logger.debug('\n\tREQUEST: %s %s\n\tHEADERS: %s\n\tCOOKIES: %s\n\tDATA: %s\n',
                                 method.upper(), url, headers, cookie, data)

            # Actual request
            response = self._session.request(method, url, data=data, stream=True, timeout=timeout,
                                             headers=headers, allow_redirects=False)

            # Request complete - lets parse.
            # if it's a non-CGX-good response, return with cgx_status = False
            if response.status_code not in [,

                # Simple JSON debug
                if not sensitive:
                        api_logger.debug('RESPONSE HEADERS: %s\n', json.dumps(
                            json.loads(text_type(response.headers)), indent=4))
                    except ValueError:
                        api_logger.debug('RESPONSE HEADERS: %s\n', text_type(response.headers))
                        api_logger.debug('RESPONSE: %s\n', json.dumps(response.json(), indent=4))
                    except ValueError:
                        api_logger.debug('RESPONSE: %s\n', text_type(response.text))
                    api_logger.debug('RESPONSE NOT LOGGED (sensitive content)')

                api_logger.debug("Error, non-200 response received: %s", response.status_code)

                # CGX extend requests.Response for return
                response.cgx_status = False
                response.cgx_content = self._catch_nonjson_streamresponse(response.text)

                # CGX extend requests.Response for any errors/warnings.
                response.cgx_warnings = self.pull_content_warning(response, raw=raw_msgs)
                response.cgx_errors = self.pull_content_error(response, raw=raw_msgs)

                # We are in a failed request. If no error text in response, give the response code and detail.
                if response.cgx_errors is None:
                    response.cgx_errors = text_type("{0} ({1})".format(response.reason, response.status_code))

                return response


                # Simple JSON debug
                if not sensitive and (logger_level <= logging.DEBUG and logger_level != logging.NOTSET):
                        api_logger.debug('RESPONSE HEADERS: %s\n', json.dumps(
                            json.loads(text_type(response.headers)), indent=4))
                        api_logger.debug('RESPONSE: %s\n', json.dumps(response.json(), indent=4))
                    except ValueError:
                        api_logger.debug('RESPONSE HEADERS: %s\n', text_type(response.headers))
                        api_logger.debug('RESPONSE: %s\n', text_type(response.text))
                elif sensitive:
                    api_logger.debug('RESPONSE NOT LOGGED (sensitive content)')

                # CGX extend requests.Response for return
                response.cgx_status = True
                response.cgx_content = self._catch_nonjson_streamresponse(response.text)

                # CGX extend requests.Response for any errors/warnings.
                response.cgx_warnings = self.pull_content_warning(response, raw=raw_msgs)
                response.cgx_errors = self.pull_content_error(response, raw=raw_msgs)
                return response
        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, urllib3.exceptions.MaxRetryError)\
                as e:

  "Error, %s.", text_type(e))

            # make a requests.Response object for return since we didn't get one.
            response = requests.Response

            # CGX extend requests.Response for return
            response.cgx_status = False
            response.cgx_content = {
                '_error': [
                        'message': 'REST Request Exception: {}'.format(e),
                        'data': {},

            # CGX extend requests.Response for any errors/warnings.
            response.cgx_warnings = self.pull_content_warning(response, raw=raw_msgs)
            response.cgx_errors = self.pull_content_error(response, raw=raw_msgs)
            return response

    def websocket_call(self, url, *args, **kwargs):
        Generic WebSocket worker function, automatically uses authentication from `cloudgenix.API()` session.


          - **url:** URL for the REST call
          - Any other `websocket.client.Connect` argument or keyword argument (see NOTE: below)

        **Returns:** `websocket.client.Connect` object.

        **NOTE:** Any `websocket.client.Connect` supported argument or keyword argument will be accepted, and will
        be passed to the underlying Connect() request. **`ssl` and `extra_header` keyword arguments will override the SDK
        auto-generated cookies/headers and SSL contexts used for authentication.** For more info on available options, see

        headers = {}
        # add session headers
        # Get cookies from requests.
        cookies = self._session.cookies.get_dict()

        # create cookie header from the cookies in Requests
        headers["Cookie"] = "; ".join(["{0}={1}".format(key, value) for key, value in cookies.items()])

        # check for host header
        host_header = headers.get("Host")
        force_host_arg = None
        if host_header is not None:

            # Ok, we got a host header. Unlike Requests, websockets wants the "Host Header" in the URI. You pass
            # a separate IP to connect to as 'host' argument to `websockets.client.Connect`.
            # So, to keep our "requests"-like behavior, we need to replace the value in the URI with the host
            # header, and remove the host header - then send the original value to `Connect` as host kwarg.

            # split controller at "//" ( to get host.
            controller_host = self.controller.split("//")[1]

            force_host_arg = controller_host
            # we don't support user/password in URI, so replacing first instance 'should' be OK (famous last words)
            new_url = url.replace(force_host_arg, host_header, 1)
            api_logger.debug("Host replacement. Original URL: {0}, New URL: {1}".format(url, new_url))
            url = new_url
            # delete the host header
            del headers["Host"]

        # convert the headers dictionary to a list of tuples.
        header_tuple_list = [(key, value) for key, value in headers.items()]

        # Create argument tuple
        ws_args = (url,) + args

        # Create keyword args
        ws_kwargs = {
            "ssl": self._ca_ssl_context,
            "extra_headers": header_tuple_list
        # check for force host arg
        if force_host_arg is not None:
            api_logger.debug("Forcing connection to host {0}".format(force_host_arg))
            ws_kwargs["host"] = force_host_arg

        # Override automatic with any manually passed kwargs

        return websockets.connect(*ws_args, **ws_kwargs)

    def parse_auth_token(self, auth_token):
        Break auth_token up into its constituent values.


          - **auth_token:** Auth_token string

        **Returns:** dict with Auth Token constituents
        # remove the random security key value from the front of the auth_token
        auth_token_cleaned = auth_token.split('-', 1)[1]
        # URL Decode the Auth Token
        auth_token_decoded = self.url_decode(auth_token_cleaned)
        # Create a new dict to hold the response.
        auth_dict = {}

        # Parse the token
        for key_value in auth_token_decoded.split("&"):
            key_value_list = key_value.split("=")
            # check for valid token parts
            if len(key_value_list) == 2 and type(key_value_list[0]) in [text_type, binary_type]:
                auth_dict[key_value_list[0]] = key_value_list[1]

        # Return the dict of key/values in the token.
        return auth_dict

    def update_region_to_controller(self, region):
        Update the controller string with dynamic region info.
        Controller string should end up as `<name[-env]>.<region>`


          - **region:** region string.

        **Returns:** No return value, mutates the controller in the class namespace
        # default region position in a list
        region_position = 1

        # Check for a global "ignore region" flag
        if self.ignore_region:
            # bypass
            api_logger.debug("IGNORE_REGION set, not updating controller region.")

        api_logger.debug("Updating Controller Region")
        api_logger.debug("CONTROLLER = %s", self.controller)
        api_logger.debug("CONTROLLER_ORIG = %s", self.controller_orig)
        api_logger.debug("CONTROLLER_REGION = %s", self.controller_region)

        # Check if this is an initial region use or an update region use
        if self.controller_orig:
            controller_base = self.controller_orig
            controller_base = self.controller
            self.controller_orig = self.controller

        # splice controller string
        controller_full_part_list = controller_base.split('.')

        for idx, part in enumerate(controller_full_part_list):
            # is the region already in the controller string?
            if region == part:
                # yes, controller already has appropriate region
                api_logger.debug("REGION %s ALREADY IN BASE CONTROLLER AT INDEX = %s", region, idx)
                # update region if it is not already set.
                if self.controller_region != region:
                    self.controller_region = region
                    api_logger.debug("UPDATED_CONTROLLER_REGION = %s", self.controller_region)
                # Update controller if not already matching
                if self.controller != controller_base:
                    self.controller = controller_base
                    api_logger.debug("UPDATED_CONTROLLER = %s", self.controller)

        controller_part_count = len(controller_full_part_list)

        # handle short domain case
        if controller_part_count > 1:
            # insert region
            controller_full_part_list[region_position] = region
            self.controller = ".".join(controller_full_part_list)
            # short domain, just add region
            self.controller = ".".join(controller_full_part_list) + '.' + region

        # update SDK vars with region info
        self.controller_orig = controller_base
        self.controller_region = region

        api_logger.debug("UPDATED_CONTROLLER = %s", self.controller)
        api_logger.debug("UPDATED_CONTROLLER_ORIG = %s", self.controller_orig)
        api_logger.debug("UPDATED_CONTROLLER_REGION = %s", self.controller_region)

    def parse_region(self, login_response):
        Return region from a successful login response.


          - **login_response:** requests.Response from a successful login.

        **Returns:** region name.
        auth_token = login_response.cgx_content['x_auth_token']
        auth_token_dict = self.parse_auth_token(auth_token)
        auth_region = auth_token_dict.get('region')
        return auth_region

    def reparse_login_cookie_after_region_update(self, login_response):
        Sometimes, login cookie gets sent with region info instead of This function
        re-parses the original login request and applies cookies to the session if they now match the new region.


          - **login_response:** requests.Response from a non-region login.

        **Returns:** updates API() object directly, no return.

        login_url = login_response.request.url
        api_logger.debug("ORIGINAL REQUEST URL = %s", login_url)
        # replace old controller with new controller.
        login_url_new = login_url.replace(self.controller_orig, self.controller)
        api_logger.debug("UPDATED REQUEST URL = %s", login_url_new)
        # reset login url with new region
        login_response.request.url = login_url_new
        # prep cookie jar parsing
        req = requests.cookies.MockRequest(login_response.request)
        res = requests.cookies.MockResponse(login_response.raw._original_response.msg)
        # extract cookies to session cookie jar.
        self._session.cookies.extract_cookies(res, req)

    def _catch_nonjson_streamresponse(rawresponse):
        Validate a streamed response is JSON. Return a Python dictionary either way.


          - **rawresponse:** Streamed Response from Requests.

        **Returns:** Dictionary
        # attempt to load response for return.
            response = json.loads(rawresponse)
        except (ValueError, TypeError):
            if rawresponse:
                response = {
                    '_error': [
                            'message': 'Response not in JSON format.',
                            'data': rawresponse,
                # in case of null response, return empty dict.
                response = {}

        return response

    def url_decode(url):
        URL Decode function using REGEX


          - **url:** URLENCODED text string

        **Returns:** Non URLENCODED string
        return re.compile('%([0-9a-fA-F]{2})', re.M).sub(lambda m: chr(int(, 16)), url)

    def throw_error(message, resp=None, cr=True, exception=CloudGenixAPIError):
        Non-recoverable error, write message to STDERR and raise exception


          - **message:** Message text
          - **resp:** Optional - CloudGenix SDK Response object
          - **cr:** Optional - Use (or not) Carriage Returns.
          - **exception:** Optional - Custom Exception to throw, otherwise uses `CloudGenixAPIError`

        **Returns:** No Return, throws exception.
        output = "ERROR: " + str(message)
        if cr:
            output += "\n"
        if resp is not None:
            output2 = str(jdout_detailed(resp))
            if cr:
                output2 += "\n"
        raise exception(message)

    def throw_warning(message, resp=None, cr=True):
        Recoverable Warning.


          - **message:** Message text
          - **resp:** Optional - CloudGenix SDK Response object
          - **cr:** Optional - Use (or not) Carriage Returns.

        **Returns:** No Return.
        output = "WARNING: " + str(message)
        if cr:
            output += "\n"
        if resp is not None:
            output2 = str(jdout_detailed(resp))
            if cr:
                output2 += "\n"

    def extract_items(self, resp_object, error_label=None, pass_code_list=None, items_key='items'):
        Extract list of items from a CloudGenix API Response object.


          - **resp_object:** CloudGenix Extended `requests.Response` object.
          - **error_label:** Optional - text to describe operation on error.
          - **pass_code_list:** Optional - list of HTTP response codes to silently pass with empty list response.
          - **items_key:** Optional - Text for items key to extract (default 'items')

        **Returns:** list of 'items' objects.

        if pass_code_list is None:
            pass_code_list = [404, 400]

        items = resp_object.cgx_content.get(items_key)

        if resp_object.cgx_status and items is not None:
            return items

        # handle 404 and other error codes for certain APIs where objects may not exist
        elif resp_object.status_code in pass_code_list:
            return [{}]

            if error_label is not None:
                self.throw_error("Unable to extract '{0}' from {1}.".format(items_key, error_label), resp_object)
                return [{}]
                self.throw_error("Unable to extract '{0}' from response.".format(items_key), resp_object)
                return [{}]

    def build_lookup_dict(self, list_content, key_val='name', value_val='id', force_nag=False, nag_cache=None):
        Build key/value lookup dictionary from a list of dictionaries with specified key/value entries.


          - **list_content:** List of dicts to derive lookup structs from
          - **key_val:** Optional - Value to extract from entry to be key
          - **value_val:** Optional - Value to extract from entry to be value
          - **force_nag:** Optional - Bool, if True will nag even if key in `nag_cache`
          - **nag_cache:** Optional - List of keys that already exist in a lookup dict that should be duplicate checked.

        **Returns:** Lookup Dictionary
        if nag_cache and isinstance(nag_cache, list):
            already_nagged_dup_keys = nag_cache
            already_nagged_dup_keys = []

        lookup_dict = {}
        blacklist_duplicate_keys = []
        blacklist_duplicate_entries = []

        for item in list_content:
            item_key = item.get(key_val)
            item_value = item.get(value_val)
            # print(item_key, item_value)
            if item_key and item_value is not None:
                # check if it's a duplicate key.
                if str(item_key) in lookup_dict:
                    # First duplicate we've seen - save for warning.
                    duplicate_value = lookup_dict.get(item_key)
                    blacklist_duplicate_entries.append({item_key: duplicate_value})
                    blacklist_duplicate_entries.append({item_key: item_value})
                    # remove from lookup dict to prevent accidental overlap usage
                    del lookup_dict[str(item_key)]

                # check if it was a third+ duplicate key for a previous key
                elif item_key in blacklist_duplicate_keys:
                    # save for warning.
                    blacklist_duplicate_entries.append({item_key: item_value})

                    # no duplicates, append
                    lookup_dict[str(item_key)] = item_value

        for duplicate_key in blacklist_duplicate_keys:
            matching_entries = [entry for entry in blacklist_duplicate_entries if duplicate_key in entry]
            # check if force_nag set and if not, has key already been notified to the end user.
            if force_nag or duplicate_key not in already_nagged_dup_keys:
                    "Lookup value '{0}' was seen two or more times. To use, please remove duplicates in the controller,"
                    " or reference it explicitly by the actual value: ".format(duplicate_key), matching_entries)
                # we've now notified, add to notified list.
        return lookup_dict

    def pull_content_error(resp_object, raw=False):
        Parse API response object, return error detail text for printing on error in response content.


          - **resp_object:** CloudGenix Extended `requests.Response` object.
          - **raw:** Optional. If True, return list of dicts (raw error messages.) Default False.

        **Returns:** text_type error message, or list of dicts (if raw=True). None if no errors.
        api_logger.debug('pull_content_error function:')

            # attempt to grab the cgx_content. should always be a Dict if it exists.
            data = resp_object.cgx_content
        except (TypeError, ValueError, AttributeError):
            # cgx_content did not exist. check root object for dict as end-user may pass the content and not the
            # extended `requests.Response` object.
            data = resp_object

        if not isinstance(data, dict):
            # fast fail if data isn't correct format.
            api_logger.debug('PULL_ERROR: not able to find a valid dict object in resp_object: {0}'.format(resp_object))
            return None

        parsed_messages = []
        errors = []

        if isinstance(data, dict):
            # got a parsed response.
            errors = data.get('_error')
            if isinstance(errors, list):
                # Some errors in the response
                if raw is True:
                    # just return raw error list
                    return errors
                    for error in errors:
                        code = error.get('code')
                        message = error.get('message')
                        if code and message:
                            parsed_messages.append("{0} ({1})".format(message, code))
            elif errors is None:
                # no errors, ensure list and empty.
                errors = []
                # not a list.. put whatever it is into a list.
                errors = [errors]

        api_logger.debug("ERRORS: %s", errors)
        api_logger.debug("PARSED_ERRORS: %s", parsed_messages)

        # is parsed_messages empty and errors exist? dump errors as txt
        if not parsed_messages and len(errors) > 1:
            return text_type(errors)
        elif len(parsed_messages) == 1:
            return text_type(parsed_messages[0])
        elif len(parsed_messages) > 1:
            # return comma separated string of errors
            return text_type("{0}, and {1}".format(", ".join(parsed_messages[:-1]),  parsed_messages[-1]))
            # no errors
            return None

    def pull_content_warning(resp_object, raw=False):
        Parse API response object, return text for printing on warning in response.


          - **resp_object:** CloudGenix Extended `requests.Response` object.
          - **raw:** Optional. If True, return list of dicts (raw warning messages.) Default False.

        **Returns:** text_type warning message, or list of dicts (if raw=True). None if no warnings.
        api_logger.debug('pull_content_warning function:')

            # attempt to grab the cgx_content. should always be a Dict if it exists.
            data = resp_object.cgx_content
        except (TypeError, ValueError, AttributeError):
            # cgx_content did not exist. check root object for dict as end-user may pass the content and not the
            # extended `requests.Response` object.
            data = resp_object

        if not isinstance(data, dict):
            # fast fail if data isn't correct format.
            api_logger.debug('PULL_WARNING: not able to find a valid dict object in resp_object: {0}'
            return None

        parsed_messages = []
        warnings = []

        if isinstance(data, dict):
            # got a parsed response.
            warnings = data.get('_warning')
            if isinstance(warnings, list):
                # Some warnings in the response
                if raw is True:
                    # just return raw warning list
                    return warnings
                    for warning in warnings:
                        code = warning.get('code')
                        message = warning.get('message')
                        if code and message:
                            parsed_messages.append("{0} ({1})".format(message, code))
            elif warnings is None:
                # no warnings, ensure list and empty.
                warnings = []
                # not a list.. put whatever it is into a list.
                warnings = [warnings]

        api_logger.debug("WARNINGS: %s", warnings)
        api_logger.debug("PARSED_WARNINGS: %s", parsed_messages)

        # is parsed_messages empty and warnings exist? dump warnings as txt
        if not parsed_messages and len(warnings) > 1:
            return text_type(warnings)
        elif len(parsed_messages) == 1:
            return text_type(parsed_messages[0])
        elif len(parsed_messages) > 1:
            # return comma separated string of warnings
            return text_type("{0}, and {1}".format(", ".join(parsed_messages[:-1]),  parsed_messages[-1]))
            # no warnings
            return None

Ancestors (in MRO)

  • API
  • builtins.object

Class variables

var address_string

String representing address, optional - should be pulled from get.profile() data.

var ca_verify_filename

DEPRECATED: Filename to use for CA verification. Moved to modern python ssl, updated to only passing filename in event of local file.

var cache

API response cache (Future)

var client_id

If ESP/MSP, is client currently logged in

var controller

Current active controller URL

var controller_orig

Original Controller URL as entered - before Region re-parse

var controller_region

Controller Region, if present.

var email

Email (username) for session

var ignore_region

Ignore regions returned by controller, and use explicit controller only.

var is_esp

Is the current tenant an ESP/MSP?

var operator_id

Operator ID of current session.

var rest_call_max_retry

DEPRECATED: Please use modify_rest_retry.

var rest_call_retry

DEPRECATED: Please use modify_rest_retry.

var rest_call_sleep

DEPRECATED: Please use modify_rest_retry.

var rest_call_timeout

Maximum time to wait for any data from REST server.

var roles

Roles list

var tenant_id

Numeric ID of tenant (account) - should be set after initial login from cloudgenix.get_api.Get.profile data

var tenant_name

Name of tenant (account), should be set after initial login from cloudgenix.get_api.Get.profile data

var token_session

Is this login from a static AUTH_TOKEN (True), or a standard login (False)

var update_check

Notify users of available update to SDK

var update_info_url

Update Info URL for use once Constructor Created.

var verify

Verify SSL certificate.

var version

Version string for use once Constructor created.

Static methods

def __init__(

self, controller='', ssl_verify=True, update_check=True)

Create the API constructor object

  • controller: Initial Controller URL String
  • ssl_verify: Should SSL be verified for this system. Can be file or BOOL. See ssl_verify for more details.
  • update_check: Bool to Enable/Disable SDK update check and new release notifications.
def __init__(self, controller=controller, ssl_verify=verify, update_check=True):
    Create the API constructor object
      - **controller:** Initial Controller URL String
      - **ssl_verify:** Should SSL be verified for this system. Can be `file` or BOOL. See `cloudgenix.API.ssl_verify` for more details.
      - **update_check:** Bool to Enable/Disable SDK update check and new release notifications.
    # set version and update url from outer scope.
    self.version = version
    """Version string for use once Constructor created."""
    self.update_info_url = update_info_url
    """Update Info URL for use once Constructor Created."""
    # try:
    if controller and isinstance(controller, (binary_type, text_type)):
        self.controller = controller.lower()
        self.controller_orig = controller.lower()
    # Create Requests Session.
    self._session = requests.Session()
    # set ssl context
    if isinstance(ssl_verify, (binary_type, text_type, bool)):
        self.ssl_verify(ssl_verify, update_adapter=False)
    # Set default REST retry parameters
    # update the HTTP Adapter for requests.
    # handle update check
    if isinstance(update_check, bool):
        self.update_check = update_check
    if update_check:
    # Identify SDK in the User-Agent.
    user_agent = self._session.headers.get('User-Agent')
    if user_agent:
        user_agent += ' (CGX SDK v{0})'.format(self.version)
        user_agent = 'python-requests/UNKNOWN (CGX SDK v{0})'.format(self.version)
    # Update Headers
        'Accept': 'application/json',
        'User-Agent': text_type(user_agent)
    # except Exception as e:
    #     raise ValueError("Unable to create Requests session object: {0}.".format(e))
    api_logger.debug("DEBUG: URL: %s, SSL Verify: %s, Session: %s",
    # Update Headers for WebSocket requests
    websocketlib_name = websockets.__name__
    websocketlib_version = websockets.version.version
    if not websocketlib_name:
        websocketlib_name = 'websockets'
    if not websocketlib_version:
        websocketlib_version = 'UNKNOWN'
    ws_user_agent = 'python-{0}/{1} (CGX SDK v{2})'.format(websocketlib_name,
    self._websocket_headers = {
        'Accept': 'application/json',
        'User-Agent': text_type(ws_user_agent)
    # Bind API method classes to this object
    subclasses = self._subclass_container()
    self.get = subclasses["get"]()
    """API object link to `cloudgenix.get_api.Get`""" = subclasses["post"]()
    """API object link to `cloudgenix.post_api.Post`"""
    self.put = subclasses["put"]()
    """API object link to `cloudgenix.put_api.Put`"""
    self.patch = subclasses["patch"]()
    """API object link to `cloudgenix.patch_api.Patch`"""
    self.delete = subclasses["delete"]()
    """API object link to `cloudgenix.delete_api.Delete`"""
    self.interactive = subclasses["interactive"]()
    """API object link to `cloudgenix.interactive.Interactive`""" = subclasses["ws"]()
    """API object link to ``"""

def add_headers(

self, headers)

Permanently add/overwrite headers to session.


  • headers: dict with header/value

Returns: Mutates requests.Session() object, no return.

def add_headers(self, headers):
    Permanently add/overwrite headers to session.
      - **headers:** dict with header/value
    **Returns:** Mutates `requests.Session()` object, no return.

def build_lookup_dict(

self, list_content, key_val='name', value_val='id', force_nag=False, nag_cache=None)

Build key/value lookup dictionary from a list of dictionaries with specified key/value entries.


  • list_content: List of dicts to derive lookup structs from
  • key_val: Optional - Value to extract from entry to be key
  • value_val: Optional - Value to extract from entry to be value
  • force_nag: Optional - Bool, if True will nag even if key in nag_cache
  • nag_cache: Optional - List of keys that already exist in a lookup dict that should be duplicate checked.

Returns: Lookup Dictionary

def build_lookup_dict(self, list_content, key_val='name', value_val='id', force_nag=False, nag_cache=None):
    Build key/value lookup dictionary from a list of dictionaries with specified key/value entries.
      - **list_content:** List of dicts to derive lookup structs from
      - **key_val:** Optional - Value to extract from entry to be key
      - **value_val:** Optional - Value to extract from entry to be value
      - **force_nag:** Optional - Bool, if True will nag even if key in `nag_cache`
      - **nag_cache:** Optional - List of keys that already exist in a lookup dict that should be duplicate checked.
    **Returns:** Lookup Dictionary
    if nag_cache and isinstance(nag_cache, list):
        already_nagged_dup_keys = nag_cache
        already_nagged_dup_keys = []
    lookup_dict = {}
    blacklist_duplicate_keys = []
    blacklist_duplicate_entries = []
    for item in list_content:
        item_key = item.get(key_val)
        item_value = item.get(value_val)
        # print(item_key, item_value)
        if item_key and item_value is not None:
            # check if it's a duplicate key.
            if str(item_key) in lookup_dict:
                # First duplicate we've seen - save for warning.
                duplicate_value = lookup_dict.get(item_key)
                blacklist_duplicate_entries.append({item_key: duplicate_value})
                blacklist_duplicate_entries.append({item_key: item_value})
                # remove from lookup dict to prevent accidental overlap usage
                del lookup_dict[str(item_key)]
            # check if it was a third+ duplicate key for a previous key
            elif item_key in blacklist_duplicate_keys:
                # save for warning.
                blacklist_duplicate_entries.append({item_key: item_value})
                # no duplicates, append
                lookup_dict[str(item_key)] = item_value
    for duplicate_key in blacklist_duplicate_keys:
        matching_entries = [entry for entry in blacklist_duplicate_entries if duplicate_key in entry]
        # check if force_nag set and if not, has key already been notified to the end user.
        if force_nag or duplicate_key not in already_nagged_dup_keys:
                "Lookup value '{0}' was seen two or more times. To use, please remove duplicates in the controller,"
                " or reference it explicitly by the actual value: ".format(duplicate_key), matching_entries)
            # we've now notified, add to notified list.
    return lookup_dict

def expose_session(


Call to expose the Requests Session object

Returns: requests.Session object

def expose_session(self):
    Call to expose the Requests Session object
    **Returns:** `requests.Session` object
    return self._session

def extract_items(

self, resp_object, error_label=None, pass_code_list=None, items_key='items')

Extract list of items from a CloudGenix API Response object.


  • resp_object: CloudGenix Extended requests.Response object.
  • error_label: Optional - text to describe operation on error.
  • pass_code_list: Optional - list of HTTP response codes to silently pass with empty list response.
  • items_key: Optional - Text for items key to extract (default 'items')

Returns: list of 'items' objects.

def extract_items(self, resp_object, error_label=None, pass_code_list=None, items_key='items'):
    Extract list of items from a CloudGenix API Response object.
      - **resp_object:** CloudGenix Extended `requests.Response` object.
      - **error_label:** Optional - text to describe operation on error.
      - **pass_code_list:** Optional - list of HTTP response codes to silently pass with empty list response.
      - **items_key:** Optional - Text for items key to extract (default 'items')
    **Returns:** list of 'items' objects.
    if pass_code_list is None:
        pass_code_list = [404, 400]
    items = resp_object.cgx_content.get(items_key)
    if resp_object.cgx_status and items is not None:
        return items
    # handle 404 and other error codes for certain APIs where objects may not exist
    elif resp_object.status_code in pass_code_list:
        return [{}]
        if error_label is not None:
            self.throw_error("Unable to extract '{0}' from {1}.".format(items_key, error_label), resp_object)
            return [{}]
            self.throw_error("Unable to extract '{0}' from response.".format(items_key), resp_object)
            return [{}]

def modify_rest_retry(

self, total=8, connect=None, read=None, redirect=None, status=None, other=0, allowed_methods=frozenset({'HEAD', 'PUT', 'GET', 'DELETE', 'OPTIONS', 'TRACE'}), status_forcelist=None, backoff_factor=0.705883, raise_on_redirect=True, raise_on_status=True, respect_retry_after_header=True, update_adapter=True)

Modify retry parameters for the SDKs rest call object.

Parameters are directly from and passed directly to urllib3.util.retry.Retry, and get applied directly to the underlying requests.Session object.

Default retry with total=8 and backoff_factor=0.705883:

  • Try 1, 0 delay (0 total seconds)
  • Try 2, 0 delay (0 total seconds)
  • Try 3, 0.705883 delay (0.705883 total seconds)
  • Try 4, 1.411766 delay (2.117649 total seconds)
  • Try 5, 2.823532 delay (4.941181 total seconds)
  • Try 6, 5.647064 delay (10.588245 total seconds)
  • Try 7, 11.294128 delay (21.882373 total seconds)
  • Try 8, 22.588256 delay (44.470629 total seconds)
  • Try 9, 45.176512 delay (89.647141 total seconds)
  • Try 10, 90.353024 delay (180.000165 total seconds)


  • total: int, Total number of retries to allow. Takes precedence over other counts.
  • connect: int, How many connection-related errors to retry on.
  • read: int, How many times to retry on read errors.
  • redirect: int, How many redirects to perform. loops.
  • status: int, How many times to retry on bad status codes.
  • other: int, How many times to retry on other errors. Set to 0 by default for things like SSL errors.
  • method_whitelist: iterable, Set of uppercased HTTP method verbs that we should retry on.
  • status_forcelist: iterable, A set of integer HTTP status codes that we should force a retry on.
  • backoff_factor: float, A backoff factor to apply between attempts after the second try.
  • raise_on_redirect: bool, True = raise a MaxRetryError, False = return latest 3xx response.
  • raise_on_status: bool, Similar logic to raise_on_redirect but for status responses.
  • respect_retry_after_header: bool, Whether to respect Retry-After header on status codes.
  • adapter_url: string, URL match for these retry values (default https://)
  • update_adapter: bool, call update adapter function after running to auto update adapter.

Returns: No return, mutates the retry / session directly

def modify_rest_retry(self, total=8, connect=None, read=None, redirect=None, status=None, other=0,
                      allowed_methods=urllib3.util.retry.Retry.DEFAULT_ALLOWED_METHODS, status_forcelist=None,
                      backoff_factor=0.705883, raise_on_redirect=True, raise_on_status=True,
                      respect_retry_after_header=True, update_adapter=True):
    Modify retry parameters for the SDKs rest call object.
    Parameters are directly from and passed directly to `urllib3.util.retry.Retry`, and get applied directly to
    the underlying `requests.Session` object.
    Default retry with total=8 and backoff_factor=0.705883:
     - Try 1, 0 delay (0 total seconds)
     - Try 2, 0 delay (0 total seconds)
     - Try 3, 0.705883 delay (0.705883 total seconds)
     - Try 4, 1.411766 delay (2.117649 total seconds)
     - Try 5, 2.823532 delay (4.941181 total seconds)
     - Try 6, 5.647064 delay (10.588245 total seconds)
     - Try 7, 11.294128 delay (21.882373 total seconds)
     - Try 8, 22.588256 delay (44.470629 total seconds)
     - Try 9, 45.176512 delay (89.647141 total seconds)
     - Try 10, 90.353024 delay (180.000165 total seconds)
      - **total:** int, Total number of retries to allow. Takes precedence over other counts.
      - **connect:** int, How many connection-related errors to retry on.
      - **read:** int, How many times to retry on read errors.
      - **redirect:** int, How many redirects to perform. loops.
      - **status:** int, How many times to retry on bad status codes.
      - **other:** int, How many times to retry on other errors. Set to 0 by default for things like SSL errors.
      - **method_whitelist:** iterable, Set of uppercased HTTP method verbs that we should retry on.
      - **status_forcelist:** iterable, A set of integer HTTP status codes that we should force a retry on.
      - **backoff_factor:** float, A backoff factor to apply between attempts after the second try.
      - **raise_on_redirect:** bool, True = raise a MaxRetryError, False = return latest 3xx response.
      - **raise_on_status:** bool, Similar logic to ``raise_on_redirect`` but for status responses.
      - **respect_retry_after_header:** bool, Whether to respect Retry-After header on status codes.
      - **adapter_url:** string, URL match for these retry values (default `https://`)
      - **update_adapter:** bool, call update adapter function after running to auto update adapter.
    **Returns:** No return, mutates the retry / session directly
    # Cloudgenix responses with 502/504 are usually recoverable. Use them if no list specified.
    if status_forcelist is None:
        status_forcelist = (413, 429, 502, 503, 504)
    retry = urllib3.util.retry.Retry(total=total,
    # set updated retry object.
    self._rest_call_retry_object = retry
    # call update to mount/remount session adapter
    if update_adapter:

def notify_for_new_version(


Check for a new version of the SDK on API constructor instantiation. If new version found, print Notification to STDERR.

On failure of this check, fail silently.

Returns: No item returned, directly prints notification to sys.stderr.

def notify_for_new_version(self):
    Check for a new version of the SDK on API constructor instantiation. If new version found, print
    Notification to STDERR.
    On failure of this check, fail silently.
    **Returns:** No item returned, directly prints notification to `sys.stderr`.
    # broad exception clause, if this fails for any reason just return.
        recommend_update = False
        update_check_resp = requests.get(self.update_info_url, timeout=3)
        web_version = update_check_resp.json()["info"]["version"]
        api_logger.debug("RETRIEVED_VERSION: %s", web_version)
        available_version =
        current_version =
        available_major = available_version.get('major')
        available_minor = available_version.get('minor')
        available_patch = available_version.get('patch')
        available_build = available_version.get('build')
        current_major = current_version.get('major')
        current_minor = current_version.get('minor')
        current_patch = current_version.get('patch')
        current_build = current_version.get('build')
        api_logger.debug("AVAILABLE_VERSION: %s", available_version)
        api_logger.debug("CURRENT_VERSION: %s", current_version)
        # check for major/minor version differences, do not alert for build differences.
        if available_major > current_major:
            recommend_update = True
        elif available_major >= current_major and available_minor > current_minor:
            recommend_update = True
        elif available_major >= current_major and available_minor >= current_minor and \
                available_patch > current_patch:
            recommend_update = True
        api_logger.debug("NEED_UPDATE: %s", recommend_update)
        # notify.
        if recommend_update:
            sys.stderr.write("WARNING: CloudGenix Python SDK upgrade available. SDKs are often deprecated 6 "
                             "months after release of a new version.\n"
                             "\tLatest Version: {0}\n"
                             "\tCurrent Version: {1}\n"
                             "\tFor more info, see ''. Additionally, this "
                             "message can be suppressed by instantiating the API with API(update_check=False).\n\n"
                             "".format(web_version, self.version))
    except Exception:
        # just return and continue.

def parse_auth_token(

self, auth_token)

Break auth_token up into its constituent values.


  • auth_token: Auth_token string

Returns: dict with Auth Token constituents

def parse_auth_token(self, auth_token):
    Break auth_token up into its constituent values.
      - **auth_token:** Auth_token string
    **Returns:** dict with Auth Token constituents
    # remove the random security key value from the front of the auth_token
    auth_token_cleaned = auth_token.split('-', 1)[1]
    # URL Decode the Auth Token
    auth_token_decoded = self.url_decode(auth_token_cleaned)
    # Create a new dict to hold the response.
    auth_dict = {}
    # Parse the token
    for key_value in auth_token_decoded.split("&"):
        key_value_list = key_value.split("=")
        # check for valid token parts
        if len(key_value_list) == 2 and type(key_value_list[0]) in [text_type, binary_type]:
            auth_dict[key_value_list[0]] = key_value_list[1]
    # Return the dict of key/values in the token.
    return auth_dict

def parse_region(

self, login_response)

Return region from a successful login response.


  • login_response: requests.Response from a successful login.

Returns: region name.

def parse_region(self, login_response):
    Return region from a successful login response.
      - **login_response:** requests.Response from a successful login.
    **Returns:** region name.
    auth_token = login_response.cgx_content['x_auth_token']
    auth_token_dict = self.parse_auth_token(auth_token)
    auth_region = auth_token_dict.get('region')
    return auth_region

def pull_content_error(

resp_object, raw=False)

Parse API response object, return error detail text for printing on error in response content.


  • resp_object: CloudGenix Extended requests.Response object.
  • raw: Optional. If True, return list of dicts (raw error messages.) Default False.

Returns: text_type error message, or list of dicts (if raw=True). None if no errors.

def pull_content_error(resp_object, raw=False):
    Parse API response object, return error detail text for printing on error in response content.
      - **resp_object:** CloudGenix Extended `requests.Response` object.
      - **raw:** Optional. If True, return list of dicts (raw error messages.) Default False.
    **Returns:** text_type error message, or list of dicts (if raw=True). None if no errors.
    api_logger.debug('pull_content_error function:')
        # attempt to grab the cgx_content. should always be a Dict if it exists.
        data = resp_object.cgx_content
    except (TypeError, ValueError, AttributeError):
        # cgx_content did not exist. check root object for dict as end-user may pass the content and not the
        # extended `requests.Response` object.
        data = resp_object
    if not isinstance(data, dict):
        # fast fail if data isn't correct format.
        api_logger.debug('PULL_ERROR: not able to find a valid dict object in resp_object: {0}'.format(resp_object))
        return None
    parsed_messages = []
    errors = []
    if isinstance(data, dict):
        # got a parsed response.
        errors = data.get('_error')
        if isinstance(errors, list):
            # Some errors in the response
            if raw is True:
                # just return raw error list
                return errors
                for error in errors:
                    code = error.get('code')
                    message = error.get('message')
                    if code and message:
                        parsed_messages.append("{0} ({1})".format(message, code))
        elif errors is None:
            # no errors, ensure list and empty.
            errors = []
            # not a list.. put whatever it is into a list.
            errors = [errors]
    api_logger.debug("ERRORS: %s", errors)
    api_logger.debug("PARSED_ERRORS: %s", parsed_messages)
    # is parsed_messages empty and errors exist? dump errors as txt
    if not parsed_messages and len(errors) > 1:
        return text_type(errors)
    elif len(parsed_messages) == 1:
        return text_type(parsed_messages[0])
    elif len(parsed_messages) > 1:
        # return comma separated string of errors
        return text_type("{0}, and {1}".format(", ".join(parsed_messages[:-1]),  parsed_messages[-1]))
        # no errors
        return None

def pull_content_warning(

resp_object, raw=False)

Parse API response object, return text for printing on warning in response.


  • resp_object: CloudGenix Extended requests.Response object.
  • raw: Optional. If True, return list of dicts (raw warning messages.) Default False.

Returns: text_type warning message, or list of dicts (if raw=True). None if no warnings.

def pull_content_warning(resp_object, raw=False):
    Parse API response object, return text for printing on warning in response.
      - **resp_object:** CloudGenix Extended `requests.Response` object.
      - **raw:** Optional. If True, return list of dicts (raw warning messages.) Default False.
    **Returns:** text_type warning message, or list of dicts (if raw=True). None if no warnings.
    api_logger.debug('pull_content_warning function:')
        # attempt to grab the cgx_content. should always be a Dict if it exists.
        data = resp_object.cgx_content
    except (TypeError, ValueError, AttributeError):
        # cgx_content did not exist. check root object for dict as end-user may pass the content and not the
        # extended `requests.Response` object.
        data = resp_object
    if not isinstance(data, dict):
        # fast fail if data isn't correct format.
        api_logger.debug('PULL_WARNING: not able to find a valid dict object in resp_object: {0}'
        return None
    parsed_messages = []
    warnings = []
    if isinstance(data, dict):
        # got a parsed response.
        warnings = data.get('_warning')
        if isinstance(warnings, list):
            # Some warnings in the response
            if raw is True:
                # just return raw warning list
                return warnings
                for warning in warnings:
                    code = warning.get('code')
                    message = warning.get('message')
                    if code and message:
                        parsed_messages.append("{0} ({1})".format(message, code))
        elif warnings is None:
            # no warnings, ensure list and empty.
            warnings = []
            # not a list.. put whatever it is into a list.
            warnings = [warnings]
    api_logger.debug("WARNINGS: %s", warnings)
    api_logger.debug("PARSED_WARNINGS: %s", parsed_messages)
    # is parsed_messages empty and warnings exist? dump warnings as txt
    if not parsed_messages and len(warnings) > 1:
        return text_type(warnings)
    elif len(parsed_messages) == 1:
        return text_type(parsed_messages[0])
    elif len(parsed_messages) > 1:
        # return comma separated string of warnings
        return text_type("{0}, and {1}".format(", ".join(parsed_messages[:-1]),  parsed_messages[-1]))
        # no warnings
        return None

def remove_header(

self, header)

Permanently remove a single header from session


  • header: str of single header to remove

Returns: Mutates requests.Session() object, no return.

def remove_header(self, header):
    Permanently remove a single header from session
      - **header:** str of single header to remove
    **Returns:** Mutates `requests.Session()` object, no return.
    # check for header first. Return silently if it does not exist.
    if self._session.headers.get(header) is not None:
        del self._session.headers[header]

def rest_call(

self, url, method, data=None, sensitive=False, timeout=None, content_json=True, raw_msgs=False, retry=None, max_retry=None, retry_sleep=None)

Generic REST call worker function


  • url: URL for the REST call
  • method: METHOD for the REST call
  • data: Optional DATA for the call (for POST/PUT/etc.)
  • sensitive: Flag if content request/response should be hidden from logging functions
  • timeout: Requests Timeout
  • content_json: Bool on whether the Content-Type header should be set to application/json
  • raw_msgs: True/False, if True, do not convert API sideband messages (warnings, errors) to text.
  • retry: DEPRECATED - please use modify_rest_retry instead.
  • max_retry: DEPRECATED - please use modify_rest_retry instead.
  • retry_sleep: DEPRECATED - please use modify_rest_retry instead.

Returns: Requests.Response object, extended with:

  • cgx_status: Bool, True if a successful CloudGenix response, False if error.
  • cgx_content: Content of the response, guaranteed to be in Dict format. Empty/invalid responses will be converted to a Dict response.
  • cgx_errors: Text error messages if any are present. None if none. List if raw_msgs is True.
  • cgx_warnings: Text warning messages if any are present. None if none. List if raw_msgs is True.
def rest_call(self, url, method, data=None, sensitive=False, timeout=None, content_json=True, raw_msgs=False,
              retry=None, max_retry=None, retry_sleep=None):
    Generic REST call worker function
      - **url:** URL for the REST call
      - **method:** METHOD for the REST call
      - **data:** Optional DATA for the call (for POST/PUT/etc.)
      - **sensitive:** Flag if content request/response should be hidden from logging functions
      - **timeout:** Requests Timeout
      - **content_json:** Bool on whether the Content-Type header should be set to application/json
      - **raw_msgs:** True/False, if True, do not convert API sideband messages (warnings, errors) to text.
      - **retry:** DEPRECATED - please use `cloudgenix.API.modify_rest_retry` instead.
      - **max_retry:** DEPRECATED - please use `cloudgenix.API.modify_rest_retry` instead.
      - **retry_sleep:** DEPRECATED - please use `cloudgenix.API.modify_rest_retry` instead.
    **Returns:** Requests.Response object, extended with:
      - **cgx_status**: Bool, True if a successful CloudGenix response, False if error.
      - **cgx_content**: Content of the response, guaranteed to be in Dict format. Empty/invalid responses
      will be converted to a Dict response.
      - **cgx_errors**: Text error messages if any are present. None if none. List if raw_msgs is True.
      - **cgx_warnings**: Text warning messages if any are present. None if none. List if raw_msgs is True.
    # pull retry related items from Constructor if not specified.
    if timeout is None:
        timeout = self.rest_call_timeout
    if retry is not None:
        # Someone using deprecated retry code. Notify.
        sys.stderr.write("WARNING: 'retry' option of rest_call() has been deprecated. "
                         "Please use 'API.modify_rest_retry()' instead.")
    if max_retry is not None:
        # Someone using deprecated retry code. Notify.
        sys.stderr.write("WARNING: 'max_retry' option of rest_call() has been deprecated. "
                         "Please use 'API.modify_rest_retry()' instead.")
    if retry_sleep is not None:
        # Someone using deprecated retry code. Notify.
        sys.stderr.write("WARNING: 'max_retry' option of rest_call() has been deprecated. "
                         "Please use 'API.modify_rest_retry()' instead.")
    # Get logging level, use this to bypass logging functions with possible large content if not set.
    logger_level = api_logger.getEffectiveLevel()
    # populate headers and cookies from session.
    if content_json and method.lower() not in ['get', 'delete']:
        headers = {
            'Content-Type': 'application/json'
        headers = {}
    # add session headers
    cookie = self._session.cookies.get_dict()
    # make sure data is populated if present.
    if isinstance(data, (list, dict)):
        data = json.dumps(data)
    api_logger.debug('REST_CALL URL = %s', url)
    # make request
        if not sensitive:
            api_logger.debug('\n\tREQUEST: %s %s\n\tHEADERS: %s\n\tCOOKIES: %s\n\tDATA: %s\n',
                             method.upper(), url, headers, cookie, data)
        # Actual request
        response = self._session.request(method, url, data=data, stream=True, timeout=timeout,
                                         headers=headers, allow_redirects=False)
        # Request complete - lets parse.
        # if it's a non-CGX-good response, return with cgx_status = False
        if response.status_code not in [,
            # Simple JSON debug
            if not sensitive:
                    api_logger.debug('RESPONSE HEADERS: %s\n', json.dumps(
                        json.loads(text_type(response.headers)), indent=4))
                except ValueError:
                    api_logger.debug('RESPONSE HEADERS: %s\n', text_type(response.headers))
                    api_logger.debug('RESPONSE: %s\n', json.dumps(response.json(), indent=4))
                except ValueError:
                    api_logger.debug('RESPONSE: %s\n', text_type(response.text))
                api_logger.debug('RESPONSE NOT LOGGED (sensitive content)')
            api_logger.debug("Error, non-200 response received: %s", response.status_code)
            # CGX extend requests.Response for return
            response.cgx_status = False
            response.cgx_content = self._catch_nonjson_streamresponse(response.text)
            # CGX extend requests.Response for any errors/warnings.
            response.cgx_warnings = self.pull_content_warning(response, raw=raw_msgs)
            response.cgx_errors = self.pull_content_error(response, raw=raw_msgs)
            # We are in a failed request. If no error text in response, give the response code and detail.
            if response.cgx_errors is None:
                response.cgx_errors = text_type("{0} ({1})".format(response.reason, response.status_code))
            return response
            # Simple JSON debug
            if not sensitive and (logger_level <= logging.DEBUG and logger_level != logging.NOTSET):
                    api_logger.debug('RESPONSE HEADERS: %s\n', json.dumps(
                        json.loads(text_type(response.headers)), indent=4))
                    api_logger.debug('RESPONSE: %s\n', json.dumps(response.json(), indent=4))
                except ValueError:
                    api_logger.debug('RESPONSE HEADERS: %s\n', text_type(response.headers))
                    api_logger.debug('RESPONSE: %s\n', text_type(response.text))
            elif sensitive:
                api_logger.debug('RESPONSE NOT LOGGED (sensitive content)')
            # CGX extend requests.Response for return
            response.cgx_status = True
            response.cgx_content = self._catch_nonjson_streamresponse(response.text)
            # CGX extend requests.Response for any errors/warnings.
            response.cgx_warnings = self.pull_content_warning(response, raw=raw_msgs)
            response.cgx_errors = self.pull_content_error(response, raw=raw_msgs)
            return response
    except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, urllib3.exceptions.MaxRetryError)\
            as e:"Error, %s.", text_type(e))
        # make a requests.Response object for return since we didn't get one.
        response = requests.Response
        # CGX extend requests.Response for return
        response.cgx_status = False
        response.cgx_content = {
            '_error': [
                    'message': 'REST Request Exception: {}'.format(e),
                    'data': {},
        # CGX extend requests.Response for any errors/warnings.
        response.cgx_warnings = self.pull_content_warning(response, raw=raw_msgs)
        response.cgx_errors = self.pull_content_error(response, raw=raw_msgs)
        return response

def set_debug(

self, debuglevel, set_format=None, set_handler=None)

Change the debug level of the API


  • set_format: Optional. If set and text_type, use input for formatter. Otherwise, default formatter.
  • set_format: Optional. If set and logging.Handler type, use input for handler. Otherwise, default logging.StreamHandler()

Returns: No item returned.

def set_debug(self, debuglevel, set_format=None, set_handler=None):
    Change the debug level of the API
      - **set_format:** Optional. If set and text_type, use input for formatter. Otherwise, default formatter.
      - **set_format:** Optional. If set and `logging.Handler` type, use input for handler. Otherwise, default
    **Returns:** No item returned.
    # set the logging formatter and stream handle
    if set_format is None:
        # default formatter
        api_formatter = logging.Formatter("%(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s")
    elif not isinstance(set_format, text_type):
        # not a valid format string. Set to default.
        api_formatter = logging.Formatter("%(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s")
        # valid logging string.
        api_formatter = logging.Formatter(set_format)
    # set the logging handler if supported handler is not passed.
    if set_handler is None:
        # Default handler
        api_handler = logging.StreamHandler()
    elif not isinstance(set_handler, (logging.FileHandler, logging.Handler, logging.NullHandler,
        # not a valid handler. Set to default handler.
        api_handler = logging.StreamHandler()
        # passed valid handler
        api_handler = set_handler
    # set handler to use format.
    # Get the loggers from other modules in prep for setting new handlers.
    urllib3_logger = logging.getLogger("requests.packages.urllib3")
    urllib3_retry_logger = logging.getLogger("urllib3.util.retry")
    cookie_logger = logging.getLogger("http.cookiejar")
    # remove existing handlers
    api_logger.handlers = []
    urllib3_logger.handlers = []
    urllib3_retry_logger.handlers = []
    cookie_logger.handlers = []
    ws_logger.handlers = []
    # ok, lets set the new handlers.
    if isinstance(debuglevel, int):
        self._debuglevel = debuglevel
    if self._debuglevel == 1:
    elif self._debuglevel == 2:
        cookielib.debug = True
    elif self._debuglevel >= 3:
        cookielib.debug = True
        # set to warning
        cookielib.debug = False

def ssl_verify(

self, ssl_verify, ciphers='DEFAULT:!aNULL:!eNULL:!MD5:!3DES:!DES:!RC4:!IDEA:!SEED:!aDSS:!SRP:!PSK', options=<Options.OP_NO_COMPRESSION|OP_NO_TICKET: 147456>, tls_min=<TLSVersion.TLSv1_2: 771>, tls_max=<TLSVersion.MAXIMUM_SUPPORTED: -1>, update_adapter=True)

Modify ssl verification settings


  • ssl_verify:
    • True: Verify using builtin BYTE_CA_BUNDLE.
    • False: No SSL Verification.
    • *Str: Full path to a x509 PEM CA File or bundle.
  • ciphers: List of specific ciphers to allow. Default list is DEFAULT_SSL_CIPHERS if not specified.
  • options: List of SSL options. Default options are contained in DEFAULT_SSL_OPTIONS if not specified.
  • tls_min: Lowest acceptable TLS version. Default is ssl.TLSVersion.TLSv1_2
  • tls_max: Maximum acceptable TLS version. Default is ssl.TLSVersion.MAXIMUM_SUPPORTED
  • update_adapter: bool, call update adapter function after running to auto update adapter.

Returns: No return, mutates the SSL Context / session in place directly.

def ssl_verify(self, ssl_verify, ciphers=DEFAULT_SSL_CIPHERS, options=DEFAULT_SSL_OPTIONS,
               tls_min=ssl.TLSVersion.TLSv1_2, tls_max=ssl.TLSVersion.MAXIMUM_SUPPORTED, update_adapter=True):
    Modify ssl verification settings
      - **ssl_verify:**
         - **True:** Verify using builtin BYTE_CA_BUNDLE.
         - **False:** No SSL Verification.
         - ***Str:** Full path to a x509 PEM CA File or bundle.
      - **ciphers:** List of specific ciphers to allow. Default list is DEFAULT_SSL_CIPHERS if not specified.
      - **options:** List of SSL options. Default options are contained in DEFAULT_SSL_OPTIONS if not specified.
      - **tls_min:** Lowest acceptable TLS version. Default is `ssl.TLSVersion.TLSv1_2`
      - **tls_max:** Maximum acceptable TLS version. Default is `ssl.TLSVersion.MAXIMUM_SUPPORTED`
      - **update_adapter:** bool, call update adapter function after running to auto update adapter.
    **Returns:** No return, mutates the SSL Context / session in place directly.
    self.verify = ssl_verify
    # if verify true/false, set ca_verify_file appropriately
    if isinstance(self.verify, bool):
        if self.verify:  # True
            # load default CA list and set default SSL ciphers.
            self._ca_ssl_context = ssl.create_default_context(cadata=BYTE_CA_BUNDLE.decode('ascii'))
            self._session.verify = True
        else:  # False
            # disable warnings for SSL certs.
            self._ca_ssl_context = ssl.SSLContext()
            self._ca_ssl_context.check_hostname = False
            self._ca_ssl_context.verify_mode = ssl.CERT_NONE
            self._session.verify = False
        # Not True/False, assume path to file/dir for Requests
        # set filename/filepath for context
        self._ca_ssl_context = ssl.create_default_context(cafile=self.verify, capath=self.verify)
        self._ca_ssl_context.check_hostname = True
        self._session.verify = True
    # set the CA independent values
    self._ca_ssl_context.minimum_version = tls_min
    self._ca_ssl_context.maximum_version = tls_max
    self._ca_ssl_context.options |= options
    # update the session adapter.
    if update_adapter:

def throw_error(

message, resp=None, cr=True, exception=<class 'cloudgenix.CloudGenixAPIError'>)

Non-recoverable error, write message to STDERR and raise exception


  • message: Message text
  • resp: Optional - CloudGenix SDK Response object
  • cr: Optional - Use (or not) Carriage Returns.
  • exception: Optional - Custom Exception to throw, otherwise uses CloudGenixAPIError

Returns: No Return, throws exception.

def throw_error(message, resp=None, cr=True, exception=CloudGenixAPIError):
    Non-recoverable error, write message to STDERR and raise exception
      - **message:** Message text
      - **resp:** Optional - CloudGenix SDK Response object
      - **cr:** Optional - Use (or not) Carriage Returns.
      - **exception:** Optional - Custom Exception to throw, otherwise uses `CloudGenixAPIError`
    **Returns:** No Return, throws exception.
    output = "ERROR: " + str(message)
    if cr:
        output += "\n"
    if resp is not None:
        output2 = str(jdout_detailed(resp))
        if cr:
            output2 += "\n"
    raise exception(message)

def throw_warning(

message, resp=None, cr=True)

Recoverable Warning.


  • message: Message text
  • resp: Optional - CloudGenix SDK Response object
  • cr: Optional - Use (or not) Carriage Returns.

Returns: No Return.

def throw_warning(message, resp=None, cr=True):
    Recoverable Warning.
      - **message:** Message text
      - **resp:** Optional - CloudGenix SDK Response object
      - **cr:** Optional - Use (or not) Carriage Returns.
    **Returns:** No Return.
    output = "WARNING: " + str(message)
    if cr:
        output += "\n"
    if resp is not None:
        output2 = str(jdout_detailed(resp))
        if cr:
            output2 += "\n"

def update_region_to_controller(

self, region)

Update the controller string with dynamic region info. Controller string should end up as <name[-env]>.<region>


  • region: region string.

Returns: No return value, mutates the controller in the class namespace

def update_region_to_controller(self, region):
    Update the controller string with dynamic region info.
    Controller string should end up as `<name[-env]>.<region>`
      - **region:** region string.
    **Returns:** No return value, mutates the controller in the class namespace
    # default region position in a list
    region_position = 1
    # Check for a global "ignore region" flag
    if self.ignore_region:
        # bypass
        api_logger.debug("IGNORE_REGION set, not updating controller region.")
    api_logger.debug("Updating Controller Region")
    api_logger.debug("CONTROLLER = %s", self.controller)
    api_logger.debug("CONTROLLER_ORIG = %s", self.controller_orig)
    api_logger.debug("CONTROLLER_REGION = %s", self.controller_region)
    # Check if this is an initial region use or an update region use
    if self.controller_orig:
        controller_base = self.controller_orig
        controller_base = self.controller
        self.controller_orig = self.controller
    # splice controller string
    controller_full_part_list = controller_base.split('.')
    for idx, part in enumerate(controller_full_part_list):
        # is the region already in the controller string?
        if region == part:
            # yes, controller already has appropriate region
            api_logger.debug("REGION %s ALREADY IN BASE CONTROLLER AT INDEX = %s", region, idx)
            # update region if it is not already set.
            if self.controller_region != region:
                self.controller_region = region
                api_logger.debug("UPDATED_CONTROLLER_REGION = %s", self.controller_region)
            # Update controller if not already matching
            if self.controller != controller_base:
                self.controller = controller_base
                api_logger.debug("UPDATED_CONTROLLER = %s", self.controller)
    controller_part_count = len(controller_full_part_list)
    # handle short domain case
    if controller_part_count > 1:
        # insert region
        controller_full_part_list[region_position] = region
        self.controller = ".".join(controller_full_part_list)
        # short domain, just add region
        self.controller = ".".join(controller_full_part_list) + '.' + region
    # update SDK vars with region info
    self.controller_orig = controller_base
    self.controller_region = region
    api_logger.debug("UPDATED_CONTROLLER = %s", self.controller)
    api_logger.debug("UPDATED_CONTROLLER_ORIG = %s", self.controller_orig)
    api_logger.debug("UPDATED_CONTROLLER_REGION = %s", self.controller_region)

def update_session_adapter(

self, adapter=None, retry=None, ssl_context=None, adapter_url='https://')

Mount/remount the HTTPS session adapter after changes that affect it.


  • adapter: requests.adapters.HTTPAdapter, or will use/create default adapter.
  • retry: urllib3.util.retry.Retry object, or will use default retry object.
  • ssl_context: ssl.SSLContext object, or will use default SSL Context.
  • adapter_url: URL to use for matching requests. Defaults to 'https://'

Returns: No return, mutates the session adapter directly

def update_session_adapter(self, adapter=None, retry=None, ssl_context=None, adapter_url="https://"):
    Mount/remount the HTTPS session adapter after changes that affect it.
      - **adapter:** `requests.adapters.HTTPAdapter`, or will use/create default adapter.
      - **retry:** `urllib3.util.retry.Retry` object, or will use default retry object.
      - **ssl_context:** `ssl.SSLContext` object, or will use default SSL Context.
      - **adapter_url:** URL to use for matching requests. Defaults to 'https://'
    **Returns:** No return, mutates the session adapter directly
    # use default or specified session objects.
    use_retry = retry or self._rest_call_retry_object
    use_ssl_context = ssl_context or self._ca_ssl_context
    # if object adapter has not been built, create a new one.
    if adapter is None:
        # does an effective retry object exist?
        if use_retry is None:
            self._rest_connection_adapter = TlsHttpAdapter(ssl_context=use_ssl_context)
            self._rest_connection_adapter = TlsHttpAdapter(ssl_context=use_ssl_context,
        # if adapter is specified, nothing else should be specified.
        if retry is not None or ssl_context is not None:
            self.throw_warning("Error: HTTPAdapter was specified with retry and ssl_context. These will be ignored."
                               " retry and ssl_contexts need to be set in the adapter itself.")
    # Pick the specified or default adapter.
    use_adapter = adapter or self._rest_connection_adapter
    # mount the adapter.
    self._session.mount(adapter_url, use_adapter)

def url_decode(


URL Decode function using REGEX


  • url: URLENCODED text string

Returns: Non URLENCODED string

def url_decode(url):
    URL Decode function using REGEX
      - **url:** URLENCODED text string
    **Returns:** Non URLENCODED string
    return re.compile('%([0-9a-fA-F]{2})', re.M).sub(lambda m: chr(int(, 16)), url)

def view_cookies(


View current cookies in the requests.Session() object

Returns: List of Dicts, one cookie per Dict.

def view_cookies(self):
    View current cookies in the `requests.Session()` object
    **Returns:** List of Dicts, one cookie per Dict.
    return_list = []
    for cookie in self._session.cookies:
    return return_list

def view_headers(


View current headers in the requests.Session() object

Returns: Dict, Key header, value is header value.

def view_headers(self):
    View current headers in the `requests.Session()` object
    **Returns:** Dict, Key header, value is header value.
    return dict(self._session.headers)

def view_rest_retry(

self, url=None)

View current rest retry settings in the requests.Session() object


  • url: URL to use to determine retry methods for. Defaults to 'https://'

Returns: Dict, Key header, value is header value.

def view_rest_retry(self, url=None):
    View current rest retry settings in the `requests.Session()` object
      - **url:** URL to use to determine retry methods for. Defaults to 'https://'
    **Returns:** Dict, Key header, value is header value.
    if url is None:
        url = "https://"
    return vars(self._session.get_adapter(url).max_retries)

def websocket_add_headers(

self, headers)

Permanently add/overwrite headers to the API() WebSocket object


  • headers: dict with header/value

Returns: Mutates API() object, no return.

def websocket_add_headers(self, headers):
    Permanently add/overwrite headers to the `API()` WebSocket object
      - **headers:** dict with header/value
    **Returns:** Mutates `API()` object, no return.

def websocket_call(

self, url, *args, **kwargs)

Generic WebSocket worker function, automatically uses authentication from cloudgenix.API() session.


  • url: URL for the REST call
  • Any other websocket.client.Connect argument or keyword argument (see NOTE: below)

Returns: websocket.client.Connect object.

NOTE: Any websocket.client.Connect supported argument or keyword argument will be accepted, and will be passed to the underlying Connect() request. ssl and extra_header keyword arguments will override the SDK auto-generated cookies/headers and SSL contexts used for authentication. For more info on available options, see

def websocket_call(self, url, *args, **kwargs):
    Generic WebSocket worker function, automatically uses authentication from `cloudgenix.API()` session.
      - **url:** URL for the REST call
      - Any other `websocket.client.Connect` argument or keyword argument (see NOTE: below)
    **Returns:** `websocket.client.Connect` object.
    **NOTE:** Any `websocket.client.Connect` supported argument or keyword argument will be accepted, and will
    be passed to the underlying Connect() request. **`ssl` and `extra_header` keyword arguments will override the SDK
    auto-generated cookies/headers and SSL contexts used for authentication.** For more info on available options, see
    headers = {}
    # add session headers
    # Get cookies from requests.
    cookies = self._session.cookies.get_dict()
    # create cookie header from the cookies in Requests
    headers["Cookie"] = "; ".join(["{0}={1}".format(key, value) for key, value in cookies.items()])
    # check for host header
    host_header = headers.get("Host")
    force_host_arg = None
    if host_header is not None:
        # Ok, we got a host header. Unlike Requests, websockets wants the "Host Header" in the URI. You pass
        # a separate IP to connect to as 'host' argument to `websockets.client.Connect`.
        # So, to keep our "requests"-like behavior, we need to replace the value in the URI with the host
        # header, and remove the host header - then send the original value to `Connect` as host kwarg.
        # split controller at "//" ( to get host.
        controller_host = self.controller.split("//")[1]
        force_host_arg = controller_host
        # we don't support user/password in URI, so replacing first instance 'should' be OK (famous last words)
        new_url = url.replace(force_host_arg, host_header, 1)
        api_logger.debug("Host replacement. Original URL: {0}, New URL: {1}".format(url, new_url))
        url = new_url
        # delete the host header
        del headers["Host"]
    # convert the headers dictionary to a list of tuples.
    header_tuple_list = [(key, value) for key, value in headers.items()]
    # Create argument tuple
    ws_args = (url,) + args
    # Create keyword args
    ws_kwargs = {
        "ssl": self._ca_ssl_context,
        "extra_headers": header_tuple_list
    # check for force host arg
    if force_host_arg is not None:
        api_logger.debug("Forcing connection to host {0}".format(force_host_arg))
        ws_kwargs["host"] = force_host_arg
    # Override automatic with any manually passed kwargs
    return websockets.connect(*ws_args, **ws_kwargs)

def websocket_remove_header(

self, header)

Permanently remove a single header from the API() WebSocket object


  • header: str of single header to remove

Returns: Mutates API() object, no return.

def websocket_remove_header(self, header):
    Permanently remove a single header from the `API()` WebSocket object
      - **header:** str of single header to remove
    **Returns:** Mutates `API()` object, no return.
    # check for header first. Return silently if it does not exist.
    if self._websocket_headers.get(header) is not None:
        del self._websocket_headers[header]

def websocket_view_headers(


View current headers in the API() WebSocket object

Returns: Dict, Key header, value is header value.

def websocket_view_headers(self):
    View current headers in the `API()` WebSocket object
    **Returns:** Dict, Key header, value is header value.
    return dict(self._websocket_headers)

Instance variables

var delete

API object link to cloudgenix.delete_api.Delete

var get

API object link to cloudgenix.get_api.Get

var interactive

var patch

API object link to cloudgenix.patch_api.Patch

var post

API object link to cloudgenix.post_api.Post

var put

API object link to cloudgenix.put_api.Put

var update_info_url

Update Info URL for use once Constructor Created.

var version

Version string for use once Constructor created.

var ws

API object link to

class CloudGenixAPIError

Custom exception for errors when not exiting.

class CloudGenixAPIError(Exception):
    Custom exception for errors when not exiting.

Ancestors (in MRO)

Class variables

var args

class TlsHttpAdapter

A patched Requests HTTP Transport adapter that allows specification of a specific SSL Context.

This allows for fine-grained SSL Cipher, Options and CA specification.

class TlsHttpAdapter(HTTPAdapter):
    A patched Requests HTTP Transport adapter that allows specification of a specific SSL Context.

    This allows for fine-grained SSL Cipher, Options and CA specification.
    def __init__(self, ssl_context=None, **kwargs):
        if ssl_context is None:
            Create a default SSL context for requests, if one was not specified. This likely won't be usable by the SDK,
             but is a desirable alternative to nothing.
            self.ssl_context = ssl.create_default_context(cadata=BYTE_CA_BUNDLE.decode('ascii'))
            self.ssl_context.options |= DEFAULT_SSL_OPTIONS
            self.ssl_context = ssl_context
        super(TlsHttpAdapter, self).__init__(**kwargs)

    def init_poolmanager(self, *args, **kwargs):
        kwargs['ssl_context'] = self.ssl_context
        return super(TlsHttpAdapter, self).init_poolmanager(*args, **kwargs)

    def proxy_manager_for(self, *args, **kwargs):
        kwargs['ssl_context'] = self.ssl_context
        return super(TlsHttpAdapter, self).proxy_manager_for(*args, **kwargs)

Ancestors (in MRO)

  • TlsHttpAdapter
  • requests.adapters.HTTPAdapter
  • requests.adapters.BaseAdapter
  • builtins.object

Static methods

def __init__(

self, ssl_context=None, **kwargs)

Initialize self. See help(type(self)) for accurate signature.

def __init__(self, ssl_context=None, **kwargs):
    if ssl_context is None:
        Create a default SSL context for requests, if one was not specified. This likely won't be usable by the SDK,
         but is a desirable alternative to nothing.
        self.ssl_context = ssl.create_default_context(cadata=BYTE_CA_BUNDLE.decode('ascii'))
        self.ssl_context.options |= DEFAULT_SSL_OPTIONS
        self.ssl_context = ssl_context
    super(TlsHttpAdapter, self).__init__(**kwargs)

def add_headers(

self, request, **kwargs)

Add any headers needed by the connection. As of v2.0 this does nothing by default, but is left for overriding by users that subclass the :class:HTTPAdapter <requests.adapters.HTTPAdapter>.

This should not be called from user code, and is only exposed for use when subclassing the :class:HTTPAdapter <requests.adapters.HTTPAdapter>.

:param request: The :class:PreparedRequest <PreparedRequest> to add headers to. :param kwargs: The keyword arguments from the call to send().

def add_headers(self, request, **kwargs):
    """Add any headers needed by the connection. As of v2.0 this does
    nothing by default, but is left for overriding by users that subclass
    the :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.
    This should not be called from user code, and is only exposed for use
    when subclassing the
    :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.
    :param request: The :class:`PreparedRequest <PreparedRequest>` to add headers to.
    :param kwargs: The keyword arguments from the call to send().

def build_response(

self, req, resp)

Builds a :class:Response <requests.Response> object from a urllib3 response. This should not be called from user code, and is only exposed for use when subclassing the :class:HTTPAdapter <requests.adapters.HTTPAdapter>

:param req: The :class:PreparedRequest <PreparedRequest> used to generate the response. :param resp: The urllib3 response object. :rtype: requests.Response

def build_response(self, req, resp):
    """Builds a :class:`Response <requests.Response>` object from a urllib3
    response. This should not be called from user code, and is only exposed
    for use when subclassing the
    :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`
    :param req: The :class:`PreparedRequest <PreparedRequest>` used to generate the response.
    :param resp: The urllib3 response object.
    :rtype: requests.Response
    response = Response()
    # Fallback to None if there's no status_code, for whatever reason.
    response.status_code = getattr(resp, "status", None)
    # Make headers case-insensitive.
    response.headers = CaseInsensitiveDict(getattr(resp, "headers", {}))
    # Set encoding.
    response.encoding = get_encoding_from_headers(response.headers)
    response.raw = resp
    response.reason = response.raw.reason
    if isinstance(req.url, bytes):
        response.url = req.url.decode("utf-8")
        response.url = req.url
    # Add new cookies from the server.
    extract_cookies_to_jar(response.cookies, req, resp)
    # Give the Response some context.
    response.request = req
    response.connection = self
    return response

def cert_verify(

self, conn, url, verify, cert)

Verify a SSL certificate. This method should not be called from user code, and is only exposed for use when subclassing the :class:HTTPAdapter <requests.adapters.HTTPAdapter>.

:param conn: The urllib3 connection object associated with the cert. :param url: The requested URL. :param verify: Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use :param cert: The SSL certificate to verify.

def cert_verify(self, conn, url, verify, cert):
    """Verify a SSL certificate. This method should not be called from user
    code, and is only exposed for use when subclassing the
    :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.
    :param conn: The urllib3 connection object associated with the cert.
    :param url: The requested URL.
    :param verify: Either a boolean, in which case it controls whether we verify
        the server's TLS certificate, or a string, in which case it must be a path
        to a CA bundle to use
    :param cert: The SSL certificate to verify.
    if url.lower().startswith("https") and verify:
        cert_loc = None
        # Allow self-specified cert location.
        if verify is not True:
            cert_loc = verify
        if not cert_loc:
            cert_loc = extract_zipped_paths(DEFAULT_CA_BUNDLE_PATH)
        if not cert_loc or not os.path.exists(cert_loc):
            raise OSError(
                f"Could not find a suitable TLS CA certificate bundle, "
                f"invalid path: {cert_loc}"
        conn.cert_reqs = "CERT_REQUIRED"
        if not os.path.isdir(cert_loc):
            conn.ca_certs = cert_loc
            conn.ca_cert_dir = cert_loc
        conn.cert_reqs = "CERT_NONE"
        conn.ca_certs = None
        conn.ca_cert_dir = None
    if cert:
        if not isinstance(cert, basestring):
            conn.cert_file = cert[0]
            conn.key_file = cert[1]
            conn.cert_file = cert
            conn.key_file = None
        if conn.cert_file and not os.path.exists(conn.cert_file):
            raise OSError(
                f"Could not find the TLS certificate file, "
                f"invalid path: {conn.cert_file}"
        if conn.key_file and not os.path.exists(conn.key_file):
            raise OSError(
                f"Could not find the TLS key file, invalid path: {conn.key_file}"

def close(


Disposes of any internal state.

Currently, this closes the PoolManager and any active ProxyManager, which closes any pooled connections.

def close(self):
    """Disposes of any internal state.
    Currently, this closes the PoolManager and any active ProxyManager,
    which closes any pooled connections.
    for proxy in self.proxy_manager.values():

def get_connection(

self, url, proxies=None)

Returns a urllib3 connection for the given URL. This should not be called from user code, and is only exposed for use when subclassing the :class:HTTPAdapter <requests.adapters.HTTPAdapter>.

:param url: The URL to connect to. :param proxies: (optional) A Requests-style dictionary of proxies used on this request. :rtype: urllib3.ConnectionPool

def get_connection(self, url, proxies=None):
    """Returns a urllib3 connection for the given URL. This should not be
    called from user code, and is only exposed for use when subclassing the
    :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.
    :param url: The URL to connect to.
    :param proxies: (optional) A Requests-style dictionary of proxies used on this request.
    :rtype: urllib3.ConnectionPool
    proxy = select_proxy(url, proxies)
    if proxy:
        proxy = prepend_scheme_if_needed(proxy, "http")
        proxy_url = parse_url(proxy)
        if not
            raise InvalidProxyURL(
                "Please check proxy URL. It is malformed "
                "and could be missing the host."
        proxy_manager = self.proxy_manager_for(proxy)
        conn = proxy_manager.connection_from_url(url)
        # Only scheme should be lower case
        parsed = urlparse(url)
        url = parsed.geturl()
        conn = self.poolmanager.connection_from_url(url)
    return conn

def init_poolmanager(

self, *args, **kwargs)

Initializes a urllib3 PoolManager.

This method should not be called from user code, and is only exposed for use when subclassing the :class:HTTPAdapter <requests.adapters.HTTPAdapter>.

:param connections: The number of urllib3 connection pools to cache. :param maxsize: The maximum number of connections to save in the pool. :param block: Block when no free connections are available. :param pool_kwargs: Extra keyword arguments used to initialize the Pool Manager.

def init_poolmanager(self, *args, **kwargs):
    kwargs['ssl_context'] = self.ssl_context
    return super(TlsHttpAdapter, self).init_poolmanager(*args, **kwargs)

def proxy_headers(

self, proxy)

Returns a dictionary of the headers to add to any request sent through a proxy. This works with urllib3 magic to ensure that they are correctly sent to the proxy, rather than in a tunnelled request if CONNECT is being used.

This should not be called from user code, and is only exposed for use when subclassing the :class:HTTPAdapter <requests.adapters.HTTPAdapter>.

:param proxy: The url of the proxy being used for this request. :rtype: dict

def proxy_headers(self, proxy):
    """Returns a dictionary of the headers to add to any request sent
    through a proxy. This works with urllib3 magic to ensure that they are
    correctly sent to the proxy, rather than in a tunnelled request if
    CONNECT is being used.
    This should not be called from user code, and is only exposed for use
    when subclassing the
    :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.
    :param proxy: The url of the proxy being used for this request.
    :rtype: dict
    headers = {}
    username, password = get_auth_from_url(proxy)
    if username:
        headers["Proxy-Authorization"] = _basic_auth_str(username, password)
    return headers

def proxy_manager_for(

self, *args, **kwargs)

Return urllib3 ProxyManager for the given proxy.

This method should not be called from user code, and is only exposed for use when subclassing the :class:HTTPAdapter <requests.adapters.HTTPAdapter>.

:param proxy: The proxy to return a urllib3 ProxyManager for. :param proxy_kwargs: Extra keyword arguments used to configure the Proxy Manager. :returns: ProxyManager :rtype: urllib3.ProxyManager

def proxy_manager_for(self, *args, **kwargs):
    kwargs['ssl_context'] = self.ssl_context
    return super(TlsHttpAdapter, self).proxy_manager_for(*args, **kwargs)

def request_url(

self, request, proxies)

Obtain the url to use when making the final request.

If the message is being sent through a HTTP proxy, the full URL has to be used. Otherwise, we should only use the path portion of the URL.

This should not be called from user code, and is only exposed for use when subclassing the :class:HTTPAdapter <requests.adapters.HTTPAdapter>.

:param request: The :class:PreparedRequest <PreparedRequest> being sent. :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs. :rtype: str

def request_url(self, request, proxies):
    """Obtain the url to use when making the final request.
    If the message is being sent through a HTTP proxy, the full URL has to
    be used. Otherwise, we should only use the path portion of the URL.
    This should not be called from user code, and is only exposed for use
    when subclassing the
    :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.
    :param request: The :class:`PreparedRequest <PreparedRequest>` being sent.
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs.
    :rtype: str
    proxy = select_proxy(request.url, proxies)
    scheme = urlparse(request.url).scheme
    is_proxied_http_request = proxy and scheme != "https"
    using_socks_proxy = False
    if proxy:
        proxy_scheme = urlparse(proxy).scheme.lower()
        using_socks_proxy = proxy_scheme.startswith("socks")
    url = request.path_url
    if is_proxied_http_request and not using_socks_proxy:
        url = urldefragauth(request.url)
    return url

def send(

self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None)

Sends PreparedRequest object. Returns Response object.

:param request: The :class:PreparedRequest <PreparedRequest> being sent. :param stream: (optional) Whether to stream the request content. :param timeout: (optional) How long to wait for the server to send data before giving up, as a float, or a :ref:(connect timeout, read timeout) <timeouts> tuple. :type timeout: float or tuple or urllib3 Timeout object :param verify: (optional) Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use :param cert: (optional) Any user-provided SSL certificate to be trusted. :param proxies: (optional) The proxies dictionary to apply to the request. :rtype: requests.Response

def send(
    self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None
    """Sends PreparedRequest object. Returns Response object.
    :param request: The :class:`PreparedRequest <PreparedRequest>` being sent.
    :param stream: (optional) Whether to stream the request content.
    :param timeout: (optional) How long to wait for the server to send
        data before giving up, as a float, or a :ref:`(connect timeout,
        read timeout) <timeouts>` tuple.
    :type timeout: float or tuple or urllib3 Timeout object
    :param verify: (optional) Either a boolean, in which case it controls whether
        we verify the server's TLS certificate, or a string, in which case it
        must be a path to a CA bundle to use
    :param cert: (optional) Any user-provided SSL certificate to be trusted.
    :param proxies: (optional) The proxies dictionary to apply to the request.
    :rtype: requests.Response
        conn = self.get_connection(request.url, proxies)
    except LocationValueError as e:
        raise InvalidURL(e, request=request)
    self.cert_verify(conn, request.url, verify, cert)
    url = self.request_url(request, proxies)
    chunked = not (request.body is None or "Content-Length" in request.headers)
    if isinstance(timeout, tuple):
            connect, read = timeout
            timeout = TimeoutSauce(connect=connect, read=read)
        except ValueError:
            raise ValueError(
                f"Invalid timeout {timeout}. Pass a (connect, read) timeout tuple, "
                f"or a single float to set both timeouts to the same value."
    elif isinstance(timeout, TimeoutSauce):
        timeout = TimeoutSauce(connect=timeout, read=timeout)
        resp = conn.urlopen(
    except (ProtocolError, OSError) as err:
        raise ConnectionError(err, request=request)
    except MaxRetryError as e:
        if isinstance(e.reason, ConnectTimeoutError):
            # TODO: Remove this in 3.0.0: see #2811
            if not isinstance(e.reason, NewConnectionError):
                raise ConnectTimeout(e, request=request)
        if isinstance(e.reason, ResponseError):
            raise RetryError(e, request=request)
        if isinstance(e.reason, _ProxyError):
            raise ProxyError(e, request=request)
        if isinstance(e.reason, _SSLError):
            # This branch is for urllib3 v1.22 and later.
            raise SSLError(e, request=request)
        raise ConnectionError(e, request=request)
    except ClosedPoolError as e:
        raise ConnectionError(e, request=request)
    except _ProxyError as e:
        raise ProxyError(e)
    except (_SSLError, _HTTPError) as e:
        if isinstance(e, _SSLError):
            # This branch is for urllib3 versions earlier than v1.22
            raise SSLError(e, request=request)
        elif isinstance(e, ReadTimeoutError):
            raise ReadTimeout(e, request=request)
        elif isinstance(e, _InvalidHeader):
            raise InvalidHeader(e, request=request)
    return self.build_response(request, resp)



CloudGenix Explicit CA Certificate Bundle for API calls (CA Pinning).

Author: CloudGenix

Copyright: (c) 2017-2023 CloudGenix, Inc

License: MIT


CloudGenix Python SDK - DELETE

Author: CloudGenix

Copyright: (c) 2017-2023 CloudGenix, Inc

License: MIT


CloudGenix Python SDK - GET

Author: CloudGenix

Copyright: (c) 2017-2023 CloudGenix, Inc

License: MIT


CloudGenix Python Interactive SDK Helper functions

Author: CloudGenix

Copyright: (c) 2017-2023 CloudGenix, Inc

License: MIT


CloudGenix Python SDK - PATCH

Author: CloudGenix

Copyright: (c) 2017-2023 CloudGenix, Inc

License: MIT


CloudGenix Python SDK - POST

Author: CloudGenix

Copyright: (c) 2017-2023 CloudGenix, Inc

License: MIT


CloudGenix Python SDK - PUT

Author: CloudGenix

Copyright: (c) 2017-2023 CloudGenix, Inc

License: MIT


CloudGenix Python SDK - WebSocket Functions

Author: CloudGenix

Copyright: (c) 2017-2023 CloudGenix, Inc

License: MIT