fakubicmal
Gold Üye
- Katılım
- 12 Tem 2024
- Mesajlar
- 318
- Beğeniler
- 53
birkaç ay önce yapay zekaya kodlatmıştım şu an denediğimde dosyayı seçip başlatıyorum 0 proxy buluyor önceden çalışıyordu kodu burda
import tkinter as tkfrom tkinter import filedialog, messagebox, scrolledtextimport threadingimport socksimport refrom concurrent.futures import ThreadPoolExecutorimport osfrom datetime import datetimeclass ProxyCheckerApp: def __init__(self, root): self.root = root root.title("Proxy Checker") root.geometry("900x580") root.configure(bg="black") self.select_btn = tk.Button(root, text="Proxy Dosyası Seç", command=self.select_file, fg="lime", bg="black") self.select_btn.place(x=10, y=10) self.file_label = tk.Label(root, text="Henüz dosya seçilmedi", fg="red", bg="black") self.file_label.place(x=10, y=40) tk.Label(root, text="Thread Sayısı Ayarı", fg="lime", bg="black").place(x=10, y=70) self.thread_count_var = tk.IntVar(value=20) self.thread_spin = tk.Spinbox(root, from_=1, to=100, width=5, textvariable=self.thread_count_var) self.thread_spin.place(x=10, y=95) self.start_btn = tk.Button(root, text="Başlat", command=self.start_test, fg="lime", bg="black") self.start_btn.place(x=10, y=130) self.stop_btn = tk.Button(root, text="Durdur", command=self.stop_test, state="disabled", fg="lime", bg="black") self.stop_btn.place(x=10, y=170) self.resume_btn = tk.Button(root, text="Devam Et", command=self.resume_test, state="disabled", fg="lime", bg="black") self.resume_btn.place(x=10, y=210) self.cancel_btn = tk.Button(root, text="İptal Et Ve Kaydet", command=self.cancel_and_save, state="disabled", fg="lime", bg="black") self.cancel_btn.place(x=10, y=250) self.log_area = scrolledtext.ScrolledText(root, height=25, width=80, state='disabled', bg="black", fg="lime") self.log_area.place(x=200, y=10) self.proxy_file_path = None self.executor = None self.futures = [] self.working = [] self.not_working = [] self.running = False self.stopped = False self.cancelled = False self.paused_proxies = [] self.lock = threading.Lock() def l0g(self, message, color="lime"): self.log_area.config(state='normal') self.log_area.insert(tk.END, message + "\n") self.log_area.tag_add(color, "end-2l", "end-1l") self.log_area.tag_config("lime", foreground="lime") self.log_area.tag_config("blue", foreground="cyan") self.log_area.tag_config("red", foreground="red") self.log_area.see(tk.END) self.log_area.config(state='disabled') def select_file(self): path = filedialog.askopenfilename(filetypes=[("Text files", "*.txt")]) if path: self.proxy_file_path = path self.file_label.config(text=f"Seçilen Dosya: {os.path.basename(path)}", fg="green") self.start_btn.config(state="normal") def start_test(self): if not self.proxy_file_path: messagebox.showerror("Hata", "Lütfen önce proxy dosyasını seçin.") return if self.running: self.l0g("[!] Zaten test çalışıyor.") return self.running = True self.stopped = False self.cancelled = False self.paused_proxies.clear() self.working.clear() self.not_working.clear() self.futures.clear() self.start_btn.config(state="disabled") self.stop_btn.config(state="normal") self.resume_btn.config(state="disabled") self.cancel_btn.config(state="normal") self.log_area.config(state='normal') self.log_area.delete(1.0, tk.END) self.log_area.config(state='disabled') threading.Thread(target=self.run_check, daemon=True).start() def stop_test(self): self.stopped = True self.stop_btn.config(state="disabled") self.resume_btn.config(state="normal") def resume_test(self): self.stopped = False self.stop_btn.config(state="normal") self.resume_btn.config(state="disabled") threading.Thread(target=self.process_proxies_chunk, daemon=True).start() def cancel_and_save(self): with self.lock: self.cancelled = True self.save_results() self.finish_run() def is_cancelled(self): with self.lock: return self.cancelled def run_check(self): try: with open(self.proxy_file_path, "r", encoding="utf-8") as f: raw = f.read() except Exception as e: self.l0g(f"[HATA] Dosya okunamadı: {e}", color="red") self.finish_run() return proxies = self.extract_proxies(raw) self.l0g(f"Toplam {len(proxies)} proxy bulundu. Test başlatılıyor...", color="lime") self.paused_proxies = proxies.copy() self.process_proxies_chunk() def process_proxies_chunk(self): if self.is_cancelled(): self.save_results() self.finish_run() return if self.stopped: self.root.after(500, self.process_proxies_chunk) return if not self.paused_proxies: self.save_results() self.finish_run() return max_threads = self.thread_count_var.get() max_threads = min(max(max_threads, 1), 100) chunk = [] with self.lock: while self.paused_proxies and len(chunk) < max_threads: chunk.append(self.paused_proxies.pop(0)) self.executor = ThreadPoolExecutor(max_workers=max_threads) self.futures = [self.executor.submit(self.test_proxy, proxy) for proxy in chunk] def callback(fut): if self.is_cancelled(): return try: proxy, result = fut.result() if result: self.l0g(f"[✔] {result.upper()} → {proxy}", color="blue") self.working.append((proxy, result)) else: self.l0g(f"[✖] ÇALIŞMIYOR → {proxy}", color="red") self.not_working.append(proxy) except Exception as e: self.l0g(f"[!] Hata: {e}", color="red") with self.lock: self.futures.remove(fut) if not self.futures: self.executor.shutdown(wait=False) self.root.after(100, self.process_proxies_chunk) for fut in self.futures: fut.add_done_callback(callback) def extract_proxies(self, text): proxies = [] for line in text.strip().splitlines(): match = re.search(r'(\d{1,3}(?:\.\d{1,3}){3})[\t :]+(\d{2,5})', line) if match: proxies.append(f"{match.group(1)}:{match.group(2)}") return proxies def test_proxy(self, proxy): ip, port = proxy.split(":") port = int(port) types_to_try = ['socks5', 'socks4', 'http', 'https'] for t in types_to_try: try: s = socks.socksocket() s.settimeout(5) if t == 'socks5': s.set_proxy(socks.SOCKS5, ip, port) elif t == 'socks4': s.set_proxy(socks.SOCKS4, ip, port) else: s.set_proxy(socks.HTTP, ip, port) s.connect(("8.8.8.8", 53)) s.close() return (proxy, t) except: try: s.close() except: pass return (proxy, None) def save_results(self): zaman = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") türlere_göre = { "socks5": [], "socks4": [], "http": [], "https": [], "fail": [] } for proxy, tür in self.working: if tür: türlere_göre[tür].append(proxy) else: türlere_göre["fail"].append(proxy) for tür in ["socks5", "socks4", "http", "https"]: if türlere_göre[tür]: with open(f"{tür}_{zaman}.txt", "w") as f: for p in türlere_göre[tür]: f.write(p + "\n") with open(f"calismayanlar_{zaman}.txt", "w") as f: for p in self.not_working + türlere_göre["fail"]: f.write(p + "\n") self.l0g("\n
Tüm türlerde test tamamlandı ve türlerine göre ayrıldı.", color="lime") def finish_run(self): self.running = False self.stopped = False self.cancelled = False self.start_btn.config(state="normal") self.stop_btn.config(state="disabled") self.resume_btn.config(state="disabled") self.cancel_btn.config(state="disabled") self.paused_proxies.clear()def main(): root = tk.Tk() app = ProxyCheckerApp(root) root.mainloop()if __name__ == "__main__": main()
sizce sorun nedir
import tkinter as tkfrom tkinter import filedialog, messagebox, scrolledtextimport threadingimport socksimport refrom concurrent.futures import ThreadPoolExecutorimport osfrom datetime import datetimeclass ProxyCheckerApp: def __init__(self, root): self.root = root root.title("Proxy Checker") root.geometry("900x580") root.configure(bg="black") self.select_btn = tk.Button(root, text="Proxy Dosyası Seç", command=self.select_file, fg="lime", bg="black") self.select_btn.place(x=10, y=10) self.file_label = tk.Label(root, text="Henüz dosya seçilmedi", fg="red", bg="black") self.file_label.place(x=10, y=40) tk.Label(root, text="Thread Sayısı Ayarı", fg="lime", bg="black").place(x=10, y=70) self.thread_count_var = tk.IntVar(value=20) self.thread_spin = tk.Spinbox(root, from_=1, to=100, width=5, textvariable=self.thread_count_var) self.thread_spin.place(x=10, y=95) self.start_btn = tk.Button(root, text="Başlat", command=self.start_test, fg="lime", bg="black") self.start_btn.place(x=10, y=130) self.stop_btn = tk.Button(root, text="Durdur", command=self.stop_test, state="disabled", fg="lime", bg="black") self.stop_btn.place(x=10, y=170) self.resume_btn = tk.Button(root, text="Devam Et", command=self.resume_test, state="disabled", fg="lime", bg="black") self.resume_btn.place(x=10, y=210) self.cancel_btn = tk.Button(root, text="İptal Et Ve Kaydet", command=self.cancel_and_save, state="disabled", fg="lime", bg="black") self.cancel_btn.place(x=10, y=250) self.log_area = scrolledtext.ScrolledText(root, height=25, width=80, state='disabled', bg="black", fg="lime") self.log_area.place(x=200, y=10) self.proxy_file_path = None self.executor = None self.futures = [] self.working = [] self.not_working = [] self.running = False self.stopped = False self.cancelled = False self.paused_proxies = [] self.lock = threading.Lock() def l0g(self, message, color="lime"): self.log_area.config(state='normal') self.log_area.insert(tk.END, message + "\n") self.log_area.tag_add(color, "end-2l", "end-1l") self.log_area.tag_config("lime", foreground="lime") self.log_area.tag_config("blue", foreground="cyan") self.log_area.tag_config("red", foreground="red") self.log_area.see(tk.END) self.log_area.config(state='disabled') def select_file(self): path = filedialog.askopenfilename(filetypes=[("Text files", "*.txt")]) if path: self.proxy_file_path = path self.file_label.config(text=f"Seçilen Dosya: {os.path.basename(path)}", fg="green") self.start_btn.config(state="normal") def start_test(self): if not self.proxy_file_path: messagebox.showerror("Hata", "Lütfen önce proxy dosyasını seçin.") return if self.running: self.l0g("[!] Zaten test çalışıyor.") return self.running = True self.stopped = False self.cancelled = False self.paused_proxies.clear() self.working.clear() self.not_working.clear() self.futures.clear() self.start_btn.config(state="disabled") self.stop_btn.config(state="normal") self.resume_btn.config(state="disabled") self.cancel_btn.config(state="normal") self.log_area.config(state='normal') self.log_area.delete(1.0, tk.END) self.log_area.config(state='disabled') threading.Thread(target=self.run_check, daemon=True).start() def stop_test(self): self.stopped = True self.stop_btn.config(state="disabled") self.resume_btn.config(state="normal") def resume_test(self): self.stopped = False self.stop_btn.config(state="normal") self.resume_btn.config(state="disabled") threading.Thread(target=self.process_proxies_chunk, daemon=True).start() def cancel_and_save(self): with self.lock: self.cancelled = True self.save_results() self.finish_run() def is_cancelled(self): with self.lock: return self.cancelled def run_check(self): try: with open(self.proxy_file_path, "r", encoding="utf-8") as f: raw = f.read() except Exception as e: self.l0g(f"[HATA] Dosya okunamadı: {e}", color="red") self.finish_run() return proxies = self.extract_proxies(raw) self.l0g(f"Toplam {len(proxies)} proxy bulundu. Test başlatılıyor...", color="lime") self.paused_proxies = proxies.copy() self.process_proxies_chunk() def process_proxies_chunk(self): if self.is_cancelled(): self.save_results() self.finish_run() return if self.stopped: self.root.after(500, self.process_proxies_chunk) return if not self.paused_proxies: self.save_results() self.finish_run() return max_threads = self.thread_count_var.get() max_threads = min(max(max_threads, 1), 100) chunk = [] with self.lock: while self.paused_proxies and len(chunk) < max_threads: chunk.append(self.paused_proxies.pop(0)) self.executor = ThreadPoolExecutor(max_workers=max_threads) self.futures = [self.executor.submit(self.test_proxy, proxy) for proxy in chunk] def callback(fut): if self.is_cancelled(): return try: proxy, result = fut.result() if result: self.l0g(f"[✔] {result.upper()} → {proxy}", color="blue") self.working.append((proxy, result)) else: self.l0g(f"[✖] ÇALIŞMIYOR → {proxy}", color="red") self.not_working.append(proxy) except Exception as e: self.l0g(f"[!] Hata: {e}", color="red") with self.lock: self.futures.remove(fut) if not self.futures: self.executor.shutdown(wait=False) self.root.after(100, self.process_proxies_chunk) for fut in self.futures: fut.add_done_callback(callback) def extract_proxies(self, text): proxies = [] for line in text.strip().splitlines(): match = re.search(r'(\d{1,3}(?:\.\d{1,3}){3})[\t :]+(\d{2,5})', line) if match: proxies.append(f"{match.group(1)}:{match.group(2)}") return proxies def test_proxy(self, proxy): ip, port = proxy.split(":") port = int(port) types_to_try = ['socks5', 'socks4', 'http', 'https'] for t in types_to_try: try: s = socks.socksocket() s.settimeout(5) if t == 'socks5': s.set_proxy(socks.SOCKS5, ip, port) elif t == 'socks4': s.set_proxy(socks.SOCKS4, ip, port) else: s.set_proxy(socks.HTTP, ip, port) s.connect(("8.8.8.8", 53)) s.close() return (proxy, t) except: try: s.close() except: pass return (proxy, None) def save_results(self): zaman = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") türlere_göre = { "socks5": [], "socks4": [], "http": [], "https": [], "fail": [] } for proxy, tür in self.working: if tür: türlere_göre[tür].append(proxy) else: türlere_göre["fail"].append(proxy) for tür in ["socks5", "socks4", "http", "https"]: if türlere_göre[tür]: with open(f"{tür}_{zaman}.txt", "w") as f: for p in türlere_göre[tür]: f.write(p + "\n") with open(f"calismayanlar_{zaman}.txt", "w") as f: for p in self.not_working + türlere_göre["fail"]: f.write(p + "\n") self.l0g("\n
sizce sorun nedir