6 changes: 5 additions & 1 deletion requirements.txt
@@ -6,7 +6,11 @@ redis==2.10.5
giturl.py==0.2.1
GitPython==2.1.5
scancode-toolkit==2.0.1
requests==2.12.4

# for testing
pytest
pytest-django



34 changes: 34 additions & 0 deletions scanapp/tasks.py
@@ -25,9 +25,16 @@

import json
import logging
import os
import subprocess
import tarfile
import urllib
from cStringIO import StringIO
from urlparse import urlparse

import StringIO
import requests
import zipfile
from django.utils import timezone

from scanapp.celery import app
@@ -59,6 +66,33 @@ def scan_code_async(url, scan_id, path, file_name):
    apply_scan_async.delay(path, scan_id)


@app.task
def handle_archive_url(url, scan_id, path, file_name):
    """
    Create and save a file at `path` present at `url` using `scan_id` and bare `path` and
    `file_name` and apply the scan.
    """
    r = requests.get(url)
    path = path
Review comment (Member): Why is path repeated here?

    url_parse = urlparse(url)
    os.chdir(path)

    if r.status_code == 200:
        if url_parse.path.endswith('zip'):
Review comment (Member): We have code from extractcode in scancode that can do extractions on different archive types which may be helpful, for example https://github.com/nexB/scancode-toolkit/blob/develop/src/extractcode/extract.py#L101

            z = zipfile.ZipFile(StringIO.StringIO(r.content))
            z.extractall()

        else:
            file_tmp = urllib.urlretrieve(url, filename=None)[0]
            base_name = os.path.basename(url)

            file_name, file_extension = os.path.splitext(base_name)
            tar = tarfile.open(file_tmp)
            tar.extractall(file_name)

    apply_scan_async.delay(path, scan_id)

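A rough sketch of the extractcode suggestion from the review comment above; the extract() call and the event attributes used here are assumptions based on the linked extract.py and would need checking against the scancode-toolkit version pinned in requirements.txt:

# Reviewer sketch, not part of this diff: let extractcode handle the
# zip-vs-tar branching. extract() and the event attributes are assumed
# from the linked module, not confirmed.
import logging

from extractcode.extract import extract

logger = logging.getLogger(__name__)


def extract_archive(archive_path):
    for event in extract(archive_path):
        if event.warnings or event.errors:
            logger.warning('extraction issues for %r: warnings=%r errors=%r'
                           % (event.source, event.warnings, event.errors))

If that API holds, the separate zip and tar branches above collapse into a single call.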

@app.task
def handle_special_urls(url, scan_id, path, host):
"""
31 changes: 29 additions & 2 deletions scanapp/views.py
@@ -25,6 +25,7 @@
import logging
import os
import subprocess
from urlparse import urlparse

from django.contrib.auth.models import User
from django.core.files.storage import FileSystemStorage
@@ -48,6 +49,7 @@
from scanapp.serializers import AllModelSerializerHelper
from scanapp.tasks import apply_scan_async
from scanapp.tasks import create_scan_id
from scanapp.tasks import handle_archive_url
from scanapp.tasks import handle_special_urls
from scanapp.tasks import scan_code_async

@@ -152,6 +154,19 @@ def post(self, request, *args, **kwargs):
        scan_start_time = timezone.now()
        git_url_parser = GitURL(url)

        allowed_exts = ('zip', 'tar', 'tar.gz', 'rar', 'tgz', 'tar.Z', 'tar.bz2',
                        'tbz2', 'tar.lzma', 'tlz', 'gz')
        url_parse = urlparse(url)

        is_zip_url = False

        try:
            for i in allowed_exts:
Review comment (Member): We may have some code that identifies whether or not files are archives. I will ask @pombredanne

                if url_parse.path.endswith(i):
                    is_zip_url = True
        finally:
Review comment (Member): Why is try-finally used?

            logger.info('smooth work')

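Picking up the two review comments above, a minimal drop-in sketch of the same check without the loop or the try/finally, reusing the allowed_exts and url_parse names already defined in this hunk:

        # Reviewer sketch, not part of this diff: a single expression covers
        # the archive-extension check, so no try/finally is needed.
        is_zip_url = any(url_parse.path.endswith(ext) for ext in allowed_exts)
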
        if git_url_parser.host == 'github.com':
            file_name = git_url_parser.repo
            scan_directory = file_name
@@ -163,6 +178,19 @@

            handle_special_urls.delay(url, scan_id, path, git_url_parser.host)
            logger.info('git repo detected')

        elif is_zip_url:
            logger.info('zip url detected')
            scan_directory = None
            scan_id = create_scan_id(user, url, scan_directory, scan_start_time)
            current_scan = Scan.objects.get(pk=scan_id)
            path = '/'.join([path, '{}'.format(current_scan.pk)])
Review comment (Member): Use os.path.join() to ensure consistency when joining paths

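For example, the path assignment above could read as the following drop-in sketch, using only names already present in this diff:

            # Reviewer sketch, not part of this diff: os.path.join picks the
            # separator and avoids doubled slashes.
            path = os.path.join(path, '{}'.format(current_scan.pk))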

            os.makedirs(path)

            file_name = '{}'.format(current_scan.pk)
            handle_archive_url.delay(url, scan_id, path, file_name)

        else:
            scan_directory = None
            scan_id = create_scan_id(user, url, scan_directory, scan_start_time)
@@ -174,10 +202,9 @@
            file_name = '{}'.format(current_scan.pk)
            scan_code_async.delay(url, scan_id, path, file_name)

        return HttpResponseRedirect('/resultscan/' + '{}'.format(current_scan.pk))


# API views
class ScanApiView(APIView):
    def get(self, request, format=None, **kwargs):
        scan_id = kwargs['pk']