Spamworldpro Mini Shell
Spamworldpro


Server : Apache
System : Linux server2.corals.io 4.18.0-348.2.1.el8_5.x86_64 #1 SMP Mon Nov 15 09:17:08 EST 2021 x86_64
User : corals ( 1002)
PHP Version : 7.4.33
Disable Function : exec,passthru,shell_exec,system
Directory :  /lib/python3.6/site-packages/supervisor/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //lib/python3.6/site-packages/supervisor/datatypes.py
import grp
import os
import pwd
import signal
import socket
import shlex

from supervisor.compat import urlparse
from supervisor.compat import long
from supervisor.loggers import getLevelNumByDescription

def process_or_group_name(name):
    """Return ``name`` as a stripped string, refusing unsafe characters.

    A space, colon or slash in the name breaks the eventlistener protocol
    or web UI URLs, so any of them raises ValueError.  The characters are
    checked in that order and the first one found is reported.
    """
    cleaned = str(name).strip()
    offending = next((ch for ch in ' :/' if ch in cleaned), None)
    if offending is not None:
        raise ValueError("Invalid name: %r because of character: %r" % (name, offending))
    return cleaned

def integer(value):
    """Convert ``value`` with ``int``; on ValueError or OverflowError retry
    with ``long`` and let any error from that second attempt propagate."""
    try:
        converted = int(value)
    except (ValueError, OverflowError):
        # why does this help ValueError? (CM)
        converted = long(value)
    return converted

# Lower-case spellings accepted as true / false.  boolean() and
# auto_restart() lower-case their input before testing membership here.
TRUTHY_STRINGS = ('yes', 'true', 'on', '1')
FALSY_STRINGS  = ('no', 'false', 'off', '0')

def boolean(s):
    """Interpret ``s`` (case-insensitively) as a boolean.

    Returns True for a TRUTHY_STRINGS entry, False for a FALSY_STRINGS
    entry, and raises ValueError for anything else.
    """
    lowered = str(s).lower()
    if lowered in TRUTHY_STRINGS:
        return True
    if lowered in FALSY_STRINGS:
        return False
    raise ValueError("not a valid boolean value: " + repr(s))

def list_of_strings(arg):
    """Split a comma-separated string into a list of stripped items.

    A falsy ``arg`` yields an empty list.  Any failure while splitting
    (for example ``arg`` is not a string) is reported as ValueError.
    """
    if not arg:
        return []
    try:
        pieces = arg.split(',')
        return [piece.strip() for piece in pieces]
    except:
        raise ValueError("not a valid list of strings: " + repr(arg))

def list_of_ints(arg):
    """Parse a comma-separated string into a list of ints.

    A falsy ``arg`` yields an empty list; any item that ``int`` rejects
    (or a non-string ``arg``) is reported as ValueError.
    """
    if not arg:
        return []
    try:
        return [int(item) for item in arg.split(",")]
    except:
        raise ValueError("not a valid list of ints: " + repr(arg))

def list_of_exitcodes(arg):
    """Parse a comma-separated string into a list of exit codes (0..255).

    Every failure, whether the list cannot be parsed or a code is out of
    range, surfaces as the same ValueError naming the whole argument.
    """
    try:
        codes = list_of_ints(arg)
        out_of_range = [code for code in codes if not 0 <= code <= 255]
        if out_of_range:
            raise ValueError('Invalid exit code "%s"' % out_of_range[0])
        return codes
    except:
        raise ValueError("not a valid list of exit codes: " + repr(arg))

