HEX
Server: nginx/1.18.0
System: Linux oas2 6.8.0-1039-oracle #40~22.04.1-Ubuntu SMP Wed Oct 29 05:11:00 UTC 2025 aarch64
User: root (0)
PHP: 8.1.2-1ubuntu2.23
Disabled: NONE
Upload Files
File: //proc/826/root/usr/libexec/oracle-cloud-agent/ocatools/diagnostic
#!/usr/bin/env python3
import tarfile
import time
import grp
from json import dump
from os import environ, listdir, popen, unlink, path, stat
from pwd import getpwnam
from stat import S_IMODE

# Export TZ=UTC through the process environment so that commands started
# later via popen inherit it.
# NOTE(review): time.tzset() is never called, so time.localtime() in this
# process may keep using the zone that was in effect when `time` was
# imported -- confirm whether the archive name is meant to be in UTC.
environ['TZ'] = 'UTC'


class Tar:
    """Gzip-compressed tar archive that collects diagnostic output.

    Calling the instance runs every operation in ``ops``. Each operation is
    invoked as ``operation(tar, cmds, checks)`` where ``cmds`` and ``checks``
    are dicts the operations fill in; both dicts are then written as JSON
    and stored in the archive.
    """

    def __init__(self, filename, ops):
        """Open ``filename`` for writing ('w:gz') and remember ``ops``."""
        self.filename = filename
        self.ops = ops
        self.tar = tarfile.open(self.filename, 'w:gz')

    def __call__(self):
        """Run all operations, then archive the two JSON result files.

        The JSON files are created in the current working directory and are
        removed again afterwards, even when writing or archiving fails.
        """
        cmds = {}
        checks = {}
        fname = 'cmd-result.json'
        cname = 'check-result.json'
        for operation in self.ops:
            operation(self, cmds, checks)
        try:
            for name, payload in ((fname, cmds), (cname, checks)):
                # Close the handle before archiving so the data is flushed.
                with open(name, 'w') as handle:
                    dump(payload,
                         handle,
                         sort_keys=True,
                         indent=4,
                         separators=(',', ': '))
                self.tar.add(name)
        finally:
            for name in (fname, cname):
                try:
                    unlink(name)
                except OSError:
                    # The file was never created because an earlier step
                    # failed; nothing to clean up.
                    pass

    def add(self, filename):
        """Add ``filename`` to the archive."""
        self.tar.add(filename)

    def close(self):
        """Finalize and close the archive; safe to call more than once."""
        # getattr guards against __init__ having failed before self.tar
        # was assigned.
        tar = getattr(self, 'tar', None)
        if tar is not None:
            tar.close()

    def __del__(self):
        # Keeps the original "del tar_obj closes the archive" behavior.
        self.close()


class Copydir:
    """Operation that archives every entry found directly inside a directory.

    The directory is listed once, when the object is constructed; calling
    the object adds each recorded path to the archive.
    """

    def __init__(self, directory):
        # Paths are built as "<directory>/<entry>" in listdir order.
        self.files = [directory + '/' + entry for entry in listdir(directory)]

    def __call__(self, tar, cmdDict, checkDict):
        """Add every recorded path to ``tar``; the dicts are not used."""
        for entry_path in self.files:
            tar.add(entry_path)


class Copyfile:
    """Operation that archives a single file if it exists."""

    def __init__(self, filename):
        self.filename = filename

    def __call__(self, tar, cmdDict, checkDict):
        """Add the resolved (realpath) file to ``tar``; skip missing files."""
        if not path.exists(self.filename):
            return
        resolved = path.realpath(self.filename)
        tar.add(resolved)


class Cmd:
    """Operation that runs a shell command and records its output.

    The stripped standard output is stored in the command dict under the
    command text itself. With ``sudo=True`` the command is prefixed with
    ``/usr/bin/sudo``.
    """

    def __init__(self, cmd, sudo=False):
        self.cmd = cmd
        self.sudo = '/usr/bin/sudo' if sudo else ''

    def __call__(self, tar, cmdDict, checkDict):
        """Run the command and store its output in ``cmdDict[self.cmd]``."""
        # The separating space is kept even without sudo; a leading blank
        # is harmless to the shell. The with-block closes the pipe
        # deterministically instead of leaving it to garbage collection.
        with popen(self.sudo + ' ' + self.cmd) as pipe:
            cmdDict[self.cmd] = pipe.read().strip()


