forked from Hay1tsme/artemis
allow alembic to also connect with tls
This commit is contained in:
@ -1,6 +1,7 @@
|
||||
import logging
|
||||
import os
|
||||
from typing import Any
|
||||
import ssl
|
||||
from typing import Any, Union
|
||||
|
||||
from typing_extensions import Optional
|
||||
|
||||
@ -222,7 +223,7 @@ class DatabaseConfig:
|
||||
)
|
||||
|
||||
@property
|
||||
def ssl_verify_cert(self) -> Optional[bool]:
|
||||
def ssl_verify_cert(self) -> Optional[Union[str, bool]]:
|
||||
return CoreConfig.get_config_field(
|
||||
self.__config, "core", "database", "ssl_verify_cert", default=None
|
||||
)
|
||||
@ -259,6 +260,53 @@ class DatabaseConfig:
|
||||
self.__config, "core", "database", "memcached_host", default="localhost"
|
||||
)
|
||||
|
||||
def create_ssl_context_if_enabled(self):
|
||||
if not self.ssl_enabled:
|
||||
return
|
||||
|
||||
no_ca = (
|
||||
self.ssl_cafile is None
|
||||
and self.ssl_capath is None
|
||||
)
|
||||
|
||||
ctx = ssl.create_default_context(
|
||||
cafile=self.ssl_cafile,
|
||||
capath=self.ssl_capath,
|
||||
)
|
||||
ctx.check_hostname = not no_ca and self.ssl_verify_identity
|
||||
|
||||
if self.ssl_verify_cert is None:
|
||||
ctx.verify_mode = ssl.CERT_NONE if no_ca else ssl.CERT_REQUIRED
|
||||
elif isinstance(self.ssl_verify_cert, bool):
|
||||
ctx.verify_mode = (
|
||||
ssl.CERT_REQUIRED
|
||||
if self.ssl_verify_cert
|
||||
else ssl.CERT_NONE
|
||||
)
|
||||
elif isinstance(self.ssl_verify_cert, str):
|
||||
value = self.ssl_verify_cert.lower()
|
||||
|
||||
if value in ("none", "0", "false", "no"):
|
||||
ctx.verify_mode = ssl.CERT_NONE
|
||||
elif value == "optional":
|
||||
ctx.verify_mode = ssl.CERT_OPTIONAL
|
||||
elif value in ("required", "1", "true", "yes"):
|
||||
ctx.verify_mode = ssl.CERT_REQUIRED
|
||||
else:
|
||||
ctx.verify_mode = ssl.CERT_NONE if no_ca else ssl.CERT_REQUIRED
|
||||
|
||||
if self.ssl_cert:
|
||||
ctx.load_cert_chain(
|
||||
self.ssl_cert,
|
||||
self.ssl_key,
|
||||
self.ssl_key_password,
|
||||
)
|
||||
|
||||
if self.ssl_ciphers:
|
||||
ctx.set_ciphers(self.ssl_ciphers)
|
||||
|
||||
return ctx
|
||||
|
||||
class FrontendConfig:
|
||||
def __init__(self, parent_config: "CoreConfig") -> None:
|
||||
self.__config = parent_config
|
||||
|
Reference in New Issue
Block a user