Revert "[autotest] Endpoints for syncing jobs and hqes back to master"
This reverts commit 1b22ff2921c65b45e3727a8aaa852325f0f0cfb4.
BUG=418022
Change-Id: I30473d21d16911f620ee7f06917c13e6017f20fb
Reviewed-on: https://chromium-review.googlesource.com/219918
Reviewed-by: Owen Lin <owenlin@chromium.org>
Commit-Queue: Owen Lin <owenlin@chromium.org>
Tested-by: Owen Lin <owenlin@chromium.org>
diff --git a/frontend/afe/json_rpc/proxy.py b/frontend/afe/json_rpc/proxy.py
index 22e7343..b9e1121 100644
--- a/frontend/afe/json_rpc/proxy.py
+++ b/frontend/afe/json_rpc/proxy.py
@@ -19,29 +19,9 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
-from django import conf as django_conf
-import os
import urllib2
from autotest_lib.client.common_lib import error as exceptions
-from json import decoder
-
-
-# The serializers can't be imported if django isn't configured.
-# Using try except here doesn't work, as test_that initializes it's own
-# django environment (setup_django_lite_environment) which raises import errors
-# if the django dbutils have been previously imported, as importing them leaves
-# some state behind.
-# This the variable name must not be undefined or emtpy string.
-if os.environ.get(django_conf.ENVIRONMENT_VARIABLE, None):
- # Django JSON encoder uses the standard json encoder but can handle DateTime
- from django.core.serializers import json as django_encoder
- json_encoder_class = django_encoder.DjangoJSONEncoder
-else:
- from json import encoder as json_encoder
- json_encoder_class = json_encoder.JSONEncoder
-
-
class JSONRPCException(Exception):
pass
@@ -94,9 +74,13 @@
return ServiceProxy(self.__serviceURL, name, self.__headers)
def __call__(self, *args, **kwargs):
- postdata = json_encoder_class().encode({'method': self.__serviceName,
+ # pull in json imports lazily so that the library isn't required
+ # unless you actually need to do encoding and decoding
+ from json import decoder, encoder
+
+ postdata = encoder.JSONEncoder().encode({"method": self.__serviceName,
'params': args + (kwargs,),
- 'id': 'jsonrpc'})
+ 'id':'jsonrpc'})
request = urllib2.Request(self.__serviceURL, data=postdata,
headers=self.__headers)
respdata = urllib2.urlopen(request).read()
diff --git a/frontend/afe/model_logic.py b/frontend/afe/model_logic.py
index c77c80a..6329bd0 100644
--- a/frontend/afe/model_logic.py
+++ b/frontend/afe/model_logic.py
@@ -1117,18 +1117,6 @@
return instance
- def sanity_check_update_from_shard(self, shard, updated_serialized,
- *args, **kwargs):
- """Check if an update sent from a shard is legitimate.
-
- @raises error.UnallowedRecordsSentToMaster if an update is not
- legitimate.
- """
- raise NotImplementedError(
- 'sanity_check_update_from_shard must be implemented by subclass %s '
- 'for type %s' % type(self))
-
-
def update_from_serialized(self, serialized):
"""Updates local fields of an existing object from a serialized form.
diff --git a/frontend/afe/models.py b/frontend/afe/models.py
index 0b03110..5eea6d8 100644
--- a/frontend/afe/models.py
+++ b/frontend/afe/models.py
@@ -16,10 +16,10 @@
from autotest_lib.frontend.afe import model_logic, model_attributes
from autotest_lib.frontend.afe import rdb_model_extensions
from autotest_lib.frontend import settings, thread_local
-from autotest_lib.client.common_lib import enum, error, host_protections
-from autotest_lib.client.common_lib import global_config
+from autotest_lib.client.common_lib import enum, host_protections, global_config
from autotest_lib.client.common_lib import host_queue_entry_states
-from autotest_lib.client.common_lib import control_data, priorities, decorators
+from autotest_lib.client.common_lib import control_data, priorities
+from autotest_lib.client.common_lib import decorators
# job options and user preferences
DEFAULT_REBOOT_BEFORE = model_attributes.RebootBefore.IF_DIRTY
@@ -1100,14 +1100,6 @@
self.shard = Shard.deserialize(data)
- def sanity_check_update_from_shard(self, shard, updated_serialized):
- if not self.shard_id == shard.id:
- raise error.UnallowedRecordsSentToMaster(
- 'Job id=%s is assigned to shard (%s). Cannot update it with %s '
- 'from shard %s.' % (self.id, self.shard_id, updated_serialized,
- shard.id))
-
-
# TIMEOUT is deprecated.
DEFAULT_TIMEOUT = global_config.global_config.get_config_value(
'AUTOTEST_WEB', 'job_timeout_default', default=24)
@@ -1434,14 +1426,6 @@
self.meta_host = Label.deserialize(data)
- def sanity_check_update_from_shard(self, shard, updated_serialized,
- job_ids_sent):
- if self.job_id not in job_ids_sent:
- raise error.UnallowedRecordsSentToMaster(
- 'Sent HostQueueEntry without corresponding '
- 'job entry: %s' % updated_serialized)
-
-
Status = host_queue_entry_states.Status
ACTIVE_STATUSES = host_queue_entry_states.ACTIVE_STATUSES
COMPLETE_STATUSES = host_queue_entry_states.COMPLETE_STATUSES
diff --git a/frontend/afe/rpc_utils.py b/frontend/afe/rpc_utils.py
index dd8c963..80756e3 100644
--- a/frontend/afe/rpc_utils.py
+++ b/frontend/afe/rpc_utils.py
@@ -945,72 +945,9 @@
def find_records_for_shard(shard):
"""Find records that should be sent to a shard.
- @param shard: Shard to find records for.
-
- @returns: Tuple of two lists for hosts and jobs: (hosts, jobs).
+ @returns: Tuple of two lists for hosts and jobs: (hosts, jobs)
"""
hosts = models.Host.assign_to_shard(shard)
jobs = models.Job.assign_to_shard(shard)
return hosts, jobs
-
-
-def _persist_records_with_type_sent_from_shard(
- shard, records, record_type, *args, **kwargs):
- """
- Handle records of a specified type that were sent to the shard master.
-
- @param shard: The shard the records were sent from.
- @param records: The records sent in their serialized format.
- @param record_type: Type of the objects represented by records.
- @param args: Additional arguments that will be passed on to the sanity
- checks.
- @param kwargs: Additional arguments that will be passed on to the sanity
- checks.
-
- @raises error.UnallowedRecordsSentToMaster if any of the sanity checks fail.
-
- @returns: List of primary keys of the processed records.
- """
- pks = []
- for serialized_record in records:
- pk = serialized_record['id']
- try:
- current_record = record_type.objects.get(pk=pk)
- except record_type.DoesNotExist:
- raise error.UnallowedRecordsSentToMaster(
- 'Object with pk %s of type %s does not exist on master.' % (
- pk, record_type))
-
- current_record.sanity_check_update_from_shard(
- shard, serialized_record, *args, **kwargs)
-
- current_record.update_from_serialized(serialized_record)
- pks.append(pk)
- return pks
-
-
-def persist_records_sent_from_shard(shard, jobs, hqes):
- """
- Sanity checking then saving serialized records sent to master from shard.
-
- During heartbeats shards upload jobs and hostqueuentries. This performs
- some sanity checks on these and then updates the existing records for those
- entries with the updated ones from the heartbeat.
-
- The sanity checks include:
- - Checking if the objects sent already exist on the master.
- - Checking if the objects sent were assigned to this shard.
- - hostqueueentries must be sent together with their jobs.
-
- @param shard: The shard the records were sent from.
- @param jobs: The jobs the shard sent.
- @param hqes: The hostqueuentries the shart sent.
-
- @raises error.UnallowedRecordsSentToMaster if any of the sanity checks fail.
- """
- job_ids_sent = _persist_records_with_type_sent_from_shard(
- shard, jobs, models.Job)
-
- _persist_records_with_type_sent_from_shard(
- shard, hqes, models.HostQueueEntry, job_ids_sent=job_ids_sent)
diff --git a/frontend/afe/site_rpc_interface.py b/frontend/afe/site_rpc_interface.py
index 9e175d5..b7a945c 100644
--- a/frontend/afe/site_rpc_interface.py
+++ b/frontend/afe/site_rpc_interface.py
@@ -305,23 +305,16 @@
process_pool_size=4))
-def shard_heartbeat(shard_hostname, jobs=(), hqes=()):
- """Register shard if not existing, then assign hosts and jobs.
+def shard_heartbeat(shard_hostname):
+ """Register shard if it doesn't exist, then assign hosts and jobs.
@param shard_hostname: Hostname of the calling shard
- @param jobs: Jobs in serialized form that should be updated with newer
- status from a shard.
- @param hqes: Hostqueueentries in serialized form that should be updated with
- newer status from a shard. Note that for every hostqueueentry
- the corresponding job must be in jobs.
-
@returns: Serialized representations of hosts, jobs and their dependencies
to be inserted into a shard's database.
"""
timer = stats.Timer('shard_heartbeat')
with timer:
shard_obj = rpc_utils.retrieve_shard(shard_hostname=shard_hostname)
- rpc_utils.persist_records_sent_from_shard(shard_obj, jobs, hqes)
hosts, jobs = rpc_utils.find_records_for_shard(shard_obj)
return {
'hosts': [host.serialize() for host in hosts],
diff --git a/frontend/afe/site_rpc_interface_unittest.py b/frontend/afe/site_rpc_interface_unittest.py
index 9e7184b..198e614 100644
--- a/frontend/afe/site_rpc_interface_unittest.py
+++ b/frontend/afe/site_rpc_interface_unittest.py
@@ -14,8 +14,6 @@
import StringIO
import unittest
-from django.core import exceptions as django_exceptions
-
import common
from autotest_lib.frontend import setup_django_environment
@@ -380,46 +378,11 @@
site_rpc_interface.set_boto_key(boto_key)
- def _get_records_for_sending_to_master(self):
- return [{'control_file': 'foo',
- 'control_type': 1,
- 'created_on': datetime.datetime(2014, 8, 21),
- 'drone_set': None,
- 'email_list': '',
- 'max_runtime_hrs': 72,
- 'max_runtime_mins': 1440,
- 'name': 'dummy',
- 'owner': 'autotest_system',
- 'parse_failed_repair': True,
- 'priority': 40,
- 'reboot_after': 0,
- 'reboot_before': 1,
- 'run_reset': True,
- 'run_verify': False,
- 'synch_count': 0,
- 'test_retry': 10,
- 'timeout': 24,
- 'timeout_mins': 1440,
- 'id': 1
- }], [{
- 'aborted': False,
- 'active': False,
- 'complete': False,
- 'deleted': False,
- 'execution_subdir': '',
- 'finished_on': None,
- 'started_on': None,
- 'status': 'Queued',
- 'id': 1
- }]
-
-
- def _do_heartbeat_and_assert_response(self, shard_hostname=None, upload_jobs=[],
- upload_hqes=[], **kwargs):
+ def _do_heartbeat_and_assert_response(self, shard_hostname=None, **kwargs):
shard_hostname = shard_hostname or str(
models.Shard.objects.count() + 1)
retval = site_rpc_interface.shard_heartbeat(
- shard_hostname=shard_hostname, jobs=upload_jobs, hqes=upload_hqes)
+ shard_hostname=shard_hostname)
self._assert_shard_heartbeat_response(shard_hostname, retval,
**kwargs)
@@ -452,62 +415,6 @@
self.assertEqual(returned_hqes, expected_hqes)
- def _send_records_to_master_helper(
- self, jobs, hqes, shard_hostname='host1',
- exception_to_throw=error.UnallowedRecordsSentToMaster):
- job_id = rpc_interface.create_job(name='dummy', priority='Medium',
- control_file='foo',
- control_type=SERVER,
- test_retry=10, hostless=True)
- job = models.Job.objects.get(pk=job_id)
- shard = models.Shard.objects.create(hostname='host1')
- job.shard = shard
- job.save()
- hqe = job.hostqueueentry_set.all()[0]
- if not exception_to_throw:
- self._do_heartbeat_and_assert_response(
- shard_hostname=shard_hostname,
- upload_jobs=jobs, upload_hqes=hqes)
- else:
- self.assertRaises(
- exception_to_throw,
- self._do_heartbeat_and_assert_response,
- shard_hostname=shard_hostname,
- upload_jobs=jobs, upload_hqes=hqes)
-
-
- def testSendingRecordsToMaster(self):
- jobs, hqes = self._get_records_for_sending_to_master()
- hqes[0]['status'] = 'Completed'
- self._send_records_to_master_helper(
- jobs=jobs, hqes=hqes, exception_to_throw=None)
-
- # Check the entry was actually written to db
- self.assertEqual(models.HostQueueEntry.objects.all()[0].status,
- 'Completed')
-
-
- def testSendingRecordsToMasterJobAssignedToDifferentShard(self):
- jobs, hqes = self._get_records_for_sending_to_master()
- models.Shard.objects.create(hostname='other_shard')
- self._send_records_to_master_helper(
- jobs=jobs, hqes=hqes, shard_hostname='other_shard')
-
-
- def testSendingRecordsToMasterJobHqeWithoutJob(self):
- _, hqes = self._get_records_for_sending_to_master()
- self._send_records_to_master_helper(
- jobs=[], hqes=hqes)
-
-
- def testSendingRecordsToMasterNotExistingJob(self):
- jobs, hqes = self._get_records_for_sending_to_master()
- jobs[0]['id'] = 3
-
- self._send_records_to_master_helper(
- jobs=jobs, hqes=hqes)
-
-
def testShardHeartbeatFetchHostlessJob(self):
models.Label.objects.create(name='board:lumpy', platform=True)
label2 = models.Label.objects.create(name='bluetooth', platform=False)