Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/source/AdministratorGuide/Resources/storage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ Configuration options are:
* ``SpaceReservation``: just a name of a zone of the physical storage which can have some space reserved. Extends the SRM ``SpaceToken`` concept.
* ``ArchiveTimeout``: for tape SE only. If set to a value in seconds, enables the `FTS Archive Monitoring feature <https://fts3-docs.web.cern.ch/fts3-docs/docs/archive_monitoring.html>`_
* ``BringOnlineTimeout``: for tape SE only. If set to a value in seconds, specify the BringOnline parameter for FTS transfers. Otherwise, the default is whatever is in the ``FTS3Job`` class.
* ``WLCGTokenBasePath``: EXPERIMENTAL Path from which the token should be relative to
* ``WLCGTokenBasePath``: EXPERIMENTAL Path from which the token should be relative to (only used for FTS transfers for now)
* ``TokenSupport``: EXPERIMENTAL Default False. If true, allow using tokens for FTS3 transfers.

VO specific paths
-----------------
Expand Down
30 changes: 27 additions & 3 deletions docs/source/AdministratorGuide/Systems/DataManagement/fts3.rst
Original file line number Diff line number Diff line change
Expand Up @@ -198,16 +198,40 @@ Token support
.. versionadded:: v8.0.51

.. warning::
Very experimental feature
Experimental feature


The current state is the one in which LHCb ran the DC24 challenge. It only worked for dCache site, as there is still not a uniform way for storages to understand permissions...
Currently used in production by LHCb for all disk to disk transfers.

A transfer will happen with token if:

* ``UseTokens`` is true in the FTSAgent configuration
* ``WLCGTokenBasePath`` is set for both the source and the destination
* ``TokenSupport`` is true for both the source and the destination

The token issued are file specific, long lived, and unmanaged (i.e. FTS will not refresh them).

You will need to define a specific client in IAM with the following scopes:

* fts
* storage.modify:/
* storage.read:/

Obviously, you can adapt the ``/`` if needed. This client then needs to be added to your DIRAC IAM IdP configuration as: ``fts_client_id`` and ``fts_client_secret``. For example

.. code-block:: guess

The tokens use specific file path, and not generic wildcard permissions.
Resources
{
IdProviders
{
<IdProvider name>
{
fts_client_id = <client_id>
fts_client_secret = <client_secret>
}
}
}

.. warning::
Token support is as experimental as can be in any layer of the stack (DIRAC, storage, FTS... even the model is experimental)
Expand Down
42 changes: 31 additions & 11 deletions src/DIRAC/DataManagementSystem/Client/FTS3Job.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
""" FTS3Job module containing only the FTS3Job class """
"""FTS3Job module containing only the FTS3Job class"""

import datetime
import errno
import os
from packaging.version import Version

from cachetools import cachedmethod, LRUCache

# Requires at least version 3.3.3
from fts3 import __version__ as fts3_version
Expand All @@ -26,8 +28,9 @@

from DIRAC.FrameworkSystem.Client.Logger import gLogger
from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager
from DIRAC.FrameworkSystem.Utilities.TokenManagementUtilities import getIdProviderClient

from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR
from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR, returnValueOrRaise
from DIRAC.Core.Utilities.DErrno import cmpError

from DIRAC.Core.Utilities.JEncode import JSerializable
Expand All @@ -36,6 +39,10 @@
# 3 days in seconds
BRING_ONLINE_TIMEOUT = 259200

# Number of IdP to keep in cache. Should correspond roughly
# to the number of groups performing transfers
IDP_CACHE_SIZE = 8


