Server IP : 192.158.238.246 / Your IP : 18.117.158.174 Web Server : LiteSpeed System : Linux uniform.iwebfusion.net 4.18.0-553.27.1.lve.1.el8.x86_64 #1 SMP Wed Nov 20 15:58:00 UTC 2024 x86_64 User : jenniferflocom ( 1321) PHP Version : 8.1.32 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /proc/self/root/opt/imunify360/venv/lib/python3.11/site-packages/imav/malwarelib/difflib/ |
Upload File : |
""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>. Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see <https://www.imunify360.com/legal/eula> """ import asyncio import logging import os import shutil import tempfile import time from functools import cached_property from io import BytesIO from pathlib import Path from peewee import DoesNotExist from defence360agent.utils import safe_fileops from imav.contracts.config import Malware as Config from imav.malwarelib.config import MalwareHitStatus, MalwareScanResourceType from imav.malwarelib.cleanup.cleaner import MalwareCleaner from imav.malwarelib.cleanup.storage import CleanupStorage from imav.malwarelib.model import MalwareHit from imav.utils import get_files_diff logger = logging.getLogger(__name__) IMUNIFY_USER = "_imunify" IMUNIFY_GROUP = "_imunify" class DiffError(Exception): pass class SafeFilePath(os.PathLike): def __init__(self, path, user=None, missing_ok=False): self._path = Path(path) self._user = user self._missing_ok = missing_ok def __str__(self): return str(self._path) def __fspath__(self): return self.__str__() def __getattr__(self, attr): return getattr(self._path, attr) def check_readability(self) -> bool: """ Return True if the file is readable by the user or raise UnsafeFileOperation otherwise """ with self.safe_open(): return True def safe_open(self, mode="rb"): if self._missing_ok and not self._path.exists(): return BytesIO(b"") if self._user: return safe_fileops.safe_open_file( self._path, mode=mode, user=self._user, respect_homedir=False, ) else: return self.open(mode) class MalwareHitDiff: """ Used to compare infected and cleaned versions of a malicious file. """ def __init__(self, id: int, user: str = None): self._id = id self._user = user self._cleaner = MalwareCleaner( loop=None, sink=None, watch_progress=False ) @cached_property def hit(self): try: return MalwareHit.get( MalwareHit.id == self._id, MalwareHit.resource_type == MalwareScanResourceType.FILE.value, MalwareHit.malicious == True, # noqa: E712 *([MalwareHit.user == self._user] * bool(self._user)), ) except DoesNotExist: raise DiffError( f"No malware file hit found (id={self._id}," f" user={self._user})." ) async def get_unified_diff_for_cleaned_file(self) -> bytes: diff = b"" # compare the current cleaned version with the original file if self.hit.status in MalwareHitStatus.CLEANED: cleaned_file_path = SafeFilePath( self.hit.orig_file_path, user=self._user, missing_ok=True, ) infected_file_path = SafeFilePath( CleanupStorage.get_hit_store_path(self.hit), user=None, ) diff = await self._get_diff( infected_file_path, cleaned_file_path, cleaned_at=self.hit.cleaned_at, ) else: logger.warning( "Malware hit has unexpected status=%s. Use the empty diff.", self.hit.status, ) return diff async def clean_and_get_unified_diff(self) -> bytes: diff = b"" if self.hit.status == MalwareHitStatus.FOUND: # infected # clean copy of file and compare with the original file infected_file_path = SafeFilePath( self.hit.orig_file_path, user=self._user ) # do not attempt any of the following actions # if the user does not have read permissions infected_file_path.check_readability() with tempfile.NamedTemporaryFile( mode="w+", dir=Config.TEMP_CLEANUP_DIR ) as temp_file: cleaned_file_path = SafeFilePath( temp_file.name, user=None, missing_ok=True ) await safe_fileops.safe_move( self.hit.orig_file, cleaned_file_path, src_unlink=False, dst_overwrite=True, safe_src=False, safe_dst=True, ) # so that procu2.php has access to the file shutil.chown( cleaned_file_path, user=IMUNIFY_USER, group=IMUNIFY_GROUP ) result, error, cmd = await self._cleaner.start( IMUNIFY_USER, [str(cleaned_file_path)] ) hit_result = result.get(str(cleaned_file_path)) if hit_result and ( hit_result.is_cleaned() or hit_result.is_removed() ): diff = await self._get_diff( infected_file_path, cleaned_file_path, cleaned_at=time.time(), ) else: logger.warning( "File %s was not cleaned to check diff: %s, %s, %s", self.hit.orig_file, result, error, cmd, ) else: logger.warning( "Malware hit has unexpected status=%s. Use the empty diff.", self.hit.status, ) return diff async def _get_diff( self, infected_file_path: SafeFilePath, cleaned_file_path: SafeFilePath, *, cleaned_at: float, ): if not infected_file_path.exists(): raise FileNotFoundError( f"Original file not found for hit(id={self.hit.id})." ) if ( cleaned_file_path.exists() and cleaned_file_path.stat().st_ctime > cleaned_at ): raise DiffError( "The file was modified after cleaning, diff is not valid." ) with infected_file_path.safe_open() as infected_file, cleaned_file_path.safe_open() as cleaned_file: # don't block the whole loop while reading files loop = asyncio.get_event_loop() return await loop.run_in_executor( None, get_files_diff, infected_file, cleaned_file )