批量爬取中国大学MOOC网站的媒体资源

质量声明:原创文章,内容质量问题请评论吐槽。如对您产生干扰,可私信删除。
主要参考:https://github.com/Dayunxi/getMOOCmedia


三点说明:

  1. 感谢 中国大学MOOC 平台和众多高校老师的课程分享!你们的努力,让我们在互联网时代接触到了更加丰富的世界。MOOC网站提供了多平台的客户端,完全满足日常的学习需要,随时随地交流讨论。爬取资源实在是没有必要!源码只供学习交流,后期不再维护
  2. 这个爬虫脚本是我在2018年暑假学习Python的时候,随手练习的第一个小Demo,程序主体Fork自 Dayunxi ,感谢分享!本来作者有个博客说明,但最近几天我在做项目整理的时候发现链接(http://www.adamyt.com/blog/20170323-getMOOCpdf/)已经无法访问了,比较遗憾。
  3. 我实现的这个爬虫脚本,与原版相比做了很大改动,但由于近期我把源码整理到了Github仓库的二级目录下边(链接在这里),不能被Github识别搜索了,索性写篇说明,以供参考。截至目前(2019/09/04),脚本运行正常。

摘要: 通过requests库爬取中国大学MOOC网站, 以正则表达式re解析网页, 实现批量视频/课件下载.



第三方依赖

  • requests、urllib:爬取页面
  • re:解析页面
  • prettytable:美化输出

功能描述

根据输入的课程关键字,在中国大学MOOC网站上进行检索(包括停止课程),列出检索结果后,显示选定课程的简介,如需下载视频或课件,则可根据提示操作。运行截图如下:

主要逻辑

根据关键字爬取搜索页面

  • url是Dayunxi 抓包得到的,截至目前(2019/09/04)仍然可用。
  • 这里只用到基本的 requests 库,即可实现页面爬取,简单明了。
def search_course(keyword, pageIndex=1):
    url = 'https://www.icourse163.org/dwr/call/plaincall/MocSearchBean.searchMocCourse.dwr'
    status = 30
    pageSize = 20
    data = {'callCount': '1',
            'scriptSessionId': '${scriptSessionId}190',
            'httpSessionId': 'bd4f183dd74746aa83b2cced56a0795b',
            'c0-scriptName': 'MocSearchBean',
            'c0-methodName': 'searchMocCourse',
            'c0-id': '0',
            'c0-e1': 'string:' + quote(keyword),
            'c0-e2': 'number:{}'.format(pageIndex),
            'c0-e3': 'boolean:true',
            'c0-e4': 'null:null',
            'c0-e5': 'number:0',
            'c0-e6': 'number:{}'.format(status),  # 0-已结束; 10-正在进行; 20-即将开始; 30-所有课程
            'c0-e7': 'number:{}'.format(pageSize),
            'c0-param0': 'Object_Object:{keyword:reference:c0-e1,pageIndex:reference:c0-e2,highlight:reference:c0-e3,categoryId:reference:c0-e4,orderBy:reference:c0-e5,stats:reference:c0-e6,pageSize:reference:c0-e7}',
            'batchId': '1528898317310'}
    try:
        r = requests.post(url, headers=headers, data=data)
        r.raise_for_status()
        # test.detect_encoding(r) # 检测到响应的编码时'ascii'
        page = r.text.encode('utf-8').decode('unicode_escape')  # 解码为 unicode_escape 便于print将汉字打印输出
        # test.outputHTML(page, '搜索页面第 ' + str(pageIndex) + ' 页') # 测试所用
        return page
    except requests.HTTPError as ex:
        print('课程搜索页面访问出错...\n[-]ERROR: %s' % str(ex))
        raise

解析搜索页面

通过正则表达式查找,得到搜索结果统计、课程信息和课程状态,相关信息有:

  • 总页数、当前页码、课程总数
  • 课程名、授课教师、所在院校
  • 结束时间、参加人数、课程介绍、开始时间
def parse_search(page):
    # 页面信息解析
    global pageIndex, totleCount, totlePageCount, curPageCount
    # 搜索结果统计
    re_pageInfo = r'pageIndex=(\d+);.*totleCount=(\d+);.*totlePageCount=(\d+);'
    list_pageInfo = re.findall(re_pageInfo, page[-10000:])  # 得到一个多维列表形式的匹配结果
    if len(list_pageInfo) == 0:
        print("未爬取到相关信息,请根据搜索页面修正 Regular Expression")
        test.outputHTML(searchPage, "搜索页面")
        return None, None
    pageIndex = int(list_pageInfo[0][0])
    totleCount = int(list_pageInfo[0][1])
    totlePageCount = int(list_pageInfo[0][2])
    # 课程信息解析
    # 0 - cid(无用); 1 - 课程名; 2 - 授课教师; 3 - 院校; 4 - tid,termId
    page = re.sub(r'({##)|(##})', '', page)  # 删除page中的#{}符号
    re_courseInfo = r'cid=(\d+);.*highlightName="(.+)";.*highlightTeacherNames="(.+)";.*highlightUniversity="(.+)";' \
                    r'.+\W{0,4}.+currentTermId=(\d+);'
    list_courseInfo = re.findall(re_courseInfo, page)
    # 课程状态解析
    # 0 - 结束时间; 1 - 参加人数; 2 - 介绍 3 - 开始时间;
    re_courseStat = r'endTime=(\d+);.*?enrollCount=(\d+);.*?jsonContent="(.+[\s\S]{0,120}.+)";.*startTime=(\d+);'
    list_courseStat = re.findall(re_courseStat, page)
    curPageCount = len(list_courseInfo)
    return list_courseInfo, list_courseStat

根据选定课程爬取资源页面

  • 原本我是抓包得到的url,但用了一段时间发现失效了,怀疑是MOOC改版了。只能通过查getLastLearnedMocTermDto.dwr得到资源列表,所以依然用Dayunxi的链接吧。
  • 这里也是只用到基本的 requests 库。
def get_source_list(tid):
    url = 'http://www.icourse163.org/dwr/call/plaincall/CourseBean.getMocTermDto.dwr'  
    data = {'callCount': '1',
            'scriptSessionId': '${scriptSessionId}190',
            'c0-scriptName': 'CourseBean',
            'c0-methodName': 'getMocTermDto',
            'c0-id': 0,
            'c0-param0': 'number:' + tid,  # tid,termId
            'c0-param1': 'number:1',
            'c0-param2': 'boolean:true',
            'batchId': unixtime.now()}
    try:
        r = requests.post(url, headers=headers, data=data)
        r.raise_for_status()
        # test.detect_encoding(r) # 检测到响应的编码时'ascii'
        page = r.text.encode('utf-8').decode('unicode_escape')  
        return page
    except requests.HTTPError as ex:
        print('>>> 课程搜索页面访问出错...\n[-]ERROR: %s' % str(ex))
        raise

解析资源页面

通过正则表达式查找,得到资源的下载链接列表。

def parse_source(page, sourceType):
    # 3代表文档,1代表视频
    ch = '段视频' if sourceType is 1 else '份课件'
    # 0 - cid; 1 - id; 2 - name
    re_sourceList = r'anchorQuestions=.*contentId=(\d*);.*contentType={};.*id=(\d*);.*name="(.*)";'.format(
        sourceType)
    sourceList = re.findall(re_sourceList, page)
    if not sourceList:
        print('>>> Source List is Empty!')
    else:
        print('>>> 本课程共有', len(sourceList), ch, end=',')
    return sourceList

批量下载

选择路径

def select_direction(courseName):
    currentDir = os.getcwd()
    currentDir = currentDir.replace("\\", "/") # 美化显示
    path = input(f'>>> 请输入保存路径:(默认在当前路径{currentDir}下创建"{courseName}"文件夹)\n>>> ')  # 获得当前文件夹
    if not path:
        path = currentDir + "/" + courseName
    if not os.path.isdir(path):  # 检测是否是文件夹
        os.mkdir(path)  # 在当前目录下创建文件夹,path = 相对路径
    return path

选择文件类型

if sourceType is 1:  # 视频下载
    qualityList = ['Hd', 'Sd', 'Shd', 'Hd', 'Sd', 'Shd']
    formatList = ['flv', 'flv', 'flv', 'mp4', 'mp4', 'mp4']
    while True:
        index = input('>>> 请选择视频格式:\n\t0-FLV高清,1-FLV标清,2-FLV超清\n\t3-MP4高清,4-MP4标清,5-MP4超清\n>>> ')
        if re.match(r'\d', index):
            index = int(index)  # 将字符串数字转为数值
            if 0 <= index <= 5:
                quality = qualityList[index]
                fileFormat = formatList[index]
                break
        else:
            print('>>> 选择错误!')
else:
    quality = None
    fileFormat = 'pdf'

解析下载链接

def get_download_info(dataList, sourceType, Quality=None, fileFormat=None):
	# 1代表视频, 3代表文档
    url = 'http://www.icourse163.org/dwr/call/plaincall/CourseBean.getLessonUnitLearnVo.dwr'
    content_id = dataList[0]
    file_id = dataList[1]
    file_name = re.sub(r'[/\\*|<>:?"]', '', dataList[2])  # 移除Windows文件名非法字符
    data = {'callCount': '1',
            'scriptSessionId': '${scriptSessionId}190',
            'c0-scriptName': 'CourseBean',
            'c0-methodName': 'getLessonUnitLearnVo',
            'c0-id': '0',
            'c0-param0': 'number:' + content_id,  # contentId
            'c0-param1': 'number:{}'.format(sourceType),
            'c0-param2': 'number:0',
            'c0-param3': 'number:' + file_id,  # 文件id
            'batchId': unixtime.now()}
    try:
        r = requests.post(url, headers=headers, data=data)
        r.raise_for_status()
        page = r.text
        # test.outputHTML(page,'下载链接')
    except requests.HTTPError as ex:
        print('课程搜索页面访问出错...\n[-]ERROR: %s' % str(ex))
        raise
    if Quality:  # 进行视频文件的解析
        re_videoLink = r'{}{}Url="(.+?)";'.format(fileFormat, Quality)
        video_url = re.findall(re_videoLink, page)
        re_srtLink = r's\d+\.name="([\w\\]+?)";s\d+\.url="(.+?)";'
        srt_url = re.findall(re_srtLink, page)
        if video_url:
            if srt_url:
                return [video_url[0], srt_url[0][1]], file_name
            else:
                return [video_url[0]], file_name
        else:
            return [], file_name

文件下载

  • 区分下载模式,小文件(pdf/字幕)直接下载,大文件(视频)分块下载
  • 允许用户中断下载,Kill Shell即可,通常是输入^C
def download(url, direction, fileName, fileType, mode="bigfile"):
    # 文件的绝对路径,如 D:\Program Files\Python36\python.exe
    abs_fileName = '{}/{}.{}'.format(direction, fileName, fileType)
    renameCount = 0
    while True:  # 检查是否重名
        if os.path.exists(abs_fileName):
            renameCount += 1
            abs_fileName = '{}/{}-{}.{}'.format(direction, fileName, renameCount, fileType)
        else:
            break
    # 小文件模式:直接下载
    if mode is not 'bigfile':
        try:
            r = requests.get(url)
            r.raise_for_status()
            with open(abs_fileName, 'wb') as file:
                file.write(r.content)
        except requests.HTTPError as ex:
            print('[-]ERROR: %s' % ex)
        except KeyboardInterrupt:
            os.remove(abs_fileName)
            raise
        return
    # 大文件模式:分块下载
    try:
        r = requests.get(url, stream=True)
        r.raise_for_status()
        if 'Content-Length' not in r.headers:
            raise requests.HTTPError('No Content Length')
        file_size = int(r.headers['Content-Length'])   # 文件大小:B
        if file_size < 10 * 1024 * 1024:
            chunk_size = 1024 * 1024    # 分块大小 B
        else:
            chunk_size = 3 * 1024 * 1024
        download_size = 0   # 已下载大小:B
        with open(abs_fileName, 'wb') as file:
            for chunk in r.iter_content(chunk_size=chunk_size):
                progress = download_size / file_size * 100  # 下载进度
                prompt_bar = '[{:50}] {:.1f}%\tSize: {:.2f}MB'.format(
                    '=' * int(progress / 2), progress, download_size / 1024 / 1024)
                print(prompt_bar, end='\r')  # \r 代表打印头归位,回到某一行的开头
                file.write(chunk)
                download_size += chunk_size
            print('[{:50}] 100% Done!\tSize: {:.2f}MB'.format('=' * 50, file_size / 1024 / 1024))
    except error.HTTPError as ex:
        print('[-]ERROR: %s' % ex)
    except KeyboardInterrupt:
        os.remove(path)
        raise

最后,再次附上完整源码:停止维护,只供参考

全部评论

相关推荐

头像
点赞 评论 收藏
转发
点赞 1 评论
分享
牛客网
牛客企业服务