class Check:
    """Operation that stores the result of a zero-argument check function.

    The result is written to the check dict under ``key``. If the function
    raises, ``error_repr`` is stored instead, or ``repr`` of the exception
    when no ``error_repr`` was supplied.
    """

    def __init__(self, function, key, error_repr=None):
        self.function, self.key, self.error_repr = function, key, error_repr

    def __call__(self, tar, cmdDict, checkDict):
        """Evaluate the check and record its outcome in ``checkDict``."""
        try:
            checkDict[self.key] = self.function()
        except Exception as exc:
            fallback = self.error_repr
            if fallback is None:
                fallback = repr(exc)
            checkDict[self.key] = fallback


class Pmap:
    """Operation that records ``pmap -x`` output for processes of a given name.

    At construction time /proc is scanned and the command line of every
    numeric entry is stored in ``self.pids`` (pid string -> raw cmdline
    text, arguments separated by NUL characters).
    """

    def __init__(self, process_name=None):
        self.process_name = process_name
        self.pids = {}
        for process in listdir('/proc'):
            if not process.isdigit():
                continue
            try:
                # errors='replace' keeps undecodable argument bytes from
                # aborting the whole scan.
                with open('/proc/%s/cmdline' % process,
                          errors='replace') as handle:
                    self.pids[process] = handle.read()
            except OSError:
                # The process exited between listdir() and open(), or its
                # cmdline is not readable; skip it rather than crash.
                continue

    def __call__(self, tar, cmdDict, checkDict):
        """Run ``pmap -x`` (via sudo) for each process matching the name.

        A process matches when the basename of its first cmdline argument
        equals ``self.process_name``.
        """
        for pid, cmdline in self.pids.items():
            pname = cmdline.split('\000')[0].split('/')[-1]
            if self.process_name == pname:
                # The trailing shell comment makes the dict key readable.
                cmd = 'pmap -x %s # pmap for %s' % (pid, pname)
                Cmd(cmd, True)(tar, cmdDict, checkDict)


class RunState(Pmap):
    """Operation that records the ``State:`` line of matching processes.

    Reuses the /proc scan done by ``Pmap.__init__``. A process matches when
    ``match`` occurs anywhere in its raw command line.
    """

    def __init__(self, match, cmdSuffix='RunState'):
        Pmap.__init__(self)
        self.match = match
        self.cmdSuffix = cmdSuffix

    def __call__(self, tar, cmdDict, checkDict):
        """Store "<cmdline> <State: line>" entries under ``Internal(<suffix>)``."""
        found = []
        for process, cmdline in self.pids.items():
            if self.match not in cmdline:
                continue
            try:
                with open('/proc/' + process + '/status') as status:
                    for line in status:
                        if line.startswith('State:'):
                            found.append(cmdline + ' ' + line)
            except OSError:
                # The process has exited since the scan in __init__; there
                # is no state left to report for it.
                continue
        cmdDict['Internal(%s)' % self.cmdSuffix] = ''.join(found)


class If:
    """Conditional operation: run one of two operations.

    ``cond_`` is either a plain value, tested for truthiness, or a callable
    taking ``(tar, cmdDict, checkDict)`` whose result is tested. ``then_``
    runs when the condition holds, ``else_`` (a no-op returning None by
    default) otherwise. The chosen operation's return value is returned.
    """

    def __init__(self,
                 cond_,
                 then_,
                 else_=lambda tar, cmdDict, checkDict: None):
        self.cond_, self.then_, self.else_ = cond_, then_, else_

    def __call__(self, tar, cmdDict, checkDict):
        condition = self.cond_
        if callable(condition):
            condition = condition(tar, cmdDict, checkDict)
        chosen = self.then_ if condition else self.else_
        return chosen(tar, cmdDict, checkDict)


class Progn:
    """Sequence operation: run the given operations one after another."""

    def __init__(self, *ops):
        # Stored as the tuple of operations in the order they were passed.
        self.ops = ops

    def __call__(self, tar, cmdDict, checkDict):
        """Invoke each operation in order; return values are discarded."""
        for step in self.ops:
            step(tar, cmdDict, checkDict)


class Or:
    """Short-circuit operation: return the first truthy result.

    Each item is either a callable, invoked as ``op(tar, cmdDict,
    checkDict)``, or a plain value used as is. Evaluation stops at the
    first truthy value, which is returned; False is returned when none is.
    """

    def __init__(self, *ops):
        self.ops = ops

    def __call__(self, tar, cmdDict, checkDict):
        for candidate in self.ops:
            if callable(candidate):
                value = candidate(tar, cmdDict, checkDict)
            else:
                value = candidate
            if value:
                return value
        return False


