examples: rework nm-up-many.py for ratelimiting parallel activations

The previous implementation did some ratelimiting, namely how many parallel
ActivateConnection D-Bus calls are in fly. This way we are able to kick off
many parallel calls, but the activations themselves were not ratelimited.

Rework the code. Now there are two rate limits (that can be set via environment
variables):

  NUM_PARALLEL_STARTING
  NUM_PARALLEL_IN_PROGRESS

This allows more control about how much is happening in parallel. If we are
going to activate 1000 profiles, then it matters that we do things in parallel,
but not everything at the same time.
This commit is contained in:
Thomas Haller 2021-06-21 23:40:18 +02:00
parent 3e7a589972
commit 938f9b075f
No known key found for this signature in database
GPG key ID: 29C2366E4DFC5728

View file

@ -9,21 +9,34 @@
# probably would run the context only at one point as long as # probably would run the context only at one point as long as
# the application is running (from the main function). # the application is running (from the main function).
import sys
import gi import gi
import os
import sys
import time import time
gi.require_version("NM", "1.0") gi.require_version("NM", "1.0")
from gi.repository import NM, GLib from gi.repository import NM, GLib
start_time = time.monotonic()
class MyError(Exception): class MyError(Exception):
pass pass
NUM_PARALLEL_STARTING = 10
NUM_PARALLEL_IN_PROGRESS = 50
s = os.getenv("NUM_PARALLEL_STARTING")
if s:
NUM_PARALLEL_STARTING = int(s)
s = os.getenv("NUM_PARALLEL_IN_PROGRESS")
if s:
NUM_PARALLEL_IN_PROGRESS = int(s)
start_time = time.monotonic()
def log(msg): def log(msg):
# use nm_utils_print(), so that the log messages are in synch with # use nm_utils_print(), so that the log messages are in synch with
# LIBNM_CLIENT_DEBUG=trace messages. # LIBNM_CLIENT_DEBUG=trace messages.
@ -142,212 +155,176 @@ def find_connections(nmc, argv):
return result return result
def nmc_activate_start(nmc, con): class Activation(object):
ACTIVATION_STATE_START = "start"
ACTIVATION_STATE_STARTING = "starting"
ACTIVATION_STATE_WAITING = "waiting"
ACTIVATION_STATE_DONE = "done"
# Call nmc.activate_connection_async() and return a user data def __init__(self, con):
# with the information about the pending operation. self.con = con
self.state = Activation.ACTIVATION_STATE_START
self.result_msg = None
self.result_ac = None
self.ac_result = None
self.wait_id = None
activation = { def __str__(self):
"con": con, return "%s (%s)" % (self.con.get_id(), self.con.get_uuid())
"result": None,
"result_msg": None,
"result_ac": None,
"ac_result": None,
}
log("activation %s (%s) start asynchronously" % (con.get_id(), con.get_uuid())) def is_done(self, log=log):
def cb(source_object, res, activation): if self.state == Activation.ACTIVATION_STATE_DONE:
# The callback does not call other code for signaling the return True
# completion. Instead, we remember in "activation" that
# the callback was completed.
#
# Other code will repeatedly go through the "activation_list"
# and find those that are completed (nmc_activate_find_completed()).
try:
ac = nmc.activate_connection_finish(res)
except Exception as e:
activation["result"] = False
activation["result_msg"] = str(e)
else:
activation["result"] = True
activation["result_msg"] = "success"
activation["result_ac"] = ac
nmc.activate_connection_async(con, None, None, None, cb, activation) if self.state != Activation.ACTIVATION_STATE_WAITING:
return False
return activation def _log_result(self, msg, done_with_success=False):
log("connection %s done: %s" % (self, msg))
self.state = Activation.ACTIVATION_STATE_DONE
self.done_with_success = done_with_success
return True
ac = self.result_ac
if not ac:
return _log_result(self, "failed activation call (%s)" % (self.result_msg,))
def nmc_activate_find_completed(activation_list): if ac.get_client() is None:
return _log_result(self, "active connection disappeared")
# Iterate over list of "activation" data, find the first if ac.get_state() > NM.ActiveConnectionState.ACTIVATED:
# one that is completed, remove it from the list and return return _log_result(
# it. self, "connection failed to activate (state %s)" % (ac.get_state())
for idx, activation in enumerate(activation_list):
if activation["result"] is not None:
del activation_list[idx]
return activation
return None
def nmc_activate_complete(
nmc, activation_list, completed_list, num_parallel_invocations
):
# We schedule activations asynchronously and in parallel. However, we
# still want to rate limit the number of parallel activations. This
# function does that: if there are more than "num_parallel_invocations" activations
# in progress, then wait until the excess number of them completed.
# The completed ones move from "activation_list" over to "completed_list".
completed = 0
while True:
need_to_wait = len(activation_list) > num_parallel_invocations
# Even if we don't need to wait (that is, the list of pending activations
# is reasonably short), we still tentatively iterate the GMainContext a bit.
if not nmc.get_main_context().iteration(may_block=need_to_wait):
if need_to_wait:
continue
# Ok, nothing ready yet.
break
# this is not efficient after each iteration(), but it's good enough.
# The activation list is supposed to be short.
activation = nmc_activate_find_completed(activation_list)
if activation is None:
continue
con = activation["con"]
log(
"activation %s (%s) start complete: %s%s"
% (
con.get_id(),
con.get_uuid(),
activation["result_msg"],
(
""
if not activation["result"]
else (" (%s)" % (activation["result_ac"].get_path()))
),
) )
)
completed += 1
completed_list.append(activation) if ac.get_state() == NM.ActiveConnectionState.ACTIVATED:
return _log_result(
self, "connection successfully activated", done_with_success=True
)
if completed > 0: return False
log(
"completed %d activations, %d activations still pending" def start(self, nmc, cancellable=None, activated_callback=None, log=log):
% (completed, len(activation_list))
# Call nmc.activate_connection_async() and return a user data
# with the information about the pending operation.
assert self.state == Activation.ACTIVATION_STATE_START
self.state = Activation.ACTIVATION_STATE_STARTING
log("activation %s start asynchronously" % (self))
def cb_activate_connection(source_object, res):
assert self.state == Activation.ACTIVATION_STATE_STARTING
try:
ac = nmc.activate_connection_finish(res)
except Exception as e:
self.result_msg = str(e)
log(
"activation %s started asynchronously failed: %s"
% (self, self.result_msg)
)
else:
self.result_msg = "success"
self.result_ac = ac
log(
"activation %s started asynchronously success: %s"
% (self, ac.get_path())
)
self.state = Activation.ACTIVATION_STATE_WAITING
if activated_callback is not None:
activated_callback(self)
nmc.activate_connection_async(
self.con, None, None, cancellable, cb_activate_connection
) )
def wait(self, done_callback=None, log=log):
def nmc_activate_all(nmc, cons): assert self.state == Activation.ACTIVATION_STATE_WAITING
assert self.result_ac
assert self.wait_id is None
# iterate of all connections ("cons") and activate them def cb_wait(ac, state):
# in parallel. nmc_activate_complete() is used to rate limits if self.is_done(log=log):
# how many parallel invocations we allow. self.result_ac.disconnect(self.wait_id)
self.wait_id = None
done_callback(self)
num_parallel_invocations = 100 log("waiting for %s to fully activate" % (self))
self.wait_id = self.result_ac.connect("notify", cb_wait)
activation_list = []
completed_list = []
for c in cons:
activation = nmc_activate_start(nmc, c)
activation_list.append(activation)
nmc_activate_complete(
nmc, activation_list, completed_list, num_parallel_invocations
)
nmc_activate_complete(nmc, activation_list, completed_list, 0)
assert not activation_list
assert len(completed_list) == len(cons)
return completed_list
def nmc_activate_wait_for_pending(nmc, completed_list): class Manager(object):
def __init__(self, nmc, cons):
# go through the list of activations and wait that they self.nmc = nmc
# all reach a final state. That is, either that they are failed
# or fully ACTIVATED state.
log("wait for all active connection to either reach ACTIVATED state or fail...") self.ac_start = [Activation(c) for c in cons]
self.ac_starting = []
self.ac_waiting = []
self.ac_done = []
def log_result(activation, message): def _log(self, msg):
activation["ac_result"] = message
log(
"connection %s (%s) activation fully completed: %s"
% (ac.get_id(), ac.get_uuid(), message)
)
while True: lists = [self.ac_start, self.ac_starting, self.ac_waiting, self.ac_done]
# again, it's not efficient to check the entire list for completion n = sum(len(l) for l in lists)
# after each g_main_context_iteration(). But "completed_list" should n = str(len(str(n)))
# be reasonably small.
activation = None prefix = "/".join((("%0" + n + "d") % len(l)) for l in lists)
for idx, activ in enumerate(completed_list): log("%s: %s" % (prefix, msg))
if activ["ac_result"] is not None:
continue def ac_run(self):
if activ["result"] is False:
log_result(activ, "failed to start activation") loop = GLib.MainLoop(self.nmc.get_main_context())
continue
ac = activ["result_ac"] while self.ac_start or self.ac_starting or self.ac_waiting:
if ac.get_client() is None:
log_result(activ, "active connection disappeared") rate_limit_parallel_in_progress = (
continue len(self.ac_starting) + len(self.ac_waiting) >= NUM_PARALLEL_IN_PROGRESS
if ac.get_state() == NM.ActiveConnectionState.ACTIVATED: )
log_result(activ, "connection successfully activated")
continue if (
if ac.get_state() > NM.ActiveConnectionState.ACTIVATED: not rate_limit_parallel_in_progress
log_result( and self.ac_start
activ, "connection failed to activate (state %s)" % (ac.get_state()) and len(self.ac_starting) < NUM_PARALLEL_STARTING
):
activation = self.ac_start.pop(0)
self.ac_starting.append(activation)
def cb_activated(activation2):
self.ac_starting.remove(activation2)
if activation2.is_done(log=self._log):
self.ac_done.append(activation2)
else:
self.ac_waiting.append(activation2)
def cb_done(activation3):
self.ac_waiting.remove(activation3)
self.ac_done.append(activation3)
loop.quit()
activation2.wait(done_callback=cb_done, log=self._log)
loop.quit()
activation.start(
self.nmc, activated_callback=cb_activated, log=self._log
) )
continue continue
activation = activ
break
if activation is None: loop.run()
log("no more activation to wait for")
break
nmc.get_main_context().iteration(may_block=True) res_list = [ac.done_with_success for ac in self.ac_done]
log(
"%s out of %s activations are now successfully activated"
% (sum(res_list), len(self.ac_done))
)
def nmc_activate_check_good(nmc, completed_list): return all(res_list)
# go through the list of activations and check that all of them are
# in a good state.
n_good = 0
n_bad = 0
for activ in completed_list:
if activ["result"] is False:
n_bad += 1
continue
ac = activ["result_ac"]
if ac.get_client() is None:
n_bad += 1
continue
if ac.get_state() != NM.ActiveConnectionState.ACTIVATED:
n_bad += 1
continue
n_good += 1
log(
"%d out of %d activations are now successfully activated"
% (n_good, n_good + n_bad)
)
return n_bad == 0
def main(): def main():
@ -355,11 +332,7 @@ def main():
cons = find_connections(nmc, sys.argv[1:]) cons = find_connections(nmc, sys.argv[1:])
completed_list = nmc_activate_all(nmc, cons) all_good = Manager(nmc, cons).ac_run()
nmc_activate_wait_for_pending(nmc, completed_list)
all_good = nmc_activate_check_good(nmc, completed_list)
nmc_transfer_ref = [nmc] nmc_transfer_ref = [nmc]
del nmc del nmc