Compare commits

..

143 Commits

Author SHA1 Message Date
Dot.L ca2682fb22 Merge pull request #311 from APIParkLab/feature/liujian-1.8
update docker run.sh
2025-05-26 19:33:51 +08:00
Liujian 2bd1d4a423 update docker run.sh 2025-05-26 19:32:10 +08:00
Dot.L b2baa711c2 Merge pull request #309 from APIParkLab/feature/1.8-cx
chore: Add annotations
2025-05-23 18:31:33 +08:00
Dot.L b55675e5a5 Merge pull request #308 from APIParkLab/feature/liujian-1.8
Feature/liujian 1.8
2025-05-23 18:24:32 +08:00
Liujian 9a33992a0b Merge remote-tracking branch 'github-pro/main' into feature/liujian-1.8
# Conflicts:
#	.gitlab-ci.yml
2025-05-23 18:23:52 +08:00
Liujian 07ae37eb5f update gitlab-ci.yml 2025-05-23 18:21:50 +08:00
Liujian c36726f25f update gitlab-ci.yml 2025-05-23 17:45:57 +08:00
Liujian e2e9abeb4c update gitlab-ci.yml 2025-05-23 16:19:28 +08:00
Liujian 0fcc2215f7 Merge remote-tracking branch 'origin/main' into main-github-pro 2025-05-23 15:42:51 +08:00
刘健 ce559c4643 Update .gitlab-ci.yml file 2025-05-23 15:30:54 +08:00
刘健 8d4b13f633 Update .gitlab-ci.yml file 2025-05-23 15:30:13 +08:00
Liujian 19a3378fa3 Merge branch 'main' into main-github-pro 2025-05-23 15:16:29 +08:00
Liujian cef1250199 Merge tag 'v1.8.0-beta' 2025-05-23 15:16:02 +08:00
刘健 3c85658931 Update .gitlab-ci.yml file 2025-05-23 15:05:39 +08:00
刘健 ba7022bc2d Update .gitlab-ci.yml file 2025-05-23 15:02:52 +08:00
刘健 fb24abc111 Update .gitlab-ci.yml file 2025-05-23 15:02:23 +08:00
刘健 0e3fb84e7c Update .gitlab-ci.yml file 2025-05-23 15:01:47 +08:00
刘健 5d6d949ca4 Update .gitlab-ci.yml file 2025-05-23 15:00:44 +08:00
刘健 3578182343 Update .gitlab-ci.yml file 2025-05-23 14:32:11 +08:00
刘健 28bad2d963 Update .gitlab-ci.yml file 2025-05-23 14:31:31 +08:00
刘健 384bd239fa Update .gitlab-ci.yml file 2025-05-23 14:21:58 +08:00
刘健 98710ad296 Update .gitlab-ci.yml file 2025-05-23 14:12:49 +08:00
刘健 4806e12907 Update .gitlab-ci.yml file 2025-05-23 14:12:09 +08:00
刘健 9097760a0f Update .gitlab-ci.yml file 2025-05-23 14:10:08 +08:00
刘健 a5639bff60 Update .gitlab-ci.yml file 2025-05-23 14:01:10 +08:00
刘健 1d66ed84f3 Update .gitlab-ci.yml file 2025-05-23 13:55:45 +08:00
刘健 249ac3ea1c Update .gitlab-ci.yml file 2025-05-23 13:55:22 +08:00
刘健 b02db8020d Update .gitlab-ci.yml file 2025-05-23 13:55:03 +08:00
刘健 4105540686 Update .gitlab-ci.yml file 2025-05-23 13:54:37 +08:00
刘健 36d10c5cfd Update .gitlab-ci.yml file 2025-05-23 13:54:17 +08:00
Dot.L f77bd76a14 Merge pull request #301 from APIParkLab/feature/liujian-1.8
Feature/liujian 1.8
2025-05-06 18:53:11 +08:00
ningyv cef548ce7d Merge pull request #299 from APIParkLab/feature/1.8-cx
Feature/1.8 cx
2025-05-06 18:52:12 +08:00
lichunxian 5efd19ef7c Merge branch 'feature/1.8-cx' into 'main'
feat: feature/1.8-Improve system observability

See merge request apipark/APIPark!377
2025-05-06 17:58:28 +08:00
lichunxian 28cd4fd91c Merge branch 'feature/1.8-cx' into 'main'
feature/1.8-Improve system observability

See merge request apipark/APIPark!376
2025-05-06 16:06:16 +08:00
刘健 82f4089f42 Merge branch 'feature/liujian-1.8' into 'main'
update build.sh

See merge request apipark/APIPark!375
2025-05-06 16:02:17 +08:00
Liujian 00ef4d2cfc update build.sh 2025-05-06 16:01:51 +08:00
刘健 cd33448446 Merge branch 'feature/liujian-1.8' into 'main'
fix bug

See merge request apipark/APIPark!373
2025-05-06 15:31:23 +08:00
Liujian ddd70b0ff5 fix: log detail bug 2025-05-06 15:31:07 +08:00
lichunxian e5b50a7073 Merge branch 'feature/1.8-cx' into 'main'
feature/1.8-Improve system observability

See merge request apipark/APIPark!374
2025-05-06 14:41:00 +08:00
Liujian 5a1baadf3b fix bug 2025-05-06 14:38:54 +08:00
刘健 10bd352bf4 Merge branch 'feature/liujian-1.8' into 'main'
update data

See merge request apipark/APIPark!372
2025-05-06 14:12:01 +08:00
Liujian cbea45e6e0 update data 2025-05-06 14:11:36 +08:00
刘健 e5c6e4fa82 Merge branch 'feature/liujian-1.8' into 'main'
Feature/liujian 1.8

See merge request apipark/APIPark!371
2025-05-06 12:04:47 +08:00
Liujian f05457fd2c update data 2025-05-06 12:04:11 +08:00
Liujian bc6875fe9f Merge remote-tracking branch 'origin/feature/1.8-cx' into feature/liujian-1.8 2025-05-06 11:18:15 +08:00
lichunxian 1572e03dd1 Merge branch 'feature/1.8-cx' into 'main'
feature/1.8-Improve system observability

See merge request apipark/APIPark!370
2025-05-06 11:09:42 +08:00
刘健 61025763ed Merge branch 'feature/liujian-1.8' into 'main'
Feature/liujian 1.8

See merge request apipark/APIPark!369
2025-05-06 10:49:33 +08:00
Liujian ef1c48e395 Modify the monitoring table to return field types 2025-05-06 10:43:09 +08:00
lichunxian 00905e4167 Merge branch 'feature/1.8-cx' into 'main'
feature/1.8-Improve system observability

See merge request apipark/APIPark!368
2025-04-30 19:09:44 +08:00
Liujian 9572c4157e Merge remote-tracking branch 'github-pro/feature/1.8-cx' into feature/liujian-1.8 2025-04-30 18:55:09 +08:00
Liujian fef49eb32c tmp commit 2025-04-30 18:55:01 +08:00
lichunxian a5a895e42d Merge branch 'feature/1.8-cx' into 'main'
feature/1.8-Improve system observability

See merge request apipark/APIPark!367
2025-04-30 17:18:00 +08:00
lichunxian ed1d19532b Merge branch 'feature/1.8-cx' into 'main'
feature/1.8-Improve system observability

See merge request apipark/APIPark!366
2025-04-30 16:33:49 +08:00
Liujian 9c1b19a1c7 Merge remote-tracking branch 'github-pro/feature/1.8-cx' into feature/liujian-1.8 2025-04-30 15:57:59 +08:00
刘健 bf990517dc Merge branch 'feature/liujian-1.8' into 'main'
update service logs

See merge request apipark/APIPark!365
2025-04-30 15:53:11 +08:00
Liujian 0cf7f952e2 update service logs 2025-04-30 15:52:48 +08:00
lichunxian 83873c8c92 Merge branch 'feature/1.8-cx' into 'main'
feature/1.8-Improve system observability

See merge request apipark/APIPark!364
2025-04-30 15:49:50 +08:00
刘健 a61e6ba67f Merge branch 'feature/liujian-1.8' into 'main'
Optimize chart data

See merge request apipark/APIPark!363
2025-04-30 15:06:24 +08:00
Liujian 94d881cc18 Optimize chart data 2025-04-30 15:05:47 +08:00
刘健 e081580786 Merge branch 'feature/liujian-1.8' into 'main'
fix:ai token monitor bug

See merge request apipark/APIPark!362
2025-04-30 14:00:22 +08:00
Liujian 8927211ea2 fix:ai token monitor bug 2025-04-30 13:59:50 +08:00
lichunxian 813905ca40 Merge branch 'feature/1.8-cx' into 'main'
feature/1.8-Improve system observability

See merge request apipark/APIPark!361
2025-04-30 10:25:41 +08:00
lichunxian 66be761d18 Merge branch 'feature/1.8-cx' into 'main'
feature/1.8-Improve system observability

See merge request apipark/APIPark!360
2025-04-30 09:46:26 +08:00
lichunxian 6b6fa5bd40 Merge branch 'feature/1.8-cx' into 'main'
feature/1.8-Improve system observability

See merge request apipark/APIPark!359
2025-04-30 09:30:12 +08:00
lichunxian 943ef4f9b0 Merge branch 'feature/1.8-cx' into 'main'
feature/1.8-Improve system observability

See merge request apipark/APIPark!358
2025-04-30 09:20:32 +08:00
lichunxian 78d9a1c23c Merge branch 'feature/1.8-cx' into 'main'
feature/1.8-Improve system observability

See merge request apipark/APIPark!357
2025-04-30 09:09:26 +08:00
刘健 342d022c43 Merge branch 'feature/liujian-1.8' into 'main'
Feature/liujian 1.8

See merge request apipark/APIPark!356
2025-04-30 00:24:33 +08:00
Liujian 1d36f4b821 fix monitor bug 2025-04-30 00:24:02 +08:00
Liujian 4e459168df Merge remote-tracking branch 'origin/feature/1.8-cx' into feature/liujian-1.8 2025-04-29 22:56:28 +08:00
lichunxian a8c14ee839 Merge branch 'feature/1.8-cx' into 'main'
feature/1.8-Improve system observability

See merge request apipark/APIPark!355
2025-04-29 19:40:30 +08:00
lichunxian 23c40efe0d Merge branch 'feature/1.8-cx' into 'main'
feature/1.8-Improve system observability

See merge request apipark/APIPark!354
2025-04-29 19:33:43 +08:00
lichunxian a76941ea17 Merge branch 'feature/1.8-cx' into 'main'
feature/1.8-Improve system observability

See merge request apipark/APIPark!353
2025-04-29 19:28:59 +08:00
刘健 0c392d2092 Merge branch 'feature/liujian-1.8' into 'main'
Log information returns the newly added Body

See merge request apipark/APIPark!352
2025-04-29 19:28:04 +08:00
Liujian e5f0423a90 Log information returns the newly added Body 2025-04-29 19:23:35 +08:00
刘健 46caf49f18 Merge branch 'feature/liujian-1.8' into 'main'
fix bug

See merge request apipark/APIPark!351
2025-04-29 19:16:54 +08:00
Liujian 7dc8d65235 fix bug 2025-04-29 19:16:21 +08:00
刘健 74c87ec308 Merge branch 'feature/liujian-1.8' into 'main'
finish service log module

See merge request apipark/APIPark!350
2025-04-29 19:08:43 +08:00
Liujian 604a8312ef finish service log module 2025-04-29 19:08:02 +08:00
lichunxian 8b318caa0b Merge branch 'feature/1.8-cx' into 'main'
feature/1.8-Improve system observability

See merge request apipark/APIPark!349
2025-04-29 19:06:52 +08:00
lichunxian 6bad1c3c7c Merge branch 'feature/1.8-cx' into 'main'
feature/1.8-Improve system observability

See merge request apipark/APIPark!348
2025-04-29 18:24:17 +08:00
刘健 1333d4ed02 Merge branch 'feature/liujian-1.8' into 'main'
finish log list

See merge request apipark/APIPark!347
2025-04-29 17:49:59 +08:00
Liujian a3bebde83c finish log list 2025-04-29 17:44:53 +08:00
lichunxian d3e91b04a2 Merge branch 'feature/1.8-cx' into 'main'
feature/1.8-Improve system observability

See merge request apipark/APIPark!346
2025-04-29 17:33:16 +08:00
lichunxian f4400c0130 Merge branch 'feature/1.8-cx' into 'main'
feature/1.8-Improve system observability

See merge request apipark/APIPark!345
2025-04-29 17:27:37 +08:00
lichunxian cc5d677d67 Merge branch 'feature/1.8-cx' into 'main'
feature/1.8-Improve system observability

See merge request apipark/APIPark!344
2025-04-29 11:52:46 +08:00
刘健 e4c3cbc99b Merge branch 'feature/liujian-1.8' into 'main'
update service overview

See merge request apipark/APIPark!343
2025-04-29 10:20:17 +08:00
Liujian 771c86229d update service overview 2025-04-29 10:18:02 +08:00
刘健 5c1db00d7e Merge branch 'feature/liujian-1.8' into 'main'
Feature/liujian 1.8

See merge request apipark/APIPark!342
2025-04-29 00:35:41 +08:00
Liujian cff536710e finish: monitor overview 2025-04-29 00:34:58 +08:00
lichunxian cd91f4bdb9 Merge branch 'feature/1.8-cx' into 'main'
feature/1.8-Improve system observability

See merge request apipark/APIPark!341
2025-04-27 17:44:31 +08:00
lichunxian 79860bc665 Merge branch 'feature/1.8-cx' into 'main'
feature/1.8-Improve system observability

See merge request apipark/APIPark!340
2025-04-27 14:55:12 +08:00
lichunxian 4623ba6fba Merge branch 'feature/1.8-cx' into 'main'
feature/1.8-Improve system observability

See merge request apipark/APIPark!339
2025-04-27 14:50:59 +08:00
lichunxian 1f4acdc99e Merge branch 'feature/1.8-cx' into 'main'
feature/1.8-Improve system observability

See merge request apipark/APIPark!338
2025-04-27 14:31:19 +08:00
lichunxian 0307282dbd Merge branch 'feature/1.8-cx' into 'main'
feature/1.8-Improve system observability

See merge request apipark/APIPark!337
2025-04-27 14:15:29 +08:00
lichunxian 7dbc2a1a78 Merge branch 'feature/1.8-cx' into 'main'
feature/1.8-Improve system observability

See merge request apipark/APIPark!336
2025-04-27 10:53:53 +08:00
lichunxian 12d42c4247 Merge branch 'feature/1.8-cx' into 'main'
Feature/1.8 cx

See merge request apipark/APIPark!335
2025-04-27 10:45:08 +08:00
Dot.L a22759136e Merge pull request #298 from APIParkLab/feature/1.7-liujian
update docker build script
2025-04-24 16:03:14 +08:00
Liujian b8ebbac2b8 update docker build script 2025-04-24 16:02:32 +08:00
Dot.L 9c4590db07 Merge pull request #297 from APIParkLab/feature/1.7-liujian
Fix: Apikey getting md5 when calling MCP Server at service level
2025-04-22 18:08:51 +08:00
Liujian 7ba8a57793 Fix: Apikey getting md5 when calling MCP Server at service level 2025-04-22 18:08:24 +08:00
lichunxian 4eb3368875 Merge branch 'feature/1.7-cxx' into 'main'
feature/1.7-MCP-analytics-table-optimize

See merge request apipark/APIPark!334
2025-04-16 15:47:13 +08:00
刘健 4478e6823a Merge branch 'feature/1.7-liujian' into 'main'
Fix: Dragging to modify the order of service categories will fail when there...

See merge request apipark/APIPark!333
2025-04-16 15:21:28 +08:00
lichunxian ab5bffea87 Merge branch 'feature/1.7-cxx' into 'main'
Feature/1.7 cxx

See merge request apipark/APIPark!332
2025-04-16 14:36:37 +08:00
刘健 8a48828a76 Merge branch 'feature/1.7-liujian' into 'main'
Fix: Issue of API duplicate publishing when publishing services

See merge request apipark/APIPark!331
2025-04-16 14:34:39 +08:00
刘健 c2b70e23e4 Merge branch 'feature/1.7-liujian' into 'main'
update

See merge request apipark/APIPark!330
2025-04-16 14:12:52 +08:00
lichunxian eb46a4365c Merge branch 'feature/1.7-cxx' into 'main'
feature/1.7-MCP

See merge request apipark/APIPark!329
2025-04-16 14:07:50 +08:00
lichunxian 058a8f7974 Merge branch 'feature/1.7-cxx' into 'main'
feature/1.7-MCP

See merge request apipark/APIPark!328
2025-04-16 13:48:41 +08:00
lichunxian b5585f548a Merge branch 'feature/1.7-cxx' into 'main'
feature/1.7-MCP

See merge request apipark/APIPark!327
2025-04-16 13:45:08 +08:00
lichunxian e4a3e1a1a2 Merge branch 'feature/1.7-cxx' into 'main'
feature/1.7-MCP

See merge request apipark/APIPark!326
2025-04-16 11:54:13 +08:00
刘健 674a15ef32 Merge branch 'feature/1.7-liujian' into 'main'
update api portal interface

See merge request apipark/APIPark!325
2025-04-16 11:11:14 +08:00
lichunxian d82d665280 Merge branch 'feature/1.7-cxx' into 'main'
feature/1.7-MCP

See merge request apipark/APIPark!324
2025-04-16 10:19:11 +08:00
lichunxian 752db42b3b Merge branch 'feature/1.7-cxx' into 'main'
feature/1.7-MCP

See merge request apipark/APIPark!323
2025-04-16 09:53:24 +08:00
lichunxian 10aaf85a26 Merge branch 'feature/1.7-cxx' into 'main'
feature/1.7-MCP

See merge request apipark/APIPark!322
2025-04-16 09:47:57 +08:00
lichunxian 5c97ef9416 Merge branch 'feature/1.7-cxx' into 'main'
feature/1.7-MCP

See merge request apipark/APIPark!321
2025-04-16 09:18:16 +08:00
刘健 5fc84299f1 Merge branch 'feature/1.7-liujian' into 'main'
update path

See merge request apipark/APIPark!320
2025-04-16 00:01:29 +08:00
刘健 2eeeebf7c2 Merge branch 'feature/1.7-liujian' into 'main'
MCP Server supports multiple languages.

See merge request apipark/APIPark!319
2025-04-15 22:42:51 +08:00
lichunxian 155ad537a9 Merge branch 'feature/1.7-cxx' into 'main'
feature/1.7-MCP

See merge request apipark/APIPark!318
2025-04-15 18:58:14 +08:00
lichunxian 4a1430c62a Merge branch 'feature/1.7-cxx' into 'main'
feature/1.7-MCP-Breadcrumb

See merge request apipark/APIPark!317
2025-04-14 17:45:58 +08:00
lichunxian 8cc0d038bd Merge branch 'feature/1.7-cxx' into 'main'
feature/1.7-MCP-Breadcrumb

See merge request apipark/APIPark!316
2025-04-14 17:33:06 +08:00
lichunxian 165759398e Merge branch 'feature/1.7-cxx' into 'main'
feature/1.7-MCP

See merge request apipark/APIPark!315
2025-04-14 14:09:47 +08:00
lichunxian 1091d4e086 Merge branch 'feature/1.7-cxx' into 'main'
Feature/1.7 cxx

See merge request apipark/APIPark!314
2025-04-14 13:37:01 +08:00
刘健 0523f13dfb Merge branch 'feature/1.7-liujian' into 'main'
Feature/1.7 liujian

See merge request apipark/APIPark!313
2025-04-14 11:57:40 +08:00
lichunxian 256c04f5bb Merge branch 'feature/1.7-cxx' into 'main'
feature/1.7-MCP

See merge request apipark/APIPark!312
2025-04-14 11:33:52 +08:00
lichunxian a9dcc78db6 Merge branch 'feature/1.7-cxx' into 'main'
feature/1.7-MCP

See merge request apipark/APIPark!311
2025-04-14 11:12:24 +08:00
刘健 1f6c173e18 Merge branch 'feature/1.7-liujian' into 'main'
Fix: API forwarding header setting failure issue

See merge request apipark/APIPark!310
2025-04-14 10:20:17 +08:00
lichunxian b593e8b57b Merge branch 'feature/1.7-cxx' into 'main'
feature/1.7-MCP

See merge request apipark/APIPark!309
2025-04-14 09:54:30 +08:00
lichunxian 1ec00de03c Merge branch 'feature/1.7-cxx' into 'main'
feature/1.7-MCP

See merge request apipark/APIPark!308
2025-04-11 18:41:00 +08:00
lichunxian bad7fbadda Merge branch 'feature/1.7-cxx' into 'main'
Feature/1.7 cxx

See merge request apipark/APIPark!307
2025-04-11 18:31:30 +08:00
lichunxian 7a506fc15e Merge branch 'feature/1.7-cxx' into 'main'
feature/1.7-MCP

See merge request apipark/APIPark!306
2025-04-11 17:28:46 +08:00
lichunxian dec2c3a23e Merge branch 'feature/1.7-cxx' into 'main'
feature/1.7-MCP

See merge request apipark/APIPark!305
2025-04-11 16:49:06 +08:00
刘健 2093541c37 Merge branch 'feature/1.7-liujian' into 'main'
Add the openapiaddress field to the API portal to obtain detailed service information

See merge request apipark/APIPark!304
2025-04-11 14:43:07 +08:00
lichunxian 40c7ba4305 Merge branch 'feature/1.7-cxx' into 'main'
feature/1.7-MCP

