
AWVS批量添加扫描站点 python3脚本

发布时间: 2021-5-9 文章作者: myluzh 分类名称: Python


经测试在AWVS13、AWVS14中可用。
说明:
1、self.scanner替换成自己AWVS地址
2、self.api替换成自己AWVS里的APIkey
3、self.speed设置扫描的速度(sequential|slow|moderate|fast),默认为fast

在脚本同目录下新建一个awvs.txt的文件,在里面导入你要批量扫描的网址,运行脚本即可。
import json
import queue
import requests

# Silences urllib3 warnings, including the InsecureRequestWarning raised whenever
# a request is sent with verify=False. With it silenced, nothing at runtime shows
# that the scanner's TLS certificate goes unchecked on such requests.
requests.packages.urllib3.disable_warnings()


def run():
    """Print the project banner: repository URL, author and version."""
    banner = (
        'Github:https://github.com/BetterDefender/AwvsBatchImport.git',
        'Author:BetterDefender',
        'Version:1.1',
    )
    for line in banner:
        print(line)

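class AwvsScan(object):
    """Interactive batch client for the AWVS REST API.

    Reads target addresses from ``awvs.txt`` in the working directory, registers
    them as targets, starts a scan for each, and can also delete targets or change
    their scan speed.

    Scope note: ``del_targets`` and ``speed_regulate`` act on every target that
    ``GET /api/v1/targets`` returns, not only on targets this script created.
    """

    def __init__(self):
        self.scanner = 'https://xxx.xxx.xxx.xxx:3443'  # Modify URL
        # Placeholder API key. The real key grants full API access to the scanner,
        # so keep the edited copy of this file out of shared or published locations.
        self.api = '1986ad8c0a5b3df4d7028d5f3c06x936c60b6ecab14de49189bd3d8497feec371'  # Modify API
        # profile_id sent with every scan request.
        # NOTE(review): presumably the built-in "Full Scan" profile; confirm on your instance.
        self.ScanMode = '11111111-1111-1111-1111-111111111111'  # ScanMode
        self.headers = {'X-Auth': self.api, 'content-type': 'application/json'}
        self.targets_id = queue.Queue()
        self.scan_id = queue.Queue()
        self.site = queue.Queue()
        self.speed = 'fast'  # Scan speed: sequential|slow|moderate|fast, default fast
        # Passed to requests as verify=. False keeps the original behaviour (no
        # certificate check), which means the X-Auth key is sent to whatever host
        # answers at self.scanner. Set this to the path of the scanner's CA or
        # certificate file (or True for a publicly trusted certificate) to have
        # the connection authenticated.
        self.verify = False
        # Seconds to wait for the API before giving up; without a timeout an
        # unreachable scanner blocks the script indefinitely.
        self.timeout = 30

    def _request(self, method, path, payload=None, **kwargs):
        """Send one API request with the shared headers, TLS setting and timeout."""
        body = None if payload is None else json.dumps(payload)
        return requests.request(method, self.scanner + path, data=body,
                                headers=self.headers, verify=self.verify,
                                timeout=self.timeout, **kwargs)

    def main(self):
        """Show the menu and dispatch the chosen action until input ends.

        Runs as a loop rather than calling itself, so a long session cannot hit
        the interpreter's recursion limit.
        """
        actions = {'1': self.scans, '2': self.del_targets, '3': self.speed_regulate}
        while True:
            print("")
            print("|" + '=' * 35 + "|")
            print("|Please select the function to use:|")
            print(
                """|  1.Add scan task using awvs.txt   |\n|  2.Delete all tasks               |\n|  3.Regulae all tasks's speed |""")
            print("|" + '=' * 35 + "|")
            try:
                choice = input(">")
            except (EOFError, KeyboardInterrupt):
                # Ctrl-D / Ctrl-C at the prompt ends the session cleanly.
                print("")
                return
            action = actions.get(choice)
            if action is None:
                continue
            try:
                action()
            except requests.RequestException as e:
                # A network failure aborts the current action, not the whole menu.
                print('Error:Request to AWVS API failed: {}'.format(e))

    def openfile(self):
        """Queue every non-empty line of awvs.txt in self.site."""
        # utf-8-sig drops a leading byte-order mark, which some editors prepend
        # and which would otherwise corrupt the first address.
        with open('awvs.txt', encoding='utf-8-sig') as cent:
            for web_site in cent:
                web_site = web_site.strip()
                if web_site:  # blank lines are not addresses
                    self.site.put(web_site)

    def targets(self):
        """Register each queued address as a target and queue its target_id.

        Exits the program on the first address that cannot be registered. Targets
        registered before that point stay on the scanner without a scan.
        """
        self.openfile()
        while not self.site.empty():
            website = self.site.get()
            data = {'address': website,
                    'description': 'awvs-auto',
                    'criticality': '10'}
            try:
                response = self._request('POST', '/api/v1/targets', payload=data)
            except requests.RequestException as e:
                # Connection, TLS and timeout failures are about the scanner,
                # not about the address being added; report them as such.
                print('Error:Cannot reach AWVS API at {}: {}'.format(self.scanner, e))
                raise SystemExit(1)
            try:
                target_id = json.loads(response.content)['target_id']  # target_id of the new target
            except (ValueError, KeyError, TypeError):
                print('Error:Target is not website! {}'.format(website))
                print("Please check if the URL in awvs.txt is correct!")
                # The status code separates a rejected address from a rejected API key.
                print('AWVS API answered HTTP {}'.format(response.status_code))
                raise SystemExit(1)
            self.targets_id.put(target_id)

    def scans(self):
        """Register the targets from awvs.txt and start a scan for each one."""
        self.targets()
        while not self.targets_id.empty():
            target_id = self.targets_id.get()
            data = {'target_id': target_id,
                    'profile_id': self.ScanMode,
                    'schedule': {'disable': False, 'start_date': None, 'time_sensitive': False}}

            response = self._request('POST', '/api/v1/scans', payload=data, allow_redirects=False)
            if response.status_code == 201:
                # The new scan's id is the tail of the Location header.
                location = response.headers.get('Location', '')
                self.scan_id.put(location.replace('/api/v1/scans/', ''))
            else:
                print('Error:Scan not created for target {} (HTTP {})'.format(
                    target_id, response.status_code))

    def get_targets_id(self):
        """Queue [address, target_id] for each target the API lists.

        NOTE(review): only the first response is read; if the API pages its
        target list, later pages are not fetched here. Confirm against the API.
        """
        response = self._request('GET', '/api/v1/targets')
        if response.status_code != 200:
            # e.g. a rejected API key: say so instead of failing on a missing key.
            print('Error:Cannot list targets (HTTP {})'.format(response.status_code))
            return
        content = json.loads(response.content)
        for cent in content.get('targets', []):
            self.targets_id.put([cent['address'], cent['target_id']])

    def del_targets(self):
        """Delete every target the API lists, repeating until none are left.

        Stops when a full pass deletes nothing, so a target the API refuses to
        delete cannot keep the loop running forever.
        """
        while True:
            self.get_targets_id()
            if self.targets_id.empty():
                break
            deleted = 0
            while not self.targets_id.empty():
                address, target_id = self.targets_id.get()
                response = self._request('DELETE', '/api/v1/targets/' + target_id)
                if response.status_code == 204:
                    deleted += 1
                    print('delete targets {}'.format(address))
                else:
                    print('Error:Could not delete {} (HTTP {})'.format(address, response.status_code))
            if deleted == 0:
                break

    def speed_regulate(self):
        """Set scan_speed to self.speed on every target the API lists (one pass)."""
        allowed = ('sequential', 'slow', 'moderate', 'fast')
        if self.speed not in allowed:
            print('Error:speed must be one of {}'.format('|'.join(allowed)))
            return
        self.get_targets_id()
        while not self.targets_id.empty():
            address, target_id = self.targets_id.get()
            response = self._request('PATCH', '/api/v1/targets/' + target_id + '/configuration',
                                     payload={'scan_speed': self.speed})
            if response.status_code == 204:
                print('Regulate targets {}'.format(address))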


if __name__ == '__main__':
    # Show the banner, then hand control to the interactive menu.
    run()
    AwvsScan().main()

标签: awvs
