importreimportstructimportzlibfrombase64importb64encodefromdatetimeimportdatetime,timedeltafromhashlibimportmd5fromwebob.byterangeimportContentRangefromwebob.cachecontrolimportCacheControl,serialize_cache_controlfromwebob.compatimport(PY2,bytes_,native_,string_types,text_type,url_quote,urlparse,)fromwebob.cookiesimportCookie,make_cookiefromwebob.datetime_utilsimport(parse_date_delta,serialize_date_delta,timedelta_to_seconds,)fromwebob.descriptorsimport(CHARSET_RE,SCHEME_RE,converter,date_header,header_getter,list_header,parse_auth,parse_content_range,parse_etag_response,parse_int,parse_int_safe,serialize_auth,serialize_content_range,serialize_etag_response,serialize_int,)fromwebob.headersimportResponseHeadersfromwebob.requestimportBaseRequestfromwebob.utilimportstatus_generic_reasons,status_reasons,warn_deprecationtry:importsimplejsonasjsonexceptImportError:importjson__all__=['Response']_PARAM_RE=re.compile(r'([a-z0-9]+)=(?:"([^"]*)"|([a-z0-9_.-]*))',re.I)_OK_PARAM_RE=re.compile(r'^[a-z0-9_.-]+$',re.I)_gzip_header=b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff'_marker=object()classResponse(object):""" Represents a WSGI response. If no arguments are passed, creates a :class:`~Response` that uses a variety of defaults. The defaults may be changed by sub-classing the :class:`~Response`. See the :ref:`sub-classing notes <response_subclassing_notes>`. :cvar ~Response.body: If ``body`` is a ``text_type``, then it will be encoded using either ``charset`` when provided or ``default_encoding`` when ``charset`` is not provided if the ``content_type`` allows for a ``charset``. This argument is mutually exclusive with ``app_iter``. :vartype ~Response.body: bytes or text_type :cvar ~Response.status: Either an :class:`int` or a string that is an integer followed by the status text. If it is an integer, it will be converted to a proper status that also includes the status text. Any existing status text will be kept. Non-standard values are allowed. :vartype ~Response.status: int or str :cvar ~Response.headerlist: A list of HTTP headers for the response. :vartype ~Response.headerlist: list :cvar ~Response.app_iter: An iterator that is used as the body of the response. Should conform to the WSGI requirements and should provide bytes. This argument is mutually exclusive with ``body``. :vartype ~Response.app_iter: iterable :cvar ~Response.content_type: Sets the ``Content-Type`` header. If no ``content_type`` is provided, and there is no ``headerlist``, the ``default_content_type`` will be automatically set. If ``headerlist`` is provided then this value is ignored. :vartype ~Response.content_type: str or None :cvar conditional_response: Used to change the behavior of the :class:`~Response` to check the original request for conditional response headers. See :meth:`~Response.conditional_response_app` for more information. :vartype conditional_response: bool :cvar ~Response.charset: Adds a ``charset`` ``Content-Type`` parameter. If no ``charset`` is provided and the ``Content-Type`` is text, then the ``default_charset`` will automatically be added. Currently the only ``Content-Type``'s that allow for a ``charset`` are defined to be ``text/*``, ``application/xml``, and ``*/*+xml``. Any other ``Content-Type``'s will not have a ``charset`` added. If a ``headerlist`` is provided this value is ignored. :vartype ~Response.charset: str or None All other response attributes may be set on the response by providing them as keyword arguments. A :exc:`TypeError` will be raised for any unexpected keywords. .. _response_subclassing_notes: **Sub-classing notes:** * The ``default_content_type`` is used as the default for the ``Content-Type`` header that is returned on the response. It is ``text/html``. * The ``default_charset`` is used as the default character set to return on the ``Content-Type`` header, if the ``Content-Type`` allows for a ``charset`` parameter. Currently the only ``Content-Type``'s that allow for a ``charset`` are defined to be: ``text/*``, ``application/xml``, and ``*/*+xml``. Any other ``Content-Type``'s will not have a ``charset`` added. * The ``unicode_errors`` is set to ``strict``, and access on a :attr:`~Response.text` will raise an error if it fails to decode the :attr:`~Response.body`. * ``default_conditional_response`` is set to ``False``. This flag may be set to ``True`` so that all ``Response`` objects will attempt to check the original request for conditional response headers. See :meth:`~Response.conditional_response_app` for more information. * ``default_body_encoding`` is set to 'UTF-8' by default. It exists to allow users to get/set the ``Response`` object using ``.text``, even if no ``charset`` has been set for the ``Content-Type``. """default_content_type='text/html'default_charset='UTF-8'unicode_errors='strict'default_conditional_response=Falsedefault_body_encoding='UTF-8'# These two are only around so that when people pass them into the# constructor they correctly get saved and set, however they are not used# by any part of the Response. See commit# 627593bbcd4ab52adc7ee569001cdda91c670d5d for rationale.request=Noneenviron=None## __init__, from_file, copy#def__init__(self,body=None,status=None,headerlist=None,app_iter=None,content_type=None,conditional_response=None,charset=_marker,**kw):# Do some sanity checking, and turn json_body into an actual bodyifapp_iterisNoneandbodyisNoneand('json_body'inkwor'json'inkw):if'json_body'inkw:json_body=kw.pop('json_body')else:json_body=kw.pop('json')body=json.dumps(json_body,separators=(',',':')).encode('UTF-8')ifcontent_typeisNone:content_type='application/json'ifapp_iterisNone:ifbodyisNone:body=b''elifbodyisnotNone:raiseTypeError("You may only give one of the body and app_iter arguments")# Set up Response.statusifstatusisNone:self._status='200 OK'else:self.status=status# Initialize headersself._headers=NoneifheaderlistisNone:self._headerlist=[]else:self._headerlist=headerlist# Set the encoding for the Response to charset, so if a charset is# passed but the Content-Type does not allow for a charset, we can# still encode text_type body's.# r = Response(# content_type='application/foo',# charset='UTF-8',# body=u'somebody')# Should work without issues, and the header will be correctly set to# Content-Type: application/foo with no charset on it.encoding=Noneifcharsetisnot_marker:encoding=charset# Does the status code have a body or not?code_has_body=(self._status[0]!='1'andself._status[:3]notin('204','205','304'))# We only set the content_type to the one passed to the constructor or# the default content type if there is none that exists AND there was# no headerlist passed. If a headerlist was provided then most likely# the ommission of the Content-Type is on purpose and we shouldn't try# to be smart about it.## Also allow creation of a empty Response with just the status set to a# Response with empty body, such as Response(status='204 No Content')# without the default content_type being set (since empty bodies have# no Content-Type)## Check if content_type is set because default_content_type could be# None, in which case there is no content_type, and thus we don't need# to anythingcontent_type=content_typeorself.default_content_typeifheaderlistisNoneandcode_has_bodyandcontent_type:# Set up the charset, if the content_type doesn't already have onehas_charset='charset='incontent_type# If the Content-Type already has a charset, we don't set the user# provided charset on the Content-Type, so we shouldn't use it as# the encoding for text_type based body's.ifhas_charset:encoding=None# Do not use the default_charset for the encoding because we# want things like# Response(content_type='image/jpeg',body=u'foo') to raise when# trying to encode the body.new_charset=encodingif(nothas_charsetandcharsetis_markerandself.default_charset):new_charset=self.default_charset# Optimize for the default_content_type as shipped by# WebOb, becuase we know that 'text/html' has a charset,# otherwise add a charset if the content_type has a charset.## Even if the user supplied charset explicitly, we do not add# it to the Content-Type unless it has has a charset, instead# the user supplied charset is solely used for encoding the# body if it is a text_typeif(new_charsetand(content_type=='text/html'or_content_type_has_charset(content_type))):content_type+='; charset='+new_charsetself._headerlist.append(('Content-Type',content_type))# Set up conditional responseifconditional_responseisNone:self.conditional_response=self.default_conditional_responseelse:self.conditional_response=bool(conditional_response)# Set up app_iter if the HTTP Status code has a bodyifapp_iterisNoneandcode_has_body:ifisinstance(body,text_type):# Fall back to trying self.charset if encoding is not set. In# most cases encoding will be set to the default value.encoding=encodingorself.charsetifencodingisNone:raiseTypeError("You cannot set the body to a text value without a ""charset")body=body.encode(encoding)app_iter=[body]ifheaderlistisnotNone:self._headerlist[:]=[(k,v)for(k,v)inself._headerlistifk.lower()!='content-length']self._headerlist.append(('Content-Length',str(len(body))))elifapp_iterisNoneandnotcode_has_body:app_iter=[b'']self._app_iter=app_iter# Loop through all the remaining keyword argumentsforname,valueinkw.items():ifnothasattr(self.__class__,name):# Not a basic attributeraiseTypeError("Unexpected keyword: %s=%r"%(name,value))setattr(self,name,value)
[ドキュメント]@classmethoddeffrom_file(cls,fp):"""Reads a response from a file-like object (it must implement ``.read(size)`` and ``.readline()``). It will read up to the end of the response, not the end of the file. This reads the response as represented by ``str(resp)``; it may not read every valid HTTP response properly. Responses must have a ``Content-Length``."""headerlist=[]status=fp.readline().strip()is_text=isinstance(status,text_type)ifis_text:_colon=':'_http='HTTP/'else:_colon=b':'_http=b'HTTP/'ifstatus.startswith(_http):(http_ver,status_num,status_text)=status.split(None,2)status='%s%s'%(native_(status_num),native_(status_text))while1:line=fp.readline().strip()ifnotline:# end of headersbreaktry:header_name,value=line.split(_colon,1)exceptValueError:raiseValueError('Bad header line: %r'%line)value=value.strip()headerlist.append((native_(header_name,'latin-1'),native_(value,'latin-1')))r=cls(status=status,headerlist=headerlist,app_iter=(),)body=fp.read(r.content_lengthor0)ifis_text:r.text=bodyelse:r.body=bodyreturnr
[ドキュメント]defcopy(self):"""Makes a copy of the response."""# we need to do this for app_iter to be reusableapp_iter=list(self._app_iter)iter_close(self._app_iter)# and this to make sure app_iter instances are differentself._app_iter=list(app_iter)returnself.__class__(status=self._status,headerlist=self._headerlist[:],app_iter=app_iter,conditional_response=self.conditional_response)
## __repr__, __str__#def__repr__(self):return'<%s at 0x%x%s>'%(self.__class__.__name__,abs(id(self)),self.status)def__str__(self,skip_body=False):parts=[self.status]ifnotskip_body:# Force enumeration of the body (to set content-length)self.bodyparts+=map('%s: %s'.__mod__,self.headerlist)ifnotskip_bodyandself.body:parts+=['',self.bodyifPY2elseself.text]return'\r\n'.join(parts)## status, status_code/status_int#def_status__get(self):""" The status string. """returnself._statusdef_status__set(self,value):try:code=int(value)except(ValueError,TypeError):passelse:self.status_code=codereturnifnotPY2:ifisinstance(value,bytes):value=value.decode('ascii')elifisinstance(value,text_type):value=value.encode('ascii')ifnotisinstance(value,str):raiseTypeError("You must set status to a string or integer (not %s)"%type(value))# Attempt to get the status code itself, if this fails we should failtry:# We don't need this value anywhere, we just want to validate it's# an integer. So we are using the side-effect of int() raises a# ValueError as a testint(value.split()[0])exceptValueError:raiseValueError('Invalid status code, integer required.')self._status=valuestatus=property(_status__get,_status__set,doc=_status__get.__doc__)def_status_code__get(self):""" The status as an integer. """returnint(self._status.split()[0])def_status_code__set(self,code):try:self._status='%d%s'%(code,status_reasons[code])exceptKeyError:self._status='%d%s'%(code,status_generic_reasons[code//100])status_code=status_int=property(_status_code__get,_status_code__set,doc=_status_code__get.__doc__)## headerslist, headers#def_headerlist__get(self):""" The list of response headers. """returnself._headerlistdef_headerlist__set(self,value):self._headers=Noneifnotisinstance(value,list):ifhasattr(value,'items'):value=value.items()value=list(value)self._headerlist=valuedef_headerlist__del(self):self.headerlist=[]headerlist=property(_headerlist__get,_headerlist__set,_headerlist__del,doc=_headerlist__get.__doc__)def_headers__get(self):""" The headers in a dictionary-like object. """ifself._headersisNone:self._headers=ResponseHeaders.view_list(self._headerlist)returnself._headersdef_headers__set(self,value):ifhasattr(value,'items'):value=value.items()self.headerlist=valueself._headers=Noneheaders=property(_headers__get,_headers__set,doc=_headers__get.__doc__)## body#def_body__get(self):""" The body of the response, as a :class:`bytes`. This will read in the entire app_iter if necessary. """app_iter=self._app_iter# try:# if len(app_iter) == 1:# return app_iter[0]# except:# passifisinstance(app_iter,list)andlen(app_iter)==1:returnapp_iter[0]ifapp_iterisNone:raiseAttributeError("No body has been set")try:body=b''.join(app_iter)finally:iter_close(app_iter)ifisinstance(body,text_type):raise_error_unicode_in_app_iter(app_iter,body)self._app_iter=[body]iflen(body)==0:# if body-length is zero, we assume it's a HEAD response and# leave content_length alonepasselifself.content_lengthisNone:self.content_length=len(body)elifself.content_length!=len(body):raiseAssertionError("Content-Length is different from actual app_iter length ""(%r!=%r)"%(self.content_length,len(body)))returnbodydef_body__set(self,value=b''):ifnotisinstance(value,bytes):ifisinstance(value,text_type):msg=("You cannot set Response.body to a text object ""(use Response.text)")else:msg=("You can only set the body to a binary type (not %s)"%type(value))raiseTypeError(msg)ifself._app_iterisnotNone:self.content_md5=Noneself._app_iter=[value]self.content_length=len(value)# def _body__del(self):# self.body = ''# #self.content_length = Nonebody=property(_body__get,_body__set,_body__set)def_json_body__get(self):""" Set/get the body of the response as JSON. .. note:: This will automatically :meth:`~bytes.decode` the :attr:`~Response.body` as ``UTF-8`` on get, and :meth:`~str.encode` the :meth:`json.dumps` as ``UTF-8`` before assigning to :attr:`~Response.body`. """# Note: UTF-8 is a content-type specific default for JSONreturnjson.loads(self.body.decode('UTF-8'))def_json_body__set(self,value):self.body=json.dumps(value,separators=(',',':')).encode('UTF-8')def_json_body__del(self):delself.bodyjson=json_body=property(_json_body__get,_json_body__set,_json_body__del)def_has_body__get(self):""" Determine if the the response has a :attr:`~Response.body`. In contrast to simply accessing :attr:`~Response.body`, this method will **not** read the underlying :attr:`~Response.app_iter`. """app_iter=self._app_iterifisinstance(app_iter,list)andlen(app_iter)==1:ifapp_iter[0]!=b'':returnTrueelse:returnFalseifapp_iterisNone:# pragma: no coverreturnFalsereturnTruehas_body=property(_has_body__get)## text, unicode_body, ubody#def_text__get(self):""" Get/set the text value of the body using the ``charset`` of the ``Content-Type`` or the ``default_body_encoding``. """ifnotself.charsetandnotself.default_body_encoding:raiseAttributeError("You cannot access Response.text unless charset or default_body_encoding"" is set")decoding=self.charsetorself.default_body_encodingbody=self.bodyreturnbody.decode(decoding,self.unicode_errors)def_text__set(self,value):ifnotself.charsetandnotself.default_body_encoding:raiseAttributeError("You cannot access Response.text unless charset or default_body_encoding"" is set")ifnotisinstance(value,text_type):raiseTypeError("You can only set Response.text to a unicode string ""(not %s)"%type(value))encoding=self.charsetorself.default_body_encodingself.body=value.encode(encoding)def_text__del(self):delself.bodytext=property(_text__get,_text__set,_text__del,doc=_text__get.__doc__)unicode_body=ubody=property(_text__get,_text__set,_text__del,"Deprecated alias for .text")## body_file, write(text)#def_body_file__get(self):""" A file-like object that can be used to write to the body. If you passed in a list ``app_iter``, that ``app_iter`` will be modified by writes. """returnResponseBodyFile(self)def_body_file__set(self,file):self.app_iter=iter_file(file)def_body_file__del(self):delself.bodybody_file=property(_body_file__get,_body_file__set,_body_file__del,doc=_body_file__get.__doc__)defwrite(self,text):ifnotisinstance(text,bytes):ifnotisinstance(text,text_type):msg="You can only write str to a Response.body_file, not %s"raiseTypeError(msg%type(text))ifnotself.charset:msg=("You can only write text to Response if charset has ""been set")raiseTypeError(msg)text=text.encode(self.charset)app_iter=self._app_iterifnotisinstance(app_iter,list):try:new_app_iter=self._app_iter=list(app_iter)finally:iter_close(app_iter)app_iter=new_app_iterself.content_length=sum(len(chunk)forchunkinapp_iter)app_iter.append(text)ifself.content_lengthisnotNone:self.content_length+=len(text)## app_iter#def_app_iter__get(self):""" Returns the ``app_iter`` of the response. If ``body`` was set, this will create an ``app_iter`` from that ``body`` (a single-item list). """returnself._app_iterdef_app_iter__set(self,value):ifself._app_iterisnotNone:# Undo the automatically-set content-lengthself.content_length=Noneself._app_iter=valuedef_app_iter__del(self):self._app_iter=[]self.content_length=Noneapp_iter=property(_app_iter__get,_app_iter__set,_app_iter__del,doc=_app_iter__get.__doc__)## headers attrs#allow=list_header('Allow','14.7')# TODO: (maybe) support response.vary += 'something'# TODO: same thing for all listy headersvary=list_header('Vary','14.44')content_length=converter(header_getter('Content-Length','14.17'),parse_int,serialize_int,'int')content_encoding=header_getter('Content-Encoding','14.11')content_language=list_header('Content-Language','14.12')content_location=header_getter('Content-Location','14.14')content_md5=header_getter('Content-MD5','14.14')content_disposition=header_getter('Content-Disposition','19.5.1')accept_ranges=header_getter('Accept-Ranges','14.5')content_range=converter(header_getter('Content-Range','14.16'),parse_content_range,serialize_content_range,'ContentRange object')date=date_header('Date','14.18')expires=date_header('Expires','14.21')last_modified=date_header('Last-Modified','14.29')_etag_raw=header_getter('ETag','14.19')etag=converter(_etag_raw,parse_etag_response,serialize_etag_response,'Entity tag')@propertydefetag_strong(self):returnparse_etag_response(self._etag_raw,strong=True)location=header_getter('Location','14.30')pragma=header_getter('Pragma','14.32')age=converter(header_getter('Age','14.6'),parse_int_safe,serialize_int,'int')retry_after=converter(header_getter('Retry-After','14.37'),parse_date_delta,serialize_date_delta,'HTTP date or delta seconds')server=header_getter('Server','14.38')# TODO: the standard allows this to be a list of challengeswww_authenticate=converter(header_getter('WWW-Authenticate','14.47'),parse_auth,serialize_auth,)## charset#def_charset__get(self):""" Get/set the ``charset`` specified in ``Content-Type``. There is no checking to validate that a ``content_type`` actually allows for a ``charset`` parameter. """header=self.headers.get('Content-Type')ifnotheader:returnNonematch=CHARSET_RE.search(header)ifmatch:returnmatch.group(1)returnNonedef_charset__set(self,charset):ifcharsetisNone:self._charset__del()returnheader=self.headers.get('Content-Type',None)ifheaderisNone:raiseAttributeError("You cannot set the charset when no ""content-type is defined")match=CHARSET_RE.search(header)ifmatch:header=header[:match.start()]+header[match.end():]header+='; charset=%s'%charsetself.headers['Content-Type']=headerdef_charset__del(self):header=self.headers.pop('Content-Type',None)ifheaderisNone:# Don't need to remove anythingreturnmatch=CHARSET_RE.search(header)ifmatch:header=header[:match.start()]+header[match.end():]self.headers['Content-Type']=headercharset=property(_charset__get,_charset__set,_charset__del,doc=_charset__get.__doc__)## content_type#def_content_type__get(self):""" Get/set the ``Content-Type`` header. If no ``Content-Type`` header is set, this will return ``None``. .. versionchanged:: 1.7 Setting a new ``Content-Type`` will remove all ``Content-Type`` parameters and reset the ``charset`` to the default if the ``Content-Type`` is ``text/*`` or XML (``application/xml`` or ``*/*+xml``). To preserve all ``Content-Type`` parameters, you may use the following code: .. code-block:: python resp = Response() params = resp.content_type_params resp.content_type = 'application/something' resp.content_type_params = params """header=self.headers.get('Content-Type')ifnotheader:returnNonereturnheader.split(';',1)[0]def_content_type__set(self,value):ifnotvalue:self._content_type__del()returnelse:ifPY2andisinstance(value,text_type):value=value.encode("latin-1")ifnotisinstance(value,string_types):raiseTypeError("content_type requires value to be of string_types")content_type=value# Set up the charset if the content-type doesn't have onehas_charset='charset='incontent_typenew_charset=Noneif(nothas_charsetandself.default_charset):new_charset=self.default_charset# Optimize for the default_content_type as shipped by# WebOb, becuase we know that 'text/html' has a charset,# otherwise add a charset if the content_type has a charset.## We add the default charset if the content-type is "texty".if(new_charsetand(content_type=='text/html'or_content_type_has_charset(content_type))):content_type+='; charset='+new_charsetself.headers['Content-Type']=content_typedef_content_type__del(self):self.headers.pop('Content-Type',None)content_type=property(_content_type__get,_content_type__set,_content_type__del,doc=_content_type__get.__doc__)## content_type_params#def_content_type_params__get(self):""" A dictionary of all the parameters in the content type. (This is not a view, set to change, modifications of the dict will not be applied otherwise.) """params=self.headers.get('Content-Type','')if';'notinparams:return{}params=params.split(';',1)[1]result={}formatchin_PARAM_RE.finditer(params):result[match.group(1)]=match.group(2)ormatch.group(3)or''returnresultdef_content_type_params__set(self,value_dict):ifnotvalue_dict:self._content_type_params__del()returnparams=[]fork,vinsorted(value_dict.items()):ifnot_OK_PARAM_RE.search(v):v='"%s"'%v.replace('"','\\"')params.append('; %s=%s'%(k,v))ct=self.headers.pop('Content-Type','').split(';',1)[0]ct+=''.join(params)self.headers['Content-Type']=ctdef_content_type_params__del(self):self.headers['Content-Type']=self.headers.get('Content-Type','').split(';',1)[0]content_type_params=property(_content_type_params__get,_content_type_params__set,_content_type_params__del,_content_type_params__get.__doc__)## set_cookie, unset_cookie, delete_cookie, merge_cookies#
[ドキュメント]defset_cookie(self,name,value='',max_age=None,path='/',domain=None,secure=False,httponly=False,comment=None,expires=None,overwrite=False,samesite=None):""" Set (add) a cookie for the response. Arguments are: ``name`` The cookie name. ``value`` The cookie value, which should be a string or ``None``. If ``value`` is ``None``, it's equivalent to calling the :meth:`webob.response.Response.unset_cookie` method for this cookie key (it effectively deletes the cookie on the client). ``max_age`` An integer representing a number of seconds, ``datetime.timedelta``, or ``None``. This value is used as the ``Max-Age`` of the generated cookie. If ``expires`` is not passed and this value is not ``None``, the ``max_age`` value will also influence the ``Expires`` value of the cookie (``Expires`` will be set to ``now`` + ``max_age``). If this value is ``None``, the cookie will not have a ``Max-Age`` value (unless ``expires`` is set). If both ``max_age`` and ``expires`` are set, this value takes precedence. ``path`` A string representing the cookie ``Path`` value. It defaults to ``/``. ``domain`` A string representing the cookie ``Domain``, or ``None``. If domain is ``None``, no ``Domain`` value will be sent in the cookie. ``secure`` A boolean. If it's ``True``, the ``secure`` flag will be sent in the cookie, if it's ``False``, the ``secure`` flag will not be sent in the cookie. ``httponly`` A boolean. If it's ``True``, the ``HttpOnly`` flag will be sent in the cookie, if it's ``False``, the ``HttpOnly`` flag will not be sent in the cookie. ``samesite`` A string representing the ``SameSite`` attribute of the cookie or ``None``. If samesite is ``None`` no ``SameSite`` value will be sent in the cookie. Should only be ``"strict"``, ``"lax"``, or ``"none"``. ``comment`` A string representing the cookie ``Comment`` value, or ``None``. If ``comment`` is ``None``, no ``Comment`` value will be sent in the cookie. ``expires`` A ``datetime.timedelta`` object representing an amount of time, ``datetime.datetime`` or ``None``. A non-``None`` value is used to generate the ``Expires`` value of the generated cookie. If ``max_age`` is not passed, but this value is not ``None``, it will influence the ``Max-Age`` header. If this value is ``None``, the ``Expires`` cookie value will be unset (unless ``max_age`` is set). If ``max_age`` is set, it will be used to generate the ``expires`` and this value is ignored. If a ``datetime.datetime`` is provided it has to either be timezone aware or be based on UTC. ``datetime.datetime`` objects that are local time are not supported. Timezone aware ``datetime.datetime`` objects are converted to UTC. This argument will be removed in future versions of WebOb (version 1.9). ``overwrite`` If this key is ``True``, before setting the cookie, unset any existing cookie. """# Remove in WebOb 1.10ifexpires:warn_deprecation('Argument "expires" will be removed in a future ''version of WebOb, please use "max_age".',1.10,1)ifoverwrite:self.unset_cookie(name,strict=False)# If expires is set, but not max_age we set max_age to expiresifnotmax_ageandisinstance(expires,timedelta):max_age=expires# expires can also be a datetimeifnotmax_ageandisinstance(expires,datetime):# If expires has a timezone attached, convert it to UTCifexpires.tzinfoandexpires.utcoffset():expires=(expires-expires.utcoffset()).replace(tzinfo=None)max_age=expires-datetime.utcnow()value=bytes_(value,'utf-8')cookie=make_cookie(name,value,max_age=max_age,path=path,domain=domain,secure=secure,httponly=httponly,comment=comment,samesite=samesite)self.headerlist.append(('Set-Cookie',cookie))
[ドキュメント]defdelete_cookie(self,name,path='/',domain=None):""" Delete a cookie from the client. Note that ``path`` and ``domain`` must match how the cookie was originally set. This sets the cookie to the empty string, and ``max_age=0`` so that it should expire immediately. """self.set_cookie(name,None,path=path,domain=domain)
[ドキュメント]defunset_cookie(self,name,strict=True):""" Unset a cookie with the given name (remove it from the response). """existing=self.headers.getall('Set-Cookie')ifnotexistingandnotstrict:returncookies=Cookie()forheaderinexisting:cookies.load(header)ifisinstance(name,text_type):name=name.encode('utf8')ifnameincookies:delcookies[name]delself.headers['Set-Cookie']formincookies.values():self.headerlist.append(('Set-Cookie',m.serialize()))elifstrict:raiseKeyError("No cookie has been set with the name %r"%name)
[ドキュメント]defmerge_cookies(self,resp):"""Merge the cookies that were set on this response with the given ``resp`` object (which can be any WSGI application). If the ``resp`` is a :class:`webob.Response` object, then the other object will be modified in-place. """ifnotself.headers.get('Set-Cookie'):returnrespifisinstance(resp,Response):forheaderinself.headers.getall('Set-Cookie'):resp.headers.add('Set-Cookie',header)returnrespelse:c_headers=[hforhinself.headerlistifh[0].lower()=='set-cookie']defrepl_app(environ,start_response):defrepl_start_response(status,headers,exc_info=None):returnstart_response(status,headers+c_headers,exc_info=exc_info)returnresp(environ,repl_start_response)returnrepl_app
## cache_control#_cache_control_obj=Nonedef_cache_control__get(self):""" Get/set/modify the Cache-Control header (`HTTP spec section 14.9 <http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.9>`_). """value=self.headers.get('cache-control','')ifself._cache_control_objisNone:self._cache_control_obj=CacheControl.parse(value,updates_to=self._update_cache_control,type='response')self._cache_control_obj.header_value=valueifself._cache_control_obj.header_value!=value:new_obj=CacheControl.parse(value,type='response')self._cache_control_obj.properties.clear()self._cache_control_obj.properties.update(new_obj.properties)self._cache_control_obj.header_value=valuereturnself._cache_control_objdef_cache_control__set(self,value):# This actually becomes a copyifnotvalue:value=""ifisinstance(value,dict):value=CacheControl(value,'response')ifisinstance(value,text_type):value=str(value)ifisinstance(value,str):ifself._cache_control_objisNone:self.headers['Cache-Control']=valuereturnvalue=CacheControl.parse(value,'response')cache=self.cache_controlcache.properties.clear()cache.properties.update(value.properties)def_cache_control__del(self):self.cache_control={}def_update_cache_control(self,prop_dict):value=serialize_cache_control(prop_dict)ifnotvalue:if'Cache-Control'inself.headers:delself.headers['Cache-Control']else:self.headers['Cache-Control']=valuecache_control=property(_cache_control__get,_cache_control__set,_cache_control__del,doc=_cache_control__get.__doc__)## cache_expires#def_cache_expires(self,seconds=0,**kw):""" Set expiration on this request. This sets the response to expire in the given seconds, and any other attributes are used for ``cache_control`` (e.g., ``private=True``). """ifsecondsisTrue:seconds=0elifisinstance(seconds,timedelta):seconds=timedelta_to_seconds(seconds)cache_control=self.cache_controlifsecondsisNone:passelifnotseconds:# To really expire something, you have to force a# bunch of these cache control attributes, and IE may# not pay attention to those still so we also set# Expires.cache_control.no_store=Truecache_control.no_cache=Truecache_control.must_revalidate=Truecache_control.max_age=0cache_control.post_check=0cache_control.pre_check=0self.expires=datetime.utcnow()if'last-modified'notinself.headers:self.last_modified=datetime.utcnow()self.pragma='no-cache'else:cache_control.properties.clear()cache_control.max_age=secondsself.expires=datetime.utcnow()+timedelta(seconds=seconds)self.pragma=Noneforname,valueinkw.items():setattr(cache_control,name,value)cache_expires=property(lambdaself:self._cache_expires,_cache_expires)## encode_content, decode_content, md5_etag#
[ドキュメント]defencode_content(self,encoding='gzip',lazy=False):""" Encode the content with the given encoding (only ``gzip`` and ``identity`` are supported). """assertencodingin('identity','gzip'), \
"Unknown encoding: %r"%encodingifencoding=='identity':self.decode_content()returnifself.content_encoding=='gzip':returniflazy:self.app_iter=gzip_app_iter(self._app_iter)self.content_length=Noneelse:self.app_iter=list(gzip_app_iter(self._app_iter))self.content_length=sum(map(len,self._app_iter))self.content_encoding='gzip'
defdecode_content(self):content_encoding=self.content_encodingor'identity'ifcontent_encoding=='identity':returnifcontent_encodingnotin('gzip','deflate'):raiseValueError("I don't know how to decode the content %s"%content_encoding)ifcontent_encoding=='gzip':fromgzipimportGzipFilefromioimportBytesIOgzip_f=GzipFile(filename='',mode='r',fileobj=BytesIO(self.body))self.body=gzip_f.read()self.content_encoding=Nonegzip_f.close()else:try:# RFC7230 section 4.2.2 specifies that the body should be wrapped# inside a ZLIB (RFC1950) container ...self.body=zlib.decompress(self.body)exceptzlib.error:# ... but there are nonconformant implementations around which send# the data without the ZLIB container, so we use maximum window size# decompression without header check (the - sign)self.body=zlib.decompress(self.body,-15)self.content_encoding=None
[ドキュメント]defmd5_etag(self,body=None,set_content_md5=False):""" Generate an etag for the response object using an MD5 hash of the body (the ``body`` parameter, or ``self.body`` if not given). Sets ``self.etag``. If ``set_content_md5`` is ``True``, sets ``self.content_md5`` as well. """ifbodyisNone:body=self.bodymd5_digest=md5(body).digest()md5_digest=b64encode(md5_digest)md5_digest=md5_digest.replace(b'\n',b'')md5_digest=native_(md5_digest)self.etag=md5_digest.strip('=')ifset_content_md5:self.content_md5=md5_digest
@staticmethoddef_make_location_absolute(environ,value):ifSCHEME_RE.search(value):returnvaluenew_location=urlparse.urljoin(_request_uri(environ),value)returnnew_locationdef_abs_headerlist(self,environ):# Build the headerlist, if we have a Location header, make it absolutereturn[(k,v)ifk.lower()!='location'else(k,self._make_location_absolute(environ,v))for(k,v)inself._headerlist]## __call__, conditional_response_app#def__call__(self,environ,start_response):""" WSGI application interface """ifself.conditional_response:returnself.conditional_response_app(environ,start_response)headerlist=self._abs_headerlist(environ)start_response(self.status,headerlist)ifenviron['REQUEST_METHOD']=='HEAD':# Special case here...returnEmptyResponse(self._app_iter)returnself._app_iter_safe_methods=('GET','HEAD')
[ドキュメント]defconditional_response_app(self,environ,start_response):""" Like the normal ``__call__`` interface, but checks conditional headers: * ``If-Modified-Since`` (``304 Not Modified``; only on ``GET``, ``HEAD``) * ``If-None-Match`` (``304 Not Modified``; only on ``GET``, ``HEAD``) * ``Range`` (``406 Partial Content``; only on ``GET``, ``HEAD``) """req=BaseRequest(environ)headerlist=self._abs_headerlist(environ)method=environ.get('REQUEST_METHOD','GET')ifmethodinself._safe_methods:status304=Falseifreq.if_none_matchandself.etag:status304=self.etaginreq.if_none_matchelifreq.if_modified_sinceandself.last_modified:status304=self.last_modified<=req.if_modified_sinceifstatus304:start_response('304 Not Modified',filter_headers(headerlist))returnEmptyResponse(self._app_iter)if(req.rangeandselfinreq.if_rangeandself.content_rangeisNoneandmethodin('HEAD','GET')andself.status_code==200andself.content_lengthisnotNone):content_range=req.range.content_range(self.content_length)ifcontent_rangeisNone:iter_close(self._app_iter)body=bytes_("Requested range not satisfiable: %s"%req.range)headerlist=[('Content-Length',str(len(body))),('Content-Range',str(ContentRange(None,None,self.content_length))),('Content-Type','text/plain'),]+filter_headers(headerlist)start_response('416 Requested Range Not Satisfiable',headerlist)ifmethod=='HEAD':return()return[body]else:app_iter=self.app_iter_range(content_range.start,content_range.stop)ifapp_iterisnotNone:# the following should be guaranteed by# Range.range_for_length(length)assertcontent_range.startisnotNoneheaderlist=[('Content-Length',str(content_range.stop-content_range.start)),('Content-Range',str(content_range)),]+filter_headers(headerlist,('content-length',))start_response('206 Partial Content',headerlist)ifmethod=='HEAD':returnEmptyResponse(app_iter)returnapp_iterstart_response(self.status,headerlist)ifmethod=='HEAD':returnEmptyResponse(self._app_iter)returnself._app_iter
[ドキュメント]defapp_iter_range(self,start,stop):""" Return a new ``app_iter`` built from the response ``app_iter``, that serves up only the given ``start:stop`` range. """app_iter=self._app_iterifhasattr(app_iter,'app_iter_range'):returnapp_iter.app_iter_range(start,stop)returnAppIterRange(app_iter,start,stop)
deffilter_headers(hlist,remove_headers=('content-length','content-type')):return[hforhinhlistif(h[0].lower()notinremove_headers)]defiter_file(file,block_size=1<<18):# 256KbwhileTrue:data=file.read(block_size)ifnotdata:breakyielddataclassResponseBodyFile(object):mode='wb'closed=Falsedef__init__(self,response):""" Represents a :class:`~Response` as a file like object. """self.response=responseself.write=response.writedef__repr__(self):return'<body_file for %r>'%self.responseencoding=property(lambdaself:self.response.charset,doc="The encoding of the file (inherited from response.charset)")defwritelines(self,seq):""" Write a sequence of lines to the response. """foriteminseq:self.write(item)defclose(self):raiseNotImplementedError("Response bodies cannot be closed")defflush(self):passdeftell(self):""" Provide the current location where we are going to start writing. """ifnotself.response.has_body:return0returnsum([len(chunk)forchunkinself.response.app_iter])classAppIterRange(object):""" Wraps an ``app_iter``, returning just a range of bytes. """def__init__(self,app_iter,start,stop):assertstart>=0,"Bad start: %r"%startassertstopisNoneor(stop>=0andstop>=start),("Bad stop: %r"%stop)self.app_iter=iter(app_iter)self._pos=0# position in app_iterself.start=startself.stop=stopdef__iter__(self):returnselfdef_skip_start(self):start,stop=self.start,self.stopforchunkinself.app_iter:self._pos+=len(chunk)ifself._pos<start:continueelifself._pos==start:returnb''else:chunk=chunk[start-self._pos:]ifstopisnotNoneandself._pos>stop:chunk=chunk[:stop-self._pos]assertlen(chunk)==stop-startreturnchunkelse:raiseStopIteration()defnext(self):ifself._pos<self.start:# need to skip some leading bytesreturnself._skip_start()stop=self.stopifstopisnotNoneandself._pos>=stop:raiseStopIterationchunk=next(self.app_iter)self._pos+=len(chunk)ifstopisNoneorself._pos<=stop:returnchunkelse:returnchunk[:stop-self._pos]__next__=next# py3defclose(self):iter_close(self.app_iter)classEmptyResponse(object):""" An empty WSGI response. An iterator that immediately stops. Optionally provides a close method to close an underlying ``app_iter`` it replaces. """def__init__(self,app_iter=None):ifapp_iterisnotNoneandhasattr(app_iter,'close'):self.close=app_iter.closedef__iter__(self):returnselfdef__len__(self):return0defnext(self):raiseStopIteration()__next__=next# py3def_is_xml(content_type):return(content_type.startswith('application/xml')or(content_type.startswith('application/')andcontent_type.endswith('+xml'))or(content_type.startswith('image/')andcontent_type.endswith('+xml')))def_content_type_has_charset(content_type):return(content_type.startswith('text/')or_is_xml(content_type))def_request_uri(environ):"""Like ``wsgiref.url.request_uri``, except eliminates ``:80`` ports. Returns the full request URI."""url=environ['wsgi.url_scheme']+'://'ifenviron.get('HTTP_HOST'):url+=environ['HTTP_HOST']else:url+=environ['SERVER_NAME']+':'+environ['SERVER_PORT']ifurl.endswith(':80')andenviron['wsgi.url_scheme']=='http':url=url[:-3]elifurl.endswith(':443')andenviron['wsgi.url_scheme']=='https':url=url[:-4]ifPY2:script_name=environ.get('SCRIPT_NAME','/')path_info=environ.get('PATH_INFO','')else:script_name=bytes_(environ.get('SCRIPT_NAME','/'),'latin-1')path_info=bytes_(environ.get('PATH_INFO',''),'latin-1')url+=url_quote(script_name)qpath_info=url_quote(path_info)if'SCRIPT_NAME'notinenviron:url+=qpath_info[1:]else:url+=qpath_inforeturnurldefiter_close(iter):ifhasattr(iter,'close'):iter.close()defgzip_app_iter(app_iter):size=0crc=zlib.crc32(b"")&0xffffffffcompress=zlib.compressobj(9,zlib.DEFLATED,-zlib.MAX_WBITS,zlib.DEF_MEM_LEVEL,0)yield_gzip_headerforiteminapp_iter:size+=len(item)crc=zlib.crc32(item,crc)&0xffffffff# The compress function may return zero length bytes if the input is# small enough; it buffers the input for the next iteration or for a# flush.result=compress.compress(item)ifresult:yieldresult# Similarly, flush may also not yield a value.result=compress.flush()ifresult:yieldresultyieldstruct.pack("<2L",crc,size&0xffffffff)def_error_unicode_in_app_iter(app_iter,body):app_iter_repr=repr(app_iter)iflen(app_iter_repr)>50:app_iter_repr=(app_iter_repr[:30]+'...'+app_iter_repr[-10:])raiseTypeError('An item of the app_iter (%s) was text, causing a ''text body: %r'%(app_iter_repr,body))