See merge request apipark/APIPark!303
2025-04-11 14:18:13 +08:00
刘健 a95bca31e2 Merge branch 'feature/1.7-liujian' into 'main'
service detail add invoke_count

See merge request apipark/APIPark!302
2025-04-11 11:58:33 +08:00
刘健 a541e45a53 Merge branch 'feature/1.7-liujian' into 'main'
Feature/1.7 liujian

See merge request apipark/APIPark!301
2025-04-11 11:04:56 +08:00
lichunxian b20c66b311 Merge branch 'feature/1.7-cxx' into 'main'
feature/1.7-MCP

See merge request apipark/APIPark!300
2025-04-11 10:06:06 +08:00
lichunxian 729e1f105c Merge branch 'feature/1.7-cxx' into 'main'
feature/1.7-MCP

See merge request apipark/APIPark!299
2025-04-10 17:48:12 +08:00
lichunxian 6aa96a2ae9 Merge branch 'feature/1.7-cxx' into 'main'
feature/1.7-MCP

See merge request apipark/APIPark!298
2025-04-10 16:29:48 +08:00
lichunxian 5093c98656 Merge branch 'feature/1.7-cxx' into 'main'
feature/1.7-MCP

See merge request apipark/APIPark!297
2025-04-10 16:22:52 +08:00
lichunxian f4b70d4e71 Merge branch 'feature/1.7-cxx' into 'main'
feature/1.7-MCP

See merge request apipark/APIPark!296
2025-04-10 16:11:01 +08:00
刘健 eeb36f43a4 Update .gitlab-ci.yml file 2025-04-10 15:52:44 +08:00
lichunxian 5a59a6d378 Merge branch 'feature/1.7-cxx' into 'main'
feature/1.7-MCP

See merge request apipark/APIPark!295
2025-04-10 15:27:13 +08:00
lichunxian c0045d17e2 Merge branch 'feature/1.7-cxx' into 'main'
Feature/1.7 cxx

See merge request apipark/APIPark!294
2025-04-10 10:47:54 +08:00
刘健 42963d3ee5 Merge branch 'feature/1.7-liujian' into 'main'
Revert "Auxiliary commit to revert individual files from da05525cbbf2510a2cbc37d7eed6bfb8248e448b"

