Spamworldpro Mini Shell
Spamworldpro


Server : Apache
System : Linux server2.corals.io 4.18.0-348.2.1.el8_5.x86_64 #1 SMP Mon Nov 15 09:17:08 EST 2021 x86_64
User : corals ( 1002)
PHP Version : 7.4.33
Disable Function : exec,passthru,shell_exec,system
Directory :  /proc/self/root/usr/lib/python3.6/site-packages/up2date_client/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //proc/self/root/usr/lib/python3.6/site-packages/up2date_client/config.py
# This file is a portion of the Red Hat Update Agent
# Copyright (c) 1999--2016 Red Hat, Inc.  Distributed under GPL
#
# Authors:
#       Cristian Gafton <[email protected]>
#       Adrian Likins   <[email protected]>
#
"""
This module includes the Config and Up2date Config classes use by the
up2date agent to hold config info.
"""

import os
import sys
import locale
from rhn.connections import idn_ascii_to_puny, idn_puny_to_unicode
from rhn.i18n import ustr, sstr

try: # python2
    from urlparse import urlsplit, urlunsplit
except ImportError: # python3
    from urllib.parse import urlsplit, urlunsplit

import gettext
t = gettext.translation('rhn-client-tools', fallback=True)
# Python 3 translations don't have a ugettext method
if not hasattr(t, 'ugettext'):
    t.ugettext = t.gettext
_ = t.ugettext

# XXX: This could be moved in a more "static" location if it is too
# much of an eye sore
Defaults = {
    'enableProxy'       : ("Use a HTTP Proxy",
                           0),
    'serverURL'         : ("Remote server URL",
                           "https://your.server.url.here/XMLRPC"),
    'debug'             : ("Whether or not debugging is enabled",
                           0),
    'systemIdPath'      : ("Location of system id",
                           "/etc/sysconfig/rhn/systemid"),
    'versionOverride'   : ("Override the automatically determined "\
                           "system version",
                           ""),
    'httpProxy'         : ("HTTP proxy in host:port format, e.g. "\
                           "squid.example.com:3128",
                           ""),
    'proxyUser'         : ("The username for an authenticated proxy",
                           ""),
    'proxyPassword'     : ("The password to use for an authenticated proxy",
                           ""),
    'enableProxyAuth'   : ("To use an authenticated proxy or not",
                           0),
    'networkRetries'    : ("Number of attempts to make at network "\
                           "connections before giving up",
                           1),
    'sslCACert'         : ("The CA cert used to verify the ssl server",
                           "/usr/share/rhn/RHN-ORG-TRUSTED-SSL-CERT"),
    'noReboot'          : ("Disable the reboot action",
                           0),
    'disallowConfChanges': ("Config options that can not be overwritten by a config update action",
                            ['sslCACert','serverURL','disallowConfChanges',
                             'noReboot']),
}

FileOptions = ['systemIdPath', 'sslCACert', 'tmpDir', ]

# a peristent configuration storage class
class ConfigFile:
    "class for handling persistent config options for the client"
    def __init__(self, filename = None):
        self.dict = {}
        self.fileName = filename
        if self.fileName:
            self.load()

    def load(self, filename = None):
        if filename:
            self.fileName = filename
        if self.fileName == None:
            return
        if not os.access(self.fileName, os.R_OK):
#            print("warning: can't access %s" % self.fileName)
            return

        f = open(self.fileName, "r")

        multiline = ''
        for line in f.readlines():
            # strip comments
            if line.find('#') == 0:
                continue
            line = multiline + line.strip()
            if not line:
                continue

            # if line ends in '\', append the next line before parsing
            if line[-1] == '\\':
                multiline = line[:-1].strip()
                continue
            else:
                multiline = ''

            split = line.split('=', 1)
            if len(split) != 2:
                # not in 'a = b' format. we should log this
                # or maybe error.
                continue
            key = split[0].strip()
            value = ustr(split[1].strip())

            # decode a comment line
            comment = None
            pos = key.find("[comment]")
            if pos != -1:
                key = key[:pos]
                comment = value
                value = None

            # figure out if we need to parse the value further
            if value:
                # possibly split value into a list
                values = value.split(";")
                if key in ['proxyUser', 'proxyPassword']:
                    value = sstr(value.encode(locale.getpreferredencoding()))
                elif len(values) == 1:
                    try:
                        value = int(value)
                    except ValueError:
                        pass
                elif values[0] == "":
                    value = []
                else:
                    # there could be whitespace between the values on
                    # one line, let's strip it out
                    value = [val.strip() for val in values if val.strip() ]

            # now insert the (comment, value) in the dictionary
            newval = (comment, value)
            if key in self.dict: # do we need to update
                newval = self.dict[key]
                if comment is not None: # override comment
                    newval = (comment, newval[1])
                if value is not None: # override value
                    newval = (newval[0], value)
            self.dict[key] = newval
        f.close()

    def save(self):
        if self.fileName == None:
            return

        # this really shouldn't happen, since it means that the
        # /etc/sysconfig/rhn directory doesn't exist, which is way broken

        # and note the attempted fix breaks useage of this by the applet
        # since it reuses this code to create its config file, and therefore
        # tries to makedirs() the users home dir again (with a specific perms)
        # and fails (see #130391)
        if not os.access(self.fileName, os.R_OK):
            if not os.access(os.path.dirname(self.fileName), os.R_OK):
                print(_("%s was not found" % os.path.dirname(self.fileName)))
                return

        f = open(self.fileName+'.new', "w")
        os.chmod(self.fileName+'.new', int('0644', 8))

        f.write("# Automatically generated Red Hat Update Agent "\
                "config file, do not edit.\n")
        f.write("# Format: 1.0\n")
        f.write("")
        for key in self.dict.keys():
            (comment, value) = self.dict[key]
            f.write(sstr(u"%s[comment]=%s\n" % (key, comment)))
            if type(value) != type([]):
                value = [ value ]
            if key in FileOptions:
                value = map(os.path.abspath, value)
            f.write(sstr(u"%s=%s\n" % (key, ';'.join(map(str, value)))))
            f.write("\n")
        f.close()
        os.rename(self.fileName+'.new', self.fileName)

    # dictionary interface
    def __contains__(self, name):
        return name in self.dict

    def has_key(self, name):
        # obsoleted, left for compatibility with older python
        return name in self

    def keys(self):
        return self.dict.keys()

    def values(self):
        return [a[1] for a in self.dict.values()]

    def update(self, dict):
        self.dict.update(dict)

    # we return None when we reference an invalid key instead of
    # raising an exception
    def __getitem__(self, name):
        if name in self.dict:
            return self.dict[name][1]
        return None

    def __setitem__(self, name, value):
        if name in self.dict:
            val = self.dict[name]
        else:
            val = (None, None)
        self.dict[name] = (val[0], value)

    # we might need to expose the comments...
    def info(self, name):
        if name in self.dict:
            return self.dict[name][0]
        return ""


