From 6257688600c9a86ba5517955c60b96d0af967031 Mon Sep 17 00:00:00 2001
From: 0x78f1935
Date: Mon, 25 Nov 2024 14:45:48 +0100
Subject: [PATCH 1/3] CHANGE: Chunk tasks to preserve bandwidth

---
 aproxyrelay/process.py | 16 ++++++++++++++--
 aproxyrelay/req.py     | 21 +++++++++++++++++----
 2 files changed, 31 insertions(+), 6 deletions(-)

diff --git a/aproxyrelay/process.py b/aproxyrelay/process.py
index c0f8a81..c371175 100644
--- a/aproxyrelay/process.py
+++ b/aproxyrelay/process.py
@@ -22,6 +22,10 @@ def __init__(self) -> None:
         """
         self._queue_result = Queue() # Holds target results

+    def _chunk_list(self, lst, chunk_size):
+        """Chunk a list into pieces of the desired size."""
+        return [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]
+
     async def _process_targets_main(self) -> None:
         """
         Start the Proxy Relay Processor. Proxies in the queue are nothing less than burners.
@@ -41,8 +45,16 @@ async def _process_targets_main(self) -> None:
                 # Append the coroutine object to the tasks list
                 tasks.append(self._obtain_targets(proxy, target))

-        # Use asyncio.gather to concurrently execute all tasks
-        await gather(*tasks)
+        # We have to chunk our tasks, otherwise the available internet bandwidth might be saturated
+        chunks = self._chunk_list(tasks, 10000)
+        i = 0
+        for chunk in chunks:
+            self.logger.info(f"[aProxyRelay] Processing ({i}/{len(tasks)}) ... please wait ...")
+            i += int(len(chunk))
+            # Use asyncio.gather to concurrently execute all tasks in this chunk
+            await gather(*chunk)
+        # # Use asyncio.gather to concurrently execute all tasks
+        # await gather(*tasks)

         self.logger.info(f'[aProxyRelay] Processing ({self._queue_target_process.qsize()}) items in Queue using ({self.proxies.qsize()}) proxies ... Please wait...')  # noqa: B950

diff --git a/aproxyrelay/req.py b/aproxyrelay/req.py
index 556df70..31c7e39 100644
--- a/aproxyrelay/req.py
+++ b/aproxyrelay/req.py
@@ -31,6 +31,10 @@ def __init__(self) -> None:
         """
         self.logger.info("[aProxyRelay] Request module initialized!")

+    def _chunk_list(self, lst, chunk_size):
+        """Chunk a list into pieces of the desired size."""
+        return [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]
+
     async def _fetch_proxy_page(self, urls, session):
         """
         Use asyncio.gather to run multiple requests concurrently by executing `self._request_proxy_page`.
@@ -84,16 +88,25 @@ async def _test_all_proxies(self):
         Use asyncio.gather to run multiple requests concurrently by executing `self._test_proxy_link`.
         """
         # Use asyncio.gather to run multiple tests concurrently
-        to_filter = []
+        raw = []
         while not self._queue_filter.empty():
             _target = self._queue_filter.get()
             _target['proxy'] = f"{_target['protocol'].replace('https', 'http')}://{_target['ip']}:{_target['port']}"
-            to_filter.append(_target)
+            raw.append(_target)

         # Remove duplicate entries
-        to_filter = [dict(x) for x in list(set([tuple(item.items()) for item in to_filter]))]
+        to_filter = [dict(x) for x in list(set([tuple(item.items()) for item in raw]))]
+        self.logger.info(f"[aProxyRelay] Found ({int(len(raw)) - int(len(to_filter))}) duplicates, which have been removed")
         tasks = [self._test_proxy_link(proxy['proxy'], proxy) for proxy in to_filter]
-        await gather(*tasks)
+        # We have to chunk our tasks, otherwise the available internet bandwidth might be saturated
+        chunks = self._chunk_list(tasks, 10000)
+        i = 0
+        for chunk in chunks:
+            self.logger.info(f"[aProxyRelay] Brewing ({i}/{len(tasks)}) ... please wait ...")
+            i += int(len(chunk))
+            # Use asyncio.gather to concurrently execute all tasks in this chunk
+            await gather(*chunk)
+        # await gather(*tasks)

     async def _test_proxy_link(self, proxy_url, data) -> None:
         """

From 1f06271038a60006c2803728ef17afdfaf729cde Mon Sep 17 00:00:00 2001
From: 0x78f1935
Date: Mon, 25 Nov 2024 14:46:13 +0100
Subject: [PATCH 2/3] DELETE: Console logger

---
 aproxyrelay/__init__.py | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/aproxyrelay/__init__.py b/aproxyrelay/__init__.py
index a8e82c4..c17dac4 100644
--- a/aproxyrelay/__init__.py
+++ b/aproxyrelay/__init__.py
@@ -61,11 +61,6 @@ def __init__(
         # Configure the logger
         logging.basicConfig(level=logging.INFO if not debug else logging.DEBUG)
         self.logger = logging.getLogger(__name__)
-        console_handler = logging.StreamHandler()
-        console_handler.setLevel(logging.DEBUG if debug else logging.INFO)
-        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
-        console_handler.setFormatter(formatter)
-        self.logger.addHandler(console_handler)

         # Initialize Core
         AProxyRelayCore.__init__(self)

From 9cc0d2ca05c261428bd818ebf482abde95541f10 Mon Sep 17 00:00:00 2001
From: 0x78f1935
Date: Mon, 25 Nov 2024 14:46:22 +0100
Subject: [PATCH 3/3] ADD: Additional log statements

---
 aproxyrelay/core.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/aproxyrelay/core.py b/aproxyrelay/core.py
index 4979380..793cbcc 100644
--- a/aproxyrelay/core.py
+++ b/aproxyrelay/core.py
@@ -73,12 +73,14 @@ async def get_proxies(self) -> None:
         if self.scrape:
             async with ClientSession(conn_timeout=self.timeout) as session:
                 await self._fetch_proxy_page(scrapes, session)
-            self.logger.info(f'[aProxyRelay] Scraper: Found {self._queue_filter.qsize()} competent proxy servers')
+            self.logger.info(f'[aProxyRelay] Scraper: Found ({self._queue_filter.qsize()}) competent proxy servers')
         else:
             self.logger.info('[aProxyRelay] Scraper: Skip discovery of new proxy servers ...')

         if self.filter and self.scrape:
             self.logger.info(f'[aProxyRelay] Validating: Proxies ({self._queue_filter.qsize()}), checking if proxies meet connection requirements ...')  # noqa: B950
+            self.logger.info(f'[aProxyRelay] Keep an eye on "pending task name"; once it reaches ({self._queue_filter.qsize()}), all tests have been completed')  # noqa: B950
+            self.logger.info('[aProxyRelay] Grab some coffee ... please wait ...')
             await self._test_all_proxies()
             self.logger.info(f'[aProxyRelay] Filter: Found {self._filtered_failed} incompetent and {self._filtered_available} available proxy servers in {datetime.now(UTC) - self.started}')  # noqa: B950
         else:
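
For reference, the chunked-gather pattern that PATCH 1/3 introduces can be sketched in isolation roughly as follows. This is a minimal standalone illustration, not aProxyRelay code: the probe coroutine, the task count, and the chunk size are placeholders chosen for the example.

# Minimal sketch of chunking coroutines before awaiting them with asyncio.gather,
# so that only one chunk's worth of work is in flight at a time.
from asyncio import gather, run, sleep


def chunk_list(lst, chunk_size):
    """Split a list into consecutive chunks of at most chunk_size items."""
    return [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]


async def probe(item):
    """Placeholder coroutine; in aProxyRelay this would be a proxy test or target request."""
    await sleep(0)
    return item


async def main():
    tasks = [probe(n) for n in range(25_000)]  # illustrative task count
    done = 0
    for chunk in chunk_list(tasks, 10_000):  # same chunk size the patch uses
        print(f"Processing ({done}/{len(tasks)}) ... please wait ...")
        await gather(*chunk)  # only this chunk runs concurrently, capping open connections
        done += len(chunk)


if __name__ == "__main__":
    run(main())

The effect is a simple form of concurrency limiting: instead of opening tens of thousands of connections at once, each batch completes before the next starts, at the cost of idle time at the tail of every chunk.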