See merge request apipark/APIPark!291
2025-04-10 10:00:30 +08:00
52 changed files with 4317 additions and 903 deletions
+2 -1
View File
@@ -7,4 +7,5 @@
/.vscode/
.air.toml
/tmp/
/work
/work
/cmd/
+88
View File
@@ -0,0 +1,88 @@
variables:
PATH: /opt/go-1.23/go/bin/:/opt/node-1.22/bin/:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin
GOROOT: /opt/go-1.23/go
GOPROXY: https://goproxy.cn
VERSION: $CI_COMMIT_SHORT_SHA
APP: apipark
APP_PRE: ${APP}_${VERSION}
BUILD_DIR: ${APP}-build
DEPLOY_DESC: "DEV 环境"
VIEW_ADDR: http://172.18.166.219:8288
SAVE_DIR: /opt/${APP}
NODE_OPTIONS: --max_old_space_size=8192
stages:
# - notice
- build
- deploy
- webhook
#
#feishu-informer: # 飞书回调
# stage: notice
# variables:
# DIFF_URL: "$CI_MERGE_REQUEST_PROJECT_URL/-/merge_requests/$CI_MERGE_REQUEST_IID/diffs"
# rules:
# - if: $CI_PIPELINE_SOURCE=="merge_request_event" && $CI_COMMIT_BRANCH =~ "main-github-pro"
# script:
# - echo "merge request"
# - |
# curl -X POST -H "Content-Type: application/json" \
# -d "{\"msg_type\":\"text\",\"content\":{\"text\":\"项目:${CI_PROJECT_NAME}\\n提交人:${GITLAB_USER_NAME}\\n提交信息:${CI_MERGE_REQUEST_TITLE}\\n合并分支信息:${CI_MERGE_REQUEST_SOURCE_BRANCH_NAME} -> ${CI_MERGE_REQUEST_TARGET_BRANCH_NAME}\\n差异性地址:${DIFF_URL}\\n请及时review代码\"}}" \
# ${FEISHU_WEBHOOK}
builder:
stage: build
rules:
- if: $CI_COMMIT_BRANCH == "main-github-pro" || $CI_COMMIT_BRANCH == "main"
script:
- set -e
- |
if [ ! -d "../artifacts" ]; then
mkdir -p ../artifacts
fi
if [ -d "../artifacts/dist" ]; then
cp -r ../artifacts/dist frontend/dist
fi
- |
if [ -n "$(git diff --name-status HEAD~1 HEAD -- frontend)" ]; then
./scripts/build.sh $BUILD_DIR ${VERSION} all ""
else
./scripts/build.sh $BUILD_DIR ${VERSION}
fi
if [ -d "frontend/dist" ]; then
echo "copy frontend/dist to artifacts/dist"
rm -fr ../artifacts/dist
cp -r frontend/dist ../artifacts/dist
fi
cp $BUILD_DIR/${APP_PRE}_linux_amd64.tar.gz ${SAVE_DIR}
deployer:
stage: deploy
rules:
- if: $CI_COMMIT_BRANCH == "main-github-pro" || $CI_COMMIT_BRANCH == "main"
variables:
APIPARK_GUEST_MODE: allow
APIPARK_GUEST_ID: dklejrfbhjqwdh
script:
- cd ${SAVE_DIR};mkdir -p ${APP_PRE};tar -zxvf ${APP_PRE}_linux_amd64.tar.gz -C ${APP_PRE};cd ${APP_PRE};./install.sh ${SAVE_DIR};./run.sh restart;cd ${SAVE_DIR} && ./clean.sh ${APP_PRE}
when: on_success
success:
stage: webhook
rules:
- if: $CI_COMMIT_BRANCH == "main-github-pro" || $CI_COMMIT_BRANCH == "main"
script:
- |
curl -X POST -H "Content-Type: application/json" \
-d "{\"msg_type\":\"text\",\"content\":{\"text\":\"最近一次提交:${CI_COMMIT_TITLE}\\n提交人:${GITLAB_USER_NAME}\\n项目:${CI_PROJECT_NAME}\\n环境:${DEPLOY_DESC}\\n更新部署完成.\\n访问地址:${VIEW_ADDR}\\n工作流地址:${CI_PIPELINE_URL}\"}}" \
${FEISHU_WEBHOOK}
when: on_success
failure:
stage: webhook
rules:
- if: $CI_COMMIT_BRANCH == "main-github-pro" || $CI_COMMIT_BRANCH == "main"
script:
- |
curl -X POST -H "Content-Type: application/json" \
-d "{\"msg_type\":\"text\",\"content\":{\"text\":\"最近一次提交:${CI_COMMIT_TITLE}\\n提交人:${GITLAB_USER_NAME}\\n项目:${CI_PROJECT_NAME}\\n环境:${DEPLOY_DESC}\\n更新部署失败,请及时到gitlab上查看\\n工作流地址:${CI_PIPELINE_URL}\"}}" \
${FEISHU_WEBHOOK}
when: on_failure
+77
View File
@@ -0,0 +1,77 @@
package common
import (
"fmt"
"strconv"
)
func FormatCountInt64(count int64) string {
switch {
case count < 1000:
return strconv.FormatInt(count, 10)
case count < 1000000:
return fmt.Sprintf("%.1fK", float64(count)/1000)
case count < 1000000000:
return fmt.Sprintf("%.1fM", float64(count)/1000000)
case count < 1000000000000:
return fmt.Sprintf("%.1fB", float64(count)/1000000000)
default:
return fmt.Sprintf("%.1fT", float64(count)/1000000000000)
}
}
func FormatCountFloat64(count float64) string {
switch {
case count < 1000:
return fmt.Sprintf("%.1f", count)
case count < 1000000:
return fmt.Sprintf("%.1fK", count/1000)
case count < 1000000000:
return fmt.Sprintf("%.1fM", count/1000000)
case count < 1000000000000:
return fmt.Sprintf("%.1fB", count/1000000000)
default:
return fmt.Sprintf("%.1fT", count/1000000000000)
}
}
func FormatTime(t int64) string {
if t < 1000 {
return strconv.FormatInt(t, 10) + "ms"
}
if t < 1000000 {
return fmt.Sprintf("%.1fs", float64(t)/1000)
}
if t < 1000000000 {
return fmt.Sprintf("%.1fmin", float64(t)/1000000)
}
if t < 1000000000000 {
return fmt.Sprintf("%.1fhour", float64(t)/1000000000)
}
return fmt.Sprintf("%.1D", float64(t)/1000000000000)
}
func FormatByte(b int64) string {
const (
KB = 1000
MB = KB * 1000
GB = MB * 1000
TB = GB * 1000
PB = TB * 1000
)
switch {
case b < KB:
return fmt.Sprintf("%dB", b)
case b < MB:
return fmt.Sprintf("%.1fKB", float64(b)/KB)
case b < GB:
return fmt.Sprintf("%.1fMB", float64(b)/MB)
case b < TB:
return fmt.Sprintf("%.1fGB", float64(b)/GB)
case b < PB:
return fmt.Sprintf("%.1fTB", float64(b)/TB)
default:
return fmt.Sprintf("%.1fPB", float64(b)/PB)
}
}
+2
View File
@@ -29,6 +29,8 @@ func FmtIntFromInterface(val interface{}) int64 {
return int64(ret)
case int:
return int64(ret)
case float64:
return int64(ret)
default:
return 0
}
+61
View File
@@ -2,6 +2,7 @@ package monitor
import (
"fmt"
"strconv"
"time"
"github.com/APIParkLab/APIPark/module/monitor"
@@ -17,6 +18,66 @@ type imlMonitorStatisticController struct {
module monitor.IMonitorStatisticModule `autowired:""`
}
func (i *imlMonitorStatisticController) ChartRestOverview(ctx *gin.Context, start string, end string) (*monitor_dto.ChartRestOverview, error) {
s, e, err := formatTime(start, end)
if err != nil {
return nil, err
}
return i.module.RestChartOverview(ctx, "", s, e)
}
func (i *imlMonitorStatisticController) ChartAIOverview(ctx *gin.Context, start string, end string) (*monitor_dto.ChartAIOverview, error) {
s, e, err := formatTime(start, end)
if err != nil {
return nil, err
}
return i.module.AIChartOverview(ctx, "", s, e)
}
func (i *imlMonitorStatisticController) AITopN(ctx *gin.Context, start string, end string, limit string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error) {
s, e, err := formatTime(start, end)
if err != nil {
return nil, nil, err
}
l, err := strconv.Atoi(limit)
if err != nil {
if limit == "" {
l = 10
} else {
return nil, nil, fmt.Errorf("parse limit %s error: %w", limit, err)
}
}
return i.module.Top(ctx, "", s, e, l, "ai")
}
func formatTime(start string, end string) (int64, int64, error) {
s, err := strconv.ParseInt(start, 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("parse start time %s error: %w", start, err)
}
e, err := strconv.ParseInt(end, 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("parse end time %s error: %w", end, err)
}
return s, e, nil
}
func (i *imlMonitorStatisticController) RestTopN(ctx *gin.Context, start string, end string, limit string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error) {
s, e, err := formatTime(start, end)
if err != nil {
return nil, nil, err
}
l, err := strconv.Atoi(limit)
if err != nil {
if limit == "" {
l = 10
} else {
return nil, nil, fmt.Errorf("parse limit %s error: %w", limit, err)
}
}
return i.module.Top(ctx, "", s, e, l, "rest")
}
func (i *imlMonitorStatisticController) Statistics(ctx *gin.Context, dataType string, input *monitor_dto.StatisticInput) (interface{}, error) {
switch dataType {
case monitor_dto.DataTypeApi:
+5
View File
@@ -22,6 +22,11 @@ type IMonitorStatisticController interface {
InvokeTrendInner(ctx *gin.Context, dataType string, typ string, api string, provider string, subscriber string, input *monitor_dto.CommonInput) (*monitor_dto.MonInvokeCountTrend, string, error)
StatisticsInner(ctx *gin.Context, dataType string, typ string, id string, input *monitor_dto.StatisticInput) (interface{}, error)
ChartRestOverview(ctx *gin.Context, start string, end string) (*monitor_dto.ChartRestOverview, error)
ChartAIOverview(ctx *gin.Context, start string, end string) (*monitor_dto.ChartAIOverview, error)
AITopN(ctx *gin.Context, start string, end string, limit string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error)
RestTopN(ctx *gin.Context, start string, end string, limit string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error)
}
type IMonitorConfigController interface {
+208 -18
View File
@@ -5,9 +5,13 @@ import (
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"
"github.com/APIParkLab/APIPark/module/monitor"
monitor_dto "github.com/APIParkLab/APIPark/module/monitor/dto"
ai_provider_local "github.com/APIParkLab/APIPark/ai-provider/local"
subscribe_dto "github.com/APIParkLab/APIPark/module/subscribe/dto"
@@ -55,10 +59,6 @@ import (
"github.com/google/uuid"
)
//var (
// ollamaConfig = "{\n \"mirostat\": 0,\n \"mirostat_eta\": 0.1,\n \"mirostat_tau\": 5.0,\n \"num_ctx\": 4096,\n \"repeat_last_n\":64,\n \"repeat_penalty\": 1.1,\n \"temperature\": 0.7,\n \"seed\": 42,\n \"num_predict\": 42,\n \"top_k\": 40,\n \"top_p\": 0.9,\n \"min_p\": 0.5\n}\n"
//)
var (
_ IServiceController = (*imlServiceController)(nil)
@@ -66,20 +66,210 @@ var (
)
type imlServiceController struct {
module service.IServiceModule `autowired:""`
docModule service.IServiceDocModule `autowired:""`
subscribeModule subscribe.ISubscribeModule `autowired:""`
aiAPIModule ai_api.IAPIModule `autowired:""`
routerModule router.IRouterModule `autowired:""`
apiDocModule api_doc.IAPIDocModule `autowired:""`
providerModule ai.IProviderModule `autowired:""`
aiLocalModel ai_local.ILocalModelModule `autowired:""`
appModule service.IAppModule `autowired:""`
upstreamModule upstream.IUpstreamModule `autowired:""`
settingModule system.ISettingModule `autowired:""`
teamModule team.ITeamModule `autowired:""`
catalogueModule catalogue.ICatalogueModule `autowired:""`
transaction store.ITransaction `autowired:""`
module service.IServiceModule `autowired:""`
docModule service.IServiceDocModule `autowired:""`
subscribeModule subscribe.ISubscribeModule `autowired:""`
aiAPIModule ai_api.IAPIModule `autowired:""`
routerModule router.IRouterModule `autowired:""`
apiDocModule api_doc.IAPIDocModule `autowired:""`
providerModule ai.IProviderModule `autowired:""`
aiLocalModel ai_local.ILocalModelModule `autowired:""`
appModule service.IAppModule `autowired:""`
upstreamModule upstream.IUpstreamModule `autowired:""`
settingModule system.ISettingModule `autowired:""`
teamModule team.ITeamModule `autowired:""`
catalogueModule catalogue.ICatalogueModule `autowired:""`
monitorModule monitor.IMonitorStatisticModule `autowired:""`
monitorConfigModule monitor.IMonitorConfigModule `autowired:""`
transaction store.ITransaction `autowired:""`
}
func (i *imlServiceController) RestLogInfo(ctx *gin.Context, serviceId string, logId string) (*service_dto.RestLogInfo, error) {
return i.module.RestLogInfo(ctx, serviceId, logId)
}
func (i *imlServiceController) AILogInfo(ctx *gin.Context, serviceId string, logId string) (*service_dto.AILogInfo, error) {
return i.module.AILogInfo(ctx, serviceId, logId)
}
func (i *imlServiceController) AILogs(ctx *gin.Context, serviceId string, start string, end string, page string, size string) ([]*service_dto.AILogItem, int64, error) {
s, e, err := formatTime(start, end)
if err != nil {
return nil, 0, err
}
if serviceId == "" {
return nil, 0, fmt.Errorf("service id is empty")
}
if page == "" {
page = "1"
}
if size == "" {
size = "20"
}
p, err := strconv.Atoi(page)
if err != nil {
return nil, 0, err
}
ps, err := strconv.Atoi(size)
if err != nil {
return nil, 0, err
}
return i.module.AILogs(ctx, serviceId, s, e, p, ps)
}
func (i *imlServiceController) RestLogs(ctx *gin.Context, serviceId string, start string, end string, page string, size string) ([]*service_dto.RestLogItem, int64, error) {
s, e, err := formatTime(start, end)
if err != nil {
return nil, 0, err
}
if serviceId == "" {
return nil, 0, fmt.Errorf("service id is empty")
}
if page == "" {
page = "1"
}
if size == "" {
size = "20"
}
p, err := strconv.Atoi(page)
if err != nil {
return nil, 0, err
}
ps, err := strconv.Atoi(size)
if err != nil {
return nil, 0, err
}
return i.module.RestLogs(ctx, serviceId, s, e, p, ps)
}
func (i *imlServiceController) ServiceOverview(ctx *gin.Context, serviceId string) (*service_dto.Overview, error) {
o, err := i.module.ServiceOverview(ctx, serviceId)
if err != nil {
return nil, err
}
cfg, err := i.monitorConfigModule.GetMonitorConfig(ctx)
if err != nil {
return nil, err
}
if len(cfg.Config) < 1 {
return o, nil
}
statistics, err := i.monitorModule.ProviderStatistics(ctx, &monitor_dto.StatisticInput{
Services: []string{serviceId},
CommonInput: &monitor_dto.CommonInput{
Start: time.Now().Add(-24 * 30 * time.Hour).Unix(),
End: time.Now().Unix(),
},
})
if err != nil {
return nil, err
}
if len(statistics) < 1 {
return o, nil
}
o.InvokeNum = statistics[0].RequestTotal
return o, nil
}
func (i *imlServiceController) AIChartOverview(ctx *gin.Context, serviceId string, start string, end string) (*monitor_dto.ServiceChartAIOverview, error) {
s, e, err := formatTime(start, end)
if err != nil {
return nil, err
}
if serviceId == "" {
return nil, fmt.Errorf("service is required")
}
so, err := i.module.ServiceOverview(ctx, serviceId)
if err != nil {
return nil, err
}
result := &monitor_dto.ServiceChartAIOverview{
EnableMCP: so.EnableMCP,
SubscriberNum: so.SubscriberNum,
APINum: so.APINum,
ServiceKind: so.ServiceKind,
}
cfg, err := i.monitorConfigModule.GetMonitorConfig(ctx)
if err != nil {
return nil, err
}
if len(cfg.Config) < 1 {
return result, nil
}
o, err := i.monitorModule.AIChartOverview(ctx, serviceId, s, e)
if err != nil {
return nil, err
}
result.AvailableMonitor = true
result.ChartAIOverview = o
return result, nil
}
func (i *imlServiceController) RestChartOverview(ctx *gin.Context, serviceId string, start string, end string) (*monitor_dto.ServiceChartRestOverview, error) {
s, e, err := formatTime(start, end)
if err != nil {
return nil, err
}
if serviceId == "" {
return nil, fmt.Errorf("service is required")
}
so, err := i.module.ServiceOverview(ctx, serviceId)
if err != nil {
return nil, err
}
result := &monitor_dto.ServiceChartRestOverview{
EnableMCP: so.EnableMCP,
SubscriberNum: so.SubscriberNum,
APINum: so.APINum,
ServiceKind: so.ServiceKind,
}
cfg, err := i.monitorConfigModule.GetMonitorConfig(ctx)
if err != nil {
return nil, err
}
if len(cfg.Config) < 1 {
return result, nil
}
o, err := i.monitorModule.RestChartOverview(ctx, serviceId, s, e)
if err != nil {
return nil, err
}
result.AvailableMonitor = true
result.ChartRestOverview = o
return result, nil
}
func formatTime(start string, end string) (int64, int64, error) {
s, err := strconv.ParseInt(start, 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("parse start time %s error: %w", start, err)
}
e, err := strconv.ParseInt(end, 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("parse end time %s error: %w", end, err)
}
return s, e, nil
}
func (i *imlServiceController) Top10(ctx *gin.Context, serviceId string, start string, end string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error) {
if serviceId == "" {
return nil, nil, fmt.Errorf("serviceId is required")
}
info, err := i.module.Get(ctx, serviceId)
if err != nil {
return nil, nil, err
}
s, e, err := formatTime(start, end)
if err != nil {
return nil, nil, err
}
return i.monitorModule.Top(ctx, serviceId, s, e, 10, info.ServiceKind)
}
func (i *imlServiceController) QuickCreateAIService(ctx *gin.Context, input *service_dto.QuickCreateAIService) error {
+15
View File
@@ -3,6 +3,8 @@ package service
import (
"reflect"
monitor_dto "github.com/APIParkLab/APIPark/module/monitor/dto"
service_dto "github.com/APIParkLab/APIPark/module/service/dto"
"github.com/gin-gonic/gin"
@@ -32,6 +34,19 @@ type IServiceController interface {
Swagger(ctx *gin.Context)
ExportSwagger(ctx *gin.Context)
Top10(ctx *gin.Context, serviceId string, start string, end string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error)
AIChartOverview(ctx *gin.Context, serviceId string, start string, end string) (*monitor_dto.ServiceChartAIOverview, error)
RestChartOverview(ctx *gin.Context, serviceId string, start string, end string) (*monitor_dto.ServiceChartRestOverview, error)
ServiceOverview(ctx *gin.Context, serviceId string) (*service_dto.Overview, error)
AILogs(ctx *gin.Context, serviceId string, start string, end string, page string, size string) ([]*service_dto.AILogItem, int64, error)
RestLogs(ctx *gin.Context, serviceId string, start string, end string, page string, size string) ([]*service_dto.RestLogItem, int64, error)
RestLogInfo(ctx *gin.Context, serviceId string, logId string) (*service_dto.RestLogInfo, error)
AILogInfo(ctx *gin.Context, serviceId string, logId string) (*service_dto.AILogInfo, error)
}
type IAppController interface {
+1 -1
View File
@@ -7,7 +7,7 @@ toolchain go1.23.6
require (
github.com/eolinker/ap-account v1.0.15
github.com/eolinker/eosc v0.18.3
github.com/eolinker/go-common v1.1.6
github.com/eolinker/go-common v1.1.7
github.com/gabriel-vasile/mimetype v1.4.4
github.com/getkin/kin-openapi v0.127.0
github.com/gin-contrib/gzip v1.0.1
+2 -2
View File
@@ -32,8 +32,8 @@ github.com/eolinker/ap-account v1.0.15 h1:n6DJeL6RHZ8eLlZUcY2U3H4d/GPaA5oelAx3R0
github.com/eolinker/ap-account v1.0.15/go.mod h1:zm/Ivs6waJ/M/nEszhpPmM6g50y/MKO+5eABFAdeD0g=
github.com/eolinker/eosc v0.18.3 h1:3IK5HkAPnJRfLbQ0FR7kWsZr6Y/OiqqGazvN1q2BL5A=
github.com/eolinker/eosc v0.18.3/go.mod h1:O9PQQXFCpB6fjHf+oFt/LN6EOAv779ItbMixMKCfTfk=
github.com/eolinker/go-common v1.1.6 h1:s+NaQL0InjX/MwWY53+8y8qzAgsULIUc4U6nWXWQ2Nw=
github.com/eolinker/go-common v1.1.6/go.mod h1:Kb/jENMN1mApnodvRgV4YwO9FJby1Jkt2EUjrBjvSX4=
github.com/eolinker/go-common v1.1.7 h1:bi7wDmlCYQGjS3k8Bz/o+Mo9aMJAzmPsBLXWurxPfwk=
github.com/eolinker/go-common v1.1.7/go.mod h1:Kb/jENMN1mApnodvRgV4YwO9FJby1Jkt2EUjrBjvSX4=
github.com/gabriel-vasile/mimetype v1.4.4 h1:QjV6pZ7/XZ7ryI2KuyeEDE8wnh7fHP9YnQy+R0LnH8I=
github.com/gabriel-vasile/mimetype v1.4.4/go.mod h1:JwLei5XPtWdGiMFB5Pjle1oEeoSeEuJfJE+TtfvdB/s=
github.com/getkin/kin-openapi v0.127.0 h1:Mghqi3Dhryf3F8vR370nN67pAERW+3a95vomb3MAREY=
+2 -1
View File
@@ -9,7 +9,8 @@ import (
type ILogDriver interface {
LogInfo(clusterId string, id string) (*LogInfo, error)
LogCount(clusterId string, conditions map[string]string, spendHour int64, group string) (map[string]int64, error)
Logs(clusterId string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*Log, int64, error)
Logs(clusterId string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*LogItem, int64, error)
LogRecords(clusterId string, start time.Time, end time.Time) ([]*LogItem, error)
}
var (
+14 -2
View File
@@ -4,22 +4,34 @@ import (
"time"
)
type Log struct {
type LogItem struct {
ID string
Strategy string
Service string
API string
Method string
Url string
RemoteIP string
Consumer string
Authorization string
InputToken int64
OutputToken int64
TotalToken int64
AIProvider string
AIModel string
StatusCode int64
ResponseTime int64
Traffic int64
RecordTime time.Time
}
type LogInfo struct {
ID string
*LogItem
ContentType string
RequestBody string
ProxyBody string
ProxyResponseBody string
ResponseBody string
RequestHeader string
ResponseHeader string
}
+34 -19
View File
@@ -49,29 +49,44 @@ type LogCount struct {
}
type LogInfo struct {
Stream *LogDetail `json:"stream"`
Stream *LogDetail `json:"stream"`
Values []interface{} `json:"values"`
}
type LogDetail struct {
Api string `json:"api"`
Application string `json:"application"`
Strategy string `json:"strategy"`
ContentType string `json:"content_type"`
Cluster string `json:"cluster"`
Msec string `json:"msec"`
Node string `json:"node"`
RequestId string `json:"request_id"`
RequestMethod string `json:"request_method"`
RequestScheme string `json:"request_scheme"`
RequestTime string `json:"request_time"`
RequestUri string `json:"request_uri"`
type LogBodyDetail struct {
RequestBody string `json:"request_body"`
ProxyBody string `json:"proxy_body"`
ResponseBody string `json:"response_body"`
ProxyResponseBody string `json:"proxy_response_body"`
Service string `json:"service"`
Provider string `json:"provider"`
Authorization string `json:"authorization"`
SrcIp string `json:"src_ip"`
Status string `json:"status"`
}
type LogDetail struct {
Api string `json:"api"`
Application string `json:"application"`
Strategy string `json:"strategy"`
ContentType string `json:"content_type"`
Cluster string `json:"cluster"`
Msec string `json:"msec"`
Node string `json:"node"`
RequestId string `json:"request_id"`
RequestMethod string `json:"request_method"`
RequestScheme string `json:"request_scheme"`
RequestHeader string `json:"request_header"`
RequestTime string `json:"request_time"`
RequestUri string `json:"request_uri"`
RequestBody string `json:"request_body"`
ProxyBody string `json:"proxy_body"`
ResponseBody string `json:"response_body"`
ResponseHeader string `json:"response_header"`
ProxyResponseBody string `json:"proxy_response_body"`
Service string `json:"service"`
Provider string `json:"provider"`
Authorization string `json:"authorization"`
SrcIp string `json:"src_ip"`
Status string `json:"status"`
AIProvider string `json:"ai_provider"`
AIModel string `json:"ai_model"`
AIModelInputToken interface{} `json:"ai_model_input_token"`
AIModelOutputToken interface{} `json:"ai_model_output_token"`
AIModelTotalToken interface{} `json:"ai_model_total_token"`
}
+97 -21
View File
@@ -81,13 +81,39 @@ func (d *Driver) LogInfo(clusterId string, id string) (*log_driver.LogInfo, erro
return nil, fmt.Errorf("no log found")
}
stream := list[0].Stream
requestBody := stream.RequestBody
proxyRequestBody := stream.ProxyBody
proxyResponseBody := stream.ProxyResponseBody
responseBody := stream.ResponseBody
if len(list[0].Values) > 0 {
switch t := list[0].Values[0].(type) {
case []interface{}:
if len(t) > 1 {
v, ok := t[1].(string)
if !ok {
break
}
var tmp LogBodyDetail
err = json.Unmarshal([]byte(v), &tmp)
if err == nil {
requestBody = tmp.RequestBody
proxyRequestBody = tmp.ProxyBody
responseBody = tmp.ResponseBody
proxyResponseBody = tmp.ProxyBody
}
}
}
}
msec, _ := strconv.ParseInt(stream.Msec, 10, 64)
return &log_driver.LogInfo{
ID: stream.RequestId,
LogItem: ToLogItem(stream, msec),
ContentType: stream.ContentType,
RequestBody: stream.RequestBody,
ProxyBody: stream.ProxyBody,
ProxyResponseBody: stream.ProxyResponseBody,
ResponseBody: stream.ResponseBody,
RequestBody: requestBody,
ProxyBody: proxyRequestBody,
ProxyResponseBody: proxyResponseBody,
ResponseBody: responseBody,
RequestHeader: stream.RequestHeader,
ResponseHeader: stream.ResponseHeader,
}, nil
}
@@ -132,7 +158,25 @@ func (d *Driver) LogCount(clusterId string, conditions map[string]string, spendH
return result, nil
}
func (d *Driver) Logs(clusterId string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*log_driver.Log, int64, error) {
func (d *Driver) LogRecords(clusterId string, start time.Time, end time.Time) ([]*log_driver.LogItem, error) {
if start.After(end) {
return nil, fmt.Errorf("start time is greater than end time")
}
queries := url.Values{}
queries.Set("query", fmt.Sprintf("{cluster=\"%s\"} | json", clusterId))
queries.Set("direction", "backward")
queries.Set("start", strconv.FormatInt(start.UnixNano(), 10))
queries.Set("end", strconv.FormatInt(end.UnixNano(), 10))
log.Debug("query is ", queries.Get("query"))
logs, err := d.recuseLogs(queries, end, 1)
if err != nil {
return nil, err
}
return logs, nil
}
func (d *Driver) Logs(clusterId string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*log_driver.LogItem, int64, error) {
if start.After(end) {
return nil, 0, fmt.Errorf("start time is greater than end time")
}
@@ -177,7 +221,30 @@ func (d *Driver) Logs(clusterId string, conditions map[string]string, start time
return logs, count, nil
}
func (d *Driver) recuseLogs(queries url.Values, end time.Time, offset int64) ([]*log_driver.Log, error) {
func ToLogItem(detail *LogDetail, msec int64) *log_driver.LogItem {
return &log_driver.LogItem{
ID: detail.RequestId,
Strategy: detail.Strategy,
Service: detail.Provider,
API: detail.Api,
Method: detail.RequestMethod,
Url: detail.RequestUri,
RemoteIP: detail.SrcIp,
Consumer: detail.Application,
Authorization: detail.Authorization,
InputToken: parseToInt64(detail.AIModelInputToken),
OutputToken: parseToInt64(detail.AIModelOutputToken),
TotalToken: parseToInt64(detail.AIModelTotalToken),
AIProvider: detail.AIProvider,
AIModel: detail.AIModel,
StatusCode: parseToInt64(detail.Status),
ResponseTime: parseToInt64(detail.RequestTime),
Traffic: int64(len(detail.ResponseBody) + len(detail.RequestBody)),
RecordTime: time.UnixMilli(msec),
}
}
func (d *Driver) recuseLogs(queries url.Values, end time.Time, offset int64) ([]*log_driver.LogItem, error) {
queries.Set("end", strconv.FormatInt(end.UnixNano(), 10))
list, err := send[LogInfo](http.MethodGet, fmt.Sprintf("%s/loki/api/v1/query_range", d.url), d.headers, queries, "")
if err != nil {
@@ -198,24 +265,13 @@ func (d *Driver) recuseLogs(queries url.Values, end time.Time, offset int64) ([]
}
return d.recuseLogs(queries, time.UnixMilli(msec), offset-1)
}
logs := make([]*log_driver.Log, 0, len(list))
logs := make([]*log_driver.LogItem, 0, len(list))
for _, l := range list {
if l.Stream == nil {
continue
}
detail := l.Stream
msec, _ := strconv.ParseInt(detail.Msec, 10, 64)
logs = append(logs, &log_driver.Log{
ID: detail.RequestId,
Service: detail.Provider,
Method: detail.RequestMethod,
Url: detail.RequestUri,
RemoteIP: detail.SrcIp,
Consumer: detail.Application,
Authorization: detail.Authorization,
RecordTime: time.UnixMilli(msec),
})
msec, _ := strconv.ParseInt(l.Stream.Msec, 10, 64)
logs = append(logs, ToLogItem(l.Stream, msec))
}
sort.Slice(logs, func(i, j int) bool {
return logs[i].RecordTime.After(logs[j].RecordTime)
@@ -223,6 +279,26 @@ func (d *Driver) recuseLogs(queries url.Values, end time.Time, offset int64) ([]
return logs, nil
}
func parseToInt64(v interface{}) int64 {
switch t := v.(type) {
case int:
return int64(t)
case int64:
return t
case string:
if v == "" {
return 0
}
i, err := strconv.ParseInt(t, 10, 64)
if err != nil {
return 0
}
return i
default:
return 0
}
}
func (d *Driver) logCount(clusterId string, conditions map[string]string, start time.Time, end time.Time) (int64, error) {
// 先查在这段时间内符合条件的日志数量
queries := url.Values{}
+5 -5
View File
@@ -44,12 +44,12 @@ func TestLoki(t *testing.T) {
// if err != nil {
// t.Fatalf("failed to send request: %v", err)
// }
// t.Log(time.Now().Sub(a))
// t.LogItem(time.Now().Sub(a))
// data, err := json.Marshal(result)
// if err != nil {
// t.Fatalf("failed to marshal data: %v", err)
// }
// t.Log(string(data))
// t.LogItem(string(data))
//}
//
//func TestLokiLogCount(t *testing.T) {
@@ -67,7 +67,7 @@ func TestLoki(t *testing.T) {
// if err != nil {
// t.Fatalf("failed to marshal data: %v", err)
// }
// t.Log(string(data))
// t.LogItem(string(data))
//}
//
//func TestLokiLogs(t *testing.T) {
@@ -83,7 +83,7 @@ func TestLoki(t *testing.T) {
// queries.Set("limit", "1")
// now = time.Now()
// result, err := send[map[string]interface{}](http.MethodGet, "http://localhost:3100/loki/api/v1/query_range", headers, queries, "")
// t.Log(time.Now().Sub(now))
// t.LogItem(time.Now().Sub(now))
// if err != nil {
// t.Fatalf("failed to send request: %v", err)
// }
@@ -91,5 +91,5 @@ func TestLoki(t *testing.T) {
// if err != nil {
// t.Fatalf("failed to marshal data: %v", err)
// }
// t.Log(string(data))
// t.LogItem(string(data))
//}
+1 -1
View File
@@ -124,7 +124,7 @@ func (t *Tool) RegisterMCP(s *server.MCPServer) {
}
apikey := utils.Label(ctx, "apikey")
if apikey != "" {
req.Header.Set("Authorization", utils.Md5(apikey))
req.Header.Set("Authorization", apikey)
}
resp, err := client.Do(req)
+2 -1
View File
@@ -718,7 +718,8 @@ func (i *imlProviderModule) getAiProviders(ctx context.Context) ([]*gateway.Dyna
}
model, has := driver.GetModel(l.DefaultLLM)
if !has {
return nil, fmt.Errorf("model not found: %s", l.DefaultLLM)
continue
//return nil, fmt.Errorf("model not found: %s", l.DefaultLLM)
}
cfg := make(map[string]interface{})
cfg["provider"] = l.Id
+179 -4
View File
@@ -4,9 +4,14 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/eolinker/go-common/server"
log_driver "github.com/APIParkLab/APIPark/log-driver"
"github.com/eolinker/go-common/register"
"github.com/eolinker/go-common/utils"
"github.com/APIParkLab/APIPark/gateway"
@@ -16,11 +21,11 @@ import (
"github.com/APIParkLab/APIPark/service/cluster"
"github.com/eolinker/go-common/auto"
log_dto "github.com/APIParkLab/APIPark/module/log/dto"
"github.com/APIParkLab/APIPark/service/log"
eosc_log "github.com/eolinker/eosc/log"
log_print "github.com/eolinker/eosc/log"
"github.com/eolinker/go-common/auto"
)
var _ ILogModule = (*imlLogModule)(nil)
@@ -28,7 +33,10 @@ var _ ILogModule = (*imlLogModule)(nil)
type imlLogModule struct {
service log.ILogService `autowired:""`
clusterService cluster.IClusterService `autowired:""`
transaction store.ITransaction `autowired:""`
transaction store.ITransaction `autowired:""`
//scheduleCtx context.Context
scheduleCancel context.CancelFunc
}
var labels = map[string]string{
@@ -54,6 +62,7 @@ var logFormatter = map[string]interface{}{
"$proxy_host",
"$proxy_header",
"$proxy_addr",
"$response_header",
"$response_headers",
"$status",
"$content_type",
@@ -70,6 +79,11 @@ var logFormatter = map[string]interface{}{
"$authorization",
"$response_body",
"$proxy_response_body",
"$ai_provider",
"$ai_model",
"$ai_model_input_token",
"$ai_model_output_token",
"$ai_model_total_token",
},
}
@@ -135,6 +149,11 @@ func (i *imlLogModule) Save(ctx context.Context, driver string, input *log_dto.S
return err
}
log_driver.SetDriver(driver, d)
newCtx, cancel := context.WithCancel(context.Background())
newCtx = utils.SetUserId(newCtx, "admin")
i.scheduleCancel()
i.scheduleCancel = cancel
i.scheduleUpdateLogRecord(newCtx)
return nil
})
}
@@ -164,8 +183,15 @@ func (i *imlLogModule) Get(ctx context.Context, driver string) (*log_dto.LogSour
}, nil
}
func (i *imlLogModule) OnComplete() {
func (i *imlLogModule) OnInit() {
register.Handle(func(v server.Server) {
ctx, cancel := context.WithCancel(context.Background())
ctx = utils.SetUserId(ctx, "admin")
//i.scheduleCtx = ctx
i.scheduleCancel = cancel
i.scheduleUpdateLogRecord(ctx)
})
}
func (i *imlLogModule) initGateway(ctx context.Context, clusterId string, clientDriver gateway.IClientDriver) error {
@@ -222,3 +248,152 @@ func (i *imlLogModule) initGateway(ctx context.Context, clusterId string, client
return nil
}
const (
oneSecond = 1
oneMinute = 60
oneHour = 60 * oneMinute
oneDay = 24 * oneHour
)
// 定时更新历史记录
func (i *imlLogModule) scheduleUpdateLogRecord(ctx context.Context) {
driver, has := log_driver.GetDriver("loki")
if !has {
eosc_log.Error("driver loki not found")
return
}
info, err := i.service.GetLogSource(ctx, "loki")
if err != nil {
if !errors.Is(err, gorm.ErrRecordNotFound) {
eosc_log.Errorf("get log source loki error: %s", err)
return
}
return
}
now := time.Now()
before90Days := now.Add(-7 * 24 * time.Hour)
beginTime := before90Days
if info.LastPullTime.After(before90Days) {
before90Days = info.LastPullTime
}
pauseTime := now
historyFinish := false
go func() {
eosc_log.Infof("start update history log record,start time: %s", beginTime.Format("2006-01-02 15:04:05"))
ticket := time.NewTicker(1 * time.Minute)
defer ticket.Stop()
for {
now = time.Now()
select {
case <-ctx.Done():
return
case <-ticket.C:
switch {
case now.Sub(beginTime) > oneDay:
endTime := beginTime.Add(oneDay)
err = i.updateLogRecord(ctx, driver, beginTime, endTime)
if err != nil {
eosc_log.Errorf("update log record error: %s", err)
continue
}
err = i.service.UpdateLogSource(ctx, "loki", &log.Save{
LastPullTime: &endTime,
})
if err != nil {
eosc_log.Errorf("update log source error: %s", err)
continue
}
beginTime = endTime
case now.Sub(pauseTime) <= oneDay:
endTime := pauseTime
err = i.updateLogRecord(ctx, driver, beginTime, endTime)
if err != nil {
eosc_log.Errorf("update log record error: %s", err)
historyFinish = true
return
}
historyFinish = true
err = i.service.UpdateLogSource(ctx, "loki", &log.Save{
LastPullTime: &endTime,
})
if err != nil {
eosc_log.Errorf("update log source error: %s", err)
return
}
eosc_log.Infof("update log record finish")
return
}
}
}
}()
go func() {
eosc_log.Infof("start update running log record,start time: %s", pauseTime.Format("2006-01-02 15:04:05"))
ticket := time.NewTicker(10 * time.Second)
defer ticket.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticket.C:
end := time.Now()
start := end.Add(-1 * time.Minute)
err = i.updateLogRecord(ctx, driver, start, end)
if err != nil {
eosc_log.Errorf("update log record error: %s", err)
continue
}
if historyFinish {
err = i.service.UpdateLogSource(ctx, "loki", &log.Save{
LastPullTime: &end,
})
if err != nil {
eosc_log.Errorf("update log source error: %s", err)
continue
}
}
}
}
}()
}
func (i *imlLogModule) updateLogRecord(ctx context.Context, driver log_driver.ILogDriver, start, end time.Time) error {
c, err := i.clusterService.Get(ctx, cluster.DefaultClusterID)
if err != nil {
return fmt.Errorf("cluster %s not found", cluster.DefaultClusterID)
}
logs, err := driver.LogRecords(c.Cluster, start, end)
if err != nil {
return fmt.Errorf("get log records error: %s", err)
}
for _, l := range logs {
err = i.service.InsertLog(ctx, "loki", &log.InsertLog{
ID: l.ID,
Driver: "loki",
Strategy: l.Strategy,
API: l.API,
Service: l.Service,
Method: l.Method,
Url: l.Url,
RemoteIP: l.RemoteIP,
Consumer: l.Consumer,
Authorization: l.Authorization,
InputToken: l.InputToken,
OutputToken: l.OutputToken,
TotalToken: l.TotalToken,
AIProvider: l.AIProvider,
AIModel: l.AIModel,
StatusCode: l.StatusCode,
ResponseTime: l.ResponseTime,
Traffic: l.Traffic,
RecordTime: l.RecordTime,
})
if err != nil {
eosc_log.Errorf("insert log record error: %s,log id: %s", err, l.ID)
continue
}
}
return nil
}
+1 -1
View File
@@ -270,7 +270,7 @@ func (i *imlMcpModule) Invoke(ctx context.Context, req mcp.CallToolRequest) (*mc
queryParam.Add(k, value)
}
case float64:
queryParam.Add(k, strconv.FormatFloat(v, 'e', -1, 64))
queryParam.Add(k, strconv.FormatFloat(v, 'f', -1, 64))
default:
return nil, fmt.Errorf("invalid query param type: %T", v)
}
+22
View File
@@ -21,4 +21,26 @@ type IExecutor interface {
InvokeTrend(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (*monitor.MonInvokeCountTrend, string, error)
ProxyTrend(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (*monitor.MonInvokeCountTrend, string, error)
MessageTrend(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (*monitor.MonMessageTrend, string, error)
IBasicOverview
IRestOverview
IAIOverview
}
type IBasicOverview interface {
RequestOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.StatusCodeOverview, []*monitor.StatusCodeOverview, error)
TopN(ctx context.Context, start time.Time, end time.Time, limit int, groupBy string, wheres []monitor.MonWhereItem) ([]*monitor.TopN, error)
ConsumerOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (int64, map[time.Time]int64, error)
}
type IRestOverview interface {
TrafficOverviewByStatusCode(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.StatusCodeOverview, []*monitor.StatusCodeOverview, error)
AvgResponseTimeOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.Aggregate, []int64, error)
SumResponseTimeOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.Aggregate, []int64, error)
}
type IAIOverview interface {
TokenOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.TokenOverview, []*monitor.TokenOverview, error)
}
+331 -6
View File
@@ -3,6 +3,7 @@ package influxdb_v2
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
@@ -23,6 +24,9 @@ import (
"github.com/APIParkLab/APIPark/service/monitor"
)
var _ driver.IAIOverview = (*executor)(nil)
var _ driver.IRestOverview = (*executor)(nil)
func newExecutor(cfg string, fluxQuery flux.IFluxQuery) (driver.IExecutor, error) {
var data InfluxdbV2Config
err := json.Unmarshal([]byte(cfg), &data)
@@ -147,7 +151,7 @@ func (e *executor) MessageTrend(ctx context.Context, start time.Time, end time.T
fieldsConditions := []string{"request", "response"}
dates, groupValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, fieldsConditions, every, windowOffset)
dates, groupValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, fieldsConditions, every, windowOffset, flux.SumFn)
if err != nil {
return nil, "", err
}
@@ -166,9 +170,9 @@ func (e *executor) ProxyTrend(ctx context.Context, start time.Time, end time.Tim
filters := formatFilter(wheres)
proxyConditions := []string{"p_total", "p_success", "p_s4xx", "p_s5xx"}
proxyConditions := []string{"p_total", "p_success", "p_s2xx", "p_s4xx", "p_s5xx"}
dates, proxyValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "proxy", filters, proxyConditions, every, windowOffset)
dates, proxyValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "proxy", filters, proxyConditions, every, windowOffset, flux.SumFn)
if err != nil {
return nil, "", err
}
@@ -200,9 +204,9 @@ func (e *executor) InvokeTrend(ctx context.Context, start time.Time, end time.Ti
newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end)
filters := formatFilter(wheres)
requestConditions := []string{"total", "success", "s4xx", "s5xx"}
requestConditions := []string{"total", "success", "2xx", "s4xx", "s5xx"}
dates, requestValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, requestConditions, every, windowOffset)
dates, requestValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, requestConditions, every, windowOffset, flux.SumFn)
if err != nil {
return nil, "", err
}
@@ -221,7 +225,7 @@ func (e *executor) InvokeTrend(ctx context.Context, start time.Time, end time.Ti
proxyConditions := []string{"p_total", "p_success"}
_, proxyValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "proxy", filters, proxyConditions, every, windowOffset)
_, proxyValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "proxy", filters, proxyConditions, every, windowOffset, flux.SumFn)
if err != nil {
return nil, "", err
}
@@ -359,5 +363,326 @@ func (e *executor) CommonStatistics(ctx context.Context, start, end time.Time, g
}
return resultMap, nil
}
func (e *executor) overviewByStatusCode(ctx context.Context, start, end time.Time, table string, wheres []monitor.MonWhereItem, statusCode []string, dataFields []string, fn flux.AggregateFn) ([]time.Time, map[string][]int64, error) {
newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end)
var returnDates []time.Time
var returnResult = make(map[string][]int64)
for _, s := range statusCode {
newWheres := make([]monitor.MonWhereItem, 0, len(wheres)+1)
newWheres = append(newWheres, wheres...)
newWheres = append(newWheres, monitor.MonWhereItem{
Key: "status_code",
Operation: "=",
Values: []string{s},
})
dates, result, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, table, formatFilter(newWheres), dataFields, every, windowOffset, fn)
if err != nil {
return nil, nil, err
}
if len(dates) > 0 {
returnDates = dates
}
for _, v := range dataFields {
key := fmt.Sprintf("%s_%s", s, v)
if _, ok := returnResult[key]; !ok {
returnResult[key] = make([]int64, 0, len(returnDates))
}
returnResult[key] = append(returnResult[key], result[v]...)
}
}
return returnDates, returnResult, nil
}
func (e *executor) TrafficOverviewByStatusCode(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.StatusCodeOverview, []*monitor.StatusCodeOverview, error) {
fieldsConditions := []string{"request", "response"}
statusFilters := []string{"2xx", "4xx", "5xx"}
dates, overview, err := e.overviewByStatusCode(ctx, start, end, "request", wheres, statusFilters, fieldsConditions, flux.SumFn)
if err != nil {
return nil, nil, nil, err
}
s2xxRequest := overview["2xx_request"]
s2xxRequestLen := len(s2xxRequest)
s4xxRequest := overview["4xx_request"]
s4xxRequestLen := len(s4xxRequest)
s5xxRequest := overview["5xx_request"]
s5xxRequestLen := len(s5xxRequest)
s2xxResponse := overview["2xx_response"]
s2xxResponseLen := len(s2xxResponse)
s4xxResponse := overview["4xx_response"]
s4xxResponseLen := len(s4xxResponse)
s5xxResponse := overview["5xx_response"]
s5xxResponseLen := len(s5xxResponse)
totalOverview := new(monitor.StatusCodeOverview)
result := make([]*monitor.StatusCodeOverview, 0, len(dates))
for i := range dates {
r := new(monitor.StatusCodeOverview)
if s2xxRequestLen > i {
r.Status2xx = s2xxRequest[i]
}
if s4xxRequestLen > i {
r.Status4xx = s4xxRequest[i]
}
if s5xxRequestLen > i {
r.Status5xx = s5xxRequest[i]
}
if s2xxResponseLen > i {
r.Status2xx += s2xxResponse[i]
}
if s4xxResponseLen > i {
r.Status4xx += s4xxResponse[i]
}
if s5xxResponseLen > i {
r.Status5xx += s5xxResponse[i]
}
r.StatusTotal += r.Status2xx + r.Status4xx + r.Status5xx
totalOverview.Status2xx += r.Status2xx
totalOverview.Status4xx += r.Status4xx
totalOverview.Status5xx += r.Status5xx
totalOverview.StatusTotal += r.StatusTotal
result = append(result, r)
}
return dates, totalOverview, result, nil
}
func (e *executor) aggregateSummary(ctx context.Context, start time.Time, end time.Time, measurement string, bucket string, filters string, fields []string) (map[string]*monitor.Aggregate, error) {
if len(fields) == 0 {
return nil, fmt.Errorf("fields is empty")
}
maxFields := make([]string, 0, len(fields))
minFields := make([]string, 0, len(fields))
avgFields := make([]string, 0, len(fields))
for _, field := range fields {
maxFields = append(maxFields, field+"_max")
minFields = append(minFields, field+"_min")
avgFields = append(avgFields, field+"_avg")
}
maxRes, err := e.fluxQuery.CommonQueryOnce(ctx, e.openApi, start, end, bucket, filters, &flux.StatisticsFilterConf{
Measurement: measurement,
AggregateFn: "max()",
Fields: maxFields,
})
if err != nil {
return nil, err
}
minRes, err := e.fluxQuery.CommonQueryOnce(ctx, e.openApi, start, end, bucket, filters, &flux.StatisticsFilterConf{
Measurement: measurement,
AggregateFn: "min()",
Fields: minFields,
})
if err != nil {
return nil, err
}
avgRes, err := e.fluxQuery.CommonQueryOnce(ctx, e.openApi, start, end, bucket, filters, &flux.StatisticsFilterConf{
Measurement: measurement,
AggregateFn: "mean()",
Fields: avgFields,
})
if err != nil {
return nil, err
}
result := make(map[string]*monitor.Aggregate)
for _, field := range fields {
a := new(monitor.Aggregate)
if v, ok := avgRes[field+"_avg"]; ok {
a.Avg = int64(v.(float64))
}
if v, ok := maxRes[field+"_max"]; ok {
a.Max = v.(int64)
}
if v, ok := minRes[field+"_min"]; ok {
a.Min = v.(int64)
}
result[field] = a
}
return result, nil
}
func (e *executor) SumResponseTimeOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.Aggregate, []int64, error) {
newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end)
filters := formatFilter(wheres)
fieldsConditions := []string{"timing"}
agg, err := e.aggregateSummary(ctx, newStartTime, end, "request", bucket, filters, []string{"timing"})
if err != nil {
return nil, nil, nil, err
}
dates, groupValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, fieldsConditions, every, windowOffset, flux.SumFn)
if err != nil {
return nil, nil, nil, err
}
timing := groupValues["timing"]
timingLen := len(timing)
result := make([]int64, 0, len(dates))
for i := range dates {
if timingLen > i {
result = append(result, timing[i])
}
}
return dates, agg["timing"], result, nil
}
func (e *executor) AvgResponseTimeOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.Aggregate, []int64, error) {
newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end)
filters := formatFilter(wheres)
fieldsConditions := []string{"timing_avg"}
agg, err := e.aggregateSummary(ctx, newStartTime, end, "request", bucket, filters, []string{"timing"})
if err != nil {
return nil, nil, nil, err
}
dates, groupValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, fieldsConditions, every, windowOffset, flux.AvgFn)
if err != nil {
return nil, nil, nil, err
}
timingAvg := groupValues["timing_avg"]
timingAvgLen := len(timingAvg)
result := make([]int64, 0, len(dates))
for i := range dates {
if timingAvgLen > i {
result = append(result, timingAvg[i])
}
}
return dates, agg["timing"], result, nil
}
func (e *executor) RequestOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.StatusCodeOverview, []*monitor.StatusCodeOverview, error) {
newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end)
filters := formatFilter(wheres)
requestConditions := []string{"total", "s2xx", "s4xx", "s5xx"}
dates, requestValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, requestConditions, every, windowOffset, flux.SumFn)
if err != nil {
return nil, nil, nil, err
}
total := requestValues["total"]
totalLen := len(total)
s2xx := requestValues["s2xx"]
s2xxLen := len(s2xx)
s4xx := requestValues["s4xx"]
s4xxLen := len(s4xx)
s5xx := requestValues["s5xx"]
s5xxLen := len(s5xx)
totalOverview := new(monitor.StatusCodeOverview)
result := make([]*monitor.StatusCodeOverview, 0, len(dates))
for i := range dates {
r := new(monitor.StatusCodeOverview)
if totalLen > i {
r.StatusTotal = total[i]
totalOverview.StatusTotal += r.StatusTotal
}
if s2xxLen > i {
r.Status2xx = s2xx[i]
totalOverview.Status2xx += r.Status2xx
}
if s4xxLen > i {
r.Status4xx = s4xx[i]
totalOverview.Status4xx += r.Status4xx
}
if s5xxLen > i {
r.Status5xx = s5xx[i]
totalOverview.Status5xx += r.Status5xx
}
result = append(result, r)
}
return dates, totalOverview, result, nil
}
func (e *executor) TopN(ctx context.Context, start time.Time, end time.Time, limit int, groupBy string, wheres []monitor.MonWhereItem) ([]*monitor.TopN, error) {
filters := formatFilter(wheres)
newStartTime, _, _, bucket := getTimeIntervalAndBucket(start, end)
statisticsConf := []*flux.StatisticsFilterConf{
{
Measurement: "request",
AggregateFn: "sum()",
Fields: []string{"total", "request", "response", "input_token", "output_token"},
},
{
Measurement: "proxy",
AggregateFn: "sum()",
Fields: []string{"p_total"},
},
}
results, err := e.fluxQuery.CommonStatistics(ctx, e.openApi, newStartTime, end, bucket, groupBy, filters, statisticsConf, limit)
if err != nil {
return nil, err
}
topN := make([]*monitor.TopN, 0, len(results))
for key, result := range results {
n := new(monitor.TopN)
n.Key = key
n.Request = result.Total
n.Token = result.TotalToken
n.Traffic = result.TotalRequest + result.TotalResponse
topN = append(topN, n)
}
return topN, nil
}
func (e *executor) TokenOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) ([]time.Time, *monitor.TokenOverview, []*monitor.TokenOverview, error) {
newStartTime, every, windowOffset, bucket := getTimeIntervalAndBucket(start, end)
filters := formatFilter(wheres)
requestConditions := []string{"total_token", "input_token", "output_token"}
dates, requestValues, err := e.fluxQuery.CommonTendency(ctx, e.openApi, newStartTime, end, bucket, "request", filters, requestConditions, every, windowOffset, flux.SumFn)
if err != nil {
return nil, nil, nil, err
}
//total := requestValues["total_token"]
//totalLen := len(total)
input := requestValues["input_token"]
inputLen := len(input)
output := requestValues["output_token"]
outputLen := len(output)
totalOverview := new(monitor.TokenOverview)
result := make([]*monitor.TokenOverview, 0, len(dates))
for i := range dates {
r := new(monitor.TokenOverview)
if inputLen > i {
r.InputToken = input[i]
}
if outputLen > i {
r.OutputToken = output[i]
}
r.TotalToken = r.InputToken + r.OutputToken
totalOverview.InputToken += r.InputToken
totalOverview.OutputToken += r.OutputToken
totalOverview.TotalToken += r.TotalToken
result = append(result, r)
}
return dates, totalOverview, result, nil
}
func (e *executor) ConsumerOverview(ctx context.Context, start time.Time, end time.Time, wheres []monitor.MonWhereItem) (int64, map[time.Time]int64, error) {
newStartTime, every, offset, bucket := getTimeIntervalAndBucket(start, end)
filters := formatFilter(wheres)
return e.fluxQuery.CommonTendencyTag(ctx, e.openApi, newStartTime, end, bucket, "request", filters, every, offset, "app")
}
+97 -22
View File
@@ -14,7 +14,8 @@ import (
type IFluxQuery interface {
CommonStatistics(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, groupBy, filters string, statisticsConf []*StatisticsFilterConf, limit int) (map[string]*FluxStatistics, error)
CommonProxyStatistics(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, groupBy, filters string, statisticsConf []*StatisticsFilterConf, limit int) (map[string]*FluxStatistics, error)
CommonTendency(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters string, dataFields []string, every, windowOffset string) ([]time.Time, map[string][]int64, error)
CommonTendency(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters string, dataFields []string, every, windowOffset string, fn AggregateFn) ([]time.Time, map[string][]int64, error)
CommonTendencyTag(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters, every, offset, tag string) (int64, map[time.Time]int64, error)
// CommonQueryOnce 查询只返回一条结果
CommonQueryOnce(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, filters string, fieldsConf *StatisticsFilterConf) (map[string]interface{}, error)
CommonWarnStatistics(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, groupBy, filters string, statisticsConf *StatisticsFilterConf) (map[string]*FluxWarnStatistics, error)
@@ -61,18 +62,30 @@ func (f *fluxQuery) CommonStatistics(ctx context.Context, queryApi api.QueryAPI,
totalRequest := common.FmtIntFromInterface(maps["request"])
maxRequest := common.FmtIntFromInterface(maps["request_max"])
minRequest := common.FmtIntFromInterface(maps["request_min"])
totalResponse := common.FmtIntFromInterface(maps["response"])
maxResponse := common.FmtIntFromInterface(maps["response_max"])
minResponse := common.FmtIntFromInterface(maps["response_min"])
inputToken := common.FmtIntFromInterface(maps["input_token"])
outputToken := common.FmtIntFromInterface(maps["output_token"])
//totalToken := common.FmtIntFromInterface(maps["total_token"])
//maxToken := common.FmtIntFromInterface(maps["total_token_max"])
//minToken := common.FmtIntFromInterface(maps["total_token_min"])
resultMap[key] = &FluxStatistics{
Total: total,
Success: success,
ProxyTotal: pTotal,
ProxySuccess: pSuccess,
TotalTiming: totalTiming,
MaxTiming: maxMinTiming,
MinTiming: minTiming,
TotalRequest: totalRequest,
RequestMax: maxRequest,
RequestMin: minRequest,
Total: total,
Success: success,
ProxyTotal: pTotal,
ProxySuccess: pSuccess,
TotalTiming: totalTiming,
MaxTiming: maxMinTiming,
MinTiming: minTiming,
TotalRequest: totalRequest,
RequestMax: maxRequest,
RequestMin: minRequest,
TotalResponse: totalResponse,
ResponseMax: maxResponse,
ResponseMin: minResponse,
TotalToken: inputToken + outputToken,
}
}
@@ -128,10 +141,10 @@ func (f *fluxQuery) CommonProxyStatistics(ctx context.Context, queryApi api.Quer
return resultMap, nil
}
func (f *fluxQuery) CommonTendency(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters string, dataFields []string, every, windowOffset string) ([]time.Time, map[string][]int64, error) {
func (f *fluxQuery) CommonTendency(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters string, dataFields []string, every, windowOffset string, fn AggregateFn) ([]time.Time, map[string][]int64, error) {
fieldConditions := f.assembleTendencyFieldCondition(dataFields)
//拼装请求
query := f.assembleTendencyFlux(start, end, bucket, table, filters, fieldConditions, every, windowOffset)
query := f.assembleTendencyFlux(start, end, bucket, table, filters, fieldConditions, every, windowOffset, fn)
log.Info("flux sql=", query)
result, err := queryApi.Query(ctx, query)
@@ -148,21 +161,46 @@ func (f *fluxQuery) CommonTendency(ctx context.Context, queryApi api.QueryAPI, s
//初始返回内容
dates := make([]time.Time, 0, len(resultList))
resultMap := make(map[string][]int64, len(dataFields))
for _, field := range dataFields {
resultMap[field] = make([]int64, 0, len(resultList))
}
for _, res := range resultList {
for _, field := range dataFields {
resultMap[field] = append(resultMap[field], common.FmtIntFromInterface(res[field]))
}
t, _ := res["_time"].(time.Time)
dates = append(dates, t)
dates = append(dates, t.In(time.Local))
}
return dates, resultMap, nil
}
func (f *fluxQuery) CommonTendencyTag(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, table, filters, every, offset, tag string) (int64, map[time.Time]int64, error) {
query := f.assembleTendencyTagFlux(start, end, bucket, table, filters, every, offset, tag)
log.Info("flux sql=", query)
result, err := queryApi.Query(ctx, query)
if err != nil {
log.Error("flux err=", err)
return 0, nil, err
}
dateMap := map[time.Time]map[string]struct{}{}
tagMap := make(map[string]struct{})
defer result.Close()
for result.Next() {
date := result.Record().Values()["_start"].(time.Time).In(time.Local)
if _, ok := dateMap[date]; !ok {
dateMap[date] = map[string]struct{}{}
}
if vv, ok := result.Record().Values()[tag]; ok {
v := vv.(string)
tagMap[v] = struct{}{}
dateMap[date][v] = struct{}{}
}
}
returnMap := make(map[time.Time]int64)
for k, v := range dateMap {
returnMap[k] = int64(len(v))
}
return int64(len(tagMap)), returnMap, nil
}
func (f *fluxQuery) CommonQueryOnce(ctx context.Context, queryApi api.QueryAPI, start, end time.Time, bucket, filters string, fieldsConf *StatisticsFilterConf) (map[string]interface{}, error) {
query := f.getCircularMapFlux(start, end, bucket, filters, fieldsConf)
@@ -172,6 +210,7 @@ func (f *fluxQuery) CommonQueryOnce(ctx context.Context, queryApi api.QueryAPI,
log.Error("flux err=", err)
return nil, err
}
defer result.Close()
for result.Next() {
return result.Record().Values(), nil
@@ -270,7 +309,7 @@ from(bucket: "%s")
}
return fmt.Sprintf(`
union(tables: [
union(tables: [
%s
])
|> pivot(rowKey: ["%s"], columnKey: ["_field"], valueColumn: "_value")
@@ -278,23 +317,59 @@ union(tables: [
`, strings.Join(streams, ",\n"), groupBy, limitStr)
}
func (f *fluxQuery) assembleTendencyFlux(start, end time.Time, bucket, table, filters, fieldConditions, every string, windowOffset string) string {
type AggregateFn string
const (
SumFn AggregateFn = "sum"
MaxFn AggregateFn = "max"
MinFn AggregateFn = "min"
AvgFn AggregateFn = "mean"
)
var (
fns = map[AggregateFn]struct{}{
SumFn: {},
MaxFn: {},
MinFn: {},
}
)
func (f *fluxQuery) assembleTendencyFlux(start, end time.Time, bucket, table, filters, fieldConditions, every, windowOffset string, fn AggregateFn) string {
windowOffsetFlux := ""
if windowOffset != "" {
windowOffsetFlux = fmt.Sprintf(", offset: %s", windowOffset)
}
if _, ok := fns[fn]; !ok {
fn = SumFn
}
return fmt.Sprintf(`from(bucket: "%s")
|> range(start: %d, stop: %d)
|> filter(fn: (r) => r["_measurement"] == "%s")
%s
%s
|> group(columns: ["_field"])
|> aggregateWindow(every: %s, fn: sum, location: {offset: 0ns, zone: "Asia/Shanghai"}, timeSrc: "_start"%s)
|> aggregateWindow(every: %s, fn: %s, location: {offset: 0ns, zone: "Asia/Shanghai"}, timeSrc: "_start"%s)
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")`, bucket, start.Unix(), end.Unix(), table,
filters, fieldConditions, every, windowOffsetFlux)
filters, fieldConditions, every, string(fn), windowOffsetFlux)
}
func (f *fluxQuery) assembleTendencyTagFlux(start, end time.Time, bucket, table, filters string, every, offset, tag string) string {
windowOffset := ""
if len(offset) > 0 {
windowOffset = fmt.Sprintf(", offset: %s", offset)
}
return fmt.Sprintf(`
from(bucket: "%s")
|> range(start: %d, stop: %d)
|> filter(fn: (r) => r["_measurement"] == "%s")
%s
|> keep(columns: ["_time", "%s"])
|> window(every: %s%s)
|> distinct(column: "%s")`, bucket, start.Unix(), end.Unix(), table, filters, tag, every, windowOffset, tag)
}
// assembleTendencyFieldCondition 封装趋势图需要的Field数据
func (f *fluxQuery) assembleTendencyFieldCondition(fieldConditions []string) string {
/*
@@ -2,22 +2,30 @@ package flux
// FluxStatistics flux统计通用字段
type FluxStatistics struct {
Total int64 `json:"total"` //总数
Success int64 `json:"success"` //成功数
ProxyTotal int64 `json:"p_total"` //转发总数
ProxySuccess int64 `json:"p_success"` //转发成功
TotalTiming int64 `json:"timing"` //平均响应时间
MaxTiming int64 `json:"timing_max"` //最大响应时间
MinTiming int64 `json:"timing_min"` //最响应时间
TotalRequest int64 `json:"request"` //总请求流量
RequestMax int64 `json:"request_max"` //最大流量
RequestMin int64 `json:"request_min"` //最流量
Total int64 `json:"total"` //总数
Success int64 `json:"success"` //成功数
S2xx int64 `json:"s2xx"` //2xx
ProxyTotal int64 `json:"p_total"` //转发
ProxySuccess int64 `json:"p_success"` //转发成功数
TotalTiming int64 `json:"timing"` //平均响应时间
MaxTiming int64 `json:"timing_max"` //最响应时间
MinTiming int64 `json:"timing_min"` //最小响应时间
TotalRequest int64 `json:"request"` //总请求流量
RequestMax int64 `json:"request_max"` //最流量
RequestMin int64 `json:"request_min"` //最小流量
TotalResponse int64 `json:"response"` //总请求流量
ResponseMax int64 `json:"response_max"` //最大流量
ResponseMin int64 `json:"response_min"` //最小流量
TotalToken int64 `json:"total_token"` //总token流量
TokenMax int64 `json:"total_token_max"` //最大token流量
TokenMin int64 `json:"total_token_min"` //最小token流量
}
// FluxWarnStatistics flux统计告警通用字段
type FluxWarnStatistics struct {
Total int64 `json:"total"` //总数
Success int64 `json:"success"` //成功数
S2xx int64 `json:"s2xx"`
S4xx int64 `json:"s4xx"`
S5xx int64 `json:"s5xx"`
ProxyTotal int64 `json:"p_total"` //转发总数
@@ -0,0 +1,309 @@
-
task_name: "apinto_day_request_v1"
cron: "0 0 * * *"
offset: "2m30s"
flux: |
from(bucket: "apinto/hour")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "total" or r._field == "success" or r._field == "s2xx" or r._field == "s4xx" or r._field == "s5xx"
or
r._field == "timing" or r._field == "request" or r._field == "response" or r._field
==
"retry"
or r._field == "total_token" or r._field == "input_token" or r._field == "output_token",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
],
)
|> sum()
|> set(key: "_measurement", value: "request")
|> to(
bucket: "apinto/day",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
from(bucket: "apinto/hour")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "timing_max" or r._field == "request_max" or r._field == "response_max"
or
r._field == "retry_max"
or
r._field == "input_token_max" or r._field == "output_token_max" or r._field == "total_token_max",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
],
)
|> max()
|> set(key: "_measurement", value: "request")
|> to(
bucket: "apinto/day",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
from(bucket: "apinto/hour")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "timing_min" or r._field == "request_min" or r._field == "response_min"
or
r._field == "retry_min"
or
r._field == "input_token_min" or r._field == "output_token_min" or r._field == "total_token_min",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
],
)
|> min()
|> set(key: "_measurement", value: "request")
|> to(
bucket: "apinto/day",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
from(bucket: "apinto/hour")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "timing_avg" or r._field == "request_avg" or r._field == "response_avg"
or
r._field == "input_token_avg" or r._field == "output_token_avg" or r._field == "total_token_avg",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
],
)
|> mean()
|> set(key: "_measurement", value: "request")
|> to(
bucket: "apinto/day",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
-
task_name: "apinto_day_proxy_v1"
cron: "0 0 * * *"
offset: "2m45s"
flux: |
from(bucket: "apinto/hour")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "proxy")
|> filter(
fn: (r) =>
r._field == "p_total" or r._field == "p_success" or r._field == "p_s2xx" or r._field == "p_s4xx" or r._field
==
"p_s5xx" or r._field == "p_timing" or r._field == "p_request" or r._field
==
"p_response" or r._field == "p_retry",
)
|> group(
columns: [
"api",
"app",
"upstream",
"addr",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
],
)
|> sum()
|> set(key: "_measurement", value: "proxy")
|> to(
bucket: "apinto/day",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"addr",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
from(bucket: "apinto/hour")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "proxy")
|> filter(
fn: (r) =>
r._field == "p_timing_max" or r._field == "p_request_max" or r._field
==
"p_response_max" or r._field == "p_retry_max",
)
|> group(
columns: [
"api",
"app",
"upstream",
"addr",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
],
)
|> max()
|> set(key: "_measurement", value: "proxy")
|> to(
bucket: "apinto/day",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"addr",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
from(bucket: "apinto/hour")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "proxy")
|> filter(
fn: (r) =>
r._field == "p_timing_min" or r._field == "p_request_min" or r._field
==
"p_response_min" or r._field == "p_retry_min",
)
|> group(
columns: [
"api",
"app",
"upstream",
"addr",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
],
)
|> min()
|> set(key: "_measurement", value: "proxy")
|> to(
bucket: "apinto/day",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"addr",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
@@ -0,0 +1,309 @@
-
task_name: "apinto_hour_request_v1"
cron: "0 * * * *"
offset: "1m30s"
flux: |
from(bucket: "apinto/minute")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "total" or r._field == "success" or r._field == "s2xx" or r._field == "s4xx" or r._field == "s5xx"
or
r._field == "timing" or r._field == "request" or r._field == "response" or r._field
==
"retry"
or r._field == "total_token" or r._field == "input_token" or r._field == "output_token",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
"_measurement",
],
)
|> sum()
|> to(
bucket: "apinto/hour",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
from(bucket: "apinto/minute")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "timing_max" or r._field == "request_max" or r._field == "response_max"
or
r._field == "retry_max"
or
r._field == "input_token_max" or r._field == "output_token_max" or r._field == "total_token_max",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
"_measurement",
],
)
|> max()
|> to(
bucket: "apinto/hour",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
from(bucket: "apinto/minute")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "timing_min" or r._field == "request_min" or r._field == "response_min"
or
r._field == "retry_min"
or
r._field == "input_token_min" or r._field == "output_token_min" or r._field == "total_token_min",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
"_measurement",
],
)
|> min()
|> to(
bucket: "apinto/hour",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
from(bucket: "apinto/minute")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "timing_avg" or r._field == "request_avg" or r._field == "response_avg"
or
r._field == "input_token_avg" or r._field == "output_token_avg" or r._field == "total_token_avg",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
"_measurement",
],
)
|> mean()
|> to(
bucket: "apinto/hour",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
-
task_name: "apinto_hour_proxy_v1"
cron: "0 * * * *"
offset: "1m45s"
flux: |
from(bucket: "apinto/minute")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "proxy")
|> filter(
fn: (r) =>
r._field == "p_total" or r._field == "p_success" or r._field == "p_s2xx" or r._field == "p_s4xx" or r._field
==
"p_s5xx" or r._field == "p_timing" or r._field == "p_request" or r._field
==
"p_response" or r._field == "p_retry",
)
|> group(
columns: [
"api",
"app",
"upstream",
"addr",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
"_measurement",
],
)
|> sum()
|> to(
bucket: "apinto/hour",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"addr",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
from(bucket: "apinto/minute")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "proxy")
|> filter(
fn: (r) =>
r._field == "p_timing_max" or r._field == "p_request_max" or r._field
==
"p_response_max" or r._field == "p_retry_max",
)
|> group(
columns: [
"api",
"app",
"upstream",
"addr",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
"_measurement",
],
)
|> max()
|> to(
bucket: "apinto/hour",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"addr",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
from(bucket: "apinto/minute")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "proxy")
|> filter(
fn: (r) =>
r._field == "p_timing_min" or r._field == "p_request_min" or r._field
==
"p_response_min" or r._field == "p_retry_min",
)
|> group(
columns: [
"api",
"app",
"upstream",
"addr",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
"_measurement",
],
)
|> min()
|> to(
bucket: "apinto/hour",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"addr",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
@@ -0,0 +1,307 @@
-
task_name: "apinto_week_request_v1"
cron: "0 0 * * 1"
offset: "3m30s"
flux: |
from(bucket: "apinto/day")
|> range(start: -1w)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "total" or r._field == "success" or r._field == "s2xx" or r._field == "s4xx" or r._field == "s5xx"
or
r._field == "timing" or r._field == "request" or r._field == "response" or r._field
==
"retry"
or r._field == "total_token" or r._field == "input_token" or r._field == "output_token",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
],
)
|> sum()
|> set(key: "_measurement", value: "request")
|> to(
bucket: "apinto/week",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
from(bucket: "apinto/day")
|> range(start: -1w)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "timing_max" or r._field == "request_max" or r._field == "response_max"
or
r._field == "retry_max"
or
r._field == "input_token_max" or r._field == "output_token_max" or r._field == "total_token_max",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
],
)
|> max()
|> set(key: "_measurement", value: "request")
|> to(
bucket: "apinto/week",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
from(bucket: "apinto/day")
|> range(start: -1w)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "timing_min" or r._field == "request_min" or r._field == "response_min"
or
r._field == "retry_min"
or
r._field == "input_token_min" or r._field == "output_token_min" or r._field == "total_token_min",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
],
)
|> min()
|> set(key: "_measurement", value: "request")
|> to(
bucket: "apinto/week",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
from(bucket: "apinto/day")
|> range(start: -1w)
|> filter(fn: (r) => r._measurement == "request")
|> filter(
fn: (r) =>
r._field == "timing_avg" or r._field == "request_avg" or r._field == "response_avg"
or
r._field == "input_token_avg" or r._field == "output_token_avg" or r._field == "total_token_avg",
)
|> group(
columns: [
"api",
"app",
"upstream",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
],
)
|> mean()
|> set(key: "_measurement", value: "request")
|> to(
bucket: "apinto/week",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
-
task_name: "apinto_week_proxy_v1"
cron: "0 0 * * 1"
offset: "3m45s"
flux: |
from(bucket: "apinto/day")
|> range(start: -1w)
|> filter(fn: (r) => r._measurement == "proxy")
|> filter(
fn: (r) =>
r._field == "p_total" or r._field == "p_success" or r._field == "p_s2xx" or r._field == "p_s4xx" or r._field
==
"p_s5xx" or r._field == "p_timing" or r._field == "p_request" or r._field
==
"p_response" or r._field == "p_retry",
)
|> group(
columns: [
"api",
"app",
"upstream",
"addr",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
],
)
|> sum()
|> set(key: "_measurement", value: "proxy")
|> to(
bucket: "apinto/week",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"addr",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
from(bucket: "apinto/day")
|> range(start: -1w)
|> filter(fn: (r) => r._measurement == "proxy")
|> filter(
fn: (r) =>
r._field == "p_timing_max" or r._field == "p_request_max" or r._field
==
"p_response_max" or r._field == "p_retry_max",
)
|> group(
columns: [
"api",
"app",
"upstream",
"addr",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
],
)
|> max()
|> set(key: "_measurement", value: "proxy")
|> to(
bucket: "apinto/week",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"addr",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
from(bucket: "apinto/day")
|> range(start: -1w)
|> filter(fn: (r) => r._measurement == "proxy")
|> filter(
fn: (r) =>
r._field == "p_timing_min" or r._field == "p_request_min" or r._field
==
"p_response_min" or r._field == "p_retry_min",
)
|> group(
columns: [
"api",
"app",
"upstream",
"addr",
"method",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
"_field",
],
)
|> min()
|> set(key: "_measurement", value: "proxy")
|> to(
bucket: "apinto/week",
timeColumn: "_start",
tagColumns: [
"api",
"app",
"method",
"upstream",
"addr",
"node",
"cluster",
"provider",
"api_kind",
"status_code",
],
)
@@ -1,13 +1,18 @@
package flux
import (
"embed"
_ "embed"
"fmt"
"strings"
"gopkg.in/yaml.v3"
"github.com/eolinker/eosc/log"
yaml "gopkg.in/yaml.v3"
)
//go:embed influxdb_config/tasks.yaml
var tasksData []byte
//go:embed tasks/*.yaml
var taskReader embed.FS
var (
taskList []*TaskConf
@@ -22,9 +27,28 @@ type TaskConf struct {
func initTasksConfig() {
conf := make([]*TaskConf, 0, 15)
err := yaml.Unmarshal(tasksData, &conf)
files, err := taskReader.ReadDir("tasks")
if err != nil {
panic(err)
panic(fmt.Sprintf("read tasks dir error: %v", err))
}
for _, file := range files {
if file.IsDir() || !strings.HasSuffix(file.Name(), ".yaml") {
continue
}
name := fmt.Sprintf("tasks/%s", file.Name())
data, err := taskReader.ReadFile(name)
if err != nil {
log.Errorf("read file(%s) error: %v", name, err)
continue
}
tmp := make([]*TaskConf, 0, 15)
err = yaml.Unmarshal(data, &tmp)
if err != nil {
log.Errorf("unmarshal file(%s) error: %v", name, err)
continue
}
conf = append(conf, tmp...)
}
taskList = conf
}
+14 -6
View File
@@ -15,9 +15,9 @@ const (
tenDay = 10 * oneDay
oneYear = 365 * oneDay
bucketMinuteRetention = (7 - 1) * oneDay
bucketHourRetention = (90 - 1) * oneDay
bucketDayRetention = (5*365 - 1) * oneDay
bucketMinuteRetention = (7) * oneDay
bucketHourRetention = (90) * oneDay
bucketDayRetention = (5 * 365) * oneDay
bucketMinute = "apinto/minute"
bucketHour = "apinto/hour"
@@ -127,11 +127,11 @@ func getTimeIntervalAndBucket(startTime, endTime time.Time) (time.Time, string,
switch minimumBucket {
case bucketMinute:
offset := ""
offsetTime := startTime.Minute() % 5
offsetTime := startTime.Minute() % 10
if offsetTime != 0 {
offset = fmt.Sprintf("%dm", offsetTime)
}
return startTime, "5m", offset, bucketMinute
return startTime, "10m", offset, bucketMinute
case bucketHour:
newStart := formatStartTimeHour(startTime, location)
@@ -148,7 +148,15 @@ func getTimeIntervalAndBucket(startTime, endTime time.Time) (time.Time, string,
} else if diff <= tenDay {
switch minimumBucket {
case bucketMinute, bucketHour:
case bucketMinute:
offset := ""
offsetTime := startTime.Minute()
if offsetTime != 0 {
offset = fmt.Sprintf("%dm", offsetTime)
}
return startTime, "1h", offset, bucketMinute
case bucketHour:
newStart := formatStartTimeHour(startTime, location)
return newStart, "1h", "", bucketHour
case bucketDay:
+98
View File
@@ -138,3 +138,101 @@ type MonitorCluster struct {
Name string `json:"name"`
Enable bool `json:"enable"`
}
type ChartOverview struct {
}
type StatusCodeOverview struct {
Status2xx int64 `json:"2xx"` //状态码2xx数
Status4xx int64 `json:"4xx"`
Status5xx int64 `json:"5xx"` //状态码5xx数
}
type TokenOverview struct {
TotalToken int64 `json:"total_token"` //总token流量
OutputToken int64 `json:"output_token"`
InputToken int64 `json:"input_token"` //最小token流量
}
type TokenFloatOverview struct {
TotalToken float64 `json:"total_token"` //总token流量
OutputToken float64 `json:"output_token"`
InputToken float64 `json:"input_token"` //最小token流量
}
type ChartAIOverview struct {
RequestOverview []*StatusCodeOverview `json:"request_overview"`
AvgRequestPerSubscriberOverview []float64 `json:"avg_request_per_subscriber_overview"` //平均响应时间概况
MaxRequestPerSubscriber float64 `json:"max_request_per_subscriber"`
MinRequestPerSubscriber float64 `json:"min_request_per_subscriber"`
RequestTotal int64 `json:"request_total"`
Request2xxTotal int64 `json:"request_2xx_total"`
Request4xxTotal int64 `json:"request_4xx_total"`
Request5xxTotal int64 `json:"request_5xx_total"`
TokenTotal int64 `json:"token_total"` //总token流量
InputTokenTotal int64 `json:"input_token_total"`
OutputTokenTotal int64 `json:"output_token_total"` //最大token流量
TokenOverview []*TokenOverview `json:"token_overview"` //token概况
AvgTokenOverview []float64 `json:"avg_token_overview"`
AvgTokenPerSubscriberOverview []*TokenFloatOverview `json:"avg_token_per_subscriber_overview"`
AvgToken float64 `json:"avg_token"`
MaxToken float64 `json:"max_token"`
MinToken float64 `json:"min_token"`
Date []string `json:"date"`
MaxTokenPerSubscriber float64 `json:"max_token_per_subscriber"`
MinTokenPerSubscriber float64 `json:"min_token_per_subscriber"`
}
type ChartRestOverview struct {
RequestOverview []*StatusCodeOverview `json:"request_overview"` //请求概况
AvgRequestPerSubscriberOverview []float64 `json:"avg_request_per_subscriber_overview"` //平均响应时间概况
MaxRequestPerSubscriber float64 `json:"max_request_per_subscriber"`
MinRequestPerSubscriber float64 `json:"min_request_per_subscriber"`
RequestTotal int64 `json:"request_total"`
Request2xxTotal int64 `json:"request_2xx_total"`
Request4xxTotal int64 `json:"request_4xx_total"`
Request5xxTotal int64 `json:"request_5xx_total"`
TrafficOverview []*StatusCodeOverview `json:"traffic_overview"` //流量概况
Traffic2xxTotal int64 `json:"traffic_2xx_total"`
Traffic4xxTotal int64 `json:"traffic_4xx_total"` //流量概况
Traffic5xxTotal int64 `json:"traffic_5xx_total"` //流量概况
AvgResponseTimeOverview []int64 `json:"avg_response_time_overview"` //平均响应时间概况
AvgTrafficPerSubscriberOverview []float64 `json:"avg_traffic_per_subscriber_overview"`
TrafficTotal int64 `json:"traffic_total"`
AvgResponseTime int64 `json:"avg_response_time"` //平均响应时间
MaxResponseTime int64 `json:"max_response_time"` //最大响应时间
MinResponseTime int64 `json:"min_response_time"` //最小响应时间
Date []string `json:"date"`
MaxTrafficPerSubscriber float64 `json:"max_traffic_per_subscriber"`
MinTrafficPerSubscriber float64 `json:"min_traffic_per_subscriber"`
}
type ServiceChartRestOverview struct {
EnableMCP bool `json:"enable_mcp"`
SubscriberNum int64 `json:"subscriber_num"`
APINum int64 `json:"api_num"`
ServiceKind string `json:"service_kind"`
AvailableMonitor bool `json:"available_monitor"`
*ChartRestOverview
}
type ServiceChartAIOverview struct {
EnableMCP bool `json:"enable_mcp"`
SubscriberNum int64 `json:"subscriber_num"`
APINum int64 `json:"api_num"`
ServiceKind string `json:"service_kind"`
AvailableMonitor bool `json:"available_monitor"`
*ChartAIOverview
}
type TopN struct {
Id string `json:"id"`
Name string `json:"name"`
Request string `json:"request"`
Traffic string `json:"traffic,omitempty"`
Token string `json:"token,omitempty"`
}
+471 -17
View File
@@ -5,9 +5,13 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"sort"
"sync"
"time"
"github.com/APIParkLab/APIPark/common"
"github.com/APIParkLab/APIPark/gateway"
"github.com/eolinker/eosc/log"
"github.com/eolinker/go-common/auto"
@@ -43,6 +47,439 @@ type imlMonitorStatisticModule struct {
apiService api.IAPIService `autowired:""`
}
func (i *imlMonitorStatisticModule) genOverviewWhere(ctx context.Context, serviceId string, apiKind []string) ([]monitor.MonWhereItem, error) {
clusterId := cluster.DefaultClusterID
_, err := i.clusterService.Get(ctx, clusterId)
if err != nil {
return nil, err
}
wheres, err := i.genCommonWheres(ctx, clusterId)
if err != nil {
return nil, err
}
if serviceId != "" {
wheres = append(wheres, monitor.MonWhereItem{
Key: "provider",
Operation: "=",
Values: []string{serviceId},
})
}
if len(apiKind) > 0 {
wheres = append(wheres, monitor.MonWhereItem{
Key: "api_kind",
Operation: "in",
Values: apiKind,
})
}
return wheres, nil
}
func (i *imlMonitorStatisticModule) AIChartOverview(ctx context.Context, serviceId string, start int64, end int64) (*monitor_dto.ChartAIOverview, error) {
wheres, err := i.genOverviewWhere(ctx, serviceId, []string{"ai"})
if err != nil {
return nil, err
}
executor, err := i.getExecutor(ctx, cluster.DefaultClusterID)
if err != nil {
return nil, err
}
_, consumerMap, err := executor.ConsumerOverview(ctx, formatTimeByMinute(start), formatTimeByMinute(end), wheres)
if err != nil {
return nil, err
}
var wg sync.WaitGroup
wg.Add(3)
errChan := make(chan error, 3)
result := new(monitor_dto.ChartAIOverview)
go func() {
defer wg.Done()
date, summary, items, err := executor.RequestOverview(ctx, formatTimeByMinute(start), formatTimeByMinute(end), wheres)
if err != nil {
errChan <- err
return
}
result.Date = utils.SliceToSlice(date, func(t time.Time) string {
return t.Format("2006/01/02 15:04")
})
result.AvgRequestPerSubscriberOverview = make([]float64, 0, len(items))
result.RequestOverview = make([]*monitor_dto.StatusCodeOverview, 0, len(items))
for index, item := range items {
consumerNum := consumerMap[date[index]]
avgRequestPerSubscriber := 0.0
if consumerNum != 0 {
avgRequestPerSubscriber = float64(item.StatusTotal) / float64(consumerNum)
if avgRequestPerSubscriber > result.MaxRequestPerSubscriber {
result.MaxRequestPerSubscriber = avgRequestPerSubscriber
}
if result.MinRequestPerSubscriber == 0 || result.MinRequestPerSubscriber > avgRequestPerSubscriber {
result.MinRequestPerSubscriber = avgRequestPerSubscriber
}
}
result.AvgRequestPerSubscriberOverview = append(result.AvgRequestPerSubscriberOverview, avgRequestPerSubscriber)
result.RequestOverview = append(result.RequestOverview, &monitor_dto.StatusCodeOverview{
Status2xx: item.Status2xx,
Status4xx: item.Status4xx,
Status5xx: item.Status5xx,
})
}
result.RequestTotal = summary.StatusTotal
result.Request2xxTotal = summary.Status2xx
result.Request4xxTotal = summary.Status4xx
result.Request5xxTotal = summary.Status5xx
}()
sumResponseTimes := make([]int64, 0)
go func() {
defer wg.Done()
_, _, items, err := executor.SumResponseTimeOverview(ctx, formatTimeByMinute(start), formatTimeByMinute(end), wheres)
if err != nil {
errChan <- err
return
}
for _, item := range items {
sumResponseTimes = append(sumResponseTimes, item)
}
}()
totalTokens := make([]int64, 0)
go func() {
defer wg.Done()
startTime := formatTimeByMinute(start)
endTime := formatTimeByMinute(end)
date, summary, items, err := executor.TokenOverview(ctx, startTime, endTime, wheres)
if err != nil {
errChan <- err
return
}
result.TokenOverview = make([]*monitor_dto.TokenOverview, 0, len(items))
result.AvgTokenOverview = make([]float64, 0, len(items))
result.AvgTokenPerSubscriberOverview = make([]*monitor_dto.TokenFloatOverview, 0, len(items))
var maxToken, minToken int64 = 0, 0
for index, item := range items {
if maxToken < item.TotalToken {
maxToken = item.TotalToken
}
if minToken == 0 || minToken > item.TotalToken {
minToken = item.TotalToken
}
result.TokenOverview = append(result.TokenOverview, &monitor_dto.TokenOverview{
TotalToken: item.TotalToken,
OutputToken: item.OutputToken,
InputToken: item.InputToken,
})
totalTokens = append(totalTokens, item.TotalToken)
consumerNum := consumerMap[date[index]]
avgTotalPerSubscriber := 0.0
avgOutputPerSubscriber := 0.0
avgInputPerSubscriber := 0.0
if consumerNum != 0 {
avgTotalPerSubscriber = float64(item.TotalToken) / float64(consumerNum)
avgOutputPerSubscriber = float64(item.OutputToken) / float64(consumerNum)
avgInputPerSubscriber = float64(item.InputToken) / float64(consumerNum)
if avgTotalPerSubscriber > result.MaxTokenPerSubscriber {
result.MaxTokenPerSubscriber = avgTotalPerSubscriber
}
if result.MinTokenPerSubscriber == 0 || result.MinTokenPerSubscriber > avgTotalPerSubscriber {
result.MinTokenPerSubscriber = avgTotalPerSubscriber
}
}
result.AvgTokenPerSubscriberOverview = append(result.AvgTokenPerSubscriberOverview, &monitor_dto.TokenFloatOverview{
TotalToken: avgTotalPerSubscriber,
OutputToken: avgOutputPerSubscriber,
InputToken: avgInputPerSubscriber,
})
}
result.TokenTotal = summary.TotalToken
result.InputTokenTotal = summary.InputToken
result.OutputTokenTotal = summary.OutputToken
}()
go func() {
wg.Wait()
close(errChan)
}()
errs := make([]error, 0, 3)
// 收集错误
for err := range errChan {
errs = append(errs, err)
}
if len(errs) > 0 {
return nil, fmt.Errorf("errors occurred: %v", errs)
}
sumResponseTime := 0.0
var maxTokenPerSecond, minTokenPerSecond float64 = 0, 0
for index, token := range totalTokens {
var p float64 = 0
if len(sumResponseTimes) > index && sumResponseTimes[index] > 0 {
p = math.Round(float64(token)*1000*100/float64(sumResponseTimes[index])) / 100
sumResponseTime += float64(sumResponseTimes[index])
}
result.AvgTokenOverview = append(result.AvgTokenOverview, p)
if maxTokenPerSecond < p {
maxTokenPerSecond = p
}
if p > 0 && (minTokenPerSecond == 0 || minTokenPerSecond > p) {
minTokenPerSecond = p
}
}
if sumResponseTime > 0 {
result.AvgToken = math.Round(float64(result.TokenTotal)*1000*100/sumResponseTime) / 100
}
result.MaxToken = maxTokenPerSecond
result.MinToken = minTokenPerSecond
return result, nil
}
func (i *imlMonitorStatisticModule) RestChartOverview(ctx context.Context, serviceId string, start int64, end int64) (*monitor_dto.ChartRestOverview, error) {
wheres, err := i.genOverviewWhere(ctx, serviceId, []string{"rest"})
if err != nil {
return nil, err
}
executor, err := i.getExecutor(ctx, cluster.DefaultClusterID)
if err != nil {
return nil, err
}
_, consumerMap, err := executor.ConsumerOverview(ctx, formatTimeByMinute(start), formatTimeByMinute(end), wheres)
if err != nil {
return nil, err
}
var wg sync.WaitGroup
wg.Add(3)
errChan := make(chan error, 2)
result := new(monitor_dto.ChartRestOverview)
go func() {
defer wg.Done()
date, summary, items, err := executor.RequestOverview(ctx, formatTimeByMinute(start), formatTimeByMinute(end), wheres)
if err != nil {
errChan <- err
return
}
result.Date = utils.SliceToSlice(date, func(t time.Time) string {
return t.Format("2006/01/02 15:04")
})
result.AvgRequestPerSubscriberOverview = make([]float64, 0, len(items))
result.RequestOverview = make([]*monitor_dto.StatusCodeOverview, 0, len(items))
for index, item := range items {
consumerNum := consumerMap[date[index]]
avgRequestPerSubscriber := 0.0
if consumerNum != 0 {
avgRequestPerSubscriber = float64(item.StatusTotal) / float64(consumerNum)
if avgRequestPerSubscriber > result.MaxRequestPerSubscriber {
result.MaxRequestPerSubscriber = avgRequestPerSubscriber
}
if result.MinRequestPerSubscriber == 0 || avgRequestPerSubscriber < result.MinRequestPerSubscriber {
result.MinRequestPerSubscriber = avgRequestPerSubscriber
}
}
result.AvgRequestPerSubscriberOverview = append(result.AvgRequestPerSubscriberOverview, avgRequestPerSubscriber)
result.RequestOverview = append(result.RequestOverview, &monitor_dto.StatusCodeOverview{
Status2xx: item.Status2xx,
Status4xx: item.Status4xx,
Status5xx: item.Status5xx,
})
}
result.RequestTotal = summary.StatusTotal
result.Request2xxTotal = summary.Status2xx
result.Request4xxTotal = summary.Status4xx
result.Request5xxTotal = summary.Status5xx
}()
go func() {
defer wg.Done()
startTime := formatTimeByMinute(start)
endTime := formatTimeByMinute(end)
_, summary, items, err := executor.AvgResponseTimeOverview(ctx, startTime, endTime, wheres)
if err != nil {
errChan <- err
return
}
for _, item := range items {
if item > result.MaxResponseTime {
result.MaxResponseTime = item
}
if item > 0 && (result.MinResponseTime == 0 || item < result.MinResponseTime) {
result.MinResponseTime = item
}
}
result.AvgResponseTimeOverview = items
result.AvgResponseTime = summary.Avg
}()
go func() {
defer wg.Done()
startTime := formatTimeByMinute(start)
endTime := formatTimeByMinute(end)
date, summary, items, err := executor.TrafficOverviewByStatusCode(ctx, startTime, endTime, wheres)
if err != nil {
errChan <- err
return
}
result.TrafficOverview = make([]*monitor_dto.StatusCodeOverview, 0, len(items))
result.AvgTrafficPerSubscriberOverview = make([]float64, 0, len(items))
for index, item := range items {
result.TrafficOverview = append(result.TrafficOverview, &monitor_dto.StatusCodeOverview{
Status2xx: item.Status2xx,
Status4xx: item.Status4xx,
Status5xx: item.Status5xx,
})
consumerNum := consumerMap[date[index]]
avgTrafficPerSubscriber := 0.0
if consumerNum != 0 {
avgTrafficPerSubscriber = float64(item.StatusTotal) / float64(consumerNum)
if avgTrafficPerSubscriber > result.MaxTrafficPerSubscriber {
result.MaxTrafficPerSubscriber = avgTrafficPerSubscriber
}
if result.MinTrafficPerSubscriber == 0 || result.MinTrafficPerSubscriber > avgTrafficPerSubscriber {
result.MinTrafficPerSubscriber = avgTrafficPerSubscriber
}
}
result.AvgTrafficPerSubscriberOverview = append(result.AvgTrafficPerSubscriberOverview, avgTrafficPerSubscriber)
}
result.TrafficTotal = summary.StatusTotal
result.Traffic2xxTotal = summary.Status2xx
result.Traffic4xxTotal = summary.Status4xx
result.Traffic5xxTotal = summary.Status5xx
}()
go func() {
wg.Wait()
close(errChan)
}()
errs := make([]error, 0, 3)
// 收集错误
for err := range errChan {
errs = append(errs, err)
}
if len(errs) > 0 {
return nil, fmt.Errorf("errors occurred: %v", errs)
}
return result, nil
}
func generateTopN(id string, name string, item *monitor.TopN, apiKind string) *monitor_dto.TopN {
n := &monitor_dto.TopN{
Id: id,
Name: name,
Request: common.FormatCountInt64(item.Request),
}
switch apiKind {
case "rest":
n.Traffic = common.FormatByte(item.Traffic)
case "ai":
n.Token = common.FormatCountInt64(item.Token)
}
return n
}
func (i *imlMonitorStatisticModule) Top(ctx context.Context, serviceId string, start int64, end int64, limit int, apiKind string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error) {
wheres, err := i.genOverviewWhere(ctx, serviceId, []string{apiKind})
if err != nil {
return nil, nil, err
}
executor, err := i.getExecutor(ctx, cluster.DefaultClusterID)
if err != nil {
return nil, nil, err
}
errChan := make(chan error, 2)
var wg sync.WaitGroup
apisResult, consumersResult := make([]*monitor_dto.TopN, 0), make([]*monitor_dto.TopN, 0)
var errs []error
wg.Add(2)
go func() {
defer wg.Done()
result, err := executor.TopN(ctx, formatTimeByMinute(start), formatTimeByMinute(end), limit, "api", wheres)
if err != nil {
errChan <- err
return
}
if len(result) < 1 {
return
}
apiIds := utils.SliceToSlice(result, func(t *monitor.TopN) string {
return t.Key
})
apis, err := i.apiService.ListInfo(ctx, apiIds...)
if err != nil {
errChan <- err
return
}
apiMap := utils.SliceToMap(apis, func(t *api.Info) string {
return t.UUID
})
for _, item := range result {
if v, ok := apiMap[item.Key]; ok {
apisResult = append(apisResult, generateTopN(v.UUID, v.Name, item, apiKind))
}
}
}()
go func() {
defer wg.Done()
result, err := executor.TopN(ctx, formatTimeByMinute(start), formatTimeByMinute(end), limit, "app", wheres)
if err != nil {
errChan <- err
return
}
if len(result) < 1 {
return
}
appIds := utils.SliceToSlice(result, func(t *monitor.TopN) string {
return t.Key
})
apps, err := i.serviceService.AppList(ctx, appIds...)
if err != nil {
errChan <- err
return
}
appMap := utils.SliceToMap(apps, func(t *service.Service) string {
return t.Id
})
appMap["apipark-global"] = &service.Service{
Id: "apipark-global",
Name: "System Consumer",
}
for _, item := range result {
if v, ok := appMap[item.Key]; ok {
consumersResult = append(consumersResult, generateTopN(v.Id, v.Name, item, apiKind))
}
}
}()
// 收集所有错误
go func() {
wg.Wait()
close(errChan)
}()
// 收集错误
for err := range errChan {
errs = append(errs, err)
}
if len(errs) > 0 {
return nil, nil, fmt.Errorf("errors occurred: %v", errs)
}
return apisResult, consumersResult, nil
}
func (i *imlMonitorStatisticModule) ApiStatistics(ctx context.Context, input *monitor_dto.StatisticInput) ([]*monitor_dto.ApiStatisticBasicItem, error) {
clusterId := cluster.DefaultClusterID
_, err := i.clusterService.Get(ctx, clusterId)
@@ -142,6 +579,10 @@ func (i *imlMonitorStatisticModule) SubscriberStatistics(ctx context.Context, in
if err != nil {
return nil, err
}
apps = append(apps, &service.Service{
Id: "apipark-global",
Name: "System Consumer",
})
appIds := utils.SliceToSlice(apps, func(p *service.Service) string {
return p.Id
})
@@ -350,18 +791,27 @@ func (i *imlMonitorStatisticModule) statisticOnApi(ctx context.Context, clusterI
if err != nil {
return nil, err
}
var service []*service.Service
var services []*service.Service
switch groupBy {
case "app":
service, err = i.serviceService.AppList(ctx)
services, err = i.serviceService.AppList(ctx)
if err != nil {
return nil, err
}
services = append(services, &service.Service{
Id: "apipark-global",
Name: "System Consumer",
})
case "provider":
service, err = i.serviceService.ServiceList(ctx)
services, err = i.serviceService.ServiceList(ctx)
if err != nil {
return nil, err
}
default:
return nil, errors.New("invalid group by")
}
if err != nil {
return nil, err
}
wheres, err := i.genCommonWheres(ctx, clusterId)
if err != nil {
@@ -379,7 +829,7 @@ func (i *imlMonitorStatisticModule) statisticOnApi(ctx context.Context, clusterI
}
result := make([]*monitor_dto.ServiceStatisticBasicItem, 0, len(statisticMap))
for _, item := range service {
for _, item := range services {
statisticItem := &monitor_dto.ServiceStatisticBasicItem{
Id: item.Id,
@@ -445,17 +895,21 @@ func (i *imlMonitorStatisticModule) ApiStatisticsOnSubscriber(ctx context.Contex
if err != nil {
return nil, err
}
// 根据订阅ID查询订阅的服务列表
subscriptions, err := i.subscribeService.MySubscribeServices(ctx, subscriberId, nil)
if err != nil {
return nil, err
}
serviceIds := utils.SliceToSlice(subscriptions, func(t *subscribe.Subscribe) string {
return t.Service
})
if len(serviceIds) < 1 {
return nil, nil
serviceIds := make([]string, 0)
if subscriberId != "apipark-global" {
// 根据订阅ID查询订阅的服务列表
subscriptions, err := i.subscribeService.MySubscribeServices(ctx, subscriberId, nil)
if err != nil {
return nil, err
}
serviceIds = utils.SliceToSlice(subscriptions, func(t *subscribe.Subscribe) string {
return t.Service
})
if len(serviceIds) < 1 {
return nil, nil
}
}
apiInfos, err := i.apiService.ListInfoForServices(ctx, serviceIds...)
if err != nil {
return nil, err
+4
View File
@@ -43,6 +43,10 @@ type IMonitorStatisticModule interface {
ApiStatisticsOnProvider(ctx context.Context, providerId string, input *monitor_dto.StatisticInput) ([]*monitor_dto.ApiStatisticBasicItem, error)
ApiStatisticsOnSubscriber(ctx context.Context, subscriberId string, input *monitor_dto.StatisticInput) ([]*monitor_dto.ApiStatisticBasicItem, error)
SubscriberStatisticsOnApi(ctx context.Context, apiId string, input *monitor_dto.StatisticInput) ([]*monitor_dto.ServiceStatisticBasicItem, error)
AIChartOverview(ctx context.Context, serviceId string, start int64, end int64) (*monitor_dto.ChartAIOverview, error)
RestChartOverview(ctx context.Context, serviceId string, start int64, end int64) (*monitor_dto.ChartRestOverview, error)
Top(ctx context.Context, serviceId string, start int64, end int64, limit int, apiKind string) ([]*monitor_dto.TopN, []*monitor_dto.TopN, error)
}
type IMonitorConfigModule interface {
+29 -22
View File
@@ -66,28 +66,28 @@ type imlPublishModule struct {
func (m *imlPublishModule) initGateway(ctx context.Context, partitionId string, clientDriver gateway.IClientDriver) error {
return nil
projects, err := m.serviceService.List(ctx)
if err != nil {
return err
}
projectIds := utils.SliceToSlice(projects, func(p *service.Service) string {
return p.Id
})
for _, projectId := range projectIds {
releaseInfo, err := m.GetProjectRelease(ctx, projectId, partitionId)
if err != nil {
return err
}
if releaseInfo == nil {
continue
}
err = clientDriver.Project().Online(ctx, releaseInfo)
if err != nil {
return err
}
}
return nil
//projects, err := m.serviceService.List(ctx)
//if err != nil {
// return err
//}
//projectIds := utils.SliceToSlice(projects, func(p *service.Service) string {
// return p.Id
//})
//for _, projectId := range projectIds {
// releaseInfo, err := m.GetProjectRelease(ctx, projectId, partitionId)
// if err != nil {
// return err
// }
// if releaseInfo == nil {
// continue
// }
//
// err = clientDriver.Project().Online(ctx, releaseInfo)
// if err != nil {
// return err
// }
//}
//return nil
}
func (m *imlPublishModule) getProjectRelease(ctx context.Context, projectID string, commitId string) (*gateway.ProjectRelease, error) {
@@ -110,6 +110,10 @@ func (m *imlPublishModule) getProjectRelease(ctx context.Context, projectID stri
strategyCommitIds = append(strategyCommitIds, c.Commit)
}
}
serviceInfo, err := m.serviceService.Get(ctx, projectID)
if err != nil {
return nil, err
}
apiInfos, err := m.apiService.ListInfo(ctx, apiIds...)
if err != nil {
@@ -140,6 +144,9 @@ func (m *imlPublishModule) getProjectRelease(ctx context.Context, projectID stri
},
Path: a.Path,
Methods: a.Methods,
Labels: map[string]string{
"api_kind": serviceInfo.Kind.String(),
},
//Service: a.Upstream,
}
if hasUpstream {
+1
View File
@@ -11,6 +11,7 @@ import (
type Item struct {
Id string `json:"id"`
Name string `json:"name"`
Methods []string `json:"methods"`
Protocols []string `json:"protocols"`
Path string `json:"request_path"`
+1
View File
@@ -205,6 +205,7 @@ func (i *imlRouterModule) Search(ctx context.Context, keyword string, serviceId
}
return &router_dto.Item{
Id: item.UUID,
Name: item.Name,
Methods: item.Methods,
Protocols: protocols,
Path: item.Path,
+77
View File
@@ -221,3 +221,80 @@ type ExportApp struct {
Description string `json:"description"`
Team string `json:"team"`
}
type Overview struct {
Id string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
EnableMCP bool `json:"enable_mcp"`
ServiceKind string `json:"service_kind"`
SubscriberNum int64 `json:"subscriber_num"`
InvokeNum int64 `json:"invoke_num"`
Logo string `json:"logo"`
AvailableMonitor bool `json:"available_monitor"`
IsReleased bool `json:"is_released"`
Catalogue auto.Label `json:"catalogue" aolabel:"catalogue"`
APINum int64 `json:"api_num"`
}
type AILogItem struct {
Id string `json:"id"`
API auto.Label `json:"api" aolabel:"api"`
Status int64 `json:"status"`
LogTime auto.TimeLabel `json:"log_time"`
Ip string `json:"ip"`
Token int64 `json:"token"`
TokenPerSecond int64 `json:"token_per_second"`
Consumer auto.Label `json:"consumer" aolabel:"service"`
Provider auto.Label `json:"provider" aolabel:"ai_provider"`
Model string `json:"model"`
}
type RestLogItem struct {
Id string `json:"id"`
API auto.Label `json:"api" aolabel:"api"`
Status int64 `json:"status"`
LogTime auto.TimeLabel `json:"log_time"`
Ip string `json:"ip"`
Consumer auto.Label `json:"consumer" aolabel:"service"`
ResponseTime string `json:"response_time"`
Traffic string `json:"traffic"`
}
type RestLogInfo struct {
Id string `json:"id"`
API auto.Label `json:"api" aolabel:"api"`
Consumer auto.Label `json:"consumer" aolabel:"service"`
IsSystemConsumer bool `json:"is_system_consumer"`
Status int64 `json:"status"`
Ip string `json:"ip"`
ResponseTime string `json:"response_time"`
Traffic string `json:"traffic"`
LogTime auto.TimeLabel `json:"log_time"`
Request OriginRequest `json:"request"`
Response OriginRequest `json:"response"`
}
type AILogInfo struct {
Id string `json:"id"`
API auto.Label `json:"api" aolabel:"api"`
Consumer auto.Label `json:"consumer" aolabel:"service"`
IsSystemConsumer bool `json:"is_system_consumer"`
Status int64 `json:"status"`
Ip string `json:"ip"`
Provider auto.Label `json:"provider" aolabel:"ai_provider"`
Model string `json:"model"`
LogTime auto.TimeLabel `json:"log_time"`
Request OriginAIRequest `json:"request"`
Response OriginAIRequest `json:"response"`
}
type OriginRequest struct {
Header string `json:"header"`
Origin string `json:"origin"`
Body string `json:"body"`
}
type OriginAIRequest struct {
OriginRequest
Token int64 `json:"token"`
}
+215 -6
View File
@@ -5,10 +5,13 @@ import (
"encoding/json"
"errors"
"fmt"
"net/url"
"sort"
"strings"
"time"
"github.com/APIParkLab/APIPark/common"
"github.com/mitchellh/mapstructure"
"github.com/eolinker/go-common/register"
@@ -27,6 +30,7 @@ import (
model_runtime "github.com/APIParkLab/APIPark/ai-provider/model-runtime"
"github.com/APIParkLab/APIPark/resources/access"
log_service "github.com/APIParkLab/APIPark/service/log"
"github.com/eolinker/eosc/log"
"github.com/eolinker/go-common/server"
@@ -79,14 +83,217 @@ type imlServiceModule struct {
tagService tag.ITagService `autowired:""`
localModelService ai_local.ILocalModelService `autowired:""`
serviceTagService service_tag.ITagService `autowired:""`
apiService api.IAPIService `autowired:""`
apiDocService api_doc.IAPIDocService `autowired:""`
clusterService cluster.IClusterService `autowired:""`
transaction store.ITransaction `autowired:""`
serviceTagService service_tag.ITagService `autowired:""`
apiService api.IAPIService `autowired:""`
apiDocService api_doc.IAPIDocService `autowired:""`
clusterService cluster.IClusterService `autowired:""`
subscribeServer subscribe.ISubscribeService `autowired:""`
releaseService release.IReleaseService `autowired:""`
serviceModelMappingService service_model_mapping.IServiceModelMappingService `autowired:""`
logService log_service.ILogService `autowired:""`
transaction store.ITransaction `autowired:""`
}
func formatHeader(header string) string {
result, err := url.QueryUnescape(header)
if err != nil {
return header
}
result = strings.ReplaceAll(result, "&", "\n")
result = strings.ReplaceAll(result, "=", ": ")
return result
}
func (i *imlServiceModule) RestLogInfo(ctx context.Context, serviceId string, logId string) (*service_dto.RestLogInfo, error) {
c, err := i.clusterService.Get(ctx, cluster.DefaultClusterID)
if err != nil {
return nil, fmt.Errorf("cluster %s not found", cluster.DefaultClusterID)
}
info, err := i.logService.LogInfo(ctx, "loki", c.Cluster, logId)
if err != nil {
return nil, err
}
if info.Service != serviceId {
return nil, errors.New("service not match")
}
logInfo := &service_dto.RestLogInfo{
Id: info.ID,
API: auto.UUID(info.API),
Consumer: auto.UUID(info.Consumer),
Status: info.StatusCode,
Ip: info.RemoteIP,
ResponseTime: common.FormatTime(info.ResponseTime),
Traffic: common.FormatByte(info.Traffic),
LogTime: auto.TimeLabel(info.RecordTime),
Request: service_dto.OriginRequest{
Header: formatHeader(info.RequestHeader),
Origin: info.RequestBody,
Body: info.RequestBody,
},
Response: service_dto.OriginRequest{
Header: formatHeader(info.ResponseHeader),
Origin: info.ResponseBody,
Body: info.ResponseBody,
},
}
if info.Consumer == "apipark-global" {
logInfo.IsSystemConsumer = true
logInfo.Consumer = auto.Label{
Id: info.Consumer,
Name: "System Consumer",
}
}
return logInfo, nil
}
func (i *imlServiceModule) AILogInfo(ctx context.Context, serviceId string, logId string) (*service_dto.AILogInfo, error) {
c, err := i.clusterService.Get(ctx, cluster.DefaultClusterID)
if err != nil {
return nil, fmt.Errorf("cluster %s not found", cluster.DefaultClusterID)
}
info, err := i.logService.LogInfo(ctx, "loki", c.Cluster, logId)
if err != nil {
return nil, err
}
if info.Service != serviceId {
return nil, errors.New("service not match")
}
response, err := parseAIResponse(info.ResponseBody)
if err != nil {
response = info.ResponseBody
}
logInfo := &service_dto.AILogInfo{
Id: info.ID,
API: auto.UUID(info.API),
Consumer: auto.UUID(info.Consumer),
Status: info.StatusCode,
Ip: info.RemoteIP,
Provider: auto.UUID(info.AIProvider),
Model: info.AIModel,
LogTime: auto.TimeLabel(info.RecordTime),
Request: service_dto.OriginAIRequest{
OriginRequest: service_dto.OriginRequest{
Header: formatHeader(info.RequestHeader),
Origin: info.RequestBody,
Body: parseAIRequest(info.RequestBody),
},
Token: info.InputToken,
},
Response: service_dto.OriginAIRequest{
OriginRequest: service_dto.OriginRequest{
Header: formatHeader(info.ResponseHeader),
Origin: info.ResponseBody,
Body: response,
},
Token: info.OutputToken,
},
}
if info.Consumer == "apipark-global" {
logInfo.IsSystemConsumer = true
logInfo.Consumer = auto.Label{
Id: info.Consumer,
Name: "System Consumer",
}
}
return logInfo, nil
}
func (i *imlServiceModule) RestLogs(ctx context.Context, serviceId string, start int64, end int64, page int, size int) ([]*service_dto.RestLogItem, int64, error) {
list, total, err := i.logService.LogRecordsByService(ctx, serviceId, time.Unix(start, 0), time.Unix(end, 0), page, size)
if err != nil {
return nil, 0, err
}
return utils.SliceToSlice(list, func(s *log_service.Item) *service_dto.RestLogItem {
item := &service_dto.RestLogItem{
Id: s.ID,
API: auto.UUID(s.API),
Status: s.StatusCode,
LogTime: auto.TimeLabel(s.RecordTime),
Ip: s.RemoteIP,
Consumer: auto.UUID(s.Consumer),
ResponseTime: common.FormatTime(s.ResponseTime),
Traffic: common.FormatByte(s.Traffic),
}
if s.Consumer == "apipark-global" {
item.Consumer = auto.Label{
Id: s.Consumer,
Name: "System Consumer",
}
}
return item
}), total, nil
}
func (i *imlServiceModule) AILogs(ctx context.Context, serviceId string, start int64, end int64, page int, size int) ([]*service_dto.AILogItem, int64, error) {
list, total, err := i.logService.LogRecordsByService(ctx, serviceId, time.Unix(start, 0), time.Unix(end, 0), page, size)
if err != nil {
return nil, 0, err
}
return utils.SliceToSlice(list, func(s *log_service.Item) *service_dto.AILogItem {
item := &service_dto.AILogItem{
Id: s.ID,
API: auto.UUID(s.API),
Status: s.StatusCode,
LogTime: auto.TimeLabel(s.RecordTime),
Ip: s.RemoteIP,
Token: s.TotalToken,
TokenPerSecond: s.TotalToken * 1000 / s.ResponseTime,
Consumer: auto.UUID(s.Consumer),
Provider: auto.UUID(s.AIProvider),
Model: s.AIModel,
}
if s.Consumer == "apipark-global" {
item.Consumer = auto.Label{
Id: s.Consumer,
Name: "System Consumer",
}
}
return item
}), total, nil
}
func (i *imlServiceModule) ServiceOverview(ctx context.Context, id string) (*service_dto.Overview, error) {
info, err := i.serviceService.Get(ctx, id)
if err != nil {
return nil, err
}
apiCountMap, err := i.apiDocService.APICountByServices(ctx, id)
if err != nil {
return nil, err
}
subscribeMap, err := i.subscribeServer.CountMapByService(ctx, subscribe.ApplyStatusSubscribe, id)
if err != nil {
return nil, err
}
result := &service_dto.Overview{
Id: info.Id,
Name: info.Name,
Description: info.Description,
EnableMCP: info.EnableMCP,
ServiceKind: info.Kind.String(),
SubscriberNum: subscribeMap[id],
Logo: info.Logo,
Catalogue: auto.UUID(info.Catalogue),
APINum: apiCountMap[id],
}
_, err = i.releaseService.GetRunning(ctx, id)
if err != nil {
if !errors.Is(err, gorm.ErrRecordNotFound) {
return nil, err
}
} else {
result.IsReleased = true
}
return result, nil
}
func (i *imlServiceModule) OnInit() {
@@ -746,7 +953,9 @@ func (i *imlServiceModule) Delete(ctx context.Context, id string) error {
Id: id,
})
if err != nil {
return err
if err.Error() != "nil" {
return err
}
}
err = client.Subscribe().Offline(ctx, &gateway.SubscribeRelease{
Service: id,
+10
View File
@@ -34,6 +34,16 @@ type IServiceModule interface {
//MySimple 获取我的简易项目列表
MySimple(ctx context.Context) ([]*service_dto.SimpleServiceItem, error)
ServiceOverview(ctx context.Context, id string) (*service_dto.Overview, error)
ILogModule
}
type ILogModule interface {
AILogs(ctx context.Context, serviceId string, start int64, end int64, page int, size int) ([]*service_dto.AILogItem, int64, error)
RestLogs(ctx context.Context, serviceId string, start int64, end int64, page int, size int) ([]*service_dto.RestLogItem, int64, error)
RestLogInfo(ctx context.Context, serviceId string, logId string) (*service_dto.RestLogInfo, error)
AILogInfo(ctx context.Context, serviceId string, logId string) (*service_dto.AILogInfo, error)
}
type IServiceDocModule interface {
+115
View File
@@ -0,0 +1,115 @@
package service
import (
"bufio"
"encoding/json"
"strings"
)
// ChatCompletionChunk represents the structure of a single chunk in the streaming response
type ChatCompletionChunk struct {
Object string `json:"object"`
Choices []Choice `json:"choices"`
}
// ChatCompletion represents the structure of a non-streaming response
type ChatCompletion struct {
Object string `json:"object"`
Choices []FullChoice `json:"choices"`
}
// Choice represents a choice in the streaming chunk
type Choice struct {
Delta Delta `json:"delta"`
FinishReason *string `json:"finish_reason"`
}
// FullChoice represents a choice in the non-streaming response
type FullChoice struct {
Message Message `json:"message"`
}
// Delta represents the delta content in a streaming choice
type Delta struct {
Content string `json:"content"`
Role string `json:"role,omitempty"`
}
// Message represents the message content in a non-streaming choice
type Message struct {
Content string `json:"content"`
Role string `json:"role"`
}
// ParseAIResponse parses both streaming and non-streaming AI responses and returns the concatenated content
func parseAIResponse(input string) (string, error) {
// First, try to parse as a non-streaming response
var nonStreaming ChatCompletion
if err := json.Unmarshal([]byte(input), &nonStreaming); err == nil && nonStreaming.Object == "chat.completion" {
var result strings.Builder
for _, choice := range nonStreaming.Choices {
result.WriteString(choice.Message.Content)
}
return result.String(), nil
}
// If not non-streaming, parse as streaming response
var result strings.Builder
scanner := bufio.NewScanner(strings.NewReader(input))
for scanner.Scan() {
line := scanner.Text()
// Skip empty lines or [DONE]
if line == "" || line == "data: [DONE]" {
continue
}
// Check if line starts with "data: "
if !strings.HasPrefix(line, "data: ") {
continue
}
// Extract JSON data
jsonData := strings.TrimPrefix(line, "data: ")
var chunk ChatCompletionChunk
if err := json.Unmarshal([]byte(jsonData), &chunk); err != nil {
return "", err
}
// Process each choice
for _, choice := range chunk.Choices {
// Append content from delta
result.WriteString(choice.Delta.Content)
// Check if this is the final chunk
if choice.FinishReason != nil && *choice.FinishReason == "stop" {
return result.String(), nil
}
}
}
if err := scanner.Err(); err != nil {
return "", err
}
return result.String(), nil
}
func parseAIRequest(ori string) string {
type aiRequest struct {
Messages []struct {
Role string `json:"role"`
Content string `json:"content"`
} `json:"messages"`
}
var req aiRequest
err := json.Unmarshal([]byte(ori), &req)
if err != nil {
return ori
}
size := len(req.Messages)
if size == 0 {
return ""
}
return req.Messages[size-1].Content
}
+5
View File
@@ -22,5 +22,10 @@ func (p *plugin) monitorStatisticApis() []pm3.Api {
pm3.CreateApiWidthDoc(http.MethodPost, "/api/v1/monitor/:data_type/trend", []string{"context", "rest:data_type", "query:id", "body"}, []string{"tendency", "time_interval"}, p.monitorStatisticController.InvokeTrend),
pm3.CreateApiWidthDoc(http.MethodPost, "/api/v1/monitor/:data_type/trend/:typ", []string{"context", "rest:data_type", "rest:typ", "query:api", "query:provider", "query:subscriber", "body"}, []string{"tendency", "time_interval"}, p.monitorStatisticController.InvokeTrendInner),
pm3.CreateApiWidthDoc(http.MethodPost, "/api/v1/monitor/:data_type/statistics/:typ", []string{"context", "rest:data_type", "rest:typ", "query:id", "body"}, []string{"statistics"}, p.monitorStatisticController.StatisticsInner),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/monitor/overview/chart/rest", []string{"context", "query:start", "query:end"}, []string{"overview"}, p.monitorStatisticController.ChartRestOverview, access.SystemAnalysisRunViewView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/monitor/overview/chart/ai", []string{"context", "query:start", "query:end"}, []string{"overview"}, p.monitorStatisticController.ChartAIOverview, access.SystemAnalysisRunViewView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/monitor/overview/top10/rest", []string{"context", "query:start", "query:end", "query:limit"}, []string{"apis", "consumers"}, p.monitorStatisticController.RestTopN, access.SystemAnalysisRunViewView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/monitor/overview/top10/ai", []string{"context", "query:start", "query:end", "query:limit"}, []string{"apis", "consumers"}, p.monitorStatisticController.AITopN, access.SystemAnalysisRunViewView),
}
}
+10
View File
@@ -39,5 +39,15 @@ func (p *plugin) ServiceApis() []pm3.Api {
pm3.CreateApiSimple(http.MethodGet, "/api/v1/service/swagger/:id", p.serviceController.Swagger),
pm3.CreateApiSimple(http.MethodGet, "/api/v1/service/apidoc/:id", p.serviceController.Swagger),
pm3.CreateApiSimple(http.MethodGet, "/api/v1/export/openapi/:id", p.serviceController.ExportSwagger),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/monitor/top10", []string{"context", "query:service", "query:start", "query:end"}, []string{"apis", "consumers"}, p.serviceController.Top10, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/overview/monitor/ai", []string{"context", "query:service", "query:start", "query:end"}, []string{"overview"}, p.serviceController.AIChartOverview, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/overview/monitor/rest", []string{"context", "query:service", "query:start", "query:end"}, []string{"overview"}, p.serviceController.RestChartOverview, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/overview/basic", []string{"context", "query:service"}, []string{"overview"}, p.serviceController.ServiceOverview, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/logs/ai", []string{"context", "query:service", "query:start", "query:end", "query:page", "query:page_size"}, []string{"logs", "total"}, p.serviceController.AILogs, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/logs/rest", []string{"context", "query:service", "query:start", "query:end", "query:page", "query:page_size"}, []string{"logs", "total"}, p.serviceController.RestLogs, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/log/rest", []string{"context", "query:service", "query:log"}, []string{"log"}, p.serviceController.RestLogInfo, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
pm3.CreateApiWidthDoc(http.MethodGet, "/api/v1/service/log/ai", []string{"context", "query:service", "query:log"}, []string{"log"}, p.serviceController.AILogInfo, access.SystemWorkspaceServiceViewAll, access.TeamTeamServiceView),
}
}
+8 -4
View File
@@ -1,10 +1,14 @@
# 名称:apipark通用镜像
# 创建时间:2022-10-25
FROM centos:7.9.2009
MAINTAINER liujian
FROM alpine:latest
RUN sed -i 's|https://dl-cdn.alpinelinux.org/alpine|https://mirrors.aliyun.com/alpine|g' /etc/apk/repositories \
&& apk update \
&& apk add --no-cache curl tzdata bind-tools
ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
RUN ln -sf /usr/share/zoneinfo/${TZ} /etc/localtime \
&& echo "Asia/Shanghai" > /etc/timezone
ARG APP
+1 -1
View File
@@ -73,7 +73,7 @@ build_frontend() {
echo_info "Install dependencies..."
pnpm install --registry https://registry.npmmirror.com --dir ./frontend
echo_info "Build frontend..."
cd ./frontend && pnpm run build
cd ./frontend && pnpm run build --verbose
cd ..
else
echo_info "Need not build frontend."
+9 -5
View File
@@ -10,6 +10,11 @@ BuildMode=$3
if [[ "${BuildMode}" == "" ]];then
BuildMode="all"
fi
if [[ "${ARCH}" == "" ]];then
ARCH="amd64"
fi
# 编译可执行文件
./scripts/build.sh "cmd" "" "${BuildMode}" ${ARCH}
@@ -22,12 +27,11 @@ mkdir -p scripts/cmd/ && cp cmd/${APP} scripts/cmd/ && cp cmd/apipark_ai_event_l
VERSION=$(gen_version)
if [[ "${ARCH}" == "" ]];then
ARCH="amd64"
fi
OPTIONS=""
if [[ "${ARCH}" == "arm" ]];then
SYS_ARCH=$(arch)
if [[ (${SYS_ARCH} == "aarch64" || ${SYS_ARCH} == "arm64") && $ARCH == "amd64" ]];then
OPTIONS="--platform=linux/amd64"
elif [[ ${SYS_ARCH} == "amd64" && $ARCH == "arm64" ]];then
OPTIONS="--platform=linux/arm64"
fi
+29 -3
View File
@@ -1,11 +1,10 @@
#!/bin/sh
set -e
source ./init_config.sh
OLD_IFS="$IFS"
IFS=","
arr=(${REDIS_ADDR})
IFS="$OLD_IFS"
@@ -21,10 +20,11 @@ echo -e "redis:" >> config.yml
echo -e " user_name: ${REDIS_USER_NAME}" >> config.yml
echo -e " password: ${REDIS_PWD}" >> config.yml
echo -e " addr: " >> config.yml
for s in ${arr[@]}
for s in $REDIS_ADDR
do
echo -e " - $s" >> config.yml
done
echo -e "nsq:" >> config.yml
echo -e " addr: ${NSQ_ADDR}" >> config.yml
echo -e " topic_prefix: ${NSQ_TOPIC_PREFIX}" >> config.yml
@@ -38,5 +38,31 @@ echo -e " log_period: ${ERROR_PERIOD}" >> config.yml
cat config.yml
nohup ./apipark >> run.log 2>&1 &
wait_for_apipark
nohup ./apipark_ai_event_listen >> run.log 2>&1 &
if [[ ${Init} == "true" ]];then
login_apipark
r=$(is_init)
if [[ $r == "true" ]];then
echo "Already initialized, skipping initialization."
else
wait_for_influxdb
wait_for_apinto
set_cluster
wait_for_influxdb
set_influxdb
set_loki
set_nsq
set_openapi_config
# 重启apipark
kill -9 $(pgrep apipark)
nohup ./apipark >> run.log 2>&1 &
fi
fi
tail -F run.log
+292
View File
@@ -0,0 +1,292 @@
#!/bin/sh
Cookie=""
if [[ "$ApiparkAddress" == "" ]]; then
ApiparkAddress="http://127.0.0.1:8288"
fi
if [[ "${ApintoAddress}" == "" ]]; then
ApintoAddress="http://apipark-apinto:9400"
fi
if [[ "$InfluxdbAddress" == "" ]]; then
InfluxdbAddress="http://apipark-influxdb:8086"
fi
if [[ "$NSQAddress" == "" ]]; then
NSQAddress="apipark-nsq:4150"
fi
if [[ "$LokiAddress" == "" ]]; then
LokiAddress="http://apipark-loki:3100"
fi
echo_fail() {
printf "\e[91m✘ Error:\e[0m $@\n" >&2
}
echo_pass() {
printf "\e[92m✔ Passed:\e[0m $@\n" >&2
}
echo_warn() {
printf "\e[93m⚠ Warning:\e[0m $@\n" >&2
}
echo_pause() {
printf "\e[94m⏸ Pause:\e[0m $1\n" >&2
}
echo_question() {
printf "\e[95m? Question:\e[0m $@\n" >&2
}
echo_info() {
printf "\e[96m Info:\e[0m $1\n" >&2
}
echo_point() {
printf "\e[94m➜ Point:\e[0m $1\n" >&2
}
echo_bullet() {
printf "\e[94m• Step:\e[0m $1\n" >&2
}
echo_wait() {
printf "\e[95m⏳ Waiting:\e[0m $1\n" >&2
}
echo_split() {
echo "" >&2
echo "" >&2
echo -e "\e[94m────────────────────────────────────────────────────────────\e[0m" >&2
}
request_apipark() {
path=$1
body=$2
method=$3
if [[ "$method" == "" ]]; then
method="POST"
fi
if [[ "$Cookie" == "" ]]; then
cmd="curl -X ${method} -s -i -H \"Content-Type: application/json\" -d '$body' \"${ApiparkAddress}${path}\""
echo_info "Executing: $cmd" # 打印命令
response=$(eval "$cmd")
else
cmd="curl -X ${method} -s -i -H \"Content-Type: application/json\" -H \"Cookie: $Cookie\" -d '$body' \"${ApiparkAddress}${path}\""
echo_info "Executing: $cmd" # 打印命令
response=$(eval "$cmd")
fi
echo "$response"
}
request_apinto() {
path="$1"
body="$2"
echo_info "Executing: curl -i -X POST -H \"Content-Type: application/json\" \"${ApintoAddress}${path}\" -d '$body'"
response=$(curl -i -X POST -H "Content-Type: application/json" "${ApintoAddress}${path}" -d "$body")
status_code=$(echo "$response" | grep -E 'HTTP/[0-9.]+ [0-9]+' | awk '{print $2}' || echo "0")
echo_info "$response"
echo "$status_code"
}
login_apipark() {
# 执行登录请求并捕获响应头
body='{"name":"admin","password":"'"${ADMIN_PASSWORD}"'"}'
response=$(request_apipark "/api/v1/account/login/username" "$body")
# 从响应中提取 Set-Cookie 头
cookie=$(echo "$response" | grep -i "Set-Cookie" | sed 's/Set-Cookie: //;s/;.*//')
# 提取 JSON 主体(假设 JSON 在最后一行或响应中可识别)
json_body=$(echo "$response" | grep '^{.*}$')
# 提取 code 值
code=$(echo "$json_body" | sed 's/.*"code":\([0-9]*\).*/\1/')
# 检查 code 是否为 0
if [ "$code" -eq 0 ]; then
Cookie=$cookie
echo_pass "login success"
else
echo_fail "login failed: $json_body"
exit 1
fi
}
set_cluster() {
# 设置集群地址
body='{"manager_address":"'"${ApintoAddress}"'"}'
path="/api/v1/cluster/reset"
response=$(request_apipark "$path" "$body" "PUT")
# 从响应中提取 code
code=$(echo "${response}" | grep '^{.*}$' | sed 's/.*"code":\([0-9]*\).*/\1/')
if [ "$code" -eq 0 ]; then
echo_pass "Set cluster successfully"
else
echo_fail "Set cluster failed: ${response}"
exit 1
fi
}
set_loki() {
# 设置 loki 地址
body='{"config":{"url":"'"${LokiAddress}"'"}}'
path="/api/v1/log/loki"
response=$(request_apipark "$path" "$body")
# 从响应中提取 code
code=$(echo "${response}" | grep '^{.*}$' | sed 's/.*"code":\([0-9]*\).*/\1/')
if [ "$code" -eq 0 ]; then
echo_pass "Set loki successfully"
else
echo_fail "Set loki failed: ${response}"
exit 1
fi
}
set_nsq() {
body="{
\"address\": [
\"${NSQAddress}\"
],
\"description\": \"auto init nsqd config\",
\"driver\": \"nsqd\",
\"formatter\": {
\"ai\": [
\"\$ai_provider\",
\"\$ai_model\",
\"\$ai_model_input_token\",
\"\$ai_model_output_token\",
\"\$ai_model_total_token\",
\"\$ai_model_cost\",
\"\$ai_provider_statuses\"
],
\"fields\": [
\"\$time_iso8601\",
\"\$request_id\",
\"\$api\",
\"\$provider\",
\"@ai\"
]
},
\"scopes\": [
\"access_log\"
],
\"topic\": \"apipark_ai_event\",
\"type\": \"json\"
}"
status_code=$(request_apinto "/api/output/ai_event" "$body")
echo "Status code: $status_code"
if [ "$status_code" -eq 200 ]; then
echo_pass "Update nsq successfully"
else
echo_fail "Update nsq failed: ${status_code}"
exit 1
fi
}
set_influxdb() {
if [ -z "$InfluxdbToken" ]; then
echo_fail "Influxdb token is empty"
exit 1
fi
if [ -z "$InfluxdbOrg" ]; then
InfluxdbOrg="apipark"
fi
body='{"driver":"influxdb-v2","config":{"addr":"'"${InfluxdbAddress}"'","org":"'"${InfluxdbOrg}"'","token":"'${InfluxdbToken}'"}}'
response=$(request_apipark "/api/v1/monitor/config" "$body")
# 从响应中提取 code
code=$(echo "${response}" | grep '^{.*}$' | sed 's/.*"code":\([0-9]*\).*/\1/')
if [ "$code" -eq 0 ]; then
echo_pass "Update influxdb config successfully"
else
echo_fail "Update influxdb config failed: ${response}"
exit 1
fi
}
retry() {
max_wait="$1"
shift
cmd="$@"
sleep_interval=2
curr_wait=0
until $cmd
do
if [ "$curr_wait" -ge "$max_wait" ]
then
echo "Command '$cmd' failed after $curr_wait seconds."
return 1
else
curr_wait=$((curr_wait + sleep_interval))
sleep "$sleep_interval"
fi
done
}
is_init() {
path="/api/v1/system/general"
method="GET"
response=$(request_apipark "$path" "" "$method")
# 从响应中提取 site_prefix
site_prefix=$(echo "${response}" | grep '^{.*}$' | sed 's/.*"site_prefix":"\{0,1\}\([^"]*\)"\{0,1\}.*/\1/')
if [ -z "$site_prefix" ]; then
echo_pass "No apipark openapi address set"
echo false
else
echo_pass "Apipark openapi address found: $site_prefix"
echo true
fi
}
set_openapi_config() {
IP=$(dig +short myip.opendns.com @resolver1.opendns.com | grep -E '^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$')
if [ -z "$IP" ]; then
echo_fail "Failed to resolve IP address"
exit 1
fi
body='{"site_prefix":"http://'"${IP}"':18288"}'
response=$(request_apipark "/api/v1/system/general" "$body")
# 从响应中提取 code
code=$(echo "${response}" | grep '^{.*}$' | sed 's/.*"code":\([0-9]*\).*/\1/')
if [ "$code" -eq 0 ]; then
echo_pass "Update apipark openapi address successfully"
else
echo_fail "Update apipark openapi address failed: ${response}"
exit 1
fi
}
wait_for() {
waitName=$1
cmd=$2
echo ${cmd}
echo_wait "Waiting for ${waitName} to start..."
retry 30 ${cmd}
if [ $? -eq 0 ]; then
echo_pass "${waitName} has been installed successfully"
else
echo_fail "${waitName} installation failed"
exit 1
fi
}
wait_for_apipark() {
wait_for "apipark" "curl -s -o /dev/null ${ApiparkAddress}/api/v1/account/login"
}
wait_for_apinto() {
wait_for "apinto" "curl -s -o /dev/null ${ApintoAddress}/api/router"
}
wait_for_influxdb() {
wait_for "influxdb" "curl -s -o /dev/null ${InfluxdbAddress}/api/v2/health"
}
+107 -3
View File
@@ -23,7 +23,77 @@ var (
)
type imlLogService struct {
store log_source.ILogSourceStore `autowired:""`
store log_source.ILogSourceStore `autowired:""`
logRecordStore log_source.ILogRecordStore `autowired:""`
}
func (i *imlLogService) LogRecordsByService(ctx context.Context, serviceId string, start time.Time, end time.Time, page int, size int) ([]*Item, int64, error) {
list, total, err := i.logRecordStore.ListPage(ctx, "`record_time` between ? and ? and `service` = ?", page, size, []interface{}{
start,
end,
serviceId,
}, "record_time desc")
if err != nil {
return nil, 0, err
}
return utils.SliceToSlice(list, func(s *log_source.LogRecord) *Item {
return &Item{
ID: s.UUID,
Strategy: s.Strategy,
Service: s.Service,
API: s.API,
Method: s.Method,
Url: s.Url,
RemoteIP: s.RemoteIP,
Consumer: s.Consumer,
Authorization: s.Authorization,
InputToken: s.InputToken,
OutputToken: s.OutputToken,
TotalToken: s.TotalToken,
AIProvider: s.AIProvider,
AIModel: s.AIModel,
StatusCode: s.StatusCode,
ResponseTime: s.ResponseTime,
Traffic: s.Traffic,
RecordTime: s.RecordTime,
}
}), total, nil
}
func (i *imlLogService) InsertLog(ctx context.Context, driver string, input *InsertLog) error {
// 判断日志是否已存在,若已存在,则不插入
_, err := i.logRecordStore.First(ctx, map[string]interface{}{"uuid": input.ID})
if err != nil {
if !errors.Is(err, gorm.ErrRecordNotFound) {
log_print.Errorf("get log record %s error: %s", input.ID, err)
return err
}
return i.logRecordStore.Insert(ctx, &log_source.LogRecord{
UUID: input.ID,
Driver: input.Driver,
Service: input.Service,
API: input.API,
Strategy: input.Strategy,
Method: input.Method,
Url: input.Url,
RemoteIP: input.RemoteIP,
Consumer: input.Consumer,
Authorization: input.Authorization,
InputToken: input.InputToken,
OutputToken: input.OutputToken,
TotalToken: input.TotalToken,
AIProvider: input.AIProvider,
AIModel: input.AIModel,
StatusCode: input.StatusCode,
ResponseTime: input.ResponseTime,
Traffic: input.Traffic,
RecordTime: input.RecordTime,
})
}
return nil
}
func (i *imlLogService) OnComplete() {
@@ -67,9 +137,10 @@ func (i *imlLogService) UpdateLogSource(ctx context.Context, driver string, inpu
if input.Config == nil || *input.Config == "" {
return errors.New("config is required")
}
now := time.Now()
userId := utils.UserId(ctx)
s = &log_source.Log{
s = &log_source.LogSource{
UUID: input.ID,
Cluster: *input.Cluster,
Driver: driver,
@@ -79,11 +150,19 @@ func (i *imlLogService) UpdateLogSource(ctx context.Context, driver string, inpu
CreateAt: now,
UpdateAt: now,
}
if input.LastPullTime == nil {
s.LastPullAt = time.Now().Add(-24 * time.Hour)
} else {
s.LastPullAt = *input.LastPullTime
}
} else {
if input.Config != nil && *input.Config != "" {
s.Config = *input.Config
}
if input.LastPullTime != nil {
s.LastPullAt = *input.LastPullTime
}
s.Updater = utils.UserId(ctx)
s.UpdateAt = time.Now()
}
@@ -129,6 +208,10 @@ func (i *imlLogService) Logs(ctx context.Context, driver string, cluster string,
return result, count, nil
}
func (i *imlLogService) LogRecords(ctx context.Context, driver string, keyword string, start time.Time, end time.Time) ([]*Item, int64, error) {
panic(errors.New("not implemented"))
}
func (i *imlLogService) LogCount(ctx context.Context, driver string, cluster string, conditions map[string]string, spendHour int64, group string) (map[string]int64, error) {
d, has := log_driver.GetDriver(driver)
if !has {
@@ -147,11 +230,32 @@ func (i *imlLogService) LogInfo(ctx context.Context, driver string, cluster stri
return nil, err
}
return &Info{
ID: info.ID,
Item: Item{
ID: info.ID,
Strategy: info.Strategy,
Service: info.Service,
API: info.API,
Method: info.Method,
Url: info.Url,
RemoteIP: info.RemoteIP,
Consumer: info.Consumer,
Authorization: info.Authorization,
InputToken: info.InputToken,
OutputToken: info.OutputToken,
TotalToken: info.TotalToken,
AIProvider: info.AIProvider,
AIModel: info.AIModel,
StatusCode: info.StatusCode,
ResponseTime: info.ResponseTime,
Traffic: info.Traffic,
RecordTime: info.RecordTime,
},
ContentType: info.ContentType,
RequestBody: info.RequestBody,
ProxyBody: info.ProxyBody,
ProxyResponseBody: info.ProxyResponseBody,
ResponseBody: info.ResponseBody,
RequestHeader: info.RequestHeader,
ResponseHeader: info.ResponseHeader,
}, nil
}
+59 -22
View File
@@ -7,51 +7,88 @@ import (
)
type Save struct {
ID string
Cluster *string
Config *string
ID string
Cluster *string
Config *string
LastPullTime *time.Time
}
type Source struct {
ID string
Cluster string
Driver string
Config string
Creator string
Updater string
CreateAt time.Time
UpdateAt time.Time
ID string
Cluster string
Driver string
Config string
Creator string
Updater string
CreateAt time.Time
UpdateAt time.Time
LastPullTime time.Time
}
func FromEntity(ov *log_source.Log) *Source {
func FromEntity(ov *log_source.LogSource) *Source {
return &Source{
ID: ov.UUID,
Cluster: ov.Cluster,
Driver: ov.Driver,
Config: ov.Config,
Creator: ov.Creator,
Updater: ov.Updater,
CreateAt: ov.CreateAt,
UpdateAt: ov.UpdateAt,
ID: ov.UUID,
Cluster: ov.Cluster,
Driver: ov.Driver,
Config: ov.Config,
Creator: ov.Creator,
Updater: ov.Updater,
LastPullTime: ov.LastPullAt,
CreateAt: ov.CreateAt,
UpdateAt: ov.UpdateAt,
}
}
type Item struct {
type InsertLog struct {
ID string
Driver string
Strategy string
Service string
API string
Method string
Url string
RemoteIP string
Consumer string
Authorization string
InputToken int64
OutputToken int64
TotalToken int64
AIProvider string
AIModel string
StatusCode int64
ResponseTime int64
Traffic int64
RecordTime time.Time
}
type Item struct {
ID string
Strategy string
Service string
API string
Method string
Url string
RemoteIP string
Consumer string
Authorization string
InputToken int64
OutputToken int64
TotalToken int64
AIProvider string
AIModel string
StatusCode int64
ResponseTime int64
Traffic int64
RecordTime time.Time
}
type Info struct {
ID string
Item
ContentType string
RequestBody string
ProxyBody string
ProxyResponseBody string
ResponseBody string
RequestHeader string
ResponseHeader string
}
+6
View File
@@ -13,8 +13,14 @@ type ILogService interface {
UpdateLogSource(ctx context.Context, driver string, input *Save) error
GetLogSource(ctx context.Context, driver string) (*Source, error)
Logs(ctx context.Context, driver string, cluster string, conditions map[string]string, start time.Time, end time.Time, limit int64, offset int64) ([]*Item, int64, error)
LogRecordsByService(ctx context.Context, serviceId string, start time.Time, end time.Time, page int, size int) ([]*Item, int64, error)
LogCount(ctx context.Context, driver string, cluster string, conditions map[string]string, spendHour int64, group string) (map[string]int64, error)
LogInfo(ctx context.Context, driver string, cluster string, id string) (*Info, error)
InsertLog(ctx context.Context, driver string, input *InsertLog) error
}
type ILogUpdateService interface {
UpdateLogSource(ctx context.Context, driver string, input *Save) error
}
func init() {
+26
View File
@@ -157,3 +157,29 @@ type MonTrendValues struct {
Names []string
Values [][]interface{}
}
type StatusCodeOverview struct {
Status2xx int64
Status4xx int64
Status5xx int64
StatusTotal int64
}
type TokenOverview struct {
InputToken int64
OutputToken int64
TotalToken int64
}
type TopN struct {
Key string
Request int64
Token int64
Traffic int64
}
type Aggregate struct {
Max int64
Min int64
Avg int64
}
+44 -12
View File
@@ -2,22 +2,54 @@ package log_source
import "time"
type Log struct {
Id int64 `gorm:"column:id;type:BIGINT(20);AUTO_INCREMENT;NOT NULL;comment:id;primary_key;comment:主键ID;"`
UUID string `gorm:"type:varchar(36);not null;column:uuid;uniqueIndex:uuid;comment:UUID;"`
Cluster string `gorm:"column:cluster;type:varchar(36);NOT NULL;comment:集群ID"`
Driver string `gorm:"column:driver;type:VARCHAR(36);NOT NULL;comment:驱动"`
Config string `gorm:"column:config;type:TEXT;NOT NULL;comment:配置"`
Creator string `gorm:"type:varchar(36);not null;column:creator;comment:creator" aovalue:"creator"`
Updater string `gorm:"type:varchar(36);not null;column:updater;comment:updater" aovalue:"updater"`
CreateAt time.Time `gorm:"type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP;column:create_at;comment:创建时间"`
UpdateAt time.Time `gorm:"type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;column:update_at;comment:修改时间" json:"update_at"`
type LogSource struct {
Id int64 `gorm:"column:id;type:BIGINT(20);AUTO_INCREMENT;NOT NULL;comment:id;primary_key;comment:主键ID;"`
UUID string `gorm:"type:varchar(36);not null;column:uuid;uniqueIndex:uuid;comment:UUID;"`
Cluster string `gorm:"column:cluster;type:varchar(36);NOT NULL;comment:集群ID"`
Driver string `gorm:"column:driver;type:VARCHAR(36);NOT NULL;comment:驱动"`
Config string `gorm:"column:config;type:TEXT;NOT NULL;comment:配置"`
Creator string `gorm:"type:varchar(36);not null;column:creator;comment:creator" aovalue:"creator"`
Updater string `gorm:"type:varchar(36);not null;column:updater;comment:updater" aovalue:"updater"`
LastPullAt time.Time `gorm:"type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP;column:last_pull_at;comment:最后拉取时间"`
CreateAt time.Time `gorm:"type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP;column:create_at;comment:创建时间"`
UpdateAt time.Time `gorm:"type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;column:update_at;comment:修改时间" json:"update_at"`
}
func (c *Log) IdValue() int64 {
func (c *LogSource) IdValue() int64 {
return c.Id
}
func (c *Log) TableName() string {
func (c *LogSource) TableName() string {
return "log"
}
type LogRecord struct {
Id int64 `gorm:"column:id;type:BIGINT(20);AUTO_INCREMENT;NOT NULL;comment:id;primary_key;comment:主键ID;"`
UUID string `gorm:"type:varchar(36);not null;column:uuid;uniqueIndex:uuid;comment:UUID;"`
Driver string `gorm:"column:driver;type:VARCHAR(36);NOT NULL;comment:驱动"`
Service string `gorm:"column:service;type:varchar(36);NOT NULL;comment:服务ID"`
API string `gorm:"column:api;type:varchar(36);NOT NULL;comment:接口ID"`
Strategy string `gorm:"column:strategy;type:varchar(36);NOT NULL;comment:策略ID"`
Method string `gorm:"column:method;type:varchar(36);NOT NULL;comment:请求方法"`
Url string `gorm:"column:url;type:varchar(255);NOT NULL;comment:请求URL"`
RemoteIP string `gorm:"column:remote_ip;type:varchar(255);NOT NULL;comment:请求IP"`
Consumer string `gorm:"column:consumer;type:varchar(255);NOT NULL;comment:消费者ID"`
Authorization string `gorm:"column:authorization;type:varchar(255);NOT NULL;comment:鉴权ID"`
InputToken int64 `gorm:"column:input_token;type:int(11);NOT NULL;comment:输入令牌"`
OutputToken int64 `gorm:"column:output_token;type:int(11);NOT NULL;comment:输出令牌"`
TotalToken int64 `gorm:"column:total_token;type:int(11);NOT NULL;comment:总令牌"`
AIProvider string `gorm:"column:ai_provider;type:varchar(255);NOT NULL;comment:AI提供商"`
AIModel string `gorm:"column:ai_model;type:varchar(255);NOT NULL;comment:AI模型"`
StatusCode int64 `gorm:"column:status_code;type:int(11);NOT NULL;comment:请求状态码"`
ResponseTime int64 `gorm:"column:response_time;type:int(11);NOT NULL;comment:响应时间"`
Traffic int64 `gorm:"column:traffic;type:BIGINT(20);NOT NULL;comment:流量"`
RecordTime time.Time `gorm:"column:record_time;type:timestamp;NOT NULL;DEFAULT:CURRENT_TIMESTAMP;comment:记录时间"`
}
func (c *LogRecord) IdValue() int64 {
return c.Id
}
func (c *LogRecord) TableName() string {
return "log_record"
}
+13 -2
View File
@@ -8,15 +8,26 @@ import (
)
type ILogSourceStore interface {
store.IBaseStore[Log]
store.IBaseStore[LogSource]
}
type storeLogSource struct {
store.Store[Log]
store.Store[LogSource]
}
type ILogRecordStore interface {
store.IBaseStore[LogRecord]
}
type storeLogRecord struct {
store.Store[LogRecord]
}
func init() {
autowire.Auto[ILogSourceStore](func() reflect.Value {
return reflect.ValueOf(new(storeLogSource))
})
autowire.Auto[ILogRecordStore](func() reflect.Value {
return reflect.ValueOf(new(storeLogRecord))
})
}