import re import subprocess # 不自动处理的规则 ex = ("SA4006", "structcheck") # 不自动处理的规则 # 忽略error返回白名单,白名单才处理 errwl = { "ymetrics": "_ = ", "yalert": "_ = ", "proto.Unmarshal": "_ = ", "json.Unmarshal": "_ = ", "jsoniter.Unmarshal": "_ = ", "c.Send": " _ = ", } # strings忽略大小写的比较 reef = re.compile("^strings\\.\\w+\\(([\\.\\w]+)\\) == [\\.\\w]+\\(([\\.\\w]+)\\)") files = {} # 获取文件名、行号、错误类型 def getfilelinetype(l): ss = l.split(":") print(ss) return ss[0], int(ss[1]) - 1, int(ss[2]) # startswith,跳过\t def sw(l, p): return l.strip().startswith(p) # endswith def ew(l, s): return l.strip().endswith(s) # 处理一个文件 class File: def __init__(self, name): self.name = name fh = open(name, "r") self.ls = fh.readlines() fh.close() self.comments = set({}) # 被注释的行 self.dels = set({}) # 被删除的行 def Unused(self, l, offset): if not ew(self.ls[l], "{"): self.comments.add(l) else: for i in range(l, len(self.ls), 1): self.comments.add(i) if self.ls[i].startswith("}"): # 不能用sw,不用中间的block也会有一个独立的} break elif len(self.ls[i]) > offset and self.ls[i][offset:].startswith("}"): break def RemoveBlankReceiver(self, l): self.ls[l] = self.ls[l].replace(", _", "") def MapSimple(self, l): self.ls[l] = self.ls[l].replace(", 0)", ")") def EqualFold(self, l, offset): self.ls[l] = self.ls[l][:offset] + re.sub( reef, "strings.EqualFold(\\1, \\2)", self.ls[l][offset:] ) def ErrorCheck(self, l): # 白名单的函数才会处理,其他的还是得看看需不需要判断 if sw(self.ls[l], "defer "): # defer的这种,需要加个函数体才行,这里的修改不一定对,返回值的数量都默认按1个处理了 ts = False nts = False for i in range(l, len(self.ls)): if self.ls[i].find("time.Now().Unix()") >= 0: ts = True self.ls[i] = self.ls[i].replace("time.Now().Unix()", "ts", -1) if self.ls[i].find("time.Now().UnixNano()") >= 0: nts = True self.ls[i] = self.ls[i].replace("time.Now().UnixNano()", "nts", -1) if ew(self.ls[i], ")"): p1 = "" p2 = "" if ts: if nts: p1 = "ts, nts int64" p2 = "time.Now().Unix(), time.Now().UnixNano()" else: p1 = "ts int64" ps = "time.Now().Unix()" elif nts: p1 = "nts int64" p2 = "time.Now().UnixNano()" self.ls[l] = self.ls[l].replace( "defer ", "defer func({}) {{ _ = ".format(p1) ) self.ls[i] = self.ls[i].replace("\n", "}} ({})\n".format(p2)) break elif sw(self.ls[l], "go "): self.ls[l] = self.ls[l].replace("go ", "go func() { _ = ") for i in range(l, len(self.ls)): if ew(self.ls[i], ")"): self.ls[i] = self.ls[i].replace("\n", "} ()\n") break else: for w in errwl: if sw(self.ls[l], w): self.ls[l] = errwl[w] + self.ls[l] break def ContextNil(self, l, offset): self.ls[l] = self.ls[l][:offset] + self.ls[l][offset:].replace( "nil", "context.Background()", 1 ) def SprintConst(self, l, offset): self.ls[l] = self.ls[l][:offset] + self.ls[l][offset:].replace( "fmt.Sprintf(", "", 1 ).replace('")', '"', 1) def SprintNoArg(self, l, offset): self.ls[l] = self.ls[l][:offset] + self.ls[l][offset:].replace( "fmt.Sprintf(", "", 1 ).replace(")", "", 1) def SimpleReturn( self, l, line ): # should use 'return status.IsVideoOpen' instead of 'if status.IsVideoOpen { return true }; return false' (gosimple) idx = line.find(" should use '") line = line[idx + len(" should use '") :] idx = line.find("'") simple = line[:idx] idx = line.rfind("return ") p = line[idx:] idx = p.find("'") end = p[:idx] # print('simple is:',simple, ' end is:', end) if self.ls[l].find(end) > 0: self.ls[l] = simple + "\n" else: self.ls[l] = simple + "\n" for i in range(l + 1, len(self.ls)): self.dels.add(i) if self.ls[i].find(end) >= 0: break def Write(self): fh = open(self.name, "w") for i, l in enumerate(self.ls): if i in self.dels: continue elif i in self.comments: fh.write("//") fh.write(l) fh.close() def getfile(name): if name in files: return files[name] f = File(name) files[name] = f return f def process(k): for i in range(0, len(k), 3): line = k[i].decode("utf-8").strip() notcheck = False for t in ex: if line.find(t) >= 0: notcheck = True if notcheck: continue name, l, t = getfilelinetype(line) f = getfile(name) if ( line.find("is unused (deadcode)") >= 0 or line.find("is unused (unused)") >= 0 ): # unused f.Unused(l, len(k[i + 2]) - 1) elif line.find("redundant `return` statement") >= 0: # 多余的return f.Unused(l, 0) elif ( line.find("unnecessary assignment to the blank identifier") >= 0 ): # 取map的变量的时候,用了a,_ := m[xxx] f.RemoveBlankReceiver(l) elif line.find("should use make(map") >= 0: f.MapSimple(l) elif line.find("should use strings.EqualFold instead") >= 0: f.EqualFold(l, len(k[i + 2]) - 1) elif line.find("is not checked (errcheck)") >= 0: f.ErrorCheck(l) elif ( line.find("pass context.TODO if you are unsure about which Context to use") >= 0 ): f.ContextNil(l, len(k[i + 2]) - 1) elif line.find("unnecessary use of fmt.Sprintf (gosimple)") >= 0: f.SprintConst(l, len(k[i + 2]) - 1) elif ( line.find( "printf-style function with dynamic format string and no further arguments should use print-style function instead" ) >= 0 ): f.SprintNoArg(l, len(k[i + 2]) - 1) elif line.find("should use 'return ") >= 0 and line.endswith("(gosimple)"): f.SimpleReturn(l, line) cnt = 0 lk = 0 while True: cnt += 1 files = {} print("running golangci-lint(第{}次) ...".format(cnt), end="", flush=True) ret = subprocess.run( [ "golangci-lint", "run", "--tests=false", "--max-issues-per-linter=0", "--max-same-issues=0", ], stdout=subprocess.PIPE, ) k = ret.stdout.splitlines() print("done. has {} issues".format(len(k) / 3)) if lk == len(k): print("issue 数量稳定。lint结束") break else: lk = len(k) print("fixing issues ...", end="") process(k) for f in files: files[f].Write() print("done") print("running goimports ...", end="", flush=True) subprocess.run(["goimports", "-w", "./"], stdout=subprocess.PIPE) print("done") print("running gofmt ...", end="", flush=True) subprocess.run(["gofmt", "-w", "./"], stdout=subprocess.PIPE) print("done")