As a red teamer, I often come across useful projects on GitHub and star them. Sometimes, however, those repositories are suddenly deleted and the valuable resources they contain become unreachable. To solve this, I wrote a script that backs up all of my starred projects to local storage.
Features:
Automatically fetches every repository you have starred on GitHub.
Backs up each project's source code, releases, and release assets to your local machine.
Keeps the local copy in sync with the latest code; to guard against deletion, the current source is compressed into the project's backup directory before each sync, and old archives are cleaned up automatically (the 10 most recent are kept). The per-repository layout this produces is sketched just below.
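For each starred repository, the script below creates a directory named after the repo. The layout it produces looks roughly like this (the repo name, dates, and tag name are illustrative):

my-repo/
    source/                  cloned working copy, updated with git pull
    backup/2024-01-01.zip    daily zip snapshots of source/ (10 newest kept)
    assets/2024-01-01-v1.0/  downloaded release assets, one folder per release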
Requirements:
Linux, Python 3.x. Windows is not supported due to filesystem limitations (certain characters in some projects' file names cause errors, though most projects can still be backed up).
Install the dependencies:
pip install gitpython PyGithub
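The script needs a GitHub personal access token. Before running the full backup, it can be worth checking that the token and the PyGithub install work; a minimal sketch (the token string is a placeholder, not a real value):

from github import Github

g = Github('xxxxx')                  # placeholder: put your personal access token here
me = g.get_user()
print(me.login)                      # the authenticated account
print(me.get_starred().totalCount)   # number of starred repositories to back up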
from git import Repo
from github import Github
import os
import requests
import shutil
import zipfile
import time
import logging
from concurrent.futures import ThreadPoolExecutor
def setup_logging():
    # One log file per run (named by timestamp), plus a separate err.log for errors and console output.
    log_filename = time.strftime('%Y-%m-%d-%H-%M-%S') + '.log'
    logging.basicConfig(filename=log_filename, level=logging.INFO,
                        format='%(asctime)s - %(levelname)s - %(message)s')
    err_handler = logging.FileHandler('err.log')
    err_handler.setLevel(logging.ERROR)
    logging.getLogger().addHandler(err_handler)
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    logging.getLogger().addHandler(console_handler)
def backup_source(repo_name):
    try:
        # Zip today's copy of <repo>/source into <repo>/backup/YYYY-MM-DD.zip.
        backup_dir = os.path.join(repo_name, 'backup')
        os.makedirs(backup_dir, exist_ok=True)
        backup_name = time.strftime('%Y-%m-%d') + '.zip'
        backup_path = os.path.join(backup_dir, backup_name)
        if os.path.exists(backup_path):
            print(f"Today's backup for {repo_name} already exists. Skipping backup...")
            logging.info(f"Today's backup for {repo_name} already exists. Skipping backup...")
        else:
            source_dir = os.path.join(repo_name, 'source')
            with zipfile.ZipFile(backup_path, 'w') as zipf:
                for root, _, files in os.walk(source_dir):
                    for file in files:
                        file_path = os.path.join(root, file)
                        zipf.write(file_path, os.path.relpath(file_path, source_dir))
        # Keep only the 10 most recent archives (date-based names sort chronologically).
        backups = sorted(os.listdir(backup_dir), reverse=True)
        for backup in backups[10:]:
            os.remove(os.path.join(backup_dir, backup))
    except Exception as e:
        logging.error(f'Error backing up {repo_name}: {str(e)}')
def download_release_assets(repo, repo_path):
    max_retries = 10
    # Only the 10 most recent releases are mirrored.
    releases = list(repo.get_releases())[:10]
    for release in releases:
        release_date_folder = os.path.join(
            repo_path, 'assets',
            release.published_at.strftime('%Y-%m-%d') + '-' + release.tag_name)
        os.makedirs(release_date_folder, exist_ok=True)
        assets = release.get_assets()
        for asset in assets:
            asset_path = os.path.join(release_date_folder, asset.name)
            if os.path.exists(asset_path):
                print(f'Asset {asset.name} already exists. Skipping download...')
                logging.info(f'Asset {asset.name} already exists. Skipping download...')
                continue
            for attempt in range(max_retries):
                try:
                    r = requests.get(asset.browser_download_url, stream=True)
                    r.raise_for_status()  # treat HTTP errors as failures instead of saving an error page
                    with open(asset_path, 'wb') as f:
                        shutil.copyfileobj(r.raw, f)
                    break  # download succeeded, stop retrying
                except Exception as e:
                    logging.warning(f'Retry {attempt + 1}/{max_retries} for asset {asset.name} failed with error: {str(e)}')
                    if attempt + 1 == max_retries:  # all attempts failed
                        logging.error(f'Failed to download asset {asset.name} after {max_retries} retries. Writing to download_release_assets_fail.txt.')
                        print(f'Failed to download asset {asset.name}. Writing to download_release_assets_fail.txt.')
                        with open('download_release_assets_fail.txt', 'a') as fail_file:  # append mode
                            fail_file.write(f'Asset {asset.name} from repo {repo.full_name} failed to download with error: {str(e)}\n')
                    time.sleep(3)  # wait a little before the next attempt
def download_and_sync_repo(repo):
    repo_name = repo.name
    repo_path = os.path.join(repo_name, 'source')
    try:
        if os.path.exists(repo_name):
            # Existing mirror: archive the current source first, then pull the latest code.
            backup_source(repo_name)
            local_repo = Repo(repo_path)
            print(f'Syncing {repo.full_name} to latest...')
            logging.info(f'Syncing {repo.full_name} to latest...')
            local_repo.remotes.origin.pull()
        else:
            print(f'Cloning {repo.full_name}...')
            logging.info(f'Cloning {repo.full_name}...')
            os.makedirs(repo_path, exist_ok=True)
            Repo.clone_from(repo.clone_url, repo_path)
        print(f'Downloading releases and assets for {repo.full_name}...')
        logging.info(f'Downloading releases and assets for {repo.full_name}...')
        download_release_assets(repo, repo_name)
    except Exception as e:
        logging.error(f'Error processing {repo.full_name}: {str(e)}')
        raise  # re-raise so process_repo() can retry
def process_repo(repo):
    max_retries = 10  # maximum attempts per repository
    for attempt in range(max_retries):
        try:
            download_and_sync_repo(repo)
            break  # success, stop retrying
        except Exception as e:
            logging.warning(f'Retry {attempt + 1}/{max_retries} for {repo.full_name} failed with error: {str(e)}')
            if attempt + 1 == max_retries:  # all attempts failed
                logging.error(f'Failed to process {repo.full_name} after {max_retries} retries. Writing to process_repo_fail.txt.')
                print(f'Failed to process {repo.full_name}. Writing to process_repo_fail.txt.')
                with open('process_repo_fail.txt', 'a') as fail_file:  # append mode
                    fail_file.write(f'Repo {repo.full_name} failed with error: {str(e)}\n')
            time.sleep(3)  # wait a little before the next attempt
def main():
    setup_logging()
    GITHUB_TOKEN = 'xxxxx'  # replace with your GitHub personal access token
    g = Github(GITHUB_TOKEN)
    user = g.get_user()
    starred_repos = user.get_starred()
    # Each starred repository is processed in its own worker thread.
    with ThreadPoolExecutor() as executor:
        for repo in starred_repos:
            executor.submit(process_repo, repo)
    print('Done!')
    logging.info('Done!')
if __name__ == '__main__':
    main()
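The token is hardcoded in main() above. If you run the script on a schedule (for example from cron) so the mirrors stay current, one option is to read the token from an environment variable instead of keeping it in the source. A minimal sketch, assuming the variable is named GITHUB_TOKEN:

import os
from github import Github

token = os.environ.get('GITHUB_TOKEN')  # assumed variable name, not part of the original script
if not token:
    raise SystemExit('GITHUB_TOKEN is not set')
g = Github(token)

Note that the per-run log file, err.log, the *_fail.txt files, and the repository directories are all created with relative paths, so run the script from the directory where you want the mirrors to live.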