diff --git a/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py b/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py
index fee5838d442e85785842ac408187f7182375a995..14435df9a719a7d128efc0d61b4840d838eadbbb 100755
--- a/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py
+++ b/SAS/TMSS/backend/services/scheduling/test/t_dynamic_scheduling.py
@@ -1495,6 +1495,9 @@ class TestTriggers(BaseDynamicSchedulingTestCase):
         cls.scheduling_set.project.trigger_priority = 1
         cls.scheduling_set.project.save()
 
+        # create some trigger quota for this project
+        models.ProjectQuota.objects.create(project=cls.scheduling_set.project, value=1000, resource_type=models.ResourceType.objects.get(quantity__value=models.Quantity.Choices.NUMBER.value, name__icontains='trigger'))
+
         # create a second scheduling set in a project that allows triggers and has higher trigger_priority
         cls.scheduling_set_high_trigger_priority = models.SchedulingSet.objects.create(**SchedulingSet_test_data())
         cls.scheduling_set_high_trigger_priority.project.can_trigger = True
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py b/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py
index 31c89b3c0cdab0c278011b754f8ab0bae8ef545c..e4e79090dda63464d8db0af017bd0a640f349f14 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/models/specification.py
@@ -567,6 +567,35 @@ class Project(RefreshFromDbInvalidatesCachedPropertiesMixin, NamedCommonPK):
     def project(self):
         return self
 
+    @property
+    def scheduling_unit_blueprints(self) -> QuerySet:
+        '''return a queryset containing all this project's scheduling_unit_blueprints'''
+        return SchedulingUnitBlueprint.objects.filter(draft__scheduling_set__project=self)
+
+    @property
+    def nr_of_allowed_triggers(self) -> int:
+        '''return the number of allowed triggers for this project'''
+        trigger_quota = self.quota.filter(resource_type__quantity__value=Quantity.Choices.NUMBER.value).filter(resource_type__name__icontains='trigger').all()
+        if trigger_quota.count() == 0:
+            return 0
+        if trigger_quota.count() == 1:
+            return trigger_quota[0].value
+        raise TMSSException("Multiple trigger quota defined for project %s" % self.name)
+
+    @property
+    def nr_of_used_triggers(self) -> int:
+        '''return the number of successfully used/in-use triggers for this project'''
+        # TODO: TMSS-771 exclude non-accepted/failed_since scheduling units.
+        return self.scheduling_unit_blueprints.filter(interrupts_telescope=True).\
+                                               filter(status__value__in=(SchedulingUnitStatus.Choices.SCHEDULED.value,
+                                                                         SchedulingUnitStatus.Choices.OBSERVING.value,
+                                                                         SchedulingUnitStatus.Choices.OBSERVED.value,
+                                                                         SchedulingUnitStatus.Choices.PROCESSING.value,
+                                                                         SchedulingUnitStatus.Choices.PROCESSED.value,
+                                                                         SchedulingUnitStatus.Choices.INGESTING.value,
+                                                                         SchedulingUnitStatus.Choices.INGESTED.value,
+                                                                         SchedulingUnitStatus.Choices.FINISHED.value)).count()
+
 
 class ProjectQuota(Model):
     project = ForeignKey('Project', related_name="quota", on_delete=PROTECT, help_text='Project to wich this quota belongs.')  # protected to avoid accidents
@@ -994,6 +1023,20 @@ class SchedulingUnitBlueprint(ProjectPropertyMixin, TemplateSchemaMixin, NamedCo
             events += task.system_events
         return set(events)
 
+    def check_trigger_accounting(self):
+        '''check if this unit is triggered, and if the project's trigger quota is not exceeded; else set observations to error.'''
+        if self.interrupts_telescope:
+            if self.project.nr_of_used_triggers >= self.project.nr_of_allowed_triggers:
+                from lofar.sas.tmss.tmss.tmssapp.models import Subtask, SubtaskType, SubtaskState
+                with transaction.atomic():
+                    for obs_subtask in self.subtasks.filter(specifications_template__type__value=SubtaskType.Choices.OBSERVATION.value).all():
+                        obs_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.ERROR.value)
+                        obs_subtask.error_reason = "trigger quota #%d exceeded for project %s" % (self.project.nr_of_allowed_triggers,  self.project.name)
+                        obs_subtask.save()
+
+                # refresh to update aggregated fields
+                self.refresh_from_db()
+
 
 class TaskDraft(NamedCommon, TemplateSchemaMixin, ProjectPropertyMixin):
     short_description = CharField(max_length=32, help_text='A short description of this task, usually the name of the target and abbreviated task type.', blank=True, default="")
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/serializers/specification.py b/SAS/TMSS/backend/src/tmss/tmssapp/serializers/specification.py
index a5dbb3e83ffbc7d8ee72608f3804faf2429b211b..836cc119f42f27508c2df997cd1cd9b047cf80cf 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/serializers/specification.py
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/serializers/specification.py
@@ -156,7 +156,9 @@ class ProjectSerializer(DynamicRelationalHyperlinkedModelSerializer):
     class Meta:
         model = models.Project
         fields = '__all__'