def dict_of_key_value_pairs(arg):
    """ parse KEY=val,KEY2=val2 into {'KEY':'val', 'KEY2':'val2'}
        Quotes can be used to allow commas in the value.
        Raises ValueError when a KEY, '=' and value triple is incomplete.
    """
    lexer = shlex.shlex(str(arg))
    lexer.wordchars += '/.+-():'
    tokens = list(lexer)

    pairs = {}
    # Each entry spans four tokens: key, '=', value, then one more token
    # (normally the comma) that is stepped over without being inspected.
    for start in range(0, len(tokens), 4):
        key_eq_val = tokens[start:start + 3]
        if len(key_eq_val) != 3 or key_eq_val[1] != '=':
            raise ValueError(
                "Unexpected end of key/value pairs in value '%s'" % arg)
        key, _, raw_value = key_eq_val
        # the lexer keeps surrounding quotes on the token; drop them here
        pairs[key] = raw_value.strip('\'"')
    return pairs

class Automatic:
    """Marker class: logfile_name() returns it for the value 'auto'
    (or for this class itself)."""
    pass

class Syslog:
    """TODO deprecated; remove this special 'syslog' filename in the future"""
    # Marker class: logfile_name() returns it for the value 'syslog'
    # (or for this class itself).
    pass

# Lookup tables for logfile_name().  String inputs are lower-cased before
# the membership test; a match selects, respectively, no log file (None),
# the Automatic marker, or the Syslog marker.
LOGFILE_NONES = ('none', 'off', None)
LOGFILE_AUTOS = (Automatic, 'auto')
LOGFILE_SYSLOGS = (Syslog, 'syslog')

def logfile_name(val):
    """Map a logfile setting to None, Automatic, Syslog or a file path.

    Values with a ``lower`` method are compared case-insensitively against
    the LOGFILE_* tables.  Anything unmatched is treated as a path and
    validated by existing_dirpath(), which receives the original value.
    """
    coerced = val.lower() if hasattr(val, 'lower') else val

    if coerced in LOGFILE_NONES:
        return None
    if coerced in LOGFILE_AUTOS:
        return Automatic
    if coerced in LOGFILE_SYSLOGS:
        return Syslog
    return existing_dirpath(val)

class RangeCheckedConversion:
    """Wrap a conversion callable and bounds-check the value it returns.

    ``min`` and ``max`` are inclusive limits; either may be None to leave
    that side unchecked.
    """

    def __init__(self, conversion, min=None, max=None):
        self._conversion = conversion
        self._min = min
        self._max = max

    def __call__(self, value):
        """Convert ``value`` and return the result, raising ValueError if
        it falls outside the configured bounds."""
        converted = self._conversion(value)
        lower, upper = self._min, self._max
        if lower is not None and converted < lower:
            raise ValueError("%s is below lower bound (%s)"
                             % (repr(converted), repr(lower)))
        if upper is not None and converted > upper:
            raise ValueError("%s is above upper bound (%s)"
                             % (repr(converted), repr(upper)))
        return converted

# Converter for port numbers: the value is passed through integer() and
# must fall within 1..0xffff (inclusive), otherwise ValueError is raised.
port_number = RangeCheckedConversion(integer, min=1, max=0xffff).__call__

def inet_address(s):
    """Parse ``host:port`` or a bare ``port`` into a ``(host, port)`` tuple.

    The host is lower-cased; an empty host or ``*`` is returned as ``''``.
    The port is validated with port_number().  Raises ValueError when the
    port part is missing or is not a valid port number.
    """
    host = ''
    if ":" in s:
        # Keep ``s`` intact so the error below can show the original input
        # rather than the (empty) port fragment.
        host, port_text = s.split(":", 1)
        if not port_text:
            raise ValueError("no port number specified in %r" % s)
        port = port_number(port_text)
        host = host.lower()
    else:
        try:
            port = port_number(s)
        except ValueError:
            raise ValueError("not a valid port number: %r " %s)
    if not host or host == '*':
        host = ''
    return host, port

