OXIESEC PANEL
- Current Dir:
/
/
usr
/
lib
/
python3
/
dist-packages
/
certbot
Server IP: 139.59.38.164
Upload:
Create Dir:
Name
Size
Modified
Perms
📁
..
-
03/17/2025 09:32:20 AM
rwxr-xr-x
📄
__init__.py
114 bytes
02/07/2019 09:20:31 PM
rw-r--r--
📁
__pycache__
-
05/25/2021 01:14:27 PM
rwxr-xr-x
📄
account.py
13.98 KB
02/07/2019 09:20:29 PM
rw-r--r--
📄
achallenges.py
1.59 KB
02/07/2019 09:20:29 PM
rw-r--r--
📄
auth_handler.py
20.92 KB
02/07/2019 09:20:29 PM
rw-r--r--
📄
cert_manager.py
15.1 KB
02/07/2019 09:20:29 PM
rw-r--r--
📄
cli.py
71.49 KB
02/07/2019 09:20:29 PM
rw-r--r--
📄
client.py
28.72 KB
02/07/2019 09:20:29 PM
rw-r--r--
📄
compat.py
6.91 KB
02/07/2019 09:20:29 PM
rw-r--r--
📄
configuration.py
5.66 KB
02/07/2019 09:20:29 PM
rw-r--r--
📄
constants.py
6.54 KB
12/18/2020 08:47:58 PM
rw-r--r--
📄
crypto_util.py
15.29 KB
02/07/2019 09:20:29 PM
rw-r--r--
📁
display
-
05/25/2021 01:14:26 PM
rwxr-xr-x
📄
eff.py
3.07 KB
02/07/2019 09:20:29 PM
rw-r--r--
📄
error_handler.py
5.81 KB
02/07/2019 09:20:29 PM
rw-r--r--
📄
errors.py
2.59 KB
02/07/2019 09:20:29 PM
rw-r--r--
📄
hooks.py
8.44 KB
02/07/2019 09:20:29 PM
rw-r--r--
📄
interfaces.py
22.02 KB
02/07/2019 09:20:29 PM
rw-r--r--
📄
lock.py
3.56 KB
02/07/2019 09:20:29 PM
rw-r--r--
📄
log.py
12.39 KB
02/07/2019 09:20:29 PM
rw-r--r--
📄
main.py
48.47 KB
02/07/2019 09:20:29 PM
rw-r--r--
📄
notify.py
1.04 KB
02/07/2019 09:20:29 PM
rw-r--r--
📄
ocsp.py
4.1 KB
02/07/2019 09:20:29 PM
rw-r--r--
📁
plugins
-
05/25/2021 01:14:26 PM
rwxr-xr-x
📄
renewal.py
20.91 KB
12/18/2020 08:47:58 PM
rw-r--r--
📄
reporter.py
3.46 KB
02/07/2019 09:20:30 PM
rw-r--r--
📄
reverter.py
23.32 KB
02/07/2019 09:20:30 PM
rw-r--r--
📄
ssl-dhparams.pem
424 bytes
02/07/2019 09:20:30 PM
rw-r--r--
📄
storage.py
44.91 KB
02/07/2019 09:20:30 PM
rw-r--r--
📁
tests
-
05/25/2021 01:14:27 PM
rwxr-xr-x
📄
updater.py
3.86 KB
02/07/2019 09:20:30 PM
rw-r--r--
📄
util.py
20.35 KB
02/07/2019 09:20:30 PM
rw-r--r--
Editing: auth_handler.py
Close
"""ACME AuthHandler.""" import collections import logging import time import six import zope.component from acme import challenges from acme import messages # pylint: disable=unused-import, no-name-in-module from acme.magic_typing import DefaultDict, Dict, List, Set, Collection # pylint: enable=unused-import, no-name-in-module from certbot import achallenges from certbot import errors from certbot import error_handler from certbot import interfaces logger = logging.getLogger(__name__) AnnotatedAuthzr = collections.namedtuple("AnnotatedAuthzr", ["authzr", "achalls"]) """Stores an authorization resource and its active annotated challenges.""" class AuthHandler(object): """ACME Authorization Handler for a client. :ivar auth: Authenticator capable of solving :class:`~acme.challenges.Challenge` types :type auth: :class:`certbot.interfaces.IAuthenticator` :ivar acme.client.BackwardsCompatibleClientV2 acme_client: ACME client API. :ivar account: Client's Account :type account: :class:`certbot.account.Account` :ivar list pref_challs: sorted user specified preferred challenges type strings with the most preferred challenge listed first """ def __init__(self, auth, acme_client, account, pref_challs): self.auth = auth self.acme = acme_client self.account = account self.pref_challs = pref_challs def handle_authorizations(self, orderr, best_effort=False): """Retrieve all authorizations for challenges. :param acme.messages.OrderResource orderr: must have authorizations filled in :param bool best_effort: Whether or not all authorizations are required (this is useful in renewal) :returns: List of authorization resources :rtype: list :raises .AuthorizationError: If unable to retrieve all authorizations """ aauthzrs = [AnnotatedAuthzr(authzr, []) for authzr in orderr.authorizations] self._choose_challenges(aauthzrs) config = zope.component.getUtility(interfaces.IConfig) notify = zope.component.getUtility(interfaces.IDisplay).notification # While there are still challenges remaining... while self._has_challenges(aauthzrs): with error_handler.ExitHandler(self._cleanup_challenges, aauthzrs): resp = self._solve_challenges(aauthzrs) logger.info("Waiting for verification...") if config.debug_challenges: notify('Challenges loaded. Press continue to submit to CA. ' 'Pass "-v" for more info about challenges.', pause=True) # Send all Responses - this modifies achalls self._respond(aauthzrs, resp, best_effort) # Just make sure all decisions are complete. self.verify_authzr_complete(aauthzrs) # Only return valid authorizations ret_val = [aauthzr.authzr for aauthzr in aauthzrs if aauthzr.authzr.body.status == messages.STATUS_VALID] if not ret_val: raise errors.AuthorizationError( "Challenges failed for all domains") return ret_val def _choose_challenges(self, aauthzrs): """ Retrieve necessary and pending challenges to satisfy server. NB: Necessary and already validated challenges are not retrieved, as they can be reused for a certificate issuance. """ pending_authzrs = [aauthzr for aauthzr in aauthzrs if aauthzr.authzr.body.status != messages.STATUS_VALID] if pending_authzrs: logger.info("Performing the following challenges:") for aauthzr in pending_authzrs: aauthzr_challenges = aauthzr.authzr.body.challenges if self.acme.acme_version == 1: combinations = aauthzr.authzr.body.combinations else: combinations = tuple((i,) for i in range(len(aauthzr_challenges))) path = gen_challenge_path( aauthzr_challenges, self._get_chall_pref(aauthzr.authzr.body.identifier.value), combinations) aauthzr_achalls = self._challenge_factory( aauthzr.authzr, path) aauthzr.achalls.extend(aauthzr_achalls) for aauthzr in aauthzrs: for achall in aauthzr.achalls: if isinstance(achall.chall, challenges.TLSSNI01): logger.warning("TLS-SNI-01 is deprecated, and will stop working soon.") return def _has_challenges(self, aauthzrs): """Do we have any challenges to perform?""" return any(aauthzr.achalls for aauthzr in aauthzrs) def _solve_challenges(self, aauthzrs): """Get Responses for challenges from authenticators.""" resp = [] # type: Collection[challenges.ChallengeResponse] all_achalls = self._get_all_achalls(aauthzrs) try: if all_achalls: resp = self.auth.perform(all_achalls) except errors.AuthorizationError: logger.critical("Failure in setting up challenges.") logger.info("Attempting to clean up outstanding challenges...") raise assert len(resp) == len(all_achalls) return resp def _get_all_achalls(self, aauthzrs): """Return all active challenges.""" all_achalls = [] # type: Collection[challenges.ChallengeResponse] for aauthzr in aauthzrs: all_achalls.extend(aauthzr.achalls) return all_achalls def _respond(self, aauthzrs, resp, best_effort): """Send/Receive confirmation of all challenges. .. note:: This method also cleans up the auth_handler state. """ # TODO: chall_update is a dirty hack to get around acme-spec #105 chall_update = dict() \ # type: Dict[int, List[achallenges.KeyAuthorizationAnnotatedChallenge]] self._send_responses(aauthzrs, resp, chall_update) # Check for updated status... self._poll_challenges(aauthzrs, chall_update, best_effort) def _send_responses(self, aauthzrs, resps, chall_update): """Send responses and make sure errors are handled. :param aauthzrs: authorizations and the selected annotated challenges to try and perform :type aauthzrs: `list` of `AnnotatedAuthzr` :param resps: challenge responses from the authenticator where each response at index i corresponds to the annotated challenge at index i in the list returned by :func:`_get_all_achalls` :type resps: `collections.abc.Iterable` of :class:`~acme.challenges.ChallengeResponse` or `False` or `None` :param dict chall_update: parameter that is updated to hold aauthzr index to list of outstanding solved annotated challenges """ active_achalls = [] resps_iter = iter(resps) for i, aauthzr in enumerate(aauthzrs): for achall in aauthzr.achalls: # This line needs to be outside of the if block below to # ensure failed challenges are cleaned up correctly active_achalls.append(achall) resp = next(resps_iter) # Don't send challenges for None and False authenticator responses if resp: self.acme.answer_challenge(achall.challb, resp) # TODO: answer_challenge returns challr, with URI, # that can be used in _find_updated_challr # comparisons... chall_update.setdefault(i, []).append(achall) return active_achalls def _poll_challenges(self, aauthzrs, chall_update, best_effort, min_sleep=3, max_rounds=30): """Wait for all challenge results to be determined.""" indices_to_check = set(chall_update.keys()) comp_indices = set() rounds = 0 while indices_to_check and rounds < max_rounds: # TODO: Use retry-after... time.sleep(min_sleep) all_failed_achalls = set() # type: Set[achallenges.KeyAuthorizationAnnotatedChallenge] for index in indices_to_check: comp_achalls, failed_achalls = self._handle_check( aauthzrs, index, chall_update[index]) if len(comp_achalls) == len(chall_update[index]): comp_indices.add(index) elif not failed_achalls: for achall, _ in comp_achalls: chall_update[index].remove(achall) # We failed some challenges... damage control else: if best_effort: comp_indices.add(index) logger.warning( "Challenge failed for domain %s", aauthzrs[index].authzr.body.identifier.value) else: all_failed_achalls.update( updated for _, updated in failed_achalls) if all_failed_achalls: _report_failed_challs(all_failed_achalls) raise errors.FailedChallenges(all_failed_achalls) indices_to_check -= comp_indices comp_indices.clear() rounds += 1 def _handle_check(self, aauthzrs, index, achalls): """Returns tuple of ('completed', 'failed').""" completed = [] failed = [] original_aauthzr = aauthzrs[index] updated_authzr, _ = self.acme.poll(original_aauthzr.authzr) aauthzrs[index] = AnnotatedAuthzr(updated_authzr, original_aauthzr.achalls) if updated_authzr.body.status == messages.STATUS_VALID: return achalls, [] # Note: if the whole authorization is invalid, the individual failed # challenges will be determined here... for achall in achalls: updated_achall = achall.update(challb=self._find_updated_challb( updated_authzr, achall)) # This does nothing for challenges that have yet to be decided yet. if updated_achall.status == messages.STATUS_VALID: completed.append((achall, updated_achall)) elif updated_achall.status == messages.STATUS_INVALID: failed.append((achall, updated_achall)) return completed, failed def _find_updated_challb(self, authzr, achall): # pylint: disable=no-self-use """Find updated challenge body within Authorization Resource. .. warning:: This assumes only one instance of type of challenge in each challenge resource. :param .AuthorizationResource authzr: Authorization Resource :param .AnnotatedChallenge achall: Annotated challenge for which to get status """ for authzr_challb in authzr.body.challenges: if type(authzr_challb.chall) is type(achall.challb.chall): # noqa return authzr_challb raise errors.AuthorizationError( "Target challenge not found in authorization resource") def _get_chall_pref(self, domain): """Return list of challenge preferences. :param str domain: domain for which you are requesting preferences """ chall_prefs = [] # Make sure to make a copy... plugin_pref = self.auth.get_chall_pref(domain) if self.pref_challs: plugin_pref_types = set(chall.typ for chall in plugin_pref) for typ in self.pref_challs: if typ in plugin_pref_types: chall_prefs.append(challenges.Challenge.TYPES[typ]) if chall_prefs: return chall_prefs raise errors.AuthorizationError( "None of the preferred challenges " "are supported by the selected plugin") chall_prefs.extend(plugin_pref) return chall_prefs def _cleanup_challenges(self, aauthzrs, achalls=None): """Cleanup challenges. :param aauthzrs: authorizations and their selected annotated challenges :type aauthzrs: `list` of `AnnotatedAuthzr` :param achalls: annotated challenges to cleanup :type achalls: `list` of :class:`certbot.achallenges.AnnotatedChallenge` """ logger.info("Cleaning up challenges") if achalls is None: achalls = self._get_all_achalls(aauthzrs) if achalls: self.auth.cleanup(achalls) for achall in achalls: for aauthzr in aauthzrs: if achall in aauthzr.achalls: aauthzr.achalls.remove(achall) break def verify_authzr_complete(self, aauthzrs): """Verifies that all authorizations have been decided. :param aauthzrs: authorizations and their selected annotated challenges :type aauthzrs: `list` of `AnnotatedAuthzr` :returns: Whether all authzr are complete :rtype: bool """ for aauthzr in aauthzrs: authzr = aauthzr.authzr if (authzr.body.status != messages.STATUS_VALID and authzr.body.status != messages.STATUS_INVALID): raise errors.AuthorizationError("Incomplete authorizations") def _challenge_factory(self, authzr, path): """Construct Namedtuple Challenges :param messages.AuthorizationResource authzr: authorization :param list path: List of indices from `challenges`. :returns: achalls, list of challenge type :class:`certbot.achallenges.Indexed` :rtype: list :raises .errors.Error: if challenge type is not recognized """ achalls = [] for index in path: challb = authzr.body.challenges[index] achalls.append(challb_to_achall( challb, self.account.key, authzr.body.identifier.value)) return achalls def challb_to_achall(challb, account_key, domain): """Converts a ChallengeBody object to an AnnotatedChallenge. :param .ChallengeBody challb: ChallengeBody :param .JWK account_key: Authorized Account Key :param str domain: Domain of the challb :returns: Appropriate AnnotatedChallenge :rtype: :class:`certbot.achallenges.AnnotatedChallenge` """ chall = challb.chall logger.info("%s challenge for %s", chall.typ, domain) if isinstance(chall, challenges.KeyAuthorizationChallenge): return achallenges.KeyAuthorizationAnnotatedChallenge( challb=challb, domain=domain, account_key=account_key) elif isinstance(chall, challenges.DNS): return achallenges.DNS(challb=challb, domain=domain) else: raise errors.Error( "Received unsupported challenge of type: %s", chall.typ) def gen_challenge_path(challbs, preferences, combinations): """Generate a plan to get authority over the identity. .. todo:: This can be possibly be rewritten to use resolved_combinations. :param tuple challbs: A tuple of challenges (:class:`acme.messages.Challenge`) from :class:`acme.messages.AuthorizationResource` to be fulfilled by the client in order to prove possession of the identifier. :param list preferences: List of challenge preferences for domain (:class:`acme.challenges.Challenge` subclasses) :param tuple combinations: A collection of sets of challenges from :class:`acme.messages.Challenge`, each of which would be sufficient to prove possession of the identifier. :returns: tuple of indices from ``challenges``. :rtype: tuple :raises certbot.errors.AuthorizationError: If a path cannot be created that satisfies the CA given the preferences and combinations. """ if combinations: return _find_smart_path(challbs, preferences, combinations) else: return _find_dumb_path(challbs, preferences) def _find_smart_path(challbs, preferences, combinations): """Find challenge path with server hints. Can be called if combinations is included. Function uses a simple ranking system to choose the combo with the lowest cost. """ chall_cost = {} max_cost = 1 for i, chall_cls in enumerate(preferences): chall_cost[chall_cls] = i max_cost += i # max_cost is now equal to sum(indices) + 1 best_combo = None # Set above completing all of the available challenges best_combo_cost = max_cost combo_total = 0 for combo in combinations: for challenge_index in combo: combo_total += chall_cost.get(challbs[ challenge_index].chall.__class__, max_cost) if combo_total < best_combo_cost: best_combo = combo best_combo_cost = combo_total combo_total = 0 if not best_combo: _report_no_chall_path(challbs) return best_combo def _find_dumb_path(challbs, preferences): """Find challenge path without server hints. Should be called if the combinations hint is not included by the server. This function either returns a path containing all challenges provided by the CA or raises an exception. """ path = [] for i, challb in enumerate(challbs): # supported is set to True if the challenge type is supported supported = next((True for pref_c in preferences if isinstance(challb.chall, pref_c)), False) if supported: path.append(i) else: _report_no_chall_path(challbs) return path def _report_no_chall_path(challbs): """Logs and raises an error that no satisfiable chall path exists. :param challbs: challenges from the authorization that can't be satisfied """ msg = ("Client with the currently selected authenticator does not support " "any combination of challenges that will satisfy the CA.") if len(challbs) == 1 and isinstance(challbs[0].chall, challenges.DNS01): msg += ( " You may need to use an authenticator " "plugin that can do challenges over DNS.") logger.critical(msg) raise errors.AuthorizationError(msg) _ERROR_HELP_COMMON = ( "To fix these errors, please make sure that your domain name was entered " "correctly and the DNS A/AAAA record(s) for that domain contain(s) the " "right IP address.") _ERROR_HELP = { "connection": _ERROR_HELP_COMMON + " Additionally, please check that your computer " "has a publicly routable IP address and that no firewalls are preventing " "the server from communicating with the client. If you're using the " "webroot plugin, you should also verify that you are serving files " "from the webroot path you provided.", "dnssec": _ERROR_HELP_COMMON + " Additionally, if you have DNSSEC enabled for " "your domain, please ensure that the signature is valid.", "malformed": "To fix these errors, please make sure that you did not provide any " "invalid information to the client, and try running Certbot " "again.", "serverInternal": "Unfortunately, an error on the ACME server prevented you from completing " "authorization. Please try again later.", "tls": _ERROR_HELP_COMMON + " Additionally, please check that you have an " "up-to-date TLS configuration that allows the server to communicate " "with the Certbot client.", "unauthorized": _ERROR_HELP_COMMON, "unknownHost": _ERROR_HELP_COMMON, } def _report_failed_challs(failed_achalls): """Notifies the user about failed challenges. :param set failed_achalls: A set of failed :class:`certbot.achallenges.AnnotatedChallenge`. """ problems = collections.defaultdict(list)\ # type: DefaultDict[str, List[achallenges.KeyAuthorizationAnnotatedChallenge]] for achall in failed_achalls: if achall.error: problems[achall.error.typ].append(achall) reporter = zope.component.getUtility(interfaces.IReporter) for achalls in six.itervalues(problems): reporter.add_message( _generate_failed_chall_msg(achalls), reporter.MEDIUM_PRIORITY) def _generate_failed_chall_msg(failed_achalls): """Creates a user friendly error message about failed challenges. :param list failed_achalls: A list of failed :class:`certbot.achallenges.AnnotatedChallenge` with the same error type. :returns: A formatted error message for the client. :rtype: str """ error = failed_achalls[0].error typ = error.typ if messages.is_acme_error(error): typ = error.code msg = ["The following errors were reported by the server:"] for achall in failed_achalls: msg.append("\n\nDomain: %s\nType: %s\nDetail: %s" % ( achall.domain, typ, achall.error.detail)) if typ in _ERROR_HELP: msg.append("\n\n") msg.append(_ERROR_HELP[typ]) return "".join(msg)