cloudgenix module
Python3 SDK for the CloudGenix AppFabric
Version: v6.4.1b1
Author: CloudGenix
Copyright: (c) 2017-2024 CloudGenix, Inc
License: MIT
Location: https://github.com/CloudGenix/sdk-python
Synopsis
Intended to be a small, lightweight SDK wrapper around the CloudGenix API for easy use. Initial version requires knowledge of JSON/Dict objects for POST/PUT/PATCH operations.
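Because request bodies are plain dicts or lists, the JSON handling can be pictured with the standard library alone. The sketch below mirrors what the module source appears to do before sending a request: dict/list data is serialized with `json.dumps`, and `Content-Type: application/json` is set for methods other than GET and DELETE. `prepare_request` and the payload fields are hypothetical names used for illustration only; they are not part of the SDK.

```python
import json


def prepare_request(method, data=None, content_json=True):
    """Hypothetical helper: build headers and a serialized body for a call."""
    headers = {}
    # Only body-carrying methods get a JSON Content-Type header.
    if content_json and method.lower() not in ("get", "delete"):
        headers["Content-Type"] = "application/json"
    # Dicts and lists are converted to a JSON string; other data passes through.
    if isinstance(data, (list, dict)):
        data = json.dumps(data)
    return headers, data


# Illustrative payload only; real field names depend on the API resource.
payload = {"name": "Example Site", "description": "illustrative only"}
headers, body = prepare_request("POST", payload)
print(headers)
print(body)
```

In practice this means a caller would normally hand the SDK a Python dict and let it do the serialization, rather than building a JSON string by hand.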
Requirements
- Active CloudGenix Account
- Python >= 3.7 (the module raises an exception on Python 2.x and on Python 3.0-3.6)
- Python modules:
  - Requests - http://docs.python-requests.org/en/master/
  - Websockets (if Python > 3.6) >= 8.1 - https://websockets.readthedocs.io/en/stable/index.html
  - urllib3 >= 2.0.0 - https://urllib3.readthedocs.io/en/stable/
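The module source enforces the interpreter requirement at import time by inspecting `sys.version_info`. A minimal sketch of the same kind of guard, usable at the top of a calling script, follows. `require_python` and `MIN_PYTHON` are hypothetical names, not SDK functions, and the 3.7 minimum is taken from the check in the source listing.

```python
import sys

# Assumption: 3.7 is the minimum, matching the check in the module source.
MIN_PYTHON = (3, 7)


def require_python(version_info=None):
    """Raise RuntimeError if the interpreter is older than MIN_PYTHON."""
    if version_info is None:
        version_info = sys.version_info
    # Compare only (major, minor), as tuples.
    current = tuple(version_info[:2])
    if current < MIN_PYTHON:
        raise RuntimeError(
            "Python {0}.{1} is not supported; use {2}.{3} or newer.".format(
                current[0], current[1], MIN_PYTHON[0], MIN_PYTHON[1]))
    return current


print(require_python())
```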
Code Example
Super-simplified example code (rewrite of example.py in ~4 lines of code):
    # Import the CloudGenix SDK API constructor and JSON response pretty printer
    from cloudgenix import API, jd

    # Instantiate the CloudGenix API constructor
    sdk = API()

    # Call CloudGenix API login using the Interactive helpers (Handle SAML2.0 login and MSP functions too!).
    sdk.interactive.login()

    # Print a dump of the list of sites for your selected account
    jd(sdk.get.sites())
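The `jd` helper used above prints whatever its string-returning counterpart `jdout` produces. Going by the source listing, `jdout` tries the response's `cgx_content` attribute first, then the object itself, and finally returns the object unchanged. The sketch below reproduces that fallback order without the SDK installed; `FakeResponse` and `pretty` are hypothetical stand-ins, not SDK names.

```python
import json


class FakeResponse(object):
    """Hypothetical stand-in for an SDK response; only cgx_content is modeled."""

    def __init__(self, cgx_content):
        self.cgx_content = cgx_content


def pretty(api_response):
    """Best-effort pretty printer using the same fallback order as jdout()."""
    try:
        # Preferred: the parsed response body attached by the SDK.
        return json.dumps(api_response.cgx_content, indent=4)
    except (TypeError, ValueError, AttributeError):
        try:
            # Fallback: the object itself may already be JSON serializable.
            return json.dumps(api_response, indent=4)
        except (TypeError, ValueError, AttributeError):
            # Last resort: hand back the object unchanged.
            return api_response


print(pretty(FakeResponse({"items": [{"name": "Example Site"}]})))
print(pretty({"plain": "dict"}))
```

One consequence of this design is that the helper can be pointed at a plain dict or list as well as at a response object.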
License
MIT
For more info
- Get help and additional CloudGenix Documentation at https://support.cloudgenix.com
- View the autogenerated documentation in the `docs/` directory, or at https://cloudgenix.github.io/sdk-python/.
- View in-python help using `help()` functions. (example: `help(sdk.get.login)`)
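In-python help works on any object that carries a docstring. As a small illustration that runs without the SDK, the sketch below reads a docstring programmatically with `inspect.getdoc` and renders the same text that `help()` would show by using `pydoc.render_doc`. `FakeGet` and its `sites` method are hypothetical stand-ins for an SDK method container.

```python
import inspect
import pydoc


class FakeGet(object):
    """Hypothetical stand-in for an SDK method container."""

    def sites(self):
        """Return a list of sites (illustrative docstring only)."""
        return []


# The raw docstring, cleaned of indentation.
doc = inspect.getdoc(FakeGet.sites)
print(doc)

# The text help() would display, captured as a plain string.
help_text = pydoc.render_doc(FakeGet.sites, renderer=pydoc.plaintext)
print(help_text)
```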
"""
Python3 SDK for the CloudGenix AppFabric

**Version:** v6.4.1b1

**Author:** CloudGenix

**Copyright:** (c) 2017-2024 CloudGenix, Inc

**License:** MIT

**Location:** <https://github.com/CloudGenix/sdk-python>

#### Synopsis
Intended to be a small, lightweight SDK wrapper around the CloudGenix API for easy use.
Initial version requires knowledge of JSON/Dict objects for POST/PUT/PATCH operations.

#### Requirements
* Active CloudGenix Account
* Python >= 3.7
* Python modules:
    * Requests - <http://docs.python-requests.org/en/master/>
    * Websockets (if Python > 3.6) >= 8.1 - <https://websockets.readthedocs.io/en/stable/index.html>
    * urllib3 >= 2.0.0 - <https://urllib3.readthedocs.io/en/stable/>

#### Code Example
Super-simplified example code (rewrite of example.py in ~4 lines of code):

    #!python
    # Import the CloudGenix SDK API constructor and JSON response pretty printer
    from cloudgenix import API, jd

    # Instantiate the CloudGenix API constructor
    sdk = API()

    # Call CloudGenix API login using the Interactive helpers (Handle SAML2.0 login and MSP functions too!).
    sdk.interactive.login()

    # Print a dump of the list of sites for your selected account
    jd(sdk.get.sites())

#### License
MIT

#### For more info
* Get help and additional CloudGenix Documentation at <https://support.cloudgenix.com>
* View the autogenerated documentation in the `docs/` directory, or at <https://cloudgenix.github.io/sdk-python/>.
* View in-python help using `help()` functions.
(example: `help(sdk.get.login)`) """ from __future__ import unicode_literals import logging import json import re import sys import ssl import requests import requests.adapters import urllib3 from http import cookiejar as cookielib from requests.adapters import HTTPAdapter import websockets from .ws_api import WebSockets from .get_api import Get from .post_api import Post from .patch_api import Patch from .put_api import Put from .delete_api import Delete from .interactive import Interactive # CA Certificate bundle from .ca_bundle import CG_CA_BUNDLE as _CG_CA_BUNDLE # python 2 and 3 handling if sys.version_info >= (3, 7,): # Python 3.7 or higher text_type = str binary_type = bytes elif sys.version_info >= (3, ): raise Exception("ERROR: Python 3.0-3.6 support is deprecated. Please migrate to a 3.7+ Python interpreter.") else: # Python 2.x, is deprecated. raise Exception("ERROR: Python 2.x support is deprecated. Please migrate to a 3.7+ Python interpreter.") BYTE_CA_BUNDLE = binary_type(_CG_CA_BUNDLE) """ Explicit CA bundle for CA Pinning - Root Certificates for the CloudGenix Controller API Endpoint. Loaded from `cloudgenix.ca_bundle.CG_CA_BUNDLE` """ DEFAULT_SSL_CIPHERS = ":".join( [ "DEFAULT", "!aNULL", "!eNULL", "!MD5", "!3DES", "!DES", "!RC4", "!IDEA", "!SEED", "!aDSS", "!SRP", "!PSK" ] ) """ Default set of SSL/TLS ciphers for communication. Default is meant to be widely compatible with dangerous ciphers removed. """ DEFAULT_SSL_OPTIONS = 0 | ssl.OP_NO_COMPRESSION | ssl.OP_NO_TICKET """ Default SSL/TLS Options Default TLS v1.2/v1.3 or higher. 
""" __author__ = "CloudGenix Developer Support <developers@cloudgenix.com>" __email__ = "developers@cloudgenix.com" __copyright__ = "Copyright (c) 2017-2024 CloudGenix, Inc" __license__ = """ MIT License Copyright (c) 2017-2024 CloudGenix, Inc Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ # Set logging to function name api_logger = logging.getLogger(__name__) """logging.getlogger object to enable debug printing via `cloudgenix.API.set_debug`""" ws_logger = logging.getLogger('websockets') """websocket logger is handled slightly differently, so we will have a seperate handle.""" # Version of SDK version = "6.4.1b1" """SDK Version string""" __version__ = version # PyPI URL for checking for updates. update_info_url = "https://pypi.org/pypi/cloudgenix/json" """URL for checking for updates.""" # regex SDK_BUILD_REGEX = re.compile( r'^' # start of string r'(?P<major>[0-9]+)' # major number r'\.' # literal . character r'(?P<minor>[0-9]+)' # minor number r'\.' # literal . 
character r'(?P<patch>[0-9]+)' # patch number r'b' # literal 'b' character r'(?P<build>[0-9]+)' # build number r'$' # end of string ) """REGEX for parsing SDK builds""" def jd(api_response): """ JD (JSON Dump) function. Meant for quick pretty-printing of a CloudGenix Response body. Example: `jd(sdk.get.sites())` **Parameters:** - **api_response:** A CloudGenix-attribute extended `requests.Response` object **Returns:** No Return, directly prints all output. """ print(jdout(api_response)) return def jdout(api_response): """ JD Output function. Does quick pretty printing of a CloudGenix Response body. This function returns a string instead of directly printing content. **Parameters:** - **api_response:** A CloudGenix-attribute extended `requests.Response` object **Returns:** Pretty-formatted text of the Response body """ try: # attempt to output the cgx_content. should always be a Dict if it exists. output = json.dumps(api_response.cgx_content, indent=4) except (TypeError, ValueError, AttributeError): # cgx_content did not exist, or was not JSON serializable. Try pretty output the base obj. try: output = json.dumps(api_response, indent=4) except (TypeError, ValueError, AttributeError): # Same issue, just raw output the passed data. Let any exceptions happen here. output = api_response return output def jd_detailed(api_response, sensitive=False): """ JD (JSON Dump) Detailed function. Meant for quick DETAILED pretty-printing of CloudGenix Request and Response objects for troubleshooting. Example: `jd_detailed(cgx_sess.get.sites())` **Parameters:** - **api_response:** A CloudGenix-attribute extended `requests.Response` object - **sensitive:** Boolean, if True will print sensitive content (specifically, authentication cookies/headers). **Returns:** No Return, directly prints all output. """ print(jdout_detailed(api_response, sensitive=sensitive)) return def jdout_detailed(api_response, sensitive=False): """ JD Output Detailed function. 
Meant for quick DETAILED pretty-printing of CloudGenix Request and Response objects for troubleshooting. This function returns a string instead of directly printing content. **Parameters:** - **api_response:** A CloudGenix-attribute extended `requests.Response` object - **sensitive:** Boolean, if True will print sensitive content (specifically, authentication cookies/headers). **Returns:** Pretty-formatted text of the Request, Request Headers, Request body, Response, Response Headers, and Response Body. """ try: # try to be super verbose. output = "REQUEST: {0} {1}\n".format(api_response.request.method, api_response.request.path_url) output += "REQUEST HEADERS:\n" for key, value in api_response.request.headers.items(): # look for sensitive values if key.lower() in ['cookie'] and not sensitive: # we need to do some work to watch for the AUTH_TOKEN cookie. Split on cookie separator cookie_list = value.split('; ') muted_cookie_list = [] for cookie in cookie_list: # check if cookie starts with a permutation of AUTH_TOKEN/whitespace. if cookie.lower().strip().startswith('auth_token='): # first 11 chars of cookie with whitespace removed + mute string. newcookie = cookie.strip()[:11] + "\"<SENSITIVE - NOT SHOWN BY DEFAULT>\"" muted_cookie_list.append(newcookie) else: muted_cookie_list.append(cookie) # got list of cookies, muted as needed. recombine. muted_value = "; ".join(muted_cookie_list) output += "\t{0}: {1}\n".format(key, muted_value) elif key.lower() in ['x-auth-token'] and not sensitive: output += "\t{0}: {1}\n".format(key, "<SENSITIVE - NOT SHOWN BY DEFAULT>") else: output += "\t{0}: {1}\n".format(key, value) # if body not present, output blank. if not api_response.request.body: output += "REQUEST BODY:\n{0}\n\n".format({}) else: try: # Attempt to load JSON from string to make it look beter. 
output += "REQUEST BODY:\n{0}\n\n".format(json.dumps(json.loads(api_response.request.body), indent=4)) except (TypeError, ValueError, AttributeError): # if pretty call above didn't work, just toss it to jdout to best effort it. output += "REQUEST BODY:\n{0}\n\n".format(jdout(api_response.request.body)) output += "RESPONSE: {0} {1}\n".format(api_response.status_code, api_response.reason) output += "RESPONSE HEADERS:\n" for key, value in api_response.headers.items(): output += "\t{0}: {1}\n".format(key, value) try: # look for CGX content first. output += "RESPONSE DATA:\n{0}".format(json.dumps(api_response.cgx_content, indent=4)) except (TypeError, ValueError, AttributeError): # look for standard response data. output += "RESPONSE DATA:\n{0}".format(json.dumps(json.loads(api_response.content), indent=4)) except (TypeError, ValueError, AttributeError, UnicodeDecodeError): # cgx_content did not exist, or was not JSON serializable. Try pretty output the base obj. try: output = json.dumps(api_response, indent=4) except (TypeError, ValueError, AttributeError): # Same issue, just raw output the passed data. Let any exceptions happen here. output = api_response return output class CloudGenixAPIError(Exception): """ Custom exception for errors when not exiting. """ pass class TlsHttpAdapter(HTTPAdapter): """ A patched Requests HTTP Transport adapter that allows specification of a specific SSL Context. This allows for fine-grained SSL Cipher, Options and CA specification. """ def __init__(self, ssl_context=None, **kwargs): if ssl_context is None: """ Create a default SSL context for requests, if one was not specified. This likely won't be usable by the SDK, but is a desirable alternative to nothing. 
""" self.ssl_context = ssl.create_default_context(cadata=BYTE_CA_BUNDLE.decode('ascii')) self.ssl_context.set_ciphers(DEFAULT_SSL_CIPHERS) self.ssl_context.options |= DEFAULT_SSL_OPTIONS else: self.ssl_context = ssl_context super(TlsHttpAdapter, self).__init__(**kwargs) def init_poolmanager(self, *args, **kwargs): kwargs['ssl_context'] = self.ssl_context return super(TlsHttpAdapter, self).init_poolmanager(*args, **kwargs) def proxy_manager_for(self, *args, **kwargs): kwargs['ssl_context'] = self.ssl_context return super(TlsHttpAdapter, self).proxy_manager_for(*args, **kwargs) class API(object): """ Class for interacting with the CloudGenix API. Subclass objects are linked to various operations. - get: links to `cloudgenix.get_api.Get` for API Get Operations - post: links to `cloudgenix.post_api.Post` for API Post Operations - put: links to `cloudgenix.put_api.Put` for API Put Operations - patch: links to `cloudgenix.patch_api.Patch` for API Patch Operations - delete: links to `cloudgenix.delete_api.Delete` for API Delete Operations """ # Global structure, previously sdk_vars # Authentication is now stored as cookies, as part of the requests.Session() object. # Authentication can also be stored as an 'X-Auth-Token:' header, but cookies take precedence. 
controller = 'https://api.elcapitan.cloudgenix.com' """Current active controller URL""" controller_orig = None """Original Controller URL as entered - before Region re-parse""" controller_region = None """Controller Region, if present.""" ignore_region = False """Ignore regions returned by controller, and use explicit controller only.""" _debuglevel = 0 """debug level - set via set_debug()""" tenant_id = None """Numeric ID of tenant (account) - should be set after initial login from `cloudgenix.get_api.Get.profile` data""" tenant_name = None """Name of tenant (account), should be set after initial login from `cloudgenix.get_api.Get.profile` data""" token_session = None """Is this login from a static AUTH_TOKEN (True), or a standard login (False)""" is_esp = None """Is the current tenant an ESP/MSP?""" client_id = None """If ESP/MSP, is client currently logged in""" address_string = None """String representing address, optional - should be pulled from get.profile() data.""" email = None """Email (username) for session""" operator_id = None """Operator ID of current session.""" _user_id = None """Deprecated (replaced by `operator_id`.) Left for backwards compatibility.""" _password = None """user password - only used when argv passed, and cleared quickly""" roles = None """Roles list""" verify = True """Verify SSL certificate.""" version = None """Version string for use once Constructor created.""" ca_verify_filename = None """DEPRECATED: Filename to use for CA verification. 
Moved to modern python `ssl`, updated to only passing filename in event of local file.""" _ca_verify_file_handle = None """File handle for CA verification""" _ca_ssl_context = None """`ssl` library context for controller connections""" _rest_connection_adapter = None """`requests.adapters.HTTPAdapter` object for use by requests to request content.""" rest_call_retry = False """DEPRECATED: Please use `cloudgenix.API.modify_rest_retry`.""" rest_call_max_retry = 30 """DEPRECATED: Please use `cloudgenix.API.modify_rest_retry`.""" rest_call_sleep = 10 """DEPRECATED: Please use `cloudgenix.API.modify_rest_retry`.""" _rest_call_retry_object = None """`urllib3.util.retry.Retry` object for use by SDK to do retries""" rest_call_timeout = 240 """Maximum time to wait for any data from REST server.""" cache = {} """API response cache (Future)""" _parent_namespace = None """holder for namespace for wrapper classes.""" _session = None """holder for requests.Session() object""" _websocket_headers = None """holder for WebSocket Headers""" update_check = True """Notify users of available update to SDK""" update_info_url = None """Update Info URL for use once Constructor Created.""" def __init__(self, controller=controller, ssl_verify=verify, update_check=True): """ Create the API constructor object - **controller:** Initial Controller URL String - **ssl_verify:** Should SSL be verified for this system. Can be `file` or BOOL. See `cloudgenix.API.ssl_verify` for more details. - **update_check:** Bool to Enable/Disable SDK update check and new release notifications. """ # set version and update url from outer scope. self.version = version """Version string for use once Constructor created.""" self.update_info_url = update_info_url """Update Info URL for use once Constructor Created.""" # try: if controller and isinstance(controller, (binary_type, text_type)): self.controller = controller.lower() self.controller_orig = controller.lower() # Create Requests Session. 
self._session = requests.Session() # set ssl context if isinstance(ssl_verify, (binary_type, text_type, bool)): self.ssl_verify(ssl_verify, update_adapter=False) # Set default REST retry parameters self.modify_rest_retry(update_adapter=False) # update the HTTP Adapter for requests. self.update_session_adapter() # handle update check if isinstance(update_check, bool): self.update_check = update_check if update_check: self.notify_for_new_version() # Identify SDK in the User-Agent. user_agent = self._session.headers.get('User-Agent') if user_agent: user_agent += ' (CGX SDK v{0})'.format(self.version) else: user_agent = 'python-requests/UNKNOWN (CGX SDK v{0})'.format(self.version) # Update Headers self._session.headers.update({ 'Accept': 'application/json', 'User-Agent': text_type(user_agent) }) # except Exception as e: # raise ValueError("Unable to create Requests session object: {0}.".format(e)) api_logger.debug("DEBUG: URL: %s, SSL Verify: %s, Session: %s", self.controller, self.verify, self._session) # Update Headers for WebSocket requests websocketlib_name = websockets.__name__ websocketlib_version = websockets.version.version if not websocketlib_name: websocketlib_name = 'websockets' if not websocketlib_version: websocketlib_version = 'UNKNOWN' ws_user_agent = 'python-{0}/{1} (CGX SDK v{2})'.format(websocketlib_name, websocketlib_version, self.version) self._websocket_headers = { 'Accept': 'application/json', 'User-Agent': text_type(ws_user_agent) } # Bind API method classes to this object subclasses = self._subclass_container() self.get = subclasses["get"]() """API object link to `cloudgenix.get_api.Get`""" self.post = subclasses["post"]() """API object link to `cloudgenix.post_api.Post`""" self.put = subclasses["put"]() """API object link to `cloudgenix.put_api.Put`""" self.patch = subclasses["patch"]() """API object link to `cloudgenix.patch_api.Patch`""" self.delete = subclasses["delete"]() """API object link to `cloudgenix.delete_api.Delete`""" 
self.interactive = subclasses["interactive"]() """API object link to `cloudgenix.interactive.Interactive`""" self.ws = subclasses["ws"]() """API object link to `cloudgenix.ws_api.WebSockets`""" return def notify_for_new_version(self): """ Check for a new version of the SDK on API constructor instantiation. If new version found, print Notification to STDERR. On failure of this check, fail silently. **Returns:** No item returned, directly prints notification to `sys.stderr`. """ # broad exception clause, if this fails for any reason just return. try: recommend_update = False update_check_resp = requests.get(self.update_info_url, timeout=3) web_version = update_check_resp.json()["info"]["version"] api_logger.debug("RETRIEVED_VERSION: %s", web_version) available_version = SDK_BUILD_REGEX.search(web_version).groupdict() current_version = SDK_BUILD_REGEX.search(self.version).groupdict() available_major = int(available_version.get('major')) available_minor = int(available_version.get('minor')) available_patch = int(available_version.get('patch')) available_build = int(available_version.get('build')) current_major = int(current_version.get('major')) current_minor = int(current_version.get('minor')) current_patch = int(current_version.get('patch')) current_build = int(current_version.get('build')) api_logger.debug("AVAILABLE_VERSION: %s", available_version) api_logger.debug("CURRENT_VERSION: %s", current_version) # check for major/minor/patch version differences (compared as integers), do not alert for build differences. if available_major > current_major: recommend_update = True elif available_major >= current_major and available_minor > current_minor: recommend_update = True elif available_major >= current_major and available_minor >= current_minor and \ available_patch > current_patch: recommend_update = True api_logger.debug("NEED_UPDATE: %s", recommend_update) # notify. if recommend_update: sys.stderr.write("WARNING: CloudGenix Python SDK upgrade available. 
SDKs are often deprecated 6 " "months after release of a new version.\n" "\tLatest Version: {0}\n" "\tCurrent Version: {1}\n" "\tFor more info, see 'https://github.com/cloudgenix/sdk-python'. Additionally, this " "message can be suppressed by instantiating the API with API(update_check=False).\n\n" "".format(web_version, self.version)) return except Exception: # just return and continue. return def ssl_verify(self, ssl_verify, ciphers=DEFAULT_SSL_CIPHERS, options=DEFAULT_SSL_OPTIONS, tls_min=ssl.TLSVersion.TLSv1_2, tls_max=ssl.TLSVersion.MAXIMUM_SUPPORTED, update_adapter=True): """ Modify ssl verification settings **Parameters:** - **ssl_verify:** - **True:** Verify using builtin BYTE_CA_BUNDLE. - **False:** No SSL Verification. - ***Str:** Full path to a x509 PEM CA File or bundle. - **ciphers:** List of specific ciphers to allow. Default list is DEFAULT_SSL_CIPHERS if not specified. - **options:** List of SSL options. Default options are contained in DEFAULT_SSL_OPTIONS if not specified. - **tls_min:** Lowest acceptable TLS version. Default is `ssl.TLSVersion.TLSv1_2` - **tls_max:** Maximum acceptable TLS version. Default is `ssl.TLSVersion.MAXIMUM_SUPPORTED` - **update_adapter:** bool, call update adapter function after running to auto update adapter. **Returns:** No return, mutates the SSL Context / session in place directly. """ self.verify = ssl_verify # if verify true/false, set ca_verify_file appropriately if isinstance(self.verify, bool): if self.verify: # True # load default CA list and set default SSL ciphers. self._ca_ssl_context = ssl.create_default_context(cadata=BYTE_CA_BUNDLE.decode('ascii')) self._session.verify = True else: # False # disable warnings for SSL certs. 
urllib3.disable_warnings() self._ca_ssl_context = ssl.SSLContext() self._ca_ssl_context.check_hostname = False self._ca_ssl_context.verify_mode = ssl.CERT_NONE self._session.verify = False else: # Not True/False, assume path to file/dir for Requests # set filename/filepath for context self._ca_ssl_context = ssl.create_default_context(cafile=self.verify, capath=self.verify) self._ca_ssl_context.check_hostname = True self._session.verify = True # set the CA independent values self._ca_ssl_context.minimum_version = tls_min self._ca_ssl_context.maximum_version = tls_max self._ca_ssl_context.set_ciphers(ciphers) self._ca_ssl_context.options |= options # update the session adapter. if update_adapter: self.update_session_adapter() return def modify_rest_retry(self, total=8, connect=None, read=None, redirect=None, status=None, other=0, allowed_methods=urllib3.util.retry.Retry.DEFAULT_ALLOWED_METHODS, status_forcelist=None, backoff_factor=0.705883, raise_on_redirect=True, raise_on_status=True, respect_retry_after_header=True, update_adapter=True): """ Modify retry parameters for the SDKs rest call object. Parameters are directly from and passed directly to `urllib3.util.retry.Retry`, and get applied directly to the underlying `requests.Session` object. Default retry with total=8 and backoff_factor=0.705883: - Try 1, 0 delay (0 total seconds) - Try 2, 0 delay (0 total seconds) - Try 3, 0.705883 delay (0.705883 total seconds) - Try 4, 1.411766 delay (2.117649 total seconds) - Try 5, 2.823532 delay (4.941181 total seconds) - Try 6, 5.647064 delay (10.588245 total seconds) - Try 7, 11.294128 delay (21.882373 total seconds) - Try 8, 22.588256 delay (44.470629 total seconds) - Try 9, 45.176512 delay (89.647141 total seconds) - Try 10, 90.353024 delay (180.000165 total seconds) **Parameters:** - **total:** int, Total number of retries to allow. Takes precedence over other counts. - **connect:** int, How many connection-related errors to retry on. 
- **read:** int, How many times to retry on read errors. - **redirect:** int, How many redirects to perform. - **status:** int, How many times to retry on bad status codes. - **other:** int, How many times to retry on other errors. Set to 0 by default for things like SSL errors. - **allowed_methods:** iterable, Set of uppercased HTTP method verbs that we should retry on. - **status_forcelist:** iterable, A set of integer HTTP status codes that we should force a retry on. - **backoff_factor:** float, A backoff factor to apply between attempts after the second try. - **raise_on_redirect:** bool, True = raise a MaxRetryError, False = return latest 3xx response. - **raise_on_status:** bool, Similar logic to ``raise_on_redirect`` but for status responses. - **respect_retry_after_header:** bool, Whether to respect Retry-After header on status codes. - **update_adapter:** bool, call update adapter function after running to auto update adapter. **Returns:** No return, mutates the retry / session directly """ # CloudGenix responses with 502/504 are usually recoverable. Use them if no list specified. if status_forcelist is None: status_forcelist = (413, 429, 502, 503, 504) retry = urllib3.util.retry.Retry(total=total, connect=connect, read=read, redirect=redirect, status=status, other=other, allowed_methods=allowed_methods, status_forcelist=status_forcelist, backoff_factor=backoff_factor, raise_on_redirect=raise_on_redirect, raise_on_status=raise_on_status, respect_retry_after_header=respect_retry_after_header) # set updated retry object. self._rest_call_retry_object = retry # call update to mount/remount session adapter if update_adapter: self.update_session_adapter() return def update_session_adapter(self, adapter=None, retry=None, ssl_context=None, adapter_url="https://"): """ Mount/remount the HTTPS session adapter after changes that affect it. 
**Parameters:** - **adapter:** `requests.adapters.HTTPAdapter`, or will use/create default adapter. - **retry:** `urllib3.util.retry.Retry` object, or will use default retry object. - **ssl_context:** `ssl.SSLContext` object, or will use default SSL Context. - **adapter_url:** URL to use for matching requests. Defaults to 'https://' **Returns:** No return, mutates the session adapter directly """ # use default or specified session objects. use_retry = retry or self._rest_call_retry_object use_ssl_context = ssl_context or self._ca_ssl_context # if object adapter has not been built, create a new one. if adapter is None: # does an effective retry object exist? if use_retry is None: self._rest_connection_adapter = TlsHttpAdapter(ssl_context=use_ssl_context) else: self._rest_connection_adapter = TlsHttpAdapter(ssl_context=use_ssl_context, max_retries=use_retry) else: # if adapter is specified, nothing else should be specified. if retry is not None or ssl_context is not None: self.throw_warning("Error: HTTPAdapter was specified with retry and ssl_context. These will be ignored." " retry and ssl_contexts need to be set in the adapter itself.") # Pick the specified or default adapter. use_adapter = adapter or self._rest_connection_adapter # mount the adapter. self._session.mount(adapter_url, use_adapter) return def view_rest_retry(self, url=None): """ View current rest retry settings in the `requests.Session()` object **Parameters:** - **url:** URL to use to determine retry methods for. Defaults to 'https://' **Returns:** Dict, Key header, value is header value. """ if url is None: url = "https://" return vars(self._session.get_adapter(url).max_retries) def expose_session(self): """ Call to expose the Requests Session object **Returns:** `requests.Session` object """ return self._session def add_headers(self, headers): """ Permanently add/overwrite headers to session. 
**Parameters:** - **headers:** dict with header/value **Returns:** Mutates `requests.Session()` object, no return. """ self._session.headers.update(headers) return def remove_header(self, header): """ Permanently remove a single header from session **Parameters:** - **header:** str of single header to remove **Returns:** Mutates `requests.Session()` object, no return. """ # check for header first. Return silently if it does not exist. if self._session.headers.get(header) is not None: del self._session.headers[header] return def view_headers(self): """ View current headers in the `requests.Session()` object **Returns:** Dict, Key header, value is header value. """ return dict(self._session.headers) def websocket_add_headers(self, headers): """ Permanently add/overwrite headers to the `API()` WebSocket object **Parameters:** - **headers:** dict with header/value **Returns:** Mutates `API()` object, no return. """ self._websocket_headers.update(headers) return def websocket_remove_header(self, header): """ Permanently remove a single header from the `API()` WebSocket object **Parameters:** - **header:** str of single header to remove **Returns:** Mutates `API()` object, no return. """ # check for header first. Return silently if it does not exist. if self._websocket_headers.get(header) is not None: del self._websocket_headers[header] return def websocket_view_headers(self): """ View current headers in the `API()` WebSocket object **Returns:** Dict, Key header, value is header value. """ return dict(self._websocket_headers) def view_cookies(self): """ View current cookies in the `requests.Session()` object **Returns:** List of Dicts, one cookie per Dict. """ return_list = [] for cookie in self._session.cookies: return_list.append(vars(cookie)) return return_list def set_debug(self, debuglevel, set_format=None, set_handler=None): """ Change the debug level of the API **Parameters:** - **set_format:** Optional. If set and text_type, use input for formatter. 
Otherwise, default formatter. - **set_handler:** Optional. If set and `logging.Handler` type, use input for handler. Otherwise, default `logging.StreamHandler()` - **debuglevel:** int, 1 for INFO, 2 for DEBUG (including cookie debugging), 3 or higher to also enable urllib3 DEBUG logging. Any other value sets all loggers to WARNING. **Returns:** No item returned. """ # set the logging formatter and stream handler if set_format is None: # default formatter api_formatter = logging.Formatter("%(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s") elif not isinstance(set_format, text_type): # not a valid format string. Set to default. api_formatter = logging.Formatter("%(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s") else: # valid logging string. api_formatter = logging.Formatter(set_format) # set the logging handler if supported handler is not passed. if set_handler is None: # Default handler api_handler = logging.StreamHandler() elif not isinstance(set_handler, (logging.FileHandler, logging.Handler, logging.NullHandler, logging.StreamHandler)): # not a valid handler. Set to default handler. api_handler = logging.StreamHandler() else: # passed valid handler api_handler = set_handler # set handler to use format. api_handler.setFormatter(api_formatter) # Get the loggers from other modules in prep for setting new handlers. urllib3_logger = logging.getLogger("requests.packages.urllib3") urllib3_retry_logger = logging.getLogger("urllib3.util.retry") cookie_logger = logging.getLogger("http.cookiejar") # remove existing handlers api_logger.handlers = [] urllib3_logger.handlers = [] urllib3_retry_logger.handlers = [] cookie_logger.handlers = [] ws_logger.handlers = [] # ok, let's set the new handlers. 
if isinstance(debuglevel, int): self._debuglevel = debuglevel if self._debuglevel == 1: api_logger.addHandler(api_handler) api_logger.setLevel(logging.INFO) ws_logger.addHandler(api_handler) ws_logger.setLevel(logging.INFO) elif self._debuglevel == 2: cookie_logger.addHandler(api_handler) cookie_logger.setLevel(logging.DEBUG) cookielib.debug = True api_logger.addHandler(api_handler) api_logger.setLevel(logging.DEBUG) ws_logger.addHandler(api_handler) ws_logger.setLevel(logging.DEBUG) elif self._debuglevel >= 3: cookie_logger.addHandler(api_handler) cookie_logger.setLevel(logging.DEBUG) cookielib.debug = True urllib3_logger.addHandler(api_handler) urllib3_logger.setLevel(logging.DEBUG) urllib3_retry_logger.addHandler(api_handler) urllib3_retry_logger.setLevel(logging.DEBUG) api_logger.addHandler(api_handler) api_logger.setLevel(logging.DEBUG) ws_logger.addHandler(api_handler) ws_logger.setLevel(logging.DEBUG) else: # set to warning cookie_logger.setLevel(logging.WARNING) cookielib.debug = False urllib3_logger.setLevel(logging.WARNING) urllib3_retry_logger.setLevel(logging.WARNING) api_logger.setLevel(logging.WARNING) ws_logger.setLevel(logging.WARNING) return def _subclass_container(self): """ Call subclasses via function to allow passing parent namespace to subclasses. **Returns:** dict with subclass references. 
""" _parent_class = self return_object = {} class GetWrapper(Get): def __init__(self): self._parent_class = _parent_class return_object['get'] = GetWrapper class PostWrapper(Post): def __init__(self): self._parent_class = _parent_class return_object['post'] = PostWrapper class PutWrapper(Put): def __init__(self): self._parent_class = _parent_class return_object['put'] = PutWrapper class PatchWrapper(Patch): def __init__(self): self._parent_class = _parent_class return_object['patch'] = PatchWrapper class DeleteWrapper(Delete): def __init__(self): self._parent_class = _parent_class return_object['delete'] = DeleteWrapper class InteractiveWrapper(Interactive): def __init__(self): self._parent_class = _parent_class return_object['interactive'] = InteractiveWrapper class WebSocketsWrapper(WebSockets): def __init__(self): self._parent_class = _parent_class return_object['ws'] = WebSocketsWrapper return return_object def rest_call(self, url, method, data=None, sensitive=False, timeout=None, content_json=True, raw_msgs=False, retry=None, max_retry=None, retry_sleep=None): """ Generic REST call worker function **Parameters:** - **url:** URL for the REST call - **method:** METHOD for the REST call - **data:** Optional DATA for the call (for POST/PUT/etc.) - **sensitive:** Flag if content request/response should be hidden from logging functions - **timeout:** Requests Timeout - **content_json:** Bool on whether the Content-Type header should be set to application/json - **raw_msgs:** True/False, if True, do not convert API sideband messages (warnings, errors) to text. - **retry:** DEPRECATED - please use `cloudgenix.API.modify_rest_retry` instead. - **max_retry:** DEPRECATED - please use `cloudgenix.API.modify_rest_retry` instead. - **retry_sleep:** DEPRECATED - please use `cloudgenix.API.modify_rest_retry` instead. **Returns:** Requests.Response object, extended with: - **cgx_status**: Bool, True if a successful CloudGenix response, False if error. 
        - **cgx_content**: Content of the response, guaranteed to be in Dict format. Empty/invalid responses
          will be converted to a Dict response.
        - **cgx_errors**: Text error messages if any are present. None if none. List if raw_msgs is True.
        - **cgx_warnings**: Text warning messages if any are present. None if none. List if raw_msgs is True.
        """
        # pull retry related items from Constructor if not specified.
        if timeout is None:
            timeout = self.rest_call_timeout
        if retry is not None:
            # Someone using deprecated retry code. Notify.
            sys.stderr.write("WARNING: 'retry' option of rest_call() has been deprecated. "
                             "Please use 'API.modify_rest_retry()' instead.")
        if max_retry is not None:
            # Someone using deprecated retry code. Notify.
            sys.stderr.write("WARNING: 'max_retry' option of rest_call() has been deprecated. "
                             "Please use 'API.modify_rest_retry()' instead.")
        if retry_sleep is not None:
            # Someone using deprecated retry code. Notify.
            sys.stderr.write("WARNING: 'retry_sleep' option of rest_call() has been deprecated. "
                             "Please use 'API.modify_rest_retry()' instead.")

        # Get logging level, use this to bypass logging functions with possible large content if not set.
        logger_level = api_logger.getEffectiveLevel()

        # populate headers and cookies from session.
        if content_json and method.lower() not in ['get', 'delete']:
            headers = {
                'Content-Type': 'application/json'
            }
        else:
            headers = {}

        # add session headers
        headers.update(self._session.headers)
        cookie = self._session.cookies.get_dict()

        # make sure data is populated if present.
        if isinstance(data, (list, dict)):
            data = json.dumps(data)

        api_logger.debug('REST_CALL URL = %s', url)

        # make request
        try:
            if not sensitive:
                api_logger.debug('\n\tREQUEST: %s %s\n\tHEADERS: %s\n\tCOOKIES: %s\n\tDATA: %s\n',
                                 method.upper(), url, headers, cookie, data)

            # Actual request
            response = self._session.request(method, url, data=data, stream=True, timeout=timeout,
                                             headers=headers, allow_redirects=False)

            # Request complete - lets parse.
# if it's a non-CGX-good response, return with cgx_status = False if response.status_code not in [requests.codes.ok, requests.codes.no_content, requests.codes.found, requests.codes.moved]: # Simple JSON debug if not sensitive: try: api_logger.debug('RESPONSE HEADERS: %s\n', json.dumps( json.loads(text_type(response.headers)), indent=4)) except ValueError: api_logger.debug('RESPONSE HEADERS: %s\n', text_type(response.headers)) try: api_logger.debug('RESPONSE: %s\n', json.dumps(response.json(), indent=4)) except ValueError: api_logger.debug('RESPONSE: %s\n', text_type(response.text)) else: api_logger.debug('RESPONSE NOT LOGGED (sensitive content)') api_logger.debug("Error, non-200 response received: %s", response.status_code) # CGX extend requests.Response for return response.cgx_status = False response.cgx_content = self._catch_nonjson_streamresponse(response.text) # CGX extend requests.Response for any errors/warnings. response.cgx_warnings = self.pull_content_warning(response, raw=raw_msgs) response.cgx_errors = self.pull_content_error(response, raw=raw_msgs) # We are in a failed request. If no error text in response, give the response code and detail. 
                if response.cgx_errors is None:
                    response.cgx_errors = text_type("{0} ({1})".format(response.reason, response.status_code))
                return response
            else:
                # Simple JSON debug
                if not sensitive and (logger_level <= logging.DEBUG and logger_level != logging.NOTSET):
                    try:
                        api_logger.debug('RESPONSE HEADERS: %s\n', json.dumps(
                            json.loads(text_type(response.headers)), indent=4))
                        api_logger.debug('RESPONSE: %s\n', json.dumps(response.json(), indent=4))
                    except ValueError:
                        api_logger.debug('RESPONSE HEADERS: %s\n', text_type(response.headers))
                        api_logger.debug('RESPONSE: %s\n', text_type(response.text))
                elif sensitive:
                    api_logger.debug('RESPONSE NOT LOGGED (sensitive content)')

                # CGX extend requests.Response for return
                response.cgx_status = True
                response.cgx_content = self._catch_nonjson_streamresponse(response.text)

                # CGX extend requests.Response for any errors/warnings.
                response.cgx_warnings = self.pull_content_warning(response, raw=raw_msgs)
                response.cgx_errors = self.pull_content_error(response, raw=raw_msgs)
                return response

        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, urllib3.exceptions.MaxRetryError)\
                as e:

            api_logger.info("Error, %s.", text_type(e))
            # make a requests.Response object for return since we didn't get one.
            # Instantiate it, so the cgx_* attributes below land on this one object and not on the shared class.
            response = requests.Response()

            # CGX extend requests.Response for return
            response.cgx_status = False
            response.cgx_content = {
                '_error': [
                    {
                        'message': 'REST Request Exception: {}'.format(e),
                        'data': {},
                    }
                ]
            }

            # CGX extend requests.Response for any errors/warnings.
            response.cgx_warnings = self.pull_content_warning(response, raw=raw_msgs)
            response.cgx_errors = self.pull_content_error(response, raw=raw_msgs)
            return response

    def websocket_call(self, url, *args, **kwargs):
        """
        Generic WebSocket worker function, automatically uses authentication from `cloudgenix.API()` session.
**Parameters:** - **url:** URL for the REST call - Any other `websocket.client.Connect` argument or keyword argument (see NOTE: below) **Returns:** `websocket.client.Connect` object. **NOTE:** Any `websocket.client.Connect` supported argument or keyword argument will be accepted, and will be passed to the underlying Connect() request. **`ssl` and `extra_header` keyword arguments will override the SDK auto-generated cookies/headers and SSL contexts used for authentication.** For more info on available options, see <https://websockets.readthedocs.io/en/stable/api.html#websockets.client.connect> """ headers = {} # add session headers headers.update(self._websocket_headers) # Get cookies from requests. cookies = self._session.cookies.get_dict() # create cookie header from the cookies in Requests headers["Cookie"] = "; ".join(["{0}={1}".format(key, value) for key, value in cookies.items()]) # check for host header host_header = headers.get("Host") force_host_arg = None if host_header is not None: # Ok, we got a host header. Unlike Requests, websockets wants the "Host Header" in the URI. You pass # a separate IP to connect to as 'host' argument to `websockets.client.Connect`. # So, to keep our "requests"-like behavior, we need to replace the value in the URI with the host # header, and remove the host header - then send the original value to `Connect` as host kwarg. # split controller at "//" (https://controller.host.com) to get host. controller_host = self.controller.split("//")[1] force_host_arg = controller_host # we don't support user/password in URI, so replacing first instance 'should' be OK (famous last words) new_url = url.replace(force_host_arg, host_header, 1) api_logger.debug("Host replacement. Original URL: {0}, New URL: {1}".format(url, new_url)) url = new_url # delete the host header del headers["Host"] # convert the headers dictionary to a list of tuples. 
header_tuple_list = [(key, value) for key, value in headers.items()] # Create argument tuple ws_args = (url,) + args # Create keyword args ws_kwargs = { "ssl": self._ca_ssl_context, "extra_headers": header_tuple_list } # check for force host arg if force_host_arg is not None: api_logger.debug("Forcing connection to host {0}".format(force_host_arg)) ws_kwargs["host"] = force_host_arg # Override automatic with any manually passed kwargs ws_kwargs.update(kwargs) return websockets.connect(*ws_args, **ws_kwargs) def parse_auth_token(self, auth_token): """ Break auth_token up into its constituent values. **Parameters:** - **auth_token:** Auth_token string **Returns:** dict with Auth Token constituents """ # remove the random security key value from the front of the auth_token auth_token_cleaned = auth_token.split('-', 1)[1] # URL Decode the Auth Token auth_token_decoded = self.url_decode(auth_token_cleaned) # Create a new dict to hold the response. auth_dict = {} # Parse the token for key_value in auth_token_decoded.split("&"): key_value_list = key_value.split("=") # check for valid token parts if len(key_value_list) == 2 and type(key_value_list[0]) in [text_type, binary_type]: auth_dict[key_value_list[0]] = key_value_list[1] # Return the dict of key/values in the token. return auth_dict def update_region_to_controller(self, region): """ Update the controller string with dynamic region info. Controller string should end up as `<name[-env]>.<region>.cloudgenix.com` **Parameters:** - **region:** region string. 
**Returns:** No return value, mutates the controller in the class namespace """ # default region position in a list region_position = 1 # Check for a global "ignore region" flag if self.ignore_region: # bypass api_logger.debug("IGNORE_REGION set, not updating controller region.") return api_logger.debug("Updating Controller Region") api_logger.debug("CONTROLLER = %s", self.controller) api_logger.debug("CONTROLLER_ORIG = %s", self.controller_orig) api_logger.debug("CONTROLLER_REGION = %s", self.controller_region) # Check if this is an initial region use or an update region use if self.controller_orig: controller_base = self.controller_orig else: controller_base = self.controller self.controller_orig = self.controller # splice controller string controller_full_part_list = controller_base.split('.') for idx, part in enumerate(controller_full_part_list): # is the region already in the controller string? if region == part: # yes, controller already has appropriate region api_logger.debug("REGION %s ALREADY IN BASE CONTROLLER AT INDEX = %s", region, idx) # update region if it is not already set. if self.controller_region != region: self.controller_region = region api_logger.debug("UPDATED_CONTROLLER_REGION = %s", self.controller_region) # Update controller if not already matching if self.controller != controller_base: self.controller = controller_base api_logger.debug("UPDATED_CONTROLLER = %s", self.controller) return controller_part_count = len(controller_full_part_list) # handle short domain case if controller_part_count > 1: # insert region controller_full_part_list[region_position] = region self.controller = ".".join(controller_full_part_list) else: # short domain, just add region self.controller = ".".join(controller_full_part_list) + '.' 
+ region # update SDK vars with region info self.controller_orig = controller_base self.controller_region = region api_logger.debug("UPDATED_CONTROLLER = %s", self.controller) api_logger.debug("UPDATED_CONTROLLER_ORIG = %s", self.controller_orig) api_logger.debug("UPDATED_CONTROLLER_REGION = %s", self.controller_region) return def parse_region(self, login_response): """ Return region from a successful login response. **Parameters:** - **login_response:** requests.Response from a successful login. **Returns:** region name. """ auth_token = login_response.cgx_content['x_auth_token'] auth_token_dict = self.parse_auth_token(auth_token) auth_region = auth_token_dict.get('region') return auth_region def reparse_login_cookie_after_region_update(self, login_response): """ Sometimes, login cookie gets sent with region info instead of api.cloudgenix.com. This function re-parses the original login request and applies cookies to the session if they now match the new region. **Parameters:** - **login_response:** requests.Response from a non-region login. **Returns:** updates API() object directly, no return. """ login_url = login_response.request.url api_logger.debug("ORIGINAL REQUEST URL = %s", login_url) # replace old controller with new controller. login_url_new = login_url.replace(self.controller_orig, self.controller) api_logger.debug("UPDATED REQUEST URL = %s", login_url_new) # reset login url with new region login_response.request.url = login_url_new # prep cookie jar parsing req = requests.cookies.MockRequest(login_response.request) res = requests.cookies.MockResponse(login_response.raw._original_response.msg) # extract cookies to session cookie jar. self._session.cookies.extract_cookies(res, req) return @staticmethod def _catch_nonjson_streamresponse(rawresponse): """ Validate a streamed response is JSON. Return a Python dictionary either way. **Parameters:** - **rawresponse:** Streamed Response from Requests. 
**Returns:** Dictionary """ # attempt to load response for return. try: response = json.loads(rawresponse) except (ValueError, TypeError): if rawresponse: response = { '_error': [ { 'message': 'Response not in JSON format.', 'data': rawresponse, } ] } else: # in case of null response, return empty dict. response = {} return response @staticmethod def url_decode(url): """ URL Decode function using REGEX **Parameters:** - **url:** URLENCODED text string **Returns:** Non URLENCODED string """ return re.compile('%([0-9a-fA-F]{2})', re.M).sub(lambda m: chr(int(m.group(1), 16)), url) @staticmethod def throw_error(message, resp=None, cr=True, exception=CloudGenixAPIError): """ Non-recoverable error, write message to STDERR and raise exception **Parameters:** - **message:** Message text - **resp:** Optional - CloudGenix SDK Response object - **cr:** Optional - Use (or not) Carriage Returns. - **exception:** Optional - Custom Exception to throw, otherwise uses `CloudGenixAPIError` **Returns:** No Return, throws exception. """ output = "ERROR: " + str(message) if cr: output += "\n" sys.stderr.write(output) if resp is not None: output2 = str(jdout_detailed(resp)) if cr: output2 += "\n" sys.stderr.write(output2) raise exception(message) @staticmethod def throw_warning(message, resp=None, cr=True): """ Recoverable Warning. **Parameters:** - **message:** Message text - **resp:** Optional - CloudGenix SDK Response object - **cr:** Optional - Use (or not) Carriage Returns. **Returns:** No Return. """ output = "WARNING: " + str(message) if cr: output += "\n" sys.stderr.write(output) if resp is not None: output2 = str(jdout_detailed(resp)) if cr: output2 += "\n" sys.stderr.write(output2) return def extract_items(self, resp_object, error_label=None, pass_code_list=None, items_key='items'): """ Extract list of items from a CloudGenix API Response object. **Parameters:** - **resp_object:** CloudGenix Extended `requests.Response` object. 
- **error_label:** Optional - text to describe operation on error. - **pass_code_list:** Optional - list of HTTP response codes to silently pass with empty list response. - **items_key:** Optional - Text for items key to extract (default 'items') **Returns:** list of 'items' objects. """ if pass_code_list is None: pass_code_list = [404, 400] items = resp_object.cgx_content.get(items_key) if resp_object.cgx_status and items is not None: return items # handle 404 and other error codes for certain APIs where objects may not exist elif resp_object.status_code in pass_code_list: return [{}] else: if error_label is not None: self.throw_error("Unable to extract '{0}' from {1}.".format(items_key, error_label), resp_object) return [{}] else: self.throw_error("Unable to extract '{0}' from response.".format(items_key), resp_object) return [{}] def build_lookup_dict(self, list_content, key_val='name', value_val='id', force_nag=False, nag_cache=None): """ Build key/value lookup dictionary from a list of dictionaries with specified key/value entries. **Parameters:** - **list_content:** List of dicts to derive lookup structs from - **key_val:** Optional - Value to extract from entry to be key - **value_val:** Optional - Value to extract from entry to be value - **force_nag:** Optional - Bool, if True will nag even if key in `nag_cache` - **nag_cache:** Optional - List of keys that already exist in a lookup dict that should be duplicate checked. **Returns:** Lookup Dictionary """ if nag_cache and isinstance(nag_cache, list): already_nagged_dup_keys = nag_cache else: already_nagged_dup_keys = [] lookup_dict = {} blacklist_duplicate_keys = [] blacklist_duplicate_entries = [] for item in list_content: item_key = item.get(key_val) item_value = item.get(value_val) # print(item_key, item_value) if item_key and item_value is not None: # check if it's a duplicate key. if str(item_key) in lookup_dict: # First duplicate we've seen - save for warning. 
duplicate_value = lookup_dict.get(item_key) blacklist_duplicate_keys.append(item_key) blacklist_duplicate_entries.append({item_key: duplicate_value}) blacklist_duplicate_entries.append({item_key: item_value}) # remove from lookup dict to prevent accidental overlap usage del lookup_dict[str(item_key)] # check if it was a third+ duplicate key for a previous key elif item_key in blacklist_duplicate_keys: # save for warning. blacklist_duplicate_entries.append({item_key: item_value}) else: # no duplicates, append lookup_dict[str(item_key)] = item_value for duplicate_key in blacklist_duplicate_keys: matching_entries = [entry for entry in blacklist_duplicate_entries if duplicate_key in entry] # check if force_nag set and if not, has key already been notified to the end user. if force_nag or duplicate_key not in already_nagged_dup_keys: self.throw_warning( "Lookup value '{0}' was seen two or more times. To use, please remove duplicates in the controller," " or reference it explicitly by the actual value: ".format(duplicate_key), matching_entries) # we've now notified, add to notified list. already_nagged_dup_keys.append(duplicate_key) return lookup_dict @staticmethod def pull_content_error(resp_object, raw=False): """ Parse API response object, return error detail text for printing on error in response content. **Parameters:** - **resp_object:** CloudGenix Extended `requests.Response` object. - **raw:** Optional. If True, return list of dicts (raw error messages.) Default False. **Returns:** text_type error message, or list of dicts (if raw=True). None if no errors. """ api_logger.debug('pull_content_error function:') try: # attempt to grab the cgx_content. should always be a Dict if it exists. data = resp_object.cgx_content except (TypeError, ValueError, AttributeError): # cgx_content did not exist. check root object for dict as end-user may pass the content and not the # extended `requests.Response` object. 
data = resp_object if not isinstance(data, dict): # fast fail if data isn't correct format. api_logger.debug('PULL_ERROR: not able to find a valid dict object in resp_object: {0}'.format(resp_object)) return None parsed_messages = [] errors = [] if isinstance(data, dict): # got a parsed response. errors = data.get('_error') if isinstance(errors, list): # Some errors in the response if raw is True: # just return raw error list return errors else: for error in errors: code = error.get('code') message = error.get('message') if code and message: parsed_messages.append("{0} ({1})".format(message, code)) elif errors is None: # no errors, ensure list and empty. errors = [] else: # not a list.. put whatever it is into a list. errors = [errors] api_logger.debug("ERRORS: %s", errors) api_logger.debug("PARSED_ERRORS: %s", parsed_messages) # is parsed_messages empty and errors exist? dump errors as txt if not parsed_messages and len(errors) > 1: return text_type(errors) elif len(parsed_messages) == 1: return text_type(parsed_messages[0]) elif len(parsed_messages) > 1: # return comma separated string of errors return text_type("{0}, and {1}".format(", ".join(parsed_messages[:-1]), parsed_messages[-1])) else: # no errors return None @staticmethod def pull_content_warning(resp_object, raw=False): """ Parse API response object, return text for printing on warning in response. **Parameters:** - **resp_object:** CloudGenix Extended `requests.Response` object. - **raw:** Optional. If True, return list of dicts (raw warning messages.) Default False. **Returns:** text_type warning message, or list of dicts (if raw=True). None if no warnings. """ api_logger.debug('pull_content_warning function:') try: # attempt to grab the cgx_content. should always be a Dict if it exists. data = resp_object.cgx_content except (TypeError, ValueError, AttributeError): # cgx_content did not exist. 
check root object for dict as end-user may pass the content and not the # extended `requests.Response` object. data = resp_object if not isinstance(data, dict): # fast fail if data isn't correct format. api_logger.debug('PULL_WARNING: not able to find a valid dict object in resp_object: {0}' ''.format(resp_object)) return None parsed_messages = [] warnings = [] if isinstance(data, dict): # got a parsed response. warnings = data.get('_warning') if isinstance(warnings, list): # Some warnings in the response if raw is True: # just return raw warning list return warnings else: for warning in warnings: code = warning.get('code') message = warning.get('message') if code and message: parsed_messages.append("{0} ({1})".format(message, code)) elif warnings is None: # no warnings, ensure list and empty. warnings = [] else: # not a list.. put whatever it is into a list. warnings = [warnings] api_logger.debug("WARNINGS: %s", warnings) api_logger.debug("PARSED_WARNINGS: %s", parsed_messages) # is parsed_messages empty and warnings exist? dump warnings as txt if not parsed_messages and len(warnings) > 1: return text_type(warnings) elif len(parsed_messages) == 1: return text_type(parsed_messages[0]) elif len(parsed_messages) > 1: # return comma separated string of warnings return text_type("{0}, and {1}".format(", ".join(parsed_messages[:-1]), parsed_messages[-1])) else: # no warnings return None
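The token handling in `parse_auth_token` and `url_decode` above can be tried in isolation. The following is a minimal standalone sketch that mirrors the logic shown in the source; the sample token is hypothetical and invented for illustration, and real tokens may carry different fields.

```python
import re


def url_decode(text):
    """Replace each %XX escape with the character it encodes."""
    return re.sub(r'%([0-9a-fA-F]{2})', lambda m: chr(int(m.group(1), 16)), text)


def parse_auth_token(auth_token):
    """Split a token of the form '<key>-<url-encoded key=value pairs>' into a dict."""
    # Drop everything up to and including the first '-' (the leading security key).
    cleaned = auth_token.split('-', 1)[1]
    decoded = url_decode(cleaned)
    fields = {}
    for key_value in decoded.split('&'):
        pair = key_value.split('=')
        # Only keep well-formed key=value parts.
        if len(pair) == 2:
            fields[pair[0]] = pair[1]
    return fields


# Hypothetical token, not a real CloudGenix token.
sample_token = "abc123-t_id%3D42%26region%3Dexample"
token_fields = parse_auth_token(sample_token)
print(token_fields)
```

In the SDK itself, `parse_region` reads the `region` key from the resulting dict, which is why the sketch includes one.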
Module variables
var BYTE_CA_BUNDLE
Explicit CA bundle for CA Pinning - Root Certificates for the CloudGenix Controller API Endpoint.
Loaded from cloudgenix.ca_bundle.CG_CA_BUNDLE
var DEFAULT_SSL_CIPHERS
Default set of SSL/TLS ciphers for communication.
Default is meant to be widely compatible with dangerous ciphers removed.
var DEFAULT_SSL_OPTIONS
Default SSL/TLS Options
Default TLS v1.2/v1.3 or higher.
var SDK_BUILD_REGEX
REGEX for parsing SDK builds
var update_info_url
URL for checking for updates.
var version
SDK Version string
var ws_logger
websocket logger is handled slightly differently, so we will have a separate handle.
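As a rough illustration of why separate logger handles are useful, the sketch below attaches one shared handler to two named loggers and sets their levels from a numeric debug level, loosely following the pattern in `set_debug`. The logger names and the `set_debug_sketch` helper are assumptions made for this example; the SDK's actual logger names are not shown in this section.

```python
import io
import logging

# Hypothetical logger names, chosen for this example only.
api_log = logging.getLogger("example.api")
ws_log = logging.getLogger("example.ws")


def set_debug_sketch(debuglevel, stream):
    """Attach one shared handler to both loggers and set their level."""
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("%(levelname)s [%(name)s] %(message)s"))
    if debuglevel == 1:
        level = logging.INFO
    elif debuglevel >= 2:
        level = logging.DEBUG
    else:
        level = logging.WARNING
    for logger in (api_log, ws_log):
        # Remove existing handlers before adding the new one.
        logger.handlers = []
        logger.setLevel(level)
        if debuglevel >= 1:
            logger.addHandler(handler)


buffer = io.StringIO()
set_debug_sketch(1, buffer)
api_log.info("api message")
ws_log.debug("hidden at level 1")
ws_log.info("ws message")
captured = buffer.getvalue()
print(captured)
```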
Functions
def jd(api_response)
JD (JSON Dump) function. Meant for quick pretty-printing of a CloudGenix Response body.
Example: jd(sdk.get.sites())
Parameters:
- api_response: A CloudGenix-attribute extended requests.Response object
Returns: No Return, directly prints all output.
def jd(api_response):
    """
    JD (JSON Dump) function. Meant for quick pretty-printing of a CloudGenix Response body.

    Example: `jd(sdk.get.sites())`

    **Parameters:**

    - **api_response:** A CloudGenix-attribute extended `requests.Response` object

    **Returns:** No Return, directly prints all output.
    """
    print(jdout(api_response))
    return
def jd_detailed(api_response, sensitive=False)
JD (JSON Dump) Detailed function. Meant for quick DETAILED pretty-printing of CloudGenix Request and Response objects for troubleshooting.
Example: jd_detailed(cgx_sess.get.sites())
Parameters:
- api_response: A CloudGenix-attribute extended requests.Response object
- sensitive: Boolean, if True will print sensitive content (specifically, authentication cookies/headers).
Returns: No Return, directly prints all output.
def jd_detailed(api_response, sensitive=False):
    """
    JD (JSON Dump) Detailed function. Meant for quick DETAILED pretty-printing of CloudGenix Request and Response
    objects for troubleshooting.

    Example: `jd_detailed(cgx_sess.get.sites())`

    **Parameters:**

    - **api_response:** A CloudGenix-attribute extended `requests.Response` object
    - **sensitive:** Boolean, if True will print sensitive content (specifically, authentication cookies/headers).

    **Returns:** No Return, directly prints all output.
    """
    print(jdout_detailed(api_response, sensitive=sensitive))
    return
def jdout(api_response)
JD Output function. Does quick pretty printing of a CloudGenix Response body. This function returns a string instead of directly printing content.
Parameters:
- api_response: A CloudGenix-attribute extended requests.Response object
Returns: Pretty-formatted text of the Response body
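The fallback order can be seen with stand-in inputs. This is a minimal sketch that copies the try/except chain from the `jdout` source into a local `jdout_sketch` function so it runs without the SDK; the `SimpleNamespace` object is only a stand-in for an extended `requests.Response`.

```python
import json
from types import SimpleNamespace


def jdout_sketch(api_response):
    """Local copy of the jdout fallback chain, for illustration only."""
    try:
        # Preferred: pretty-print the cgx_content attribute.
        return json.dumps(api_response.cgx_content, indent=4)
    except (TypeError, ValueError, AttributeError):
        try:
            # No usable cgx_content: try the object itself.
            return json.dumps(api_response, indent=4)
        except (TypeError, ValueError, AttributeError):
            # Not JSON serializable: hand the input back unchanged.
            return api_response


# Stand-in for a response object carrying cgx_content.
fake_response = SimpleNamespace(cgx_content={"items": []})

from_response = jdout_sketch(fake_response)  # uses cgx_content
from_dict = jdout_sketch({"id": 1})          # plain dict, no cgx_content
from_other = jdout_sketch({1, 2})            # a set cannot be serialized

print(from_response)
print(from_dict)
print(from_other)
```

Note that the last case returns the original object and not a string, so callers that need text may want to wrap the result in `str()`.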
def jdout(api_response): """ JD Output function. Does quick pretty printing of a CloudGenix Response body. This function returns a string instead of directly printing content. **Parameters:** - **api_response:** A CloudGenix-attribute extended `requests.Response` object **Returns:** Pretty-formatted text of the Response body """ try: # attempt to output the cgx_content. should always be a Dict if it exists. output = json.dumps(api_response.cgx_content, indent=4) except (TypeError, ValueError, AttributeError): # cgx_content did not exist, or was not JSON serializable. Try pretty output the base obj. try: output = json.dumps(api_response, indent=4) except (TypeError, ValueError, AttributeError): # Same issue, just raw output the passed data. Let any exceptions happen here. output = api_response return output
def jdout_detailed(api_response, sensitive=False)
JD Output Detailed function. Meant for quick DETAILED pretty-printing of CloudGenix Request and Response objects for troubleshooting. This function returns a string instead of directly printing content.
Parameters:
- api_response: A CloudGenix-attribute extended requests.Response object
- sensitive: Boolean, if True will print sensitive content (specifically, authentication cookies/headers).
Returns: Pretty-formatted text of the Request, Request Headers, Request body, Response, Response Headers, and Response Body.
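When `sensitive` is left at False, the `AUTH_TOKEN` cookie value is replaced with a placeholder before printing. The sketch below isolates that masking step in a hypothetical helper, `mute_auth_cookie`, which follows the cookie loop in the `jdout_detailed` source; the helper name and the sample header are assumptions for illustration.

```python
MUTE_TEXT = '"<SENSITIVE - NOT SHOWN BY DEFAULT>"'


def mute_auth_cookie(cookie_header):
    """Return the Cookie header value with any AUTH_TOKEN value masked."""
    muted = []
    for cookie in cookie_header.split('; '):
        if cookie.lower().strip().startswith('auth_token='):
            # Keep the 11 characters of 'AUTH_TOKEN=' and replace the value.
            muted.append(cookie.strip()[:11] + MUTE_TEXT)
        else:
            muted.append(cookie)
    return '; '.join(muted)


# Hypothetical header value, not taken from a real session.
sample_header = "AUTH_TOKEN=secret-value; region=example"
muted_header = mute_auth_cookie(sample_header)
print(muted_header)
```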
def jdout_detailed(api_response, sensitive=False):
    """
    JD Output Detailed function. Meant for quick DETAILED pretty-printing of CloudGenix Request and Response
    objects for troubleshooting. This function returns a string instead of directly printing content.

    **Parameters:**

    - **api_response:** A CloudGenix-attribute extended `requests.Response` object
    - **sensitive:** Boolean, if True will print sensitive content (specifically, authentication cookies/headers).

    **Returns:** Pretty-formatted text of the Request, Request Headers, Request body, Response, Response Headers,
    and Response Body.
    """
    try:
        # try to be super verbose.
        output = "REQUEST: {0} {1}\n".format(api_response.request.method, api_response.request.path_url)
        output += "REQUEST HEADERS:\n"
        for key, value in api_response.request.headers.items():
            # look for sensitive values
            if key.lower() in ['cookie'] and not sensitive:
                # we need to do some work to watch for the AUTH_TOKEN cookie. Split on cookie separator
                cookie_list = value.split('; ')
                muted_cookie_list = []
                for cookie in cookie_list:
                    # check if cookie starts with a permutation of AUTH_TOKEN/whitespace.
                    if cookie.lower().strip().startswith('auth_token='):
                        # first 11 chars of cookie with whitespace removed + mute string.
                        newcookie = cookie.strip()[:11] + "\"<SENSITIVE - NOT SHOWN BY DEFAULT>\""
                        muted_cookie_list.append(newcookie)
                    else:
                        muted_cookie_list.append(cookie)
                # got list of cookies, muted as needed. recombine.
                muted_value = "; ".join(muted_cookie_list)
                output += "\t{0}: {1}\n".format(key, muted_value)
            elif key.lower() in ['x-auth-token'] and not sensitive:
                output += "\t{0}: {1}\n".format(key, "<SENSITIVE - NOT SHOWN BY DEFAULT>")
            else:
                output += "\t{0}: {1}\n".format(key, value)
        # if body not present, output blank.
        if not api_response.request.body:
            output += "REQUEST BODY:\n{0}\n\n".format({})
        else:
            try:
                # Attempt to load JSON from string to make it look better.
                output += "REQUEST BODY:\n{0}\n\n".format(json.dumps(json.loads(api_response.request.body), indent=4))
            except (TypeError, ValueError, AttributeError):
                # if pretty call above didn't work, just toss it to jdout to best effort it.
                output += "REQUEST BODY:\n{0}\n\n".format(jdout(api_response.request.body))
        output += "RESPONSE: {0} {1}\n".format(api_response.status_code, api_response.reason)
        output += "RESPONSE HEADERS:\n"
        for key, value in api_response.headers.items():
            output += "\t{0}: {1}\n".format(key, value)
        try:
            # look for CGX content first.
            output += "RESPONSE DATA:\n{0}".format(json.dumps(api_response.cgx_content, indent=4))
        except (TypeError, ValueError, AttributeError):
            # look for standard response data.
            output += "RESPONSE DATA:\n{0}".format(json.dumps(json.loads(api_response.content), indent=4))
    except (TypeError, ValueError, AttributeError, UnicodeDecodeError):
        # cgx_content did not exist, or was not JSON serializable. Try pretty output the base obj.
        try:
            output = json.dumps(api_response, indent=4)
        except (TypeError, ValueError, AttributeError):
            # Same issue, just raw output the passed data. Let any exceptions happen here.
            output = api_response
    return output
Classes
class API
Class for interacting with the CloudGenix API.
Subclass objects are linked to various operations.
- get: links to cloudgenix.get_api.Get for API Get Operations
- post: links to cloudgenix.post_api.Post for API Post Operations
- put: links to cloudgenix.put_api.Put for API Put Operations
- patch: links to cloudgenix.patch_api.Patch for API Patch Operations
- delete: links to cloudgenix.delete_api.Delete for API Delete Operations
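The linking works by wrapping each operations class so that its instances hold a reference back to the parent object. The following is a simplified sketch of that pattern with toy classes; `Client`, the toy `Get`, and its `sites` method are stand-ins written for this example and are not the SDK's real classes.

```python
class Get(object):
    """Stand-in operations class; it expects `_parent_class` to be set by a wrapper."""

    def sites(self):
        # Reach back into the parent object for shared state.
        return "GET {0}/sites".format(self._parent_class.controller)


class Client(object):
    """Stand-in for the parent API object."""

    controller = "https://controller.example.com"

    def __init__(self):
        # Bind the wrapped operations class to this instance.
        wrappers = self._subclass_container()
        self.get = wrappers["get"]()

    def _subclass_container(self):
        # Capture the parent instance in a closure for the wrapper class.
        _parent_class = self

        class GetWrapper(Get):
            def __init__(self):
                self._parent_class = _parent_class

        return {"get": GetWrapper}


client = Client()
result = client.get.sites()
print(result)
```

Defining the wrapper inside a method lets each parent instance hand its own namespace to the operations classes without changing their constructors.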
class API(object): """ Class for interacting with the CloudGenix API. Subclass objects are linked to various operations. - get: links to `cloudgenix.get_api.Get` for API Get Operations - post: links to `cloudgenix.post_api.Post` for API Post Operations - put: links to `cloudgenix.put_api.Put` for API Put Operations - patch: links to `cloudgenix.patch_api.Patch` for API Patch Operations - delete: links to `cloudgenix.delete_api.Delete` for API Delete Operations """ # Global structure, previously sdk_vars # Authentication is now stored as cookies, as part of the requests.Session() object. # Authentication can also be stored as an 'X-Auth-Token:' header, but cookies take precedence. controller = 'https://api.elcapitan.cloudgenix.com' """Current active controller URL""" controller_orig = None """Original Controller URL as entered - before Region re-parse""" controller_region = None """Controller Region, if present.""" ignore_region = False """Ignore regions returned by controller, and use explicit controller only.""" _debuglevel = 0 """debug level - set via set_debug()""" tenant_id = None """Numeric ID of tenant (account) - should be set after initial login from `cloudgenix.get_api.Get.profile` data""" tenant_name = None """Name of tenant (account), should be set after initial login from `cloudgenix.get_api.Get.profile` data""" token_session = None """Is this login from a static AUTH_TOKEN (True), or a standard login (False)""" is_esp = None """Is the current tenant an ESP/MSP?""" client_id = None """If ESP/MSP, is client currently logged in""" address_string = None """String representing address, optional - should be pulled from get.profile() data.""" email = None """Email (username) for session""" operator_id = None """Operator ID of current session.""" _user_id = None """Deprecated (replaced by `operator_id`.) 
    Left for backwards compatibility."""

    _password = None
    """user password - only used when argv passed, and cleared quickly"""

    roles = None
    """Roles list"""

    verify = True
    """Verify SSL certificate."""

    version = None
    """Version string for use once Constructor created."""

    ca_verify_filename = None
    """DEPRECATED: Filename to use for CA verification. Moved to modern python `ssl`, updated to only passing
    filename in event of local file."""

    _ca_verify_file_handle = None
    """File handle for CA verification"""

    _ca_ssl_context = None
    """`ssl` library context for controller connections"""

    _rest_connection_adapter = None
    """`requests.adapters.HTTPAdapter` object for use by requests to request content."""

    rest_call_retry = False
    """DEPRECATED: Please use `cloudgenix.API.modify_rest_retry`."""

    rest_call_max_retry = 30
    """DEPRECATED: Please use `cloudgenix.API.modify_rest_retry`."""

    rest_call_sleep = 10
    """DEPRECATED: Please use `cloudgenix.API.modify_rest_retry`."""

    _rest_call_retry_object = None
    """`urllib3.util.retry.Retry` object for use by SDK to do retries"""

    rest_call_timeout = 240
    """Maximum time to wait for any data from REST server."""

    cache = {}
    """API response cache (Future)"""

    _parent_namespace = None
    """holder for namespace for wrapper classes."""

    _session = None
    """holder for requests.Session() object"""

    _websocket_headers = None
    """holder for WebSocket Headers"""

    update_check = True
    """Notify users of available update to SDK"""

    update_info_url = None
    """Update Info URL for use once Constructor Created."""

    def __init__(self, controller=controller, ssl_verify=verify, update_check=True):
        """
        Create the API constructor object

          - **controller:** Initial Controller URL String
          - **ssl_verify:** Should SSL be verified for this system. Can be `file` or BOOL.
            See `cloudgenix.API.ssl_verify` for more details.
          - **update_check:** Bool to Enable/Disable SDK update check and new release notifications.
        """
        # set version and update url from outer scope.
        self.version = version
        """Version string for use once Constructor created."""
        self.update_info_url = update_info_url
        """Update Info URL for use once Constructor Created."""

        # try:
        if controller and isinstance(controller, (binary_type, text_type)):
            self.controller = controller.lower()
            self.controller_orig = controller.lower()

        # Create Requests Session.
        self._session = requests.Session()

        # set ssl context
        if isinstance(ssl_verify, (binary_type, text_type, bool)):
            self.ssl_verify(ssl_verify, update_adapter=False)

        # Set default REST retry parameters
        self.modify_rest_retry(update_adapter=False)

        # update the HTTP Adapter for requests.
        self.update_session_adapter()

        # handle update check
        if isinstance(update_check, bool):
            self.update_check = update_check

        if update_check:
            self.notify_for_new_version()

        # Identify SDK in the User-Agent.
        user_agent = self._session.headers.get('User-Agent')
        if user_agent:
            user_agent += ' (CGX SDK v{0})'.format(self.version)
        else:
            user_agent = 'python-requests/UNKNOWN (CGX SDK v{0})'.format(self.version)

        # Update Headers
        self._session.headers.update({
            'Accept': 'application/json',
            'User-Agent': text_type(user_agent)
        })

        # except Exception as e:
        #     raise ValueError("Unable to create Requests session object: {0}.".format(e))
        api_logger.debug("DEBUG: URL: %s, SSL Verify: %s, Session: %s",
                         self.controller,
                         self.verify,
                         self._session)

        # Update Headers for WebSocket requests
        websocketlib_name = websockets.__name__
        websocketlib_version = websockets.version.version
        if not websocketlib_name:
            websocketlib_name = 'websockets'
        if not websocketlib_version:
            websocketlib_version = 'UNKNOWN'

        ws_user_agent = 'python-{0}/{1} (CGX SDK v{2})'.format(websocketlib_name, websocketlib_version,
                                                               self.version)
        self._websocket_headers = {
            'Accept': 'application/json',
            'User-Agent': text_type(ws_user_agent)
        }

        # Bind API method classes to this object
        subclasses = self._subclass_container()
        self.get = subclasses["get"]()
        """API object link to `cloudgenix.get_api.Get`"""
        self.post = subclasses["post"]()
        """API object link to `cloudgenix.post_api.Post`"""

        self.put = subclasses["put"]()
        """API object link to `cloudgenix.put_api.Put`"""

        self.patch = subclasses["patch"]()
        """API object link to `cloudgenix.patch_api.Patch`"""

        self.delete = subclasses["delete"]()
        """API object link to `cloudgenix.delete_api.Delete`"""

        self.interactive = subclasses["interactive"]()
        """API object link to `cloudgenix.interactive.Interactive`"""

        self.ws = subclasses["ws"]()
        """API object link to `cloudgenix.ws.WebSockets`"""

        return

    def notify_for_new_version(self):
        """
        Check for a new version of the SDK on API constructor instantiation. If new version found, print
        Notification to STDERR.

        On failure of this check, fail silently.

        **Returns:** No item returned, directly prints notification to `sys.stderr`.
        """
        # broad exception clause, if this fails for any reason just return.
        try:
            recommend_update = False
            update_check_resp = requests.get(self.update_info_url, timeout=3)
            web_version = update_check_resp.json()["info"]["version"]
            api_logger.debug("RETRIEVED_VERSION: %s", web_version)

            available_version = SDK_BUILD_REGEX.search(web_version).groupdict()
            current_version = SDK_BUILD_REGEX.search(self.version).groupdict()

            available_major = available_version.get('major')
            available_minor = available_version.get('minor')
            available_patch = available_version.get('patch')
            available_build = available_version.get('build')
            current_major = current_version.get('major')
            current_minor = current_version.get('minor')
            current_patch = current_version.get('patch')
            current_build = current_version.get('build')

            api_logger.debug("AVAILABLE_VERSION: %s", available_version)
            api_logger.debug("CURRENT_VERSION: %s", current_version)

            # check for major/minor/patch version differences, do not alert for build differences.
            if available_major > current_major:
                recommend_update = True
            elif available_major >= current_major and available_minor > current_minor:
                recommend_update = True
            elif available_major >= current_major and available_minor >= current_minor and \
                    available_patch > current_patch:
                recommend_update = True

            api_logger.debug("NEED_UPDATE: %s", recommend_update)

            # notify.
            if recommend_update:
                sys.stderr.write("WARNING: CloudGenix Python SDK upgrade available. SDKs are often deprecated 6 "
                                 "months after release of a new version.\n"
                                 "\tLatest Version: {0}\n"
                                 "\tCurrent Version: {1}\n"
                                 "\tFor more info, see 'https://github.com/cloudgenix/sdk-python'. Additionally, this "
                                 "message can be suppressed by instantiating the API with API(update_check=False).\n\n"
                                 "".format(web_version, self.version))
            return

        except Exception:
            # just return and continue.
            return

    def ssl_verify(self, ssl_verify, ciphers=DEFAULT_SSL_CIPHERS, options=DEFAULT_SSL_OPTIONS,
                   tls_min=ssl.TLSVersion.TLSv1_2, tls_max=ssl.TLSVersion.MAXIMUM_SUPPORTED,
                   update_adapter=True):
        """
        Modify ssl verification settings

        **Parameters:**

          - **ssl_verify:**
             - **True:** Verify using builtin BYTE_CA_BUNDLE.
             - **False:** No SSL Verification.
             - **Str:** Full path to a x509 PEM CA File or bundle.
          - **ciphers:** List of specific ciphers to allow. Default list is DEFAULT_SSL_CIPHERS if not specified.
          - **options:** List of SSL options. Default options are contained in DEFAULT_SSL_OPTIONS if not specified.
          - **tls_min:** Lowest acceptable TLS version. Default is `ssl.TLSVersion.TLSv1_2`
          - **tls_max:** Maximum acceptable TLS version. Default is `ssl.TLSVersion.MAXIMUM_SUPPORTED`
          - **update_adapter:** bool, call update adapter function after running to auto update adapter.

        **Returns:** No return, mutates the SSL Context / session in place directly.
        """
        self.verify = ssl_verify

        # if verify true/false, set ca_verify_file appropriately
        if isinstance(self.verify, bool):
            if self.verify:  # True
                # load default CA list and set default SSL ciphers.
                self._ca_ssl_context = ssl.create_default_context(cadata=BYTE_CA_BUNDLE.decode('ascii'))
                self._session.verify = True
            else:  # False
                # disable warnings for SSL certs.
                urllib3.disable_warnings()
                self._ca_ssl_context = ssl.SSLContext()
                self._ca_ssl_context.check_hostname = False
                self._ca_ssl_context.verify_mode = ssl.CERT_NONE
                self._session.verify = False
        else:
            # Not True/False, assume path to file/dir for Requests
            # set filename/filepath for context
            self._ca_ssl_context = ssl.create_default_context(cafile=self.verify, capath=self.verify)
            self._ca_ssl_context.check_hostname = True
            self._session.verify = True

        # set the CA independent values
        self._ca_ssl_context.minimum_version = tls_min
        self._ca_ssl_context.maximum_version = tls_max
        self._ca_ssl_context.set_ciphers(ciphers)
        self._ca_ssl_context.options |= options

        # update the session adapter.
        if update_adapter:
            self.update_session_adapter()

        return

    def modify_rest_retry(self, total=8, connect=None, read=None, redirect=None, status=None, other=0,
                          allowed_methods=urllib3.util.retry.Retry.DEFAULT_ALLOWED_METHODS, status_forcelist=None,
                          backoff_factor=0.705883, raise_on_redirect=True, raise_on_status=True,
                          respect_retry_after_header=True, update_adapter=True):
        """
        Modify retry parameters for the SDK's REST call object.

        Parameters are directly from and passed directly to `urllib3.util.retry.Retry`, and get applied directly to
        the underlying `requests.Session` object.

        Default retry with total=8 and backoff_factor=0.705883:

          - Try 1, 0 delay (0 total seconds)
          - Try 2, 0 delay (0 total seconds)
          - Try 3, 0.705883 delay (0.705883 total seconds)
          - Try 4, 1.411766 delay (2.117649 total seconds)
          - Try 5, 2.823532 delay (4.941181 total seconds)
          - Try 6, 5.647064 delay (10.588245 total seconds)
          - Try 7, 11.294128 delay (21.882373 total seconds)
          - Try 8, 22.588256 delay (44.470629 total seconds)
          - Try 9, 45.176512 delay (89.647141 total seconds)
          - Try 10, 90.353024 delay (180.000165 total seconds)

        **Parameters:**

          - **total:** int, Total number of retries to allow. Takes precedence over other counts.
          - **connect:** int, How many connection-related errors to retry on.
          - **read:** int, How many times to retry on read errors.
          - **redirect:** int, How many redirects to perform. Limit this to avoid infinite redirect loops.
          - **status:** int, How many times to retry on bad status codes.
          - **other:** int, How many times to retry on other errors. Set to 0 by default for things like SSL errors.
          - **allowed_methods:** iterable, Set of uppercased HTTP method verbs that we should retry on.
          - **status_forcelist:** iterable, A set of integer HTTP status codes that we should force a retry on.
            Defaults to (413, 429, 502, 503, 504) if not specified.
          - **backoff_factor:** float, A backoff factor to apply between attempts after the second try.
          - **raise_on_redirect:** bool, True = raise a MaxRetryError, False = return latest 3xx response.
          - **raise_on_status:** bool, Similar logic to ``raise_on_redirect`` but for status responses.
          - **respect_retry_after_header:** bool, Whether to respect Retry-After header on status codes.
          - **update_adapter:** bool, call update adapter function after running to auto update adapter.

        **Returns:** No return, mutates the retry / session directly
        """
        # Cloudgenix responses with 502/504 are usually recoverable. Use them if no list specified.
        if status_forcelist is None:
            status_forcelist = (413, 429, 502, 503, 504)

        retry = urllib3.util.retry.Retry(total=total,
                                         connect=connect,
                                         read=read,
                                         redirect=redirect,
                                         status=status,
                                         other=other,
                                         allowed_methods=allowed_methods,
                                         status_forcelist=status_forcelist,
                                         backoff_factor=backoff_factor,
                                         raise_on_redirect=raise_on_redirect,
                                         raise_on_status=raise_on_status,
                                         respect_retry_after_header=respect_retry_after_header)
        # set updated retry object.
        self._rest_call_retry_object = retry

        # call update to mount/remount session adapter
        if update_adapter:
            self.update_session_adapter()

        return

    def update_session_adapter(self, adapter=None, retry=None, ssl_context=None, adapter_url="https://"):
        """
        Mount/remount the HTTPS session adapter after changes that affect it.

        **Parameters:**

          - **adapter:** `requests.adapters.HTTPAdapter`, or will use/create default adapter.
          - **retry:** `urllib3.util.retry.Retry` object, or will use default retry object.
          - **ssl_context:** `ssl.SSLContext` object, or will use default SSL Context.
          - **adapter_url:** URL to use for matching requests. Defaults to 'https://'

        **Returns:** No return, mutates the session adapter directly
        """
        # use default or specified session objects.
        use_retry = retry or self._rest_call_retry_object
        use_ssl_context = ssl_context or self._ca_ssl_context

        # if object adapter has not been built, create a new one.
        if adapter is None:
            # does an effective retry object exist?
            if use_retry is None:
                self._rest_connection_adapter = TlsHttpAdapter(ssl_context=use_ssl_context)
            else:
                self._rest_connection_adapter = TlsHttpAdapter(ssl_context=use_ssl_context, max_retries=use_retry)
        else:
            # if adapter is specified, nothing else should be specified.
            if retry is not None or ssl_context is not None:
                self.throw_warning("Error: HTTPAdapter was specified with retry and ssl_context. These will be ignored."
                                   " retry and ssl_contexts need to be set in the adapter itself.")

        # Pick the specified or default adapter.
        use_adapter = adapter or self._rest_connection_adapter

        # mount the adapter.
        self._session.mount(adapter_url, use_adapter)

        return

    def view_rest_retry(self, url=None):
        """
        View current rest retry settings in the `requests.Session()` object

        **Parameters:**

          - **url:** URL to use to determine retry methods for. Defaults to 'https://'

        **Returns:** Dict of the attributes of the `urllib3.util.retry.Retry` object mounted for that URL.
        """
        if url is None:
            url = "https://"
        return vars(self._session.get_adapter(url).max_retries)

    def expose_session(self):
        """
        Call to expose the Requests Session object

        **Returns:** `requests.Session` object
        """
        return self._session

    def add_headers(self, headers):
        """
        Permanently add/overwrite headers to session.

        **Parameters:**

          - **headers:** dict with header/value

        **Returns:** Mutates `requests.Session()` object, no return.
        """
        self._session.headers.update(headers)
        return

    def remove_header(self, header):
        """
        Permanently remove a single header from session

        **Parameters:**

          - **header:** str of single header to remove

        **Returns:** Mutates `requests.Session()` object, no return.
        """
        # check for header first. Return silently if it does not exist.
        if self._session.headers.get(header) is not None:
            del self._session.headers[header]
        return

    def view_headers(self):
        """
        View current headers in the `requests.Session()` object

        **Returns:** Dict, Key header, value is header value.
        """
        return dict(self._session.headers)

    def websocket_add_headers(self, headers):
        """
        Permanently add/overwrite headers to the `API()` WebSocket object

        **Parameters:**

          - **headers:** dict with header/value

        **Returns:** Mutates `API()` object, no return.
        """
        self._websocket_headers.update(headers)
        return

    def websocket_remove_header(self, header):
        """
        Permanently remove a single header from the `API()` WebSocket object

        **Parameters:**

          - **header:** str of single header to remove

        **Returns:** Mutates `API()` object, no return.
        """
        # check for header first. Return silently if it does not exist.
        if self._websocket_headers.get(header) is not None:
            del self._websocket_headers[header]
        return

    def websocket_view_headers(self):
        """
        View current headers in the `API()` WebSocket object

        **Returns:** Dict, Key header, value is header value.
        """
        return dict(self._websocket_headers)

    def view_cookies(self):
        """
        View current cookies in the `requests.Session()` object

        **Returns:** List of Dicts, one cookie per Dict.
        """
        return_list = []
        for cookie in self._session.cookies:
            return_list.append(vars(cookie))

        return return_list

    def set_debug(self, debuglevel, set_format=None, set_handler=None):
        """
        Change the debug level of the API

        **Parameters:**

          - **debuglevel:** Int debug level. 1 = INFO for the SDK and WebSocket loggers, 2 = DEBUG for the SDK,
            WebSocket and cookie loggers, 3 or higher = level 2 plus DEBUG for the urllib3 loggers.
            Any other value sets all loggers to WARNING.
          - **set_format:** Optional. If set and text_type, use input for formatter. Otherwise, default formatter.
          - **set_handler:** Optional. If set and `logging.Handler` type, use input for handler. Otherwise, default
            `logging.StreamHandler()`

        **Returns:** No item returned.
        """
        # set the logging formatter and stream handle
        if set_format is None:
            # default formatter
            api_formatter = logging.Formatter("%(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s")
        elif not isinstance(set_format, text_type):
            # not a valid format string. Set to default.
            api_formatter = logging.Formatter("%(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s")
        else:
            # valid logging string.
            api_formatter = logging.Formatter(set_format)

        # set the logging handler if supported handler is not passed.
        if set_handler is None:
            # Default handler
            api_handler = logging.StreamHandler()
        elif not isinstance(set_handler, (logging.FileHandler, logging.Handler, logging.NullHandler,
                                          logging.StreamHandler)):
            # not a valid handler. Set to default handler.
            api_handler = logging.StreamHandler()
        else:
            # passed valid handler
            api_handler = set_handler

        # set handler to use format.
        api_handler.setFormatter(api_formatter)

        # Get the loggers from other modules in prep for setting new handlers.
        urllib3_logger = logging.getLogger("requests.packages.urllib3")
        urllib3_retry_logger = logging.getLogger("urllib3.util.retry")
        cookie_logger = logging.getLogger("http.cookiejar")

        # remove existing handlers
        api_logger.handlers = []
        urllib3_logger.handlers = []
        urllib3_retry_logger.handlers = []
        cookie_logger.handlers = []
        ws_logger.handlers = []

        # ok, lets set the new handlers.
        if isinstance(debuglevel, int):
            self._debuglevel = debuglevel

        if self._debuglevel == 1:
            api_logger.addHandler(api_handler)
            api_logger.setLevel(logging.INFO)
            ws_logger.addHandler(api_handler)
            ws_logger.setLevel(logging.INFO)
        elif self._debuglevel == 2:
            cookie_logger.addHandler(api_handler)
            cookie_logger.setLevel(logging.DEBUG)
            cookielib.debug = True
            api_logger.addHandler(api_handler)
            api_logger.setLevel(logging.DEBUG)
            ws_logger.addHandler(api_handler)
            ws_logger.setLevel(logging.DEBUG)
        elif self._debuglevel >= 3:
            cookie_logger.addHandler(api_handler)
            cookie_logger.setLevel(logging.DEBUG)
            cookielib.debug = True
            urllib3_logger.addHandler(api_handler)
            urllib3_logger.setLevel(logging.DEBUG)
            urllib3_retry_logger.addHandler(api_handler)
            urllib3_retry_logger.setLevel(logging.DEBUG)
            api_logger.addHandler(api_handler)
            api_logger.setLevel(logging.DEBUG)
            ws_logger.addHandler(api_handler)
            ws_logger.setLevel(logging.DEBUG)
        else:
            # set to warning
            cookie_logger.setLevel(logging.WARNING)
            cookielib.debug = False
            urllib3_logger.setLevel(logging.WARNING)
            urllib3_retry_logger.setLevel(logging.WARNING)
            api_logger.setLevel(logging.WARNING)
            ws_logger.setLevel(logging.WARNING)

        return

    def _subclass_container(self):
        """
        Call subclasses via function to allow passing parent namespace to subclasses.

        **Returns:** dict with subclass references.
        """
        _parent_class = self
        return_object = {}

        class GetWrapper(Get):

            def __init__(self):
                self._parent_class = _parent_class

        return_object['get'] = GetWrapper

        class PostWrapper(Post):

            def __init__(self):
                self._parent_class = _parent_class

        return_object['post'] = PostWrapper

        class PutWrapper(Put):

            def __init__(self):
                self._parent_class = _parent_class

        return_object['put'] = PutWrapper

        class PatchWrapper(Patch):

            def __init__(self):
                self._parent_class = _parent_class

        return_object['patch'] = PatchWrapper

        class DeleteWrapper(Delete):

            def __init__(self):
                self._parent_class = _parent_class

        return_object['delete'] = DeleteWrapper

        class InteractiveWrapper(Interactive):

            def __init__(self):
                self._parent_class = _parent_class

        return_object['interactive'] = InteractiveWrapper

        class WebSocketsWrapper(WebSockets):

            def __init__(self):
                self._parent_class = _parent_class

        return_object['ws'] = WebSocketsWrapper

        return return_object

    def rest_call(self, url, method, data=None, sensitive=False, timeout=None, content_json=True, raw_msgs=False,
                  retry=None, max_retry=None, retry_sleep=None):
        """
        Generic REST call worker function

        **Parameters:**

          - **url:** URL for the REST call
          - **method:** METHOD for the REST call
          - **data:** Optional DATA for the call (for POST/PUT/etc.)
          - **sensitive:** Flag if content request/response should be hidden from logging functions
          - **timeout:** Requests Timeout
          - **content_json:** Bool on whether the Content-Type header should be set to application/json
          - **raw_msgs:** True/False, if True, do not convert API sideband messages (warnings, errors) to text.
          - **retry:** DEPRECATED - please use `cloudgenix.API.modify_rest_retry` instead.
          - **max_retry:** DEPRECATED - please use `cloudgenix.API.modify_rest_retry` instead.
          - **retry_sleep:** DEPRECATED - please use `cloudgenix.API.modify_rest_retry` instead.

        **Returns:** Requests.Response object, extended with:

          - **cgx_status**: Bool, True if a successful CloudGenix response, False if error.
          - **cgx_content**: Content of the response, guaranteed to be in Dict format. Empty/invalid responses
            will be converted to a Dict response.
          - **cgx_errors**: Text error messages if any are present. None if none. List if raw_msgs is True.
          - **cgx_warnings**: Text warning messages if any are present. None if none. List if raw_msgs is True.
        """
        # pull retry related items from Constructor if not specified.
        if timeout is None:
            timeout = self.rest_call_timeout
        if retry is not None:
            # Someone using deprecated retry code. Notify.
            sys.stderr.write("WARNING: 'retry' option of rest_call() has been deprecated. "
                             "Please use 'API.modify_rest_retry()' instead.")
        if max_retry is not None:
            # Someone using deprecated retry code. Notify.
            sys.stderr.write("WARNING: 'max_retry' option of rest_call() has been deprecated. "
                             "Please use 'API.modify_rest_retry()' instead.")
        if retry_sleep is not None:
            # Someone using deprecated retry code. Notify.
            sys.stderr.write("WARNING: 'retry_sleep' option of rest_call() has been deprecated. "
                             "Please use 'API.modify_rest_retry()' instead.")

        # Get logging level, use this to bypass logging functions with possible large content if not set.
        logger_level = api_logger.getEffectiveLevel()

        # populate headers and cookies from session.
        if content_json and method.lower() not in ['get', 'delete']:
            headers = {
                'Content-Type': 'application/json'
            }
        else:
            headers = {}

        # add session headers
        headers.update(self._session.headers)
        cookie = self._session.cookies.get_dict()

        # make sure data is populated if present.
        if isinstance(data, (list, dict)):
            data = json.dumps(data)

        api_logger.debug('REST_CALL URL = %s', url)

        # make request
        try:
            if not sensitive:
                api_logger.debug('\n\tREQUEST: %s %s\n\tHEADERS: %s\n\tCOOKIES: %s\n\tDATA: %s\n',
                                 method.upper(), url, headers, cookie, data)

            # Actual request
            response = self._session.request(method, url, data=data, stream=True, timeout=timeout,
                                             headers=headers, allow_redirects=False)

            # Request complete - lets parse.
            # if it's a non-CGX-good response, return with cgx_status = False
            if response.status_code not in [requests.codes.ok,
                                            requests.codes.no_content,
                                            requests.codes.found,
                                            requests.codes.moved]:

                # Simple JSON debug
                if not sensitive:
                    try:
                        api_logger.debug('RESPONSE HEADERS: %s\n', json.dumps(
                            json.loads(text_type(response.headers)), indent=4))
                    except ValueError:
                        api_logger.debug('RESPONSE HEADERS: %s\n', text_type(response.headers))
                    try:
                        api_logger.debug('RESPONSE: %s\n', json.dumps(response.json(), indent=4))
                    except ValueError:
                        api_logger.debug('RESPONSE: %s\n', text_type(response.text))
                else:
                    api_logger.debug('RESPONSE NOT LOGGED (sensitive content)')

                api_logger.debug("Error, non-200 response received: %s", response.status_code)

                # CGX extend requests.Response for return
                response.cgx_status = False
                response.cgx_content = self._catch_nonjson_streamresponse(response.text)
                # CGX extend requests.Response for any errors/warnings.
                response.cgx_warnings = self.pull_content_warning(response, raw=raw_msgs)
                response.cgx_errors = self.pull_content_error(response, raw=raw_msgs)
                # We are in a failed request. If no error text in response, give the response code and detail.
                if response.cgx_errors is None:
                    response.cgx_errors = text_type("{0} ({1})".format(response.reason, response.status_code))
                return response

            else:

                # Simple JSON debug
                if not sensitive and (logger_level <= logging.DEBUG and logger_level != logging.NOTSET):
                    try:
                        api_logger.debug('RESPONSE HEADERS: %s\n', json.dumps(
                            json.loads(text_type(response.headers)), indent=4))
                        api_logger.debug('RESPONSE: %s\n', json.dumps(response.json(), indent=4))
                    except ValueError:
                        api_logger.debug('RESPONSE HEADERS: %s\n', text_type(response.headers))
                        api_logger.debug('RESPONSE: %s\n', text_type(response.text))
                elif sensitive:
                    api_logger.debug('RESPONSE NOT LOGGED (sensitive content)')

                # CGX extend requests.Response for return
                response.cgx_status = True
                response.cgx_content = self._catch_nonjson_streamresponse(response.text)
                # CGX extend requests.Response for any errors/warnings.
                response.cgx_warnings = self.pull_content_warning(response, raw=raw_msgs)
                response.cgx_errors = self.pull_content_error(response, raw=raw_msgs)
                return response

        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, urllib3.exceptions.MaxRetryError)\
                as e:

            api_logger.info("Error, %s.", text_type(e))

            # make a requests.Response object for return since we didn't get one.
            # Instantiate it, so the cgx_* attributes below land on this response and not on the shared class.
            response = requests.Response()

            # CGX extend requests.Response for return
            response.cgx_status = False
            response.cgx_content = {
                '_error': [
                    {
                        'message': 'REST Request Exception: {}'.format(e),
                        'data': {},
                    }
                ]
            }
            # CGX extend requests.Response for any errors/warnings.
            response.cgx_warnings = self.pull_content_warning(response, raw=raw_msgs)
            response.cgx_errors = self.pull_content_error(response, raw=raw_msgs)
            return response

    def websocket_call(self, url, *args, **kwargs):
        """
        Generic WebSocket worker function, automatically uses authentication from `cloudgenix.API()` session.

        **Parameters:**

          - **url:** URL for the WebSocket call
          - Any other `websockets.client.Connect` argument or keyword argument (see NOTE: below)

        **Returns:** `websockets.client.Connect` object.

        **NOTE:** Any `websockets.client.Connect` supported argument or keyword argument will be accepted, and will
        be passed to the underlying Connect() request. **`ssl` and `extra_headers` keyword arguments will override
        the SDK auto-generated cookies/headers and SSL contexts used for authentication.** For more info on available
        options, see <https://websockets.readthedocs.io/en/stable/api.html#websockets.client.connect>
        """
        headers = {}

        # add session headers
        headers.update(self._websocket_headers)
        # Get cookies from requests.
        cookies = self._session.cookies.get_dict()

        # create cookie header from the cookies in Requests
        headers["Cookie"] = "; ".join(["{0}={1}".format(key, value) for key, value in cookies.items()])

        # check for host header
        host_header = headers.get("Host")
        force_host_arg = None
        if host_header is not None:
            # Ok, we got a host header. Unlike Requests, websockets wants the "Host Header" in the URI. You pass
            # a separate IP to connect to as 'host' argument to `websockets.client.Connect`.
            # So, to keep our "requests"-like behavior, we need to replace the value in the URI with the host
            # header, and remove the host header - then send the original value to `Connect` as host kwarg.

            # split controller at "//" (https://controller.host.com) to get host.
            controller_host = self.controller.split("//")[1]
            force_host_arg = controller_host
            # we don't support user/password in URI, so replacing first instance 'should' be OK (famous last words)
            new_url = url.replace(force_host_arg, host_header, 1)
            api_logger.debug("Host replacement. Original URL: {0}, New URL: {1}".format(url, new_url))
            url = new_url
            # delete the host header
            del headers["Host"]

        # convert the headers dictionary to a list of tuples.
        header_tuple_list = [(key, value) for key, value in headers.items()]

        # Create argument tuple
        ws_args = (url,) + args

        # Create keyword args
        ws_kwargs = {
            "ssl": self._ca_ssl_context,
            "extra_headers": header_tuple_list
        }

        # check for force host arg
        if force_host_arg is not None:
            api_logger.debug("Forcing connection to host {0}".format(force_host_arg))
            ws_kwargs["host"] = force_host_arg

        # Override automatic with any manually passed kwargs
        ws_kwargs.update(kwargs)

        return websockets.connect(*ws_args, **ws_kwargs)

    def parse_auth_token(self, auth_token):
        """
        Break auth_token up into its constituent values.

        **Parameters:**

          - **auth_token:** Auth_token string

        **Returns:** dict with Auth Token constituents
        """
        # remove the random security key value from the front of the auth_token
        auth_token_cleaned = auth_token.split('-', 1)[1]
        # URL Decode the Auth Token
        auth_token_decoded = self.url_decode(auth_token_cleaned)
        # Create a new dict to hold the response.
        auth_dict = {}

        # Parse the token
        for key_value in auth_token_decoded.split("&"):
            key_value_list = key_value.split("=")
            # check for valid token parts
            if len(key_value_list) == 2 and type(key_value_list[0]) in [text_type, binary_type]:
                auth_dict[key_value_list[0]] = key_value_list[1]

        # Return the dict of key/values in the token.
        return auth_dict

    def update_region_to_controller(self, region):
        """
        Update the controller string with dynamic region info.
        Controller string should end up as `<name[-env]>.<region>.cloudgenix.com`

        **Parameters:**

          - **region:** region string.

        **Returns:** No return value, mutates the controller in the class namespace
        """
        # default region position in a list
        region_position = 1

        # Check for a global "ignore region" flag
        if self.ignore_region:
            # bypass
            api_logger.debug("IGNORE_REGION set, not updating controller region.")
            return

        api_logger.debug("Updating Controller Region")
        api_logger.debug("CONTROLLER = %s", self.controller)
        api_logger.debug("CONTROLLER_ORIG = %s", self.controller_orig)
        api_logger.debug("CONTROLLER_REGION = %s", self.controller_region)

        # Check if this is an initial region use or an update region use
        if self.controller_orig:
            controller_base = self.controller_orig
        else:
            controller_base = self.controller
            self.controller_orig = self.controller

        # splice controller string
        controller_full_part_list = controller_base.split('.')

        for idx, part in enumerate(controller_full_part_list):
            # is the region already in the controller string?
            if region == part:
                # yes, controller already has appropriate region
                api_logger.debug("REGION %s ALREADY IN BASE CONTROLLER AT INDEX = %s", region, idx)
                # update region if it is not already set.
                if self.controller_region != region:
                    self.controller_region = region
                    api_logger.debug("UPDATED_CONTROLLER_REGION = %s", self.controller_region)
                # Update controller if not already matching
                if self.controller != controller_base:
                    self.controller = controller_base
                    api_logger.debug("UPDATED_CONTROLLER = %s", self.controller)
                return

        controller_part_count = len(controller_full_part_list)

        # handle short domain case
        if controller_part_count > 1:
            # insert region
            controller_full_part_list[region_position] = region
            self.controller = ".".join(controller_full_part_list)
        else:
            # short domain, just add region
            self.controller = ".".join(controller_full_part_list) + '.' + region

        # update SDK vars with region info
        self.controller_orig = controller_base
        self.controller_region = region

        api_logger.debug("UPDATED_CONTROLLER = %s", self.controller)
        api_logger.debug("UPDATED_CONTROLLER_ORIG = %s", self.controller_orig)
        api_logger.debug("UPDATED_CONTROLLER_REGION = %s", self.controller_region)
        return

    def parse_region(self, login_response):
        """
        Return region from a successful login response.

        **Parameters:**

          - **login_response:** requests.Response from a successful login.

        **Returns:** region name.
        """
        auth_token = login_response.cgx_content['x_auth_token']
        auth_token_dict = self.parse_auth_token(auth_token)
        auth_region = auth_token_dict.get('region')
        return auth_region

    def reparse_login_cookie_after_region_update(self, login_response):
        """
        Sometimes, login cookie gets sent with region info instead of api.cloudgenix.com. This function
        re-parses the original login request and applies cookies to the session if they now match the new region.

        **Parameters:**

          - **login_response:** requests.Response from a non-region login.

        **Returns:** updates API() object directly, no return.
        """
        login_url = login_response.request.url
        api_logger.debug("ORIGINAL REQUEST URL = %s", login_url)
        # replace old controller with new controller.
        login_url_new = login_url.replace(self.controller_orig, self.controller)
        api_logger.debug("UPDATED REQUEST URL = %s", login_url_new)
        # reset login url with new region
        login_response.request.url = login_url_new
        # prep cookie jar parsing
        req = requests.cookies.MockRequest(login_response.request)
        res = requests.cookies.MockResponse(login_response.raw._original_response.msg)
        # extract cookies to session cookie jar.
        self._session.cookies.extract_cookies(res, req)
        return

    @staticmethod
    def _catch_nonjson_streamresponse(rawresponse):
        """
        Validate a streamed response is JSON. Return a Python dictionary either way.

        **Parameters:**

          - **rawresponse:** Streamed Response from Requests.
**Returns:** Dictionary """ # attempt to load response for return. try: response = json.loads(rawresponse) except (ValueError, TypeError): if rawresponse: response = { '_error': [ { 'message': 'Response not in JSON format.', 'data': rawresponse, } ] } else: # in case of null response, return empty dict. response = {} return response @staticmethod def url_decode(url): """ URL Decode function using REGEX **Parameters:** - **url:** URLENCODED text string **Returns:** Non URLENCODED string """ return re.compile('%([0-9a-fA-F]{2})', re.M).sub(lambda m: chr(int(m.group(1), 16)), url) @staticmethod def throw_error(message, resp=None, cr=True, exception=CloudGenixAPIError): """ Non-recoverable error, write message to STDERR and raise exception **Parameters:** - **message:** Message text - **resp:** Optional - CloudGenix SDK Response object - **cr:** Optional - Use (or not) Carriage Returns. - **exception:** Optional - Custom Exception to throw, otherwise uses `CloudGenixAPIError` **Returns:** No Return, throws exception. """ output = "ERROR: " + str(message) if cr: output += "\n" sys.stderr.write(output) if resp is not None: output2 = str(jdout_detailed(resp)) if cr: output2 += "\n" sys.stderr.write(output2) raise exception(message) @staticmethod def throw_warning(message, resp=None, cr=True): """ Recoverable Warning. **Parameters:** - **message:** Message text - **resp:** Optional - CloudGenix SDK Response object - **cr:** Optional - Use (or not) Carriage Returns. **Returns:** No Return. """ output = "WARNING: " + str(message) if cr: output += "\n" sys.stderr.write(output) if resp is not None: output2 = str(jdout_detailed(resp)) if cr: output2 += "\n" sys.stderr.write(output2) return def extract_items(self, resp_object, error_label=None, pass_code_list=None, items_key='items'): """ Extract list of items from a CloudGenix API Response object. **Parameters:** - **resp_object:** CloudGenix Extended `requests.Response` object. 
- **error_label:** Optional - text to describe operation on error. - **pass_code_list:** Optional - list of HTTP response codes to silently pass with empty list response. - **items_key:** Optional - Text for items key to extract (default 'items') **Returns:** list of 'items' objects. """ if pass_code_list is None: pass_code_list = [404, 400] items = resp_object.cgx_content.get(items_key) if resp_object.cgx_status and items is not None: return items # handle 404 and other error codes for certain APIs where objects may not exist elif resp_object.status_code in pass_code_list: return [{}] else: if error_label is not None: self.throw_error("Unable to extract '{0}' from {1}.".format(items_key, error_label), resp_object) return [{}] else: self.throw_error("Unable to extract '{0}' from response.".format(items_key), resp_object) return [{}] def build_lookup_dict(self, list_content, key_val='name', value_val='id', force_nag=False, nag_cache=None): """ Build key/value lookup dictionary from a list of dictionaries with specified key/value entries. **Parameters:** - **list_content:** List of dicts to derive lookup structs from - **key_val:** Optional - Value to extract from entry to be key - **value_val:** Optional - Value to extract from entry to be value - **force_nag:** Optional - Bool, if True will nag even if key in `nag_cache` - **nag_cache:** Optional - List of keys that already exist in a lookup dict that should be duplicate checked. **Returns:** Lookup Dictionary """ if nag_cache and isinstance(nag_cache, list): already_nagged_dup_keys = nag_cache else: already_nagged_dup_keys = [] lookup_dict = {} blacklist_duplicate_keys = [] blacklist_duplicate_entries = [] for item in list_content: item_key = item.get(key_val) item_value = item.get(value_val) # print(item_key, item_value) if item_key and item_value is not None: # check if it's a duplicate key. if str(item_key) in lookup_dict: # First duplicate we've seen - save for warning. 
duplicate_value = lookup_dict.get(item_key) blacklist_duplicate_keys.append(item_key) blacklist_duplicate_entries.append({item_key: duplicate_value}) blacklist_duplicate_entries.append({item_key: item_value}) # remove from lookup dict to prevent accidental overlap usage del lookup_dict[str(item_key)] # check if it was a third+ duplicate key for a previous key elif item_key in blacklist_duplicate_keys: # save for warning. blacklist_duplicate_entries.append({item_key: item_value}) else: # no duplicates, append lookup_dict[str(item_key)] = item_value for duplicate_key in blacklist_duplicate_keys: matching_entries = [entry for entry in blacklist_duplicate_entries if duplicate_key in entry] # check if force_nag set and if not, has key already been notified to the end user. if force_nag or duplicate_key not in already_nagged_dup_keys: self.throw_warning( "Lookup value '{0}' was seen two or more times. To use, please remove duplicates in the controller," " or reference it explicitly by the actual value: ".format(duplicate_key), matching_entries) # we've now notified, add to notified list. already_nagged_dup_keys.append(duplicate_key) return lookup_dict @staticmethod def pull_content_error(resp_object, raw=False): """ Parse API response object, return error detail text for printing on error in response content. **Parameters:** - **resp_object:** CloudGenix Extended `requests.Response` object. - **raw:** Optional. If True, return list of dicts (raw error messages.) Default False. **Returns:** text_type error message, or list of dicts (if raw=True). None if no errors. """ api_logger.debug('pull_content_error function:') try: # attempt to grab the cgx_content. should always be a Dict if it exists. data = resp_object.cgx_content except (TypeError, ValueError, AttributeError): # cgx_content did not exist. check root object for dict as end-user may pass the content and not the # extended `requests.Response` object. 
data = resp_object if not isinstance(data, dict): # fast fail if data isn't correct format. api_logger.debug('PULL_ERROR: not able to find a valid dict object in resp_object: {0}'.format(resp_object)) return None parsed_messages = [] errors = [] if isinstance(data, dict): # got a parsed response. errors = data.get('_error') if isinstance(errors, list): # Some errors in the response if raw is True: # just return raw error list return errors else: for error in errors: code = error.get('code') message = error.get('message') if code and message: parsed_messages.append("{0} ({1})".format(message, code)) elif errors is None: # no errors, ensure list and empty. errors = [] else: # not a list.. put whatever it is into a list. errors = [errors] api_logger.debug("ERRORS: %s", errors) api_logger.debug("PARSED_ERRORS: %s", parsed_messages) # is parsed_messages empty and errors exist? dump errors as txt if not parsed_messages and len(errors) > 1: return text_type(errors) elif len(parsed_messages) == 1: return text_type(parsed_messages[0]) elif len(parsed_messages) > 1: # return comma separated string of errors return text_type("{0}, and {1}".format(", ".join(parsed_messages[:-1]), parsed_messages[-1])) else: # no errors return None @staticmethod def pull_content_warning(resp_object, raw=False): """ Parse API response object, return text for printing on warning in response. **Parameters:** - **resp_object:** CloudGenix Extended `requests.Response` object. - **raw:** Optional. If True, return list of dicts (raw warning messages.) Default False. **Returns:** text_type warning message, or list of dicts (if raw=True). None if no warnings. """ api_logger.debug('pull_content_warning function:') try: # attempt to grab the cgx_content. should always be a Dict if it exists. data = resp_object.cgx_content except (TypeError, ValueError, AttributeError): # cgx_content did not exist. 
check root object for dict as end-user may pass the content and not the # extended `requests.Response` object. data = resp_object if not isinstance(data, dict): # fast fail if data isn't correct format. api_logger.debug('PULL_WARNING: not able to find a valid dict object in resp_object: {0}' ''.format(resp_object)) return None parsed_messages = [] warnings = [] if isinstance(data, dict): # got a parsed response. warnings = data.get('_warning') if isinstance(warnings, list): # Some warnings in the response if raw is True: # just return raw warning list return warnings else: for warning in warnings: code = warning.get('code') message = warning.get('message') if code and message: parsed_messages.append("{0} ({1})".format(message, code)) elif warnings is None: # no warnings, ensure list and empty. warnings = [] else: # not a list.. put whatever it is into a list. warnings = [warnings] api_logger.debug("WARNINGS: %s", warnings) api_logger.debug("PARSED_WARNINGS: %s", parsed_messages) # is parsed_messages empty and warnings exist? dump warnings as txt if not parsed_messages and len(warnings) > 1: return text_type(warnings) elif len(parsed_messages) == 1: return text_type(parsed_messages[0]) elif len(parsed_messages) > 1: # return comma separated string of warnings return text_type("{0}, and {1}".format(", ".join(parsed_messages[:-1]), parsed_messages[-1])) else: # no warnings return None
Ancestors (in MRO)
- API
- builtins.object
Class variables
var address_string
String representing address, optional - should be pulled from get.profile() data.
var ca_verify_filename
DEPRECATED: Filename to use for CA verification. Moved to modern Python `ssl`, updated to pass a filename only in the event of a local file.
var cache
API response cache (Future)
var client_id
If ESP/MSP, is client currently logged in
var controller
Current active controller URL
var controller_orig
Original Controller URL as entered - before Region re-parse
var controller_region
Controller Region, if present.
var email
Email (username) for session
var ignore_region
Ignore regions returned by controller, and use explicit controller only.
var is_esp
Is the current tenant an ESP/MSP?
var operator_id
Operator ID of current session.
var rest_call_timeout
Maximum time to wait for any data from REST server.
var roles
Roles list
var tenant_id
Numeric ID of tenant (account) - should be set after initial login from `cloudgenix.get_api.Get.profile` data
var tenant_name
Name of tenant (account), should be set after initial login from `cloudgenix.get_api.Get.profile` data
var token_session
Is this login from a static AUTH_TOKEN (True), or a standard login (False)
var update_check
Notify users of available update to SDK
var update_info_url
Update Info URL for use once Constructor Created.
var verify
Verify SSL certificate.
var version
Version string for use once Constructor created.
Static methods
def __init__(
self, controller='https://api.elcapitan.cloudgenix.com', ssl_verify=True, update_check=True)
Create the API constructor object
- controller: Initial Controller URL String
- ssl_verify: Should SSL be verified for this system. Can be `file` or BOOL. See `ssl_verify` for more details.
- update_check: Bool to Enable/Disable SDK update check and new release notifications.
def __init__(self, controller=controller, ssl_verify=verify, update_check=True): """ Create the API constructor object - **controller:** Initial Controller URL String - **ssl_verify:** Should SSL be verified for this system. Can be `file` or BOOL. See `cloudgenix.API.ssl_verify` for more details. - **update_check:** Bool to Enable/Disable SDK update check and new release notifications. """ # set version and update url from outer scope. self.version = version """Version string for use once Constructor created.""" self.update_info_url = update_info_url """Update Info URL for use once Constructor Created.""" # try: if controller and isinstance(controller, (binary_type, text_type)): self.controller = controller.lower() self.controller_orig = controller.lower() # Create Requests Session. self._session = requests.Session() # set ssl context if isinstance(ssl_verify, (binary_type, text_type, bool)): self.ssl_verify(ssl_verify, update_adapter=False) # Set default REST retry parameters self.modify_rest_retry(update_adapter=False) # update the HTTP Adapter for requests. self.update_session_adapter() # handle update check if isinstance(update_check, bool): self.update_check = update_check if update_check: self.notify_for_new_version() # Identify SDK in the User-Agent. 
user_agent = self._session.headers.get('User-Agent') if user_agent: user_agent += ' (CGX SDK v{0})'.format(self.version) else: user_agent = 'python-requests/UNKNOWN (CGX SDK v{0})'.format(self.version) # Update Headers self._session.headers.update({ 'Accept': 'application/json', 'User-Agent': text_type(user_agent) }) # except Exception as e: # raise ValueError("Unable to create Requests session object: {0}.".format(e)) api_logger.debug("DEBUG: URL: %s, SSL Verify: %s, Session: %s", self.controller, self.verify, self._session) # Update Headers for WebSocket requests websocketlib_name = websockets.__name__ websocketlib_version = websockets.version.version if not websocketlib_name: websocketlib_name = 'websockets' if not websocketlib_version: websocketlib_version = 'UNKNOWN' ws_user_agent = 'python-{0}/{1} (CGX SDK v{2})'.format(websocketlib_name, websocketlib_version, self.version) self._websocket_headers = { 'Accept': 'application/json', 'User-Agent': text_type(ws_user_agent) } # Bind API method classes to this object subclasses = self._subclass_container() self.get = subclasses["get"]() """API object link to `cloudgenix.get_api.Get`""" self.post = subclasses["post"]() """API object link to `cloudgenix.post_api.Post`""" self.put = subclasses["put"]() """API object link to `cloudgenix.put_api.Put`""" self.patch = subclasses["patch"]() """API object link to `cloudgenix.patch_api.Patch`""" self.delete = subclasses["delete"]() """API object link to `cloudgenix.delete_api.Delete`""" self.interactive = subclasses["interactive"]() """API object link to `cloudgenix.interactive.Interactive`""" self.ws = subclasses["ws"]() """API object link to `cloudgenix.ws.WebSockets`""" return
def add_headers(
self, headers)
Permanently add/overwrite headers to session.
Parameters:
- headers: dict with header/value
Returns: Mutates `requests.Session()` object, no return.
def add_headers(self, headers):
    """
    Permanently add/overwrite headers to session.

    **Parameters:**

      - **headers:** dict with header/value

    **Returns:** Mutates `requests.Session()` object, no return.
    """
    self._session.headers.update(headers)
    return
def build_lookup_dict(
self, list_content, key_val='name', value_val='id', force_nag=False, nag_cache=None)
Build key/value lookup dictionary from a list of dictionaries with specified key/value entries.
Parameters:
- list_content: List of dicts to derive lookup structs from
- key_val: Optional - Value to extract from entry to be key
- value_val: Optional - Value to extract from entry to be value
- force_nag: Optional - Bool, if True will nag even if key in `nag_cache`
- nag_cache: Optional - List of keys that already exist in a lookup dict that should be duplicate checked.
Returns: Lookup Dictionary
def build_lookup_dict(self, list_content, key_val='name', value_val='id', force_nag=False, nag_cache=None): """ Build key/value lookup dictionary from a list of dictionaries with specified key/value entries. **Parameters:** - **list_content:** List of dicts to derive lookup structs from - **key_val:** Optional - Value to extract from entry to be key - **value_val:** Optional - Value to extract from entry to be value - **force_nag:** Optional - Bool, if True will nag even if key in `nag_cache` - **nag_cache:** Optional - List of keys that already exist in a lookup dict that should be duplicate checked. **Returns:** Lookup Dictionary """ if nag_cache and isinstance(nag_cache, list): already_nagged_dup_keys = nag_cache else: already_nagged_dup_keys = [] lookup_dict = {} blacklist_duplicate_keys = [] blacklist_duplicate_entries = [] for item in list_content: item_key = item.get(key_val) item_value = item.get(value_val) # print(item_key, item_value) if item_key and item_value is not None: # check if it's a duplicate key. if str(item_key) in lookup_dict: # First duplicate we've seen - save for warning. duplicate_value = lookup_dict.get(item_key) blacklist_duplicate_keys.append(item_key) blacklist_duplicate_entries.append({item_key: duplicate_value}) blacklist_duplicate_entries.append({item_key: item_value}) # remove from lookup dict to prevent accidental overlap usage del lookup_dict[str(item_key)] # check if it was a third+ duplicate key for a previous key elif item_key in blacklist_duplicate_keys: # save for warning. blacklist_duplicate_entries.append({item_key: item_value}) else: # no duplicates, append lookup_dict[str(item_key)] = item_value for duplicate_key in blacklist_duplicate_keys: matching_entries = [entry for entry in blacklist_duplicate_entries if duplicate_key in entry] # check if force_nag set and if not, has key already been notified to the end user. 
if force_nag or duplicate_key not in already_nagged_dup_keys: self.throw_warning( "Lookup value '{0}' was seen two or more times. To use, please remove duplicates in the controller," " or reference it explicitly by the actual value: ".format(duplicate_key), matching_entries) # we've now notified, add to notified list. already_nagged_dup_keys.append(duplicate_key) return lookup_dict
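The duplicate handling above can be sketched without the SDK. This is a minimal standalone approximation, not the SDK method itself: `build_lookup` and the sample site records are hypothetical names used only for illustration, and the warning output and `nag_cache` handling are left out.

```python
def build_lookup(list_content, key_val="name", value_val="id"):
    """Map key -> value, dropping every key that appears more than once."""
    lookup = {}
    duplicate_keys = []
    for item in list_content:
        key = item.get(key_val)
        value = item.get(value_val)
        if not key or value is None:
            # skip entries that lack a usable key or value
            continue
        if str(key) in lookup:
            # first duplicate: remember the key and remove the ambiguous entry
            duplicate_keys.append(key)
            del lookup[str(key)]
        elif key in duplicate_keys:
            # third or later occurrence: the key stays excluded
            continue
        else:
            lookup[str(key)] = value
    return lookup, duplicate_keys


# Hypothetical sample data: two sites share the name "Branch-1".
sites = [
    {"name": "Branch-1", "id": "101"},
    {"name": "Branch-2", "id": "102"},
    {"name": "Branch-1", "id": "103"},
]
lookup, duplicates = build_lookup(sites)
print(lookup)      # {'Branch-2': '102'}
print(duplicates)  # ['Branch-1']
```

The point to notice is that a duplicated name is removed from the lookup entirely rather than resolved to its first or last value, so callers have to reference such objects by ID.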
def expose_session(
self)
Call to expose the Requests Session object
Returns: `requests.Session` object
def expose_session(self):
    """
    Call to expose the Requests Session object

    **Returns:** `requests.Session` object
    """
    return self._session
def extract_items(
self, resp_object, error_label=None, pass_code_list=None, items_key='items')
Extract list of items from a CloudGenix API Response object.
Parameters:
- resp_object: CloudGenix Extended `requests.Response` object.
- error_label: Optional - text to describe operation on error.
- pass_code_list: Optional - list of HTTP response codes to pass silently, returning `[{}]` (a list holding one empty dict). Defaults to `[404, 400]`.
- items_key: Optional - Text for items key to extract (default 'items')
Returns: list of 'items' objects.
def extract_items(self, resp_object, error_label=None, pass_code_list=None, items_key='items'):
    """
    Extract list of items from a CloudGenix API Response object.

    **Parameters:**

      - **resp_object:** CloudGenix Extended `requests.Response` object.
      - **error_label:** Optional - text to describe operation on error.
      - **pass_code_list:** Optional - list of HTTP response codes to silently pass with empty list response.
      - **items_key:** Optional - Text for items key to extract (default 'items')

    **Returns:** list of 'items' objects.
    """
    if pass_code_list is None:
        pass_code_list = [404, 400]

    items = resp_object.cgx_content.get(items_key)

    if resp_object.cgx_status and items is not None:
        return items

    # handle 404 and other error codes for certain APIs where objects may not exist
    elif resp_object.status_code in pass_code_list:
        return [{}]

    else:
        if error_label is not None:
            self.throw_error("Unable to extract '{0}' from {1}.".format(items_key, error_label), resp_object)
            return [{}]
        else:
            self.throw_error("Unable to extract '{0}' from response.".format(items_key), resp_object)
            return [{}]
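The three outcomes of the listing above (items returned, pass code swallowed, error raised) can be exercised with stand-in response objects. This is a sketch under stated assumptions: `extract_items_sketch` is a hypothetical standalone copy of the branching logic, `SimpleNamespace` stands in for the extended `requests.Response`, and `RuntimeError` stands in for the exception raised by `throw_error`.

```python
from types import SimpleNamespace


def extract_items_sketch(resp, pass_code_list=None, items_key="items"):
    """Standalone approximation of the branching in extract_items."""
    if pass_code_list is None:
        pass_code_list = [404, 400]
    items = resp.cgx_content.get(items_key)
    if resp.cgx_status and items is not None:
        return items
    if resp.status_code in pass_code_list:
        # silently passed: a list holding one empty dict, not an empty list
        return [{}]
    # stand-in for throw_error(), which raises in the SDK
    raise RuntimeError("Unable to extract '{0}' from response.".format(items_key))


# Hypothetical responses: one success, one 404, one server error.
ok = SimpleNamespace(cgx_status=True, status_code=200, cgx_content={"items": [{"id": "1"}]})
missing = SimpleNamespace(cgx_status=False, status_code=404, cgx_content={})
failed = SimpleNamespace(cgx_status=False, status_code=500, cgx_content={})

print(extract_items_sketch(ok))       # [{'id': '1'}]
print(extract_items_sketch(missing))  # [{}]
try:
    extract_items_sketch(failed)
except RuntimeError as exc:
    print(exc)
```

Because the pass-code result is `[{}]`, it is truthy and has length 1. A caller that wants to detect "nothing found" would need to check for an empty dict in the list rather than testing the list itself.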
def modify_rest_retry(
self, total=8, connect=None, read=None, redirect=None, status=None, other=0, allowed_methods=frozenset({'HEAD', 'PUT', 'GET', 'DELETE', 'OPTIONS', 'TRACE'}), status_forcelist=None, backoff_factor=0.705883, raise_on_redirect=True, raise_on_status=True, respect_retry_after_header=True, update_adapter=True)
Modify retry parameters for the SDK's REST call object.
Parameters are taken from, and passed directly to, `urllib3.util.retry.Retry`, and are applied to the underlying `requests.Session` object.
Default retry with total=8 and backoff_factor=0.705883:
- Try 1, 0 delay (0 total seconds)
- Try 2, 0 delay (0 total seconds)
- Try 3, 0.705883 delay (0.705883 total seconds)
- Try 4, 1.411766 delay (2.117649 total seconds)
- Try 5, 2.823532 delay (4.941181 total seconds)
- Try 6, 5.647064 delay (10.588245 total seconds)
- Try 7, 11.294128 delay (21.882373 total seconds)
- Try 8, 22.588256 delay (44.470629 total seconds)
- Try 9, 45.176512 delay (89.647141 total seconds)
- Try 10, 90.353024 delay (180.000165 total seconds)
Parameters:
- total: int, Total number of retries to allow. Takes precedence over other counts.
- connect: int, How many connection-related errors to retry on.
- read: int, How many times to retry on read errors.
- redirect: int, How many redirects to perform. Limit this to avoid infinite redirect loops.
- status: int, How many times to retry on bad status codes.
- other: int, How many times to retry on other errors. Set to 0 by default for things like SSL errors.
- allowed_methods: iterable, Set of uppercased HTTP method verbs that we should retry on.
- status_forcelist: iterable, A set of integer HTTP status codes that we should force a retry on.
- backoff_factor: float, A backoff factor to apply between attempts after the second try.
- raise_on_redirect: bool, True = raise a MaxRetryError, False = return latest 3xx response.
- raise_on_status: bool, Similar logic to `raise_on_redirect` but for status responses.
- respect_retry_after_header: bool, Whether to respect Retry-After header on status codes.
- adapter_url: string, URL match for these retry values (default `https://`). This name does not appear in the method signature listed here.
- update_adapter: bool, call update adapter function after running to auto update adapter.
Returns: No return, mutates the retry / session directly
def modify_rest_retry(self, total=8, connect=None, read=None, redirect=None, status=None, other=0, allowed_methods=urllib3.util.retry.Retry.DEFAULT_ALLOWED_METHODS, status_forcelist=None, backoff_factor=0.705883, raise_on_redirect=True, raise_on_status=True, respect_retry_after_header=True, update_adapter=True): """ Modify retry parameters for the SDKs rest call object. Parameters are directly from and passed directly to `urllib3.util.retry.Retry`, and get applied directly to the underlying `requests.Session` object. Default retry with total=8 and backoff_factor=0.705883: - Try 1, 0 delay (0 total seconds) - Try 2, 0 delay (0 total seconds) - Try 3, 0.705883 delay (0.705883 total seconds) - Try 4, 1.411766 delay (2.117649 total seconds) - Try 5, 2.823532 delay (4.941181 total seconds) - Try 6, 5.647064 delay (10.588245 total seconds) - Try 7, 11.294128 delay (21.882373 total seconds) - Try 8, 22.588256 delay (44.470629 total seconds) - Try 9, 45.176512 delay (89.647141 total seconds) - Try 10, 90.353024 delay (180.000165 total seconds) **Parameters:** - **total:** int, Total number of retries to allow. Takes precedence over other counts. - **connect:** int, How many connection-related errors to retry on. - **read:** int, How many times to retry on read errors. - **redirect:** int, How many redirects to perform. loops. - **status:** int, How many times to retry on bad status codes. - **other:** int, How many times to retry on other errors. Set to 0 by default for things like SSL errors. - **method_whitelist:** iterable, Set of uppercased HTTP method verbs that we should retry on. - **status_forcelist:** iterable, A set of integer HTTP status codes that we should force a retry on. - **backoff_factor:** float, A backoff factor to apply between attempts after the second try. - **raise_on_redirect:** bool, True = raise a MaxRetryError, False = return latest 3xx response. - **raise_on_status:** bool, Similar logic to ``raise_on_redirect`` but for status responses. 
- **respect_retry_after_header:** bool, Whether to respect Retry-After header on status codes. - **adapter_url:** string, URL match for these retry values (default `https://`) - **update_adapter:** bool, call update adapter function after running to auto update adapter. **Returns:** No return, mutates the retry / session directly """ # Cloudgenix responses with 502/504 are usually recoverable. Use them if no list specified. if status_forcelist is None: status_forcelist = (413, 429, 502, 503, 504) retry = urllib3.util.retry.Retry(total=total, connect=connect, read=read, redirect=redirect, status=status, other=other, allowed_methods=allowed_methods, status_forcelist=status_forcelist, backoff_factor=backoff_factor, raise_on_redirect=raise_on_redirect, raise_on_status=raise_on_status, respect_retry_after_header=respect_retry_after_header) # set updated retry object. self._rest_call_retry_object = retry # call update to mount/remount session adapter if update_adapter: self.update_session_adapter() return
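The delay table in the docstring follows a simple doubling rule, which can be reproduced with a few lines of arithmetic. This sketch only recomputes the documented numbers; `retry_schedule` is a hypothetical helper, and the actual sleep times depend on the installed urllib3 version and on any `Retry-After` headers.

```python
def retry_schedule(backoff_factor=0.705883, tries=10):
    """Return (try number, delay, cumulative delay) rows matching the table."""
    rows = []
    total = 0.0
    for attempt in range(1, tries + 1):
        # tries 1 and 2 have no delay; from try 3 the delay doubles each time
        delay = 0.0 if attempt <= 2 else backoff_factor * 2 ** (attempt - 3)
        total += delay
        rows.append((attempt, delay, total))
    return rows


schedule = retry_schedule()
for attempt, delay, total in schedule:
    print("Try {0}, {1:.6f} delay ({2:.6f} total seconds)".format(attempt, delay, total))
```

With the default factor the cumulative delay at try 10 comes to roughly 180 seconds, which suggests the unusual value 0.705883 was chosen to land on a three-minute ceiling.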
def notify_for_new_version(
self)
Check for a new version of the SDK on API constructor instantiation. If new version found, print Notification to STDERR.
On failure of this check, fail silently.
Returns: No item returned, directly prints notification to `sys.stderr`.
def notify_for_new_version(self): """ Check for a new version of the SDK on API constructor instantiation. If new version found, print Notification to STDERR. On failure of this check, fail silently. **Returns:** No item returned, directly prints notification to `sys.stderr`. """ # broad exception clause, if this fails for any reason just return. try: recommend_update = False update_check_resp = requests.get(self.update_info_url, timeout=3) web_version = update_check_resp.json()["info"]["version"] api_logger.debug("RETRIEVED_VERSION: %s", web_version) available_version = SDK_BUILD_REGEX.search(web_version).groupdict() current_version = SDK_BUILD_REGEX.search(self.version).groupdict() available_major = available_version.get('major') available_minor = available_version.get('minor') available_patch = available_version.get('patch') available_build = available_version.get('build') current_major = current_version.get('major') current_minor = current_version.get('minor') current_patch = current_version.get('patch') current_build = current_version.get('build') api_logger.debug("AVAILABLE_VERSION: %s", available_version) api_logger.debug("CURRENT_VERSION: %s", current_version) # check for major/minor version differences, do not alert for build differences. if available_major > current_major: recommend_update = True elif available_major >= current_major and available_minor > current_minor: recommend_update = True elif available_major >= current_major and available_minor >= current_minor and \ available_patch > current_patch: recommend_update = True api_logger.debug("NEED_UPDATE: %s", recommend_update) # notify. if recommend_update: sys.stderr.write("WARNING: CloudGenix Python SDK upgrade available. SDKs are often deprecated 6 " "months after release of a new version.\n" "\tLatest Version: {0}\n" "\tCurrent Version: {1}\n" "\tFor more info, see 'https://github.com/cloudgenix/sdk-python'. 
Additionally, this " "message can be suppressed by instantiating the API with API(update_check=False).\n\n" "".format(web_version, self.version)) return except Exception: # just return and continue. return
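The comparison in the listing works on the values returned by `groupdict()`, which are strings, so a check such as `"10" > "9"` evaluates lexically and is false. A variant that converts the components to integers first might look like the sketch below. `SDK_BUILD_REGEX` is not shown in this section, so `VERSION_REGEX` is a hypothetical stand-in pattern, and `release_tuple` and `update_recommended` are illustrative names rather than SDK functions.

```python
import re

# Hypothetical stand-in for SDK_BUILD_REGEX, which is not shown in this section.
VERSION_REGEX = re.compile(
    r"^v?(?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(?P<build>.*)$"
)


def release_tuple(version):
    """Return (major, minor, patch) as integers, ignoring the build suffix."""
    match = VERSION_REGEX.search(version)
    if match is None:
        raise ValueError("Unrecognized version string: {0}".format(version))
    return tuple(int(match.group(name)) for name in ("major", "minor", "patch"))


def update_recommended(current, available):
    """True when the available release is newer; build differences are ignored."""
    return release_tuple(available) > release_tuple(current)


print(update_recommended("6.4.1b1", "6.4.2b1"))  # True
print(update_recommended("6.4.1b1", "6.4.1b2"))  # False
print(update_recommended("6.9.0", "6.10.0"))     # True
```

Tuple comparison covers the major, minor and patch cascade in one expression, and, like the original, it does not alert on build-only differences.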
def parse_auth_token(
self, auth_token)
Break auth_token up into its constituent values.
Parameters:
- auth_token: Auth_token string
Returns: dict with Auth Token constituents
def parse_auth_token(self, auth_token):
    """
    Break auth_token up into its constituent values.

    **Parameters:**

      - **auth_token:** Auth_token string

    **Returns:** dict with Auth Token constituents
    """
    # remove the random security key value from the front of the auth_token
    auth_token_cleaned = auth_token.split('-', 1)[1]
    # URL Decode the Auth Token
    auth_token_decoded = self.url_decode(auth_token_cleaned)
    # Create a new dict to hold the response.
    auth_dict = {}

    # Parse the token
    for key_value in auth_token_decoded.split("&"):
        key_value_list = key_value.split("=")
        # check for valid token parts
        if len(key_value_list) == 2 and type(key_value_list[0]) in [text_type, binary_type]:
            auth_dict[key_value_list[0]] = key_value_list[1]

    # Return the dict of key/values in the token.
    return auth_dict
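The token handling above (strip the prefix, URL-decode, split into key/value pairs) can be tried in isolation. This is a minimal sketch assuming a made-up token: `sample_token` and the field names `t_id` and `o_id` are hypothetical, while `region` is the key that `parse_region` reads. The `url_decode` helper mirrors the regex approach of the SDK's static method.

```python
import re


def url_decode(url):
    """Replace each %XX escape with the character it encodes."""
    return re.sub(r"%([0-9a-fA-F]{2})", lambda m: chr(int(m.group(1), 16)), url)


def parse_auth_token_sketch(auth_token):
    """Standalone approximation of parse_auth_token."""
    # drop everything up to and including the first '-'
    cleaned = auth_token.split("-", 1)[1]
    decoded = url_decode(cleaned)
    auth_dict = {}
    for pair in decoded.split("&"):
        parts = pair.split("=")
        # keep only well-formed key=value pairs
        if len(parts) == 2:
            auth_dict[parts[0]] = parts[1]
    return auth_dict


# Hypothetical token: "<random key>-<URL-encoded key=value pairs>".
sample_token = "abc123-t_id%3D42%26region%3Dde%26o_id%3D7"
parsed = parse_auth_token_sketch(sample_token)
print(parsed)                # {'t_id': '42', 'region': 'de', 'o_id': '7'}
print(parsed.get("region"))  # de
```

A token without a `-` would raise `IndexError` at the split step, both here and in the listing above, so callers may want to validate the token before parsing it.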
def parse_region(
self, login_response)
Return region from a successful login response.
Parameters:
- login_response: requests.Response from a successful login.
Returns: region name.
def parse_region(self, login_response):
    """
    Return region from a successful login response.

    **Parameters:**

      - **login_response:** requests.Response from a successful login.

    **Returns:** region name.
    """
    auth_token = login_response.cgx_content['x_auth_token']
    auth_token_dict = self.parse_auth_token(auth_token)
    auth_region = auth_token_dict.get('region')
    return auth_region
def pull_content_error(
resp_object, raw=False)
Parse API response object, return error detail text for printing on error in response content.
Parameters:
- resp_object: CloudGenix Extended `requests.Response` object.
- raw: Optional. If True, return list of dicts (raw error messages.) Default False.
Returns: text_type error message, or list of dicts (if raw=True). None if no errors.
@staticmethod def pull_content_error(resp_object, raw=False): """ Parse API response object, return error detail text for printing on error in response content. **Parameters:** - **resp_object:** CloudGenix Extended `requests.Response` object. - **raw:** Optional. If True, return list of dicts (raw error messages.) Default False. **Returns:** text_type error message, or list of dicts (if raw=True). None if no errors. """ api_logger.debug('pull_content_error function:') try: # attempt to grab the cgx_content. should always be a Dict if it exists. data = resp_object.cgx_content except (TypeError, ValueError, AttributeError): # cgx_content did not exist. check root object for dict as end-user may pass the content and not the # extended `requests.Response` object. data = resp_object if not isinstance(data, dict): # fast fail if data isn't correct format. api_logger.debug('PULL_ERROR: not able to find a valid dict object in resp_object: {0}'.format(resp_object)) return None parsed_messages = [] errors = [] if isinstance(data, dict): # got a parsed response. errors = data.get('_error') if isinstance(errors, list): # Some errors in the response if raw is True: # just return raw error list return errors else: for error in errors: code = error.get('code') message = error.get('message') if code and message: parsed_messages.append("{0} ({1})".format(message, code)) elif errors is None: # no errors, ensure list and empty. errors = [] else: # not a list.. put whatever it is into a list. errors = [errors] api_logger.debug("ERRORS: %s", errors) api_logger.debug("PARSED_ERRORS: %s", parsed_messages) # is parsed_messages empty and errors exist? 
dump errors as txt if not parsed_messages and len(errors) > 1: return text_type(errors) elif len(parsed_messages) == 1: return text_type(parsed_messages[0]) elif len(parsed_messages) > 1: # return comma separated string of errors return text_type("{0}, and {1}".format(", ".join(parsed_messages[:-1]), parsed_messages[-1])) else: # no errors return None
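The message formatting in the listing can be illustrated on a plain dict. This is a simplified sketch: `summarize_errors` is a hypothetical helper that covers only the list-of-errors path (no `raw` mode, no non-list fallback), and the error codes and messages are invented sample data.

```python
def summarize_errors(content):
    """Join 'message (code)' entries the way pull_content_error formats them."""
    errors = content.get("_error")
    if not isinstance(errors, list):
        return None
    messages = []
    for error in errors:
        code = error.get("code")
        message = error.get("message")
        # only entries with both a code and a message are formatted
        if code and message:
            messages.append("{0} ({1})".format(message, code))
    if len(messages) == 1:
        return messages[0]
    if len(messages) > 1:
        return "{0}, and {1}".format(", ".join(messages[:-1]), messages[-1])
    return None


# Hypothetical response content with three errors.
content = {
    "_error": [
        {"code": "EXAMPLE_001", "message": "First problem"},
        {"code": "EXAMPLE_002", "message": "Second problem"},
        {"code": "EXAMPLE_003", "message": "Third problem"},
    ]
}
summary = summarize_errors(content)
print(summary)
# First problem (EXAMPLE_001), Second problem (EXAMPLE_002), and Third problem (EXAMPLE_003)
```

Since the SDK method accepts either the extended response or its content dict, a dict like the one above could in principle be passed to `pull_content_error` directly.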
def pull_content_warning(
resp_object, raw=False)
Parse API response object, return text for printing on warning in response.
Parameters:
- resp_object: CloudGenix Extended `requests.Response` object.
- raw: Optional. If True, return list of dicts (raw warning messages.) Default False.
Returns: text_type warning message, or list of dicts (if raw=True). None if no warnings.
@staticmethod def pull_content_warning(resp_object, raw=False): """ Parse API response object, return text for printing on warning in response. **Parameters:** - **resp_object:** CloudGenix Extended `requests.Response` object. - **raw:** Optional. If True, return list of dicts (raw warning messages.) Default False. **Returns:** text_type warning message, or list of dicts (if raw=True). None if no warnings. """ api_logger.debug('pull_content_warning function:') try: # attempt to grab the cgx_content. should always be a Dict if it exists. data = resp_object.cgx_content except (TypeError, ValueError, AttributeError): # cgx_content did not exist. check root object for dict as end-user may pass the content and not the # extended `requests.Response` object. data = resp_object if not isinstance(data, dict): # fast fail if data isn't correct format. api_logger.debug('PULL_WARNING: not able to find a valid dict object in resp_object: {0}' ''.format(resp_object)) return None parsed_messages = [] warnings = [] if isinstance(data, dict): # got a parsed response. warnings = data.get('_warning') if isinstance(warnings, list): # Some warnings in the response if raw is True: # just return raw warning list return warnings else: for warning in warnings: code = warning.get('code') message = warning.get('message') if code and message: parsed_messages.append("{0} ({1})".format(message, code)) elif warnings is None: # no warnings, ensure list and empty. warnings = [] else: # not a list.. put whatever it is into a list. warnings = [warnings] api_logger.debug("WARNINGS: %s", warnings) api_logger.debug("PARSED_WARNINGS: %s", parsed_messages) # is parsed_messages empty and warnings exist? 
dump warnings as txt if not parsed_messages and len(warnings) > 1: return text_type(warnings) elif len(parsed_messages) == 1: return text_type(parsed_messages[0]) elif len(parsed_messages) > 1: # return comma separated string of warnings return text_type("{0}, and {1}".format(", ".join(parsed_messages[:-1]), parsed_messages[-1])) else: # no warnings return None
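The message-joining behaviour described above can be illustrated with a standalone sketch. It does not import the SDK: `format_warnings` is a hypothetical helper that mirrors only the "message (code)" formatting and the ", and" join, and it deliberately omits the fallback that stringifies unparsed warnings. The sample codes and messages are made up.

```python
def format_warnings(content):
    """Return a printable warning string from a parsed response dict, or None."""
    warnings = content.get("_warning") if isinstance(content, dict) else None
    if not isinstance(warnings, list):
        # Simplification: the SDK also wraps non-list values; this sketch ignores them.
        return None
    parsed = [
        "{0} ({1})".format(w["message"], w["code"])
        for w in warnings
        if isinstance(w, dict) and w.get("code") and w.get("message")
    ]
    if not parsed:
        return None
    if len(parsed) == 1:
        return parsed[0]
    # Several warnings: comma separated, with ", and" before the last one.
    return "{0}, and {1}".format(", ".join(parsed[:-1]), parsed[-1])


# Hypothetical response content with two warnings.
sample = {"_warning": [{"code": "W1", "message": "first"},
                       {"code": "W2", "message": "second"}]}
print(format_warnings(sample))
```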
def remove_header(
self, header)
Permanently remove a single header from session
Parameters:
- header: str of single header to remove
Returns: Mutates requests.Session() object, no return.
def remove_header(self, header): """ Permanently remove a single header from session **Parameters:** - **header:** str of single header to remove **Returns:** Mutates `requests.Session()` object, no return. """ # check for header first. Return silently if it does not exist. if self._session.headers.get(header) is not None: del self._session.headers[header] return
def reparse_login_cookie_after_region_update(
self, login_response)
Sometimes, login cookie gets sent with region info instead of api.cloudgenix.com. This function re-parses the original login request and applies cookies to the session if they now match the new region.
Parameters:
- login_response: requests.Response from a non-region login.
Returns: updates API() object directly, no return.
def rest_call(
self, url, method, data=None, sensitive=False, timeout=None, content_json=True, raw_msgs=False, retry=None, max_retry=None, retry_sleep=None)
Generic REST call worker function
Parameters:
- url: URL for the REST call
- method: METHOD for the REST call
- data: Optional DATA for the call (for POST/PUT/etc.)
- sensitive: Flag if content request/response should be hidden from logging functions
- timeout: Requests Timeout
- content_json: Bool on whether the Content-Type header should be set to application/json
- raw_msgs: True/False, if True, do not convert API sideband messages (warnings, errors) to text.
- retry: DEPRECATED - please use modify_rest_retry instead.
- max_retry: DEPRECATED - please use modify_rest_retry instead.
- retry_sleep: DEPRECATED - please use modify_rest_retry instead.
Returns: Requests.Response object, extended with:
- cgx_status: Bool, True if a successful CloudGenix response, False if error.
- cgx_content: Content of the response, guaranteed to be in Dict format. Empty/invalid responses will be converted to a Dict response.
- cgx_errors: Text error messages if any are present. None if none. List if raw_msgs is True.
- cgx_warnings: Text warning messages if any are present. None if none. List if raw_msgs is True.
def rest_call(self, url, method, data=None, sensitive=False, timeout=None, content_json=True, raw_msgs=False, retry=None, max_retry=None, retry_sleep=None): """ Generic REST call worker function **Parameters:** - **url:** URL for the REST call - **method:** METHOD for the REST call - **data:** Optional DATA for the call (for POST/PUT/etc.) - **sensitive:** Flag if content request/response should be hidden from logging functions - **timeout:** Requests Timeout - **content_json:** Bool on whether the Content-Type header should be set to application/json - **raw_msgs:** True/False, if True, do not convert API sideband messages (warnings, errors) to text. - **retry:** DEPRECATED - please use `cloudgenix.API.modify_rest_retry` instead. - **max_retry:** DEPRECATED - please use `cloudgenix.API.modify_rest_retry` instead. - **retry_sleep:** DEPRECATED - please use `cloudgenix.API.modify_rest_retry` instead. **Returns:** Requests.Response object, extended with: - **cgx_status**: Bool, True if a successful CloudGenix response, False if error. - **cgx_content**: Content of the response, guaranteed to be in Dict format. Empty/invalid responses will be converted to a Dict response. - **cgx_errors**: Text error messages if any are present. None if none. List if raw_msgs is True. - **cgx_warnings**: Text warning messages if any are present. None if none. List if raw_msgs is True. """ # pull retry related items from Constructor if not specified. if timeout is None: timeout = self.rest_call_timeout if retry is not None: # Someone using deprecated retry code. Notify. sys.stderr.write("WARNING: 'retry' option of rest_call() has been deprecated. " "Please use 'API.modify_rest_retry()' instead.") if max_retry is not None: # Someone using deprecated retry code. Notify. sys.stderr.write("WARNING: 'max_retry' option of rest_call() has been deprecated. " "Please use 'API.modify_rest_retry()' instead.") if retry_sleep is not None: # Someone using deprecated retry code. Notify. 
sys.stderr.write("WARNING: 'max_retry' option of rest_call() has been deprecated. " "Please use 'API.modify_rest_retry()' instead.") # Get logging level, use this to bypass logging functions with possible large content if not set. logger_level = api_logger.getEffectiveLevel() # populate headers and cookies from session. if content_json and method.lower() not in ['get', 'delete']: headers = { 'Content-Type': 'application/json' } else: headers = {} # add session headers headers.update(self._session.headers) cookie = self._session.cookies.get_dict() # make sure data is populated if present. if isinstance(data, (list, dict)): data = json.dumps(data) api_logger.debug('REST_CALL URL = %s', url) # make request try: if not sensitive: api_logger.debug('\n\tREQUEST: %s %s\n\tHEADERS: %s\n\tCOOKIES: %s\n\tDATA: %s\n', method.upper(), url, headers, cookie, data) # Actual request response = self._session.request(method, url, data=data, stream=True, timeout=timeout, headers=headers, allow_redirects=False) # Request complete - lets parse. # if it's a non-CGX-good response, return with cgx_status = False if response.status_code not in [requests.codes.ok, requests.codes.no_content, requests.codes.found, requests.codes.moved]: # Simple JSON debug if not sensitive: try: api_logger.debug('RESPONSE HEADERS: %s\n', json.dumps( json.loads(text_type(response.headers)), indent=4)) except ValueError: api_logger.debug('RESPONSE HEADERS: %s\n', text_type(response.headers)) try: api_logger.debug('RESPONSE: %s\n', json.dumps(response.json(), indent=4)) except ValueError: api_logger.debug('RESPONSE: %s\n', text_type(response.text)) else: api_logger.debug('RESPONSE NOT LOGGED (sensitive content)') api_logger.debug("Error, non-200 response received: %s", response.status_code) # CGX extend requests.Response for return response.cgx_status = False response.cgx_content = self._catch_nonjson_streamresponse(response.text) # CGX extend requests.Response for any errors/warnings. 
response.cgx_warnings = self.pull_content_warning(response, raw=raw_msgs) response.cgx_errors = self.pull_content_error(response, raw=raw_msgs) # We are in a failed request. If no error text in response, give the response code and detail. if response.cgx_errors is None: response.cgx_errors = text_type("{0} ({1})".format(response.reason, response.status_code)) return response else: # Simple JSON debug if not sensitive and (logger_level <= logging.DEBUG and logger_level != logging.NOTSET): try: api_logger.debug('RESPONSE HEADERS: %s\n', json.dumps( json.loads(text_type(response.headers)), indent=4)) api_logger.debug('RESPONSE: %s\n', json.dumps(response.json(), indent=4)) except ValueError: api_logger.debug('RESPONSE HEADERS: %s\n', text_type(response.headers)) api_logger.debug('RESPONSE: %s\n', text_type(response.text)) elif sensitive: api_logger.debug('RESPONSE NOT LOGGED (sensitive content)') # CGX extend requests.Response for return response.cgx_status = True response.cgx_content = self._catch_nonjson_streamresponse(response.text) # CGX extend requests.Response for any errors/warnings. response.cgx_warnings = self.pull_content_warning(response, raw=raw_msgs) response.cgx_errors = self.pull_content_error(response, raw=raw_msgs) return response except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, urllib3.exceptions.MaxRetryError)\ as e: api_logger.info("Error, %s.", text_type(e)) # make a requests.Response object for return since we didn't get one. response = requests.Response # CGX extend requests.Response for return response.cgx_status = False response.cgx_content = { '_error': [ { 'message': 'REST Request Exception: {}'.format(e), 'data': {}, } ] } # CGX extend requests.Response for any errors/warnings. response.cgx_warnings = self.pull_content_warning(response, raw=raw_msgs) response.cgx_errors = self.pull_content_error(response, raw=raw_msgs) return response
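Callers typically branch on the `cgx_status` attribute before touching `cgx_content`. The following is a minimal sketch of that pattern and makes no network call: the response objects are stand-ins built with `SimpleNamespace`, `items_or_raise` is a hypothetical helper, and the `items` key is an assumption about the payload shape rather than something this section documents.

```python
from types import SimpleNamespace


def items_or_raise(resp):
    """Return the 'items' list from an extended response, or raise on failure."""
    if not resp.cgx_status:
        # cgx_errors is text by default (a list only when raw_msgs=True).
        raise RuntimeError("API call failed: {0}".format(resp.cgx_errors))
    # cgx_content is documented as always being a dict, so .get() is safe.
    return resp.cgx_content.get("items", [])


# Stand-ins for what rest_call() returns; real objects are requests.Response.
ok = SimpleNamespace(cgx_status=True, cgx_content={"items": [{"name": "site-a"}]},
                     cgx_errors=None, cgx_warnings=None)
failed = SimpleNamespace(cgx_status=False, cgx_content={},
                         cgx_errors="Not Found (404)", cgx_warnings=None)

print(items_or_raise(ok))
try:
    items_or_raise(failed)
except RuntimeError as err:
    failure_text = str(err)
    print(failure_text)
```

The "Not Found (404)" text follows the reason-and-status fallback that the source above applies when a failed response carries no error body.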
def set_debug(
self, debuglevel, set_format=None, set_handler=None)
Change the debug level of the API
Parameters:
- debuglevel: int debug level. 1 logs at INFO, 2 and above log at DEBUG, any other value logs at WARNING.
- set_format: Optional. If set and text_type, use input for formatter. Otherwise, default formatter.
- set_handler: Optional. If set and logging.Handler type, use input for handler. Otherwise, default logging.StreamHandler()
Returns: No item returned.
def set_debug(self, debuglevel, set_format=None, set_handler=None): """ Change the debug level of the API **Parameters:** - **set_format:** Optional. If set and text_type, use input for formatter. Otherwise, default formatter. - **set_format:** Optional. If set and `logging.Handler` type, use input for handler. Otherwise, default `logging.StreamHandler()` **Returns:** No item returned. """ # set the logging formatter and stream handle if set_format is None: # default formatter api_formatter = logging.Formatter("%(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s") elif not isinstance(set_format, text_type): # not a valid format string. Set to default. api_formatter = logging.Formatter("%(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s") else: # valid logging string. api_formatter = logging.Formatter(set_format) # set the logging handler if supported handler is not passed. if set_handler is None: # Default handler api_handler = logging.StreamHandler() elif not isinstance(set_handler, (logging.FileHandler, logging.Handler, logging.NullHandler, logging.StreamHandler)): # not a valid handler. Set to default handler. api_handler = logging.StreamHandler() else: # passed valid handler api_handler = set_handler # set handler to use format. api_handler.setFormatter(api_formatter) # Get the loggers from other modules in prep for setting new handlers. urllib3_logger = logging.getLogger("requests.packages.urllib3") urllib3_retry_logger = logging.getLogger("urllib3.util.retry") cookie_logger = logging.getLogger("http.cookiejar") # remove existing handlers api_logger.handlers = [] urllib3_logger.handlers = [] urllib3_retry_logger.handlers = [] cookie_logger.handlers = [] ws_logger.handlers = [] # ok, lets set the new handlers. 
if isinstance(debuglevel, int): self._debuglevel = debuglevel if self._debuglevel == 1: api_logger.addHandler(api_handler) api_logger.setLevel(logging.INFO) ws_logger.addHandler(api_handler) ws_logger.setLevel(logging.INFO) elif self._debuglevel == 2: cookie_logger.addHandler(api_handler) cookie_logger.setLevel(logging.DEBUG) cookielib.debug = True api_logger.addHandler(api_handler) api_logger.setLevel(logging.DEBUG) ws_logger.addHandler(api_handler) ws_logger.setLevel(logging.DEBUG) elif self._debuglevel >= 3: cookie_logger.addHandler(api_handler) cookie_logger.setLevel(logging.DEBUG) cookielib.debug = True urllib3_logger.addHandler(api_handler) urllib3_logger.setLevel(logging.DEBUG) urllib3_retry_logger.addHandler(api_handler) urllib3_retry_logger.setLevel(logging.DEBUG) api_logger.addHandler(api_handler) api_logger.setLevel(logging.DEBUG) ws_logger.addHandler(api_handler) ws_logger.setLevel(logging.DEBUG) else: # set to warning cookie_logger.setLevel(logging.WARNING) cookielib.debug = False urllib3_logger.setLevel(logging.WARNING) urllib3_retry_logger.setLevel(logging.WARNING) api_logger.setLevel(logging.WARNING) ws_logger.setLevel(logging.WARNING) return
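The level selection in the source above reduces to a small mapping. The sketch below is a standalone illustration for integer inputs only; `api_log_level` is a hypothetical name, and it leaves out the handler wiring (level 2 also enables cookie logging, and level 3 and above also attach the urllib3 loggers).

```python
import logging


def api_log_level(debuglevel):
    """Map an integer debug level to the logging level used for the API logger."""
    if debuglevel == 1:
        return logging.INFO
    if debuglevel >= 2:
        return logging.DEBUG
    # 0 or negative values fall back to WARNING.
    return logging.WARNING


for level in (0, 1, 2, 3):
    print(level, logging.getLevelName(api_log_level(level)))
```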
def ssl_verify(
self, ssl_verify, ciphers='DEFAULT:!aNULL:!eNULL:!MD5:!3DES:!DES:!RC4:!IDEA:!SEED:!aDSS:!SRP:!PSK', options=<Options.OP_NO_COMPRESSION|OP_NO_TICKET: 147456>, tls_min=<TLSVersion.TLSv1_2: 771>, tls_max=<TLSVersion.MAXIMUM_SUPPORTED: -1>, update_adapter=True)
Modify ssl verification settings
Parameters:
- ssl_verify:
- True: Verify using builtin BYTE_CA_BUNDLE.
- False: No SSL Verification.
- *Str: Full path to a x509 PEM CA File or bundle.
- ciphers: List of specific ciphers to allow. Default list is DEFAULT_SSL_CIPHERS if not specified.
- options: List of SSL options. Default options are contained in DEFAULT_SSL_OPTIONS if not specified.
- tls_min: Lowest acceptable TLS version. Default is ssl.TLSVersion.TLSv1_2
- tls_max: Maximum acceptable TLS version. Default is ssl.TLSVersion.MAXIMUM_SUPPORTED
- update_adapter: bool, call update adapter function after running to auto update adapter.
Returns: No return, mutates the SSL Context / session in place directly.
def ssl_verify(self, ssl_verify, ciphers=DEFAULT_SSL_CIPHERS, options=DEFAULT_SSL_OPTIONS, tls_min=ssl.TLSVersion.TLSv1_2, tls_max=ssl.TLSVersion.MAXIMUM_SUPPORTED, update_adapter=True): """ Modify ssl verification settings **Parameters:** - **ssl_verify:** - **True:** Verify using builtin BYTE_CA_BUNDLE. - **False:** No SSL Verification. - ***Str:** Full path to a x509 PEM CA File or bundle. - **ciphers:** List of specific ciphers to allow. Default list is DEFAULT_SSL_CIPHERS if not specified. - **options:** List of SSL options. Default options are contained in DEFAULT_SSL_OPTIONS if not specified. - **tls_min:** Lowest acceptable TLS version. Default is `ssl.TLSVersion.TLSv1_2` - **tls_max:** Maximum acceptable TLS version. Default is `ssl.TLSVersion.MAXIMUM_SUPPORTED` - **update_adapter:** bool, call update adapter function after running to auto update adapter. **Returns:** No return, mutates the SSL Context / session in place directly. """ self.verify = ssl_verify # if verify true/false, set ca_verify_file appropriately if isinstance(self.verify, bool): if self.verify: # True # load default CA list and set default SSL ciphers. self._ca_ssl_context = ssl.create_default_context(cadata=BYTE_CA_BUNDLE.decode('ascii')) self._session.verify = True else: # False # disable warnings for SSL certs. urllib3.disable_warnings() self._ca_ssl_context = ssl.SSLContext() self._ca_ssl_context.check_hostname = False self._ca_ssl_context.verify_mode = ssl.CERT_NONE self._session.verify = False else: # Not True/False, assume path to file/dir for Requests # set filename/filepath for context self._ca_ssl_context = ssl.create_default_context(cafile=self.verify, capath=self.verify) self._ca_ssl_context.check_hostname = True self._session.verify = True # set the CA independent values self._ca_ssl_context.minimum_version = tls_min self._ca_ssl_context.maximum_version = tls_max self._ca_ssl_context.set_ciphers(ciphers) self._ca_ssl_context.options |= options # update the session adapter. 
if update_adapter: self.update_session_adapter() return
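The three `ssl_verify` cases can be sketched with the standard library alone. This is an approximation, not the SDK's code: `build_context` is a hypothetical helper, the `True` branch uses the system trust store instead of the SDK's builtin BYTE_CA_BUNDLE, and only one option flag is applied.

```python
import ssl


def build_context(verify, tls_min=ssl.TLSVersion.TLSv1_2):
    """Build an SSLContext for True (verify), False (no verify) or a CA file path."""
    if verify is True:
        # Assumption: system CA store stands in for the SDK's bundled CAs.
        ctx = ssl.create_default_context()
    elif verify is False:
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        # check_hostname must be disabled before verify_mode can be CERT_NONE.
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
    else:
        # Anything else is treated as a path to a PEM CA file or bundle.
        ctx = ssl.create_default_context(cafile=verify)
    # CA-independent settings, applied in every case.
    ctx.minimum_version = tls_min
    ctx.options |= ssl.OP_NO_COMPRESSION
    return ctx


insecure = build_context(False)
strict = build_context(True)
print(insecure.verify_mode, strict.verify_mode)
```

Disabling verification is shown only because the method supports it; it removes protection against impersonation of the controller.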
def throw_error(
message, resp=None, cr=True, exception=<class 'cloudgenix.CloudGenixAPIError'>)
Non-recoverable error, write message to STDERR and raise exception
Parameters:
- message: Message text
- resp: Optional - CloudGenix SDK Response object
- cr: Optional - Use (or not) Carriage Returns.
- exception: Optional - Custom Exception to throw, otherwise uses CloudGenixAPIError
Returns: No Return, throws exception.
@staticmethod def throw_error(message, resp=None, cr=True, exception=CloudGenixAPIError): """ Non-recoverable error, write message to STDERR and raise exception **Parameters:** - **message:** Message text - **resp:** Optional - CloudGenix SDK Response object - **cr:** Optional - Use (or not) Carriage Returns. - **exception:** Optional - Custom Exception to throw, otherwise uses `CloudGenixAPIError` **Returns:** No Return, throws exception. """ output = "ERROR: " + str(message) if cr: output += "\n" sys.stderr.write(output) if resp is not None: output2 = str(jdout_detailed(resp)) if cr: output2 += "\n" sys.stderr.write(output2) raise exception(message)
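The `exception` parameter lets a caller choose which exception type is raised. The standalone sketch below imitates that behaviour without importing the SDK; `DemoAPIError` and this local `throw_error` are illustrative stand-ins, and the `resp` dump is left out.

```python
import sys


class DemoAPIError(Exception):
    """Stand-in for the SDK's CloudGenixAPIError."""


def throw_error(message, cr=True, exception=DemoAPIError):
    """Write the message to STDERR, then raise the chosen exception type."""
    output = "ERROR: " + str(message)
    if cr:
        output += "\n"
    sys.stderr.write(output)
    raise exception(message)


# Ask for a ValueError instead of the default exception type.
try:
    throw_error("site lookup failed", exception=ValueError)
except ValueError as err:
    caught = str(err)

print(caught)
```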
def throw_warning(
message, resp=None, cr=True)
Recoverable Warning.
Parameters:
- message: Message text
- resp: Optional - CloudGenix SDK Response object
- cr: Optional - Use (or not) Carriage Returns.
Returns: No Return.
@staticmethod def throw_warning(message, resp=None, cr=True): """ Recoverable Warning. **Parameters:** - **message:** Message text - **resp:** Optional - CloudGenix SDK Response object - **cr:** Optional - Use (or not) Carriage Returns. **Returns:** No Return. """ output = "WARNING: " + str(message) if cr: output += "\n" sys.stderr.write(output) if resp is not None: output2 = str(jdout_detailed(resp)) if cr: output2 += "\n" sys.stderr.write(output2) return
def update_region_to_controller(
self, region)
Update the controller string with dynamic region info.
Controller string should end up as <name[-env]>.<region>.cloudgenix.com
Parameters:
- region: region string.
Returns: No return value, mutates the controller in the class namespace
def update_region_to_controller(self, region): """ Update the controller string with dynamic region info. Controller string should end up as `<name[-env]>.<region>.cloudgenix.com` **Parameters:** - **region:** region string. **Returns:** No return value, mutates the controller in the class namespace """ # default region position in a list region_position = 1 # Check for a global "ignore region" flag if self.ignore_region: # bypass api_logger.debug("IGNORE_REGION set, not updating controller region.") return api_logger.debug("Updating Controller Region") api_logger.debug("CONTROLLER = %s", self.controller) api_logger.debug("CONTROLLER_ORIG = %s", self.controller_orig) api_logger.debug("CONTROLLER_REGION = %s", self.controller_region) # Check if this is an initial region use or an update region use if self.controller_orig: controller_base = self.controller_orig else: controller_base = self.controller self.controller_orig = self.controller # splice controller string controller_full_part_list = controller_base.split('.') for idx, part in enumerate(controller_full_part_list): # is the region already in the controller string? if region == part: # yes, controller already has appropriate region api_logger.debug("REGION %s ALREADY IN BASE CONTROLLER AT INDEX = %s", region, idx) # update region if it is not already set. if self.controller_region != region: self.controller_region = region api_logger.debug("UPDATED_CONTROLLER_REGION = %s", self.controller_region) # Update controller if not already matching if self.controller != controller_base: self.controller = controller_base api_logger.debug("UPDATED_CONTROLLER = %s", self.controller) return controller_part_count = len(controller_full_part_list) # handle short domain case if controller_part_count > 1: # insert region controller_full_part_list[region_position] = region self.controller = ".".join(controller_full_part_list) else: # short domain, just add region self.controller = ".".join(controller_full_part_list) + '.' 
+ region # update SDK vars with region info self.controller_orig = controller_base self.controller_region = region api_logger.debug("UPDATED_CONTROLLER = %s", self.controller) api_logger.debug("UPDATED_CONTROLLER_ORIG = %s", self.controller_orig) api_logger.debug("UPDATED_CONTROLLER_REGION = %s", self.controller_region) return
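The string splicing at the core of this method can be shown in isolation. The sketch below is a simplified, stateless version: `apply_region` is a hypothetical function, it ignores the `ignore_region` flag and the `controller_orig` bookkeeping, and the host names and region labels are placeholders.

```python
def apply_region(controller, region, region_position=1):
    """Return the controller string with the region label placed at index 1."""
    parts = controller.split(".")
    if region in parts:
        # Region already present: nothing to change.
        return controller
    if len(parts) > 1:
        # The label at region_position is overwritten, not shifted right.
        parts[region_position] = region
        return ".".join(parts)
    # Short (single-label) host: append the region.
    return controller + "." + region


print(apply_region("https://api.oldregion.cloudgenix.com", "newregion"))
print(apply_region("localhost", "newregion"))
```

Because the label at index 1 is replaced rather than inserted, the base controller string is expected to already carry a region-like label in that position.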
def update_session_adapter(
self, adapter=None, retry=None, ssl_context=None, adapter_url='https://')
Mount/remount the HTTPS session adapter after changes that affect it.
Parameters:
- adapter: requests.adapters.HTTPAdapter, or will use/create default adapter.
- retry: urllib3.util.retry.Retry object, or will use default retry object.
- ssl_context: ssl.SSLContext object, or will use default SSL Context.
- adapter_url: URL to use for matching requests. Defaults to 'https://'
Returns: No return, mutates the session adapter directly
def update_session_adapter(self, adapter=None, retry=None, ssl_context=None, adapter_url="https://"): """ Mount/remount the HTTPS session adapter after changes that affect it. **Parameters:** - **adapter:** `requests.adapters.HTTPAdapter`, or will use/create default adapter. - **retry:** `urllib3.util.retry.Retry` object, or will use default retry object. - **ssl_context:** `ssl.SSLContext` object, or will use default SSL Context. - **adapter_url:** URL to use for matching requests. Defaults to 'https://' **Returns:** No return, mutates the session adapter directly """ # use default or specified session objects. use_retry = retry or self._rest_call_retry_object use_ssl_context = ssl_context or self._ca_ssl_context # if object adapter has not been built, create a new one. if adapter is None: # does an effective retry object exist? if use_retry is None: self._rest_connection_adapter = TlsHttpAdapter(ssl_context=use_ssl_context) else: self._rest_connection_adapter = TlsHttpAdapter(ssl_context=use_ssl_context, max_retries=use_retry) else: # if adapter is specified, nothing else should be specified. if retry is not None or ssl_context is not None: self.throw_warning("Error: HTTPAdapter was specified with retry and ssl_context. These will be ignored." " retry and ssl_contexts need to be set in the adapter itself.") # Pick the specified or default adapter. use_adapter = adapter or self._rest_connection_adapter # mount the adapter. self._session.mount(adapter_url, use_adapter) return
def url_decode(
url)
URL Decode function using REGEX
Parameters:
- url: URLENCODED text string
Returns: Non URLENCODED string
@staticmethod def url_decode(url): """ URL Decode function using REGEX **Parameters:** - **url:** URLENCODED text string **Returns:** Non URLENCODED string """ return re.compile('%([0-9a-fA-F]{2})', re.M).sub(lambda m: chr(int(m.group(1), 16)), url)
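The same regular expression can be tried outside the SDK. The sketch below reproduces the substitution as a plain function and, as a point of comparison that is not part of the SDK, shows the standard library's `urllib.parse.unquote` on a multi-byte sequence.

```python
import re
from urllib.parse import unquote

# Same pattern as the listing above: a percent sign followed by two hex digits.
_PERCENT = re.compile("%([0-9a-fA-F]{2})", re.M)


def url_decode(url):
    """Replace each %XX escape with the character whose code point is XX."""
    return _PERCENT.sub(lambda m: chr(int(m.group(1), 16)), url)


print(url_decode("a%20b%2Fc"))

# Each escape is decoded on its own, so a UTF-8 pair is not reassembled.
per_escape = url_decode("%C3%A9")
as_utf8 = unquote("%C3%A9")
print(len(per_escape), len(as_utf8))
```

For ASCII escapes the two approaches agree; for multi-byte UTF-8 sequences the regex version yields one character per escape, while `unquote` decodes the bytes together.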
def view_cookies(
self)
View current cookies in the requests.Session() object
Returns: List of Dicts, one cookie per Dict.
def view_headers(
self)
View current headers in the requests.Session() object
Returns: Dict, Key header, value is header value.
def view_headers(self): """ View current headers in the `requests.Session()` object **Returns:** Dict, Key header, value is header value. """ return dict(self._session.headers)
def view_rest_retry(
self, url=None)
View current rest retry settings in the requests.Session() object
Parameters:
- url: URL to use to determine retry methods for. Defaults to 'https://'
Returns: Dict of the current retry settings (the attributes of the retry object on the adapter that matches the URL).
def view_rest_retry(self, url=None): """ View current rest retry settings in the `requests.Session()` object **Parameters:** - **url:** URL to use to determine retry methods for. Defaults to 'https://' **Returns:** Dict, Key header, value is header value. """ if url is None: url = "https://" return vars(self._session.get_adapter(url).max_retries)
def websocket_add_headers(
self, headers)
Permanently add/overwrite headers to the API() WebSocket object
Parameters:
- headers: dict with header/value
Returns: Mutates API() object, no return.
def websocket_add_headers(self, headers): """ Permanently add/overwrite headers to the `API()` WebSocket object **Parameters:** - **headers:** dict with header/value **Returns:** Mutates `API()` object, no return. """ self._websocket_headers.update(headers) return
def websocket_call(
self, url, *args, **kwargs)
Generic WebSocket worker function, automatically uses authentication from the cloudgenix.API() session.
Parameters:
- url: URL for the WebSocket call
- Any other websockets.client.Connect argument or keyword argument (see NOTE: below)
Returns: websockets.client.Connect object.
NOTE: Any websockets.client.Connect supported argument or keyword argument will be accepted, and will be passed to the underlying Connect() request. The ssl and extra_headers keyword arguments will override the SDK auto-generated cookies/headers and SSL contexts used for authentication. For more info on available options, see https://websockets.readthedocs.io/en/stable/api.html#websockets.client.connect
def websocket_call(self, url, *args, **kwargs): """ Generic WebSocket worker function, automatically uses authentication from `cloudgenix.API()` session. **Parameters:** - **url:** URL for the REST call - Any other `websocket.client.Connect` argument or keyword argument (see NOTE: below) **Returns:** `websocket.client.Connect` object. **NOTE:** Any `websocket.client.Connect` supported argument or keyword argument will be accepted, and will be passed to the underlying Connect() request. **`ssl` and `extra_header` keyword arguments will override the SDK auto-generated cookies/headers and SSL contexts used for authentication.** For more info on available options, see <https://websockets.readthedocs.io/en/stable/api.html#websockets.client.connect> """ headers = {} # add session headers headers.update(self._websocket_headers) # Get cookies from requests. cookies = self._session.cookies.get_dict() # create cookie header from the cookies in Requests headers["Cookie"] = "; ".join(["{0}={1}".format(key, value) for key, value in cookies.items()]) # check for host header host_header = headers.get("Host") force_host_arg = None if host_header is not None: # Ok, we got a host header. Unlike Requests, websockets wants the "Host Header" in the URI. You pass # a separate IP to connect to as 'host' argument to `websockets.client.Connect`. # So, to keep our "requests"-like behavior, we need to replace the value in the URI with the host # header, and remove the host header - then send the original value to `Connect` as host kwarg. # split controller at "//" (https://controller.host.com) to get host. controller_host = self.controller.split("//")[1] force_host_arg = controller_host # we don't support user/password in URI, so replacing first instance 'should' be OK (famous last words) new_url = url.replace(force_host_arg, host_header, 1) api_logger.debug("Host replacement. 
Original URL: {0}, New URL: {1}".format(url, new_url)) url = new_url # delete the host header del headers["Host"] # convert the headers dictionary to a list of tuples. header_tuple_list = [(key, value) for key, value in headers.items()] # Create argument tuple ws_args = (url,) + args # Create keyword args ws_kwargs = { "ssl": self._ca_ssl_context, "extra_headers": header_tuple_list } # check for force host arg if force_host_arg is not None: api_logger.debug("Forcing connection to host {0}".format(force_host_arg)) ws_kwargs["host"] = force_host_arg # Override automatic with any manually passed kwargs ws_kwargs.update(kwargs) return websockets.connect(*ws_args, **ws_kwargs)
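The header preparation step in the source above (session cookies folded into a single Cookie header, then converted to a list of tuples) can be sketched on its own. `build_ws_headers` is a hypothetical helper, and the header and cookie names are invented for the example; the Host-header rewrite is not covered.

```python
def build_ws_headers(base_headers, cookies):
    """Merge stored WebSocket headers with a Cookie header built from a cookie dict."""
    headers = dict(base_headers)
    # One "name=value" pair per cookie, separated by "; ".
    headers["Cookie"] = "; ".join(
        "{0}={1}".format(key, value) for key, value in cookies.items()
    )
    # The listing above passes headers on as a list of (name, value) tuples.
    return list(headers.items())


header_list = build_ws_headers({"X-Demo": "1"}, {"token_a": "abc", "token_b": "def"})
print(header_list)
```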
def websocket_remove_header(
self, header)
Permanently remove a single header from the API() WebSocket object
Parameters:
- header: str of single header to remove
Returns: Mutates API() object, no return.
def websocket_remove_header(self, header): """ Permanently remove a single header from the `API()` WebSocket object **Parameters:** - **header:** str of single header to remove **Returns:** Mutates `API()` object, no return. """ # check for header first. Return silently if it does not exist. if self._websocket_headers.get(header) is not None: del self._websocket_headers[header] return
def websocket_view_headers(
self)
View current headers in the API() WebSocket object
Returns: Dict, Key header, value is header value.
def websocket_view_headers(self): """ View current headers in the `API()` WebSocket object **Returns:** Dict, Key header, value is header value. """ return dict(self._websocket_headers)
Instance variables
var update_info_url
Update Info URL for use once Constructor Created.
var version
Version string for use once Constructor created.
var ws
API object link to cloudgenix.ws.WebSockets
class CloudGenixAPIError
Custom exception for errors when not exiting.
class CloudGenixAPIError(Exception): """ Custom exception for errors when not exiting. """ pass
Ancestors (in MRO)
- CloudGenixAPIError
- builtins.Exception
- builtins.BaseException
- builtins.object
Class variables
var args
class TlsHttpAdapter
A patched Requests HTTP Transport adapter that allows specification of a specific SSL Context.
This allows for fine-grained SSL Cipher, Options and CA specification.
class TlsHttpAdapter(HTTPAdapter): """ A patched Requests HTTP Transport adapter that allows specification of a specific SSL Context. This allows for fine-grained SSL Cipher, Options and CA specification. """ def __init__(self, ssl_context=None, **kwargs): if ssl_context is None: """ Create a default SSL context for requests, if one was not specified. This likely won't be usable by the SDK, but is a desirable alternative to nothing. """ self.ssl_context = ssl.create_default_context(cadata=BYTE_CA_BUNDLE.decode('ascii')) self.ssl_context.set_ciphers(DEFAULT_SSL_CIPHERS) self.ssl_context.options |= DEFAULT_SSL_OPTIONS else: self.ssl_context = ssl_context super(TlsHttpAdapter, self).__init__(**kwargs) def init_poolmanager(self, *args, **kwargs): kwargs['ssl_context'] = self.ssl_context return super(TlsHttpAdapter, self).init_poolmanager(*args, **kwargs) def proxy_manager_for(self, *args, **kwargs): kwargs['ssl_context'] = self.ssl_context return super(TlsHttpAdapter, self).proxy_manager_for(*args, **kwargs)
Ancestors (in MRO)
- TlsHttpAdapter
- requests.adapters.HTTPAdapter
- requests.adapters.BaseAdapter
- builtins.object
Static methods
def __init__(
self, ssl_context=None, **kwargs)
Initialize self. See help(type(self)) for accurate signature.
def __init__(self, ssl_context=None, **kwargs): if ssl_context is None: """ Create a default SSL context for requests, if one was not specified. This likely won't be usable by the SDK, but is a desirable alternative to nothing. """ self.ssl_context = ssl.create_default_context(cadata=BYTE_CA_BUNDLE.decode('ascii')) self.ssl_context.set_ciphers(DEFAULT_SSL_CIPHERS) self.ssl_context.options |= DEFAULT_SSL_OPTIONS else: self.ssl_context = ssl_context super(TlsHttpAdapter, self).__init__(**kwargs)
def add_headers(
self, request, **kwargs)
Add any headers needed by the connection. As of v2.0 this does nothing by default, but is left for overriding by users that subclass requests.adapters.HTTPAdapter.
This should not be called from user code, and is only exposed for use when subclassing requests.adapters.HTTPAdapter.
Parameters:
- request: The PreparedRequest to add headers to.
- kwargs: The keyword arguments from the call to send().
def add_headers(self, request, **kwargs): """Add any headers needed by the connection. As of v2.0 this does nothing by default, but is left for overriding by users that subclass the :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`. This should not be called from user code, and is only exposed for use when subclassing the :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`. :param request: The :class:`PreparedRequest <PreparedRequest>` to add headers to. :param kwargs: The keyword arguments from the call to send(). """ pass
def build_response(
self, req, resp)
Builds a requests.Response object from a urllib3 response. This should not be called from user code, and is only exposed for use when subclassing requests.adapters.HTTPAdapter.
Parameters:
- req: The PreparedRequest used to generate the response.
- resp: The urllib3 response object.
Returns: requests.Response
def build_response(self, req, resp): """Builds a :class:`Response <requests.Response>` object from a urllib3 response. This should not be called from user code, and is only exposed for use when subclassing the :class:`HTTPAdapter <requests.adapters.HTTPAdapter>` :param req: The :class:`PreparedRequest <PreparedRequest>` used to generate the response. :param resp: The urllib3 response object. :rtype: requests.Response """ response = Response() # Fallback to None if there's no status_code, for whatever reason. response.status_code = getattr(resp, "status", None) # Make headers case-insensitive. response.headers = CaseInsensitiveDict(getattr(resp, "headers", {})) # Set encoding. response.encoding = get_encoding_from_headers(response.headers) response.raw = resp response.reason = response.raw.reason if isinstance(req.url, bytes): response.url = req.url.decode("utf-8") else: response.url = req.url # Add new cookies from the server. extract_cookies_to_jar(response.cookies, req, resp) # Give the Response some context. response.request = req response.connection = self return response
def cert_verify(self, conn, url, verify, cert)
Verify a SSL certificate. This method should not be called from user code, and is only exposed for use when subclassing the :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.
:param conn: The urllib3 connection object associated with the cert.
:param url: The requested URL.
:param verify: Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use.
:param cert: The SSL certificate to verify.
def cert_verify(self, conn, url, verify, cert):
    """Verify a SSL certificate. This method should not be called from user
    code, and is only exposed for use when subclassing the
    :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.

    :param conn: The urllib3 connection object associated with the cert.
    :param url: The requested URL.
    :param verify: Either a boolean, in which case it controls whether we verify
        the server's TLS certificate, or a string, in which case it must be a path
        to a CA bundle to use
    :param cert: The SSL certificate to verify.
    """
    if url.lower().startswith("https") and verify:
        cert_loc = None

        # Allow self-specified cert location.
        if verify is not True:
            cert_loc = verify

        if not cert_loc:
            cert_loc = extract_zipped_paths(DEFAULT_CA_BUNDLE_PATH)

        if not cert_loc or not os.path.exists(cert_loc):
            raise OSError(
                f"Could not find a suitable TLS CA certificate bundle, "
                f"invalid path: {cert_loc}"
            )

        conn.cert_reqs = "CERT_REQUIRED"

        if not os.path.isdir(cert_loc):
            conn.ca_certs = cert_loc
        else:
            conn.ca_cert_dir = cert_loc
    else:
        conn.cert_reqs = "CERT_NONE"
        conn.ca_certs = None
        conn.ca_cert_dir = None

    if cert:
        if not isinstance(cert, basestring):
            conn.cert_file = cert[0]
            conn.key_file = cert[1]
        else:
            conn.cert_file = cert
            conn.key_file = None
        if conn.cert_file and not os.path.exists(conn.cert_file):
            raise OSError(
                f"Could not find the TLS certificate file, "
                f"invalid path: {conn.cert_file}"
            )
        if conn.key_file and not os.path.exists(conn.key_file):
            raise OSError(
                f"Could not find the TLS key file, invalid path: {conn.key_file}"
            )
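The way `verify` maps onto the connection's CA settings can be summarised in a small standalone sketch. `resolve_ca_settings` and its `default_bundle` parameter are hypothetical names used only for illustration (the listing above uses `DEFAULT_CA_BUNDLE_PATH` instead); this is not part of requests or the CloudGenix SDK, and the unset side of the result is reported as `None` for simplicity.

```python
import os


def resolve_ca_settings(url, verify, default_bundle):
    """Return (cert_reqs, ca_certs, ca_cert_dir) following cert_verify's branches.

    Hypothetical helper: ``default_bundle`` stands in for the library's
    default CA bundle path.
    """
    # Verification only applies to HTTPS URLs with a truthy ``verify``.
    if not (url.lower().startswith("https") and verify):
        return ("CERT_NONE", None, None)

    # ``verify=True`` means "use the default bundle"; a string is a custom path.
    cert_loc = default_bundle if verify is True else verify
    if not cert_loc or not os.path.exists(cert_loc):
        raise OSError(
            f"Could not find a suitable TLS CA certificate bundle, "
            f"invalid path: {cert_loc}"
        )

    # A directory is treated as a CA directory, anything else as a bundle file.
    if os.path.isdir(cert_loc):
        return ("CERT_REQUIRED", None, cert_loc)
    return ("CERT_REQUIRED", cert_loc, None)
```

A path that does not exist raises `OSError`, mirroring the check in the listing above.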
def close(self)
Disposes of any internal state.
Currently, this closes the PoolManager and any active ProxyManager, which closes any pooled connections.
def close(self):
    """Disposes of any internal state.

    Currently, this closes the PoolManager and any active ProxyManager,
    which closes any pooled connections.
    """
    self.poolmanager.clear()
    for proxy in self.proxy_manager.values():
        proxy.clear()
def get_connection(self, url, proxies=None)
Returns a urllib3 connection for the given URL. This should not be called from user code, and is only exposed for use when subclassing the :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.
:param url: The URL to connect to.
:param proxies: (optional) A Requests-style dictionary of proxies used on this request.
:rtype: urllib3.ConnectionPool
def get_connection(self, url, proxies=None):
    """Returns a urllib3 connection for the given URL. This should not be
    called from user code, and is only exposed for use when subclassing the
    :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.

    :param url: The URL to connect to.
    :param proxies: (optional) A Requests-style dictionary of proxies used on this request.
    :rtype: urllib3.ConnectionPool
    """
    proxy = select_proxy(url, proxies)

    if proxy:
        proxy = prepend_scheme_if_needed(proxy, "http")
        proxy_url = parse_url(proxy)
        if not proxy_url.host:
            raise InvalidProxyURL(
                "Please check proxy URL. It is malformed "
                "and could be missing the host."
            )
        proxy_manager = self.proxy_manager_for(proxy)
        conn = proxy_manager.connection_from_url(url)
    else:
        # Only scheme should be lower case
        parsed = urlparse(url)
        url = parsed.geturl()
        conn = self.poolmanager.connection_from_url(url)

    return conn
def init_poolmanager(self, *args, **kwargs)
Initializes a urllib3 PoolManager.
This method should not be called from user code, and is only exposed for use when subclassing the :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.
:param connections: The number of urllib3 connection pools to cache.
:param maxsize: The maximum number of connections to save in the pool.
:param block: Block when no free connections are available.
:param pool_kwargs: Extra keyword arguments used to initialize the Pool Manager.
def init_poolmanager(self, *args, **kwargs):
    kwargs['ssl_context'] = self.ssl_context
    return super(TlsHttpAdapter, self).init_poolmanager(*args, **kwargs)
def proxy_headers(self, proxy)
Returns a dictionary of the headers to add to any request sent through a proxy. This works with urllib3 magic to ensure that they are correctly sent to the proxy, rather than in a tunnelled request if CONNECT is being used.
This should not be called from user code, and is only exposed for use when subclassing the :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.
:param proxy: The url of the proxy being used for this request.
:rtype: dict
def proxy_headers(self, proxy):
    """Returns a dictionary of the headers to add to any request sent
    through a proxy. This works with urllib3 magic to ensure that they are
    correctly sent to the proxy, rather than in a tunnelled request if
    CONNECT is being used.

    This should not be called from user code, and is only exposed for use
    when subclassing the
    :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.

    :param proxy: The url of the proxy being used for this request.
    :rtype: dict
    """
    headers = {}
    username, password = get_auth_from_url(proxy)

    if username:
        headers["Proxy-Authorization"] = _basic_auth_str(username, password)

    return headers
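To make the credential handling concrete, here is a minimal standard-library sketch of deriving a Basic `Proxy-Authorization` header from credentials embedded in a proxy URL. `proxy_auth_headers` is a hypothetical name; the listing above relies on the helpers `get_auth_from_url` and `_basic_auth_str`, whose exact behavior may differ in edge cases such as encoding.

```python
import base64
from urllib.parse import unquote, urlsplit


def proxy_auth_headers(proxy_url):
    """Build proxy headers from user:password credentials in a proxy URL.

    Hypothetical helper for illustration only.
    """
    parts = urlsplit(proxy_url)
    headers = {}

    if parts.username:
        # Credentials in URLs are percent-encoded, so decode them first.
        username = unquote(parts.username)
        password = unquote(parts.password or "")
        raw = f"{username}:{password}".encode("latin1")
        token = base64.b64encode(raw).decode("ascii")
        headers["Proxy-Authorization"] = f"Basic {token}"

    return headers


print(proxy_auth_headers("http://user:pass@proxy.example:3128"))
```

A proxy URL without credentials yields an empty dictionary, so no header is added.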
def proxy_manager_for(self, *args, **kwargs)
Return urllib3 ProxyManager for the given proxy.
This method should not be called from user code, and is only exposed for use when subclassing the :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.
:param proxy: The proxy to return a urllib3 ProxyManager for.
:param proxy_kwargs: Extra keyword arguments used to configure the Proxy Manager.
:returns: ProxyManager
:rtype: urllib3.ProxyManager
def proxy_manager_for(self, *args, **kwargs):
    kwargs['ssl_context'] = self.ssl_context
    return super(TlsHttpAdapter, self).proxy_manager_for(*args, **kwargs)
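Both `init_poolmanager` and `proxy_manager_for` follow the same pattern: inject the adapter's `ssl_context` into the keyword arguments, then defer to the parent class. The sketch below shows that pattern in isolation. `BaseAdapter` is a stand-in for `requests.adapters.HTTPAdapter` so the example runs with the standard library alone, and `ContextAdapter` and the TLS 1.2 minimum are assumptions for illustration; how the SDK builds its own `ssl_context` is not shown in this section.

```python
import ssl


class BaseAdapter:
    """Stand-in for requests.adapters.HTTPAdapter (assumption, illustration only)."""

    def init_poolmanager(self, *args, **kwargs):
        # The real adapter would build a urllib3 PoolManager here;
        # this stand-in just returns the keyword arguments it received.
        return kwargs


class ContextAdapter(BaseAdapter):
    """Hypothetical adapter that forces one SSLContext onto every pool."""

    def __init__(self, ssl_context):
        self.ssl_context = ssl_context

    def init_poolmanager(self, *args, **kwargs):
        # Same override shape as the listing above: inject, then delegate.
        kwargs["ssl_context"] = self.ssl_context
        return super().init_poolmanager(*args, **kwargs)


# Assumed policy for the example: default trust store, TLS 1.2 or newer.
context = ssl.create_default_context()
context.minimum_version = ssl.TLSVersion.TLSv1_2

adapter = ContextAdapter(context)
received = adapter.init_poolmanager(10, 10, block=False)
```

Because the context is injected in the override rather than passed by callers, every pool created through the adapter shares the same TLS settings.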
def request_url(self, request, proxies)
Obtain the url to use when making the final request.
If the message is being sent through a HTTP proxy, the full URL has to be used. Otherwise, we should only use the path portion of the URL.
This should not be called from user code, and is only exposed for use when subclassing the :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.
:param request: The :class:`PreparedRequest <PreparedRequest>` being sent.
:param proxies: A dictionary of schemes or schemes and hosts to proxy URLs.
:rtype: str
def request_url(self, request, proxies):
    """Obtain the url to use when making the final request.

    If the message is being sent through a HTTP proxy, the full URL has to
    be used. Otherwise, we should only use the path portion of the URL.

    This should not be called from user code, and is only exposed for use
    when subclassing the
    :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.

    :param request: The :class:`PreparedRequest <PreparedRequest>` being sent.
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs.
    :rtype: str
    """
    proxy = select_proxy(request.url, proxies)
    scheme = urlparse(request.url).scheme

    is_proxied_http_request = proxy and scheme != "https"
    using_socks_proxy = False
    if proxy:
        proxy_scheme = urlparse(proxy).scheme.lower()
        using_socks_proxy = proxy_scheme.startswith("socks")

    url = request.path_url
    if is_proxied_http_request and not using_socks_proxy:
        url = urldefragauth(request.url)

    return url
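The choice between a path-only target and a full URL can be illustrated with a standard-library sketch. `choose_request_target` is a hypothetical function that takes an already-selected proxy URL directly; the listing above instead uses `select_proxy`, `request.path_url`, and `urldefragauth`, which this sketch only approximates.

```python
from urllib.parse import urlsplit, urlunsplit


def choose_request_target(url, proxy=None):
    """Return the request target: full URL for plain-HTTP proxying, else path.

    Hypothetical helper for illustration only.
    """
    parts = urlsplit(url)
    proxied_http = bool(proxy) and parts.scheme != "https"
    socks = bool(proxy) and urlsplit(proxy).scheme.lower().startswith("socks")

    if proxied_http and not socks:
        # An HTTP proxy needs the absolute URL, without credentials or fragment.
        netloc = parts.netloc.rsplit("@", 1)[-1]
        return urlunsplit((parts.scheme, netloc, parts.path, parts.query, ""))

    # Direct, HTTPS (tunnelled), and SOCKS requests send only the path and query.
    target = parts.path or "/"
    if parts.query:
        target += "?" + parts.query
    return target
```

HTTPS requests through a proxy still use the path form because they travel inside a CONNECT tunnel, so the proxy never parses the request line.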
def send(self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None)
Sends PreparedRequest object. Returns Response object.
:param request: The :class:`PreparedRequest <PreparedRequest>` being sent.
:param stream: (optional) Whether to stream the request content.
:param timeout: (optional) How long to wait for the server to send data before giving up, as a float, or a :ref:`(connect timeout, read timeout) <timeouts>` tuple.
:type timeout: float or tuple or urllib3 Timeout object
:param verify: (optional) Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use.
:param cert: (optional) Any user-provided SSL certificate to be trusted.
:param proxies: (optional) The proxies dictionary to apply to the request.
:rtype: requests.Response
def send(
    self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None
):
    """Sends PreparedRequest object. Returns Response object.

    :param request: The :class:`PreparedRequest <PreparedRequest>` being sent.
    :param stream: (optional) Whether to stream the request content.
    :param timeout: (optional) How long to wait for the server to send
        data before giving up, as a float, or a :ref:`(connect timeout,
        read timeout) <timeouts>` tuple.
    :type timeout: float or tuple or urllib3 Timeout object
    :param verify: (optional) Either a boolean, in which case it controls whether
        we verify the server's TLS certificate, or a string, in which case it
        must be a path to a CA bundle to use
    :param cert: (optional) Any user-provided SSL certificate to be trusted.
    :param proxies: (optional) The proxies dictionary to apply to the request.
    :rtype: requests.Response
    """

    try:
        conn = self.get_connection(request.url, proxies)
    except LocationValueError as e:
        raise InvalidURL(e, request=request)

    self.cert_verify(conn, request.url, verify, cert)
    url = self.request_url(request, proxies)
    self.add_headers(
        request,
        stream=stream,
        timeout=timeout,
        verify=verify,
        cert=cert,
        proxies=proxies,
    )

    chunked = not (request.body is None or "Content-Length" in request.headers)

    if isinstance(timeout, tuple):
        try:
            connect, read = timeout
            timeout = TimeoutSauce(connect=connect, read=read)
        except ValueError:
            raise ValueError(
                f"Invalid timeout {timeout}. Pass a (connect, read) timeout tuple, "
                f"or a single float to set both timeouts to the same value."
            )
    elif isinstance(timeout, TimeoutSauce):
        pass
    else:
        timeout = TimeoutSauce(connect=timeout, read=timeout)

    try:
        resp = conn.urlopen(
            method=request.method,
            url=url,
            body=request.body,
            headers=request.headers,
            redirect=False,
            assert_same_host=False,
            preload_content=False,
            decode_content=False,
            retries=self.max_retries,
            timeout=timeout,
            chunked=chunked,
        )

    except (ProtocolError, OSError) as err:
        raise ConnectionError(err, request=request)

    except MaxRetryError as e:
        if isinstance(e.reason, ConnectTimeoutError):
            # TODO: Remove this in 3.0.0: see #2811
            if not isinstance(e.reason, NewConnectionError):
                raise ConnectTimeout(e, request=request)

        if isinstance(e.reason, ResponseError):
            raise RetryError(e, request=request)

        if isinstance(e.reason, _ProxyError):
            raise ProxyError(e, request=request)

        if isinstance(e.reason, _SSLError):
            # This branch is for urllib3 v1.22 and later.
            raise SSLError(e, request=request)

        raise ConnectionError(e, request=request)

    except ClosedPoolError as e:
        raise ConnectionError(e, request=request)

    except _ProxyError as e:
        raise ProxyError(e)

    except (_SSLError, _HTTPError) as e:
        if isinstance(e, _SSLError):
            # This branch is for urllib3 versions earlier than v1.22
            raise SSLError(e, request=request)
        elif isinstance(e, ReadTimeoutError):
            raise ReadTimeout(e, request=request)
        elif isinstance(e, _InvalidHeader):
            raise InvalidHeader(e, request=request)
        else:
            raise

    return self.build_response(request, resp)
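Two small decisions inside `send` are easy to miss in the full listing: how `timeout` is normalized and when chunked transfer is used. The sketch below isolates both with the standard library. `normalize_timeout` and `uses_chunked` are hypothetical names, plain tuples stand in for the urllib3 timeout object, and a plain `dict` stands in for the request's case-insensitive headers.

```python
def normalize_timeout(timeout):
    """Return a (connect, read) pair from a float, None, or 2-tuple.

    Hypothetical helper: the real code wraps the pair in a urllib3
    timeout object instead of returning a tuple.
    """
    if isinstance(timeout, tuple):
        try:
            connect, read = timeout
        except ValueError:
            raise ValueError(
                f"Invalid timeout {timeout}. Pass a (connect, read) timeout tuple, "
                f"or a single float to set both timeouts to the same value."
            )
        return (connect, read)
    # A single value (or None) applies to both phases.
    return (timeout, timeout)


def uses_chunked(body, headers):
    """Chunked encoding applies only to a body sent without a Content-Length."""
    return not (body is None or "Content-Length" in headers)


print(normalize_timeout(5))           # same limit for connect and read
print(normalize_timeout((3.05, 27)))  # separate connect and read limits
```

A tuple with any length other than two raises `ValueError`, matching the error path in the listing above.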
Sub-modules
CloudGenix Explicit CA Certificate Bundle for API calls (CA Pinning).
Author: CloudGenix
Copyright: (c) 2017-2024 CloudGenix, Inc
License: MIT
CloudGenix Python SDK - DELETE
Author: CloudGenix
Copyright: (c) 2017-2024 CloudGenix, Inc
License: MIT
CloudGenix Python SDK - GET
Author: CloudGenix
Copyright: (c) 2017-2024 CloudGenix, Inc
License: MIT
CloudGenix Python Interactive SDK Helper functions
Author: CloudGenix
Copyright: (c) 2017-2024 CloudGenix, Inc
License: MIT
CloudGenix Python SDK - PATCH
Author: CloudGenix
Copyright: (c) 2017-2024 CloudGenix, Inc
License: MIT
CloudGenix Python SDK - POST
Author: CloudGenix
Copyright: (c) 2017-2024 CloudGenix, Inc
License: MIT
CloudGenix Python SDK - PUT
Author: CloudGenix
Copyright: (c) 2017-2024 CloudGenix, Inc
License: MIT
CloudGenix Python SDK - WebSocket Functions
Author: CloudGenix
Copyright: (c) 2017-2024 CloudGenix, Inc
License: MIT