class SocketAddress:
    """Classify an address string and expose ``family`` and ``address``.

    A string containing a path separator, or containing no colon, is
    treated as a Unix socket path (family is ``socket.AF_UNIX``, or None
    where the platform lacks it).  Anything else is parsed by
    inet_address() into a ``(host, port)`` tuple with family AF_INET.
    """

    def __init__(self, s):
        looks_like_path = "/" in s or os.sep in s or ":" not in s
        if looks_like_path:
            self.family = getattr(socket, "AF_UNIX", None)
            self.address = s
        else:
            self.family = socket.AF_INET
            self.address = inet_address(s)

class SocketConfig:
    """ Abstract base class which provides a uniform abstraction
    for TCP vs Unix sockets.  Two configs compare equal when both are
    SocketConfig instances with the same ``url``. """
    url = '' # socket url
    backlog = None # socket listen backlog

    def __repr__(self):
        details = (self.__class__, id(self), self.url)
        return '<%s at %s for %s>' % details

    def __str__(self):
        return str(self.url)

    def __eq__(self, other):
        if not isinstance(other, SocketConfig):
            return False
        return not (self.url != other.url)

    def __ne__(self, other):
        return not self.__eq__(other)

    def get_backlog(self):
        return self.backlog

    # subclasses return the address to bind to
    def addr(self): # pragma: no cover
        raise NotImplementedError

    # subclasses return a new socket bound to addr()
    def create_and_bind(self): # pragma: no cover
        raise NotImplementedError

class InetStreamSocketConfig(SocketConfig):
    """ TCP socket config helper: holds a host/port pair and can create
    a stream socket bound to it """

    host = None # host name or ip to bind to
    port = None # integer port to bind to

    def __init__(self, host, port, **kwargs):
        """Store the lower-cased host and validated port; the optional
        ``backlog`` keyword is kept for get_backlog()."""
        self.host = host.lower()
        self.port = port_number(port)
        self.url = 'tcp://%s:%d' % (self.host, self.port)
        self.backlog = kwargs.get('backlog')

    def addr(self):
        """Return the ``(host, port)`` tuple to bind to."""
        return (self.host, self.port)

    def create_and_bind(self):
        """Return a new AF_INET stream socket with SO_REUSEADDR set, bound
        to addr().  The socket is closed before any error is re-raised."""
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            listener.bind(self.addr())
        except:
            listener.close()
            raise
        return listener

class UnixStreamSocketConfig(SocketConfig):
    """ Unix domain socket config helper: holds the socket path plus
    optional mode/owner settings and can create a socket bound to it """

    path = None # Unix domain socket path
    mode = None # Unix permission mode bits for socket
    owner = None # Tuple (uid, gid) for Unix ownership of socket
    sock = None # socket object

    def __init__(self, path, **kwargs):
        """Store ``path`` and the optional ``mode``, ``owner`` and
        ``backlog`` keyword values (each defaults to None)."""
        self.path = path
        self.url = 'unix://%s' % path
        self.mode = kwargs.get('mode', None)
        self.owner = kwargs.get('owner', None)
        self.backlog = kwargs.get('backlog', None)

    def addr(self):
        """Return the filesystem path the socket binds to."""
        return self.path

    def create_and_bind(self):
        """Return a new AF_UNIX stream socket bound to ``path``.

        Anything already present at the path is unlinked first.  After
        binding, ownership and then permissions are applied.  If any step
        fails, the socket is closed, the socket file is removed if it
        exists, and the error is re-raised.
        """
        if os.path.exists(self.path):
            os.unlink(self.path)
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.bind(self.addr())
            self._chown()
            self._chmod()
        except:
            sock.close()
            # don't leave a half-configured socket file behind
            if os.path.exists(self.path):
                os.unlink(self.path)
            raise
        return sock

    def get_mode(self):
        """Return the configured permission bits (may be None)."""
        return self.mode

    def get_owner(self):
        """Return the configured owner value (may be None)."""
        return self.owner

    def _chmod(self):
        """Apply ``mode`` to the socket file when one is configured; any
        failure is re-raised as ValueError."""
        if self.mode is not None:
            try:
                os.chmod(self.path, self.mode)
            except Exception as e:
                raise ValueError("Could not change permissions of socket "
                                    + "file: %s" % e)

    def _chown(self):
        """Apply ``owner`` (indexed as uid, gid) to the socket file when
        one is configured; any failure is re-raised as ValueError."""
        if self.owner is not None:
            try:
                os.chown(self.path, self.owner[0], self.owner[1])
            except Exception as e:
                raise ValueError("Could not change ownership of socket file: "
                                    + "%s" % e)