def correct_ownership_p(owner, group, directory):
    """Report whether ``directory`` is owned by ``owner`` and ``group``.

    Returns 'expected' when both the owning UID and GID of the path match
    the named user and group, 'unexpected' otherwise. Raises KeyError when
    the user or group name is unknown and OSError when the path cannot be
    stat'ed.
    """
    dirstat = stat(directory)
    expected_uid = getpwnam(owner).pw_uid
    # Resolve the group through the group database. Looking it up with
    # getpwnam() would return the primary GID of a *user* with that name.
    expected_gid = grp.getgrnam(group).gr_gid
    if expected_uid == dirstat.st_uid and expected_gid == dirstat.st_gid:
        return 'expected'

    return 'unexpected'


def group_available(login_defs='/etc/login.defs'):
    """Check whether a GID in the system GID range is still unassigned.

    Reads SYS_GID_MIN, SYS_GID_MAX and GID_MIN from ``login_defs`` (only
    lines consisting of exactly two whitespace-separated fields are
    considered). If a key appears more than once, the largest value wins for
    SYS_GID_MAX and the smallest for the *_MIN keys. Missing keys default to
    GID_MIN=1000, SYS_GID_MAX=GID_MIN-1 and SYS_GID_MIN=101.

    Returns 'expected' when some GID in [SYS_GID_MIN, SYS_GID_MAX] has no
    group entry, 'unexpected(all GIDs used)' when every one is taken, and
    'unexpected(login.defs configured incorrectly)' when the range is empty.
    Raises OSError if the file cannot be read and ValueError if one of the
    values is not an integer.
    """
    gid_min = None
    sys_gid_min = None
    sys_gid_max = None
    with open(login_defs) as handle:
        for raw in handle:
            fields = raw.split()
            if len(fields) != 2:
                continue
            key, value = fields
            if key == 'SYS_GID_MAX':
                number = int(value)
                if sys_gid_max is None or number > sys_gid_max:
                    sys_gid_max = number
            elif key == 'SYS_GID_MIN':
                number = int(value)
                if sys_gid_min is None or number < sys_gid_min:
                    sys_gid_min = number
            elif key == 'GID_MIN':
                number = int(value)
                if gid_min is None or number < gid_min:
                    gid_min = number

    if gid_min is None:
        gid_min = 1000
    if sys_gid_max is None:
        sys_gid_max = gid_min - 1
    if sys_gid_min is None:
        sys_gid_min = 101

    if sys_gid_max <= sys_gid_min:
        return 'unexpected(login.defs configured incorrectly)'

    # Walk the range from the top down and stop at the first free GID.
    for gid in range(sys_gid_max, sys_gid_min - 1, -1):
        try:
            grp.getgrgid(gid)
        except KeyError:
            # No group entry for this GID, so it is available.
            return 'expected'

    return 'unexpected(all GIDs used)'


def clock_skew(log_path='/var/log/oracle-cloud-agent/agent.log'):
    """Scan the agent log for a clock-skew error message.

    Returns 'detected' if any line of ``log_path`` contains
    'is not within allowed clock skew', otherwise 'not detected'. Raises
    OSError if the log cannot be opened.
    """
    # errors='replace' lets the scan continue past bytes that are not valid
    # in the default text encoding instead of raising UnicodeDecodeError.
    with open(log_path, 'r', errors='replace') as f:
        for line in f:
            if 'is not within allowed clock skew' in line:
                return 'detected'
    return 'not detected'


def check_output(cmd, expected_output):
    """Run ``cmd`` in a shell and compare its output with ``expected_output``.

    Both sides are stripped of surrounding whitespace before comparison.
    Returns "expected" on a match and "unexpected" otherwise.
    """
    # The with-block closes the pipe as soon as the output has been read.
    with popen(cmd) as pipe:
        actual = pipe.read().strip()
    if actual == expected_output.strip():
        return "expected"
    return "unexpected"


def check_cmd_output(cmd, string):
    """Return True when the stripped output of ``cmd`` equals ``string``.

    Only the command output is stripped; ``string`` is compared as given.
    """
    # The with-block closes the pipe as soon as the output has been read.
    with popen(cmd) as pipe:
        cmd_result = pipe.read().strip()
    return cmd_result == string


