import os import pprint import requests import json import subprocess crCommitBranch = os.environ.get("CI_COMMIT_REF_NAME", "NOTFOUND") apiUrl = os.environ.get("CI_API_V4_URL", "https://forge.3gpp.org/rep/api/v4") projectId = os.environ.get("CI_PROJECT_ID", "13") def gapi (query): url = f"{apiUrl}/projects/{projectId}/{query}" r = requests.get(url) return json.loads(r.text) def do (commandline): #print (" Attempting: " + commandline) completedProc = subprocess.run(commandline, capture_output=True, shell=True) #print (" STDOUT > " + ("empty" if completedProc.stdout is None else completedProc.stdout.decode('utf-8'))) #print (" STDERR > " + ("empty" if completedProc.stderr is None else completedProc.stderr.decode('utf-8'))) #print (f" Completed with code {completedProc.returncode}") return (completedProc.returncode == 0, completedProc.stdout.decode('utf-8')) print ("Searching for corresponding MR...") mrs = gapi(f"merge_requests?source_branch={crCommitBranch}&state=opened") if len(mrs) == 0: print ("No MR found... aborting") exit() if len(mrs) > 1: print (f"{len(mrs)} MRs found, 1 expected - aborting") for m in mrs: pprint.pprint(m) exit(-1) mr = mrs[0] print (f"Found MR {mr['reference']} ({mr['title']})") print (f"Target branch is {mr['target_branch']}") print ("Searching for open MRs targeting same branch...") mrs = gapi(f"merge_requests?target_branch={mr['target_branch']}&state=opened") mrs = [m for m in mrs if m['reference'] != mr['reference']] print (f"{len(mrs)} MRs found") mergeConflicts = {} for mr in mrs: source_branch = mr['source_branch'] print (source_branch) try: do(f"git fetch origin {source_branch}:{source_branch}") success, errStr = do(f"git merge --no-commit {source_branch}") if not success: print ("Merge NOT OK") mergeConflicts[source_branch] = errStr else: print ("Merge OK") except Exception as ex: mergeConflicts[source_branch] = str(ex) raise finally: do("git merge --abort") print (f"Merge conflicts with following branches: {mergeConflicts}") exit(len(mergeConflicts.keys()))