diff --git a/README.md b/README.md
index abb5be2..29dc205 100644
--- a/README.md
+++ b/README.md
@@ -50,7 +50,7 @@ targets = [
 # Initialize proxy relay
 proxy_relay = AProxyRelay(
     targets=targets,
-    timeout=5,
+    timeout=30,
     scrape=True,
     filter=True,
     zones=['us'],
diff --git a/aproxyrelay/__init__.py b/aproxyrelay/__init__.py
index d9a64ca..a8e82c4 100644
--- a/aproxyrelay/__init__.py
+++ b/aproxyrelay/__init__.py
@@ -14,10 +14,11 @@
 """
 from asyncio import get_event_loop, gather
 from datetime import datetime, UTC
-from logging import basicConfig, INFO, DEBUG, getLogger
 from typing import Callable
 from queue import Queue
 
+import logging
+
 from .core import AProxyRelayCore
 
 
@@ -25,7 +26,7 @@ class AProxyRelay(AProxyRelayCore):
     def __init__(
         self,
         targets: list[str],
-        timeout: int = 5,
+        timeout: int = 30,
         scrape: bool = True,
         filter: bool = True,
         zones: list[str] = ['US'],  # noqa: B006
@@ -48,7 +49,7 @@ def __init__(
         ```py
         proxy_relay = AProxyRelay(
             targets=targets,
-            timeout=5,
+            timeout=30,
             scrape=True,
             filter=True,
             zones=['US', 'DE'],
@@ -58,8 +59,13 @@ def __init__(
         ```
         """
         # Configure the logger
-        basicConfig(level=INFO if not debug else DEBUG)
-        self.logger = getLogger(__name__)
+        logging.basicConfig(level=logging.INFO if not debug else logging.DEBUG)
+        self.logger = logging.getLogger(__name__)
+        console_handler = logging.StreamHandler()
+        console_handler.setLevel(logging.DEBUG if debug else logging.INFO)
+        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
+        console_handler.setFormatter(formatter)
+        self.logger.addHandler(console_handler)
 
         # Initialize Core
         AProxyRelayCore.__init__(self)
diff --git a/aproxyrelay/core.py b/aproxyrelay/core.py
index efbb8b4..4979380 100644
--- a/aproxyrelay/core.py
+++ b/aproxyrelay/core.py
@@ -79,8 +79,7 @@ async def get_proxies(self) -> None:
 
         if self.filter and self.scrape:
             self.logger.info(f'[aProxyRelay] Validating: Proxies ({self._queue_filter.qsize()}), checking if proxies meet connection requirements ...')  # noqa: B950
-            async with ClientSession(conn_timeout=15) as session:
-                await self._test_all_proxies(session)
+            await self._test_all_proxies()
             self.logger.info(f'[aProxyRelay] Filter: Found {self._filtered_failed} incompetent and {self._filtered_available} available proxy servers in {datetime.now(UTC) - self.started}')  # noqa: B950
         else:
             while not self._queue_filter.empty():
diff --git a/aproxyrelay/process.py b/aproxyrelay/process.py
index 971cde9..c0f8a81 100644
--- a/aproxyrelay/process.py
+++ b/aproxyrelay/process.py
@@ -11,8 +11,6 @@
 Process class, once all proxies have been received, we are going to obtain the data for the targets.
 This class contains the core mechanics for scraping the targets.
""" -from aiosocks2.connector import ProxyConnector, ProxyClientRequest -from aiohttp import ClientSession from asyncio import gather from queue import Queue @@ -32,31 +30,29 @@ async def _process_targets_main(self) -> None: """ self.logger.info('[aProxyRelay] Processing ...') - async with ClientSession( - connector=ProxyConnector(remote_resolve=True), - request_class=ProxyClientRequest, - conn_timeout=self.timeout - ) as session: - tasks = [] + tasks = [] - while not self._queue_target_process.empty(): - proxy = self.proxies.get() - if isinstance(proxy, dict): - proxy = f"{proxy['protocol'].replace('https', 'http')}://{proxy['ip']}:{proxy['port']}" - target = self._queue_target_process.get() + while not self._queue_target_process.empty(): + proxy = self.proxies.get() + if isinstance(proxy, dict): + proxy = f"{proxy['protocol'].replace('https', 'http')}://{proxy['ip']}:{proxy['port']}" + target = self._queue_target_process.get() - # Append the coroutine object to the tasks list - tasks.append(self._obtain_targets(proxy, target, session)) - self.proxies.put(proxy) + # Append the coroutine object to the tasks list + tasks.append(self._obtain_targets(proxy, target)) - self.proxies = Queue() - # Use asyncio.gather to concurrently execute all tasks - await gather(*tasks) + # Use asyncio.gather to concurrently execute all tasks + await gather(*tasks) - self.logger.info(f'[aProxyRelay] Processing ({self._queue_target_process.qsize()}) items in Queue ... Please wait...') + self.logger.info(f'[aProxyRelay] Processing ({self._queue_target_process.qsize()}) items in Queue using ({self.proxies.qsize()}) proxies ... Please wait...') # noqa: B950 + # Proxy queue is empty but targets are available if self.proxies.empty() and self._queue_target_process.qsize() > 0: + self.logger.info( + f'[aProxyRelay] All Proxies exhausted ({self._queue_target_process.qsize()}) items left in Queue ... Please wait...' + ) await self.get_proxies() await self.process_targets() + # Proxy queue has proxies targets are available elif not self.proxies.empty() and self._queue_target_process.qsize() > 0: await self.process_targets() diff --git a/aproxyrelay/req.py b/aproxyrelay/req.py index b5589ee..556df70 100644 --- a/aproxyrelay/req.py +++ b/aproxyrelay/req.py @@ -10,15 +10,15 @@ Class which handles all requests made throughout the library. """ -from aiohttp.client_exceptions import ClientHttpProxyError, \ - ServerDisconnectedError, \ - ClientProxyConnectionError, \ +from ssl import SSLCertVerificationError, SSLError +from aiohttp import ClientSession, ClientTimeout +from aiohttp.client_exceptions import ServerDisconnectedError, \ ClientResponseError, \ ClientOSError, \ - ServerTimeoutError, \ - InvalidURL -from aiosocks2.errors import SocksError -from asyncio import gather, TimeoutError + InvalidURL, \ + ConnectionTimeoutError +from aiohttp_socks import ProxyConnectionError, ProxyConnector, ProxyError +from asyncio import IncompleteReadError, gather, TimeoutError from json import dumps from .scrapers import proxy_list @@ -79,12 +79,9 @@ async def _request_proxy_page(self, url, session) -> None: else: self.proxies.put(row) - async def _test_all_proxies(self, session): + async def _test_all_proxies(self): """ Use asyncio.gather to run multiple requests concurrently by executing `self._test_proxy_link`. 
-
-        Args:
-            session: aiohttp session without proxy support
         """
         # Use asyncio.gather to run multiple tests concurrently
         to_filter = []
@@ -95,10 +92,10 @@ async def _test_all_proxies(self, session):
 
         # Remove duplicate entries
         to_filter = [dict(x) for x in list(set([tuple(item.items()) for item in to_filter]))]
-        tasks = [self._test_proxy_link(proxy['proxy'], proxy, session) for proxy in to_filter]
+        tasks = [self._test_proxy_link(proxy['proxy'], proxy) for proxy in to_filter]
         await gather(*tasks)
 
-    async def _test_proxy_link(self, proxy_url, data, session) -> None:
+    async def _test_proxy_link(self, proxy_url, data) -> None:
         """
         Asynchronously call gg.my-dev.app, a website built by the creator of this package.
         If the connection was successful, the proxy works!
@@ -109,31 +106,48 @@ async def _test_proxy_link(self, proxy_url, data, session) -> None:
             proxy_url: The URL of the proxy to be tested.
             data: Additional data for the proxy test.
         """
+        # If port is empty, assume port 80
+        if data['port'] == '':
+            data['port'] = '80'
+        # Make sure port is in range
+        if int(data['port']) < 0 or int(data['port']) > 65535: return
         try:
-            async with session.post(
-                'https://gg.my-dev.app/api/v1/proxies/validate/lib',
-                proxy=proxy_url,
-                headers={
-                    **self._get_header(),
-                    'Content-Type': 'application/json'
-                },
-                data=dumps(data)
-            ) as response:
-                if response.status == 200:
-                    self.proxies.put(data)
-                    self._filtered_available = self._filtered_available + 1
-                else:
-                    self._filtered_failed = self._filtered_failed + 1
+            self.logger.debug(f'[aProxyRelay] Processing: {proxy_url} -> Added to queue')
+            connector = ProxyConnector.from_url(proxy_url.replace('unknown', 'socks4'))
+            timeout = ClientTimeout(total=self.timeout, connect=self.timeout)
+            async with ClientSession(connector=connector, timeout=timeout) as session:
+                async with session.post(
+                    'https://gg.my-dev.app/api/v1/proxies/validate/lib',
+                    headers={
+                        **self._get_header(),
+                        'Content-Type': 'application/json'
+                    },
+                    data=dumps(data)
+                ) as response:
+                    if response.status == 200:
+                        self.proxies.put(data)
+                        self._filtered_available = self._filtered_available + 1
+                        self.logger.debug(f'[aProxyRelay] Succeed: {proxy_url} -> Freshly Discovered')
+                    else:
+                        self._filtered_failed = self._filtered_failed + 1
+                        self.logger.debug(f'[aProxyRelay] Succeed: {proxy_url} -> Address Known')
         except (
-            ClientHttpProxyError,
-            ServerDisconnectedError,
-            ClientProxyConnectionError,
-            ClientResponseError,
             ClientOSError,
-            ServerTimeoutError,
             InvalidURL,
             ConnectionResetError,
-        ):
+            ProxyError,
+            SSLCertVerificationError,
+            ProxyConnectionError,
+            ConnectionTimeoutError,
+            IncompleteReadError,
+            UnicodeEncodeError,
+            SSLError,
+            ConnectionAbortedError,
+            ServerDisconnectedError,
+            ClientResponseError,
+            TimeoutError
+        ) as e:
+            self.logger.debug(f'[aProxyRelay] Failed: {proxy_url} -> {repr(e)}')
             self._filtered_failed = self._filtered_failed + 1
 
     async def _fetch_proxy_servers(self, urls, session):
@@ -172,7 +186,7 @@ async def _request_proxy_servers(self, url, session) -> None:
                 self.proxies.put(row)
                 self._filtered_ggs = self._filtered_ggs + 1
 
-    async def _obtain_targets(self, proxy_url, target, session) -> None:
+    async def _obtain_targets(self, proxy_url, target) -> None:
         """
         Asynchronously fetch the targets with our proxies.
         The 'steam' variable should be defaulted to False and should only be used when targeting Steam.
@@ -182,37 +196,44 @@ async def _obtain_targets(self, proxy_url, target, session) -> None:
             proxy_url: The URL of the proxy to be used for the request.
""" try: - async with session.get( - target, - proxy=proxy_url, - headers={ - **self._get_header(), - 'Content-Type': 'application/json' - }, - ) as response: - status = response.status - if status in (200, 202,): - self.proxies.put(proxy_url) - data = await response.json() - if data: - if pack := self.unpack(data, target): - self._queue_result.put(pack) + connector = ProxyConnector.from_url(proxy_url.replace('unknown', 'socks4')) + timeout = ClientTimeout(total=self.timeout, connect=self.timeout) + async with ClientSession(connector=connector, timeout=timeout) as session: + async with session.get( + target, + headers={ + **self._get_header(), + 'Content-Type': 'application/json' + }, + ) as response: + status = response.status + if status in (200, 202,): + self.proxies.put(proxy_url) + data = await response.json() + if data: + if pack := self.unpack(data, target): + self._queue_result.put(pack) + else: + self.logger.warning(f'[aProxyRelay] Could not unpack data for: {target}') else: - self.logger.warning(f'[aProxyRelay] Could not unpack data for: {target}') + self.logger.warning(f'[aProxyRelay] Target {target} Data seems to be None: {data}') else: - self.logger.warning(f'[aProxyRelay] Target {target} Data seems to be None: {data}') - else: - self._queue_target_process.put(target) - + self._queue_target_process.put(target) except ( - ClientHttpProxyError, - ServerDisconnectedError, - ClientProxyConnectionError, - ClientResponseError, ClientOSError, - ServerTimeoutError, InvalidURL, - SocksError, - TimeoutError, - ): + ConnectionResetError, + ProxyError, + SSLCertVerificationError, + ProxyConnectionError, + ConnectionTimeoutError, + IncompleteReadError, + UnicodeEncodeError, + SSLError, + ConnectionAbortedError, + ServerDisconnectedError, + ClientResponseError, + TimeoutError + ) as e: + self.logger.debug(f'[aProxyRelay] Failed: {target} -> {repr(e)}') self._queue_target_process.put(target) diff --git a/example.py b/example.py index c941b08..b13bb85 100644 --- a/example.py +++ b/example.py @@ -9,7 +9,7 @@ # Initialize proxy relay proxy_relay = AProxyRelay( targets=targets, - timeout=5, + timeout=30, scrape=True, filter=True, zones=['us'], diff --git a/setup.py b/setup.py index 68a2686..6f257df 100644 --- a/setup.py +++ b/setup.py @@ -14,7 +14,7 @@ packages=find_packages(), install_requires=[ 'aiohttp', - 'aiosocks2', + 'aiohttp_socks', 'beautifulsoup4', ], extras_require={