def colon_separated_user_group(arg):
    """ Resolve a string like 'user:group' to a (uid, gid) tuple.  With no
        colon present only the user is resolved and the gid is -1.  Any
        failure to split or resolve the parts raises ValueError. """
    try:
        user, colon, group = arg.partition(':')
        uid = name_to_uid(user)
        gid = name_to_gid(group) if colon else -1
        return (uid, gid)
    except:
        raise ValueError('Invalid user:group definition %s' % arg)

def name_to_uid(name):
    """ Resolve a user name or numeric id to a user ID.  A value that
        ``int`` accepts is checked against the passwd database; anything
        else is looked up by name.  Raises ValueError when the lookup
        finds nothing. """
    try:
        uid = int(name)
    except ValueError:
        # not numeric: treat as a user name
        try:
            return pwd.getpwnam(name)[2]
        except KeyError:
            raise ValueError("Invalid user name %s" % name)

    try:
        pwd.getpwuid(uid) # check if uid is valid
    except KeyError:
        raise ValueError("Invalid user id %s" % name)
    return uid

def name_to_gid(name):
    """ Resolve a group name or numeric id to a group ID.  A value that
        ``int`` accepts is checked against the group database; anything
        else is looked up by name.  Raises ValueError when the lookup
        finds nothing. """
    try:
        gid = int(name)
    except ValueError:
        # not numeric: treat as a group name
        try:
            return grp.getgrnam(name)[2]
        except KeyError:
            raise ValueError("Invalid group name %s" % name)

    try:
        grp.getgrgid(gid) # check if gid is valid
    except KeyError:
        raise ValueError("Invalid group id %s" % name)
    return gid

def gid_for_uid(uid):
    """Return the group id stored in the passwd entry for ``uid``
    (field 3 of the record returned by ``pwd.getpwuid``)."""
    return pwd.getpwuid(uid)[3]

def octal_type(arg):
    """Parse the string ``arg`` as a base-8 integer; a value that cannot
    be parsed (including a non-string) raises ValueError."""
    try:
        parsed = int(arg, 8)
    except (TypeError, ValueError):
        raise ValueError('%s can not be converted to an octal type' % arg)
    return parsed

def existing_directory(v):
    """Return ``v`` with ``~`` expanded if it names an existing directory,
    otherwise raise ValueError."""
    expanded = os.path.expanduser(v)
    if not os.path.isdir(expanded):
        raise ValueError('%s is not an existing directory' % v)
    return expanded

def existing_dirpath(v):
    """Return ``v`` with ``~`` expanded, provided its directory part exists.

    A path with no directory component is accepted as-is; otherwise the
    directory must exist or ValueError is raised.  The file itself need
    not exist.
    """
    expanded = os.path.expanduser(v)
    parent = os.path.dirname(expanded)
    # an empty parent means a relative pathname with no directory component
    if not parent or os.path.isdir(parent):
        return expanded
    raise ValueError('The directory named as part of the path %s '
                     'does not exist' % v)

def logging_level(value):
    """Look up the lower-cased text of ``value`` with
    getLevelNumByDescription() and return the result; raise ValueError
    when that lookup returns None."""
    level = getLevelNumByDescription(str(value).lower())
    if level is None:
        raise ValueError('bad logging level name %r' % value)
    return level