# a superclass for the ConfigFile that also handles runtime-only
# config values
class Config:
    def __init__(self, filename = None):
        self.stored = ConfigFile()
        self.stored.update(Defaults)
        if filename:
            self.stored.load(filename)
        self.runtime = {}

    # classic dictionary interface: we prefer values from the runtime
    # dictionary over the ones from the stored config
    def __contains__(self, name):
        if name in self.runtime:
            return True
        if name in self.stored:
            return True
        return False

    def has_key(self, name):
        # obsoleted, left for compatibility with older python
        return name in self

    def keys(self):
        ret = list(self.runtime.keys())
        for k in self.stored.keys():
            if k not in ret:
                ret.append(k)
        return ret

    def values(self):
        ret = []
        for k in self.keys():
            ret.append(self.__getitem__(k))
        return ret

    def items(self):
        ret = []
        for k in self.keys():
            ret.append((k, self.__getitem__(k)))
        return ret

    def __len__(self):
        return len(self.keys())

    def __setitem__(self, name, value):
        self.runtime[name] = value

    # we return None when nothing is found instead of raising and exception
    def __getitem__(self, name):
        if name in self.runtime:
            return self.runtime[name]
        if name in self.stored:
            return self.stored[name]
        return None

    # These function expose access to the peristent storage for
    # updates and saves
    def info(self, name): # retrieve comments
        return self.stored.info(name)

    def save(self):
        self.stored.save()

    def load(self, filename):
        self.stored.load(filename)
        # make sure the runtime cache is not polluted
        for k in self.stored.keys():
            if not k in self.runtime:
                continue
            # allow this one to pass through
            del self.runtime[k]

    # save straight in the persistent storage
    def set(self, name, value):
        self.stored[name] = value
        # clean up the runtime cache
        if name in self.runtime:
            del self.runtime[name]


def getProxySetting():
    """ returns proxy string in format hostname:port
    hostname is converted to Punycode (RFC3492) if needed
    """
    cfg = initUp2dateConfig()
    proxy = None
    proxyHost = cfg["httpProxy"]

    if proxyHost:
        if proxyHost[:7] == "http://":
            proxyHost = proxyHost[7:]
        parts = proxyHost.split(':')
        parts[0] = str(idn_ascii_to_puny(parts[0]))
        proxy = ':'.join(parts)

    return proxy

def convert_url_to_puny(url):
    """ returns url where hostname is converted to Punycode (RFC3492) """
    s = urlsplit(url)
    return sstr(urlunsplit((s[0], ustr(idn_ascii_to_puny(s[1])), s[2], s[3], s[4])))

def convert_url_from_puny(url):
    """ returns url where hostname is converted from Punycode (RFC3492). Returns unicode string. """
    s = urlsplit(url)
    return ustr(urlunsplit((s[0], idn_puny_to_unicode(s[1]), s[2], s[3], s[4])))

def getServerlURL():
    """ return list of serverURL from config
        Note: in config may be one value or more values, but this
        function always return list
    """
    cfg = initUp2dateConfig()
    # serverURL may be a list in the config file, so by default, grab the
    # first element.
    if type(cfg['serverURL']) == type([]):
        return [convert_url_to_puny(i) for i in cfg['serverURL']]
    else:
        return [convert_url_to_puny(cfg['serverURL'])]

def setServerURL(serverURL):
    """ Set serverURL in config """
    cfg = initUp2dateConfig()
    cfg.set('serverURL', serverURL)

def setSSLCACert(sslCACert):
    """ Set sslCACert in config """
    cfg = initUp2dateConfig()
    cfg.set('sslCACert', sslCACert)


def initUp2dateConfig(cfg_file = "/etc/sysconfig/rhn/up2date"):
    """This function is the right way to get at the up2date config."""
    global cfg
    try:
        cfg
    except NameError:
        cfg = None

    if cfg == None:
        cfg = Config(cfg_file)
        cfg["isatty"] = False
        if sys.stdout.isatty():
            cfg["isatty"] = True

    return cfg

Spamworldpro Mini