import ast import importlib import os from functools import total_ordering from pathlib import Path import requests from git import Repo from git.exc import InvalidGitRepositoryError from loguru import logger from pygount import SourceAnalysis from odoo_openupgrade_wizard.configuration_version_dependant import ( get_apriori_file_relative_path, get_coverage_relative_path, get_openupgrade_analysis_files, ) from odoo_openupgrade_wizard.tools.tools_odoo import ( get_odoo_addons_path, get_odoo_env_path, ) class Analysis(object): def __init__(self, ctx): self.modules = [] self.initial_version = ctx.obj["config"]["odoo_versions"][0] self.final_version = ctx.obj["config"]["odoo_versions"][-1] self.all_versions = [x for x in ctx.obj["config"]["odoo_versions"]] def analyse_module_version(self, ctx, module_list): self._generate_module_version_first_version(ctx, module_list) for count in range(len(self.all_versions) - 1): previous_version = self.all_versions[count] current_version = self.all_versions[count + 1] self._generate_module_version_next_version( ctx, previous_version, current_version ) def analyse_openupgrade_state(self, ctx): logger.info("Parsing openupgrade module coverage for each migration.") coverage_analysis = {} for version in self.all_versions[1:]: coverage_analysis[version] = {} relative_path = get_coverage_relative_path(version) env_folder_path = get_odoo_env_path(ctx, version) coverage_path = env_folder_path / relative_path with open(coverage_path) as f: lines = f.readlines() for line in [x for x in lines if "|" in x]: clean_line = ( line.replace("\n", "") .replace("|del|", "") .replace("|new|", "") ) splited_line = [x.strip() for x in clean_line.split("|") if x] if len(splited_line) == 2: coverage_analysis[version][splited_line[0]] = splited_line[ 1 ] if len(splited_line) == 3: coverage_analysis[version][splited_line[0]] = ( splited_line[1] + " " + splited_line[2] ).strip() elif len(splited_line) > 3: raise ValueError( "Incorrect value in openupgrade analysis" f" file {coverage_path} for line {line}" ) for odoo_module in filter( lambda x: x.module_type == "odoo", self.modules ): for module_version in list(odoo_module.module_versions.values()): module_version.analyse_openupgrade_state(coverage_analysis) for version in self.all_versions[1:]: odoo_env_path = get_odoo_env_path(ctx, version) openupgrade_analysis_files = get_openupgrade_analysis_files( odoo_env_path, version ) openupgrade_analysis_files = openupgrade_analysis_files for odoo_module in filter( lambda x: x.module_type == "odoo", self.modules ): module_version = odoo_module.get_module_version(version) if module_version: module_version.analyse_openupgrade_work( openupgrade_analysis_files ) def analyse_missing_module(self): for odoo_module in filter( lambda x: x.module_type != "odoo", self.modules ): last_module_version = odoo_module.module_versions.get( self.final_version, False ) if ( not last_module_version.addon_path and last_module_version.state not in ["renamed", "merged", "normal_loss"] ): last_module_version.analyse_missing_module() def estimate_workload(self, ctx): logger.info("Estimate workload ...") for odoo_module in self.modules: for module_version in odoo_module.module_versions.values(): module_version.estimate_workload(ctx) def get_requirements(self, ctx): logger.info("Get requirements ...") result = {x: {"python": {}, "bin": {}} for x in self.all_versions} for odoo_module in self.modules: for module_version in odoo_module.module_versions.values(): module_result = module_version.get_requirements(ctx) python_result = result[module_result["version"]]["python"] module_name = module_result["module_name"] for python_lib in module_result["python"]: if python_lib not in python_result: python_result[python_lib] = [module_name] else: python_result[python_lib].append(module_name) return result def _generate_module_version_first_version(self, ctx, module_list): logger.info(f"Analyse version {self.initial_version}. (First version)") # Instanciate a new odoo_module for module_name in module_list: addon_path = OdooModule.get_addon_path( ctx, module_name, self.initial_version ) if addon_path: repository_name = OdooModule.get_repository_name(addon_path) if f"{repository_name}.{module_name}" not in self.modules: logger.debug( f"Discovering module '{module_name}'" f" in {repository_name}" f" for version {self.initial_version}" ) else: repository_name = False logger.error( f"Module {module_name} not found" f" for version {self.initial_version}." ) new_odoo_module = OdooModule( ctx, self, module_name, repository_name ) new_module_version = OdooModuleVersion( self.initial_version, new_odoo_module, addon_path ) new_odoo_module.module_versions.update( {self.initial_version: new_module_version} ) self.modules.append(new_odoo_module) def _generate_module_version_next_version( self, ctx, previous_version, current_version ): logger.info( f"Analyse change between {previous_version} and {current_version}" ) # Get changes between the two versions ( apriori_module_name, apriori_relative_path, ) = get_apriori_file_relative_path(current_version) apriori_module_path = OdooModule.get_addon_path( ctx, apriori_module_name, current_version ) if not apriori_module_path: raise ValueError( f"Unable to find the path of the module {apriori_module_name}" f" for the version {current_version}." ) apriori_absolute_path = ( apriori_module_path / Path(apriori_module_name) / apriori_relative_path ) module_spec = importlib.util.spec_from_file_location( "package", str(apriori_absolute_path) ) module = importlib.util.module_from_spec(module_spec) module_spec.loader.exec_module(module) renamed_modules = module.renamed_modules merged_modules = module.merged_modules for odoo_module in self.modules: state = False new_module_name = False if odoo_module.name in renamed_modules: state = "renamed" new_module_name = renamed_modules[odoo_module.name] logger.debug( f"{previous_version} -> {current_version} :" f" {odoo_module.name} renamed into {new_module_name}" ) elif odoo_module.name in merged_modules: state = "merged" new_module_name = merged_modules[odoo_module.name] logger.debug( f"{previous_version} -> {current_version} :" f" {odoo_module.name} merged into {new_module_name}" ) # Handle new module if state and new_module_name != odoo_module.name: # Ensure that the module exists in self.modules new_addon_path = OdooModule.get_addon_path( ctx, new_module_name, current_version ) if not new_addon_path: raise ValueError( f"The module {new_module_name} has not been found" f" in the version {current_version}." " Analyse can not be done." ) else: new_repository_name = OdooModule.get_repository_name( new_addon_path ) if ( "%s.%s" % (new_repository_name, new_module_name) not in self.modules ): logger.debug( f"Discovering module '{new_module_name}'" f" in {new_repository_name}" f" for version {current_version}" ) new_odoo_module = OdooModule( ctx, self, new_module_name, new_repository_name ) self.modules.append(new_odoo_module) new_odoo_module.module_versions.update( { current_version: OdooModuleVersion( current_version, new_odoo_module, new_addon_path, ) } ) # Get the previous version of the module previous_module_version = odoo_module.get_module_version( previous_version ) # if the previous version has been renamed or merged # the loss is normal if previous_module_version and previous_module_version.state in [ "merged", "renamed", "normal_loss", ]: state = "normal_loss" new_addon_path = OdooModule.get_addon_path( ctx, odoo_module.name, current_version ) odoo_module.module_versions.update( { current_version: OdooModuleVersion( current_version, odoo_module, new_addon_path, state=state, target_module=new_module_name, ) } ) def get_module_qty(self, module_type=False): if module_type: odoo_modules = [ x for x in filter( lambda x: x.module_type == module_type, self.modules ) ] else: odoo_modules = self.modules return len(odoo_modules) def workload_hour_text(self, module_type=False): if module_type: odoo_modules = [ x for x in filter( lambda x: x.module_type == module_type, self.modules ) ] else: odoo_modules = self.modules total = 0 for odoo_module in odoo_modules: for module_version in list(odoo_module.module_versions.values()): total += module_version.workload return "%d h" % (int(round(total / 60))) @total_ordering class OdooModule(object): def __init__(self, ctx, analyse, module_name, repository_name): self.analyse = analyse self.name = module_name self.repository = repository_name self.unique_name = f"{repository_name}.{module_name}" self.ignored = self.is_ignored(ctx, module_name) self.module_versions = {} if not repository_name: self.module_type = "not_found" elif repository_name == "odoo/odoo": self.module_type = "odoo" elif repository_name.startswith("oca"): self.module_type = "oca" else: self.module_type = "custom" @property def workload(self): return sum( round(module_version.workload) for _, module_version in self.module_versions.items() ) def is_ignored(self, ctx, module_name): """Return true if module should be ignored""" settings = ctx.obj["config"]["workload_settings"] return module_name in settings["ignored_module_list"] def get_module_version(self, current_version): res = self.module_versions.get(current_version, False) return res def get_odoo_apps_url(self): logger.info(f"Searching {self.name} in the Odoo appstore ...") url = ( f"https://apps.odoo.com/apps/modules/" f"{self.analyse.initial_version}/{self.name}/" ) try: response = requests.get(url) except requests.exceptions.RequestException as err: logger.warning(f"Error when trying to get {url}: {err}") return False if response.status_code == 200: return url return False def get_odoo_code_search_url(self): logger.info(f"Searching {self.name} in Odoo-Code-Search ...") url = ( f"https://odoo-code-search.com/ocs/search?" f"q=name%3A%3D{self.name}+version%3A{self.analyse.initial_version}" ) result = requests.get(url) if '404' in result.text: return False return url @classmethod def get_addon_path(cls, ctx, module_name, current_version): """Search the module in all the addons path of a given version and return the addon path of the module, or False if not found. For exemple find_repository(ctx, 'web_responsive', 12.0) '/PATH_TO_LOCAL_ENV/src/oca/web' """ # Try to find the repository that contains the module main_path = get_odoo_env_path(ctx, current_version) addons_path, _ = get_odoo_addons_path( ctx, main_path, {"version": current_version, "execution_context": "openupgrade"}, ) for addon_path in addons_path: if (main_path / addon_path / module_name).exists(): return main_path / addon_path return False @classmethod def get_repository_name(cls, addon_path): """Given an addons path that contains odoo modules in a folder that has been checkouted via git, return a repository name with the following format org_name/repo_name. For exemple 'oca/web' or 'odoo/odoo' """ # TODO, make the code cleaner and more resiliant # for the time being, the code will fail for # - github url set with git+http... # - gitlab url # - if odoo code is not in a odoo folder in the repos.yml file... odoo_odoo_addons = ( "odoo/odoo/addons", "odoo/openerp/addons", "openupgrade/odoo/addons", "openupgrade/openerp/addons", ) odoo_addons = ( "odoo/addons", "openupgrade/addons", ) if str(addon_path).endswith(odoo_odoo_addons): path = addon_path.parent.parent elif str(addon_path).endswith(odoo_addons): path = addon_path.parent else: path = addon_path try: repo = Repo(str(path)) except InvalidGitRepositoryError as err: logger.critical(f"{path} is not a Git Repository.") raise err github_url_prefixes = ( "https://github.com/", "git@github.com:", ) repository_names = [] for remote in repo.remotes: # Standardize all repository_name to lower case repository_name = remote.url.lower() for github_url_prefix in github_url_prefixes: repository_name = repository_name.replace( github_url_prefix, "" ) if repository_name.endswith(".git"): repository_name = repository_name[: -len(".git")] repository_names.append(repository_name) # find main repository_name main_repository_name = next( ( repo_name for repo_name in repository_names if repo_name.startswith("oca") ), None, ) if not main_repository_name: main_repository_name = repository_names[0] if main_repository_name == "oca/openupgrade": return "odoo/odoo" else: return main_repository_name def __eq__(self, other): if isinstance(other, str): return self.unique_name == other elif isinstance(other, OdooModule): return self.unique_name == other.unique_name def __lt__(self, other): if self.module_type != other.module_type: if self.module_type == "odoo": return True elif self.module_type == "oca" and other.module_type in [ "custom", "not_found", ]: return True elif ( self.module_type == "custom" and other.module_type == "not_found" ): return True else: return False elif self.repository != other.repository: return self.repository < other.repository else: return self.name < other.name class OdooModuleVersion(object): _exclude_directories = [ "lib", "demo", "test", "tests", "doc", "description", ] _manifest_files = ["__openerp__.py", "__manifest__.py"] _file_extensions = [".py", ".xml", ".js"] def __init__( self, version, odoo_module, addon_path, state=False, target_module=False, ): self.version = version self.odoo_module = odoo_module self.addon_path = addon_path self.state = "ignored" if odoo_module.ignored else state self.target_module = target_module self.openupgrade_state = "" self.python_code = 0 self.xml_code = 0 self.javascript_code = 0 self._workload = 0 self.analysis_file = False self.openupgrade_model_lines = 0 self.openupgrade_field_lines = 0 self.openupgrade_xml_lines = 0 @property def workload(self): return int(round(self._workload)) @workload.setter def workload(self, value): self._workload = int(round(value)) def get_last_existing_version(self): if self.odoo_module.module_type != "not_found": versions = list(self.odoo_module.module_versions.values()) return [x for x in filter(lambda x: x.addon_path, versions)][-1] else: return False def get_requirements(self, ctx): result = { "python": [], "bin": [], "module_name": self.odoo_module.name, "version": self.version, } manifest_path = False for manifest_name in self._manifest_files: if not self.odoo_module.name or not self.addon_path: continue manifest_path = ( self.addon_path / self.odoo_module.name / manifest_name ) if manifest_path.exists(): break if not manifest_path or not manifest_path.exists(): return result with manifest_path.open(mode="r", encoding="utf-8") as f_manifest: manifest = ast.literal_eval(f_manifest.read()) python_libs = manifest.get("external_dependencies", {}).get( "python", [] ) bin_libs = manifest.get("external_dependencies", {}).get("bin", []) result["bin"] = bin_libs # Handle specific replacement in the setup folder setup_path = ( self.addon_path / "setup" / self.odoo_module.name / "setup.py" ) if setup_path.exists(): with setup_path.open(mode="r", encoding="utf-8") as f_setup: tree = ast.parse(source=f_setup.read()) for node in ast.walk(tree): if ( node.__class__.__name__ == "Dict" and "external_dependencies_override" in [k.value for k in node.keys] ): python_replacements = ast.literal_eval( ast.unparse(node) )["external_dependencies_override"] break else: python_replacements = {} for k, v in python_replacements.items(): if k in python_libs: python_libs.remove(k) result["python"].append(v) result["python"] += python_libs return result def estimate_workload(self, ctx): settings = ctx.obj["config"]["workload_settings"] port_minimal_time = settings["port_minimal_time"] port_per_version = settings["port_per_version"] port_per_python_line_time = settings["port_per_python_line_time"] port_per_javascript_line_time = settings[ "port_per_javascript_line_time" ] port_per_xml_line_time = settings["port_per_xml_line_time"] open_upgrade_minimal_time = settings["open_upgrade_minimal_time"] openupgrade_model_line_time = settings["openupgrade_model_line_time"] openupgrade_field_line_time = settings["openupgrade_field_line_time"] openupgrade_xml_line_time = settings["openupgrade_xml_line_time"] if self.state in ["merged", "renamed", "normal_loss", "ignored"]: # The module has been moved, nothing to do return if self.odoo_module.module_type == "odoo": if self.version == self.odoo_module.analyse.initial_version: # No work to do for the initial version return if self.openupgrade_state and ( self.openupgrade_state.lower().startswith("done") or self.openupgrade_state.lower().startswith("nothing to do") or self.openupgrade_state.lower().startswith( "no db layout changes" ) ): return else: self.workload = ( # Minimal openupgrade time open_upgrade_minimal_time # Add model time + ( openupgrade_model_line_time * self.openupgrade_model_lines ) # Add field Time + ( openupgrade_field_line_time * self.openupgrade_field_lines ) # Add XML Time + (openupgrade_xml_line_time * self.openupgrade_xml_lines) ) # OCA / Custom Module if self.version != self.odoo_module.analyse.final_version: # No need to work for intermediate version (in theory ;-)) return if self.addon_path: # The module has been ported, nothing to do return previous_module_version = self.get_last_existing_version() if not previous_module_version: return self.workload = ( # Minimal port time port_minimal_time # Add time per version + (self.version - previous_module_version.version) * port_per_version # Add python time + (port_per_python_line_time * previous_module_version.python_code) # Add XML Time + (port_per_xml_line_time * previous_module_version.xml_code) # Add Javascript Time + ( port_per_javascript_line_time * previous_module_version.javascript_code ) ) def analyse_size(self): self.python_code = 0 self.xml_code = 0 self.javascript_code = 0 # compute file list to analyse file_list = [] for root, dirs, files in os.walk( self.addon_path / Path(self.odoo_module.name), followlinks=True ): relative_path = os.path.relpath(Path(root), self.addon_path) if set(Path(relative_path).parts) & set(self._exclude_directories): continue for name in files: if name in self._manifest_files: continue filename, file_extension = os.path.splitext(name) if file_extension in self._file_extensions: file_list.append( (os.path.join(root, name), file_extension) ) # Analyse files for file_path, file_ext in file_list: file_res = SourceAnalysis.from_file( file_path, "", encoding="utf-8" ) if file_ext == ".py": self.python_code += file_res.code elif file_ext == ".xml": self.xml_code += file_res.code elif file_ext == ".js": self.javascript_code += file_res.code def analyse_openupgrade_state(self, coverage_analysis): if self.version == self.odoo_module.analyse.initial_version: return self.openupgrade_state = coverage_analysis[self.version].get( self.odoo_module.name, False ) def analyse_openupgrade_work(self, analysis_files): if self.version == self.odoo_module.analyse.initial_version: return analysis_file = analysis_files.get(self.odoo_module.name, False) if not analysis_file: return self.analysis_file = analysis_file with open(analysis_file, "r") as input_file: line_type = False for line in input_file.readlines(): if line.startswith("---Models in module"): line_type = "model" continue elif line.startswith("---Fields in module"): line_type = "field" continue elif line.startswith("---XML records in module"): line_type = "xml" continue elif line.startswith("---nothing has changed in this module"): continue elif line.startswith("---"): raise Exception(f"comment {line} not understood") if line_type == "model": self.openupgrade_model_lines += 1 elif line_type == "field": self.openupgrade_field_lines += 1 elif line_type == "xml": self.openupgrade_xml_lines += 1 def workload_hour_text(self): if not self.workload: return "" hour = int(self.workload // 60) minute = round(self.workload % 60) return f"{hour}h{minute:02}" def get_size_text(self): data = { "Python": self.python_code, "XML": self.xml_code, "JavaScript": self.javascript_code, } # Remove empty values data = {k: v for k, v in data.items() if v} if not data: return "" else: return ", ".join(["%s: %s" % (a, b) for a, b in data.items()]) def get_analysis_text(self): data = { "Model": self.openupgrade_model_lines, "Field": self.openupgrade_field_lines, "XML": self.openupgrade_xml_lines, } # Remove empty values data = {k: v for k, v in data.items() if v} if not data: return "" else: return ", ".join(["%s: %s" % (a, b) for a, b in data.items()]) def analysis_url(self): return os.path.relpath(self.analysis_file, Path(os.getcwd())) def analyse_missing_module(self): last_existing_version = self.get_last_existing_version() if not last_existing_version: return last_existing_version.analyse_size() def get_bg_color(self): if self.addon_path: if ( self.odoo_module.module_type == "odoo" and self.version != self.odoo_module.analyse.initial_version ): if self.openupgrade_state and ( self.openupgrade_state.lower().startswith("done") or self.openupgrade_state.lower().startswith( "nothing to do" ) or self.openupgrade_state.lower().startswith( "no db layout changes" ) ): return "lightgreen" else: return "orange" return "lightgreen" else: # The module doesn't exist in the current version if self.state in ["merged", "renamed", "normal_loss", "ignored"]: # Normal case, the previous version has been renamed # or merged return "lightgray" if self.odoo_module.module_type == "odoo": # A core module disappeared and has not been merged # or renamed return "red" elif self.version != self.odoo_module.analyse.final_version: return "lightgray" elif self.odoo_module.module_type == "not_found": return "lightgray" else: return "orange" def get_text(self): if self.addon_path: if ( self.odoo_module.module_type == "odoo" and self.version != self.odoo_module.analyse.initial_version ): if self.openupgrade_state: return self.openupgrade_state else: return "To analyse" else: if self.state == "merged": return f"Merged into {self.target_module}" elif self.state == "renamed": return f"Renamed into {self.target_module}" elif self.state == "ignored": return "Ignored" elif self.state == "normal_loss": return "" if self.odoo_module.module_type == "odoo": # A core module disappeared and has not been merged # or renamed return "Module lost" else: last_existing_version = self.get_last_existing_version() if not last_existing_version: return "Unknown" elif self.version != self.odoo_module.analyse.final_version: return "Unported" else: return f"To port from {last_existing_version.version}"