-        extra_fields = ['name','quota'] #, 'scheduling_sets']
+        extra_fields = ['name','quota', 'nr_of_allowed_triggers', 'nr_of_used_triggers'] #, 'scheduling_sets']
+        read_only_fields = ['nr_of_allowed_triggers', 'nr_of_used_triggers']
+
         expandable_fields = {
             'cycles': ('lofar.sas.tmss.tmss.tmssapp.serializers.CycleSerializer', {'many': True}),
             'quota': ('lofar.sas.tmss.tmss.tmssapp.serializers.ProjectQuotaSerializer', {'many': True})
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py
index 5545c86be6e377a134faf03e078c075b60a758a0..3cb97f373b58448ce9ed11eaa7aa7296270be430 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py
@@ -1186,6 +1186,9 @@ def mark_subtasks_and_successors_as_defined(subtask: Subtask):
 
 
 def check_prerequities_for_scheduling(subtask: Subtask) -> bool:
+    # before scheduling, check_trigger_accounting which may result in subtask state->error to prevent scheduling
+    subtask.task_blueprint.scheduling_unit_blueprint.check_trigger_accounting()
+
     if subtask.state.value != SubtaskState.Choices.DEFINED.value:
         raise SubtaskSchedulingException("Cannot schedule subtask id=%d because it is not DEFINED. Current state=%s" % (subtask.pk, subtask.state.value))
 
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/tasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/tasks.py
index ac35f78480d31f530bb5fc4edd95bc98e45210b9..006a1da70718e7420f8f1b4c55ec517327e9c1d5 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/tasks.py
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/tasks.py
@@ -459,6 +459,11 @@ def create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit
                 at = parser.parse(constraints['time']['at'], ignoretz=True)
                 set_scheduling_unit_blueprint_start_times(scheduling_unit_blueprint, at)
 
+        if scheduling_unit_blueprint.interrupts_telescope:
+            # check the trigger accounting, may result in scheduling_unit_blueprint ERROR status
+            # it's up to the user to deal with this trigger unit in error status.
+            scheduling_unit_blueprint.check_trigger_accounting()
+
         return scheduling_unit_blueprint
 
 
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/views.py b/SAS/TMSS/backend/src/tmss/tmssapp/views.py
index e655a2e97345e8d49588e026637e99a8bb5bb007..23947c2e09a9c5bfd28d3b86237fad1ce1ff465d 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/views.py
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/views.py
@@ -376,9 +376,16 @@ def submit_trigger(request):
         logger.error(msg)
         return RestResponse(msg, status=status.HTTP_403_FORBIDDEN)
 
+    # check quota
+    if scheduling_set.project.nr_of_allowed_triggers == 0:
+        msg = 'Project \'%s\' does not have any trigger quota' % scheduling_set.project
+        logger.error(msg)
+        return RestResponse(msg, status=status.HTTP_403_FORBIDDEN)
+
     from django.db import transaction
     with transaction.atomic():
         # try to create the draft from the trigger_doc
+        # we always create the draft (and a blueprint if mode=='run') and check the number of allowed triggers upon scheduling time
         scheduling_unit_draft = create_scheduling_unit_draft_from_observing_strategy_template(strategy_template,
                                                                                               scheduling_set,
                                                                                               name=trigger_doc['name'],
diff --git a/SAS/TMSS/backend/test/t_tmssapp_specification_REST_API.py b/SAS/TMSS/backend/test/t_tmssapp_specification_REST_API.py
index aebb17dfa1434f32fc501d8c30ea4f3bce8c2c12..c78a2a37fb1ba3cb34cadf4f0e5eb6fa013745d4 100755
--- a/SAS/TMSS/backend/test/t_tmssapp_specification_REST_API.py
+++ b/SAS/TMSS/backend/test/t_tmssapp_specification_REST_API.py
@@ -44,7 +44,7 @@ exit_with_skipped_code_if_skip_integration_tests()
 from lofar.sas.tmss.test.tmss_test_environment_unittest_setup import *
 from lofar.sas.tmss.test.tmss_test_data_django_models import *
 from lofar.sas.tmss.tmss.tmssapp import models
-from lofar.sas.tmss.test.test_utils import assertUrlList
+from lofar.sas.tmss.test.test_utils import assertUrlList, set_subtask_state_following_allowed_transitions
 from django.contrib.auth.models import Group, Permission
 from django.contrib.auth import get_user_model
 User = get_user_model()
@@ -3240,6 +3240,9 @@ class SubmitTriggerTestCase(unittest.TestCase):
         project = models.Project.objects.create(**Project_test_data(can_trigger=True))
         scheduling_set = models.SchedulingSet.objects.create(**SchedulingSet_test_data("scheduling set", project=project))
 
+        # create some trigger quota for this project
+        models.ProjectQuota.objects.create(project=project, value=1000, resource_type=models.ResourceType.objects.get(quantity__value=models.Quantity.Choices.NUMBER.value, name__icontains='trigger'))
+
         with tmss_test_env.create_tmss_client() as client:
             for strategy_template in models.SchedulingUnitObservingStrategyTemplate.objects.all():
                 logger.info("--------------------------------------------------------------------------------------------")
@@ -3276,6 +3279,69 @@ class SubmitTriggerTestCase(unittest.TestCase):
                 # check that the draft has a 'before' time constraint (and assume that the scheduling constraints work in the dynamic scheduler)
                 self.assertEqual(end_before, parser.parse(scheduling_unit_draft['scheduling_constraints_doc']['time']['before']))
 
+    def test_trigger_accounting(self):
+        '''Test if triggers are counted, and new triggers are blocked if max allowed triggers exceeded.'''
+        # start with a project that cannot trigger (allow it later in the test)
+        project = models.Project.objects.create(**Project_test_data(can_trigger=False))
+        scheduling_set = models.SchedulingSet.objects.create(**SchedulingSet_test_data("scheduling set", project=project))
+
+        self.assertEqual(0, project.nr_of_allowed_triggers)
+        self.assertEqual(0, project.nr_of_used_triggers)
+
+        with tmss_test_env.create_tmss_client() as client:
+            trigger_doc = client.get_trigger_specification_doc_for_scheduling_unit_observing_strategy_template("Simple Observation")
+            trigger_doc['scheduling_set_id'] = scheduling_set.id
+
+            for mode in ('test','run'):
+                trigger_doc['mode'] = mode
+                with self.assertRaises(Exception) as e:
+                    client.submit_trigger(trigger_doc)
+                self.assertTrue('does not allow triggers' in str(e.exception))
+
+            # enable trigger in project
+            project.can_trigger = True
+            project.save()
+
+            # check that we did use any successful triggers yet
+            self.assertEqual(0, project.nr_of_used_triggers)
+
+            # and we (still) did not create any trigger quota, so nr_of_allowed_triggers should still be 0
+            self.assertEqual(0, project.nr_of_allowed_triggers)
+
+            # a new test/run trigger should still not be accepted
+            for mode in ('test','run'):
+                trigger_doc['mode'] = mode
+                with self.assertRaises(Exception) as e:
+                    client.submit_trigger(trigger_doc)
+                self.assertTrue('does not have any trigger quota' in str(e.exception))
+
+            # create some trigger quota for this project
+            models.ProjectQuota.objects.create(project=project, value=1, resource_type=models.ResourceType.objects.get(quantity__value=models.Quantity.Choices.NUMBER.value, name__icontains='trigger'))
+            self.assertEqual(1, project.nr_of_allowed_triggers)
+            self.assertEqual(0, project.nr_of_used_triggers)
+
+            # submit another trigger
+            trigger_doc['mode'] = 'run'
+            scheduling_unit_blueprint = client.submit_trigger(trigger_doc)
+            self.assertIsNotNone(scheduling_unit_blueprint)
+            self.assertEqual('schedulable', scheduling_unit_blueprint['status_value'])
+
+            # the scheduling_unit is not finished yet, so nr_of_used_triggers should still be 0
+            self.assertEqual(0, project.nr_of_used_triggers)
+
+            # simulate running the scheduling_unit_blueprint
+            for subtask in models.Subtask.objects.filter(task_blueprint__scheduling_unit_blueprint__id=scheduling_unit_blueprint['id']).all():
+                set_subtask_state_following_allowed_transitions(subtask, 'finished')
+
+            # the scheduling_unit finished, so nr_of_used_triggers should now be 1
+            self.assertEqual(1, project.nr_of_used_triggers)
+
+            # submit another trigger, this should exceed quota
+            trigger_doc['mode'] = 'run'
+            scheduling_unit_blueprint = client.submit_trigger(trigger_doc)
+            self.assertIsNotNone(scheduling_unit_blueprint)
+            self.assertEqual('error', scheduling_unit_blueprint['status_value'])
+
 
 # todo: move to t_permissions (I tried, but it broke)
 class SystemRolePermissionTestCase(unittest.TestCase):