16
16
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
17
#
18
18
#########################################################################
19
+ from django .conf import settings
19
20
from lxml import etree
21
+ from django .contrib .auth import get_user_model
22
+ from django .utils import timezone
20
23
21
- from geonode .tests .base import GeoNodeBaseSimpleTestSupport
24
+ from geonode .base .populate_test_data import create_single_dataset
25
+ from geonode .harvesting .models import HarvestableResource , Harvester
26
+ from geonode .tests .base import GeoNodeBaseTestSupport
27
+ from geonode .upload .handlers .remote .wms import create_harvestable_resource
22
28
from geonode .utils import get_xpath_value
23
29
24
30
25
- class UtilsTestCase (GeoNodeBaseSimpleTestSupport ):
31
+ class UtilsTestCase (GeoNodeBaseTestSupport ):
32
+
33
+ @classmethod
34
+ def setUpClass (cls ):
35
+ super ().setUpClass ()
36
+ cls .service_url = f"{ settings .GEOSERVER_LOCATION } ows"
37
+ cls .user = get_user_model ().objects .get (username = "admin" )
38
+ cls .harvester , _ = Harvester .objects .get_or_create (
39
+ remote_url = cls .service_url ,
40
+ name = "harvester1" ,
41
+ default_owner = cls .user ,
42
+ harvester_type = "geonode.harvesting.harvesters.wms.OgcWmsHarvester" ,
43
+ )
44
+ cls .dataset = create_single_dataset (name = "example_harvestable_resource" )
45
+
26
46
def test_get_xpath_value (self ):
27
47
fixtures = [
28
48
(
@@ -48,3 +68,87 @@ def test_get_xpath_value(self):
48
68
xml_el = etree .fromstring (element )
49
69
result = get_xpath_value (xml_el , xpath_expr , nsmap = nsmap )
50
70
self .assertEqual (result , expected )
71
+
72
+ def test_create_harvestable_resource (self ):
73
+ """
74
+ Given a geonode resource and a service url, should link the
75
+ harvestable resource with the harvester
76
+ """
77
+
78
+ # be sure that the dataset was not linked to the harvester
79
+ try :
80
+ self .assertFalse (HarvestableResource .objects .filter (geonode_resource = self .dataset ).exists ())
81
+
82
+ result = create_harvestable_resource (self .dataset , self .service_url )
83
+ # nothing is usually returned
84
+ self .assertIsNone (result )
85
+
86
+ # evaluating that the harvestable resource has been created
87
+ hresource = HarvestableResource .objects .filter (geonode_resource = self .dataset ).first ()
88
+ self .assertIsNotNone (hresource )
89
+ self .assertEqual (hresource .geonode_resource .pk , self .dataset .pk )
90
+ finally :
91
+ HarvestableResource .objects .filter (geonode_resource = self .dataset ).delete ()
92
+
93
+ def test_create_harvestable_resource_different_service_url (self ):
94
+ """
95
+ Should ignore if the service url provided does not exists in the DB
96
+ """
97
+ with self .assertLogs (level = "WARNING" ) as _log :
98
+ create_harvestable_resource (self .dataset , "http://someurl.com" )
99
+ self .assertIn ("The WMS layer does not belong to any known remote service" , [x .message for x in _log .records ])
100
+
101
+ def test_create_harvestable_resource_on_existing_harvestable_resource (self ):
102
+ """
103
+ If the harvestable resource already exists, it should link the newly created geonode resource
104
+ with the harvestable resource, so the harvester will not harvest it again
105
+ """
106
+ try :
107
+ self .__create_harvestable_resource ()
108
+
109
+ result = create_harvestable_resource (self .dataset , self .service_url )
110
+ # nothing is usually returned
111
+ self .assertIsNone (result )
112
+
113
+ # evaluating that the harvestable resource has been created
114
+ hresource = HarvestableResource .objects .filter (geonode_resource = self .dataset ).first ()
115
+ self .assertIsNotNone (hresource )
116
+ self .assertEqual (hresource .geonode_resource .pk , self .dataset .pk )
117
+ finally :
118
+ HarvestableResource .objects .filter (geonode_resource = self .dataset ).delete ()
119
+
120
+ def test_create_harvestable_resource_different_geonode_resource (self ):
121
+ """
122
+ If the harvestable resource already have geonode resource aligned, it should
123
+ just ignore and log the information
124
+ """
125
+ try :
126
+ self .__create_harvestable_resource (attach_resource = True )
127
+ dataset2 = create_single_dataset ("second dataset" )
128
+ dataset2 .alternate = self .dataset .alternate
129
+ dataset2 .save ()
130
+ with self .assertLogs (level = "WARNING" ) as _log :
131
+ create_harvestable_resource (dataset2 , self .service_url )
132
+ self .assertIn (
133
+ "The Resource assigned to the current HarvestableResource is different from the one provided, skipping..." ,
134
+ [x .message for x in _log .records ],
135
+ )
136
+
137
+ finally :
138
+ HarvestableResource .objects .filter (geonode_resource = self .dataset ).delete ()
139
+
140
+ def __create_harvestable_resource (self , attach_resource = False ):
141
+ HarvestableResource .objects .get_or_create (
142
+ harvester = self .harvester ,
143
+ unique_identifier = self .dataset .alternate ,
144
+ geonode_resource = None if not attach_resource else self .dataset ,
145
+ title = "Some title" ,
146
+ defaults = {
147
+ "should_be_harvested" : True ,
148
+ "remote_resource_type" : self .dataset .resource_type ,
149
+ "last_updated" : timezone .now (),
150
+ "last_refreshed" : timezone .now (),
151
+ "last_harvested" : timezone .now (),
152
+ "last_harvesting_succeeded" : True ,
153
+ },
154
+ )
0 commit comments