|
1 | 1 | import requests
|
2 |
| -import json |
3 | 2 | import configparser
|
| 3 | +import json |
| 4 | +import os |
4 | 5 | import argparse
|
5 |
| -import codecs |
6 | 6 |
|
7 |
| -def REFRESH_USER_ACCESS_TOKEN(token=None, refresh=None): |
| 7 | +def REFRESH_USER_ACCESS_TOKEN(app_access_token=None, refresh_token=None): |
| 8 | + # 配置文件路径 |
| 9 | + config_path='feishu-config.ini' |
| 10 | + |
| 11 | + # 确保配置文件存在 |
| 12 | + if not os.path.exists(config_path): |
| 13 | + with open(config_path, 'w') as f: |
| 14 | + pass |
| 15 | + |
| 16 | + # 读取配置文件 |
8 | 17 | config = configparser.ConfigParser()
|
9 |
| - with codecs.open('feishu-config.ini', 'r', encoding='utf-8') as f: |
10 |
| - config.read_file(f) |
| 18 | + config.read(config_path, encoding='utf-8') |
11 | 19 |
|
| 20 | + # 确保 TOKEN 部分存在 |
12 | 21 | if not config.has_section('TOKEN'):
|
13 | 22 | config.add_section('TOKEN')
|
14 | 23 |
|
15 |
| - if 'app_access_token' not in config['TOKEN']: # Use 'app_access_token' instead of 'app_token' |
16 |
| - config.set('TOKEN', 'app_access_token', '') |
17 |
| - |
18 |
| - if 'user_access_token' not in config['TOKEN']: |
19 |
| - config.set('TOKEN', 'user_access_token', '') |
20 |
| - |
21 |
| - with codecs.open('feishu-config.ini', 'w', encoding='utf-8') as configfile: |
22 |
| - config.write(configfile) |
| 24 | + # 从配置文件或者参数获取令牌 |
| 25 | + app_access_token = app_access_token or config.get('TOKEN', 'app_access_token', fallback=None) |
| 26 | + refresh_token = refresh_token or config.get('TOKEN', 'refresh_token', fallback=None) |
23 | 27 |
|
24 |
| - token = config.get('TOKEN', 'app_access_token') if token is None else token # Use 'app_access_token' instead of 'app_token' |
25 |
| - print(f"Using token: {token}") |
26 |
| - refresh = config.get('TOKEN', 'user_access_token') if refresh is None else refresh |
27 |
| - print(f"Using refresh token: {refresh}") |
| 28 | + # 如果任一令牌不存在,需要用户先获取它 |
| 29 | + if not app_access_token or not refresh_token: |
| 30 | + raise ValueError("Both app_access_token and refresh_token are required. Please get them first.") |
28 | 31 |
|
| 32 | + # 构建请求URL和请求头 |
29 | 33 | url = "https://open.feishu.cn/open-apis/authen/v1/refresh_access_token"
|
30 |
| - headers = {"Content-Type": "application/json"} |
| 34 | + headers = { |
| 35 | + 'Content-Type': 'application/json; charset=utf-8', |
| 36 | + } |
31 | 37 |
|
| 38 | + # 构建请求体 |
32 | 39 | payload = {
|
33 |
| - "app_access_token": token, |
34 | 40 | "grant_type": "refresh_token",
|
35 |
| - "refresh_token": refresh |
| 41 | + "refresh_token": refresh_token, |
| 42 | + "app_access_token": app_access_token |
36 | 43 | }
|
37 | 44 |
|
| 45 | + # 发起请求 |
38 | 46 | response = requests.post(url, headers=headers, data=json.dumps(payload))
|
39 |
| - data = response.json() |
40 |
| - if data["code"] != 0: |
41 |
| - print(f"Error: {data['msg']}") |
| 47 | + response_json = response.json() |
| 48 | + |
| 49 | + # 检查响应状态码 |
| 50 | + if response.status_code != 200: |
| 51 | + print(f"HTTP Error: {response_json}") |
| 52 | + return None |
| 53 | + |
| 54 | + # 检查响应体中的 code |
| 55 | + if response_json.get('code') != 0: |
| 56 | + print(f"Response Error: {response_json}") |
42 | 57 | return None
|
43 | 58 |
|
44 |
| - user_access_token = data['data']['user_access_token'] |
45 |
| - print(f"Refreshed User Access Token: {user_access_token}") |
46 |
| - |
47 |
| - config.set('TOKEN', 'user_access_token', user_access_token) |
48 |
| - with codecs.open('feishu-config.ini', 'w', encoding='utf-8') as configfile: |
49 |
| - config.write(configfile) |
| 59 | + # 更新配置文件 |
| 60 | + if 'data' in response_json and 'access_token' in response_json['data']: |
| 61 | + config.set('TOKEN', 'user_access_token', response_json['data']['access_token']) |
| 62 | + config.set('TOKEN', 'refresh_token', response_json['data']['refresh_token']) |
| 63 | + with open(config_path, 'w', encoding='utf-8') as configfile: |
| 64 | + config.write(configfile) |
50 | 65 |
|
51 |
| - return user_access_token |
| 66 | + return response_json.get('data', {}).get('access_token') |
52 | 67 |
|
53 | 68 | def REFRESH_USER_ACCESS_TOKEN_CMD():
|
54 | 69 | parser = argparse.ArgumentParser(description='Refresh user access token.')
|
55 |
| - parser.add_argument('-t', '--token', type=str, help='The token.') |
| 70 | + parser.add_argument('-a', '--app_token', type=str, help='The app access token.') |
56 | 71 | parser.add_argument('-r', '--refresh', type=str, help='The refresh token.')
|
57 | 72 | args = parser.parse_args()
|
58 | 73 |
|
59 |
| - user_access_token = REFRESH_USER_ACCESS_TOKEN(args.token, args.refresh) |
| 74 | + user_access_token = REFRESH_USER_ACCESS_TOKEN(args.app_token, args.refresh) |
60 | 75 | print(f'User Access Token: {user_access_token}')
|
61 | 76 |
|
62 |
| -if __name__ == '__main__': |
| 77 | +if __name__ == "__main__": |
63 | 78 | REFRESH_USER_ACCESS_TOKEN_CMD()
|
0 commit comments