aproxyrelay/__init__.py (5 changes: 0 additions & 5 deletions)

@@ -61,11 +61,6 @@ def __init__(
         # Configure the logger
         logging.basicConfig(level=logging.INFO if not debug else logging.DEBUG)
         self.logger = logging.getLogger(__name__)
-        console_handler = logging.StreamHandler()
-        console_handler.setLevel(logging.DEBUG if debug else logging.INFO)
-        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
-        console_handler.setFormatter(formatter)
-        self.logger.addHandler(console_handler)

         # Initialize Core
         AProxyRelayCore.__init__(self)
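The deleted handler block was redundant: `logging.basicConfig()` already installs a `StreamHandler` on the root logger, and module loggers propagate to it, so the extra handler made every record print twice. A minimal sketch of the duplication (my reading of the motivation; the PR itself does not state it):

```python
import logging

logging.basicConfig(level=logging.INFO)  # installs a StreamHandler on the root logger
logger = logging.getLogger("aproxyrelay")

# A second handler on the module logger duplicates output: each record is
# emitted by this handler AND propagated up to the root logger's handler.
logger.addHandler(logging.StreamHandler())
logger.info("hello")  # printed twice; drop the extra handler and it prints once
```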
aproxyrelay/core.py (4 changes: 3 additions & 1 deletion)

@@ -73,12 +73,14 @@ async def get_proxies(self) -> None:
         if self.scrape:
             async with ClientSession(conn_timeout=self.timeout) as session:
                 await self._fetch_proxy_page(scrapes, session)
-            self.logger.info(f'[aProxyRelay] Scraper: Found {self._queue_filter.qsize()} competent proxy servers')
+            self.logger.info(f'[aProxyRelay] Scraper: Found ({self._queue_filter.qsize()}) competent proxy servers')
         else:
             self.logger.info('[aProxyRelay] Scraper: Skip discovery of new proxy servers ...')

         if self.filter and self.scrape:
             self.logger.info(f'[aProxyRelay] Validating: Proxies ({self._queue_filter.qsize()}), checking if proxies meet connection requirements ...') # noqa: B950
+            self.logger.info(f'[aProxyRelay] Keep an eye on "pending task name": once it reaches ({self._queue_filter.qsize()}), all tests have been completed') # noqa: B950
+            self.logger.info('[aProxyRelay] Grab some coffee ... please wait ...')
             await self._test_all_proxies()
             self.logger.info(f'[aProxyRelay] Filter: Found {self._filtered_failed} incompetent and {self._filtered_available} available proxy servers in {datetime.now(UTC) - self.started}') # noqa: B950
         else:
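A side note on the unchanged context line: `conn_timeout=` has been deprecated in aiohttp 3.x in favor of the `ClientTimeout` object. A hedged sketch of the equivalent call (not part of this PR):

```python
from aiohttp import ClientSession, ClientTimeout

async def fetch(url: str, timeout_seconds: float = 10) -> str:
    # ClientTimeout(connect=...) bounds only the connection phase,
    # mirroring what the deprecated conn_timeout= argument did.
    timeout = ClientTimeout(connect=timeout_seconds)
    async with ClientSession(timeout=timeout) as session:
        async with session.get(url) as resp:
            return await resp.text()

# usage: asyncio.run(fetch("https://example.com"))
```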
aproxyrelay/process.py (16 changes: 14 additions & 2 deletions)

@@ -22,6 +22,10 @@ def __init__(self) -> None:
         """
         self._queue_result = Queue()  # Holds target results

+    def _chunk_list(self, lst, chunk_size):
+        """Chunk a list into consecutive slices of at most chunk_size items."""
+        return [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]
+
     async def _process_targets_main(self) -> None:
         """
         Start the Proxy Relay Processor. Proxies in the queue are nothing less than burners.
@@ -41,8 +45,16 @@ async def _process_targets_main(self) -> None:
                 # Append the coroutine object to the tasks list
                 tasks.append(self._obtain_targets(proxy, target))

-        # Use asyncio.gather to concurrently execute all tasks
-        await gather(*tasks)
+        # We have to chunk our tasks, otherwise we might saturate the available bandwidth
+        chunks = self._chunk_list(tasks, 10000)
+        i = 0
+        for chunk in chunks:
+            self.logger.info(f"[aProxyRelay] Processing ({i}/{len(tasks)}) ... please wait ...")
+            i += len(chunk)
+            # Use asyncio.gather to concurrently execute all tasks in this chunk
+            await gather(*chunk)

         self.logger.info(f'[aProxyRelay] Processing ({self._queue_target_process.qsize()}) items in Queue using ({self.proxies.qsize()}) proxies ... Please wait...') # noqa: B950
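Both this file and req.py now push their coroutines through the same chunk-and-gather loop, so at most 10,000 requests are in flight at once instead of all of them. A self-contained sketch of the pattern (helper names are illustrative, not from the library):

```python
import asyncio


def chunk_list(lst, chunk_size):
    """Split a list into consecutive slices of at most chunk_size items."""
    return [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]


async def run_in_chunks(coros, chunk_size=10_000):
    """Await coroutines in bounded batches instead of one giant gather()."""
    done = 0
    for chunk in chunk_list(coros, chunk_size):
        await asyncio.gather(*chunk)  # at most chunk_size coroutines in flight
        done += len(chunk)
        print(f"processed {done}/{len(coros)}")


asyncio.run(run_in_chunks([asyncio.sleep(0) for _ in range(25)], chunk_size=10))
```

Two design notes: each batch waits for its slowest coroutine before the next starts, whereas an `asyncio.Semaphore` acquired inside the worker coroutine would bound concurrency without that barrier; and `_chunk_list` is now defined identically in both process.py and req.py, so a shared helper module would avoid the duplication.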
aproxyrelay/req.py (21 changes: 17 additions & 4 deletions)

@@ -31,6 +31,10 @@ def __init__(self) -> None:
         """
         self.logger.info("[aProxyRelay] Request module initialized!")

+    def _chunk_list(self, lst, chunk_size):
+        """Chunk a list into consecutive slices of at most chunk_size items."""
+        return [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]
+
     async def _fetch_proxy_page(self, urls, session):
         """
         Use asyncio.gather to run multiple requests concurrently by executing `self._request_proxy_page`.
@@ -84,16 +88,25 @@ async def _test_all_proxies(self):
         Use asyncio.gather to run multiple requests concurrently by executing `self._test_proxy_link`.
         """
         # Use asyncio.gather to run multiple tests concurrently
-        to_filter = []
+        raw = []
         while not self._queue_filter.empty():
             _target = self._queue_filter.get()
             _target['proxy'] = f"{_target['protocol'].replace('https', 'http')}://{_target['ip']}:{_target['port']}"
-            to_filter.append(_target)
+            raw.append(_target)

         # Remove duplicate entries
-        to_filter = [dict(x) for x in list(set([tuple(item.items()) for item in to_filter]))]
+        to_filter = [dict(x) for x in list(set([tuple(item.items()) for item in raw]))]
+        self.logger.info(f"[aProxyRelay] Found ({len(raw) - len(to_filter)}) duplicates which have been removed")
         tasks = [self._test_proxy_link(proxy['proxy'], proxy) for proxy in to_filter]
-        await gather(*tasks)
+        # We have to chunk our tasks, otherwise we might saturate the available bandwidth
+        chunks = self._chunk_list(tasks, 10000)
+        i = 0
+        for chunk in chunks:
+            self.logger.info(f"[aProxyRelay] Brewing ({i}/{len(tasks)}) ... please wait ...")
+            i += len(chunk)
+            # Use asyncio.gather to concurrently execute all tasks in this chunk
+            await gather(*chunk)

     async def _test_proxy_link(self, proxy_url, data) -> None:
         """
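The deduplication one-liner in `_test_all_proxies` works because `tuple(d.items())` is hashable, so a set can drop exact repeats. (The `replace('https', 'http')` on the protocol likely reflects that aiohttp only talks to proxies over plain-HTTP URLs.) A small illustration with made-up proxy records:

```python
proxies = [
    {"protocol": "http", "ip": "1.2.3.4", "port": "8080"},
    {"protocol": "http", "ip": "1.2.3.4", "port": "8080"},  # exact duplicate
    {"protocol": "socks5", "ip": "5.6.7.8", "port": "1080"},
]

# tuple(d.items()) turns each dict into a hashable key; the set drops repeats
# and dict() rebuilds the survivors.
unique = [dict(t) for t in {tuple(d.items()) for d in proxies}]
assert len(unique) == 2
```

This only collapses dicts whose keys were inserted in the same order and whose values are all hashable, and the set makes the output order arbitrary; neither caveat matters here, since every record is built by the same loop and order is irrelevant to the subsequent tests.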