class FTS3Job(JSerializable):
"""Abstract class to represent a job to be executed by FTS. It belongs
Expand Down Expand Up @@ -78,6 +85,8 @@ class FTS3Job(JSerializable):
"userGroup",
]

_idp_cache = LRUCache(maxsize=IDP_CACHE_SIZE)

def __init__(self):
self.submitTime = None
self.lastUpdate = None
Expand Down Expand Up @@ -113,6 +122,11 @@ def __init__(self):
# when a job is in a final state
self.accountingDict = None

@classmethod
@cachedmethod(lambda cls: cls._idp_cache)
def _getIdpClient(cls, group_name: str):
return returnValueOrRaise(getIdProviderClient(group_name, None, client_name_prefix="fts"))

def monitor(self, context=None, ftsServer=None, ucert=None):
"""Queries the fts server to monitor the job.
The internal state of the object is updated depending on the
Expand Down Expand Up @@ -509,11 +523,10 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
if not res["OK"]:
return res
srcTokenPath = res["Value"]
res = gTokenManager.getToken(
userGroup=self.userGroup,
requiredTimeLeft=3600,
res = self._getIdpClient(self.userGroup).fetchToken(
grant_type="client_credentials",
scope=[f"storage.read:/{srcTokenPath}", "offline_access"],
useCache=False,
# TODO: add a specific audience
)
if not res["OK"]:
return res
Expand All @@ -528,11 +541,17 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
if not res["OK"]:
return res
dstTokenPath = res["Value"]
res = gTokenManager.getToken(
userGroup=self.userGroup,
requiredTimeLeft=3600,
scope=[f"storage.modify:/{dstTokenPath}", f"storage.read:/{dstTokenPath}", "offline_access"],
useCache=False,
res = self._getIdpClient(self.userGroup).fetchToken(
grant_type="client_credentials",
scope=[
f"storage.modify:/{dstTokenPath}",
f"storage.read:/{dstTokenPath}",
# Needed because CNAF
# https://ggus.eu/index.php?mode=ticket_info&ticket_id=165048
f"storage.read:/{os.path.dirname(dstTokenPath)}",
"offline_access",
],
# TODO: add a specific audience
)
if not res["OK"]:
return res
Expand Down Expand Up @@ -728,6 +747,7 @@ def _constructStagingJob(self, pinTime, allLFNs, target_spacetoken):
retry=3,
metadata=job_metadata,
priority=self.priority,
unmanaged_tokens=True,
**dest_spacetoken,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@
DEFAULT_AT_EXPIRATION_TIME = 1200


def getIdProviderClient(userGroup: str, idProviderClientName: str = None):
def getIdProviderClient(userGroup: str, idProviderClientName: str = None, client_name_prefix: str = ""):
"""Get an IdProvider client

:param userGroup: group name
:param idProviderClientName: name of an identity provider in the DIRAC CS
:param client_name_prefix: prefix of the client in the CS options
"""
# Get IdProvider credentials from CS
if not idProviderClientName and userGroup:
Expand All @@ -23,7 +24,7 @@ def getIdProviderClient(userGroup: str, idProviderClientName: str = None):
return S_ERROR(f"The {userGroup} group belongs to the VO that is not tied to any Identity Provider.")

# Prepare the client instance of the appropriate IdP
return IdProviderFactory().getIdProvider(idProviderClientName)
return IdProviderFactory().getIdProvider(idProviderClientName, client_name_prefix=client_name_prefix)


def getCachedKey(
Expand Down
14 changes: 11 additions & 3 deletions src/DIRAC/Resources/IdProvider/IdProviderFactory.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
""" The Identity Provider Factory instantiates IdProvider objects
according to their configuration
"""The Identity Provider Factory instantiates IdProvider objects
according to their configuration
"""

import jwt

from DIRAC import S_OK, S_ERROR, gLogger, gConfig
Expand Down Expand Up @@ -40,11 +41,12 @@ def getIdProviderFromToken(self, accessToken):
return result
return self.getIdProvider(result["Value"])

def getIdProvider(self, name, **kwargs):
def getIdProvider(self, name, client_name_prefix="", **kwargs):
"""This method returns a IdProvider instance corresponding to the supplied
name.

:param str name: the name of the Identity Provider client
:param str client_name_prefix: name of the client of the IdP

:return: S_OK(IdProvider)/S_ERROR()
"""
Expand All @@ -68,8 +70,14 @@ def getIdProvider(self, name, **kwargs):
if not result["OK"]:
self.log.error("Failed to read configuration", f"{name}: {result['Message']}")
return result

pDict = result["Value"]

if client_name_prefix:
client_name_prefix = client_name_prefix + "_"
pDict["client_id"] = pDict[f"{client_name_prefix}client_id"]
pDict["client_secret"] = pDict[f"{client_name_prefix}client_secret"]

pDict.update(kwargs)
pDict["ProviderName"] = name

Expand Down
Loading