class SuffixMultiplier:
    # d maps suffixes to integer multipliers.  When no suffix matches, the
    # value is multiplied by default instead.  Matching is case
    # insensitive.  Results are expressed in the fundamental unit.
    def __init__(self, d, default=1):
        self._d = d
        self._default = default
        # all keys must be the same size
        self._keysz = None
        for key in d.keys():
            size = len(key)
            if self._keysz is None:
                self._keysz = size
            else:
                assert self._keysz == size

    def __call__(self, v):
        lowered = v.lower()
        size = self._keysz
        for suffix, multiplier in self._d.items():
            if lowered[-size:] == suffix:
                # the text before the suffix is the count
                return int(lowered[:-size]) * multiplier
        return int(lowered) * self._default

# Converter for sizes written as '<n>kb', '<n>mb' or '<n>gb' (any case),
# using 1024-based multipliers.  A value without one of these suffixes is
# converted with int() and multiplied by the default of 1.
byte_size = SuffixMultiplier({'kb': 1024,
                              'mb': 1024*1024,
                              'gb': 1024*1024*long(1024),})

def url(value):
    """Return ``value`` unchanged if it parses as a URL with a scheme and
    either a network location or a path; otherwise raise ValueError."""
    # earlier Python 2.6 urlparse (2.6.4 and under) can't parse unix:// URLs,
    # later ones can but we need to straddle, so parse unix:// as http://
    normalized = value.replace('unix://', 'http://', 1).strip()
    parsed = urlparse.urlparse(normalized)
    scheme, netloc, path = parsed[0], parsed[1], parsed[2]
    if not (scheme and (netloc or path)):
        raise ValueError("value %r is not a URL" % value)
    return value

# all valid signal numbers.  Only SIG<NAME> constants are collected: the
# SIG_* names in the signal module (SIG_DFL, SIG_IGN, SIG_BLOCK, ...) are
# handler and mask constants rather than signals, and including them would
# make values such as 0 pass as valid signal numbers.
SIGNUMS = [ getattr(signal, k) for k in dir(signal)
            if k.startswith('SIG') and not k.startswith('SIG_') ]

def signal_number(value):
    """Resolve a signal number or name to a signal number.

    ``value`` may be anything ``int`` accepts, or a signal name with or
    without the ``SIG`` prefix in any case.  Raises ValueError when the
    name is unknown or the number is not listed in SIGNUMS.
    """
    try:
        number = int(value)
    except (ValueError, TypeError):
        wanted = value.strip().upper()
        attr = wanted if wanted.startswith('SIG') else 'SIG' + wanted
        number = getattr(signal, attr, None)
        if number is None:
            raise ValueError('value %r is not a valid signal name' % value)
    if number in SIGNUMS:
        return number
    raise ValueError('value %r is not a valid signal number' % value)

class RestartWhenExitUnexpected:
    """Marker class: auto_restart() returns it for the value 'unexpected'."""
    pass

class RestartUnconditionally:
    """Marker class: auto_restart() returns it for any TRUTHY_STRINGS value."""
    pass

def auto_restart(value):
    """Translate an 'autorestart' setting (case-insensitive).

    Returns RestartUnconditionally for a truthy string, False for a falsy
    string and RestartWhenExitUnexpected for 'unexpected'.  Any other
    value raises ValueError.
    """
    value = str(value.lower())
    if value in TRUTHY_STRINGS:
        return RestartUnconditionally
    if value in FALSY_STRINGS:
        return False
    if value == 'unexpected':
        return RestartWhenExitUnexpected
    raise ValueError("invalid 'autorestart' value %r" % value)

def profile_options(value):
    """Split a comma-separated option string into ``(sort_options, callers)``.

    Options are lower-cased.  ``callers`` is True when the 'callers' option
    is present; every other option is kept, in order, in ``sort_options``.
    """
    options = [x.lower() for x in list_of_strings(value)]
    callers = 'callers' in options
    sort_options = [opt for opt in options if opt != 'callers']
    return sort_options, callers

Spamworldpro Mini