# Ordered list of diagnostic operations executed by Tar.__call__. Each entry
# is a callable taking (tar, cmdDict, checkDict).
# Note that building this list already does work: Copydir lists its directory
# and Pmap/RunState scan /proc in their constructors.
operations = [
    # Save logs of OCA and all running plugins
    Copydir('/var/log/oracle-cloud-agent'),
    Copyfile('/etc/os-release'),
    # Basic host information
    Cmd('uname -a'),
    Cmd('uptime'),
    # Snap package details for oracle-cloud-agent
    Cmd('snap list --color=never --unicode=never --all oracle-cloud-agent'),
    Cmd('snap info --color=never --unicode=never --verbose oracle-cloud-agent'),
    # systemd enablement / activity of the updater and agent services
    Cmd('systemctl is-enabled snap.oracle-cloud-agent.oracle-cloud-agent-updater.service'
       ),
    Cmd('systemctl is-active snap.oracle-cloud-agent.oracle-cloud-agent-updater.service'
       ),
    Cmd('systemctl is-enabled snap.oracle-cloud-agent.oracle-cloud-agent.service'
       ),
    Cmd('systemctl is-active snap.oracle-cloud-agent.oracle-cloud-agent.service'
       ),
    # Response of the link-local /opc/v2/instance/ endpoint (presumably the
    # instance metadata service -- confirm)
    Cmd('curl -s -H "Authorization: Bearer Oracle" http://169.254.169.254/opc/v2/instance/'
       ),
    Cmd('ps auxww'),
    # Second argument True runs the command through /usr/bin/sudo
    Cmd('visudo -c 2>&1', True),
    # Last 100 lines of the agent and updater logs
    Cmd('tail -100 /var/log/oracle-cloud-agent/agent.log'),
    Cmd('tail -100 /var/log/oracle-cloud-agent/updater.log'),
    # Permission checks: compare the mode bits against the listed octal value
    Check(
        lambda: "expected"
        if S_IMODE(stat('/var/log').st_mode) == 0o775 else "unexpected",
        '/var/log directory permissions'),
    Check(
        lambda: "expected"
        if S_IMODE(stat('/var/log/oracle-cloud-agent').st_mode) == 0o755 else
        "unexpected", '/var/log/oracle-cloud-agent directory permissions'),
    Check(
        lambda: "expected"
        if S_IMODE(stat('/var/log/oracle-cloud-agent/agent.log').st_mode
                  ) == 0o644 else 'unexpected',
        '/var/log/oracle-cloud-agent/agent.log permissions'),
    Check(
        lambda: "expected"
        if S_IMODE(stat('/var/log/oracle-cloud-agent/updater.log').st_mode
                  ) == 0o644 else 'unexpected',
        '/var/log/oracle-cloud-agent/updater.log permissions'),
    # Ownership checks: (owner, group, path) passed to correct_ownership_p
    Check(lambda: correct_ownership_p('root', 'syslog', '/var/log'),
          '/var/log directory ownership'),
    Check(
        lambda: correct_ownership_p('snap_daemon', 'snap_daemon',
                                    '/var/log/oracle-cloud-agent'),
        '/var/log/oracle-cloud-agent directory ownership'),
    Check(
        lambda: correct_ownership_p('snap_daemon', 'snap_daemon',
                                    '/var/log/oracle-cloud-agent/agent.log'),
        '/var/log/oracle-cloud-agent/agent.log file ownership'),
    Check(
        lambda: correct_ownership_p('snap_daemon', 'snap_daemon',
                                    '/var/log/oracle-cloud-agent/updater.log'),
        '/var/log/oracle-cloud-agent/updater.log file ownership'),
    Check(group_available, 'checking GID availability'),
    Check(clock_skew, 'checking clock skew'),
    # Memory maps (pmap -x) of the agent and updater processes
    Pmap('oracle-cloud-agent'),
    Pmap('oracle-cloud-agent-updater'),
    Copyfile('/etc/multipath.conf'),
    # "State:" line of every process whose command line contains the string
    RunState('oracle-cloud-agent')
]

if __name__ == '__main__':
    # Name the archive after the time at which collection starts.
    started = time.localtime()
    outfile = time.strftime('oca-diag-%Y-%m-%d.%H-%M-%S.tar.gz', started)
    tar_file = Tar(outfile, operations)
    tar_file()
    # Dropping the only reference runs Tar.__del__, which closes the archive
    # before completion is reported.
    del tar_file
    print('Diagnostics collected in ' + outfile)