From afe047b102250e6ece7fcd58805df5e8c5d03e56 Mon Sep 17 00:00:00 2001 From: xml Date: Wed, 18 Mar 2026 13:35:41 +0800 Subject: [PATCH] feat: Implement support for online and local rate limiting Task: https://pms.uniontech.com/story-view-40501.html --- src/internal/config/config.go | 22 +++- src/internal/system/apt/apt.go | 18 +-- src/internal/system/apt/proxy.go | 74 ++++++++++-- src/internal/system/command.go | 38 ++++-- src/internal/system/system.go | 13 ++ src/internal/updateplatform/message_report.go | 78 +++++++++++- src/lastore-daemon/dbusutil.go | 26 ++++ src/lastore-daemon/job.go | 16 ++- src/lastore-daemon/job_manager.go | 23 +++- src/lastore-daemon/main.go | 4 + src/lastore-daemon/manager.go | 46 ++++--- src/lastore-daemon/manager_download.go | 114 +++++++++++++++++- src/lastore-daemon/manager_ifc.go | 2 +- src/lastore-daemon/manager_update.go | 35 ++++++ src/lastore-daemon/speed_meter.go | 10 +- src/lastore-daemon/stub.go | 2 + src/lastore-daemon/updater.go | 8 +- src/lastore-daemon/updater_ifc.go | 25 ++-- .../org.deepin.dde.lastore.json | 13 +- 19 files changed, 487 insertions(+), 80 deletions(-) diff --git a/src/internal/config/config.go b/src/internal/config/config.go index 679fc1531..e402c916e 100644 --- a/src/internal/config/config.go +++ b/src/internal/config/config.go @@ -91,10 +91,11 @@ type Config struct { NonUnknownList []string // 非未知来源更新list文件 OtherSourceList []string // 其他类型更新list文件路径 - DownloadSpeedLimitConfig string - lastoreDaemonStatus LastoreDaemonStatus - UpdateStatus string - PlatformUpdate bool + DownloadSpeedLimitConfig string + LocalDownloadSpeedLimitConfig string + lastoreDaemonStatus LastoreDaemonStatus + UpdateStatus string + PlatformUpdate bool PlatformUrl string // 更新接口地址 StartCheckRange []int // 开机检查更新区间 @@ -188,6 +189,7 @@ const ( dSettingsKeySystemSourceList = "system-sources" dSettingsKeyNonUnknownList = "non-unknown-sources" dSettingsKeyDownloadSpeedLimit = "download-speed-limit" + dSettingsKeyLocalDownloadSpeedLimit = "local-download-speed-limit" DSettingsKeyLastoreDaemonStatus = "lastore-daemon-status" dSettingsKeyUpdateStatus = "update-status" dSettingsKeyPlatformUpdate = "platform-update" @@ -467,6 +469,13 @@ func getConfigFromDSettings() *Config { c.DownloadSpeedLimitConfig = v.Value().(string) } + v, err = c.dsLastoreManager.Value(0, dSettingsKeyLocalDownloadSpeedLimit) + if err != nil { + logger.Warning(err) + } else { + c.LocalDownloadSpeedLimitConfig = v.Value().(string) + } + updateLastoreDaemonStatus := func() { v, err = c.dsLastoreManager.Value(0, DSettingsKeyLastoreDaemonStatus) if err != nil { @@ -899,6 +908,11 @@ func (c *Config) SetDownloadSpeedLimitConfig(config string) error { return c.save(dSettingsKeyDownloadSpeedLimit, config) } +func (c *Config) SetLocalDownloadSpeedLimitConfig(config string) error { + c.LocalDownloadSpeedLimitConfig = config + return c.save(dSettingsKeyLocalDownloadSpeedLimit, config) +} + func (c *Config) SetLastoreDaemonStatus(status LastoreDaemonStatus) error { c.statusMu.Lock() c.lastoreDaemonStatus = status diff --git a/src/internal/system/apt/apt.go b/src/internal/system/apt/apt.go index 83ab7da8c..c806a4fb4 100644 --- a/src/internal/system/apt/apt.go +++ b/src/internal/system/apt/apt.go @@ -120,19 +120,21 @@ func createCommandLine(cmdType string, cmdArgs []string) *exec.Cmd { return exec.Command("apt-get", args...) } -func newAPTCommand(cmdSet system.CommandSet, jobId string, cmdType string, fn system.Indicator, cmdArgs []string) *system.Command { +func newAPTCommand(cmdSet system.CommandSet, jobId string, cmdType string, fn system.Indicator, deliveryFn system.DeliveryIndicator, cmdArgs []string) *system.Command { cmd := createCommandLine(cmdType, cmdArgs) // See aptCommand.Abort cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} r := &system.Command{ - JobId: jobId, - CmdSet: cmdSet, - Indicator: fn, - ParseJobError: parseJobError, - ParseProgressInfo: parseProgressInfo, - Cmd: cmd, - Cancelable: true, + JobId: jobId, + CmdSet: cmdSet, + Indicator: fn, + DeliveryIndicator: deliveryFn, + ParseJobError: parseJobError, + ParseProgressInfo: parseProgressInfo, + ParseDeliveryDownloadInfo: parseDeliveryDownloadInfo, + Cmd: cmd, + Cancelable: true, } cmd.Stdout = &r.Stdout cmd.Stderr = &r.Stderr diff --git a/src/internal/system/apt/proxy.go b/src/internal/system/apt/proxy.go index 35691d09d..98ef9f71d 100644 --- a/src/internal/system/apt/proxy.go +++ b/src/internal/system/apt/proxy.go @@ -29,6 +29,7 @@ type APTSystem struct { CmdSet map[string]*system.Command Indicator system.Indicator IncrementalUpdate bool + DeliveryIndicator system.DeliveryIndicator } func NewSystem(nonUnknownList []string, otherList []string, incrementalUpdate bool) system.System { @@ -56,6 +57,53 @@ func parseProgressField(v string) (float64, error) { return progress, nil } +func parseDeliveryDownloadInfo(id, line string) (system.JobDeliveryDownloadInfo, error) { + if strings.Contains(line, "102 Status") { + var jobDeliveryDownloadInfo system.JobDeliveryDownloadInfo + jobDeliveryDownloadInfo.JobId = id + line = strings.TrimPrefix(line, "102 Status[") + line = strings.TrimSuffix(line, "]") + parts := strings.Split(line, "} {") + var isFinish bool + var err error + var speed int64 + var proto string + for i := range parts { + parts[i] = strings.Trim(parts[i], "{} ") + } + for _, part := range parts { + kv := strings.SplitN(part, " ", 2) + if len(kv) != 2 { + continue + } + key := kv[0] + value := kv[1] + + if key != "IsFinish" && key != "Speed" && key != "Proto" { + continue + } else if key == "IsFinish" { + isFinish, err = strconv.ParseBool(value) + if err != nil { + logger.Warningf("failed to parse isFinish %v", err) + isFinish = true + } + } else if key == "Speed" { + speed, _ = strconv.ParseInt(value, 10, 64) + } else if key == "Proto" { + proto = value + } + } + if !isFinish { + jobDeliveryDownloadInfo.Speed = speed + jobDeliveryDownloadInfo.Proto = proto + } else { + jobDeliveryDownloadInfo.Speed = -1 + } + return jobDeliveryDownloadInfo, nil + } + return system.JobDeliveryDownloadInfo{JobId: id}, nil +} + func parseProgressInfo(id, line string) (system.JobProgressInfo, error) { fs := strings.SplitN(line, ":", 4) if len(fs) != 4 { @@ -109,6 +157,10 @@ func (p *APTSystem) AttachIndicator(f system.Indicator) { p.Indicator = f } +func (p *APTSystem) AttachDeliveryIndicator(f system.DeliveryIndicator) { + p.DeliveryIndicator = f +} + func WaitDpkgLockRelease() { for { msg, wait := system.CheckLock("/var/lib/dpkg/lock") @@ -268,7 +320,7 @@ func (p *APTSystem) DownloadPackages(jobId string, packages []string, environ ma if err != nil { return err } - c := newAPTCommand(p, jobId, system.DownloadJobType, p.Indicator, append(packages, OptionToArgs(args)...)) + c := newAPTCommand(p, jobId, system.DownloadJobType, p.Indicator, p.DeliveryIndicator, append(packages, OptionToArgs(args)...)) c.SetEnv(environ) return c.Start() } @@ -300,11 +352,11 @@ func (p *APTSystem) DownloadSource(jobId string, packages []string, environ map[ environ["DEEPIN_IMMUTABLE_UPGRADE_APT_OPTION"] = upgradeArgString logger.Info("DownloadSource set env DEEPIN_IMMUTABLE_UPGRADE_APT_OPTION:", upgradeArgString) - c := newAPTCommand(p, jobId, system.IncrementalDownloadJobType, p.Indicator, cmdArgs) + c := newAPTCommand(p, jobId, system.IncrementalDownloadJobType, p.Indicator, p.DeliveryIndicator, cmdArgs) c.SetEnv(environ) return c.Start() } - c := newAPTCommand(p, jobId, system.PrepareDistUpgradeJobType, p.Indicator, append(packages, OptionToArgs(args)...)) + c := newAPTCommand(p, jobId, system.PrepareDistUpgradeJobType, p.Indicator, p.DeliveryIndicator, append(packages, OptionToArgs(args)...)) c.SetEnv(environ) return c.Start() } @@ -316,7 +368,7 @@ func (p *APTSystem) Remove(jobId string, packages []string, environ map[string]s return err } - c := newAPTCommand(p, jobId, system.RemoveJobType, p.Indicator, packages) + c := newAPTCommand(p, jobId, system.RemoveJobType, p.Indicator, p.DeliveryIndicator, packages) environ["IMMUTABLE_DISABLE_REMOUNT"] = "false" c.SetEnv(environ) return safeStart(c) @@ -328,7 +380,7 @@ func (p *APTSystem) Install(jobId string, packages []string, environ map[string] if err != nil { return err } - c := newAPTCommand(p, jobId, system.InstallJobType, p.Indicator, append(OptionToArgs(args), packages...)) + c := newAPTCommand(p, jobId, system.InstallJobType, p.Indicator, p.DeliveryIndicator, append(OptionToArgs(args), packages...)) environ["IMMUTABLE_DISABLE_REMOUNT"] = "false" c.SetEnv(environ) return safeStart(c) @@ -365,12 +417,12 @@ func (p *APTSystem) DistUpgrade(jobId string, packages []string, environ map[str environ["DEEPIN_IMMUTABLE_UPGRADE_APT_OPTION"] = upgradeArgString logger.Info("DistUpgrade set env DEEPIN_IMMUTABLE_UPGRADE_APT_OPTION:", upgradeArgString) - c := newAPTCommand(p, jobId, system.IncrementalUpdateJobType, p.Indicator, cmdArgs) + c := newAPTCommand(p, jobId, system.IncrementalUpdateJobType, p.Indicator, p.DeliveryIndicator, cmdArgs) c.SetEnv(environ) return c.Start() } - c := newAPTCommand(p, jobId, system.DistUpgradeJobType, p.Indicator, append(OptionToArgs(args), packages...)) + c := newAPTCommand(p, jobId, system.DistUpgradeJobType, p.Indicator, p.DeliveryIndicator, append(OptionToArgs(args), packages...)) environ["IMMUTABLE_DISABLE_REMOUNT"] = "false" c.SetEnv(environ) return safeStart(c) @@ -384,7 +436,7 @@ func (p *APTSystem) UpdateSource(jobId string, environ map[string]string, args m logger.Warningf("Failed to update remotes: %v, %s", err, string(output)) } } - c := newAPTCommand(p, jobId, system.UpdateSourceJobType, p.Indicator, OptionToArgs(args)) + c := newAPTCommand(p, jobId, system.UpdateSourceJobType, p.Indicator, p.DeliveryIndicator, OptionToArgs(args)) c.AtExitFn = func() bool { // 无网络时检查更新失败,exitCode为0,空间不足(不确定exit code)导致需要特殊处理 if c.ExitCode == system.ExitSuccess && bytes.Contains(c.Stderr.Bytes(), []byte("Some index files failed to download")) { @@ -402,7 +454,7 @@ func (p *APTSystem) UpdateSource(jobId string, environ map[string]string, args m } func (p *APTSystem) Clean(jobId string) error { - c := newAPTCommand(p, jobId, system.CleanJobType, p.Indicator, nil) + c := newAPTCommand(p, jobId, system.CleanJobType, p.Indicator, p.DeliveryIndicator, nil) return c.Start() } @@ -422,7 +474,7 @@ func (p *APTSystem) AbortWithFailed(jobId string) error { func (p *APTSystem) FixError(jobId string, errType string, environ map[string]string, args map[string]string) error { WaitDpkgLockRelease() - c := newAPTCommand(p, jobId, system.FixErrorJobType, p.Indicator, append([]string{errType}, OptionToArgs(args)...)) + c := newAPTCommand(p, jobId, system.FixErrorJobType, p.Indicator, p.DeliveryIndicator, append([]string{errType}, OptionToArgs(args)...)) environ["IMMUTABLE_DISABLE_REMOUNT"] = "false" c.SetEnv(environ) if system.JobErrorType(errType) == system.ErrorDependenciesBroken { // 修复依赖错误的时候,会有需要卸载dde的情况,因此需要用safeStart来进行处理 @@ -655,7 +707,7 @@ func parseBackupJobError(stdErrStr string, stdOutStr string) *system.JobError { } func (p *APTSystem) OsBackup(jobId string) error { - c := newAPTCommand(p, jobId, system.BackupJobType, p.Indicator, nil) + c := newAPTCommand(p, jobId, system.BackupJobType, p.Indicator, p.DeliveryIndicator, nil) c.ParseJobError = parseBackupJobError c.ParseProgressInfo = func(id, line string) (system.JobProgressInfo, error) { type info struct { diff --git a/src/internal/system/command.go b/src/internal/system/command.go index 172614344..e2822058b 100644 --- a/src/internal/system/command.go +++ b/src/internal/system/command.go @@ -34,9 +34,11 @@ type Command struct { pipe *os.File - Indicator Indicator - ParseProgressInfo ParseProgressInfo - ParseJobError ParseJobError + Indicator Indicator + DeliveryIndicator DeliveryIndicator + ParseProgressInfo ParseProgressInfo + ParseJobError ParseJobError + ParseDeliveryDownloadInfo ParseDeliveryDownloadInfo Stdout bytes.Buffer Stderr bytes.Buffer @@ -275,17 +277,27 @@ func (c *Command) updateProgress() { return } - info, err := c.ParseProgressInfo(c.JobId, line) - if err != nil { - logger.Errorf("aptCommand.updateProgress %v -> %v\n", info, err) - c.Indicator(JobProgressInfo{ - OnlyLog: true, - OriginalLog: line, - }) + if strings.Contains(line, "102 Status") { + deliveryInfo, err := c.ParseDeliveryDownloadInfo(c.JobId, line) + if err != nil { + logger.Errorf("aptCommand.updateProgress %v -> %v\n", deliveryInfo, err) + continue + } + c.DeliveryIndicator(deliveryInfo) continue + } else { + info, err := c.ParseProgressInfo(c.JobId, line) + if err != nil { + logger.Errorf("aptCommand.updateProgress %v -> %v\n", info, err) + c.Indicator(JobProgressInfo{ + OnlyLog: true, + OriginalLog: line, + }) + continue + } + info.OriginalLog = line + c.Cancelable = info.Cancelable + c.Indicator(info) } - info.OriginalLog = line - c.Cancelable = info.Cancelable - c.Indicator(info) } } diff --git a/src/internal/system/system.go b/src/internal/system/system.go index 11af920c6..593fa68e9 100644 --- a/src/internal/system/system.go +++ b/src/internal/system/system.go @@ -62,6 +62,16 @@ const ( NotifyExpireTimeoutPrivateLong = 600000 ) +type JobDeliveryDownloadInfo struct { + JobId string + FileName string + Proto string + DownloadSize int64 + DownloadedSize int64 + Speed int64 + Progress float64 +} + type JobProgressInfo struct { JobId string Progress float64 @@ -132,7 +142,9 @@ var NotSupportError = errors.New("not support operation") var ResourceExitError = errors.New("resource exists") type Indicator func(JobProgressInfo) +type DeliveryIndicator func(JobDeliveryDownloadInfo) type ParseProgressInfo func(id, line string) (JobProgressInfo, error) +type ParseDeliveryDownloadInfo func(id, line string) (JobDeliveryDownloadInfo, error) type ParseJobError func(stdErrStr string, stdOutStr string) *JobError type System interface { @@ -146,6 +158,7 @@ type System interface { Abort(jobId string) error AbortWithFailed(jobId string) error AttachIndicator(Indicator) + AttachDeliveryIndicator(DeliveryIndicator) FixError(jobId string, errType string, environ map[string]string, cmdArgs map[string]string) error OsBackup(jobId string) error CheckSystem(jobId string, checkType string, environ map[string]string, cmdArgs map[string]string) error diff --git a/src/internal/updateplatform/message_report.go b/src/internal/updateplatform/message_report.go index 3d3e47a7e..22e5e7919 100644 --- a/src/internal/updateplatform/message_report.go +++ b/src/internal/updateplatform/message_report.go @@ -119,10 +119,11 @@ type UpdatePlatformManager struct { Token string arch string - Tp UpdateTp // 更新策略类型:1.非强制更新,2.强制更新/立即更新,3.强制更新/关机或重启时更新,4.强制更新/指定时间更新 - UpdateTime time.Time // 更新时间(指定时间更新时的时间) - UpdateNowForce bool // 立即更新 - mu sync.Mutex + Tp UpdateTp // 更新策略类型:1.非强制更新,2.强制更新/立即更新,3.强制更新/关机或重启时更新,4.强制更新/指定时间更新 + UpdateTime time.Time // 更新时间(指定时间更新时的时间) + OnlineRateLimit throttlingMessage + UpdateNowForce bool // 立即更新 + mu sync.Mutex jobPostMsgMap map[string]*UpgradePostMsg jobPostMsgMapMu sync.Mutex @@ -454,6 +455,18 @@ type Version struct { TaskID int `json:"taskID"` } +type AllDayRateLimit struct { + Bps int `json:"bps"` + Enable bool `json:"enable"` +} + +type PeakOrNotTimeRateLimit struct { + Enable bool `json:"enable"` + StartTime string `json:"startTime"` + EndTime string `json:"endTime"` + Bps int `json:"bps"` +} + type Policy struct { Tp UpdateTp `json:"tp"` @@ -485,6 +498,13 @@ type updateMessage struct { ClientPollSetting ClientPollSetting `json:"clientPollSetting"` } +type throttlingMessage struct { + ServerTime string `json:"serverTime"` + AllDayRateLimit AllDayRateLimit `json:"allDayRateLimit"` // 全天限速 + PeakTimeRateLimit PeakOrNotTimeRateLimit `json:"peakTimeRateLimit"` // 忙时(07:00~22:00)限速 + OffPeakTimeRateLimit PeakOrNotTimeRateLimit `json:"offPeakTimeRateLimit"` // 闲时(22:00~07:00)限速 +} + type tokenMessage struct { Result bool `json:"result"` Code int `json:"code"` @@ -504,6 +524,7 @@ const ( GetTargetPkgLists // 系统软件包清单 GetCurrentPkgLists GetPkgCVEs // CVE 信息 + GetThrottling PostProcess PostProcessEvent PostResult @@ -539,6 +560,10 @@ var Urls = map[requestType]requestContent{ "/api/v1/cve/sync", "GET", }, + GetThrottling: { + "/api/v1/throttling/client", + "GET", + }, PostProcess: { "/api/v1/process", "POST", @@ -569,6 +594,19 @@ func (m *UpdatePlatformManager) genVersionResponse() (*http.Response, error) { return client.Do(request) } +func (m *UpdatePlatformManager) genThrottlingResponse() (*http.Response, error) { + throttlingUrl := m.requestUrl + Urls[GetThrottling].path + client := &http.Client{ + Timeout: 40 * time.Second, + } + request, err := http.NewRequest(Urls[GetThrottling].method, throttlingUrl, nil) + if err != nil { + return nil, fmt.Errorf("%v new request failed: %v ", GetThrottling.string(), err.Error()) + } + request.Header.Set("X-Repo-Token", base64.RawStdEncoding.EncodeToString([]byte(m.Token))) + return client.Do(request) +} + func (m *UpdatePlatformManager) genTargetPkgListsResponse() (*http.Response, error) { policyUrl := m.requestUrl + Urls[GetTargetPkgLists].path client := &http.Client{ @@ -727,6 +765,18 @@ func getVersionData(data json.RawMessage) *updateMessage { return tmp } +func getThrottlingData(data json.RawMessage) *throttlingMessage { + tmp := &throttlingMessage{} + err := json.Unmarshal(data, &tmp) + if err != nil { + logger.Warningf("%v failed to Unmarshal msg.Data to updateMessage: %v ", GetThrottling.string(), err.Error()) + return nil + } + serverTime, err := time.ParseInLocation(time.RFC3339, tmp.ServerTime, time.Local) + tmp.ServerTime = serverTime.Format("15:04:05") + return tmp +} + func getTargetPkgListData(data json.RawMessage) *PreInstalledPkgMeta { tmp := &PreInstalledPkgMeta{} err := json.Unmarshal(data, &tmp) @@ -839,7 +889,9 @@ func (m *UpdatePlatformManager) GenUpdatePolicyByToken(updateInRelease bool) err // 根据配置更新Tp switch m.Tp { case UpdateNow: + m.config.SetInstallUpdateTime("") case UpdateShutdown: + m.config.SetInstallUpdateTime("") case UpdateRegularly: default: logger.Debug("Config update time:", m.config.UpdateTime) @@ -882,6 +934,24 @@ func (m *UpdatePlatformManager) GenUpdatePolicyByToken(updateInRelease bool) err return err } +func (m *UpdatePlatformManager) GenThrottlingByToken() error { + response, err := m.genThrottlingResponse() + if err != nil { + return fmt.Errorf("failed get throttling data %v", err) + } + data, err := getResponseData(response, GetThrottling) + if err != nil { + return fmt.Errorf("failed get throttling data %v", err) + } + msg := getThrottlingData(data) + if msg == nil { + return errors.New("failed get throttling data") + } + + m.OnlineRateLimit = *msg + return err +} + type packageLists struct { Core []system.PlatformPackageInfo `json:"core"` // "必须安装软件包清单" Select []system.PlatformPackageInfo `json:"select"` // "可选软件包清单" diff --git a/src/lastore-daemon/dbusutil.go b/src/lastore-daemon/dbusutil.go index 254a0882c..99716f833 100644 --- a/src/lastore-daemon/dbusutil.go +++ b/src/lastore-daemon/dbusutil.go @@ -286,6 +286,19 @@ func (v *Job) emitPropChangedProgress(value float64) error { return v.service.EmitPropertyChanged(v, "Progress", value) } +func (v *Job) setPropProto(value string) (changed bool) { + if v.Proto != value { + v.Proto = value + v.emitPropChangedProto(value) + return true + } + return false +} + +func (v *Job) emitPropChangedProto(value string) error { + return v.service.EmitPropertyChanged(v, "Proto", value) +} + func (v *Job) setPropDescription(value string) (changed bool) { if v.Description != value { v.Description = value @@ -356,6 +369,19 @@ func (v *Manager) emitPropChangedSystemOnChanging(value bool) error { return v.service.EmitPropertyChanged(v, "SystemOnChanging", value) } +func (v *Manager) setPropDownloadLimitOnChanging(value bool) (changed bool) { + if v.DownloadLimitOnChanging != value { + v.DownloadLimitOnChanging = value + v.emitPropChangedDownloadLimitOnChanging(value) + return true + } + return false +} + +func (v *Manager) emitPropChangedDownloadLimitOnChanging(value bool) error { + return v.service.EmitPropertyChanged(v, "DownloadLimitOnChanging", value) +} + func (v *Manager) setPropAutoClean(value bool) (changed bool) { if v.AutoClean != value { v.AutoClean = value diff --git a/src/lastore-daemon/job.go b/src/lastore-daemon/job.go index aa46ac8e6..08e1a368b 100644 --- a/src/lastore-daemon/job.go +++ b/src/lastore-daemon/job.go @@ -40,8 +40,10 @@ type Job struct { Description string // completed bytes per second - Speed int64 - speedMeter SpeedMeter + Speed int64 + DeliverySpeed int64 + Proto string + speedMeter SpeedMeter Cancelable bool @@ -131,6 +133,13 @@ func (j *Job) String() string { ) } +func (j *Job) updateDeliveryDownloadInfo(info system.JobDeliveryDownloadInfo) { + j.PropsMu.Lock() + defer j.PropsMu.Unlock() + j.DeliverySpeed = info.Speed + j.Proto = info.Proto +} + // updateInfo update Job information from info and return // whether the information changed. func (j *Job) updateInfo(info system.JobProgressInfo) bool { @@ -183,12 +192,13 @@ func (j *Job) updateInfo(info system.JobProgressInfo) bool { } // see the apt.go, we scale download progress value range in [0,0.5 - speed := j.speedMeter.Speed(info.Progress) + speed := j.speedMeter.Speed(info.Progress, j.DeliverySpeed) if speed != j.Speed { changed = true j.Speed = speed _ = j.emitPropChangedSpeed(speed) + _ = j.emitPropChangedProto(j.Proto) } if info.FatalError { diff --git a/src/lastore-daemon/job_manager.go b/src/lastore-daemon/job_manager.go index 5e5726557..d3fdbdaf1 100644 --- a/src/lastore-daemon/job_manager.go +++ b/src/lastore-daemon/job_manager.go @@ -43,8 +43,9 @@ type JobManager struct { system system.System - mux sync.RWMutex - changed bool + mux sync.RWMutex + changed bool + DownloadLimitOnChanging bool dispatchMux sync.Mutex notify func() @@ -69,6 +70,7 @@ func NewJobManager(service *dbusutil.Service, api system.System, notifyFn func() m.createJobList(DelayLockQueue, 1) api.AttachIndicator(m.handleJobProgressInfo) + api.AttachDeliveryIndicator(m.handleDeliveryDownloadInfo) return m } @@ -136,6 +138,10 @@ func (jm *JobManager) CreateJob(jobName, jobType string, packages []string, envi "Dir::Etc::SourceParts": "/dev/null", } } + + // 理论上只有开启了更新传递后,这个选项才会有效,不过没有开启的话,加上这个选项也不会报错,所以统一都加上吧 + partJob.option["Acquire::delivery::LogLevel"] = "1" + partJob.updateTyp = typ if len(jobList) >= 1 { jobList[len(jobList)-1].next = partJob @@ -357,6 +363,7 @@ func (jm *JobManager) ForceAbortAndRetry(job *Job) error { job.retry = 1 } + jm.DownloadLimitOnChanging = true return nil } @@ -562,6 +569,18 @@ func (jm *JobManager) handleJobProgressInfo(info system.JobProgressInfo) { } } +func (jm *JobManager) handleDeliveryDownloadInfo(info system.JobDeliveryDownloadInfo) { + j := jm.findJobById(info.JobId) + if j == nil { + logger.Warningf("Can't find Job %q when update info %v\n", info.JobId, info) + return + } + j.updateDeliveryDownloadInfo(info) + if info.Speed > 0 && jm.DownloadLimitOnChanging { + jm.DownloadLimitOnChanging = false + } +} + func (jm *JobManager) findJobById(jobId string) *Job { for _, queue := range jm.queues { job := queue.Find(jobId) diff --git a/src/lastore-daemon/main.go b/src/lastore-daemon/main.go index 1cc2dfabe..fa9bee511 100644 --- a/src/lastore-daemon/main.go +++ b/src/lastore-daemon/main.go @@ -100,6 +100,10 @@ func main() { } manager.initAgent() manager.initPlatformManager() + if config.IntranetUpdate { + //私有化更新的Cron是核心业务场景,不再依赖检查更新或者online定时器触发,而是直接创建 + manager.TryToStartCronCheck() + } err = serverObject.SetWriteCallback(updater, "AutoInstallUpdates", updater.autoInstallUpdatesWriteCallback) if err != nil { logger.Error("failed to set write cb for property AutoInstallUpdates:", err) diff --git a/src/lastore-daemon/manager.go b/src/lastore-daemon/manager.go index 213bea1f5..729faadf7 100644 --- a/src/lastore-daemon/manager.go +++ b/src/lastore-daemon/manager.go @@ -63,8 +63,9 @@ type Manager struct { // dbusutil-gen: equal=nil UpgradableApps []string - SystemOnChanging bool - AutoClean bool + SystemOnChanging bool + AutoClean bool + DownloadLimitOnChanging bool inhibitFd dbus.UnixFD updateSourceOnce bool @@ -126,19 +127,20 @@ func NewManager(service *dbusutil.Service, updateApi system.System, c *config.Co } m := &Manager{ - service: service, - config: c, - updateApi: updateApi, - SystemArchitectures: archs, - inhibitFd: -1, - AutoClean: c.AutoClean, - loginManager: login1.NewManager(service.Conn()), - sysDBusDaemon: ofdbus.NewDBus(service.Conn()), - signalLoop: dbusutil.NewSignalLoop(service.Conn(), 10), - systemd: systemd1.NewManager(service.Conn()), - sysPower: power.NewPower(service.Conn()), - securitySourceConfig: make(UpdateSourceConfig), - systemSourceConfig: make(UpdateSourceConfig), + service: service, + config: c, + updateApi: updateApi, + SystemArchitectures: archs, + inhibitFd: -1, + AutoClean: c.AutoClean, + loginManager: login1.NewManager(service.Conn()), + sysDBusDaemon: ofdbus.NewDBus(service.Conn()), + signalLoop: dbusutil.NewSignalLoop(service.Conn(), 10), + systemd: systemd1.NewManager(service.Conn()), + sysPower: power.NewPower(service.Conn()), + securitySourceConfig: make(UpdateSourceConfig), + systemSourceConfig: make(UpdateSourceConfig), + DownloadLimitOnChanging: false, } m.reloadOemConfig(true) m.signalLoop.Start() @@ -261,6 +263,13 @@ func (m *Manager) initPlatformManager() { } } +func (m *Manager) TryToStartCronCheck() { + if utils.IsFileExist("/run/systemd/transient/lastoreCronCheckPrivate.timer") { + return + } + m.startCheckPolicyTask() +} + func (m *Manager) delUpdatePackage(sender dbus.Sender, jobName string, packages string) (*Job, error) { pkgs, err := NormalizePackageNames(packages) if err != nil { @@ -1001,6 +1010,8 @@ func (m *Manager) ChangePrepareDistUpgradeJobOption() { err := m.jobManager.ForceAbortAndRetry(job) if err != nil { logger.Warning(err) + } else { + m.setPropDownloadLimitOnChanging(m.jobManager.DownloadLimitOnChanging) } } } @@ -1021,8 +1032,9 @@ func (m *Manager) afterUpdateModeChanged(change *dbusutil.PropertyChanged) { } func (m *Manager) handleDownloadLimitChanged(job *Job) { - limitEnable, limitConfig := m.updater.GetLimitConfig() - if limitEnable { + limitEnable, limitConfig, limitIsOnline := m.updater.GetLimitConfig() + logger.Infof("handleDownloadLimitChanged limitEnable: %v, limitConfig: %v, limitIsOnline: %v", limitEnable, limitConfig, limitIsOnline) + if limitEnable || limitIsOnline { if job.option == nil { job.option = make(map[string]string) } diff --git a/src/lastore-daemon/manager_download.go b/src/lastore-daemon/manager_download.go index 43e0d4881..27b6fc3b4 100644 --- a/src/lastore-daemon/manager_download.go +++ b/src/lastore-daemon/manager_download.go @@ -9,8 +9,10 @@ import ( "errors" "fmt" "os" + "strconv" "strings" "sync" + "time" "github.com/linuxdeepin/lastore-daemon/src/internal/system" "github.com/linuxdeepin/lastore-daemon/src/internal/system/dut" @@ -21,6 +23,73 @@ import ( "github.com/linuxdeepin/go-lib/gettext" ) +const ( + timeFormat = "15:04" + timeFormatWithSec = "15:04:05" +) + +func (m *Manager) setEffectiveOnlineRateLimit(nowTime string) { + downloadSpeed := m.updater.downloadSpeedLimitConfigObj + m.applyOnlineRateLimit(&downloadSpeed, nowTime) + downloadSpeedStr, err := json.Marshal(downloadSpeed) + if err == nil { + logger.Infof("setEffectiveOnlineRateLimit %v --> %v", m.config.DownloadSpeedLimitConfig, string(downloadSpeedStr)) + m.updater.SetDownloadSpeedLimit(string(downloadSpeedStr)) + } +} + +func (m *Manager) applyOnlineRateLimit(downloadSpeed *downloadSpeedLimitConfig, nowTime string) { + onlineRateLimit := m.updatePlatform.OnlineRateLimit + if onlineRateLimit.AllDayRateLimit.Enable { + downloadSpeed.LimitSpeed = strconv.Itoa(onlineRateLimit.AllDayRateLimit.Bps) + downloadSpeed.IsOnlineSpeedLimit = true + downloadSpeed.DownloadSpeedLimitEnabled = true + } else if isTimeInRange(nowTime, onlineRateLimit.PeakTimeRateLimit.StartTime, onlineRateLimit.PeakTimeRateLimit.EndTime) && + onlineRateLimit.PeakTimeRateLimit.Enable { + downloadSpeed.LimitSpeed = strconv.Itoa(onlineRateLimit.PeakTimeRateLimit.Bps) + downloadSpeed.IsOnlineSpeedLimit = true + downloadSpeed.DownloadSpeedLimitEnabled = true + } else if isTimeInRange(nowTime, onlineRateLimit.OffPeakTimeRateLimit.StartTime, onlineRateLimit.OffPeakTimeRateLimit.EndTime) && + onlineRateLimit.OffPeakTimeRateLimit.Enable { + downloadSpeed.LimitSpeed = strconv.Itoa(onlineRateLimit.OffPeakTimeRateLimit.Bps) + downloadSpeed.IsOnlineSpeedLimit = true + downloadSpeed.DownloadSpeedLimitEnabled = true + } else { + err := json.Unmarshal([]byte(m.config.LocalDownloadSpeedLimitConfig), downloadSpeed) + if err != nil { + downloadSpeed.IsOnlineSpeedLimit = false + downloadSpeed.LimitSpeed = strconv.FormatInt(defaultSpeedLimit, 10) + downloadSpeed.DownloadSpeedLimitEnabled = true + } + } +} + +func isTimeInRange(nowTimeStr, startStr, endStr string) bool { + now, err := parseTime(nowTimeStr) + if err != nil { + return false + } + start, err := parseTime(startStr) + if err != nil { + return false + } + end, err := parseTime(endStr) + if err != nil { + return false + } + if start.Before(end) { + return now.After(start) && now.Before(end) + } + return now.After(start) || now.Before(end) +} + +func parseTime(t string) (time.Time, error) { + if len(t) > 5 { + return time.Parse(timeFormatWithSec, t) + } + return time.Parse(timeFormat, t) +} + // calculateTotalDownloadSize calculates the total download size for all packages under the specified update mode func calculateTotalDownloadSize(mode system.UpdateType, updatablePkgsMap map[system.UpdateType][]string) (float64, []error) { totalNeedDownloadSize := 0.0 @@ -97,6 +166,39 @@ func (m *Manager) prepareDistUpgrade(sender dbus.Sender, origin system.UpdateTyp m.statusManager.SetUpdateStatus(mode, system.DownloadErr) return nil, dbusutil.ToError(errors.New(string(errStr))) } + done := make(chan bool) + if m.config.IntranetUpdate { + //私有化更新有忙闲时下载限速的功能,需要在真正开始下载前刷新一下线上限速配置 + if err = m.refreshThrottlingFromPlatform(); err != nil { + logger.Warning("updatePlatform gen download speed limit failed", err) + } else { + go func() { + ticker := time.NewTicker(5 * time.Second) + startTime := time.Now() + defer ticker.Stop() + var count int + layout := "15:04:05" + for { + select { + case <-done: + logger.Info("online rate limit ticker stopped") + return + case t := <-ticker.C: + count++ + downloadStartServiceTime, err := time.ParseInLocation(layout, m.updatePlatform.OnlineRateLimit.ServerTime, time.Local) + if err != nil { + logger.Warningf("format OnlineRateLimit service time failed, %v", err) + return + } + logger.Infof("downloadStartServiceTime %v", downloadStartServiceTime) + nowTime := downloadStartServiceTime.Add(t.Sub(startTime)) + m.setEffectiveOnlineRateLimit(nowTime.Format(layout)) + } + } + }() + } + } + var job *Job var isExist bool @@ -137,8 +239,9 @@ func (m *Manager) prepareDistUpgrade(sender dbus.Sender, origin system.UpdateTyp for currentJob != nil { j := currentJob currentJob = currentJob.next - limitEnable, limitConfig := m.updater.GetLimitConfig() - if limitEnable { + limitEnable, limitConfig, limitIsOnline := m.updater.GetLimitConfig() + logger.Infof("preDistUpgrade limitEnable: %v, limitConfig: %v, limitIsOnline: %v", limitEnable, limitConfig, limitIsOnline) + if limitEnable || limitIsOnline { j.option[aptLimitKey] = limitConfig } j.subRetryHookFn = func(job *Job) { @@ -194,6 +297,7 @@ func (m *Manager) prepareDistUpgrade(sender dbus.Sender, origin system.UpdateTyp }, string(system.FailedStatus): func() error { if m.config.IntranetUpdate { + done <- true cacheFile := "/tmp/checkpolicy.cache" _ = os.RemoveAll(cacheFile) } @@ -277,6 +381,9 @@ func (m *Manager) prepareDistUpgrade(sender dbus.Sender, origin system.UpdateTyp return nil }, string(system.SucceedStatus): func() error { + if m.config.IntranetUpdate { + done <- true + } msg := fmt.Sprintf("download %v package success", j.updateTyp.JobType()) m.updatePlatform.PostProcessEventMessage(updateplatform.ProcessEvent{ TaskID: 1, @@ -362,6 +469,9 @@ func (m *Manager) prepareDistUpgrade(sender dbus.Sender, origin system.UpdateTyp return nil }, string(system.EndStatus): func() error { + if m.config.IntranetUpdate { + done <- true + } if j.next == nil { logger.Info("running in last end hook") // 如果出现单项失败,其他的状态需要修改,IsDownloading->notDownload diff --git a/src/lastore-daemon/manager_ifc.go b/src/lastore-daemon/manager_ifc.go index 907610f03..5c05f4259 100644 --- a/src/lastore-daemon/manager_ifc.go +++ b/src/lastore-daemon/manager_ifc.go @@ -697,7 +697,7 @@ func (m *Manager) GetUpdateDetails(sender dbus.Sender, fd dbus.UnixFD, realTime // 使用流式复制,避免将整个文件读入内存 logFile, err := os.Open(logTmpPath) if err != nil { - logger.Warning(err) + logger.Warningf("failed to open file: %v", err) return dbusutil.ToError(err) } defer logFile.Close() diff --git a/src/lastore-daemon/manager_update.go b/src/lastore-daemon/manager_update.go index 9aaec4319..4f936fd4e 100644 --- a/src/lastore-daemon/manager_update.go +++ b/src/lastore-daemon/manager_update.go @@ -15,6 +15,7 @@ import ( "os/exec" "path/filepath" "slices" + "strconv" "strings" "sync" "time" @@ -382,11 +383,20 @@ func (m *Manager) updateSource(sender dbus.Sender) (*Job, error) { return nil } } + if updateplatform.IsForceUpdate(m.updatePlatform.Tp) && m.updatePlatform.Tp != updateplatform.UpdateRegularly { + m.stopTimerUnit(lastoreRegularlyUpdate) + } if m.updatePlatform.TimerHasChanged { msg := gettext.Tr("timer has changed. Please reboot to take effect") go m.sendNotify(updateNotifyShowOptional, 0, "preferences-system", "", msg, nil, nil, system.NotifyExpireTimeoutPrivate) } + if m.config.IntranetUpdate { + if err = m.refreshThrottlingFromPlatform(); err != nil { + logger.Warning("updatePlatform gen download speed limit failed", err) + } + } + err = m.updatePlatform.UpdateAllPlatformDataSync() if err != nil { logger.Warning(err) @@ -452,6 +462,31 @@ var getUpgradablePackageListMap = map[system.UpdateType]func([]string) (map[stri system.UnknownUpdate: getUnknownUpgradablePackagesMap, } +func (m *Manager) refreshThrottlingFromPlatform() error { + var downloadSpeed downloadSpeedLimitConfig + if err := json.Unmarshal([]byte(m.config.LocalDownloadSpeedLimitConfig), &downloadSpeed); err != nil { + downloadSpeed = downloadSpeedLimitConfig{ + DownloadSpeedLimitEnabled: true, + LimitSpeed: strconv.FormatInt(defaultSpeedLimit, 10), + IsOnlineSpeedLimit: false, + } + } + if err := m.updatePlatform.GenThrottlingByToken(); err != nil { + return fmt.Errorf("failed to get throttling data %w", err) + } + + m.applyOnlineRateLimit(&downloadSpeed, m.updatePlatform.OnlineRateLimit.ServerTime) + + downloadSpeedStr, err := json.Marshal(downloadSpeed) + if err != nil { + return fmt.Errorf("failed to marshal download speed limit") + } + logger.Infof("set download limit %v --> %v by platform", m.config.DownloadSpeedLimitConfig, string(downloadSpeedStr)) + m.updater.SetDownloadSpeedLimit(string(downloadSpeedStr)) + + return nil +} + // 生成系统更新内容和安全更新内容 func (m *Manager) generateUpdateInfo() (errList []error) { propPkgMap := make(map[string][]string) // updater的ClassifiedUpdatablePackages用 diff --git a/src/lastore-daemon/speed_meter.go b/src/lastore-daemon/speed_meter.go index a752b2d68..15716d755 100644 --- a/src/lastore-daemon/speed_meter.go +++ b/src/lastore-daemon/speed_meter.go @@ -24,7 +24,7 @@ func (s *SpeedMeter) SetDownloadSize(size int64) { } } -func (s *SpeedMeter) Speed(newProgress float64) int64 { +func (s *SpeedMeter) Speed(newProgress float64, deliverySpeed int64) int64 { now := time.Now() if s.startTime.IsZero() { @@ -38,8 +38,12 @@ func (s *SpeedMeter) Speed(newProgress float64) int64 { return 0 } - if now.Sub(s.updateTime).Seconds() > 3 { - s.speed = int64(1.024 * (newProgress - s.progress) * float64(s.DownloadSize) / now.Sub(s.updateTime).Seconds()) + if now.Sub(s.updateTime).Seconds() > 5 && newProgress > s.progress { + if deliverySpeed >= 0 { + s.speed = deliverySpeed + } else { + s.speed = int64(1.024 * (newProgress - s.progress) * float64(s.DownloadSize) / now.Sub(s.updateTime).Seconds()) + } s.updateTime = now s.progress = newProgress } diff --git a/src/lastore-daemon/stub.go b/src/lastore-daemon/stub.go index 0f38f45e4..c4cb24802 100644 --- a/src/lastore-daemon/stub.go +++ b/src/lastore-daemon/stub.go @@ -39,6 +39,7 @@ func (m *Manager) updateJobList() { m.PropsMu.RUnlock() systemOnChanging := false + m.setPropDownloadLimitOnChanging(m.jobManager.DownloadLimitOnChanging) for i, j2 := range list { @@ -175,4 +176,5 @@ func (j *Job) notifyAll() { _ = j.emitPropChangedProgress(j.Progress) _ = j.emitPropChangedSpeed(j.Speed) _ = j.emitPropChangedCancelable(j.Cancelable) + _ = j.emitPropChangedProto(j.Proto) } diff --git a/src/lastore-daemon/updater.go b/src/lastore-daemon/updater.go index 6e5190151..7d0269c49 100644 --- a/src/lastore-daemon/updater.go +++ b/src/lastore-daemon/updater.go @@ -27,7 +27,8 @@ import ( ) const ( - p2pService = "uos-p2p.service" + p2pService = "uos-p2p.service" + defaultSpeedLimit = 10240 ) type ApplicationUpdateInfo struct { @@ -46,6 +47,7 @@ type idleDownloadConfig struct { type downloadSpeedLimitConfig struct { DownloadSpeedLimitEnabled bool LimitSpeed string + IsOnlineSpeedLimit bool } type Updater struct { @@ -457,8 +459,8 @@ func (u *Updater) getUpdatablePackagesWithClassification(updateType system.Updat return updatablePkgs, updatablePkgsMap } -func (u *Updater) GetLimitConfig() (bool, string) { - return u.downloadSpeedLimitConfigObj.DownloadSpeedLimitEnabled, u.downloadSpeedLimitConfigObj.LimitSpeed +func (u *Updater) GetLimitConfig() (bool, string, bool) { + return u.downloadSpeedLimitConfigObj.DownloadSpeedLimitEnabled, u.downloadSpeedLimitConfigObj.LimitSpeed, u.downloadSpeedLimitConfigObj.IsOnlineSpeedLimit } func (u *Updater) getP2PUnit() (systemd1.Unit, error) { diff --git a/src/lastore-daemon/updater_ifc.go b/src/lastore-daemon/updater_ifc.go index 9320eb74e..6f0a560e5 100644 --- a/src/lastore-daemon/updater_ifc.go +++ b/src/lastore-daemon/updater_ifc.go @@ -6,6 +6,7 @@ package main import ( "encoding/json" + "strings" "time" "github.com/godbus/dbus/v5" @@ -130,8 +131,7 @@ func (u *Updater) SetIdleDownloadConfig(idleConfig string) *dbus.Error { } func (u *Updater) SetDownloadSpeedLimit(limitConfig string) *dbus.Error { - err := json.Unmarshal([]byte(limitConfig), &u.downloadSpeedLimitConfigObj) - if err != nil { + if err := json.Unmarshal([]byte(limitConfig), &u.downloadSpeedLimitConfigObj); err != nil { logger.Warning(err) return dbusutil.ToError(err) } @@ -144,16 +144,25 @@ func (u *Updater) SetDownloadSpeedLimit(limitConfig string) *dbus.Error { } changed := u.setPropDownloadSpeedLimitConfig(string(config)) if changed { - logger.Info("speed limit: ", u.downloadSpeedLimitConfigObj) - err := u.config.SetDownloadSpeedLimitConfig(string(config)) - if err != nil { - logger.Warning(err) - return + logger.Infof("set changed speed limit: %v", limitConfig) + // When IsOnlineSpeedLimit is false, it means manual speed limit is set. + // Save to both configs to ensure the value persists after daemon restart. + if strings.Contains(limitConfig, "IsOnlineSpeedLimit") && !u.downloadSpeedLimitConfigObj.IsOnlineSpeedLimit { + err := u.config.SetLocalDownloadSpeedLimitConfig(string(config)) + if err != nil { + logger.Warning(err) + return + } + } else { + err := u.config.SetDownloadSpeedLimitConfig(string(config)) + if err != nil { + logger.Warning(err) + return + } } u.manager.ChangePrepareDistUpgradeJobOption() } logger.Info("update limit config") - return }) } else { u.setDownloadSpeedLimitTimer.Reset(time.Second) diff --git a/usr/share/dsg/configs/org.deepin.dde.lastore/org.deepin.dde.lastore.json b/usr/share/dsg/configs/org.deepin.dde.lastore/org.deepin.dde.lastore.json index 5c71718d2..f771c4406 100644 --- a/usr/share/dsg/configs/org.deepin.dde.lastore/org.deepin.dde.lastore.json +++ b/usr/share/dsg/configs/org.deepin.dde.lastore/org.deepin.dde.lastore.json @@ -324,7 +324,7 @@ "visibility": "private" }, "download-speed-limit":{ - "value": "{\"DownloadSpeedLimitEnabled\":false,\"LimitSpeed\":\"10240\"}", + "value": "{\"DownloadSpeedLimitEnabled\":false,\"LimitSpeed\":\"10240\",\"IsOnlineSpeedLimit\":false}", "serial": 0, "flags": [ "global" @@ -334,6 +334,17 @@ "permissions": "readwrite", "visibility": "private" }, + "local-download-speed-limit":{ + "value": "{\"DownloadSpeedLimitEnabled\":false,\"LimitSpeed\":\"10240\",\"IsOnlineSpeedLimit\":false}", + "serial": 0, + "flags": [ + "global" + ], + "name": "LocalDownloadSpeedLimit", + "description": "", + "permissions": "readwrite", + "visibility": "private" + }, "lastore-daemon-status":{ "value": 0, "serial": 0,